xenu_nntp/lib/nntp/tiny/client.py

97 lines
2.5 KiB
Python
Raw Normal View History

import re
import socket
import ssl
import datetime
2024-12-04 23:52:29 -05:00
from typing import Optional
from nntp.tiny.socket import Connection
from nntp.tiny.host import Host
from nntp.tiny.response import Response, ResponseCode
from nntp.tiny.remote import RemoteNewsgroup
2024-12-04 23:52:29 -05:00
from nntp.tiny.message import MessageRange
class ClientException(Exception):
    """Base class for errors raised by the NNTP client in this module."""
2024-12-04 22:35:49 -05:00
class ClientEOFException(ClientException):
    """Raised when a read from the server returns an empty string (EOF)."""

    def __str__(self):
        """Return a fixed description, regardless of constructor arguments."""
        return 'Unexpected client EOF'
class Client(Connection):
    """Small NNTP client built on top of ``Connection``.

    Constructing an instance opens the TCP connection (optionally wrapped
    in TLS) and consumes the server greeting.  ``request`` sends a command
    and returns the parsed status line; ``each_response_line`` iterates
    the lines of a multi-line data block that follows such a status line.

    ``readline`` and ``print`` are not defined here; presumably they are
    inherited from ``Connection`` -- confirm against that class.
    """

    # Runs of whitespace separate the fields of status lines and of
    # NEWGROUPS rows.
    RE_SPLIT = re.compile(r'\s+')

    def _read_response(self) -> Optional['Response']:
        """Read one status line and parse it into a ``Response``.

        Returns:
            ``None`` when the line is blank; otherwise a ``Response`` built
            from the numeric status code and, if present, the text that
            follows it.

        Raises:
            ClientEOFException: ``readline`` returned an empty string.
            ValueError: the first token of the line is not an integer
                (propagated from ``int``).
        """
        line = self.readline()

        if line == '':
            raise ClientEOFException()

        line = line.rstrip()

        # re.split() always returns at least one element, so a blank line
        # must be detected here; otherwise it would reach int('') below and
        # the documented ``None`` result could never be produced.
        if not line:
            return None

        parts = self.RE_SPLIT.split(line, 1)
        code = ResponseCode(int(parts[0]))

        if len(parts) == 1:
            return Response(code)

        return Response(code, parts[1])

    def __init__(self, host: str, port: int, tls: bool=False):
        """Connect to ``host``:``port`` and read the server greeting.

        Args:
            host: Hostname or address to connect to.
            port: TCP port.
            tls: When true, wrap the socket in TLS before any NNTP traffic.

        Raises:
            ClientException: the greeting line was blank.
            ClientEOFException: the server closed the connection before
                sending a greeting.
            OSError: connecting or the TLS handshake failed.
        """
        sock = socket.create_connection((host, port))

        try:
            if tls:
                sslctx = ssl.create_default_context()

                # NOTE(review): certificate and hostname verification are
                # switched off, so TLS here provides encryption without
                # authenticating the server.  Kept as-is because callers
                # may rely on connecting to self-signed servers.
                sslctx.check_hostname = False
                sslctx.verify_mode = ssl.CERT_NONE

                # Only pass server_hostname (SNI) when the host is a name.
                if Host.is_hostname(host):
                    sock = sslctx.wrap_socket(sock, server_hostname=host)
                else:
                    sock = sslctx.wrap_socket(sock)

            self.host = host
            self.port = port

            super().__init__(sock)

            # NOTE(review): only a blank greeting is rejected; the status
            # code of the greeting itself is not inspected.
            greeting = self._read_response()

            if greeting is None:
                raise ClientException('Server not ready')
        except BaseException:
            # Do not leak the socket when setup fails part-way through;
            # the original exception is re-raised unchanged.
            sock.close()
            raise

    def request(self, *args):
        """Send one command and return the parsed status line.

        Args:
            *args: Command words; each is converted with ``str`` and the
                results are joined with single spaces.

        Returns:
            Whatever ``_read_response`` returns for the server's reply.
        """
        self.print(' '.join(str(arg) for arg in args))

        return self._read_response()

    def each_response_line(self):
        """Yield the lines of a multi-line data block up to its terminator.

        Trailing whitespace (including the line ending) is stripped from
        every line.  A line consisting of a single ``.`` ends the block and
        is not yielded.  Any other line that begins with ``.`` has that
        first dot removed, undoing the sender's dot-stuffing.

        Raises:
            ClientEOFException: the connection ended before the terminator.
        """
        while True:
            raw = self.readline()

            if raw == '':
                raise ClientEOFException()

            line = raw.rstrip()

            if line == '.':
                return

            # Dot-stuffing: the sender prepends '.' to content lines that
            # start with '.', so strip exactly one leading dot.
            if line.startswith('.'):
                line = line[1:]

            yield line

    def each_newsgroup(self, wildmat: str, timestamp: datetime.datetime):
        """Issue NEWGROUPS and yield a ``RemoteNewsgroup`` per result row.

        Args:
            wildmat: Pattern sent as the first NEWGROUPS argument.
            timestamp: Cut-off moment, sent as ``YYYYMMDD`` and ``HHMMSS``.
                No timezone token is sent, so the value is transmitted
                exactly as formatted.

        Yields:
            ``RemoteNewsgroup(parts[0], int(parts[1]), int(parts[2]),
            parts[3] == 'y')`` for each row.  Presumably name, two article
            numbers and a posting-allowed flag -- confirm against
            ``RemoteNewsgroup``.

        Raises:
            ClientException: a row does not split into exactly four fields.
            ClientEOFException: the connection ended mid-response.
        """
        date_part = timestamp.strftime('%Y%m%d')
        time_part = timestamp.strftime('%H%M%S')

        # NOTE(review): the status reply is not inspected; if the server
        # answers with an error, no data block follows and the loop below
        # will wait on the next line.  Also, RFC 3977 defines NEWGROUPS
        # without a wildmat argument -- confirm the target server accepts
        # this form.
        self.request('NEWGROUPS', wildmat, date_part, time_part)

        for line in self.each_response_line():
            parts = self.RE_SPLIT.split(line)

            if len(parts) != 4:
                raise ClientException('Unexpected result from NEWGROUPS')

            yield RemoteNewsgroup(parts[0],
                                  int(parts[1]),
                                  int(parts[2]),
                                  parts[3] == 'y')