xenu_nntp/lib/nntp/tiny/client.py

157 lines
4.2 KiB
Python
Raw Normal View History

import re
import socket
import ssl
import datetime
import email.utils
2024-12-04 23:52:29 -05:00
from typing import Optional
from nntp.tiny.socket import Connection
from nntp.tiny.host import Host
from nntp.tiny.response import Response, ResponseCode
2024-12-05 15:07:44 -05:00
from nntp.tiny.message import MessageRange, Message
from nntp.tiny.remote import (
RemoteException,
RemoteNewsgroup,
RemoteNewsgroupDescription,
2024-12-05 14:29:19 -05:00
RemoteNewsgroupSummary,
RemoteMessageOverview
)
class ClientException(Exception):
pass
2024-12-04 22:35:49 -05:00
class ClientEOFException(ClientException):
def __str__(self):
return 'Unexpected client EOF'
class Client(Connection):
RE_SPLIT = re.compile(r'\s+')
def _read_response(self):
line = self.readline()
if line == '':
raise ClientEOFException()
parts = self.RE_SPLIT.split(line.rstrip(), 1)
if len(parts) == 0:
return
elif len(parts) == 1:
return Response(ResponseCode(int(parts[0])))
else:
return Response(ResponseCode(int(parts[0])), parts[1])
def __init__(self, host: str, port: int, tls: bool=False):
sock = socket.create_connection((host, port))
if tls:
sslctx = ssl.create_default_context()
sslctx.check_hostname = False
sslctx.verify_mode = ssl.CERT_NONE
if Host.is_hostname(host):
sock = sslctx.wrap_socket(sock, server_hostname=host)
else:
sock = sslctx.wrap_socket(sock)
self.host = host
self.port = port
super().__init__(sock)
response = self._read_response()
if response is None:
raise ClientException('Server not ready')
def request(self, *args):
self.print(' '.join(args))
response = self._read_response()
if response.code.value >= 400:
raise RemoteException(response)
return response
2024-12-04 22:35:49 -05:00
def each_response_line(self):
while True:
line = self.readline()
if line == '':
raise ClientEOFException()
line = line.rstrip()
if line == '.':
break
yield line
2024-12-05 15:06:59 -05:00
def select_group(self, name: str):
return self.request('GROUP', name)
def each_newsgroup(self):
self.request('LIST', 'NEWSGROUPS')
for line in self.each_response_line():
parts = line.split(' ', 1)
yield RemoteNewsgroupDescription(parts[0], parts[1])
2024-12-05 08:39:17 -05:00
def each_newsgroup_since(self, timestamp: datetime.datetime):
date = timestamp.strftime('%Y%m%d')
time = timestamp.strftime('%H%M%S')
self.request('NEWGROUPS', date, time)
for line in self.each_response_line():
parts = self.RE_SPLIT.split(line)
if len(parts) != 4:
raise RemoteException('Unexpected result from NEWGROUPS')
2024-12-05 15:07:21 -05:00
2024-12-05 14:29:19 -05:00
yield RemoteNewsgroupSummary(parts[0],
int(parts[1]),
int(parts[2]),
parts[3] == 'y')
2024-12-05 15:07:44 -05:00
def each_newsgroup_message_overview(self, newsgroup: RemoteNewsgroup, msgrange: Optional[MessageRange]):
2024-12-05 15:06:59 -05:00
self.select_group(newsgroup.name)
2024-12-05 15:07:44 -05:00
self.request('OVER', str(msgrange))
for line in self.each_response_line():
parts = line.split('\t')
2024-12-05 14:29:19 -05:00
message = RemoteMessageOverview()
message.id = int(parts[0])
message.subject = parts[1]
message.sender = parts[2]
message.created_on = email.utils.parsedate_to_datetime(parts[3])
message.message_id = parts[4]
message.references = parts[5]
message.size = int(parts[6])
message.lines = int(parts[7])
for part in parts[8:]:
key, value = part.split(': ')
message.headers[key] = value
yield message
2024-12-05 15:07:44 -05:00
def get_message_by_id(self, message_id: str) -> Message:
message = Message()
self.request('ARTICLE', message_id)
for line in self.each_response_line():
message.readline(line + "\r\n")
message.finish()
return message