import re
import enum
import datetime
from email.errors import HeaderParseError
from email.utils import parsedate_to_datetime
from email.header import decode_header

from nntp.tiny.db import DatabaseTable


def decode(text: str) -> str:
    """
    Decode an RFC 2047 encoded header value into plain text.

    Every chunk returned by decode_header() is decoded and the results are
    concatenated, so values mixing encoded words and plain text are
    returned in full.  Decoding is best effort: a chunk whose charset is
    unknown, or whose bytes are invalid for that charset, is decoded as
    Latin-1, and a value that cannot be parsed at all is returned as is.
    """
    try:
        chunks = decode_header(text)
    except HeaderParseError:
        # Malformed encoded word (e.g. broken base64); keep the raw text.
        return text

    pieces = []

    for payload, charset in chunks:
        if isinstance(payload, str):
            # decode_header() hands back str when nothing was encoded.
            pieces.append(payload)
            continue

        try:
            pieces.append(payload.decode(charset or 'ascii'))
        except (LookupError, UnicodeDecodeError):
            # Latin-1 maps every byte to a character, so this cannot fail.
            pieces.append(payload.decode('latin-1'))

    return ''.join(pieces)


def each_line(text: str):
    """
    Yield each line of the text, keeping the line terminator.

    A final line without a trailing newline is yielded as is, and an empty
    text yields a single empty string.
    """
    start = 0
    end = len(text)

    while True:
        index = text.find('\n', start, end)

        if index < 0:
            yield text[start:end]
            return

        yield text[start:index + 1]
        start = index + 1

        if start == end:
            return


def parse_timestamp(timestamp: str):
    """
    Parse an RFC 2822 style date string into a datetime.

    A missing or empty timestamp yields the (naive, local time) epoch.
    Errors raised by parsedate_to_datetime() for malformed dates are
    propagated to the caller.
    """
    if timestamp is None or timestamp == '':
        return datetime.datetime.fromtimestamp(0)

    return parsedate_to_datetime(timestamp)


class MessageState(enum.Enum):
    """Parser position within a message being read line by line."""
    EMPTY = 0
    HEADER = 1
    BODY = 2


class Message(DatabaseTable):
    """
    A newsgroup message, either read line by line from text or loaded
    from a database row.

    Messages loaded from a row keep the dedicated columns in a small cache
    and only parse the stored content when the headers or body are needed.
    """
    __slots__ = (
        '_cache', '_headers', '_body', '_key', 'id', 'newsgroup_id',
        'state', 'line', 'content',
    )

    name = 'newsgroup_message'
    key = 'id'
    columns = (
        'id', 'newsgroup_id', 'created_on', 'message_id', 'parent_id',
        'sender', 'subject', 'content'
    )

    RE_HEADER = re.compile(r'^([A-Za-z0-9\-]+): (.*)$')

    def __init__(self):
        self._cache = dict()    # Column values supplied before parsing
        self._headers = None    # Lowercased header name -> decoded value
        self._body = None       # Body text; None until a body line is seen
        self._key = None        # Name of the most recently read header

        self.id = None
        self.newsgroup_id = None
        self.state = MessageState.EMPTY
        self.line = None        # Most recent line given to read_line()
        self.content = ''       # Raw text of every line but the pending one

    @staticmethod
    def __from_row__(row):
        """Build a message from a database row without parsing its content."""
        message = Message()

        #
        # Defer parsing the message content until a specific header not
        # already assigned to a dedicated property, or the message body, is
        # required.
        #
        message.content = row['content']
        message.id = row['id']
        message.newsgroup_id = row['newsgroup_id']
        message.created_on = row['created_on']
        message.message_id = row['message_id']
        message.parent_id = row['parent_id']
        message.sender = row['sender']

        #
        # 'subject' is a declared column, but tolerate rows selected
        # without it rather than failing the whole load.
        #
        try:
            subject = row['subject']
        except (KeyError, IndexError):
            subject = None

        if subject is not None:
            message.subject = subject

        return message

    def __values__(self) -> tuple:
        """Return the column values, in order, for every column but 'id'."""
        return (
            self.newsgroup_id,
            self.created_on,
            self.message_id,
            self.parent_id,
            self.sender,
            self.subject,
            self.content
        )

    def _parse_stored_content(self):
        """
        Parse the stored content in full, leaving the content unchanged.

        read_line() appends to self.content and runs one line behind, so
        the content is restored afterwards and the pending final line is
        added to the body here instead of being dropped.
        """
        stored = self.content

        try:
            for line in each_line(stored):
                self.read_line(line)

            if self.state is MessageState.BODY and self._body is not None:
                self._body += self.line
        finally:
            self.line = None
            self.content = stored

    @property
    def headers(self):
        """Parsed headers keyed by lowercased name, parsed on first use."""
        if self._headers is None:
            self._parse_stored_content()

        return self._headers

    @property
    def body(self):
        """Body text, or None when no body line has been read."""
        if self._headers is None:
            self._parse_stored_content()

        return self._body

    def header(self, key: str):
        """Return the value of the named header (any case), or None."""
        return self.headers.get(key.lower())

    @property
    def created_on(self):
        """
        Creation time: the cached value if one was set, otherwise the
        parsed 'Date' header, which is then cached.
        """
        value = self._cache.get('created_on')

        if value is not None:
            return datetime.datetime.fromisoformat(value)

        ret = parse_timestamp(self.header('Date'))
        self._cache['created_on'] = str(ret)

        return ret

    @created_on.setter
    def created_on(self, value):
        if value is None:
            # Nothing to cache; the getter falls back to the Date header.
            self._cache.pop('created_on', None)
            return

        if self._headers is not None:
            self._headers['date'] = str(value)

        self._cache['created_on'] = str(value)

    @property
    def message_id(self) -> str:
        if self._headers is None:
            return self._cache.get('message_id')

        return self.header('Message-ID')

    @message_id.setter
    def message_id(self, value):
        if self._headers is None:
            self._cache['message_id'] = value
        else:
            self.headers['message-id'] = value

    @property
    def parent_id(self) -> str:
        if self._headers is None:
            return self._cache.get('parent_id')

        return self.header('References')

    @parent_id.setter
    def parent_id(self, value):
        if self._headers is None:
            self._cache['parent_id'] = value
        else:
            self.headers['references'] = value

    @property
    def sender(self) -> str:
        if self._headers is None:
            return self._cache.get('sender')

        return self.headers.get('from', 'Unknown')

    @sender.setter
    def sender(self, value):
        if self._headers is None:
            self._cache['sender'] = value
        else:
            self.headers['from'] = value

    @property
    def subject(self) -> str:
        if self._headers is None:
            return self._cache.get('subject', '(no subject)')

        return self.headers.get('subject', '(no subject)')

    @subject.setter
    def subject(self, value):
        if self._headers is None:
            self._cache['subject'] = value
        else:
            self.headers['subject'] = value

    def is_first_line(self):
        """True when exactly one header and no body text has been read."""
        return len(self.headers) == 1 and (self._body == '' or self._body is None)

    def read_line(self, line: str):
        """
        Feed the next line of a message to the parser.

        The parser runs one line behind: the previously given line is
        appended to the content (and to the body, once in the body) when
        the next one arrives, so the last line given is only held in
        self.line.  Headers are recorded as soon as their line is read.
        """
        if self.line is not None:
            self.content += self.line

        if self.state is MessageState.EMPTY:
            self.state = MessageState.HEADER
            self._headers = dict()

        if self.state is MessageState.HEADER:
            if line == '\n' or line == '\r\n':
                # Blank line separates the headers from the body.
                self.state = MessageState.BODY
            elif line.startswith((' ', '\t')):
                # Folded continuation; ignored when no header precedes it.
                if self._key in self._headers:
                    self._headers[self._key] += ' ' + decode(line.strip())
            else:
                match = self.RE_HEADER.match(line)

                if match:
                    self._key = match[1].lower()
                    self._headers[self._key] = decode(match[2].rstrip())
        elif self.state is MessageState.BODY:
            if self._body is None:
                # First body line; the pending line is the blank separator.
                self._body = ''
            elif self.line is not None:
                self._body += self.line

        self.line = line

    def read(self, text: str):
        """Feed every line of the text to read_line(), in order."""
        for line in each_line(text):
            self.read_line(line)

    @staticmethod
    def from_text(text: str):
        """Build a message by feeding every line of the text to read_line()."""
        message = Message()
        message.read(text)

        return message