xenu_nntp/lib/nntp/tiny/session.py
2024-11-30 20:31:13 -05:00

877 lines
25 KiB
Python

import re
import enum
import socket
import datetime
import fnmatch
import traceback
import email.header
import email.utils
from typing import Optional
from nntp.tiny.buffer import LineBuffer, OutputBuffer, BufferOverflow
from nntp.tiny.db import Database
from nntp.tiny.response import Response, ResponseCode
from nntp.tiny.newsgroup import Newsgroup
from nntp.tiny.user import User, UserPermission
from nntp.tiny.message import (Message, MessageRange, MessagePart,
each_line)
class SessionMode(enum.Enum):
    """Operating modes of a session; reader mode is the only one defined."""

    READER = 1
class Session():
    """State and command handlers for a single NNTP client connection."""

    # Protocol version reported on the VERSION capability line.
    NNTP_VERSION = 2

    # Capability lines sent in response to CAPABILITIES, VERSION first.
    # The LIST line names exactly the keywords that have a handler in
    # LIST_SUBCOMMANDS, so clients are not told about variants that would
    # be rejected.
    NNTP_CAPABILITIES = [
        'VERSION %d' % (NNTP_VERSION),
        'READER',
        'HDR',
        'NEWNEWS',
        'LIST ACTIVE ACTIVE.TIMES HEADERS NEWSGROUPS OVERVIEW.FMT',
        'OVER MSGID',
        'AUTHINFO USER',
    ]

    # Splits a command line into whitespace-separated tokens.
    RE_SPLIT = re.compile(r'\s+')
def __init__(self, server, sock: socket.socket):
    """Set up per-connection state for a client attached to `sock`.

    A database handle is obtained from `server.connect_to_db()`; input is
    read through a LineBuffer and output is written through an
    OutputBuffer wrapping the socket.
    """
    # Collaborators.
    self.server = server
    self.db: Database = server.connect_to_db()

    # Network I/O.
    self.sock: socket.socket = sock
    self.buf: LineBuffer = LineBuffer()
    self.output: OutputBuffer = OutputBuffer(sock)

    # Protocol state.
    self.mode: SessionMode = SessionMode.READER
    self.active: bool = True

    # Nothing is selected and nobody is authenticated at connect time.
    self.newsgroup: Optional[Newsgroup] = None
    self.user: Optional[User] = None
    self.perms: Optional[UserPermission] = None
    self.article_id: Optional[int] = None
def print(self, text: str, end: str="\r\n"):
    """Hand `text` and the line terminator `end` (CRLF by default) to the
    output buffer and return whatever the buffer returns."""
    result = self.output.print(text, end)
    return result
def flush(self):
    """Flush the output buffer and return the buffer's result."""
    out = self.output
    return out.flush()
def readline(self):
    """Read the next line from the client socket via the line buffer.

    Pending output is flushed first, before reading from the socket.
    """
    self.flush()
    line = self.buf.readline(self.sock)
    return line
def end(self):
    """Emit the lone "." line that terminates a multi-line response."""
    terminator = '.'
    return self.print(terminator)
def respond(self, code: ResponseCode, message: str=None, body=None):
    """Build a Response from the arguments and print its string form.

    Returns the result of `self.print`.
    """
    status_line = str(Response(code, message, body))
    return self.print(status_line)
def _cmd_authinfo_user(self, username):
    """Handle AUTHINFO USER: look up the account and ask for a password.

    Replies with "bad sequence" when a user is already recorded on the
    session. Otherwise the lookup result is stored in `self.user` and the
    passphrase inquiry is sent regardless of what the lookup returned.
    """
    user_already_named = self.user is not None
    if user_already_named:
        return self.respond(ResponseCode.NNTP_AUTH_BAD_SEQUENCE)

    lookup = {'username': username}
    self.user = self.db.get(User, lookup)
    return self.respond(ResponseCode.NNTP_INQUIRY_PASSPHRASE)
def _cmd_authinfo_pass(self, password):
    """Handle AUTHINFO PASS: verify the password for the pending user.

    On success the user's permissions are loaded into `self.perms` and
    "accepted" is sent. On failure "failed" is sent.
    """
    if self.user is None or not self.user.auth(password):
        # Forget the pending user after a failed attempt on an
        # unauthenticated session; AUTHINFO USER rejects the command while
        # `self.user` is set, so leaving it in place would lock the client
        # out of retrying for the rest of the connection.
        if self.perms is None:
            self.user = None
        return self.respond(ResponseCode.NNTP_AUTH_FAILED)

    self.perms = self.user.permissions(self.db)
    return self.respond(ResponseCode.NNTP_AUTH_ACCEPTED)
# AUTHINFO keyword -> handler (stored as plain functions, so the
# dispatcher passes `self` explicitly).
AUTHINFO_SUBCOMMANDS = {
    'USER': _cmd_authinfo_user,
    'PASS': _cmd_authinfo_pass,
}

def _cmd_authinfo(self, *args):
    """Dispatch AUTHINFO to the handler for its (case-insensitive) keyword.

    Sends a syntax error when no keyword is given and "command unknown"
    when the keyword has no handler; remaining arguments are forwarded.
    """
    if not args:
        return self.respond(ResponseCode.NNTP_SYNTAX_ERROR, "No subcommand provided")

    keyword = args[0].upper()
    rest = args[1:]
    handler = self.AUTHINFO_SUBCOMMANDS.get(keyword)
    if handler is not None:
        return handler(self, *rest)
    return self.respond(ResponseCode.NNTP_COMMAND_UNKNOWN)
def _cmd_capabilities(self, *args):
    """Handle CAPABILITIES: list the server's capability lines.

    Any arguments are ignored. The static capability list is sent first so
    that its VERSION entry is the first line of the response; POST is
    appended only when the session holds the POST permission.
    """
    self.respond(ResponseCode.NNTP_CAPABILITIES_FOLLOW)
    for item in self.NNTP_CAPABILITIES:
        self.print(item)
    if self.perms and self.perms & UserPermission.POST:
        self.print('POST')
    self.end()
def _cmd_mode(self, *args):
    """Handle MODE: only the single argument READER is accepted.

    The keyword is compared case-insensitively, like command names are.
    Any other argument list yields a syntax error.
    """
    if len(args) != 1 or args[0].upper() != 'READER':
        return self.respond(ResponseCode.NNTP_SYNTAX_ERROR)
    self.mode = SessionMode.READER
    return self.respond(ResponseCode.NNTP_POST_PROHIBITED)
def _cmd_group(self, name: str):
    """Handle GROUP: select a newsgroup and report its article range.

    The status text is "count low high name". The current article pointer
    is moved to the lowest article number, or cleared when the group has
    no articles.
    """
    if name not in self.server.newsgroups:
        return self.respond(ResponseCode.NNTP_NEWSGROUP_NOT_FOUND)
    newsgroup = self.server.newsgroups[name]

    sql = """
        select
            count(message_id),
            min(message_id),
            max(message_id)
        from
            newsgroup_message
        where
            newsgroup_id = ?
    """
    row = self.db.execute(sql, (newsgroup.id,)).fetchone()

    # An aggregate query yields a row even when nothing matches; in that
    # case the count is 0 and min/max are NULL, which cannot be formatted
    # with %d. Report an empty group as "0 0 0".
    if row is None or not row[0]:
        count, low, high = 0, 0, 0
        self.article_id = None
    else:
        count, low, high = row[0], row[1], row[2]
        self.article_id = low

    self.newsgroup = newsgroup
    text = "%d %d %d %s" % (count, low, high, newsgroup.name)
    return self.respond(ResponseCode.NNTP_GROUP_LISTING, text)
def _cmd_last(self):
    """Handle LAST: move the article pointer to the previous article.

    Requires a selected newsgroup and a current article. The previous
    article is the highest message_id in the group below the current one.
    """
    if self.newsgroup is None:
        return self.respond(ResponseCode.NNTP_NEWSGROUP_NOT_SELECTED)
    if self.article_id is None:
        return self.respond(ResponseCode.NNTP_ARTICLE_INVALID_NUMBER)

    query = """
        select
            max(message_id)
        from
            newsgroup_message
        where
            newsgroup_id = ?
            and message_id < ?
    """
    params = (self.newsgroup.id, self.article_id)
    row = self.db.execute(query, params).fetchone()

    # max() over no rows comes back as NULL rather than as no row at all.
    previous = None if row is None else row[0]
    if previous is None:
        return self.respond(ResponseCode.NNTP_ARTICLE_NO_PREVIOUS)

    self.article_id = previous
    return self.respond(ResponseCode.NNTP_ARTICLE_STAT_RESPONSE)
def _cmd_next(self):
    """Handle NEXT: move the article pointer to the following article.

    Requires a selected newsgroup and a current article. The next article
    is the lowest message_id in the group above the current one.
    """
    if self.newsgroup is None:
        return self.respond(ResponseCode.NNTP_NEWSGROUP_NOT_SELECTED)
    if self.article_id is None:
        return self.respond(ResponseCode.NNTP_ARTICLE_INVALID_NUMBER)

    # Mirror of the LAST query: restrict to the selected group and look
    # for article numbers greater than the current one. The bound
    # parameters are (newsgroup id, current article number), in that order.
    sql = """
        select
            min(message_id)
        from
            newsgroup_message
        where
            newsgroup_id = ?
            and message_id > ?
    """
    cr = self.db.execute(sql, (self.newsgroup.id, self.article_id))
    row = cr.fetchone()

    # min() over no rows comes back as NULL rather than as no row at all.
    if row is None or row[0] is None:
        return self.respond(ResponseCode.NNTP_ARTICLE_NO_NEXT)

    self.article_id = row[0]
    return self.respond(ResponseCode.NNTP_ARTICLE_STAT_RESPONSE)
def _newsgroup_summary_text(self, newsgroup: Newsgroup) -> str:
    """Return the "count low high name" status text for `newsgroup`.

    This helper has its own name because a later method in this class is
    also called `_newsgroup_summary` and returns a dict; sharing the name
    meant the later definition silently replaced this one.
    """
    sql = """
        select
            count(message_id),
            min(message_id),
            max(message_id)
        from
            newsgroup_message
        where
            newsgroup_id = ?
    """
    row = self.db.execute(sql, (newsgroup.id,)).fetchone()

    # count() is 0 and min()/max() are NULL for a group with no articles.
    if row is None or not row[0]:
        return "%d %d %d %s" % (0, 0, 0, newsgroup.name)
    return "%d %d %d %s" % (row[0], row[1], row[2], newsgroup.name)

def _cmd_listgroup(self, *args):
    """Handle LISTGROUP [group [range]]: list article numbers in a group.

    Without arguments the currently selected group is used. The optional
    second argument is parsed by MessageRange and its SQL condition is
    appended to the query.
    """
    newsgroup = self.newsgroup
    if len(args) > 0:
        newsgroup = self.server.newsgroups.get(args[0])
        if newsgroup is None:
            return self.respond(ResponseCode.NNTP_NEWSGROUP_NOT_FOUND)
    elif newsgroup is None:
        return self.respond(ResponseCode.NNTP_NEWSGROUP_NOT_SELECTED)

    sql = """
        select
            message_id
        from
            newsgroup_message
        where
            newsgroup_id = ?
    """
    if len(args) > 1:
        msgrange = MessageRange.parse(args[1])
        sql += " and " + msgrange.where()
    # List article numbers in ascending order.
    sql += " order by message_id"

    summary = self._newsgroup_summary_text(newsgroup)
    # Parameters must be a sequence; `(x)` without the comma is just `x`.
    cr = self.db.execute(sql, (newsgroup.id,))

    self.respond(ResponseCode.NNTP_GROUP_LISTING, summary)
    # Rows from a raw query are read by position, as the other raw
    # queries in this class do.
    for row in cr.each():
        self.print(str(row[0]))
    return self.end()
def _newsgroup_summary(self, newsgroup: Newsgroup):
    """Return the article range of `newsgroup` as a dict.

    Keys: 'low' and 'high' (lowest/highest message_id in the group) and
    'perms' (always 'n'). For a group without articles the range is
    reported as low=1, high=0, i.e. a high mark one below the low mark,
    so callers can always format both values as integers.
    """
    sql = """
        select
            min(message_id),
            max(message_id)
        from
            newsgroup_message
        where
            newsgroup_id = ?
    """
    cr = self.db.execute(sql, (newsgroup.id,))
    row = cr.fetchone()

    # min()/max() over no rows are NULL, so both values arrive as None.
    if row is None or row[0] is None or row[1] is None:
        low, high = 1, 0
    else:
        low, high = row[0], row[1]

    return {
        'low': low,
        'high': high,
        'perms': 'n',
    }
def print_newsgroup(self, newsgroup: Newsgroup):
    """Print one listing line for `newsgroup`.

    The line is "name high low status": the high water mark comes before
    the low water mark in the active-list format used by LIST ACTIVE and
    NEWGROUPS. Returns the result of `self.print`.
    """
    summary = self._newsgroup_summary(newsgroup)
    return self.print("%s %d %d %s" % (
        newsgroup.name,
        summary['high'],
        summary['low'],
        summary['perms'],
    ))
def _cmd_list_newsgroups(self):
    """Handle LIST NEWSGROUPS: print a listing line for every newsgroup
    known to the server, then terminate the block."""
    self.respond(ResponseCode.NNTP_INFORMATION_FOLLOWS)
    groups = self.server.newsgroups
    for key in groups:
        self.print_newsgroup(groups[key])
    return self.end()
def _newsgroup_last_active(self, newsgroup: Newsgroup):
    """Return the newest `created_on` among the group's messages.

    The stored value is parsed with `datetime.fromisoformat`. Returns None
    when the group has no messages.
    """
    sql = """
        select
            max(message.created_on)
        from
            newsgroup_message,
            message
        where
            message.id = newsgroup_message.message_id
            and newsgroup_message.newsgroup_id = ?
    """
    cr = self.db.execute(sql, (newsgroup.id,))
    row = cr.fetchone()

    # max() over no rows yields a row holding NULL, not an absent row, so
    # the value itself has to be checked before parsing.
    if row is None or row[0] is None:
        return None
    return datetime.datetime.fromisoformat(row[0])
def _cmd_list_active(self):
    """Handle LIST ACTIVE: list newsgroups with a message from the last day.

    Groups for which no last-activity time is available are skipped.
    """
    now = datetime.datetime.now(datetime.UTC)
    window = datetime.timedelta(days=1)

    self.respond(ResponseCode.NNTP_INFORMATION_FOLLOWS)
    for name in self.server.newsgroups:
        newsgroup = self.server.newsgroups[name]
        last_active = self._newsgroup_last_active(newsgroup)
        if last_active is None:
            continue
        # Aware and naive datetimes cannot be subtracted from one another.
        # NOTE(review): a naive stored timestamp is assumed to be UTC;
        # confirm against how created_on is written.
        if last_active.tzinfo is None:
            last_active = last_active.replace(tzinfo=datetime.UTC)
        if now - last_active < window:
            self.print_newsgroup(newsgroup)
    return self.end()
def _cmd_list_active_times(self):
    """Handle LIST ACTIVE.TIMES: for every newsgroup print its name, its
    creation time as an integer timestamp, and its creator."""
    self.respond(ResponseCode.NNTP_INFORMATION_FOLLOWS)
    groups = self.server.newsgroups
    for name in groups:
        group = groups[name]
        created = group.created_on.timestamp()
        line = "%s %d %s" % (name, created, group.created_by)
        self.print(line)
    return self.end()
# Field names reported by LIST OVERVIEW.FMT, in output order.
OVERVIEW_FMT_HEADERS = [
    'Subject',
    'From',
    'Date',
    'Message-ID',
    'References',
    'Bytes',
    'Lines',
]

def _cmd_list_overview_fmt(self):
    """Handle LIST OVERVIEW.FMT: print each overview field name followed
    by a colon, one per line."""
    self.respond(
        ResponseCode.NNTP_INFORMATION_FOLLOWS,
        "Order of fields in overview database",
    )
    for field in self.OVERVIEW_FMT_HEADERS:
        self.print(f"{field}:")
    return self.end()
# Entries reported by LIST HEADERS.
SUPPORTED_HEADERS = [
    ':',
    ':lines',
    ':bytes',
]

def _cmd_list_headers(self):
    """Handle LIST HEADERS: print every entry of SUPPORTED_HEADERS."""
    self.respond(
        ResponseCode.NNTP_INFORMATION_FOLLOWS,
        "metadata items supported",
    )
    for entry in self.SUPPORTED_HEADERS:
        self.print(entry)
    self.end()
# LIST keyword -> handler (stored as plain functions, so the dispatcher
# passes `self` explicitly).
LIST_SUBCOMMANDS = {
    'NEWSGROUPS': _cmd_list_newsgroups,
    'ACTIVE': _cmd_list_active,
    'ACTIVE.TIMES': _cmd_list_active_times,
    'OVERVIEW.FMT': _cmd_list_overview_fmt,
    'HEADERS': _cmd_list_headers,
}

def _cmd_list(self, *args):
    """Dispatch LIST to the handler for its (case-insensitive) keyword.

    A bare LIST is treated as LIST ACTIVE, the protocol's default keyword.
    Unknown keywords get "command unknown"; remaining arguments are
    forwarded to the handler.
    """
    if len(args) == 0:
        subcmd, subargs = 'ACTIVE', []
    else:
        subcmd, *subargs = args

    fn = self.LIST_SUBCOMMANDS.get(subcmd.upper())
    if fn is None:
        return self.respond(ResponseCode.NNTP_COMMAND_UNKNOWN)
    return fn(self, *subargs)
# yymmdd, yyyymmdd and hhmmss argument formats.
RE_DATE_SHORT = re.compile(r'^(\d{2})(\d{2})(\d{2})$')
RE_DATE_LONG = re.compile(r'^(\d{4})(\d{2})(\d{2})$')
RE_TIME = re.compile(r'^(\d{2})(\d{2})(\d{2})$')

def _parse_date_time(self, datestr: str, timestr: str):
    """Parse a date (yymmdd or yyyymmdd) and a time (hhmmss).

    Two-digit years of 70 and above are taken as 19yy, others as 20yy.

    Returns:
        A naive `datetime.datetime`, or None when either string has the
        wrong shape or names an impossible date/time (e.g. month 13).
    """
    match = self.RE_DATE_LONG.match(datestr)
    if match:
        year, month, day = map(int, match.groups())
    else:
        match = self.RE_DATE_SHORT.match(datestr)
        if match is None:
            return None
        yy, month, day = map(int, match.groups())
        year = 1900 + yy if yy >= 70 else 2000 + yy

    match = self.RE_TIME.match(timestr)
    if match is None:
        return None
    # Kept in names distinct from the date parts so the minute cannot
    # clobber the month.
    hour, minute, second = map(int, match.groups())

    try:
        return datetime.datetime(year, month, day, hour, minute, second)
    except ValueError:
        # Digits of the right shape but out of range.
        return None
def _cmd_newnews(self, wildmat, datestr, timestr, *args):
    """Handle NEWNEWS wildmat date time [GMT]: list new message-ids.

    Lists the message-id of every message created at or after the given
    time in each newsgroup whose name matches `wildmat` (matched with
    `fnmatch`). The only accepted trailing argument is "GMT"; it is
    validated but does not change how the time is interpreted.
    """
    if len(args) > 1:
        return self.respond(ResponseCode.NNTP_SYNTAX_ERROR, "Too many arguments")
    if len(args) == 1 and args[0] != "GMT":
        return self.respond(ResponseCode.NNTP_SYNTAX_ERROR, "Only optional 'GMT' allowed")

    timestamp = self._parse_date_time(datestr, timestr)
    if timestamp is None:
        return self.respond(ResponseCode.NNTP_SYNTAX_ERROR, "Invalid date or time")

    self.respond(ResponseCode.NNTP_ARTICLE_LISTING_ID_FOLLOWS)

    # Select the message-id string (message.message_id), not the integer
    # row id: the response lists message-ids and `print` takes text.
    sql = """
        select
            message.message_id
        from
            newsgroup_message,
            message
        where
            message.id = newsgroup_message.message_id
            and newsgroup_message.newsgroup_id = ?
            and message.created_on >= ?
    """
    since = timestamp.isoformat()
    # A message linked to several matching groups is listed only once.
    seen = set()
    for name in self.server.newsgroups:
        if not fnmatch.fnmatch(name, wildmat):
            continue
        newsgroup = self.server.newsgroups[name]
        cr = self.db.execute(sql, (newsgroup.id, since))
        for row in cr.each():
            message_id = row[0]
            if message_id in seen:
                continue
            seen.add(message_id)
            self.print(str(message_id))
    return self.end()
def _cmd_newgroups(self, wildmat, datestr, timestr, *args):
    """Handle NEWGROUPS: list newsgroups whose name matches `wildmat`.

    The only accepted trailing argument is "GMT".
    NOTE(review): `datestr` and `timestr` are accepted but not used to
    filter the listing.
    """
    extra = len(args)
    if extra == 1:
        if args[0] != "GMT":
            return self.respond(ResponseCode.NNTP_SYNTAX_ERROR, "Only optional 'GMT' allowed")
    elif extra > 1:
        return self.respond(ResponseCode.NNTP_SYNTAX_ERROR, "Too many arguments")

    self.respond(ResponseCode.NNTP_GROUPS_NEW_FOLLOW)
    groups = self.server.newsgroups
    for name in groups:
        if not fnmatch.fnmatch(name, wildmat):
            continue
        self.print_newsgroup(groups[name])
    return self.end()
def _each_message_by_id(self, identifier: str, success: ResponseCode=ResponseCode.NNTP_INFORMATION_FOLLOWS):
    """Yield the message(s) named by `identifier`, framing the response.

    `identifier` may be None (current article), a "<message-id>", or a
    number/range understood by MessageRange (requires a selected group).

    On success the `success` status is sent before the first message is
    yielded, and the terminating "." is sent once the caller has consumed
    every message. On failure a single error status is sent, nothing is
    yielded, and no terminator follows it.
    """
    if identifier is None:
        if self.newsgroup is None:
            self.respond(ResponseCode.NNTP_NEWSGROUP_NOT_SELECTED)
            return
        if self.article_id is None:
            self.respond(ResponseCode.NNTP_ARTICLE_INVALID_NUMBER)
            return
        message = self.db.get(Message, {'id': str(self.article_id)})
        if message is None:
            self.respond(ResponseCode.NNTP_ARTICLE_INVALID_NUMBER)
            return
        self.respond(success)
        yield message
    elif identifier[0] == '<':
        message = self.db.query(Message, {
            'message_id': identifier
        }).fetchone()
        if message is None:
            self.respond(ResponseCode.NNTP_ARTICLE_NOT_FOUND_ID)
            return
        self.respond(success)
        yield message
    else:
        if self.newsgroup is None:
            self.respond(ResponseCode.NNTP_NEWSGROUP_NOT_SELECTED)
            return
        msgrange = MessageRange.parse(identifier)
        sql = """
            select
                message.*
            from
                newsgroup_message,
                message
            where
                message.id = newsgroup_message.message_id
                and newsgroup_message.newsgroup_id = ?
        """
        sql += " and " + msgrange.where('message')
        cr = self.db.query_sql(Message, sql, (self.newsgroup.id,))

        # The success status is deferred until a first row exists, so an
        # empty range can still be answered with an error status.
        found = False
        for message in cr.each():
            if not found:
                found = True
                self.respond(success)
            yield message
        if not found:
            self.respond(ResponseCode.NNTP_ARTICLE_NOT_FOUND_NUM)
            return

    # Reached only after a successful listing.
    self.end()
def _send_message_headers(self, message: Message):
    """Print every header of `message` as a "name: value" line."""
    headers = message.headers
    for field in headers:
        line = "%s: %s" % (field, headers[field])
        self.print(line)
def _message_by_id(self, identifier: Optional[str]=None):
    """Resolve `identifier` to a single message, or report why not.

    `identifier` may be None (current article of the selected group), a
    "<message-id>", or an article number. When the message cannot be
    resolved an error status is sent and None is returned; on success no
    status is sent and the message is returned.
    """
    if identifier is None:
        if self.newsgroup is None:
            self.respond(ResponseCode.NNTP_NEWSGROUP_NOT_SELECTED)
            return None
        if self.article_id is None:
            self.respond(ResponseCode.NNTP_ARTICLE_INVALID_NUMBER)
            return None
        message = self.db.get(Message, {'id': self.article_id})
        if message is None:
            self.respond(ResponseCode.NNTP_ARTICLE_NOT_FOUND_NUM)
            return None
        return message

    if identifier[0] == '<':
        message = self.db.query(Message, {
            'message_id': identifier
        }).fetchone()
        if message is None:
            self.respond(ResponseCode.NNTP_ARTICLE_NOT_FOUND_ID)
            return None
        return message

    # Article numbers are only meaningful relative to a selected group;
    # this matches the check made for numeric ranges in
    # _each_message_by_id.
    if self.newsgroup is None:
        self.respond(ResponseCode.NNTP_NEWSGROUP_NOT_SELECTED)
        return None
    try:
        number = int(identifier)
    except ValueError:
        # Neither a message-id nor a number: a malformed argument rather
        # than a server fault.
        self.respond(ResponseCode.NNTP_SYNTAX_ERROR)
        return None
    message = self.db.get(Message, {'id': number})
    if message is None:
        self.respond(ResponseCode.NNTP_ARTICLE_NOT_FOUND_NUM)
        return None
    return message
def _serve_message(self, part: MessagePart, identifier: Optional[str]=None):
    """Send the headers, the body, or both for one message.

    `part` selects what is sent (HEAD, BODY or WHOLE); for WHOLE an empty
    line separates headers from body. Returns None without sending
    content when the message cannot be resolved (the error status has
    then already been sent by `_message_by_id`).
    """
    message = self._message_by_id(identifier)
    if message is None:
        return

    text = "%d %s" % (
        message.id,
        message.message_id
    )
    self.respond(ResponseCode.NNTP_ARTICLE_LISTING, text)

    if part is MessagePart.HEAD or part is MessagePart.WHOLE:
        self._send_message_headers(message)
    if part is MessagePart.WHOLE:
        self.print('')
    if part is MessagePart.BODY or part is MessagePart.WHOLE:
        for line in each_line(message.body):
            # Drop only the line ending; trailing spaces are content.
            content = line.rstrip('\r\n')
            # Dot-stuffing: every line that begins with "." gets an extra
            # leading ".", so no body line can be mistaken for the
            # terminator and the client can undo it unambiguously.
            if content.startswith('.'):
                content = '.' + content
            self.print(content)
    return self.end()
def _cmd_head(self, identifier: Optional[str]=None):
    """Handle HEAD: serve only the headers of the message."""
    part = MessagePart.HEAD
    return self._serve_message(part, identifier)

def _cmd_body(self, identifier: Optional[str]=None):
    """Handle BODY: serve only the body of the message."""
    part = MessagePart.BODY
    return self._serve_message(part, identifier)

def _cmd_article(self, identifier: Optional[str]=None):
    """Handle ARTICLE: serve headers and body of the message."""
    part = MessagePart.WHOLE
    return self._serve_message(part, identifier)
def _send_message_header(self, message: Message, name: str):
    """Print "<id> <value>" for header `name` of `message`; a missing
    header is printed as an empty value."""
    value = message.headers.get(name, '')
    line = "%d %s" % (message.id, value)
    return self.print(line)
def _cmd_hdr(self, name: str, identifier: Optional[str]=None):
    """Handle HDR/XHDR: print header `name` for every message selected by
    `identifier`."""
    selected = self._each_message_by_id(identifier)
    for message in selected:
        self._send_message_header(message, name)
def _message_overview(self, message: Message) -> list:
    """Return the overview fields of `message` as a list of strings.

    The fields are, in order: id, encoded subject, encoded sender,
    formatted date, message-id, references (empty string when unset),
    content length, line count, followed by one "name: value" entry per
    header. Tabs and newlines become spaces and CR/NUL are removed from
    every field, so the caller can join the fields with tabs on one line.
    """
    # One translation pass per field instead of four chained replaces.
    cleanup = str.maketrans({'\t': ' ', '\n': ' ', '\r': None, '\0': None})

    fields = [
        str(message.id),
        email.header.Header(message.subject).encode(),
        email.header.Header(message.sender).encode(),
        email.utils.format_datetime(message.created_on),
        message.message_id,
        message.reference_ids or '',
        str(len(message.content)),
        str(message.content.count('\n') + 1),
    ]
    fields.extend(
        "%s: %s" % (key, message.headers[key]) for key in message.headers
    )
    return [field.translate(cleanup) for field in fields]
def _cmd_over(self, identifier: Optional[str]=None):
    """Handle OVER/XOVER: print one tab-separated overview line for each
    message selected by `identifier`."""
    selected = self._each_message_by_id(identifier, ResponseCode.NNTP_OVERVIEW_FOLLOWS)
    for message in selected:
        fields = self._message_overview(message)
        self.print('\t'.join(fields))
def _cmd_stat(self, identifier: Optional[str]=None):
    """Handle STAT: make the resolved message the current article and
    report "<id> <message-id>"; does nothing more when resolution failed."""
    message = self._message_by_id(identifier)
    if message is not None:
        text = "%d %s" % (message.id, message.message_id)
        self.article_id = message.id
        return self.respond(ResponseCode.NNTP_ARTICLE_STAT_RESPONSE, text)
    return
# Separator for the comma-delimited Newsgroups header value.
RE_NEWSGROUPS_SPLIT = re.compile(r'\s*,\s*')

def _save_message(self, message: Message):
    """Store `message` and link it to every group in its Newsgroups header.

    Returns False, storing nothing, when the header is missing or empty,
    when any listed group (compared in lower case) is unknown to the
    server, or when the message fails validation. Returns True after the
    message and its group links have been written and committed.
    """
    header = message.header('Newsgroups')
    if header is None or header == '':
        return False

    targets = []
    for raw in self.RE_NEWSGROUPS_SPLIT.split(header):
        key = raw.lower()
        if key not in self.server.newsgroups:
            return False
        targets.append(self.server.newsgroups[key])
    if not targets:
        return False

    message.message_id_assign()
    if not message.validate():
        return False

    self.db.add(message)
    insert = """
        insert into newsgroup_message (
            newsgroup_id, message_id
        ) values (?, ?)
    """
    for group in targets:
        self.db.execute(insert, (group.id, message.id))
    self.db.commit()
    return True
def _post_impl(self, message_id=None):
    """Receive an article from the client (shared by POST and IHAVE).

    Requires the POST permission. When `message_id` is given (IHAVE) the
    article is refused up front if a message with that id already exists.
    After the inquiry status, lines are read until the lone "." line and
    the article is then saved.

    Returns the result of the final `respond`, or None when the client
    disconnects mid-article (the session is then marked inactive).
    """
    if self.perms is None or not self.perms & UserPermission.POST:
        return self.respond(ResponseCode.NNTP_POST_PROHIBITED)

    if message_id:
        sql = """
            select
                count(message_id)
            from
                message
            where
                message_id = ?
        """
        cr = self.db.execute(sql, (message_id,))
        row = cr.fetchone()
        if row is not None and row[0] > 0:
            return self.respond(ResponseCode.NNTP_ARTICLE_NOT_WANTED_ID)
        code_inquiry = ResponseCode.NNTP_INQUIRY_ARTICLE_ID
        code_received = ResponseCode.NNTP_ARTICLE_RECEIVED_ID
    else:
        code_inquiry = ResponseCode.NNTP_INQUIRY_ARTICLE
        code_received = ResponseCode.NNTP_ARTICLE_RECEIVED

    self.respond(code_inquiry)

    message = Message()
    failed = False
    while True:
        line = self.readline()
        if line == '':
            # Connection closed before the terminator arrived.
            self.active = False
            return None
        if line.rstrip() == '.':
            break
        if failed:
            # Keep consuming until the terminator so the rest of the
            # article is not interpreted as commands.
            continue
        # Undo dot-stuffing: the sender prefixes every line that starts
        # with "." with an extra ".".
        if line.startswith('.'):
            line = line[1:]
        try:
            message.readline(line)
        except Exception as e:
            traceback.print_exception(e)
            failed = True

    if not failed and self._save_message(message):
        return self.respond(code_received)
    return self.respond(ResponseCode.NNTP_POST_FAILED)
def _cmd_post(self):
    """Handle POST: receive an article without a pre-announced id."""
    outcome = self._post_impl()
    return outcome

def _cmd_ihave(self, message_id):
    """Handle IHAVE: receive the article announced as `message_id`."""
    outcome = self._post_impl(message_id)
    return outcome
def _cmd_date(self):
    """Handle DATE: report the current UTC time as yyyymmddhhmmss."""
    now = datetime.datetime.now(datetime.UTC)
    stamp = now.strftime("%Y%m%d%H%M%S")
    return self.respond(ResponseCode.NNTP_DATE, stamp)
def _cmd_quit(self):
    """Handle QUIT: mark the session inactive and send the closing status."""
    closing = ResponseCode.NNTP_CONNECTION_CLOSING
    self.active = False
    return self.respond(closing)
# Command keyword -> handler. Handlers are stored as plain functions, so
# the dispatcher passes `self` explicitly. Keys are upper case; lookups
# upper-case the received command first.
COMMANDS = {
    # Session and authentication
    'AUTHINFO': _cmd_authinfo,
    'CAPABILITIES': _cmd_capabilities,
    'MODE': _cmd_mode,

    # Group selection and navigation
    'GROUP': _cmd_group,
    'LAST': _cmd_last,
    'NEXT': _cmd_next,
    'LISTGROUP': _cmd_listgroup,

    # Listings
    'LIST': _cmd_list,
    'NEWNEWS': _cmd_newnews,
    'NEWGROUPS': _cmd_newgroups,

    # Article retrieval
    'HEAD': _cmd_head,
    'BODY': _cmd_body,
    'ARTICLE': _cmd_article,
    'HDR': _cmd_hdr,
    'XHDR': _cmd_hdr,
    'OVER': _cmd_over,
    'XOVER': _cmd_over,
    'STAT': _cmd_stat,

    # Posting
    'POST': _cmd_post,
    'IHAVE': _cmd_ihave,

    # Miscellaneous
    'DATE': _cmd_date,
    'QUIT': _cmd_quit,
}
def greet(self):
    """Send the initial "service ready, posting prohibited" greeting."""
    greeting = ResponseCode.NNTP_SERVICE_READY_POST_PROHIBITED
    self.respond(greeting)
def handle_command(self):
    """Read one command line from the client and run its handler.

    An empty read marks the session inactive. The line is split on
    whitespace; the first token, upper-cased, selects the handler and the
    rest are passed as positional arguments. A TypeError raised by the
    handler is reported as a syntax error, any other exception as
    "command unavailable"; both are printed as tracebacks first.
    """
    line = self.readline()
    if line == '':
        self.active = False
        return

    command, *args = self.RE_SPLIT.split(line.rstrip())
    handler = self.COMMANDS.get(command.upper())
    if handler is None:
        return self.respond(ResponseCode.NNTP_COMMAND_UNKNOWN)

    try:
        return handler(self, *args)
    except Exception as e:
        traceback.print_exception(e)
        if isinstance(e, TypeError):
            return self.respond(ResponseCode.NNTP_SYNTAX_ERROR)
        return self.respond(ResponseCode.NNTP_COMMAND_UNAVAILABLE)
def handle(self):
    """Run the session: greet the client, then serve commands until the
    session becomes inactive.

    Output still pending when the loop ends (such as the reply to QUIT)
    is flushed before the socket is closed. A connection dropped by the
    peer (BrokenPipeError / ConnectionResetError) ends the session
    quietly. The socket is closed on every exit path.
    """
    try:
        self.greet()
        while self.active:
            self.handle_command()
        # Nothing reads after the last command, so flush explicitly.
        self.flush()
    except (BrokenPipeError, ConnectionResetError):
        pass
    finally:
        self.sock.close()