119 lines
3 KiB
Python
119 lines
3 KiB
Python
import socket
|
|
|
|
class BufferOverflow(Exception):
    """Signal that a buffer ran out of space.

    Args:
        message: Text of the error. Defaults to ``"Buffer overflow"``.
    """

    def __init__(self, message: str = "Buffer overflow"):
        # The message is an optional parameter (rather than hard-coded) so
        # the exception can be rebuilt from ``self.args``; copy and pickle
        # recreate exceptions by calling ``type(exc)(*exc.args)``.
        super().__init__(message)
|
|
|
|
class OutputBuffer():
    """Fixed-size write buffer placed in front of a socket.

    Bytes passed to :meth:`write` are collected in ``buf``. Every time the
    buffer becomes completely full its contents are sent to ``sock`` and the
    buffer is reused from the start. :meth:`flush` sends whatever is
    currently pending.

    Attributes are declared through ``__slots__``:
        buf: Backing ``bytearray`` of ``size`` bytes.
        size: Capacity of ``buf`` in bytes.
        sock: Socket that receives the buffered data.
        offset: Number of pending (not yet sent) bytes in ``buf``.
    """

    __slots__ = 'buf', 'size', 'sock', 'offset',

    BUFFER_SIZE = 20480

    def __init__(self, sock: socket.socket, size: int=BUFFER_SIZE):
        """Create a buffer of ``size`` bytes that sends to ``sock``.

        Raises:
            ValueError: If ``size`` is not positive. A zero-sized buffer
                could never accept data, so ``write`` would never finish.
        """
        if size <= 0:
            raise ValueError("size must be a positive integer")

        self.buf: bytearray = bytearray(size)
        self.size: int = size
        self.sock: socket.socket = sock
        self.offset: int = 0

    def write(self, data: bytes):
        """Append ``data`` to the buffer, sending each time it fills up.

        ``data`` may be larger than the buffer; it is copied in pieces and
        every full buffer is sent before copying continues. Bytes that do
        not fill the buffer stay pending until a later ``write`` fills it
        or ``flush`` is called.
        """
        total = len(data)
        consumed = 0

        while consumed < total:
            # Copy as much as fits in the free tail of the buffer.
            room = self.size - self.offset
            chunk = min(room, total - consumed)

            self.buf[self.offset:self.offset + chunk] = \
                data[consumed:consumed + chunk]

            consumed += chunk
            self.offset += chunk

            if self.offset == self.size:
                # sendall() keeps sending until every byte is handed to the
                # socket; send() may stop after a partial write.
                self.sock.sendall(self.buf)
                self.offset = 0

    def print(self, text: str, end: str="\r\n"):
        """Write ``text`` followed by ``end``, encoded as UTF-8."""
        return self.write((text + end).encode('utf-8'))

    def flush(self):
        """Send all pending bytes to the socket and empty the buffer."""
        self.sock.sendall(self.buf[0:self.offset])
        self.offset = 0
|
|
|
|
class LineBuffer():
    """Fixed-size read buffer that splits socket input into lines.

    Data received from a socket is stored in ``buf``; :meth:`readline`
    hands it back one ``\\n``-terminated line at a time.

    Attributes are declared through ``__slots__``:
        buf: Backing ``bytearray`` of ``size`` bytes.
        size: Capacity of ``buf`` in bytes.
        offset_i: Input offset, the end of the valid data in ``buf``.
        offset_o: Output offset, the start of the data not yet returned.
        eof: Set once ``recv`` has returned an empty result.
        done: Set once the final data after end of stream was returned.
    """

    __slots__ = 'buf', 'size', 'offset_i', 'offset_o', 'eof', 'done',

    BUFFER_SIZE = 4096

    def __init__(self, size: int=BUFFER_SIZE):
        """Create an empty buffer with room for ``size`` bytes."""
        self.buf: bytearray = bytearray(size)
        self.size: int = size
        self.offset_i: int = 0
        self.offset_o: int = 0
        self.eof: bool = False
        self.done: bool = False

    def _shift(self):
        """Move the unread bytes to the start of the buffer."""
        count = self.offset_i - self.offset_o

        self.buf[0:count] = self.buf[self.offset_o:self.offset_i]

        self.offset_i = count
        self.offset_o = 0

    def _is_full(self):
        """Return True when no more bytes can be appended to the buffer."""
        return self.offset_i >= self.size

    def _fill(self, sock: socket.socket):
        """Receive once from ``sock`` and append the bytes to the buffer.

        Does nothing after end of stream or when the buffer has no free
        space. An empty result from ``recv`` marks end of stream.
        """
        if self.eof or self._is_full():
            # With no free space recv() would be asked for 0 bytes and its
            # empty result would be mistaken for end of stream.
            return

        data = sock.recv(self.size - self.offset_i)

        if not data:
            self.eof = True
            return

        # Assign to a slice of exactly len(data) bytes so the bytearray
        # keeps its fixed length; a wider target slice would shrink it.
        end = self.offset_i + len(data)
        self.buf[self.offset_i:end] = data
        self.offset_i = end

    def _drain(self, end=None):
        """Return buffered bytes up to and including index ``end`` as text.

        Args:
            end: Index of the last byte to return. When omitted, all valid
                unread bytes (up to ``offset_i``) are returned.

        Returns:
            The drained bytes decoded as UTF-8.
        """
        if end is None:
            # Last valid byte; anything at or beyond offset_i is stale.
            end = self.offset_i - 1

        stop = end + 1
        chunk = self.buf[self.offset_o:stop]
        self.offset_o = stop

        return str(chunk, 'utf-8')

    def readline(self, sock: socket.socket) -> str:
        """Return the next line read from ``sock``.

        Returns:
            The next line including its trailing ``\\n``. At end of stream
            the remaining unterminated data is returned once (possibly an
            empty string); every later call returns ``''``.

        Raises:
            BufferOverflow: If the buffer is full, holds no newline and has
                no already-consumed bytes that could be discarded.
        """
        if self.done:
            return ''

        while True:
            index = self.buf.find(b'\n', self.offset_o, self.offset_i)

            if index >= 0:
                return self._drain(index)

            if self._is_full():
                if self.offset_o == 0:
                    # The whole buffer is one unterminated line.
                    raise BufferOverflow()

                # Reclaim the space of lines that were already returned.
                self._shift()
            elif self.eof:
                self.done = True

                return self._drain()
            else:
                self._fill(sock)
|