import re
import enum
import datetime
from email.header import decode_header

from nntp.tiny.db import DatabaseTable


def _decode_part(payload, charset):
    """Turn one fragment returned by ``decode_header`` into ``str``."""
    if not isinstance(payload, bytes):
        # decode_header hands back the original str when the header holds
        # no encoded words at all.
        return payload
    if charset is None:
        # Unencoded fragment between encoded words; latin-1 never fails and
        # leaves ASCII untouched.
        return payload.decode('latin-1')
    try:
        return payload.decode(charset)
    except LookupError:
        # Charset name unknown to Python.
        return payload.decode('latin-1')
    except UnicodeDecodeError:
        return payload.decode(charset, errors='replace')


def decode(text: str):
    """Decode an RFC 2047 encoded header value into a plain string.

    Every fragment reported by ``email.header.decode_header`` is decoded and
    the results are concatenated, so values that mix plain text with encoded
    words, or contain several encoded words, are returned in full. Fragments
    whose charset is unknown or whose bytes are invalid for the declared
    charset are decoded leniently rather than raising.
    """
    return ''.join(
        _decode_part(payload, charset)
        for payload, charset in decode_header(text)
    )


def each_line(text: str):
    """Yield the lines of ``text`` one by one, keeping line terminators.

    Lines are split on ``'\\n'``. A final line without a trailing newline is
    yielded as-is. Empty input yields nothing.
    """
    start = 0
    end = len(text)
    while start < end:
        index = text.find('\n', start, end)
        if index == -1:
            yield text[start:end]
            return
        yield text[start:index + 1]
        start = index + 1


DATE_MONTHS = {
    'jan': 1, 'january': 1,
    'feb': 2, 'february': 2,
    'mar': 3, 'march': 3,
    'apr': 4, 'april': 4,
    'may': 5,
    'jun': 6, 'june': 6,
    'jul': 7, 'july': 7,
    'aug': 8, 'august': 8,
    'sep': 9, 'september': 9,
    'oct': 10, 'october': 10,
    'nov': 11, 'november': 11,
    'dec': 12, 'december': 12,
}

# Building blocks for the accepted "Date" header layouts. The group names are
# the keys that parse_timestamp() reads from the match.
_RE_WEEKDAY = r'(?:[A-Za-z]+),\s+'
_RE_DATE = r'(?P<dd>\d{1,2}) (?P<month>[A-Za-z]+) (?P<yyyy>\d{4})'
_RE_TIME_SS = r'(?P<hh>\d{2}):(?P<MM>\d{2}):(?P<ss>\d{2})'
_RE_TIME_MM = r'(?P<hh>\d{2}):(?P<MM>\d{2})'
_RE_OFFSET = (
    r'(?P<offset_sign>[+\-])(?P<offset_hour>\d{2})(?P<offset_minute>\d{2})'
)
_RE_UTC = r'(?:GMT|UTC)'

DATE_RE = [
    # Wed, 1 Jan 2020 10:00:00 +0100
    re.compile(
        '^' + _RE_WEEKDAY + _RE_DATE + ' ' + _RE_TIME_SS + ' ' + _RE_OFFSET
        + '$'
    ),
    # Wed, 1 Jan 2020 10:00:00 +0100 (Zone Name)
    re.compile(
        '^' + _RE_WEEKDAY + _RE_DATE + ' ' + _RE_TIME_SS + ' ' + _RE_OFFSET
        + r' \((?:[A-Za-z ]+)\)$'
    ),
    # Wed, 1 Jan 2020 10:00:00 GMT
    re.compile(
        '^' + _RE_WEEKDAY + _RE_DATE + ' ' + _RE_TIME_SS + ' ' + _RE_UTC + '$'
    ),
    # Wed, 1 Jan 2020 10:00 +0100
    re.compile(
        '^' + _RE_WEEKDAY + _RE_DATE + ' ' + _RE_TIME_MM + ' ' + _RE_OFFSET
        + '$'
    ),
    # Wed, 1 Jan 2020 10:00 +0100 (ZONE)
    re.compile(
        '^' + _RE_WEEKDAY + _RE_DATE + ' ' + _RE_TIME_MM + ' ' + _RE_OFFSET
        + r' \((?:[A-Z ]+)\)$'
    ),
    # Wed, 1 Jan 2020 10:00 GMT
    re.compile(
        '^' + _RE_WEEKDAY + _RE_DATE + ' ' + _RE_TIME_MM + ' ' + _RE_UTC + '$'
    ),
    # 1 Jan 2020 10:00:00 +0100
    re.compile('^' + _RE_DATE + ' ' + _RE_TIME_SS + ' ' + _RE_OFFSET + '$'),
    # 1 Jan 2020 10:00:00 GMT
    re.compile('^' + _RE_DATE + ' ' + _RE_TIME_SS + ' ' + _RE_UTC + '$'),
]


def parse_timestamp(timestamp: str):
    """Parse a "Date" header value into a ``datetime.datetime``.

    Returns:
        * ``datetime.datetime.fromtimestamp(0)`` (a naive value) when
          ``timestamp`` is ``None`` or empty;
        * a timezone-aware datetime when ``timestamp`` matches one of the
          ``DATE_RE`` layouts (layouts without an explicit offset are taken
          as UTC);
        * ``None`` when no layout matches, or when a layout matches but the
          captured fields do not form a valid date (unknown month name, day
          out of range, offset out of range, ...).
    """
    if timestamp is None or timestamp == '':
        return datetime.datetime.fromtimestamp(0)

    for pattern in DATE_RE:
        match = pattern.match(timestamp)
        if match is None:
            continue

        capture = match.groupdict()
        mult = -1 if capture.get('offset_sign', '+') == '-' else 1
        try:
            tz = datetime.timezone(datetime.timedelta(
                hours=mult * int(capture.get('offset_hour', 0)),
                minutes=mult * int(capture.get('offset_minute', 0)),
            ))
            yyyy = int(capture['yyyy'])
            if 'month' in capture:
                mm = DATE_MONTHS[capture['month'].lower()]
            else:
                mm = int(capture['mm'])
            dd = int(capture['dd'])
            hh = int(capture.get('hh', 0))
            MM = int(capture.get('MM', 0))
            ss = int(capture.get('ss', 0))
            return datetime.datetime(yyyy, mm, dd, hh, MM, ss, 0, tz)
        except (KeyError, ValueError):
            # Right shape, impossible values: report "unparseable" instead of
            # letting a malformed header raise.
            return None

    return None


class MessageState(enum.Enum):
    """Progress of the line-by-line parser in ``Message.read_line``."""
    EMPTY = 0
    HEADER = 1
    BODY = 2


class Message(DatabaseTable):
    """A newsgroup message, parsed lazily from its raw text.

    A message is either fed line by line through ``read_line`` / ``read`` /
    ``from_text``, or created from a database row by ``__from_row__``. In the
    latter case the indexed columns are kept in ``_cache`` and the raw
    ``content`` is only parsed when ``headers``, ``body`` or ``header()`` is
    first used. Once headers have been parsed, the header dictionary is the
    source of truth for ``message_id``, ``parent_id``, ``sender`` and
    ``subject``.
    """

    __slots__ = (
        '_cache',
        '_headers',
        '_body',
        '_key',
        'id',
        'newsgroup_id',
        'state',
        'line',
        'content',
    )

    name = 'newsgroup_message'
    key = 'id'
    columns = (
        'id',
        'newsgroup_id',
        'created_on',
        'message_id',
        'parent_id',
        'sender',
        'subject',
        'content'
    )

    RE_HEADER = re.compile(r'^([A-Za-z0-9\-]+): (.*)$')

    def __init__(self):
        self._cache = dict()
        self._headers = None
        self._body = None
        # Lower-cased name of the header most recently parsed; continuation
        # lines are appended to it.
        self._key = None
        self.id = None
        self.newsgroup_id = None
        self.state = MessageState.EMPTY
        # Most recently fed line, not yet committed to content/body.
        self.line = None
        self.content = ''

    @staticmethod
    def __from_row__(row):
        """Build a message from a row holding the fields in ``columns``."""
        message = Message()
        #
        # Defer parsing the message content until a specific header not already
        # assigned to a dedicated property, or the message body, is required.
        #
        message.content = row['content']
        message.id = row['id']
        message.newsgroup_id = row['newsgroup_id']
        message.created_on = row['created_on']
        message.message_id = row['message_id']
        message.parent_id = row['parent_id']
        message.sender = row['sender']
        # 'subject' is listed in `columns` and written by __values__; without
        # restoring it here an unparsed message reports '(no subject)'.
        message.subject = row['subject']
        return message

    def __values__(self) -> tuple:
        """Return the column values in ``columns`` order, without ``id``."""
        return (
            self.newsgroup_id,
            self.created_on,
            self.message_id,
            self.parent_id,
            self.sender,
            self.subject,
            self.content
        )

    def _parse_content(self):
        """Parse ``self.content`` once, leaving ``self.content`` untouched.

        Does nothing if parsing (or line-by-line feeding) has already begun,
        so repeated property access never re-feeds the text.

        NOTE(review): like streaming input, the final line of ``content`` is
        left pending in ``self.line`` and is therefore not part of ``body``;
        confirm whether stored content is expected to end with a terminator
        line.
        """
        if self.state is not MessageState.EMPTY:
            return
        stored = self.content
        try:
            self.read(stored)
        finally:
            # read_line() appends every committed line to self.content;
            # restore the original so the text is not duplicated.
            self.content = stored
        if self._headers is None:
            # Nothing to parse (empty content): expose an empty header set.
            self._headers = dict()
            self.state = MessageState.HEADER

    @property
    def headers(self):
        """Dictionary of decoded headers keyed by lower-cased header name."""
        if self._headers is None:
            self._parse_content()
        return self._headers

    @property
    def body(self):
        """Body text parsed so far, or ``None`` if no body has been seen."""
        if self._body is None:
            self._parse_content()
        return self._body

    def header(self, key: str):
        """Return the header named ``key`` (case-insensitive) or ``None``."""
        return self.headers.get(key.lower())

    @property
    def created_on(self):
        """Creation time: the cached value, else the parsed "Date" header.

        Returns ``None`` when there is no cached value and the header cannot
        be parsed.
        """
        value = self._cache.get('created_on')
        if value is not None:
            return datetime.datetime.fromisoformat(value)
        ret = parse_timestamp(self.header('Date'))
        if ret is not None:
            # Never cache str(None): it would break fromisoformat() above.
            self._cache['created_on'] = str(ret)
        return ret

    @created_on.setter
    def created_on(self, value):
        if value is None:
            # No usable value: fall back to the "Date" header on next read.
            self._cache.pop('created_on', None)
            return
        if self._headers is not None:
            self._headers['date'] = str(value)
        self._cache['created_on'] = str(value)

    @property
    def message_id(self) -> str:
        if self._headers is None:
            return self._cache.get('message_id')
        return self.header('Message-ID')

    @message_id.setter
    def message_id(self, value):
        if self._headers is None:
            self._cache['message_id'] = value
        else:
            self.headers['message-id'] = value

    @property
    def parent_id(self) -> str:
        if self._headers is None:
            return self._cache.get('parent_id')
        return self.header('References')

    @parent_id.setter
    def parent_id(self, value):
        if self._headers is None:
            self._cache['parent_id'] = value
        else:
            self.headers['references'] = value

    @property
    def sender(self) -> str:
        if self._headers is None:
            return self._cache.get('sender')
        return self.headers.get('from', 'Unknown')

    @sender.setter
    def sender(self, value):
        if self._headers is None:
            self._cache['sender'] = value
        else:
            self.headers['from'] = value

    @property
    def subject(self) -> str:
        if self._headers is None:
            return self._cache.get('subject', '(no subject)')
        return self.headers.get('subject', '(no subject)')

    @subject.setter
    def subject(self, value):
        if self._headers is None:
            self._cache['subject'] = value
        else:
            self.headers['subject'] = value

    def is_first_line(self):
        """True when exactly one header and no body text has been parsed."""
        return len(self.headers) == 1 and (
            self._body == '' or self._body is None
        )

    def read_line(self, line: str):
        """Feed one raw line (terminator included) to the parser.

        The previously fed line is committed to ``content`` (and to the body
        when in the body section); ``line`` itself stays pending in
        ``self.line`` until the next call. Header lines are parsed
        immediately. Lines in the header section that are neither a header,
        a continuation nor the blank separator are ignored.
        """
        if self.line is not None:
            self.content += self.line

        if self.state is MessageState.EMPTY:
            self.state = MessageState.HEADER
            self._headers = dict()

        if self.state is MessageState.HEADER:
            if line == '\n' or line == '\r\n':
                self.state = MessageState.BODY
            elif line.startswith((' ', '\t')):
                # Folded header: append to the header parsed last. A
                # continuation that arrives before any header is ignored.
                if self._key in self._headers:
                    self._headers[self._key] += ' ' + decode(line.strip())
            else:
                match = self.RE_HEADER.match(line)
                if match:
                    self._key = match[1].lower()
                    self._headers[self._key] = decode(match[2].rstrip())
        elif self.state is MessageState.BODY:
            if self._body is None:
                # The pending line is the blank separator; start an empty
                # body without including it.
                self._body = ''
            else:
                self._body += self.line

        self.line = line

    def read(self, text: str):
        """Feed every line of ``text`` to ``read_line``."""
        for line in each_line(text):
            self.read_line(line)

    @staticmethod
    def from_text(text: str):
        """Create a message by feeding ``text`` line by line."""
        message = Message()
        for line in each_line(text):
            message.read_line(line)
        return message