mms2mail/mms2mail
Alex df9d5acf76 [feat] DBus
* Use DBus Signal by default in daemon mode
* Allow multiple files in standalone mode
* Better handle mms headers and content
2021-05-04 13:10:28 +02:00

276 lines
9.4 KiB
Python
Executable file

#!/usr/bin/python3
"""
An mms to mail converter for mmsd
"""
# Upstream bug workaround, see
# https://github.com/marrow/mailer/issues/87#issuecomment-689586587
import sys
if sys.version_info[0] == 3 and sys.version_info[1] > 7:
    sys.modules["cgi.parse_qsl"] = None
# Upstream bug workaround, see
# https://github.com/marrow/mailer/issues/87#issuecomment-713319548
# base64.encodestring() was removed in Python 3.9; the name is restored here,
# ahead of the remaining imports, for code that still calls it.
import base64
if sys.version_info[0] == 3 and sys.version_info[1] > 8:
    def encodestring(value):
        """
        Stand-in for the removed base64.encodestring()

        Delegates to base64.encodebytes(), the function the removed name was
        an alias of, so the output keeps its line breaks and trailing newline
        (a bare b64encode() would return one unbroken line).

        :param value: the data to encode
        :type value: bytes
        :rtype bytes
        :return: the base64 encoded data, newline terminated
        """
        return base64.encodebytes(value)
    base64.encodestring = encodestring
# End of the workarounds
import argparse
import configparser
import re
import time
import getpass
import socket
from pathlib import Path
from messaging.mms.message import MMSMessage
from marrow.mailer import Mailer, Message
from gi.repository import GObject, GLib
import dbus
import dbus.mainloop.glib
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MMS2Mail:
    """
    The class handling the conversion between MMS and mail format

    MMS support is provided by python-messaging
    Mail support is provided by marrow.mailer
    """

    def __init__(self):
        """
        Constructor loading the configuration, preparing the mbox mailer
        and connecting to the DBus session bus
        """
        self.config = configparser.ConfigParser()
        self.config.read(f"{Path.home()}/.mms/modemmanager/mms2mail.ini")
        # Mails are appended to the configured mbox, defaulting to the
        # current user's system mailbox
        mailbox = self.config.get('mail', 'mailbox',
                                  fallback=f"/var/mail/{getpass.getuser()}")
        self.mailer = Mailer({'manager.use': 'immediate',
                              'transport.use': 'mbox',
                              'transport.file': mailbox})
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SessionBus()

    def get_bus(self):
        """
        Return the DBus SessionBus

        :rtype dbus.SessionBus()
        :return: an active SessionBus
        """
        return self.bus

    def mark_mms_read(self, dbus_path):
        """
        Ask MMSd to mark the mms as read

        :param dbus_path: the mms dbus path
        :type dbus_path: str
        """
        message = dbus.Interface(self.bus.get_object('org.ofono.mms', dbus_path),
                                 'org.ofono.mms.Message')
        message.MarkRead()

    def convert(self, path, dbus_path=None):
        """
        Convert a provided mms file to a mail stored in a mbox

        Nothing is done (a notice is printed on stderr) when the mms file or
        its mmsd ".status" file is missing, when the mms was already converted
        while a watcher is configured, or when the mms is not an incoming one.
        The "<path>.mail" marker is only created once the mail was written.

        :param path: the mms filesystem path
        :type path: str
        :param dbus_path: the mms dbus path
        :type dbus_path: str
        :raises KeyError: if the status file has no [info] state entry
        """
        # Check if the provided file is present
        if not Path(f"{path}").is_file():
            print("MMS file not found : aborting", file=sys.stderr)
            return
        # Generate its dbus path, for future operation (mark as read, delete)
        if not dbus_path:
            dbus_path = f"/org/ofono/mms/modemmanager/{path.split('/')[-1]}"
        # `args` is the module level namespace parsed in the __main__ block
        if Path(f"{path}.mail").is_file() and args.watcher:
            print(f"Already converted MMS : doing nothing ({path})", file=sys.stderr)
            return
        # Check for mmsd status file
        if not Path(f"{path}.status").is_file():
            print("MMS status file not found : aborting", file=sys.stderr)
            return
        status = configparser.ConfigParser()
        # Context manager so the status file is closed once parsed
        with open(f"{path}.status") as status_file:
            status.read_file(status_file)
        # Allow only incoming MMS for the time being
        if status['info']['state'] in ('downloaded', 'received'):
            print(f"New incoming MMS : converting to Mail ({path})", file=sys.stderr)
        else:
            print(f"New outgoing MMS : doing nothing ({path})", file=sys.stderr)
            return
        mms = MMSMessage.from_file(path)
        message = self._build_message(mms, path)
        # Write the mail, always releasing the mailer even if sending fails
        self.mailer.start()
        try:
            self.mailer.send(message)
        finally:
            self.mailer.stop()
        # Create the empty marker file stating the mms has been converted,
        # only now that the mail is stored, so a failed send can be retried
        Path(f"{path}.mail").touch()

    def _build_message(self, mms, path):
        """
        Build the mail mirroring a parsed mms

        :param mms: the parsed mms
        :type mms: MMSMessage
        :param path: the mms filesystem path, attached when attach_mms is set
        :type path: str
        :rtype Message
        :return: the mail ready to be sent
        """
        domain = self.config.get('mail', 'domain', fallback=socket.getfqdn())
        user = self.config.get('mail', 'user', fallback=getpass.getuser())
        message = Message()
        # Generate Mail Headers
        mms_from = "unknown"
        if 'From' in mms.headers and mms.headers['From']:
            # Keep what precedes the first '/' (what follows is the address
            # type); partition() copes with values holding no or several '/'
            mms_from = mms.headers['From'].partition('/')[0]
        message.author = f"{mms_from}@{domain}"
        message.to = f"{user}@{domain}"
        if 'Subject' in mms.headers and mms.headers['Subject']:
            message.subject = mms.headers['Subject']
        else:
            message.subject = f"MMS from {mms_from}"
        if 'Date' in mms.headers and mms.headers['Date']:
            message.date = mms.headers['Date']
        # Recopy MMS HEADERS
        for header in mms.headers:
            message.headers.append((f"X-MMS-{header}", f"{mms.headers[header]}"))
        # Collect every text part instead of keeping only the last one
        text_parts = []
        for data_part in mms.data_parts:
            datacontent = data_part.headers['Content-Type']
            if datacontent is None:
                continue
            if 'text/plain' in datacontent[0]:
                text_parts.append(f"{data_part.data} \n")
            # Any named part, text included, is attached under its name
            if 'Name' in datacontent[1]:
                message.attach(datacontent[1]['Name'], data_part.data)
        # Ensure a proper body content in the resulting mail
        message.plain = "".join(text_parts) or " "
        # Add MMS binary file, for debugging purpose or reparsing in the future
        if self.config.getboolean('mail', 'attach_mms', fallback=False):
            message.attach(path, None, None, None, False, "mms.bin")
        return message
class FSWatcher:
    """
    Use OS filesystem notification to watch for new MMS (DEPRECATED)

    Events are sent to the FSHandler class
    """
    # Path to modemmanager storage
    mms_folder = f"{Path.home()}/.mms/modemmanager"

    def __init__(self):
        """
        Create the filesystem observer and compile the mms file name patterns
        """
        self.observer = Observer()
        # Paths ending with 40 uppercase hexadecimal characters
        self.patternold = re.compile('[0-9A-F]{40}$')
        # Paths ending with 36 lowercase hexadecimal characters
        self.pattern = re.compile('[0-9a-f]{36}$')

    def is_mmsd_mms_file(self, path):
        """
        Test if the provided file seems to be a mms file created by mmsd

        :param path: the mms filesystem path
        :type path: str
        :rtype boolean
        :return: the test result
        """
        return bool(self.pattern.search(path) or self.patternold.search(path))

    def run(self):
        """
        Run the watcher mainloop

        Blocks until interrupted (Ctrl-C or interpreter exit request); the
        observer is stopped and joined whatever ends the loop.
        """
        event_handler = FSHandler()
        self.observer.schedule(event_handler, self.mms_folder, recursive=False)
        self.observer.start()
        try:
            # The observer does the work; this loop only keeps the caller alive
            while True:
                time.sleep(5)
        except (KeyboardInterrupt, SystemExit):
            # Only a shutdown request is swallowed, other errors propagate
            print("Exiting")
        finally:
            self.observer.stop()
            self.observer.join()
class FSHandler(FileSystemEventHandler):
    """
    React to the filesystem events reported by the FSWatcher observer
    """

    @staticmethod
    def on_any_event(event):
        """
        Callback run on every filesystem event

        Created, modified or moved files whose path looks like a mmsd mms
        file are handed to the converter; everything else is ignored.
        Relies on the module level `w` (watcher) and `m` (converter) objects.
        """
        if event.is_directory:
            return None
        kind = event.event_type
        if kind in ('created', 'modified'):
            candidate = event.src_path
        elif kind == 'moved':
            # A moved file is looked up under its new location
            candidate = event.dest_path
        else:
            return None
        if w.is_mmsd_mms_file(candidate):
            print(f"New MMS found : {candidate}.", file=sys.stderr)
            m.convert(candidate)
        return None
class DbusWatcher():
    """
    Watch for new MMS through the DBus MessageAdded signal
    """

    def run(self):
        """
        Subscribe to the signal, then run the GLib mainloop
        """
        # The bus is the one held by the module level converter `m`
        session_bus = m.get_bus()
        session_bus.add_signal_receiver(
            self.message_added,
            bus_name="org.ofono.mms",
            signal_name="MessageAdded",
            member_keyword="member",
            path_keyword="path",
            interface_keyword="interface",
        )
        GLib.MainLoop().run()

    def message_added(self, name, value, member, path, interface):
        """
        Callback run on each MessageAdded signal

        :param name: the dbus path of the added mms
        :param value: the mms properties ('Status', 'Attachments')
        :param member: the signal name (unused)
        :param path: the dbus path emitting the signal (unused)
        :param interface: the dbus interface emitting the signal (unused)
        """
        incoming = value['Status'] in ('downloaded', 'received')
        mms_id = name.split('/')[-1]
        if not incoming:
            print(f"New outgoing MMS found ({mms_id})")
            return
        print(f"New incoming MMS found ({mms_id})")
        # Third field of the first attachment is handed over as the mms file path
        m.convert(value['Attachments'][0][2], name)
def build_arg_parser():
    """
    Build the command line parser

    "-d/--daemon [dbus|filesystem]" and "-f/--file FILE..." are mutually
    exclusive. The watcher is "dbus" both when no option is given and when
    "-d" is given without a value (const), so a bare "-d" starts the dbus
    daemon instead of leaving the watcher unset.

    :rtype argparse.ArgumentParser
    :return: the configured parser
    """
    parser = argparse.ArgumentParser()
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("-d", "--daemon", help="Use dbus signal from mmsd by default but can also watch mmsd storage folder (useful for mmsd < 1.0)", nargs="?", const="dbus", default="dbus", choices=['dbus', 'filesystem'], dest='watcher')
    mode.add_argument("-f", "--file", nargs='+', help="parse specified mms files", dest='files')
    return parser


if __name__ == '__main__':
    parser = build_arg_parser()
    # `args`, `m` and `w` stay module level names: the converter and the
    # watcher classes of this file read them as globals
    args = parser.parse_args()
    m = MMS2Mail()
    if args.files:
        # Standalone mode: convert the listed files then exit
        for mms_file in args.files:
            m.convert(mms_file)
    elif args.watcher == 'dbus':
        print("Starting mms2mail in daemon mode with dbus watcher")
        w = DbusWatcher()
        w.run()
    elif args.watcher == 'filesystem':
        print("Starting mms2mail in daemon mode with filesystem watcher")
        w = FSWatcher()
        w.run()
    else:
        parser.print_help()
    del m