"""Line-oriented reader that splits an mbox-style file into messages."""

import re
import enum
import sqlite3


# "Key: value" header line; group 1 is the key, group 2 the value.
_HEADER_RE = re.compile(r'^([^:]+): (.*)$')

# Line terminators that count as an empty line. The reader opens its file
# with newline='' so '\r\n' endings reach the parser untranslated.
_BLANK_LINES = ('\n', '\r\n')


class MBoxReaderError(Exception):
    """Error type reserved for mbox reading failures."""


class MBoxReaderBuffer():
    """Sliding window over the four most recently added lines.

    Attributes:
        lines: Four slots, oldest first; a slot is None until filled.
        index: Slot written by the latest ``add`` call (None before any).
    """

    def __init__(self):
        self.lines = [None, None, None, None]
        self.index = None

    def add(self, line: str):
        """Store *line* in the first free slot, or shift the window by one."""
        for slot, held in enumerate(self.lines):
            if held is None:
                self.lines[slot] = line
                self.index = slot
                return
        # Window is full: drop the oldest line (in place, so references to
        # the list stay valid) and append the new one at the end.
        del self.lines[0]
        self.lines.append(line)
        self.index = len(self.lines) - 1

    def is_empty_line(self, line):
        """Return True if slot *line* holds a bare line terminator."""
        return self.lines[line] in _BLANK_LINES

    def is_from_line(self, line):
        """Return True if slot *line* holds a line starting with 'From '."""
        held = self.lines[line]
        return held is not None and held.startswith('From ')

    def is_header_line(self, line):
        """Return True if slot *line* holds a 'Key: value' header line."""
        held = self.lines[line]
        return held is not None and _HEADER_RE.match(held) is not None

    def is_start(self):
        """Report whether the newest line is the first header of a message.

        Returns:
            1 when only two lines have been added and they are a 'From '
            line followed by a header line (message at the very start of
            the input); 3 when the window holds two empty lines, a 'From '
            line and a header line; None otherwise.
        """
        if self.lines[0] is None or self.lines[1] is None:
            return None
        if self.lines[2] is None:
            # Only checked while exactly two lines are held, so it cannot
            # fire again for the same pair once the window slides on.
            if self.is_from_line(0) and self.is_header_line(1):
                return 1
            return None
        # The predicates are None-safe, so an unfilled slot 3 simply fails.
        if (self.is_empty_line(0) and self.is_empty_line(1)
                and self.is_from_line(2) and self.is_header_line(3)):
            return 3
        return None


class MBoxMessageState(enum.Enum):
    """Parsing phase of an MBoxMessage."""

    EMPTY = 0
    HEADER = 1
    BODY = 2


class MBoxMessage():
    """A message assembled one line at a time.

    Attributes:
        state: Current MBoxMessageState.
        headers: Header values keyed by header name.
        line: Most recent body line, not yet appended to ``body``.
        body: Body text accumulated so far (excluding ``line``).
        key: Name of the header most recently stored.
    """

    __slots__ = 'state', 'headers', 'line', 'body', 'key',

    def __init__(self):
        self.state = MBoxMessageState.EMPTY
        self.headers = dict()
        self.line = None
        self.body = ''
        self.key = None

    def add(self, line: str):
        """Feed the next raw line (terminator included) into the message.

        Header lines are parsed until the first empty line; every later
        line is treated as body text.
        """
        if self.state is MBoxMessageState.EMPTY:
            self.state = MBoxMessageState.HEADER

        if self.state is MBoxMessageState.HEADER:
            self._add_header_line(line)
        elif self.state is MBoxMessageState.BODY:
            # The body trails the input by one line: the previous line is
            # flushed only when the next one arrives, so the last line fed
            # in stays in ``self.line`` and never reaches ``self.body``.
            # NOTE(review): presumably this keeps the next message's
            # 'From ' separator out of the body - confirm.
            if self.line is not None:
                self.body += self.line
            self.line = line

    def _add_header_line(self, line):
        """Handle one line while the message is still in its header."""
        if line in _BLANK_LINES:
            self.state = MBoxMessageState.BODY
            self.body = ''
        elif line[:1] in (' ', '\t'):
            # Continuation of the previous header; ignored when no header
            # has been stored yet, since there is nothing to extend.
            if self.key is not None:
                self.headers[self.key] += ' ' + line.strip()
        else:
            match = _HEADER_RE.match(line)
            if match:
                self.key = match[1]
                self.headers[self.key] = match[2].rstrip()

    def is_first_line(self):
        """Return True while one header is stored and no body line was seen."""
        return (len(self.headers) == 1
                and self.body == ''
                and self.line is None)

    @staticmethod
    def each_line(text: str):
        """Yield the lines of *text* split on '\\n', terminators kept.

        A final fragment without a terminator is yielded as-is; an empty
        *text* yields a single empty string.
        """
        start = 0
        end = len(text)
        while True:
            index = text.find('\n', start)
            if index < 0:
                yield text[start:end]
                return
            yield text[start:index + 1]
            start = index + 1
            if start == end:
                return

    @staticmethod
    def parse(text: str):
        """Build an MBoxMessage by feeding it every line of *text*."""
        message = MBoxMessage()
        for line in MBoxMessage.each_line(text):
            message.add(line)
        return message


class MBoxReader():
    """Read messages sequentially from the file at ``path``.

    The file is opened on construction. Call ``close()`` when done, or use
    the reader as a context manager.

    Attributes:
        path: Path the reader was opened with.
        fh: Underlying text file handle.
        line: Number of lines read so far.
        buf: MBoxReaderBuffer used to detect message starts.
        message: Message currently being assembled, or None.
    """

    __slots__ = 'path', 'fh', 'line', 'buf', 'message',

    def __init__(self, path: str):
        self.path = path
        # newline='' disables newline translation so line endings are
        # passed through exactly as stored in the file.
        self.fh = open(path, 'r', newline='')
        self.line = 0
        self.buf = MBoxReaderBuffer()
        self.message = None

    def close(self):
        """Close the underlying file handle."""
        self.fh.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def get_message(self):
        """Return the next complete message, or None when none remain.

        Lines are read until the start of the following message is
        detected or the file ends; at end of file the message in progress
        (if any) is returned.
        """
        while True:
            line = self.fh.readline()
            if not line:
                finished, self.message = self.message, None
                return finished

            self.line += 1
            self.buf.add(line)

            if self.buf.is_start():
                previous = self.message
                self.message = MBoxMessage()
                if previous is not None:
                    # The current line is the new message's first header.
                    self.message.add(line)
                    return previous

            if self.message is not None:
                self.message.add(line)

    def messages(self):
        """Yield every remaining message in file order."""
        yield from iter(self.get_message, None)