OpenSMTPD queue Support

This commit is contained in:
yo 2020-12-02 20:45:35 +01:00
parent 243c04c51b
commit f0949ce60c
3 changed files with 172 additions and 51 deletions

View File

@ -1,8 +1,9 @@
# coding: utf-8 # coding: utf-8
# #
# Postfix queue control python tool (pymailq) # Postfix/OpenSMTPD queue control python tool (pymailq)
# #
# Copyright (C) 2014 Denis Pompilio (jawa) <denis.pompilio@gmail.com> # Copyright (C) 2014 Denis Pompilio (jawa) <denis.pompilio@gmail.com>
# 2020 Yo <johan@nosd.in> - OpenSMTPD compatibility
# #
# This file is part of pymailq # This file is part of pymailq
# #
@ -33,11 +34,12 @@ except ImportError:
DEBUG = False DEBUG = False
#: Current version of the package as :class:`str`. #: Current version of the package as :class:`str`.
VERSION = "0.9.0" VERSION = "0.10.0"
#: Module configuration as :class:`dict`. #: Module configuration as :class:`dict`.
CONFIG = { CONFIG = {
"core": { "core": {
"smtpd_type": "postfix",
"postfix_spool": "/var/spool/postfix" "postfix_spool": "/var/spool/postfix"
}, },
"commands": { "commands": {
@ -105,6 +107,8 @@ def load_config(cfg_file):
if "core" in cfg.sections(): if "core" in cfg.sections():
if cfg.has_option("core", "postfix_spool"): if cfg.has_option("core", "postfix_spool"):
CONFIG["core"]["postfix_spool"] = cfg.get("core", "postfix_spool") CONFIG["core"]["postfix_spool"] = cfg.get("core", "postfix_spool")
if cfg.has_option("core", "smtpd_type"):
CONFIG["core"]["smtpd_type"] = cfg.get("core", "smtpd_type")
if "commands" in cfg.sections(): if "commands" in cfg.sections():
for key in cfg.options("commands"): for key in cfg.options("commands"):

View File

@ -26,7 +26,7 @@ from pymailq import CONFIG, debug
class QueueControl(object): class QueueControl(object):
""" """
Postfix queue control using postsuper command. Postfix/openSMTPD queue control using postsuper/smtpctl command.
The :class:`~control.QueueControl` instance defines the following The :class:`~control.QueueControl` instance defines the following
attributes: attributes:
@ -109,31 +109,55 @@ class QueueControl(object):
# It should not be possible to inject commands, but who knows... # It should not be possible to inject commands, but who knows...
# https://www.kevinlondon.com/2015/07/26/dangerous-python-functions.html # https://www.kevinlondon.com/2015/07/26/dangerous-python-functions.html
# And consider the use of sh module: https://amoffat.github.io/sh/ # And consider the use of sh module: https://amoffat.github.io/sh/
operation_cmd = self.get_operation_cmd(operation) + ['-'] if 'postfix' in CONFIG['core']['smtpd_type']:
try: operation_cmd = self.get_operation_cmd(operation) + ['-']
child = subprocess.Popen(operation_cmd, try:
stdin=subprocess.PIPE, child = subprocess.Popen(operation_cmd,
stderr=subprocess.PIPE) stdin=subprocess.PIPE,
except EnvironmentError as exc: stderr=subprocess.PIPE)
command_str = " ".join(operation_cmd) except EnvironmentError as exc:
error_msg = "Unable to call '%s': %s" % (command_str, str(exc)) command_str = " ".join(operation_cmd)
raise RuntimeError(error_msg) error_msg = "Unable to call '%s': %s" % (command_str, str(exc))
raise RuntimeError(error_msg)
# If permissions error, the postsuper process takes ~1s to teardown. # If permissions error, the postsuper process takes ~1s to teardown.
# Wait this delay and raise error message if process has stopped. # Wait this delay and raise error message if process has stopped.
time.sleep(1.1) time.sleep(1.1)
child.poll() child.poll()
if child.returncode: if child.returncode:
raise RuntimeError(child.communicate()[1].strip().decode()) raise RuntimeError(child.communicate()[1].strip().decode())
try: try:
for msg in messages:
child.stdin.write((msg.qid+'\n').encode())
stderr = child.communicate()[1].strip()
except BrokenPipeError:
raise RuntimeError("Unexpected error: child process has crashed")
return [line.strip() for line in stderr.decode().split('\n')]
# smtpctl do not use stdin, have to call the binary for each message :-(
else:
result = []
for msg in messages: for msg in messages:
child.stdin.write((msg.qid+'\n').encode()) operation_cmd = self.get_operation_cmd(operation) + [msg.qid]
stderr = child.communicate()[1].strip() try:
except BrokenPipeError: child = subprocess.Popen(operation_cmd,
raise RuntimeError("Unexpected error: child process has crashed") stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except EnvironmentError as exc:
command_str = " ".join(operation_cmd)
error_msg = "Unable to call '%s': %s" % (command_str, str(exc))
raise RuntimeError(error_msg)
stdout, stderr = child.communicate()
if not len(stdout):
return [line.strip() for line in stderr.decode().split('\n')]
result.append(stdout.decode())
return result
return [line.strip() for line in stderr.decode().split('\n')]
def delete_messages(self, messages): def delete_messages(self, messages):
""" """

145
store.py
View File

@ -3,6 +3,7 @@
# Postfix queue control python tool (pymailq) # Postfix queue control python tool (pymailq)
# #
# Copyright (C) 2014 Denis Pompilio (jawa) <denis.pompilio@gmail.com> # Copyright (C) 2014 Denis Pompilio (jawa) <denis.pompilio@gmail.com>
# 2020 Yo <johan@nosd.in> - OpenSMTPD compatibility
# #
# This file is part of pymailq # This file is part of pymailq
# #
@ -150,17 +151,21 @@ class Mail(object):
:ref:`pymailq-configuration` :ref:`pymailq-configuration`
""" """
def __init__(self, mail_id, size=0, date=None, sender=""): def __init__(self, mail_id, size=0, date=None, exp_date=None, last_delivery=None,
nr_delivery=None, status=None, sender="", recipients=[], errors=[]):
"""Init method""" """Init method"""
self.parsed = False self.parsed = False
self.parse_error = "" self.parse_error = ""
self.qid = mail_id self.qid = mail_id
self.date = date self.date = date
self.status = "" self.exp_date = exp_date
self.last_delivery = last_delivery
self.nr_delivery = nr_delivery
self.status = status
self.size = int(size) self.size = int(size)
self.sender = sender self.sender = sender
self.recipients = [] self.recipients = recipients
self.errors = [] self.errors = errors
self.head = MailHeaders() self.head = MailHeaders()
# Getting optionnal status from postqueue mail_id # Getting optionnal status from postqueue mail_id
@ -169,6 +174,9 @@ class Mail(object):
self.qid = mail_id[:-1] self.qid = mail_id[:-1]
self.status = postqueue_status.get(mail_id[-1], "deferred") self.status = postqueue_status.get(mail_id[-1], "deferred")
# We want to keep compatibility, so "postfix" is considered default
if not 'smtpd_type' in CONFIG['core']: CONFIG['core']['smtpd_type'] = 'postfix'
@property @property
def postcat_cmd(self): def postcat_cmd(self):
""" """
@ -210,7 +218,7 @@ class Mail(object):
""" """
Parse message content. Parse message content.
This method use Postfix mails content parsing command defined in This method use Postfix/OpenSMTPD mails content parsing command defined in
:attr:`~Mail.postcat_cmd` attribute. :attr:`~Mail.postcat_cmd` attribute.
This command is runned using :class:`subprocess.Popen` instance. This command is runned using :class:`subprocess.Popen` instance.
@ -222,8 +230,12 @@ class Mail(object):
Postfix manual: Postfix manual:
`postcat`_ -- Show Postfix queue file contents `postcat`_ -- Show Postfix queue file contents
OpenSMTPD manual:
`smtpctl`_ -- Show OpenSMTPD queue file contents
""" """
if 'opensmtpd' in CONFIG['core']['smtpd_type']:
is_postfix = False
# Reset parsing error message # Reset parsing error message
self.parse_error = "" self.parse_error = ""
@ -233,24 +245,29 @@ class Mail(object):
stdout, stderr = child.communicate() stdout, stderr = child.communicate()
if not len(stdout): if not len(stdout):
# Ignore first 3 line on stderr which are: if 'opensmtpd' in CONFIG['core']['smtpd_type']:
# postcat: name_mask: all self.parse_error = "\n".join(stderr.decode().split('\n'))
# postcat: inet_addr_local: configured 3 IPv4 addresses else:
# postcat: inet_addr_local: configured 3 IPv6 addresses # Ignore first 3 line on stderr which are:
self.parse_error = "\n".join(stderr.decode().split('\n')[3:]) # postcat: name_mask: all
# postcat: inet_addr_local: configured 3 IPv4 addresses
# postcat: inet_addr_local: configured 3 IPv6 addresses
self.parse_error = "\n".join(stderr.decode().split('\n')[3:])
return return
raw_content = "" raw_content = ""
for line in stdout.decode('utf-8', errors='replace').split('\n'): for line in stdout.decode('utf-8', errors='replace').split('\n'):
if self.size == 0 and line.startswith("message_size: "): if is_postfix and self.size == 0 and line.startswith("message_size: "):
self.size = int(line[14:].strip().split()[0]) self.size = int(line[14:].strip().split()[0])
elif self.date is None and line.startswith("create_time: "): elif is_postfix and self.date is None and line.startswith("create_time: "):
self.date = datetime.strptime(line[13:].strip(), self.date = datetime.strptime(line[13:].strip(),
"%a %b %d %H:%M:%S %Y") "%a %b %d %H:%M:%S %Y")
elif not len(self.sender) and line.startswith("sender: "): elif is_postfix and not len(self.sender) and line.startswith("sender: "):
self.sender = line[8:].strip() self.sender = line[8:].strip()
elif line.startswith("regular_text: "): elif is_postfix and line.startswith("regular_text: "):
raw_content += "%s\n" % (line[14:],) raw_content += "%s\n" % (line[14:],)
elif is_postfix == False:
raw_content += "%s\n" % (line,)
# For python2.7 compatibility, encode unicode to str # For python2.7 compatibility, encode unicode to str
if not isinstance(raw_content, str): if not isinstance(raw_content, str):
@ -378,7 +395,9 @@ class PostqueueStore(object):
postqueue_cmd = None postqueue_cmd = None
spool_path = None spool_path = None
postqueue_mailstatus = ['active', 'deferred', 'hold'] postqueue_mailstatus = ['active', 'deferred', 'hold']
mail_id_re = re.compile(r"^([A-F0-9]{8,12}|[B-Zb-z0-9]{11,16})[*!]?$") # Just a little change to match OpenSMTPD envelope id
#mail_id_re = re.compile(r"^([A-F0-9]{8,12}|[B-Zb-z0-9]{11,16})[*!]?$")
mail_id_re = re.compile(r"^([A-F0-9]{8,12}|[B-Za-z0-9]{11,16})[*!]?$")
mail_addr_re = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]+$") mail_addr_re = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]+$")
MailClass = Mail MailClass = Mail
@ -392,6 +411,10 @@ class PostqueueStore(object):
self.loaded_at = None self.loaded_at = None
self.mails = [] self.mails = []
# We want to keep compatibility, so "postfix" is considered default
if not 'smtpd_type' in CONFIG['core']: CONFIG['core']['smtpd_type'] = 'postfix'
@property @property
@debug @debug
def known_headers(self): def known_headers(self):
@ -430,8 +453,11 @@ class PostqueueStore(object):
stdout=subprocess.PIPE) stdout=subprocess.PIPE)
stdout = child.communicate()[0] stdout = child.communicate()[0]
# return lines list without the headers and footers if 'postfix' in CONFIG['core']['smtpd_type']:
return [line.strip() for line in stdout.decode().split('\n')][1:-2] # return lines list without the headers and footers
return [line.strip() for line in stdout.decode().split('\n')][1:-2]
else:
return [line.strip() for line in stdout.decode().split('\n')]
def _is_mail_id(self, mail_id): def _is_mail_id(self, mail_id):
""" """
@ -451,7 +477,7 @@ class PostqueueStore(object):
return True return True
@debug @debug
def _load_from_postqueue(self, filename=None, parse=False): def _load_from_postfix_postqueue(self, filename=None, parse=False):
""" """
Load content from postfix queue using postqueue command output. Load content from postfix queue using postqueue command output.
@ -547,6 +573,70 @@ class PostqueueStore(object):
#print("parsing mails") #print("parsing mails")
[mail.parse() for mail in self.mails] [mail.parse() for mail in self.mails]
@debug
def _load_from_opensmtpd_postqueue(self, filename=None, parse=False):
"""
Load content from opensmtpd queue using "smtpctl show queue" command output.
Output lines from :attr:`~store.PostqueueStore._get_postqueue_output`
are parsed to build :class:`~store.Mail` objects. Sample Postfix queue
control tool (`postqueue`_) output::
fab15d0efd0b9489|inet4|mta||ToddCookolnuo@psi.br|john.galt@post.lu|john.galt@post.lu|1606747552|1607093152|0|16|pending|8464|450 4.1.8 <ToddCookolnuo@psi.br>: Sender address rejected: Domain not found
e9eb7c58be75d090|inet4|mta||fyxt@snowcopolo.com|marco.polo@vuvuzela.br|marco.polo@vuvuzela.br|1606645251|1606990851|0|23|pending|15363|450 4.1.8 <fyxt@snowcopolo.com>: Sender address rejected: Domain not found
Parsing rules are pretty simple
Optionnal argument ``filename`` can be set with a file containing
output of the `postqueue`_ command. In this case, output lines of
`postqueue`_ command are directly read from ``filename`` and parsed,
the `postqueue`_ command is never used.
Optionnal argument ``parse`` controls whether mails are parsed or not.
This is useful to load every known mail headers for later filtering.
:param str filename: File to load mails from
:param bool parse: Controls whether loaded mails are parsed or not.
"""
if filename is None:
postqueue_output = self._get_postqueue_output()
else:
postqueue_output = open(filename).readlines()
mail = None
for line in postqueue_output:
line = line.strip()
# Headers and footers start with dash (-)
#if line.startswith('-'):
# continue
# Mails are blank line separated
#if not len(line):
# continue
fields = line.split('|')
#if "(" == fields[0][0]:
# Store error message without parenthesis: [1:-1]
# gathered errors must be associated with specific recipients
# TODO: change recipients or errors structures to link these
# objects together.
if self._is_mail_id(fields[0]):
date = datetime.fromtimestamp(int(fields[7]))
exp_date = datetime.fromtimestamp(int(fields[8]))
lst_date = datetime.fromtimestamp(int(fields[9]))
nr_delivery = fields[10]
mail = self.MailClass(fields[0], sender=fields[4], recipients=[fields[5]],
date=date, exp_date=exp_date, last_delivery=lst_date,
nr_delivery=nr_delivery, status=fields[11],
errors=[fields[13]])
self.mails.append(mail)
if parse:
#print("parsing mails")
[mail.parse() for mail in self.mails]
@debug @debug
def _load_from_spool(self, parse=True): def _load_from_spool(self, parse=True):
""" """
@ -587,13 +677,13 @@ class PostqueueStore(object):
@debug @debug
def load(self, method="postqueue", filename=None, parse=False): def load(self, method="postqueue", filename=None, parse=False):
""" """
Load content from postfix mails queue. Load content from postfix/OpenSMTPD mails queue.
Mails are loaded using postqueue command line tool or reading directly Mails are loaded using postqueue/smtpctl command line tool or reading
from spool. The optionnal argument, if present, is a method string and directly from spool. The optionnal argument, if present, is a
specifies the method used to gather mails informations. By default, method string and specifies the method used to gather mails informations.
method is set to ``"postqueue"`` and the standard Postfix queue By default, method is set to ``"postqueue"`` and the standard queue
control tool: `postqueue`_ is used. control tool: `postqueue`_ or `smtpctl show queue`is used.
Optionnal argument ``parse`` controls whether mails are parsed or not. Optionnal argument ``parse`` controls whether mails are parsed or not.
This is useful to load every known mail headers for later filtering. This is useful to load every known mail headers for later filtering.
@ -610,10 +700,13 @@ class PostqueueStore(object):
gc.collect() gc.collect()
self.mails = [] self.mails = []
smtpd_type = CONFIG['core']['smtpd_type']
if filename is None: if filename is None:
getattr(self, "_load_from_{0}".format(method))(parse=parse) getattr(self, "_load_from_{0}_{1}".format(smtpd_type, method))(parse=parse)
else: else:
getattr(self, "_load_from_{0}".format(method))(filename, parse) getattr(self, "_load_from_{0}_{1}".format(smtpd_type, method))(filename, parse)
self.loaded_at = datetime.now() self.loaded_at = datetime.now()
@debug @debug