import re
import enum
import datetime
import uuid

from email.utils import parsedate_to_datetime
from email.header import decode_header, Header

from nntp.tiny.db import DatabaseTable


def decode(text: str):
    """Decode an RFC 2047 encoded header value into plain text.

    Every segment returned by ``decode_header`` is decoded and the results
    are concatenated, so values that mix encoded words with plain text are
    returned complete. If a segment names an unknown charset, or its bytes
    are not valid in that charset, the original ``text`` is returned
    unchanged.
    """
    pieces = []

    try:
        for raw, charset in decode_header(text):
            if isinstance(raw, bytes):
                # Segments without a charset are the unencoded parts of the
                # header; latin-1 maps every byte, so it can never fail.
                pieces.append(raw.decode(charset or 'latin-1'))
            else:
                pieces.append(raw)
    except (LookupError, UnicodeError):
        return text

    return ''.join(pieces)


def each_line(text: str):
    """Yield each line of ``text``, keeping the trailing newline.

    An empty ``text`` yields a single empty string. Text that ends with a
    newline does not yield a trailing empty string.
    """
    position = 0
    limit = len(text)

    while True:
        newline = text.find('\n', position, limit)

        if newline < 0:
            # Final line without a terminating newline.
            yield text[position:limit]
            return

        following = newline + 1
        yield text[position:following]

        if following == limit:
            return

        position = following


def parse_timestamp(timestamp: str):
    """Parse an RFC 2822 date string into a ``datetime``.

    A missing or empty timestamp yields the (naive, local time) Unix epoch.
    Errors raised by ``parsedate_to_datetime`` for malformed input are
    propagated to the caller.
    """
    if timestamp is None or timestamp == '':
        return datetime.datetime.fromtimestamp(0)

    return parsedate_to_datetime(timestamp)


class MessagePart(enum.Enum):
    """Selects which portion of a message is being referred to."""

    HEAD = 1
    BODY = enum.auto()
    WHOLE = enum.auto()


class MessageRange():
    """A single message number or an open/closed range of message numbers.

    Exactly one of ``id`` or the ``min``/``max`` pair is expected to be set;
    ``parse`` produces instances in that shape.
    """

    __slots__ = 'id', 'min', 'max',

    RE_NUM = re.compile(r'^(\d+)$')
    RE_RANGE = re.compile(r'^(\d+)-(\d+)$')
    RE_RANGE_LOWER = re.compile(r'^(\d+)-$')
    RE_RANGE_UPPER = re.compile(r'^-(\d+)$')

    def __init__(self):
        self.id: int = None
        self.min: int = None
        self.max: int = None

    def __str__(self):
        """Render the range in the same textual form ``parse`` accepts."""
        if self.id is not None:
            return str(self.id)

        has_min = self.min is not None
        has_max = self.max is not None

        if has_min and has_max:
            return "%d-%d" % (self.min, self.max)

        if has_min:
            return "%d-" % (self.min)

        if has_max:
            return "-%d" % (self.max)

        return "?"

    def where(self, table=None):
        """Return a SQL condition on the ``id`` column for this range.

        ``table``, when given, is used as a qualifier for the column name.
        Returns None when neither ``id``, ``min`` nor ``max`` is set.
        """
        prefix = '' if table is None else f"{table}."
        column = prefix + 'id'

        if self.id is not None:
            return "%s = %d" % (column, self.id)

        has_min = self.min is not None
        has_max = self.max is not None

        if has_min and has_max:
            return "%s >= %d and %s <= %d" % (
                column, self.min, column, self.max
            )

        if has_min:
            return "%s >= %d" % (column, self.min)

        if has_max:
            return "%s <= %d" % (column, self.max)

        return None

    @staticmethod
    def parse(r: str):
        """Parse ``N``, ``N-M``, ``N-`` or ``-M`` into a MessageRange.

        Returns None when ``r`` matches none of those forms.
        """
        match = MessageRange.RE_NUM.match(r)

        if match:
            obj = MessageRange()
            obj.id = int(match[1])
            return obj

        match = MessageRange.RE_RANGE.match(r)

        if match:
            obj = MessageRange()
            obj.min = int(match[1])
            obj.max = int(match[2])
            return obj

        match = MessageRange.RE_RANGE_LOWER.match(r)

        if match:
            obj = MessageRange()
            obj.min = int(match[1])
            return obj

        match = MessageRange.RE_RANGE_UPPER.match(r)

        if match:
            obj = MessageRange()
            obj.max = int(match[1])
            return obj

        return None


class MessageState(enum.Enum):
    """Progress of the line-by-line message parser."""

    EMPTY = 0
    HEADER = 1
    BODY = 2


class Message(DatabaseTable):
    """A message that is parsed incrementally, line by line.

    Lines are fed through ``readline``; the most recent line is held in
    ``line`` and only committed to ``content`` (and the body) once the next
    line arrives or ``finish`` is called. Messages built by ``__from_row__``
    keep their dedicated columns in ``_cache`` and defer parsing ``content``
    until headers or the body are requested.
    """

    __slots__ = (
        '_cache', '_headers', '_headers_lc', '_body', '_key',
        'id', 'state', 'line', 'content',
    )

    name = 'message'
    key = 'id'
    columns = (
        'id', 'created_on', 'message_id', 'reference_ids', 'sender',
        'subject', 'content'
    )

    RE_HEADER = re.compile(r'^([A-Za-z0-9\-]+): (.*)$')

    def __init__(self):
        self._cache = dict()
        self._headers = None
        self._headers_lc = None
        self._body = None
        self._key = None

        self.id = None
        self.state = MessageState.EMPTY
        self.line = None
        self.content = ''

    @staticmethod
    def __from_row__(row):
        """Build a Message from a mapping keyed by the column names."""
        message = Message()

        #
        # Defer parsing the message content until a specific header not
        # already assigned to a dedicated property, or the message body, is
        # required.
        #
        message.content = row['content']

        message.id = row['id']
        message.created_on = row['created_on']
        message.message_id = row['message_id']
        message.reference_ids = row['reference_ids']
        message.sender = row['sender']
        message.subject = row['subject']

        return message

    def __values__(self) -> tuple:
        """Return the column values, in ``columns`` order without ``id``."""
        return (
            self.created_on,
            self.message_id,
            self.reference_ids,
            self.sender,
            self.subject,
            self.content
        )

    def _parse_content(self):
        """Parse the stored ``content`` from scratch.

        ``readline`` and ``finish`` append every line they see to
        ``content``, so the accumulators are reset first; otherwise
        re-reading the stored text would duplicate it.
        """
        text = self.content

        self.content = ''
        self.line = None
        self.state = MessageState.EMPTY
        self._key = None
        self._body = None

        self.read(text)

    @property
    def headers(self):
        """Header mapping with original-case keys; parses lazily."""
        if self._headers is None:
            self._parse_content()

        return self._headers

    @property
    def body(self):
        """Message body text; an empty string when there is no body."""
        if self._headers is None:
            self._parse_content()

        return '' if self._body is None else self._body

    def _header_set(self, key: str, value: str):
        """Replace a header in both the exact-case and lowercase maps."""
        self._headers[key] = value
        self._headers_lc[key.lower()] = value

    def _header_append(self, key: str, value: str):
        """Create a header, or append ``value`` to its existing text."""
        if key in self._headers:
            self._headers[key] += value
            self._headers_lc[key.lower()] += value
        else:
            self._headers[key] = value
            self._headers_lc[key.lower()] = value

    def header(self, key: str, default=None):
        """Return the decoded value of header ``key`` (case-insensitive).

        ``default`` is used when the header is absent; None is returned
        when neither a header nor a default is available.
        """
        if self._headers is None:
            self._parse_content()

        value = self._headers_lc.get(key.lower(), default)

        if value is None:
            return None

        return decode(value)

    @property
    def created_on(self):
        """Creation time, from the cache or else the ``Date`` header."""
        cached = self._cache.get('created_on')

        if cached is not None:
            return datetime.datetime.fromisoformat(cached)

        parsed = parse_timestamp(self.header('Date'))
        self._cache['created_on'] = str(parsed)

        return parsed

    @created_on.setter
    def created_on(self, value):
        if self._headers is None:
            if value is None:
                # Caching str(None) would make the getter fail inside
                # fromisoformat(); forget the entry instead so the getter
                # falls back to the Date header.
                self._cache.pop('created_on', None)
            else:
                self._cache['created_on'] = str(value)
        elif value is not None:
            self._header_set('Date', Header(str(value)).encode())

    @property
    def message_id(self) -> str:
        """Message-ID, from the cache until the headers are parsed."""
        if self._headers is None:
            return self._cache.get('message_id')

        return self.header('Message-ID')

    @message_id.setter
    def message_id(self, value):
        if self._headers is None:
            self._cache['message_id'] = value
        elif value is not None:
            self._header_set('Message-ID', Header(value).encode())

    @property
    def reference_ids(self) -> str:
        """References value, from the cache until headers are parsed."""
        if self._headers is None:
            return self._cache.get('reference_ids')

        return self.header('References')

    @reference_ids.setter
    def reference_ids(self, value):
        if self._headers is None:
            self._cache['reference_ids'] = value
        elif value is not None:
            self._header_set('References', Header(value).encode())

    @property
    def sender(self) -> str:
        """From value; ``'Unknown'`` when parsed headers lack one."""
        if self._headers is None:
            return self._cache.get('sender')

        return self.header('From', 'Unknown')

    @sender.setter
    def sender(self, value):
        if self._headers is None:
            self._cache['sender'] = value
        elif value is not None:
            self._header_set('From', Header(value).encode())

    @property
    def subject(self) -> str:
        """Subject value; ``'(no subject)'`` when none is available."""
        if self._headers is None:
            return self._cache.get('subject', '(no subject)')

        return self.header('subject', '(no subject)')

    @subject.setter
    def subject(self, value):
        if self._headers is None:
            self._cache['subject'] = value
        elif value is not None:
            self._header_set('Subject', Header(value).encode())

    def is_first_line(self):
        """True when exactly one header and no body text has been read."""
        return len(self.headers) == 1 and self._body in ('', None)

    def readline(self, line: str):
        """Feed one line (newline included) into the parser.

        The previously supplied line is committed to ``content`` and, while
        in the body, to the body text; ``line`` then becomes the pending
        line until the next call or ``finish``.
        """
        previous = self.line

        if previous is not None:
            self.content += previous

        if self.state is MessageState.EMPTY:
            self.state = MessageState.HEADER
            self._headers = dict()
            self._headers_lc = dict()

        if self.state is MessageState.HEADER:
            if line in ('\n', '\r\n'):
                self.state = MessageState.BODY
            elif line[:1] in (' ', '\t'):
                # Folded continuation of the previous header. Slicing keeps
                # an empty line from raising; a continuation that precedes
                # any header has nothing to attach to and is ignored.
                if self._key is not None:
                    self._header_append(self._key, ' ' + line.strip())
            else:
                match = self.RE_HEADER.match(line)

                if match:
                    self._key = match[1]
                    self._header_append(self._key, match[2].rstrip())
        elif self.state is MessageState.BODY:
            if self._body is None:
                # The pending line is the blank separator; it is not part
                # of the body.
                self._body = ''
            elif previous is not None:
                self._body += previous

        self.line = line

    def finish(self):
        """Commit the pending line; call once all input has been fed.

        Safe to call more than once: the pending line is cleared after it
        has been committed.
        """
        pending = self.line

        if pending:
            self.content += pending

            if self.state is MessageState.BODY:
                if self._body is None:
                    # Input ended on the header/body separator itself.
                    self._body = ''
                else:
                    self._body += pending

        self.line = None

    def read(self, text: str):
        """Feed every line of ``text`` through the parser, then finish."""
        for line in each_line(text):
            self.readline(line)

        self.finish()

    def message_id_assign(self):
        """Generate a Message-ID when the message has a sender but no ID.

        The host part is taken from the text following the first ``@`` in
        the sender, limited to hostname characters so that decorations such
        as a closing ``>`` are excluded; ``unknown.host`` is used when no
        host can be found.
        """
        sender = self.sender

        if sender is None or self.message_id is not None:
            return

        _, _, tail = sender.partition('@')
        host = re.match(r'[A-Za-z0-9.\-]+', tail)
        remote = host[0] if host else 'unknown.host'

        self.message_id = '<%s@%s>' % (str(uuid.uuid4()), remote)

    def validate(self):
        """Return True when all required header values are available.

        A creation date that cannot be parsed counts as invalid rather than
        raising.
        """
        try:
            if self.created_on is None:
                return False
        except (TypeError, ValueError):
            return False

        if self.sender is None:
            return False

        if self.message_id is None:
            return False

        if self.subject is None:
            return False

        if self.header('Newsgroups') is None:
            return False

        if self.header('Path') is None:
            return False

        return True

    @staticmethod
    def from_text(text: str):
        """Parse a complete message from ``text``.

        The final line is committed as well, so ``content`` and the body
        hold the whole text.
        """
        message = Message()
        message.read(text)

        return message