xenu_nntp/lib/nntp/tiny/client.py
2024-12-05 15:07:44 -05:00

156 lines
4.2 KiB
Python

import re
import socket
import ssl
import datetime
import email.utils
from typing import Optional
from nntp.tiny.socket import Connection
from nntp.tiny.host import Host
from nntp.tiny.response import Response, ResponseCode
from nntp.tiny.message import MessageRange, Message
from nntp.tiny.remote import (
RemoteException,
RemoteNewsgroup,
RemoteNewsgroupDescription,
RemoteNewsgroupSummary,
RemoteMessageOverview
)
class ClientException(Exception):
pass
class ClientEOFException(ClientException):
def __str__(self):
return 'Unexpected client EOF'
class Client(Connection):
RE_SPLIT = re.compile(r'\s+')
def _read_response(self):
line = self.readline()
if line == '':
raise ClientEOFException()
parts = self.RE_SPLIT.split(line.rstrip(), 1)
if len(parts) == 0:
return
elif len(parts) == 1:
return Response(ResponseCode(int(parts[0])))
else:
return Response(ResponseCode(int(parts[0])), parts[1])
def __init__(self, host: str, port: int, tls: bool=False):
sock = socket.create_connection((host, port))
if tls:
sslctx = ssl.create_default_context()
sslctx.check_hostname = False
sslctx.verify_mode = ssl.CERT_NONE
if Host.is_hostname(host):
sock = sslctx.wrap_socket(sock, server_hostname=host)
else:
sock = sslctx.wrap_socket(sock)
self.host = host
self.port = port
super().__init__(sock)
response = self._read_response()
if response is None:
raise ClientException('Server not ready')
def request(self, *args):
self.print(' '.join(args))
response = self._read_response()
if response.code.value >= 400:
raise RemoteException(response)
return response
def each_response_line(self):
while True:
line = self.readline()
if line == '':
raise ClientEOFException()
line = line.rstrip()
if line == '.':
break
yield line
def select_group(self, name: str):
return self.request('GROUP', name)
def each_newsgroup(self):
self.request('LIST', 'NEWSGROUPS')
for line in self.each_response_line():
parts = line.split(' ', 1)
yield RemoteNewsgroupDescription(parts[0], parts[1])
def each_newsgroup_since(self, timestamp: datetime.datetime):
date = timestamp.strftime('%Y%m%d')
time = timestamp.strftime('%H%M%S')
self.request('NEWGROUPS', date, time)
for line in self.each_response_line():
parts = self.RE_SPLIT.split(line)
if len(parts) != 4:
raise RemoteException('Unexpected result from NEWGROUPS')
yield RemoteNewsgroupSummary(parts[0],
int(parts[1]),
int(parts[2]),
parts[3] == 'y')
def each_newsgroup_message_overview(self, newsgroup: RemoteNewsgroup, msgrange: Optional[MessageRange]):
self.select_group(newsgroup.name)
self.request('OVER', str(msgrange))
for line in self.each_response_line():
parts = line.split('\t')
message = RemoteMessageOverview()
message.id = int(parts[0])
message.subject = parts[1]
message.sender = parts[2]
message.created_on = email.utils.parsedate_to_datetime(parts[3])
message.message_id = parts[4]
message.references = parts[5]
message.size = int(parts[6])
message.lines = int(parts[7])
for part in parts[8:]:
key, value = part.split(': ')
message.headers[key] = value
yield message
def get_message_by_id(self, message_id: str) -> Message:
message = Message()
self.request('ARTICLE', message_id)
for line in self.each_response_line():
message.readline(line + "\r\n")
message.finish()
return message