xenu_nntp/lib/nntp/tiny/client.py
2024-12-05 22:10:15 -05:00

191 lines
5.3 KiB
Python

import re
import socket
import ssl
import datetime
import email.header
import email.utils
from typing import Optional
from nntp.tiny.socket import Connection
from nntp.tiny.host import Host
from nntp.tiny.response import Response, ResponseCode
from nntp.tiny.message import MessageRange, Message, MessagePart
from nntp.tiny.remote import (
RemoteException,
RemoteNewsgroup,
RemoteNewsgroupDescription,
RemoteNewsgroupSummary,
RemoteMessageOverview
)
class ClientException(Exception):
def __init__(self, response: Response):
self.response = response
def __str__(self):
return str(self.response)
class ClientEOFException(ClientException):
def __str__(self):
return 'Unexpected client EOF'
class Client(Connection):
RE_SPLIT = re.compile(r'\s+')
def response_read(self):
line = self.readline()
if line == '':
raise ClientEOFException()
parts = self.RE_SPLIT.split(line.rstrip(), 1)
if len(parts) == 0:
return
elif len(parts) == 1:
return Response(ResponseCode(int(parts[0])))
else:
return Response(ResponseCode(int(parts[0])), parts[1])
def __init__(self, host: str, port: int, tls: bool=False):
sock = socket.create_connection((host, port))
if tls:
sslctx = ssl.create_default_context()
sslctx.check_hostname = False
sslctx.verify_mode = ssl.CERT_NONE
if Host.is_hostname(host):
sock = sslctx.wrap_socket(sock, server_hostname=host)
else:
sock = sslctx.wrap_socket(sock)
self.host = host
self.port = port
super().__init__(sock)
response = self.response_read()
if response is None:
raise ClientException('Server not ready')
def request(self, *args):
self.print(' '.join(args))
response = self.response_read()
if response.code.value >= 400:
raise RemoteException(response)
return response
def each_response_line(self):
while True:
line = self.readline()
if line == '':
raise ClientEOFException()
line = line.rstrip()
if line == '.':
break
yield line
def select_group(self, name: str):
return self.request('GROUP', name)
def each_newsgroup(self):
self.request('LIST', 'NEWSGROUPS')
for line in self.each_response_line():
parts = line.split(' ', 1)
yield RemoteNewsgroupDescription(parts[0], parts[1])
def each_newsgroup_since(self, timestamp: datetime.datetime):
date = timestamp.strftime('%Y%m%d')
time = timestamp.strftime('%H%M%S')
self.request('NEWGROUPS', date, time)
for line in self.each_response_line():
parts = self.RE_SPLIT.split(line)
if len(parts) != 4:
raise RemoteException('Unexpected result from NEWGROUPS')
yield RemoteNewsgroupSummary(parts[0],
int(parts[1]),
int(parts[2]),
parts[3] == 'y')
def each_newsgroup_message_overview(self, newsgroup: RemoteNewsgroup, msgrange: Optional[MessageRange]):
self.select_group(newsgroup.name)
self.request('OVER', str(msgrange))
for line in self.each_response_line():
parts = line.split('\t')
message = RemoteMessageOverview()
message.id = int(parts[0])
message.subject = parts[1]
message.sender = parts[2]
message.created_on = email.utils.parsedate_to_datetime(parts[3])
message.message_id = parts[4]
message.references = parts[5]
message.size = int(parts[6])
message.lines = int(parts[7])
for part in parts[8:]:
key, value = part.split(': ')
message.headers[key] = value
yield message
def message_by_id(self, message_id: str) -> Message:
message = Message()
self.request('ARTICLE', message_id)
for line in self.each_response_line():
message.readline(line + "\r\n")
message.finish()
return message
def message_post(self, message: Message):
response = self.request('POST')
if response.code is not ResponseCode.NNTP_INQUIRY_ARTICLE:
raise ClientException(response)
self.message_send(message, MessagePart.WHOLE)
self.end()
response = self.response_read()
if response.code is not ResponseCode.NNTP_ARTICLE_RECEIVED:
raise RemoteException(response)
def message_offer(self, message: Message):
response = self.request('IHAVE', message.message_id)
if response.code is ResponseCode.NNTP_ARTICLE_NOT_WANTED_ID:
return
elif response.code not ResponseCode.NNTP_INQUIRY_ARTICLE_ID:
raise RemoteException(response)
self.message_send(message, MessagePart.WHOLE)
self.end()
response = self.response_read()
if response.code is not ResponseCode.NNTP_ARTICLE_RECEIVED_ID:
raise RemoteException(response)