import datetime
import enum
import re
from email.errors import HeaderParseError
from email.header import decode_header

from dateparser.search import search_dates

from nntp.tiny.db import DatabaseTable


def decode(text: str) -> str:
    """Decode an RFC 2047 encoded header value into plain text.

    Every chunk reported by ``decode_header`` is decoded and the results are
    concatenated, so values that mix plain and encoded words are returned in
    full. Decoding is best effort: undecodable bytes are replaced rather than
    raising, and a value that cannot be parsed at all is returned unchanged.
    """
    try:
        chunks = decode_header(text)
    except HeaderParseError:
        # Malformed encoded word; keep the raw value rather than failing.
        return text

    parts = []

    for chunk, charset in chunks:
        if isinstance(chunk, bytes):
            # Unencoded runs come back as raw-unicode-escape bytes with no
            # charset; latin-1 maps each of those bytes back to the same
            # code point.
            try:
                chunk = chunk.decode(charset or 'latin-1', errors='replace')
            except LookupError:
                # The header declared a charset Python does not know.
                chunk = chunk.decode('latin-1')

        parts.append(chunk)

    return ''.join(parts)


def each_line(text: str):
    """Yield successive lines of ``text``, each keeping its trailing newline.

    A final fragment without a newline is yielded as-is. An empty ``text``
    yields a single empty string.
    """
    start = 0
    end = len(text)

    while True:
        index = text.find('\n', start)

        if index < 0:
            # No further newline: emit whatever remains and stop.
            yield text[start:]
            return

        yield text[start:index + 1]
        start = index + 1

        if start == end:
            return


class MessageState(enum.Enum):
    """Parsing phase of a ``Message`` being fed line by line."""

    EMPTY = 0
    HEADER = 1
    BODY = 2


class Message(DatabaseTable):
    """A message assembled incrementally from header and body lines.

    Lines are supplied through ``add``; header names are stored lower-cased
    in ``headers``. ``name``, ``key`` and ``columns`` describe the table
    mapping and are presumably consumed by ``DatabaseTable`` (not visible
    here).
    """

    __slots__ = (
        'id', 'newsgroup_id', 'state', 'headers', 'line', 'content',
        'body', '_key'
    )

    name = 'newsgroup_message'
    key = 'id'
    columns = (
        'newsgroup_id', 'created_on', 'message_id', 'parent_id', 'sender',
        'subject', 'content'
    )

    RE_HEADER = re.compile(r'^([A-Za-z0-9\-]+): (.*)$')

    def __init__(self):
        self.id = None
        self.newsgroup_id = None
        self.state = MessageState.EMPTY
        self.headers = dict()
        self.line = None     # most recently added line, not yet committed
        self.content = ''    # every committed line, headers included
        self.body = None     # committed body lines; None until body starts
        self._key = None     # name of the header last parsed, for folding

    @staticmethod
    def __from_row__(row):
        """Build a message from a row providing ``id`` and ``content``."""
        # parse() is static and returns a new instance, so its result must be
        # the object that is returned; previously it was discarded.
        message = Message.parse(row['content'])
        message.id = row['id']
        # NOTE(review): newsgroup_id is listed in `columns` but is not
        # restored here -- confirm whether rows carry it.

        return message

    def __values__(self):
        """Return the values to store, in the same order as ``columns``."""
        return (
            self.newsgroup_id,
            self.date(),
            self.unique_id(),
            self.parent_id(),
            self.sender(),
            self.subject(),
            self.content
        )

    def add(self, line: str):
        """Feed the next raw line of the message.

        Each call first commits the previously supplied line to ``content``
        (and to ``body`` once the header/body separator has been passed),
        then classifies the new line and stores it in ``self.line``. The
        most recently added line is therefore not part of ``content`` or
        ``body`` until another call follows.
        """
        if self.line is not None:
            self.content += self.line

        if self.state is MessageState.EMPTY:
            self.state = MessageState.HEADER

        if self.state is MessageState.HEADER:
            if line in ('\n', '\r\n'):
                # Blank line ends the header section.
                self.state = MessageState.BODY
            elif line.startswith((' ', '\t')):
                # Folded continuation of the previous header. Ignore it when
                # no header has been seen yet instead of raising KeyError.
                if self._key is not None:
                    self.headers[self._key] += ' ' + decode(line.strip())
            else:
                match = self.RE_HEADER.match(line)

                if match:
                    self._key = match[1].lower()
                    self.headers[self._key] = decode(match[2].rstrip())
        elif self.state is MessageState.BODY:
            if self.body is None:
                # First body line: the pending line is the blank separator,
                # which is deliberately left out of the body.
                self.body = ''
            else:
                self.body += self.line

        self.line = line

    def header(self, key: str):
        """Return the value of header ``key`` (case-insensitive) or None."""
        return self.headers.get(key.lower())

    def unique_id(self) -> str:
        """Return the ``Message-ID`` header value, or None if absent."""
        return self.header('Message-ID')

    def parent_id(self) -> str:
        """Return the ``References`` header value, or None if absent."""
        return self.header('References')

    def date(self):
        """Return the first date found in the ``Date`` header.

        Falls back to ``datetime.datetime.fromtimestamp(0)`` when the header
        is missing or no date can be extracted from it.
        """
        try:
            return search_dates(self.headers['date'])[0][1]
        except Exception:
            # Best effort: a missing header, no match (None result) or a
            # parser failure all map to the epoch fallback.
            return datetime.datetime.fromtimestamp(0)

    def sender(self):
        """Return the ``From`` header value, or 'Unknown' if absent."""
        return self.headers.get('from', 'Unknown')

    def subject(self):
        """Return the ``Subject`` header value, or '(no subject)'."""
        return self.headers.get('subject', '(no subject)')

    def is_first_line(self):
        """Return True while exactly one header and no body text exist."""
        return len(self.headers) == 1 and not self.body

    @staticmethod
    def parse(text: str):
        """Build a ``Message`` by feeding every line of ``text`` to ``add``.

        Because ``add`` commits a line only when the next one arrives, the
        final line of ``text`` remains pending in ``line`` and is not part of
        the returned message's ``content`` or ``body``.
        """
        message = Message()

        for line in each_line(text):
            message.add(line)

        return message