import socket
from typing import Optional


class BufferOverflow(Exception):
    """Raised by LineBuffer.readline when a line does not fit in the buffer."""

    def __init__(self):
        super().__init__("Buffer overflow")


class OutputBuffer():
    """Fixed-size write buffer in front of a socket.

    Text passed to print() is encoded as UTF-8 and accumulated in ``buf``;
    bytes are written to the socket when the buffer fills up or when
    flush() is called.
    """

    __slots__ = ('buf', 'size', 'sock', 'offset')

    BUFFER_SIZE = 20480

    def __init__(self, sock: socket.socket, size: int = BUFFER_SIZE):
        """Create a buffer of ``size`` bytes that writes to ``sock``."""
        self.buf: bytearray = bytearray(size)
        self.size: int = size
        self.sock: socket.socket = sock
        # Number of pending (not yet sent) bytes at the start of ``buf``.
        self.offset: int = 0

    def print(self, text: str, end: str = "\r\n") -> None:
        """Queue ``text + end`` (UTF-8 encoded) for sending.

        If the payload is larger than the whole buffer, pending bytes are
        flushed first and the payload is then sent directly. If it does not
        fit in the remaining space, the buffer is topped up, sent in full,
        and the remainder is kept as the new pending content.
        """
        data = (text + end).encode('utf-8')
        count = len(data)

        if count > self.size:
            # Flush first so previously buffered output is not overtaken
            # by this payload on the wire.
            self.flush()
            self.sock.sendall(data)
            return

        if self.offset + count > self.size:
            to_fill = self.size - self.offset
            left = count - to_fill
            self.buf[self.offset:self.size] = data[:to_fill]
            # sendall: plain send() may transmit only part of the buffer.
            self.sock.sendall(self.buf)
            self.buf[0:left] = data[to_fill:]
            # The remainder is now the only pending content.
            self.offset = left
        else:
            self.buf[self.offset:self.offset + count] = data
            self.offset += count

    def flush(self) -> None:
        """Send all pending bytes to the socket and empty the buffer."""
        if self.offset:
            self.sock.sendall(self.buf[0:self.offset])
        self.offset = 0


class LineBuffer():
    """Fixed-size read buffer that splits socket input into lines.

    Valid, not yet returned data lives in ``buf[offset_o:offset_i]``.
    """

    __slots__ = ('buf', 'size', 'offset_i', 'offset_o', 'eof', 'done')

    BUFFER_SIZE = 4096

    def __init__(self, size: int = BUFFER_SIZE):
        """Create an empty line buffer holding at most ``size`` bytes."""
        self.buf: bytearray = bytearray(size)
        self.size: int = size
        # Input offset: where the next received bytes are stored.
        self.offset_i: int = 0
        # Output offset: start of the data not yet handed to the caller.
        self.offset_o: int = 0
        # Set once recv() has returned b''.
        self.eof: bool = False
        # Set once the data remaining after EOF has been returned.
        self.done: bool = False

    def _shift(self) -> None:
        """Move the unread data to the start of the buffer."""
        count = self.offset_i - self.offset_o
        self.buf[0:count] = self.buf[self.offset_o:self.offset_i]
        self.offset_i = count
        self.offset_o = 0

    def _is_full(self) -> bool:
        """Return True when there is no room left after ``offset_i``."""
        return self.offset_i >= self.size

    def _fill(self, sock: socket.socket) -> None:
        """Receive once from ``sock`` into the free tail of the buffer.

        Sets ``eof`` when recv() returns b''. Does nothing if EOF was
        already seen or if the buffer has no free space.
        """
        # Guard against recv(0): its empty result would look like EOF.
        if self.eof or self._is_full():
            return
        data = sock.recv(self.size - self.offset_i)
        if not data:
            self.eof = True
            return
        readlen = len(data)
        # Replace exactly ``readlen`` bytes so the bytearray keeps its size;
        # assigning to a longer slice would shrink the buffer.
        self.buf[self.offset_i:self.offset_i + readlen] = data
        self.offset_i += readlen

    def _drain(self, end: Optional[int] = None) -> str:
        """Consume buffered bytes up to and including index ``end``.

        With ``end`` omitted, everything up to ``offset_i`` is consumed.
        The consumed bytes are returned decoded as UTF-8.
        """
        if end is None:
            # Last valid byte; never read past the received data.
            end = self.offset_i - 1
        ret = self.buf[self.offset_o:end + 1]
        self.offset_o = end + 1
        return str(ret, 'utf-8')

    def readline(self, sock: socket.socket) -> str:
        """Return the next line read from ``sock``.

        A line includes its terminating ``\\n``. At EOF the remaining data
        (possibly an empty string) is returned once; every later call
        returns ''.

        Raises:
            BufferOverflow: the buffer is completely full and contains no
                newline.
        """
        if self.done:
            return ''
        while True:
            index = self.buf.find(b'\n', self.offset_o, self.offset_i)
            if index >= 0:
                return self._drain(index)
            if self._is_full():
                if self.offset_o == 0:
                    raise BufferOverflow()
                # Reclaim the space occupied by already returned lines.
                self._shift()
            elif self.eof:
                self.done = True
                return self._drain()
            else:
                self._fill(sock)