mms2mail/mms2mail

330 lines
11 KiB
Text
Raw Normal View History

2021-05-01 06:43:13 +00:00
#!/usr/bin/python3
2021-05-05 03:49:38 +00:00
"""An mms to mail converter for mmsd."""
# upstream bug dirty fix
# https://github.com/marrow/mailer/issues/87#issuecomment-689586587
2021-05-01 07:32:26 +00:00
import sys
if sys.version_info[0] == 3 and sys.version_info[1] > 7:
sys.modules["cgi.parse_qsl"] = None
2021-05-05 03:49:38 +00:00
# upstream bug dirty fix
# https://github.com/marrow/mailer/issues/87#issuecomment-713319548
2021-05-01 07:32:26 +00:00
import base64
if sys.version_info[0] == 3 and sys.version_info[1] > 8:
def encodestring(value):
2021-05-05 03:49:38 +00:00
"""
Encode string in base64.
:param value: the string to encode.
:type value: str
:rtype str
:return: the base64 encoded string
"""
2021-05-01 07:32:26 +00:00
return base64.b64encode(value)
base64.encodestring = encodestring
2021-05-05 03:49:38 +00:00
# end bugfix
2021-05-01 07:32:26 +00:00
2021-05-01 06:43:13 +00:00
import argparse
import configparser
2021-05-01 21:11:15 +00:00
import re
2021-05-01 06:43:13 +00:00
import time
import getpass
import socket
from pathlib import Path
2021-05-01 06:43:13 +00:00
from messaging.mms.message import MMSMessage
from marrow.mailer import Mailer, Message
2021-05-05 03:49:38 +00:00
from gi.repository import GLib
import dbus
import dbus.mainloop.glib
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
2021-05-01 06:43:13 +00:00
2021-05-05 03:49:38 +00:00
2021-05-01 06:43:13 +00:00
class MMS2Mail:
"""
2021-05-05 03:49:38 +00:00
The class handling the conversion between MMS and mail format.
2021-05-01 06:43:13 +00:00
MMS support is provided by python-messaging
Mail support is provided by marrow.mailer
"""
2021-05-05 03:49:38 +00:00
def __init__(self, delete=False, force_read=False):
"""
Return class instance.
:param delete: delete MMS after conversion
:type delete: bool
:param force_read: force converting an already read MMS (batch mode)
:type force_read: bool
"""
self.delete = delete
self.force_read = force_read
2021-05-01 06:43:13 +00:00
self.config = configparser.ConfigParser()
self.config.read(f"{Path.home()}/.mms/modemmanager/mms2mail.ini")
self.attach_mms = self.config.getboolean('mail', 'attach_mms',
fallback=False)
2021-05-05 03:49:38 +00:00
self.domain = self.config.get('mail', 'domain',
fallback=socket.getfqdn())
self.user = self.config.get('mail', 'user', fallback=getpass.getuser())
mbox_file = self.config.get('mail', 'mailbox',
fallback=f"/var/mail/{self.user}")
self.mailer = Mailer({'manager.use': 'immediate',
'transport.use': 'mbox',
'transport.file': mbox_file})
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
self.bus = dbus.SessionBus()
2021-05-01 06:43:13 +00:00
def get_bus(self):
"""
2021-05-05 03:49:38 +00:00
Return the DBus SessionBus.
:rtype dbus.SessionBus()
:return: an active SessionBus
"""
return self.bus
2021-05-05 03:49:38 +00:00
def mark_mms_read(self, dbus_path):
"""
Ask mmsd to mark the mms as read.
:param dbus_path: the mms dbus path
2021-05-05 03:49:38 +00:00
:type dbus_path: str
"""
2021-05-05 03:49:38 +00:00
message = dbus.Interface(self.bus.get_object('org.ofono.mms',
dbus_path),
'org.ofono.mms.Message')
print(f"Marking MMS as read {dbus_path}", file=sys.stderr)
message.MarkRead()
def delete_mms(self, dbus_path):
"""
Ask mmsd to delete the mms.
2021-05-05 03:49:38 +00:00
:param dbus_path: the mms dbus path
2021-05-05 03:49:38 +00:00
:type dbus_path: str
"""
message = dbus.Interface(self.bus.get_object('org.ofono.mms',
dbus_path),
'org.ofono.mms.Message')
print(f"Deleting MMS {dbus_path}", file=sys.stderr)
message.Delete()
def check_mms(self, path):
"""
Check wether the provided file would be converted.
:param path: the mms filesystem path
:type path: str
:rtype bool
"""
# Check for mmsd data file
if not Path(f"{path}").is_file():
2021-05-05 03:49:38 +00:00
print("MMS file not found : aborting", file=sys.stderr)
return False
# Check for mmsd status file
2021-05-01 06:43:13 +00:00
status = configparser.ConfigParser()
if not Path(f"{path}.status").is_file():
2021-05-05 03:49:38 +00:00
print("MMS status file not found : aborting", file=sys.stderr)
return False
2021-05-01 06:43:13 +00:00
status.read_file(open(f"{path}.status"))
# Allow only incoming MMS for the time beeing
if not (status['info']['state'] == 'downloaded' or
2021-05-05 03:49:38 +00:00
status['info']['state'] == 'received'):
print("Outgoing MMS : aborting", file=sys.stderr)
return False
if not (self.force_read or not status.getboolean('info', 'read')):
print("Already converted MMS : aborting", file=sys.stderr)
return False
return True
def convert(self, path, dbus_path=None):
"""
Convert a provided mms file to a mail stored in a mbox.
:param path: the mms filesystem path
:type path: str
:param dbus_path: the mms dbus path
:type dbus_path: str
"""
# Check if the provided file present
if not self.check_mms(path):
print("MMS file not convertible.", file=sys.stderr)
2021-05-01 06:43:13 +00:00
return
# Generate its dbus path, for future operation (mark as read, delete)
if not dbus_path:
dbus_path = f"/org/ofono/mms/modemmanager/{path.split('/')[-1]}"
2021-05-01 06:43:13 +00:00
2021-05-01 21:11:15 +00:00
mms = MMSMessage.from_file(path)
2021-05-05 03:49:38 +00:00
self.mailer.start()
message = Message()
2021-05-01 06:43:13 +00:00
# Generate Mail Headers
2021-05-05 03:49:38 +00:00
mms_from, mms_from_type = mms.headers.get('From',
'unknown/undef').split('/')
message.author = f"{mms_from}@{self.domain}"
mms_from, mms_from_type = mms.headers.get('To',
'unknown/undef').split('/')
message.to = f"{self.user}@{self.domain}"
2021-05-01 21:11:15 +00:00
if 'Subject' in mms.headers and mms.headers['Subject']:
message.subject = mms.headers['Subject']
2021-05-05 03:49:38 +00:00
else:
message.subject = f"MMS from {mms_from}"
if 'Date' in mms.headers and mms.headers['Date']:
message.date = mms.headers['Date']
2021-05-01 21:11:15 +00:00
# Recopy MMS HEADERS
for header in mms.headers:
2021-05-05 03:49:38 +00:00
message.headers.append((f"X-MMS-{header}",
f"{mms.headers[header]}"))
2021-05-01 06:43:13 +00:00
for data_part in mms.data_parts:
2021-05-05 03:49:38 +00:00
datacontent = data_part.headers['Content-Type']
2021-05-01 06:43:13 +00:00
if datacontent is not None:
if 'text/plain' in datacontent[0]:
message.plain = f"{data_part.data} \n"
2021-05-01 06:43:13 +00:00
if 'Name' in datacontent[1]:
filename = datacontent[1]['Name']
2021-05-05 03:49:38 +00:00
message.attach(filename, data_part.data)
# Ensure a proper body content in the resulting mail
if not message.plain:
message.plain = " "
2021-05-05 03:49:38 +00:00
# Add MMS binary file, for debugging purpose or reparsing in the future
if self.attach_mms:
message.attach(path, None, None, None, False, "mms.bin")
2021-05-05 03:49:38 +00:00
# Creating an empty file stating the mms as been converted
self.mark_mms_read(dbus_path)
if self.delete:
self.delete_mms(dbus_path)
# Write the mail
2021-05-01 06:43:13 +00:00
self.mailer.send(message)
self.mailer.stop()
2021-05-01 06:43:13 +00:00
2021-05-05 03:49:38 +00:00
class FSWatcher:
"""
2021-05-05 03:49:38 +00:00
Use OS filesystem notification to watch for new MMS (DEPRECATED).
Events are send to the FSHandler class
"""
2021-05-05 03:49:38 +00:00
# Path to modemmanager storage
mms_folder = f"{Path.home()}/.mms/modemmanager"
def __init__(self):
2021-05-05 03:49:38 +00:00
"""Construct an instance."""
self.observer = Observer()
self.patternold = re.compile('[0-9A-F]{40}$')
self.pattern = re.compile('[0-9a-f]{36}$')
2021-05-05 03:49:38 +00:00
def is_mmsd_mms_file(self, path):
"""
2021-05-05 03:49:38 +00:00
Test if the provided file seems to be a mms file created by mmsd.
:param path: the mms filesystem path
2021-05-05 03:49:38 +00:00
:type path: str
:rtype boolean
:return: the test result
"""
if self.pattern.search(path) or self.patternold.search(path):
return True
else:
return False
def run(self):
2021-05-05 03:49:38 +00:00
"""Run the watcher mainloop."""
event_handler = FSHandler()
self.observer.schedule(event_handler, self.mms_folder, recursive=False)
self.observer.start()
try:
while True:
time.sleep(5)
2021-05-05 03:49:38 +00:00
finally:
self.observer.stop()
2021-05-05 03:49:38 +00:00
self.observer.join()
class FSHandler(FileSystemEventHandler):
2021-05-05 03:49:38 +00:00
"""Handle the FSWatcher event."""
@staticmethod
def on_any_event(event):
2021-05-05 03:49:38 +00:00
"""Trigger conversion on event by the FSWatcher."""
if event.is_directory:
return None
elif event.event_type == 'created' or event.event_type == 'modified':
if w.is_mmsd_mms_file(event.src_path):
print(f"New MMS found : {event.src_path}.", file=sys.stderr)
m.convert(event.src_path)
elif event.event_type == 'moved':
if w.is_mmsd_mms_file(event.dest_path):
print(f"New MMS found : {event.dest_path}.", file=sys.stderr)
m.convert(event.dest_path)
2021-05-05 03:49:38 +00:00
class DbusWatcher():
2021-05-05 03:49:38 +00:00
"""Use DBus Signal notification to watch for new MMS."""
def run(self):
"""Run the watcher mainloop."""
bus = m.get_bus()
bus.add_signal_receiver(self.message_added,
2021-05-05 03:49:38 +00:00
bus_name="org.ofono.mms",
signal_name="MessageAdded",
member_keyword="member",
path_keyword="path",
interface_keyword="interface")
mainloop = GLib.MainLoop()
mainloop.run()
2021-05-05 03:49:38 +00:00
def message_added(self, name, value, member, path, interface):
"""Trigger conversion on MessageAdded signal."""
if value['Status'] == 'downloaded' or value['Status'] == 'received':
print(f"New incoming MMS found ({name.split('/')[-1]})")
2021-05-05 03:49:38 +00:00
m.convert(value['Attachments'][0][2], name)
else:
print(f"New outgoing MMS found ({name.split('/')[-1]})")
2021-05-01 21:11:15 +00:00
2021-05-05 03:49:38 +00:00
2021-05-01 06:43:13 +00:00
if __name__ == '__main__':
parser = argparse.ArgumentParser()
mode = parser.add_mutually_exclusive_group()
2021-05-05 03:49:38 +00:00
mode.add_argument("-d", "--daemon",
help="Use dbus signal from mmsd by default but can also \
watch mmsd storage folder (useful for mmsd < 1.0)",
nargs="?", default="dbus",
choices=['dbus', 'filesystem'], dest='watcher')
mode.add_argument("-f", "--file", nargs='+',
help="Parse specified mms files and quit", dest='files')
parser.add_argument('--delete', action='store_true', dest='delete',
help="Ask mmsd to delete the converted MMS")
parser.add_argument('--force-read', action='store_true',
dest='force_read', help="Force conversion even if MMS \
is marked as read")
2021-05-01 06:43:13 +00:00
args = parser.parse_args()
m = MMS2Mail(args.delete, args.force_read)
2021-05-05 03:49:38 +00:00
if args.files:
for mms_file in args.files:
m.convert(mms_file)
2021-05-05 03:49:38 +00:00
elif args.watcher == 'dbus':
print("Starting mms2mail in daemon mode with dbus watcher")
w = DbusWatcher()
w.run()
2021-05-05 03:49:38 +00:00
elif args.watcher == 'filesystem':
print("Starting mms2mail in daemon mode with filesystem watcher")
w = FSWatcher()
2021-05-01 06:43:13 +00:00
w.run()
else:
parser.print_help()