#!/usr/bin/env python
# cardinal_pythonlib/bulk_email/models.py
"""
===============================================================================
Original code copyright (C) 2009-2022 Rudolf Cardinal (rudolf@pobox.com).
This file is part of cardinal_pythonlib.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
===============================================================================
SQLAlchemy models for the simple bulk e-mail tool.
"""
# =============================================================================
# Imports
# =============================================================================
import datetime
import logging
import sys
from typing import Any, Iterable, Tuple
from sqlalchemy.event import listens_for
from sqlalchemy.orm.attributes import instance_state
from sqlalchemy.orm.exc import NoResultFound
# noinspection PyProtectedMember
from sqlalchemy.orm.session import make_transient, Session
from sqlalchemy.sql.schema import Column, ForeignKey
from sqlalchemy.sql.sqltypes import (
Boolean,
DateTime,
Float,
Integer,
String,
Text,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.sql.expression import and_, exists
from cardinal_pythonlib.bulk_email.constants import (
CONTENT_TYPE_MAX_LENGTH,
ENCODING_NAME_MAX_LENGTH,
FERNET_KEY_BASE64_LENGTH,
HOSTNAME_MAX_LENGTH,
PASSWORD_MAX_LENGTH,
USERNAME_MAX_LENGTH,
)
from cardinal_pythonlib.colander_utils import EMAIL_ADDRESS_MAX_LEN
from cardinal_pythonlib.email.sendmail import (
ASCII,
CONTENT_TYPE_TEXT,
is_email_valid,
UTF8,
)
from cardinal_pythonlib.sqlalchemy.orm_query import CountStarSpecializedQuery
from cardinal_pythonlib.sysops import EXIT_FAILURE
# The password handling in this module relies on Fernet from the optional
# third-party "cryptography" package. If it cannot be imported, tell the user
# how to install it and stop with a failure exit code.
try:
    from cryptography.fernet import Fernet
except ImportError:
    _install_hint = (
        "First, please install the 'cryptography' module with "
        "'pip install cryptography'."
    )
    print(_install_hint)
    sys.exit(EXIT_FAILURE)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# =============================================================================
# The master SQLAlchemy ORM class
# =============================================================================
# Declarative base class from which every model class in this module derives.
Base = declarative_base()
def make_table_args(*args: Any, **kwargs: Any) -> Tuple[Any, ...]:
    """
    Build a value suitable for a SQLAlchemy model's ``__table_args__``.

    SQLAlchemy allows several formats for ``__table_args__``. The most generic
    is a tuple that is a sequence of e.g. constraints, with the last element
    being a dictionary of keyword arguments. This function produces that form
    and adds a standard set of MySQL table options.

    Args:
        args:
            positional table arguments (e.g. constraints); passed through
            unchanged and in order.
        kwargs:
            keyword table arguments (e.g. ``comment``). The standard MySQL
            options set below overwrite any values supplied by the caller
            under the same keys.

    Returns:
        a tuple containing every positional argument, followed by a single
        dictionary holding the keyword arguments.
    """
    # Add standard ones (overwriting any caller-supplied values):
    kwargs["mysql_engine"] = "InnoDB"
    kwargs["mysql_row_format"] = "DYNAMIC"
    kwargs["mysql_charset"] = "utf8mb4 COLLATE utf8mb4_unicode_ci"
    # Return the tuple. "args" is already a tuple, so plain concatenation is
    # enough.
    return args + (kwargs,)
# =============================================================================
# Config
# =============================================================================
# noinspection PyUnusedLocal
@listens_for(Session, "before_flush")
def before_flush(session: Session, flush_context, instances) -> None:
    """
    Session "before_flush" listener that versions modified Config rows.

    Every object in ``session.dirty`` that is a ``Config``, is reported as
    modified by the session, and already has a database identity is asked to
    become a new version of itself and is then re-added to the session.
    Everything else is left alone.

    Args:
        session: the session about to be flushed.
        flush_context: unused here.
        instances: unused here.
    """
    # https://docs.sqlalchemy.org/en/14/orm/events.html#sqlalchemy.orm.SessionEvents.before_flush # noqa: E501
    for obj in session.dirty:
        # The checks run in this order and short-circuit, so the later ones
        # are only evaluated for Config objects.
        needs_new_version = (
            isinstance(obj, Config)
            and session.is_modified(obj)
            and instance_state(obj).has_identity
        )
        if not needs_new_version:
            continue
        # Strip the persistent identity and primary key ...
        obj.new_version(session)
        # ... then hand the object back to the session.
        session.add(obj)
class Config(Base):
    """
    Stores records of configuration information.

    Maintains history (so the config for previous send jobs is visible), per:

    - https://docs.sqlalchemy.org/en/14/_modules/examples/versioned_rows/versioned_rows.html
    - https://docs.sqlalchemy.org/en/14/orm/examples.html#module-examples.versioned_rows

    The "current" configuration is the row with the highest ``config_id``.
    The SMTP password is held in reversibly encrypted form; use the
    ``password`` property to read or set the plaintext.
    """  # noqa: E501

    __tablename__ = "config"
    __table_args__ = make_table_args(comment="Stores configuration records.")

    # -------------------------------------------------------------------------
    # Columns
    # -------------------------------------------------------------------------

    config_id = Column(
        Integer,
        # NOTE(review): the remarks below are about CHECK constraints, but no
        # such constraint is applied to this column -- presumably they are
        # left over from an earlier design; confirm before relying on them.
        # ... may not be supported by all databases; see
        # https://docs.sqlalchemy.org/en/14/core/constraints.html#check-constraint  # noqa: E501
        # ... though MySQL has recently added this:
        # https://dev.mysql.com/doc/refman/8.0/en/create-table-check-constraints.html  # noqa: E501
        primary_key=True,
        autoincrement=True,
        comment="Primary key.",
    )
    database_creation_datetime = Column(
        DateTime, comment="Date/time this database was created."
    )
    host = Column(
        String(length=HOSTNAME_MAX_LENGTH),
        nullable=False,
        comment="Mail (SMTP) server hostname",
    )
    port = Column(
        Integer, nullable=False, comment="Mail (SMTP) server TCP port"
    )
    use_tls = Column(Boolean, nullable=False, comment="Use TLS?")
    username = Column(
        String(length=USERNAME_MAX_LENGTH),
        nullable=False,
        comment="Mail (SMTP) server username",
    )
    password_encrypted = Column(
        String(length=PASSWORD_MAX_LENGTH),
        nullable=False,
        comment="Mail (SMTP) server password (reversibly encrypted)",
    )
    encryption_key = Column(
        String(length=FERNET_KEY_BASE64_LENGTH),
        nullable=False,
        comment="Encryption key for password",
    )
    time_between_emails = Column(
        Float, nullable=False, comment="Time (in seconds) between each e-mail."
    )

    # -------------------------------------------------------------------------
    # Creation and version control
    # -------------------------------------------------------------------------

    @classmethod
    def get_current_config(cls, session: Session) -> "Config":
        """
        Fetch the current config, or raise.

        Args:
            session: SQLAlchemy session to query.

        Returns:
            the config record with the highest ``config_id``.

        Raises:
            ValueError: if there are no config records at all.
        """
        cfg = (
            session.query(cls)
            .order_by(cls.config_id.desc())
            .first()  # returns object or None
        )
        if cfg is None:
            error = (
                "No configuration found. " "Run the 'configure' command first."
            )
            log.critical(error)
            raise ValueError(error)
        return cfg

    @classmethod
    def get_or_create_config(cls, session: Session) -> "Config":
        """
        Fetch the current config, or make a new one.

        Args:
            session: SQLAlchemy session to query.

        Returns:
            the config record with the highest ``config_id``; or, if there
            are none, a newly created record that has been added to the
            session.
        """
        cfg = (
            session.query(cls)
            .order_by(cls.config_id.desc())
            .first()  # returns object or None
        )
        if cfg is None:
            cfg = cls()
            session.add(cfg)
        return cfg

    def __init__(self, *args, **kwargs) -> None:
        """
        Default configuration.

        Arguments are passed to the base class constructor. If the caller did
        not supply ``database_creation_datetime``, it is set to the current
        local date/time; an explicitly supplied value is kept.
        """
        super().__init__(*args, **kwargs)
        if self.database_creation_datetime is None:
            self.database_creation_datetime = datetime.datetime.now()

    # noinspection PyUnusedLocal
    def new_version(self, session: Session) -> None:
        """
        Called by listener (see ``before_flush``) when object is being saved.

        Args:
            session: unused here.
        """
        # Make us transient (removes persistent identity).
        make_transient(self)
        # Set PK to None. A new PK will be generated on INSERT.
        self.config_id = None

    # -------------------------------------------------------------------------
    # Obscuring passwords
    # -------------------------------------------------------------------------

    @property
    def password(self) -> str:
        """
        Returns the plaintext password.

        Decrypts ``password_encrypted`` with the key stored in
        ``encryption_key``.
        """
        # Key
        key_b64_bytes = self.encryption_key.encode(ASCII)
        # Engine
        f = Fernet(key_b64_bytes)
        # Decryption
        encrypted_password_b64_bytes = self.password_encrypted.encode(ASCII)
        return f.decrypt(encrypted_password_b64_bytes).decode(UTF8)

    @password.setter
    def password(self, password: str) -> None:
        """
        Store the password using reversible encryption.

        A fresh key is generated on every call; both ``encryption_key`` and
        ``password_encrypted`` are replaced.
        """
        # Key
        key_b64_bytes = Fernet.generate_key()  # base64-encoded 32-byte key
        self.encryption_key = key_b64_bytes.decode(ASCII)  # str version
        # Engine
        f = Fernet(key_b64_bytes)
        # Encryption
        password_bytes = password.encode(UTF8)
        encrypted_b64_bytes = f.encrypt(password_bytes)  # base64-encoded
        self.password_encrypted = encrypted_b64_bytes.decode(ASCII)
# =============================================================================
# Recipient
# =============================================================================
class Recipient(Base):
    """
    One e-mail recipient, identified by a unique e-mail address.
    """

    __tablename__ = "recipient"
    __table_args__ = make_table_args(comment="E-mail recipients.")

    # -------------------------------------------------------------------------
    # Columns
    # -------------------------------------------------------------------------

    recipient_id = Column(
        Integer,
        primary_key=True,
        autoincrement=True,
        comment="Primary key.",
    )  # type: int
    email = Column(
        String(length=EMAIL_ADDRESS_MAX_LEN),
        nullable=False,
        unique=True,
        comment="E-mail address.",
    )

    # -------------------------------------------------------------------------
    # Creation
    # -------------------------------------------------------------------------

    @classmethod
    def fetch_or_add(cls, email: str, session: Session) -> "Recipient":
        """
        Return the recipient with this address, creating one if necessary.

        Args:
            email: the e-mail address to look up.
            session: SQLAlchemy session to query (and to add to).

        Returns:
            the existing recipient; or, if there is none, a new recipient
            that has been added to the session.
        """
        lookup = session.query(cls).filter_by(email=email)
        try:
            return lookup.one()
        except NoResultFound:
            # Not known yet: make it and register it with the session.
            recipient = cls(email=email)
            session.add(recipient)
            return recipient

    def __init__(self, *args, **kwargs) -> None:
        """
        Create a recipient, rejecting an invalid e-mail address.

        Raises:
            ValueError: if ``is_email_valid`` rejects the address.
        """
        super().__init__(*args, **kwargs)
        address = self.email
        if is_email_valid(address):
            return
        raise ValueError(f"Invalid e-mail address: {address!r}")

    # -------------------------------------------------------------------------
    # Info
    # -------------------------------------------------------------------------

    @staticmethod
    def n_recipients(session: Session) -> int:
        """
        Return the result of a COUNT(*)-style query over all recipients.
        """
        return CountStarSpecializedQuery(
            [Recipient], session=session
        ).count_star()
# =============================================================================
# Content
# =============================================================================
class Content(Base):
    """
    Contents of an e-mail that will be, or has been, sent to one or many
    recipients.
    """

    __tablename__ = "content"
    __table_args__ = make_table_args(
        comment="Contents of an e-mail (without recipient details; these are "
        "templates that may be sent to multiple recipients)."
    )

    # -------------------------------------------------------------------------
    # Columns
    # -------------------------------------------------------------------------

    content_id = Column(
        Integer, primary_key=True, autoincrement=True, comment="Primary key."
    )  # type: int
    from_addr = Column(
        String(length=EMAIL_ADDRESS_MAX_LEN),
        nullable=False,
        comment="From (sender's) e-mail address.",
    )
    reply_to_addr = Column(
        String(length=EMAIL_ADDRESS_MAX_LEN),
        nullable=False,
        comment="Reply-To (sender's) e-mail address.",
    )
    email_datetime = Column(
        DateTime,
        comment="E-mail creation date/time, or NULL for "
        '"the time of sending".',
    )
    subject = Column(
        Text,
        # There is no limit to the subject length in RFC 2822; you can have
        # multiple lines (each up to 998 characters).
        comment="E-mail subject",
    )
    body = Column(Text, comment="E-mail body")
    content_type = Column(
        String(length=CONTENT_TYPE_MAX_LENGTH),
        nullable=False,
        default=CONTENT_TYPE_TEXT,
        server_default=CONTENT_TYPE_TEXT,
        # The comment describes this column and its actual default (it
        # previously repeated the "charset" column's comment).
        comment=f"Content type. Default: {CONTENT_TYPE_TEXT!r}.",
    )
    charset = Column(
        String(length=ENCODING_NAME_MAX_LENGTH),
        nullable=False,
        default=UTF8,
        server_default=UTF8,
        comment=f"Character set (encoding). Default: {UTF8!r}.",
    )

    # -------------------------------------------------------------------------
    # Info
    # -------------------------------------------------------------------------

    @staticmethod
    def n_templates(session: Session) -> int:
        """
        Return the result of a COUNT(*)-style query over all content records.
        """
        query = CountStarSpecializedQuery([Content], session=session)
        return query.count_star()
# =============================================================================
# Job
# =============================================================================
class Job(Base):
    """
    A task to send one e-mail to one recipient.

    A job is treated as pending until a ``SendAttempt`` with
    ``success == True`` exists for it.
    """

    __tablename__ = "job"
    __table_args__ = make_table_args(
        # Describes this table (it previously repeated the comment of the
        # "send_attempt" table).
        comment="Jobs: each is a task to send one e-mail (content) to one "
        "recipient."
    )

    # -------------------------------------------------------------------------
    # Columns
    # -------------------------------------------------------------------------

    job_id = Column(
        Integer, primary_key=True, autoincrement=True, comment="Primary key."
    )  # type: int
    recipient_id = Column(
        Integer,
        ForeignKey("recipient.recipient_id", ondelete="CASCADE"),
        nullable=False,
        comment="Which recipient? Foreign key to recipient.recipient_id",
    )
    content_id = Column(
        Integer,
        ForeignKey("content.content_id", ondelete="CASCADE"),
        nullable=False,
        comment="What content? Foreign key to content.content_id",
    )

    # -------------------------------------------------------------------------
    # Relationships
    # -------------------------------------------------------------------------

    recipient = relationship("Recipient")  # type: Recipient
    content = relationship("Content")  # type: Content

    # -------------------------------------------------------------------------
    # Jobs to be done
    # -------------------------------------------------------------------------

    @staticmethod
    def _pending_job_query(session: Session) -> CountStarSpecializedQuery:
        """
        Build a query for jobs that have no successful send attempt.

        Args:
            session: SQLAlchemy session to bind the query to.

        Returns:
            a query over ``Job``, restricted to jobs for which no
            ``SendAttempt`` row with ``success == True`` exists.
        """
        return CountStarSpecializedQuery([Job], session=session).filter(
            ~exists()
            .select_from(SendAttempt)
            .where(
                and_(
                    SendAttempt.job_id == Job.job_id,
                    SendAttempt.success == True,  # noqa: E712
                )
            )
        )

    @classmethod
    def gen_jobs_to_be_done(cls, session: Session) -> Iterable["Job"]:
        """
        Generate all pending jobs, in ascending order of ``job_id``.

        The full result list is fetched (via ``.all()``) when iteration
        starts, and the jobs are then yielded one at a time.
        """
        query = cls._pending_job_query(session)
        yield from query.order_by(Job.job_id).all()

    @staticmethod
    def n_completed_jobs(session: Session) -> int:
        """
        Count jobs joined to their successful send attempts.

        NOTE(review): because this is a join, a job with more than one
        successful attempt would presumably be counted once per attempt;
        verify whether that can occur.
        """
        # noinspection PyPep8
        query = (
            CountStarSpecializedQuery([Job], session=session)
            .join(SendAttempt)
            .filter(SendAttempt.success == True)  # noqa: E712
        )
        return query.count_star()

    @classmethod
    def n_pending_jobs(cls, session: Session) -> int:
        """
        Count the pending jobs (those with no successful send attempt).
        """
        query = cls._pending_job_query(session)
        return query.count_star()

    @classmethod
    def clear_pending_jobs(cls, session: Session) -> None:
        """
        Delete all pending jobs (those with no successful send attempt).

        The delete is issued with ``synchronize_session=False``.
        """
        query = cls._pending_job_query(session)
        query.delete(synchronize_session=False)
# =============================================================================
# Attempt
# =============================================================================
class SendAttempt(Base):
    """
    One attempt to send the e-mail for a job.

    "Success" means only that our local server accepted the message. It says
    nothing about final delivery (for instance, the address may be wrong).
    """

    __tablename__ = "send_attempt"
    __table_args__ = make_table_args(
        comment="Attempts to send e-mails, successfully or otherwise."
    )

    # -------------------------------------------------------------------------
    # Columns
    # -------------------------------------------------------------------------

    send_attempt_id = Column(
        Integer,
        primary_key=True,
        autoincrement=True,
        comment="Primary key.",
    )  # type: int

    # The job this attempt belongs to.
    job_id = Column(
        Integer,
        ForeignKey("job.job_id", ondelete="CASCADE"),
        nullable=False,
        comment="Which job is this for? Foreign key to job.job_id",
    )

    # The configuration record used for this attempt.
    config_id = Column(
        Integer,
        ForeignKey("config.config_id", ondelete="CASCADE"),
        nullable=False,
        comment="Which config was used? Foreign key to config.config_id",
    )

    when_attempt = Column(
        DateTime,
        nullable=False,
        comment="When was the attempt made?",
    )
    success = Column(
        Boolean,
        nullable=False,
        comment="Was the sending attempt successful?",
    )
    details = Column(
        Text,
        comment="If unsuccessful, error details may be here. If successful, "
        "success messages go here.",
    )

    # -------------------------------------------------------------------------
    # Relationships
    # -------------------------------------------------------------------------

    job = relationship("Job")  # type: Job
    config = relationship("Config")  # type: Config