mms2mail/mms2mail

281 lines
9.7 KiB
Python
Executable file

#!/usr/bin/python3
"""An mms to mail converter for mmsd."""
# Compatibility shims for marrow.mailer on recent Python 3 releases.
#
# Shim 1 - upstream bug workaround, see
# https://github.com/marrow/mailer/issues/87#issuecomment-689586587
# On Python 3.8 and later, register a None entry under "cgi.parse_qsl"
# in sys.modules.
import sys
if sys.version_info.major == 3 and sys.version_info.minor > 7:
    sys.modules["cgi.parse_qsl"] = None

# Shim 2 - upstream bug workaround, see
# https://github.com/marrow/mailer/issues/87#issuecomment-713319548
# On Python 3.9 and later, install a replacement for base64.encodestring
# so that code still calling that name keeps working.
import base64
if sys.version_info.major == 3 and sys.version_info.minor > 8:
    def encodestring(value):
        """
        Encode data in base64.

        Thin wrapper around base64.b64encode, installed below as
        base64.encodestring.

        :param value: the data to encode
        :type value: bytes
        :rtype bytes
        :return: the base64 encoded data
        """
        encoded = base64.b64encode(value)
        return encoded

    base64.encodestring = encodestring
# end bugfix
import argparse
import configparser
import getpass
import socket
import mimetypes
from pathlib import Path
from messaging.mms.message import MMSMessage
from marrow.mailer import Mailer, Message
from gi.repository import GLib
import dbus
import dbus.mainloop.glib
class MMS2Mail:
    """
    The class handling the conversion between MMS and mail format.

    MMS support is provided by python-messaging
    Mail support is provided by marrow.mailer
    """

    def __init__(self, delete=False, force_read=False):
        """
        Return class instance.

        Reads ~/.mms/modemmanager/mms2mail.ini (section ``mail``), builds the
        mbox mailer and connects to the DBus session bus.

        :param delete: delete MMS after conversion
        :type delete: bool
        :param force_read: force converting an already read MMS (batch mode)
        :type force_read: bool
        """
        self.delete = delete
        self.force_read = force_read
        self.config = configparser.ConfigParser()
        self.config.read(f"{Path.home()}/.mms/modemmanager/mms2mail.ini")
        self.attach_mms = self.config.getboolean('mail', 'attach_mms',
                                                 fallback=False)
        self.domain = self.config.get('mail', 'domain',
                                      fallback=socket.getfqdn())
        self.user = self.config.get('mail', 'user', fallback=getpass.getuser())
        mbox_file = self.config.get('mail', 'mailbox',
                                    fallback=f"/var/mail/{self.user}")
        self.mailer = Mailer({'manager.use': 'immediate',
                              'transport.use': 'mbox',
                              'transport.file': mbox_file})
        # The GLib main loop must be the default one before the bus is
        # created so that signal receivers registered later are dispatched.
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.bus = dbus.SessionBus()

    def get_bus(self):
        """
        Return the DBus SessionBus.

        :rtype dbus.SessionBus()
        :return: an active SessionBus
        """
        return self.bus

    def _message_proxy(self, dbus_path):
        """
        Return the org.ofono.mms.Message interface for a given MMS.

        :param dbus_path: the mms dbus path
        :type dbus_path: str
        :rtype dbus.Interface
        """
        return dbus.Interface(self.bus.get_object('org.ofono.mms', dbus_path),
                              'org.ofono.mms.Message')

    @staticmethod
    def _split_address(address):
        """
        Split an MMS address of the form ``value/type``.

        Unlike a bare ``split('/')`` unpacking, this never raises when the
        separator is missing or appears more than once.

        :param address: the raw MMS address header
        :type address: str
        :rtype tuple
        :return: (value, type); type is '' when there is no '/'
        """
        value, _, kind = address.partition('/')
        return value, kind

    def mark_mms_read(self, dbus_path):
        """
        Ask mmsd to mark the mms as read.

        :param dbus_path: the mms dbus path
        :type dbus_path: str
        """
        message = self._message_proxy(dbus_path)
        print(f"Marking MMS as read {dbus_path}", file=sys.stderr)
        message.MarkRead()

    def delete_mms(self, dbus_path):
        """
        Ask mmsd to delete the mms.

        :param dbus_path: the mms dbus path
        :type dbus_path: str
        """
        message = self._message_proxy(dbus_path)
        print(f"Deleting MMS {dbus_path}", file=sys.stderr)
        message.Delete()

    def check_mms(self, path):
        """
        Check whether the provided file would be converted.

        A file is convertible when it exists, has a ``<path>.status``
        companion whose ``[info] state`` is ``downloaded`` or ``received``,
        and is either unread or ``force_read`` is set.

        :param path: the mms filesystem path
        :type path: str
        :rtype bool
        """
        # Check for mmsd data file
        if not Path(f"{path}").is_file():
            print("MMS file not found : aborting", file=sys.stderr)
            return False
        # Check for mmsd status file
        status_path = Path(f"{path}.status")
        if not status_path.is_file():
            print("MMS status file not found : aborting", file=sys.stderr)
            return False
        status = configparser.ConfigParser()
        # Context manager so the status file handle is always closed.
        with open(status_path) as status_file:
            status.read_file(status_file)
        # Allow only incoming MMS for the time being. A missing section or
        # key is treated like any other non-incoming state.
        state = status.get('info', 'state', fallback=None)
        if state not in ('downloaded', 'received'):
            print("Outgoing MMS : aborting", file=sys.stderr)
            return False
        # A status file without a 'read' key is considered unread.
        if not self.force_read and status.getboolean('info', 'read',
                                                     fallback=False):
            print("Already converted MMS : aborting", file=sys.stderr)
            return False
        return True

    def convert(self, path, dbus_path=None):
        """
        Convert a provided mms file to a mail stored in a mbox.

        The mail is written first; only then is mmsd asked to mark the MMS
        as read (and to delete it when ``delete`` is set), so a failure
        while writing the mail does not lose the MMS.

        :param path: the mms filesystem path
        :type path: str
        :param dbus_path: the mms dbus path
        :type dbus_path: str
        """
        # Check if the provided file present
        if not self.check_mms(path):
            print("MMS file not convertible.", file=sys.stderr)
            return
        # Generate its dbus path, for future operation (mark as read, delete)
        if not dbus_path:
            dbus_path = f"/org/ofono/mms/modemmanager/{path.split('/')[-1]}"

        mms = MMSMessage.from_file(path)
        message = Message()

        # Generate Mail Headers
        mms_from, _ = self._split_address(
            mms.headers.get('From', 'unknown/undef'))
        message.author = f"{mms_from}@{self.domain}"
        message.to = f"{self.user}@{self.domain}"
        # Fall back to a subject naming the sender when the MMS has none.
        message.subject = (mms.headers.get('Subject')
                           or f"MMS from {mms_from}")
        if mms.headers.get('Date'):
            message.date = mms.headers['Date']

        # Recopy MMS HEADERS
        for header in mms.headers:
            message.headers.append((f"X-MMS-{header}",
                                    f"{mms.headers[header]}"))

        message.plain = " "
        data_id = 1
        for data_part in mms.data_parts:
            datacontent = data_part.headers['Content-Type']
            if datacontent is None:
                continue
            if 'text/plain' in datacontent[0]:
                encoding = datacontent[1].get('Charset', 'utf-8')
                plain = data_part.data.decode(encoding)
                message.plain += plain + '\n'
                continue
            # guess_extension() returns None for unknown MIME types; attach
            # the part without an extension rather than failing.
            extension = mimetypes.guess_extension(datacontent[0]) or ''
            filename = datacontent[1].get('Name', str(data_id))
            message.attach(filename + extension, data_part.data)
            data_id = data_id + 1

        # Add MMS binary file, for debugging purpose or reparsing in the future
        if self.attach_mms:
            message.attach(path, None, None, None, False, path.split('/')[-1])

        # Write the mail, making sure the mailer is stopped even on failure
        self.mailer.start()
        try:
            self.mailer.send(message)
        finally:
            self.mailer.stop()

        # Only touch the MMS once the mail has been written
        self.mark_mms_read(dbus_path)
        if self.delete:
            self.delete_mms(dbus_path)
class DbusWatcher:
    """Listen for mmsd MessageAdded DBus signals and convert new MMS."""

    def __init__(self, mms2mail):
        """
        Return a DBusWatcher instance.

        :param mms2mail: the converter used for each new incoming mms; it
                         must provide get_bus() and convert()
        :type mms2mail: mms2mail()
        """
        self.mms2mail = mms2mail

    def run(self):
        """
        Register the signal handler and run the GLib mainloop.

        Blocks until the loop exits; a KeyboardInterrupt stops it.
        """
        session_bus = self.mms2mail.get_bus()
        session_bus.add_signal_receiver(
            self.message_added,
            bus_name="org.ofono.mms",
            signal_name="MessageAdded",
            member_keyword="member",
            path_keyword="path",
            interface_keyword="interface",
        )
        loop = GLib.MainLoop()
        print("Starting DBus watcher mainloop")
        try:
            loop.run()
        except KeyboardInterrupt:
            print("Stopping DBus watcher mainloop")
            loop.quit()

    def message_added(self, name, value, member, path, interface):
        """
        Trigger conversion on MessageAdded signal.

        :param name: the dbus path of the added mms
        :param value: the mms properties; 'Status' and 'Attachments' are read
        :param member: signal member name (unused)
        :param path: signal object path (unused)
        :param interface: signal interface name (unused)
        """
        is_incoming = value['Status'] in ('downloaded', 'received')
        if not is_incoming:
            print(f"New outgoing MMS found ({name.split('/')[-1]})")
            return
        print(f"New incoming MMS found ({name.split('/')[-1]})")
        # Third field of the first attachment entry is handed to convert()
        # as the mms filesystem path.
        mms_file = value['Attachments'][0][2]
        self.mms2mail.convert(mms_file, name)
def main():
    """
    Run the different functions handling mms and mail.

    Parses the command line, then either converts the files given with
    -f/--file, or watches DBus for new MMS with -d/--daemon. Without a mode
    the help text is printed and nothing else is done.
    """
    parser = argparse.ArgumentParser()
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("-d", "--daemon",
                      help="Use dbus signal from mmsd to trigger conversion",
                      action='store_true', dest='watcher')
    mode.add_argument("-f", "--file", nargs='+',
                      help="Parse specified mms files and quit", dest='files')
    parser.add_argument('--delete', action='store_true', dest='delete',
                        help="Ask mmsd to delete the converted MMS")
    parser.add_argument('--force-read', action='store_true',
                        dest='force_read',
                        help="Force conversion even if MMS "
                             "is marked as read")
    args = parser.parse_args()

    # Nothing to do: show usage before building the converter, so that
    # printing help needs neither the configuration nor a DBus session.
    if not (args.files or args.watcher):
        parser.print_help()
        return

    m = MMS2Mail(args.delete, args.force_read)
    if args.files:
        for mms_file in args.files:
            m.convert(mms_file)
    else:
        print("Starting mms2mail in daemon mode")
        w = DbusWatcher(m)
        w.run()


if __name__ == '__main__':
    main()