import re
import socket
import ssl
import datetime
import email.header
import email.utils

from typing import Optional, Union

from nntp.tiny.socket import Connection
from nntp.tiny.host import Host
from nntp.tiny.response import Response, ResponseCode
from nntp.tiny.message import MessageRange, Message, MessagePart
from nntp.tiny.remote import (
    RemoteException, RemoteNewsgroup, RemoteNewsgroupDescription,
    RemoteNewsgroupSummary, RemoteMessageOverview
)


class ClientException(Exception):
    """Error raised by the client side of an NNTP conversation.

    ``response`` is whatever describes the failure: normally the
    ``Response`` that triggered it, but plain message strings are passed
    as well (see ``Client.__init__``).  It is optional so that subclasses
    which carry no response, such as ``ClientEOFException``, can be raised
    without arguments.
    """

    def __init__(self, response: Union[Response, str, None] = None):
        super().__init__(response)
        self.response = response

    def __str__(self):
        return str(self.response)


class ClientEOFException(ClientException):
    """Raised when the connection yields end-of-file mid-conversation."""

    def __str__(self):
        return 'Unexpected client EOF'


class Client(Connection):
    """NNTP client that issues commands over a ``Connection``.

    Line I/O (``readline``, ``print``, ``message_send``, ``end``) is
    inherited from ``Connection``; this class adds response parsing and
    the individual NNTP commands.
    """

    # Separator between the status code and text of a response line, and
    # between the fields of whitespace-delimited listings.
    RE_SPLIT = re.compile(r'\s+')

    def response_read(self):
        """Read and parse a single status line from the server.

        Returns:
            A ``Response`` holding the status code and, when present, the
            text following it; ``None`` when the line is blank.

        Raises:
            ClientEOFException: ``readline()`` returned an empty string,
                which is treated as end-of-file.
            ValueError: the first token of the line is not an integer.
        """
        line = self.readline()

        if line == '':
            raise ClientEOFException()

        text = line.rstrip()

        # re.split() never returns an empty list, so a blank line has to
        # be detected up front rather than by counting the parts.
        if not text:
            return None

        parts = self.RE_SPLIT.split(text, 1)
        code = ResponseCode(int(parts[0]))

        if len(parts) == 1:
            return Response(code)

        return Response(code, parts[1])

    def __init__(self, host: str, port: int, tls: bool=False):
        """Connect to ``host:port`` and consume the server greeting.

        Args:
            host: Server host name or address.
            port: Server TCP port.
            tls: When true, wrap the socket in TLS before any NNTP
                traffic is exchanged.

        Raises:
            ClientException: the greeting line was blank.
            ClientEOFException: the server closed the connection before
                sending a greeting.
        """
        sock = socket.create_connection((host, port))

        try:
            if tls:
                sslctx = ssl.create_default_context()
                # NOTE(security): certificate verification and hostname
                # checking are switched off, so the TLS session is
                # encrypted but the peer is NOT authenticated.  Kept as-is
                # because callers may depend on connecting to servers
                # with self-signed certificates.
                sslctx.check_hostname = False
                sslctx.verify_mode = ssl.CERT_NONE

                # server_hostname is only passed for real host names
                # (presumably so that SNI is not sent for IP literals).
                if Host.is_hostname(host):
                    sock = sslctx.wrap_socket(sock, server_hostname=host)
                else:
                    sock = sslctx.wrap_socket(sock)

            self.host = host
            self.port = port

            super().__init__(sock)

            response = self.response_read()

            if response is None:
                raise ClientException('Server not ready')
        except BaseException:
            # Do not leak the descriptor when the TLS handshake or the
            # greeting fails; the caller never receives a usable client.
            sock.close()
            raise

    def request(self, *args):
        """Send a command and return the server's status response.

        Args:
            *args: Command name and arguments; joined with single spaces.

        Returns:
            The parsed ``Response`` when its code is below 400.

        Raises:
            RemoteException: the response code is 400 or above.
            ClientException: the server answered with a blank line.
            ClientEOFException: the connection was closed.
        """
        self.print(' '.join(args))

        response = self.response_read()

        if response is None:
            raise ClientException('Empty response from server')

        if response.code.value >= 400:
            raise RemoteException(response)

        return response

    def each_response_line(self):
        """Yield the lines of a multi-line response up to the terminator.

        Each line has trailing whitespace removed.  Iteration stops at a
        line consisting of a single ``.``.

        NOTE(review): a leading ``..`` is passed through unchanged here;
        confirm whether dot-unstuffing happens in another layer.

        Raises:
            ClientEOFException: the connection closed before the
                terminating line was seen.
        """
        while True:
            line = self.readline()

            if line == '':
                raise ClientEOFException()

            line = line.rstrip()

            if line == '.':
                break

            yield line

    def select_group(self, name: str):
        """Issue ``GROUP name`` and return the server's response."""
        return self.request('GROUP', name)

    def each_newsgroup(self):
        """Yield a ``RemoteNewsgroupDescription`` per ``LIST NEWSGROUPS`` line.

        The group name and description may be separated by spaces or
        TABs; a line with no description yields an empty description.
        """
        self.request('LIST', 'NEWSGROUPS')

        for line in self.each_response_line():
            parts = self.RE_SPLIT.split(line, 1)
            description = parts[1] if len(parts) > 1 else ''

            yield RemoteNewsgroupDescription(parts[0], description)

    def each_newsgroup_since(self, timestamp: datetime.datetime):
        """Yield a ``RemoteNewsgroupSummary`` per group listed by ``NEWGROUPS``.

        Args:
            timestamp: Sent as ``YYYYMMDD HHMMSS`` exactly as given; no
                time zone conversion is applied and no ``GMT`` flag is
                sent.

        Raises:
            RemoteException: a listing line does not have four fields.
        """
        date_arg = timestamp.strftime('%Y%m%d')
        time_arg = timestamp.strftime('%H%M%S')

        self.request('NEWGROUPS', date_arg, time_arg)

        for line in self.each_response_line():
            parts = self.RE_SPLIT.split(line)

            if len(parts) != 4:
                raise RemoteException('Unexpected result from NEWGROUPS')

            yield RemoteNewsgroupSummary(parts[0],
                                         int(parts[1]),
                                         int(parts[2]),
                                         parts[3] == 'y')

    def each_newsgroup_message_overview(self,
                                        newsgroup: RemoteNewsgroup,
                                        msgrange: Optional[MessageRange]):
        """Yield a ``RemoteMessageOverview`` per line of an ``OVER`` reply.

        Args:
            newsgroup: Group to select before requesting the overview.
            msgrange: Range passed to ``OVER`` via ``str()``.  When
                ``None`` the argument is omitted, in which case the server
                reports on its current article (RFC 3977).
        """
        self.select_group(newsgroup.name)

        # Never send the literal text "None" as a range.
        if msgrange is None:
            self.request('OVER')
        else:
            self.request('OVER', str(msgrange))

        for line in self.each_response_line():
            fields = line.split('\t')

            message = RemoteMessageOverview()
            message.id = int(fields[0])
            message.subject = fields[1]
            message.sender = fields[2]
            message.created_on = email.utils.parsedate_to_datetime(fields[3])
            message.message_id = fields[4]
            message.references = fields[5]
            message.size = int(fields[6])
            message.lines = int(fields[7])

            # Remaining fields are "Name: value" pairs.  Split on the
            # first separator only so values containing ": " survive, and
            # skip fields that carry no separator at all.
            # Assumes RemoteMessageOverview() provides a ``headers``
            # mapping -- defined outside this file.
            for extra in fields[8:]:
                key, sep, value = extra.partition(': ')

                if not sep:
                    continue

                message.headers[key] = value

            yield message

    def message_by_id(self, message_id: str) -> Message:
        """Fetch an article with ``ARTICLE message_id`` and return it.

        Every response line is fed to ``Message.readline`` with a CRLF
        appended, then ``Message.finish`` is called.
        """
        message = Message()

        self.request('ARTICLE', message_id)

        for line in self.each_response_line():
            message.readline(line + "\r\n")

        message.finish()

        return message

    def message_offer(self, message: Message):
        """Offer ``message`` to the server with ``IHAVE`` and transfer it.

        Returns ``None`` without sending anything when the server reports
        that it does not want the article.

        Raises:
            RemoteException: the server neither declined nor asked for
                the article, or did not confirm receipt afterwards.
            ClientException: the server answered with a blank line.
            ClientEOFException: the connection was closed.
        """
        # The command is written directly instead of through request():
        # request() raises on every code >= 400, which would pre-empt the
        # "not wanted" check below if that code is in the 4xx range (it
        # is 435 in RFC 3977).
        self.print(' '.join(('IHAVE', message.message_id)))

        response = self.response_read()

        if response is None:
            raise ClientException('Empty response from server')

        if response.code is ResponseCode.NNTP_ARTICLE_NOT_WANTED_ID:
            return
        elif response.code is not ResponseCode.NNTP_INQUIRY_ARTICLE_ID:
            raise RemoteException(response)

        self.message_send(message, MessagePart.WHOLE)
        self.end()

        response = self.response_read()

        if response is None:
            raise ClientException('Empty response from server')

        if response.code is not ResponseCode.NNTP_ARTICLE_RECEIVED_ID:
            raise RemoteException(response)