class BufferOverflow(Exception): def __init__(self): super().__init__("Buffer overflow") class LineBuffer(): __slots__ = 'buf', 'offset', BUFFER_SIZE = 4096 def __init__(self, size: int=BUFFER_SIZE): self.buf: bytearray = bytearray(size) self.offset: int = 0 def readline(self, sock: socket.socket) -> bytes: while True: # # Check and see if there is already a line in the # internal buffer. # offset = self.buf.find(b'\n', self.offset, self.count) if offset < 0: # # There is no line in the buffer. Check and see if # there is more room in the buffer to continue # reading. # if offset == self.BUFFER_SIZE - 1: # # There is no room left in the buffer; the line # must be too long. Panic. # raise BufferOverflow() # # Read from the socket to attempt to fill the buffer # more. # data = sock.recv(self.BUFFER_SIZE - self.offset) # # Check to see if recv() returned any data. If not, # the socket is closed. Return an empty string. # if data == b'': return '' count = len(data) # # Now, fill the buffer. The next iteration of the # loop will be used to check for a newline. # self.buf[self.offset:self.offset+count] = data else: # # There is a line in the buffer. Prepare to return # it. # ret = self.buf[self.offset:offset] # # Advance the offset to one character beyond the # newline. # self.offset = offset + 1 return str(ret, 'ascii')