mms2mail/mms2mail

115 lines
4.2 KiB
Python
Executable file

#!/usr/bin/python3
# Compatibility shims for marrow.mailer on recent Python versions. They are
# placed ahead of the `marrow.mailer` import so they are in effect when that
# package is loaded.
# upstream bug dirty fix https://github.com/marrow/mailer/issues/87#issuecomment-689586587
import sys
if sys.version_info >= (3, 8):
    # Placeholder entry in the module table; presumably keeps marrow.mailer's
    # lookup of `cgi.parse_qsl` from failing (see the linked issue).
    sys.modules["cgi.parse_qsl"] = None
# upstream bug dirty fix https://github.com/marrow/mailer/issues/87#issuecomment-713319548
import base64
if sys.version_info >= (3, 9):
    def encodestring(value):
        """Stand-in for ``base64.encodestring``, removed in Python 3.9.

        ``encodestring`` was the deprecated alias of ``base64.encodebytes``,
        so delegate to it: the output keeps the 76-character line wrapping
        and the trailing newline that ``b64encode`` would drop.
        """
        return base64.encodebytes(value)
    base64.encodestring = encodestring
# end bugfix
import argparse
import configparser
import time
import getpass
import socket
from pathlib import Path
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from messaging.mms.message import MMSMessage
from marrow.mailer import Mailer, Message
class Watcher:
    """Watch a folder for filesystem events and dispatch them to ``Handler``."""

    # Folder under the user's home directory that is watched for new MMS
    # files (the `--daemon` help text calls it the mmsd storage folder).
    mms_folder = f"{Path.home()}/.mms/modemmanager"
    # Kept as an alias for backward compatibility. It used to be a hard-coded
    # developer path, which left `mms_folder` unused.
    DIRECTORY_TO_WATCH = mms_folder

    def __init__(self):
        self.observer = Observer()

    def run(self):
        """Start the observer and block until interrupted with Ctrl-C.

        The observer is always stopped and joined on the way out, whether
        the loop ends through ``KeyboardInterrupt`` or another exception.
        """
        event_handler = Handler()
        self.observer.schedule(event_handler, self.DIRECTORY_TO_WATCH, recursive=False)
        self.observer.start()
        try:
            # Events are handled by the observer; this loop only keeps the
            # calling thread alive.
            while True:
                time.sleep(5)
        except KeyboardInterrupt:
            print("Exiting")
        finally:
            self.observer.stop()
            self.observer.join()
class Handler(FileSystemEventHandler):
    """Filesystem event handler that converts an MMS once its status file is created."""

    @staticmethod
    def on_any_event(event):
        """React to the creation of a ``<mms>.status`` file.

        Directory events, non-``created`` events and paths that do not end
        in ``.status`` are ignored. Otherwise the suffix is stripped to get
        the MMS path, which is passed to the module-level ``m.convert``.

        :param event: filesystem event providing ``is_directory``,
            ``event_type`` and ``src_path``.
        :return: always ``None``.
        """
        status_suffix = '.status'
        if event.is_directory or event.event_type != 'created':
            return None
        # Match on the suffix itself: a substring test would also accept
        # paths that merely contain '.status', and the slice below would
        # then cut them in the wrong place.
        if not event.src_path.endswith(status_suffix):
            return None
        mms_path = event.src_path[:-len(status_suffix)]
        print(f"New MMS found : {event.src_path}.")
        # `m` is the MMS2Mail instance created in the __main__ section.
        m.convert(mms_path)
        return None
class MMS2Mail:
    """Convert MMS files into mail messages written to a local mbox.

    Settings are read from ``~/.mms/modemmanager/mms2mail.ini``, section
    ``[mail]``: ``mailbox``, ``domain``, ``user`` and ``attach_mms``. Each
    has a fallback, so the file is optional.
    """

    def __init__(self):
        """Load the configuration and start the mbox mailer."""
        self.config = configparser.ConfigParser()
        self.config.read(f"{Path.home()}/.mms/modemmanager/mms2mail.ini")
        mailbox = self.config.get('mail', 'mailbox', fallback=f"/var/mail/{getpass.getuser()}")
        self.mailer = Mailer({
            'manager.use': 'immediate',
            'transport.use': 'mbox',
            'transport.file': mailbox,
        })
        self.mailer.start()

    def __del__(self):
        """Stop the mailer, if one was created."""
        # __init__ may have failed before `self.mailer` was assigned.
        mailer = getattr(self, 'mailer', None)
        if mailer is not None:
            mailer.stop()

    def convert(self, path):
        """Convert the MMS stored at *path* into a mail and send it.

        ``<path>.status`` is read first; only an MMS whose ``[info] state``
        contains ``downloaded`` or ``received`` is converted. Any other
        state is reported as outgoing and skipped.

        :param path: path of the MMS file, without the ``.status`` suffix.
        :return: ``None``.
        :raises OSError: if the status file cannot be opened.
        :raises KeyError: if the status file has no ``[info] state`` entry.
        """
        status = configparser.ConfigParser()
        # Read inside a context manager so the file handle is closed.
        with open(f"{path}.status") as status_file:
            status.read_file(status_file)
        state = status['info']['state']
        if 'downloaded' in state or 'received' in state:
            print(f"New incomming MMS : converting to Mail ({path})")
        else:
            print(f"New outgoing MMS : doing nothing ({path})")
            return

        mms = MMSMessage.from_file(path)
        # Keep only the part of the From header before the first '/'.
        mms_from = mms.headers['From'].partition('/')[0]
        domain = self.config.get('mail', 'domain', fallback=socket.getfqdn())
        recipient = self.config.get('mail', 'user', fallback=getpass.getuser())

        message = Message(author=f"{mms_from}@{domain}", to=f"{recipient}@{domain}")
        message.subject = f"MMS from {mms_from}"
        message.date = mms.headers['Date']
        message.headers = [
            ('X-MMS-From', mms.headers['From']),
            ('X-MMS-To', mms.headers['To']),
            ('X-MMS-ID', mms.headers['Message-ID']),
        ]
        # Default body, replaced below when the MMS has a text/plain part.
        message.plain = f"MMS from {mms_from}"
        if self.config.getboolean('mail', 'attach_mms', fallback=False):
            # Attach the raw MMS file itself under the name "mms.bin".
            message.attach(path, None, None, None, False, "mms.bin")

        for data_part in mms.data_parts:
            # Indexed below as [0] = content type, [1] = mapping of parameters.
            datacontent = data_part.headers['Content-Type']
            if datacontent is None:
                continue
            if 'text/plain' in datacontent[0]:
                message.plain = data_part.data
            if 'Name' in datacontent[1]:
                message.attach(datacontent[1]['Name'], data_part.data)
        self.mailer.send(message)
if __name__ == '__main__':
    # Command line: either watch the storage folder (-d) or convert one file (-f).
    cli = argparse.ArgumentParser()
    run_mode = cli.add_mutually_exclusive_group()
    run_mode.add_argument(
        "-d", "--daemon",
        action="store_true",
        help="watch for new mms in the mmsd storage folder",
    )
    run_mode.add_argument(
        "-f", "--file",
        nargs=1,
        help="parse a single mms file",
    )
    options = cli.parse_args()

    # `m` has to stay a module-level name: Handler.on_any_event reads it as a global.
    m = MMS2Mail()
    if options.daemon:
        Watcher().run()
    elif options.file:
        # nargs=1 yields a one-element list.
        (single_path,) = options.file
        m.convert(single_path)
    else:
        cli.print_help()
    # Drop the reference so MMS2Mail.__del__ can stop the mailer.
    del m