import socket


class BufferOverflow(Exception):
    """Raised when a single line does not fit into the whole buffer."""

    def __init__(self):
        super().__init__("Buffer overflow")


class LineBuffer:
    """Fixed-size buffer that extracts newline-terminated lines from a socket.

    Unconsumed bytes live in ``buf[offset:count]``: ``offset`` is the start
    of the next line to hand out and ``count`` is the end of the valid data.
    """

    __slots__ = 'buf', 'size', 'offset', 'count', 'eof', 'done',

    BUFFER_SIZE = 4096

    def __init__(self, size: int = BUFFER_SIZE):
        """Allocate an empty buffer of ``size`` bytes."""
        self.buf: bytearray = bytearray(size)
        self.size: int = size
        self.offset: int = 0
        self.count: int = 0
        # Set once recv() has returned b'' (peer closed the connection).
        self.eof: bool = False
        # Not read or written by any method of this class after construction.
        self.done: bool = False

    def _shift(self) -> None:
        """Move the unconsumed bytes to the front of the buffer."""
        # Use the real amount of pending data rather than assuming the
        # buffer is full, so the helper is correct whenever it is called.
        remaining = self.count - self.offset
        self.buf[0:remaining] = self.buf[self.offset:self.count]
        self.offset = 0
        self.count = remaining

    def _is_full(self) -> bool:
        """Return True when no free space is left after the valid data."""
        return self.count == self.size

    def _fill(self, sock: socket.socket) -> None:
        """Append one ``recv`` worth of data to the buffer.

        Sets ``eof`` when the socket reports end of stream. Does nothing
        once ``eof`` is set.
        """
        if self.eof:
            return
        data = sock.recv(self.size - self.count)
        if not data:
            self.eof = True
            return
        # New bytes go after the existing valid data (at ``count``), not at
        # ``offset``: writing at ``offset`` would clobber unconsumed bytes.
        end = self.count + len(data)
        self.buf[self.count:end] = data
        self.count = end

    def _flush(self, end: int) -> str:
        """Consume ``buf[offset:end + 1]`` and return it decoded as ASCII."""
        ret = self.buf[self.offset:end + 1]
        self.offset = end + 1
        return str(ret, 'ascii')

    def readline(self, sock: socket.socket) -> str:
        """Return the next line read from ``sock``, including its ``'\\n'``.

        Data is pulled from ``sock`` with ``recv`` as needed. When the peer
        has closed the connection, the remaining unterminated data is
        returned instead; this is ``''`` once nothing is left.

        Raises:
            BufferOverflow: the buffer is completely filled by data that
                contains no newline.
            UnicodeDecodeError: the line contains non-ASCII bytes.
        """
        while True:
            index = self.buf.find(b'\n', self.offset, self.count)
            if index >= 0:
                return self._flush(index)
            if self._is_full():
                if self.offset == 0:
                    raise BufferOverflow()
                # Reclaim the already-consumed prefix to make room.
                self._shift()
                continue
            self._fill(sock)
            if self.eof:
                # No more data will ever arrive; looping again would spin
                # forever. Hand back whatever is left ('' when drained).
                return self._flush(self.count - 1)