[fix] PEP08

This commit is contained in:
Alex 2021-05-05 05:49:38 +02:00
parent df9d5acf76
commit b8578a65b2

203
mms2mail
View file

@ -1,18 +1,27 @@
#!/usr/bin/python3 #!/usr/bin/python3
""" """An mms to mail converter for mmsd."""
An mms to mail converter for mmsd # upstream bug dirty fix
""" # https://github.com/marrow/mailer/issues/87#issuecomment-689586587
#upstream bug dirty fix https://github.com/marrow/mailer/issues/87#issuecomment-689586587
import sys import sys
if sys.version_info[0] == 3 and sys.version_info[1] > 7: if sys.version_info[0] == 3 and sys.version_info[1] > 7:
sys.modules["cgi.parse_qsl"] = None sys.modules["cgi.parse_qsl"] = None
#upstream bug dirty fix https://github.com/marrow/mailer/issues/87#issuecomment-713319548 # upstream bug dirty fix
# https://github.com/marrow/mailer/issues/87#issuecomment-713319548
import base64 import base64
if sys.version_info[0] == 3 and sys.version_info[1] > 8: if sys.version_info[0] == 3 and sys.version_info[1] > 8:
def encodestring(value): def encodestring(value):
"""
Encode string in base64.
:param value: the string to encode.
:type value: str
:rtype str
:return: the base64 encoded string
"""
return base64.b64encode(value) return base64.b64encode(value)
base64.encodestring = encodestring base64.encodestring = encodestring
#end bugfix # end bugfix
import argparse import argparse
import configparser import configparser
@ -25,106 +34,111 @@ from pathlib import Path
from messaging.mms.message import MMSMessage from messaging.mms.message import MMSMessage
from marrow.mailer import Mailer, Message from marrow.mailer import Mailer, Message
from gi.repository import GObject, GLib from gi.repository import GLib
import dbus import dbus
import dbus.mainloop.glib import dbus.mainloop.glib
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler from watchdog.events import FileSystemEventHandler
class MMS2Mail: class MMS2Mail:
""" """
The class handling the conversion between MMS and mail format The class handling the conversion between MMS and mail format.
MMS support is provided by python-messaging MMS support is provided by python-messaging
Mail support is provided by marrow.mailer Mail support is provided by marrow.mailer
""" """
def __init__(self): def __init__(self):
""" """Return class instance."""
Constructor loading configuration
"""
self.config = configparser.ConfigParser() self.config = configparser.ConfigParser()
self.config.read(f"{Path.home()}/.mms/modemmanager/mms2mail.ini") self.config.read(f"{Path.home()}/.mms/modemmanager/mms2mail.ini")
self.mailer = Mailer({'manager.use': 'immediate', 'transport.use': 'mbox', 'transport.file': self.config.get('mail','mailbox', fallback=f"/var/mail/{getpass.getuser()}")}) self.domain = self.config.get('mail', 'domain',
fallback=socket.getfqdn())
self.user = self.config.get('mail', 'user', fallback=getpass.getuser())
mbox_file = self.config.get('mail', 'mailbox',
fallback=f"/var/mail/{self.user}")
self.mailer = Mailer({'manager.use': 'immediate',
'transport.use': 'mbox',
'transport.file': mbox_file})
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
self.bus = dbus.SessionBus() self.bus = dbus.SessionBus()
def get_bus(self): def get_bus(self):
""" """
Return the DBus SessionBus Return the DBus SessionBus.
:rtype dbus.SessionBus() :rtype dbus.SessionBus()
:return: an active SessionBus :return: an active SessionBus
""" """
return self.bus return self.bus
def mark_mms_read(self,dbus_path): def mark_mms_read(self, dbus_path):
""" """
Ask MMSd to mark the mms as read Ask MMSd to mark the mms as read.
:param dbus_path: the mms dbus path :param dbus_path: the mms dbus path
:type dbus_path: str :type dbus_path: str
""" """
message = dbus.Interface(self.bus.get_object('org.ofono.mms', dbus_path), message = dbus.Interface(self.bus.get_object('org.ofono.mms',
'org.ofono.mms.Message') dbus_path),
'org.ofono.mms.Message')
message.MarkRead() message.MarkRead()
def convert(self, path, dbus_path = None): def convert(self, path, dbus_path=None):
"""
Convert a provided mms file to a mail stored in a mbox
:param path: the mms filesystem path
:type path: str
:param dbus_path: the mms dbus path
:type dbus_path: str
""" """
Convert a provided mms file to a mail stored in a mbox.
:param path: the mms filesystem path
:type path: str
:param dbus_path: the mms dbus path
:type dbus_path: str
"""
# Check if the provided file present # Check if the provided file present
if not Path(f"{path}").is_file(): if not Path(f"{path}").is_file():
print(f"MMS file not found : aborting", file=sys.stderr) print("MMS file not found : aborting", file=sys.stderr)
return return
# Generate its dbus path, for future operation (mark as read, delete) # Generate its dbus path, for future operation (mark as read, delete)
if not dbus_path: if not dbus_path:
dbus_path=f"/org/ofono/mms/modemmanager/{path.split('/')[-1]}" dbus_path = f"/org/ofono/mms/modemmanager/{path.split('/')[-1]}"
if Path(f"{path}.mail").is_file() and args.watcher: if Path(f"{path}.mail").is_file() and args.watcher:
print(f"Already converted MMS : doing nothing ({path})", file=sys.stderr) print(f"Already converted MMS : doing nothing ({path})",
file=sys.stderr)
return return
# Check for mmsd status file # Check for mmsd status file
status = configparser.ConfigParser() status = configparser.ConfigParser()
if not Path(f"{path}.status").is_file(): if not Path(f"{path}.status").is_file():
print(f"MMS status file not found : aborting", file=sys.stderr) print("MMS status file not found : aborting", file=sys.stderr)
return return
status.read_file(open(f"{path}.status")) status.read_file(open(f"{path}.status"))
# Allow only incoming MMS for the time beeing # Allow only incoming MMS for the time beeing
if status['info']['state'] == 'downloaded' or status['info']['state'] == 'received': if (status['info']['state'] == 'downloaded' or
print(f"New incoming MMS : converting to Mail ({path})", file=sys.stderr) status['info']['state'] == 'received'):
print(f"New incoming MMS : converting to Mail ({path})",
file=sys.stderr)
else: else:
print(f"New outgoing MMS : doing nothing ({path})", file=sys.stderr) print(f"New outgoing MMS : doing nothing ({path})",
file=sys.stderr)
return return
mms = MMSMessage.from_file(path) mms = MMSMessage.from_file(path)
self.mailer.start() self.mailer.start()
message = Message() message = Message()
# Generate Mail Headers # Generate Mail Headers
if 'From' in mms.headers and mms.headers['From']: mms_from, mms_from_type = mms.headers.get('From',
mms_from, mms_from_type = mms.headers['From'].split('/') 'unknown/undef').split('/')
else: message.author = f"{mms_from}@{self.domain}"
mms_from = "unknown" mms_from, mms_from_type = mms.headers.get('To',
mms_from_type = "undef" 'unknown/undef').split('/')
message.author = f"{mms_from}@{self.config.get('mail','domain', fallback=socket.getfqdn())}" message.to = f"{self.user}@{self.domain}"
if 'To' in mms.headers and mms.headers['To']:
mms_to, mms_to_type = mms.headers['To'].split('/')
else:
mms_to = "unknown"
mms_to_type = "undef"
message.to = f"{self.config.get('mail','user', fallback=getpass.getuser())}@{self.config.get('mail','domain', fallback=socket.getfqdn())}"
if 'Subject' in mms.headers and mms.headers['Subject']: if 'Subject' in mms.headers and mms.headers['Subject']:
message.subject = mms.headers['Subject'] message.subject = mms.headers['Subject']
else: else:
message.subject = f"MMS from {mms_from}" message.subject = f"MMS from {mms_from}"
if 'Date' in mms.headers and mms.headers['Date']: if 'Date' in mms.headers and mms.headers['Date']:
@ -132,50 +146,54 @@ class MMS2Mail:
# Recopy MMS HEADERS # Recopy MMS HEADERS
for header in mms.headers: for header in mms.headers:
message.headers.append((f"X-MMS-{header}", f"{mms.headers[header]}")) message.headers.append((f"X-MMS-{header}",
f"{mms.headers[header]}"))
for data_part in mms.data_parts: for data_part in mms.data_parts:
datacontent=data_part.headers['Content-Type'] datacontent = data_part.headers['Content-Type']
if datacontent is not None: if datacontent is not None:
if 'text/plain' in datacontent[0]: if 'text/plain' in datacontent[0]:
message.plain = f"{data_part.data} \n" message.plain = f"{data_part.data} \n"
if 'Name' in datacontent[1]: if 'Name' in datacontent[1]:
filename = datacontent[1]['Name'] filename = datacontent[1]['Name']
message.attach(filename,data_part.data) message.attach(filename, data_part.data)
# Ensure a proper body content in the resulting mail # Ensure a proper body content in the resulting mail
if not message.plain: if not message.plain:
message.plain = " " message.plain = " "
# Add MMS binary file, for debugging purpose or reparsing in the future # Add MMS binary file, for debugging purpose or reparsing in the future
if self.config.getboolean('mail','attach_mms', fallback=False): if self.config.getboolean('mail', 'attach_mms', fallback=False):
message.attach(path, None, None, None, False, "mms.bin") message.attach(path, None, None, None, False, "mms.bin")
# Creating an empty file stating the mms as been converted # Creating an empty file stating the mms as been converted
Path(f"{path}.mail").touch() Path(f"{path}.mail").touch()
# Write the mail # Write the mail
self.mailer.send(message) self.mailer.send(message)
self.mailer.stop() self.mailer.stop()
class FSWatcher: class FSWatcher:
""" """
Use OS filesystem notification to watch for new MMS (DEPRECATED) Use OS filesystem notification to watch for new MMS (DEPRECATED).
Events are send to the FSHandler class Events are send to the FSHandler class
""" """
# Path to modemmanager storage # Path to modemmanager storage
mms_folder = f"{Path.home()}/.mms/modemmanager" mms_folder = f"{Path.home()}/.mms/modemmanager"
def __init__(self): def __init__(self):
"""Construct an instance."""
self.observer = Observer() self.observer = Observer()
self.patternold = re.compile('[0-9A-F]{40}$') self.patternold = re.compile('[0-9A-F]{40}$')
self.pattern = re.compile('[0-9a-f]{36}$') self.pattern = re.compile('[0-9a-f]{36}$')
def is_mmsd_mms_file(self,path): def is_mmsd_mms_file(self, path):
""" """
Test if the provided file seems to be a mms file created by mmsd Test if the provided file seems to be a mms file created by mmsd.
:param path: the mms filesystem path :param path: the mms filesystem path
:type path: str :type path: str
:rtype boolean :rtype boolean
:return: the test result :return: the test result
@ -186,31 +204,24 @@ class FSWatcher:
return False return False
def run(self): def run(self):
""" """Run the watcher mainloop."""
Run the watcher mainloop
"""
event_handler = FSHandler() event_handler = FSHandler()
self.observer.schedule(event_handler, self.mms_folder, recursive=False) self.observer.schedule(event_handler, self.mms_folder, recursive=False)
self.observer.start() self.observer.start()
try: try:
while True: while True:
time.sleep(5) time.sleep(5)
except: finally:
self.observer.stop() self.observer.stop()
print("Exiting") self.observer.join()
self.observer.join()
class FSHandler(FileSystemEventHandler): class FSHandler(FileSystemEventHandler):
""" """Handle the FSWatcher event."""
Handle the FSWatcher event
"""
@staticmethod @staticmethod
def on_any_event(event): def on_any_event(event):
""" """Trigger conversion on event by the FSWatcher."""
Function triggered on event by the FSWatcher
"""
if event.is_directory: if event.is_directory:
return None return None
elif event.event_type == 'created' or event.event_type == 'modified': elif event.event_type == 'created' or event.event_type == 'modified':
@ -222,55 +233,55 @@ class FSHandler(FileSystemEventHandler):
print(f"New MMS found : {event.dest_path}.", file=sys.stderr) print(f"New MMS found : {event.dest_path}.", file=sys.stderr)
m.convert(event.dest_path) m.convert(event.dest_path)
class DbusWatcher(): class DbusWatcher():
""" """Use DBus Signal notification to watch for new MMS."""
Use DBus Signal notification to watch for new MMS
""" def run(self):
def run(self): """Run the watcher mainloop."""
"""
Run the watcher mainloop
"""
bus = m.get_bus() bus = m.get_bus()
bus.add_signal_receiver(self.message_added, bus.add_signal_receiver(self.message_added,
bus_name="org.ofono.mms", bus_name="org.ofono.mms",
signal_name = "MessageAdded", signal_name="MessageAdded",
member_keyword="member", member_keyword="member",
path_keyword="path", path_keyword="path",
interface_keyword="interface") interface_keyword="interface")
mainloop = GLib.MainLoop() mainloop = GLib.MainLoop()
mainloop.run() mainloop.run()
def message_added(self,name, value, member, path, interface): def message_added(self, name, value, member, path, interface):
""" """Trigger conversion on MessageAdded signal."""
Function triggered on MessageAdded signal
"""
if value['Status'] == 'downloaded' or value['Status'] == 'received': if value['Status'] == 'downloaded' or value['Status'] == 'received':
print(f"New incoming MMS found ({name.split('/')[-1]})") print(f"New incoming MMS found ({name.split('/')[-1]})")
m.convert(value['Attachments'][0][2],name) m.convert(value['Attachments'][0][2], name)
else: else:
print(f"New outgoing MMS found ({name.split('/')[-1]})") print(f"New outgoing MMS found ({name.split('/')[-1]})")
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
mode = parser.add_mutually_exclusive_group() mode = parser.add_mutually_exclusive_group()
mode.add_argument("-d","--daemon", help="Use dbus signal from mmsd by default but can also watch mmsd storage folder (useful for mmsd < 1.0)", nargs="?", default="dbus", choices=['dbus','filesystem'], dest='watcher') mode.add_argument("-d", "--daemon",
mode.add_argument("-f","--file", nargs='+', help="parse specified mms files", dest='files') help="Use dbus signal from mmsd by default but can also \
watch mmsd storage folder (useful for mmsd < 1.0)",
nargs="?", default="dbus",
choices=['dbus', 'filesystem'], dest='watcher')
mode.add_argument("-f", "--file", nargs='+',
help="parse specified mms files", dest='files')
args = parser.parse_args() args = parser.parse_args()
m = MMS2Mail() m = MMS2Mail()
if args.files: if args.files:
for mms_file in args.files: for mms_file in args.files:
m.convert(mms_file) m.convert(mms_file)
elif args.watcher == 'dbus': elif args.watcher == 'dbus':
print("Starting mms2mail in daemon mode with dbus watcher") print("Starting mms2mail in daemon mode with dbus watcher")
w = DbusWatcher() w = DbusWatcher()
w.run() w.run()
elif args.watcher == 'filesystem': elif args.watcher == 'filesystem':
print("Starting mms2mail in daemon mode with filesystem watcher") print("Starting mms2mail in daemon mode with filesystem watcher")
w = FSWatcher() w = FSWatcher()
w.run() w.run()
else: else:
parser.print_help() parser.print_help()
del m