mms2mail/mms2mail

126 lines
4.7 KiB
Python
Executable file

#!/usr/bin/python3
#upstream bug dirty fix https://github.com/marrow/mailer/issues/87#issuecomment-689586587
import sys
if sys.version_info[0] == 3 and sys.version_info[1] > 7:
sys.modules["cgi.parse_qsl"] = None
#upstream bug dirty fix https://github.com/marrow/mailer/issues/87#issuecomment-713319548
import base64
if sys.version_info[0] == 3 and sys.version_info[1] > 8:
def encodestring(value):
return base64.b64encode(value)
base64.encodestring = encodestring
#end bugfix
import argparse
import configparser
import re
import time
import getpass
import socket
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from messaging.mms.message import MMSMessage
from marrow.mailer import Mailer, Message
class Watcher:
mms_folder = f"{Path.home()}/.mms/modemmanager"
def __init__(self):
self.observer = Observer()
def run(self):
event_handler = Handler()
self.observer.schedule(event_handler, self.mms_folder, recursive=False)
self.observer.start()
try:
while True:
time.sleep(5)
except:
self.observer.stop()
print("Exiting")
self.observer.join()
class Handler(FileSystemEventHandler):
@staticmethod
def on_any_event(event):
if event.is_directory:
return None
elif event.event_type == 'created' or event.event_type == 'modified':
if m.is_mmsd_mms_file(event.src_path):
print(f"New MMS found : {event.src_path}.", file=sys.stderr)
m.convert(event.src_path)
elif event.event_type == 'moved':
if m.is_mmsd_mms_file(event.dest_path):
print(f"New MMS found : {event.dest_path}.", file=sys.stderr)
m.convert(event.dest_path)
class MMS2Mail:
def __init__(self):
self.config = configparser.ConfigParser()
self.config.read(f"{Path.home()}/.mms/modemmanager/mms2mail.ini")
self.mailer = Mailer({'manager.use': 'immediate', 'transport.use': 'mbox', 'transport.file': self.config.get('mail','mailbox', fallback=f"/var/mail/{getpass.getuser()}")})
self.pattern = re.compile('[0-9A-F]{40}$')
def convert(self, path):
print(path, file=sys.stderr)
self.mailer.start()
if Path(f"{path}.mail").is_file() and args.daemon:
print(f"Already converted MMS : doing nothing ({path})", file=sys.stderr)
return
status = configparser.ConfigParser()
status.read_file(open(f"{path}.status"))
if status['info']['state'] == 'downloaded' or status['info']['state'] == 'received':
print(f"New incomming MMS : converting to Mail ({path})", file=sys.stderr)
else:
print(f"New outgoing MMS : doing nothing ({path})", file=sys.stderr)
return
mms = MMSMessage.from_file(path)
mms_from, mms_from_type = mms.headers['From'].split('/')
mms_to, mms_to_type = mms.headers['To'].split('/')
message = Message(author=f"{mms_from}@{self.config.get('mail','domain', fallback=socket.getfqdn())}", to=f"{self.config.get('mail','user', fallback=getpass.getuser())}@{self.config.get('mail','domain', fallback=socket.getfqdn())}")
message.subject = f"MMS from {mms_from}"
message.date = mms.headers['Date']
message.headers = [('X-MMS-From', mms.headers['From']), ('X-MMS-To', mms.headers['To']), ('X-MMS-ID', mms.headers['Message-ID'])]
message.plain = f"MMS from {mms_from}"
if self.config.getboolean('mail','attach_mms', fallback=False):
message.attach(path, None, None, None, False, "mms.bin")
for data_part in mms.data_parts:
datacontent=data_part.headers['Content-Type']
if datacontent is not None:
if 'text/plain' in datacontent[0]:
message.plain = data_part.data
if 'Name' in datacontent[1]:
filename = datacontent[1]['Name']
message.attach(filename,data_part.data)
Path(f"{path}.mail").touch()
self.mailer.send(message)
self.mailer.stop()
def is_mmsd_mms_file(self,path):
return self.pattern.search(path)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
mode = parser.add_mutually_exclusive_group()
mode.add_argument("-d","--daemon", help="watch for new mms in the mmsd storage folder", action="store_true")
mode.add_argument("-f","--file", nargs=1, help="parse a single mms file")
args = parser.parse_args()
m = MMS2Mail()
if args.daemon:
w = Watcher()
w.run()
elif args.file:
m.convert(args.file[0])
else:
parser.print_help()
del m