awips2/deltaScripts/17.1.1/DR19611/pg_hba_conf-tool.py

330 lines
11 KiB
Python
Raw Permalink Normal View History

2017-04-21 18:33:55 -06:00
#!/usr/bin/env python
# pg_hba_conf-tool.py - Scans and modifies pg_hba.conf for certificate-based
# authentication changes.
#
# Modification History
#
# Name Date Comments
# ---------------------------------------------------------------------------
# David Friedman 2016-12-07 DR 19611 - Initial creation
# David Friedman 2016-12-22 DR 19637 - Handle AX DB server config.
from getopt import GetoptError, getopt
from os import chown, fdopen, remove, rename, stat
from os.path import basename, dirname, exists
from shutil import copymode
import sys
from tempfile import mkstemp
from time import gmtime, strftime
from traceback import print_exc
def pmsg(level, msg, fp=None):
fp = fp or sys.stderr
fp.write('%s: %s\n' % (level, msg))
def pinf(msg):
sys.stdout.write(msg + '\n')
def perr(msg):
pmsg('error', msg)
UPDATE_OPTION = 'update-for-ssl'
DISABLE_OPTION = 'disable-remote-non-ssl'
ENABLE_OPTION = 'enable-remote-non-ssl'
CHECK_OPTION = 'check'
MODE_OPTIONS = [UPDATE_OPTION, DISABLE_OPTION, ENABLE_OPTION, CHECK_OPTION]
# We do not care about 'local' connections
CONNECTION_TYPES = ('host', 'hostssl', 'hostnossl')
DISABLED_TAG = ' # disabled for 17.1.1'
class Line(object):
"""Represents a line in a pg_hba.conf file.
Contains both the raw text and the parsed entry, if one is found.
If the line is a commented-out entry, the parsed entry can still be
present."""
def __init__(self, text, parsed):
self.text = text
self.parsed = parsed
self.commented = False
self.tagged_as_disabled = text.find(DISABLED_TAG) >= 0
def make_hostnossl(self):
# Assumes already parsed as 'host' only and not something else
self.text = self.text.replace('host', 'hostnossl', 1)
def comment(self):
self.text = '# ' + self.text + DISABLED_TAG
def uncomment(self):
if self.text[-len(DISABLED_TAG):] == DISABLED_TAG:
self.text = self.text[:-len(DISABLED_TAG)]
i = self.text.find('#')
if i >= 0:
self.text = self.text[i + 1:].lstrip()
class Entry(object):
"""A parsed pg_hba.conf entry"""
def __init__(self):
self.type = None
self.database = None
self.user = None
self.address_fields = None
self.auth_method = None
self.auth_options = None
def is_localhost(self):
af = self.address_fields
if af:
if len(af) == 1:
return af[0].lower() in ('127.0.0.1/32', '::1/128', 'localhost')
else:
return ([x.lower() for x in af[0:2]] in
(['127.0.0.1','255.255.255.255'],
['::1','ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff']))
return False
def __eq__(self, other):
return (self.type == other.type and self.database == other.database
and self.user == other.user and self.address_fields == other.address_fields
and self.auth_method == other.auth_method
and self.auth_options == other.auth_options)
def load(path):
"""Load and parse the specified pg_hba.conf file"""
with open(path, 'r') as f:
return [parse_line(l.rstrip('\n')) for l in list(f)]
def split_comment(line):
i = line.find('#')
if i >= 0:
return (line[:i], line[i + 1:])
else:
return (line, '')
def parse_line(line, in_comment=False):
"""Attempt to parse a line of a pg_hba.conf file.
Return a Line object representing the line. Should always succeed
even if parsing the entry fails."""
try:
nc, c = split_comment(line)
if not nc and not in_comment:
cl = parse_line(c, True)
l = Line(line, cl.parsed)
l.commented = True
return l
else:
p = None
# Note: does not handle quoting
tokens = nc.split()
if len(tokens) >= 5:
if tokens[0] in CONNECTION_TYPES:
p = Entry()
p.type = tokens[0]
p.database = tokens[1]
p.user = tokens[2]
if '/' in tokens[3]:
i = 4
else:
i = 5
p.address_fields = tokens[3:i]
if i < len(tokens):
p.auth_method = tokens[i]
p.auth_options = tokens[i+1:]
if p.auth_method.upper() == 'METHOD':
p = None # sample line
elif '"' in line:
raise ValueError("Cannot handle quotes in line: " + line)
return Line(line, p)
except Exception:
print_exc()
return Line(line, None)
def get_site_ip_address_ranges(hba):
"""Attempt to determine all of the site-specific IP address ranges by
looking for all non-localhost entries for the "metadata" database.
If there are no such entries, look for the entries with the database
field set to "all"."""
result = []
for db in ('metadata', 'all'):
for line in hba:
p = line.parsed
if not line.commented and p and p.database == db and p.type in ('host', 'hostnossl'):
if not p.is_localhost():
result.append(p.address_fields)
if result:
break
return result
def get_required_ssl_lines(hba):
"""Generate all of the certificate-based authentication entries that should
be in pg_hba.conf, taking the site-specific address ranges into account."""
site_ip_addr_ranges = get_site_ip_address_ranges(hba)
required_lines = []
for r in site_ip_addr_ranges:
text = 'hostssl all all %s cert clientcert=1' % (' '.join(r),)
required_lines.append(parse_line(text))
required_lines += [
parse_line('hostssl all all 127.0.0.1/32 cert clientcert=1'),
parse_line('hostssl all all ::1/128 cert clientcert=1')
]
return required_lines
def scan_and_update(hba, update=False, report=False):
"""Scan the contents of a pg_hba.conf file, making changes for
certificate-based authentication (if 'update' is True) and/or
reporting the state of the file (if 'report' is True.)"""
seen_host_lines = False
seen_remote_non_cba = False
ssl_required = get_required_ssl_lines(hba)
for line in hba:
p = line.parsed
if p and not line.commented:
if p.type == 'host':
seen_host_lines = True
if update:
line.make_hostnossl()
elif p.type == 'hostnossl':
if not p.is_localhost():
seen_remote_non_cba = True
elif p.type == 'hostssl':
if p.auth_method != 'cert' or "clientcert=1" not in p.auth_options:
seen_remote_non_cba = True
for i in range(0, len(ssl_required)):
if p == ssl_required[i].parsed:
del ssl_required[i]
break
if update:
hba += ssl_required
if report:
if seen_host_lines:
pinf('Has "host" lines that may be ambiguous and should be changed to either "hostnossl" or "hostssl".')
if seen_remote_non_cba:
pinf('Allows remote access without certificate-based authentication.')
else:
pinf('Does not allow remote access without certificate-based authentication.')
if ssl_required:
pinf("Missing the following entries for certificate-based authentication.")
pinf('\n'.join([' ' + line.text for line in ssl_required]))
else:
pinf("Has all requried certificate-based authentication entries.")
return hba
def update_for_ssl(hba):
return scan_and_update(hba, update=True)
def disable_remote_non_ssl(hba):
for line in hba:
p = line.parsed
if p and not line.commented and p.type in ('host', 'hostnossl') and not p.is_localhost():
line.comment()
return hba
def enable_remote_non_ssl(hba):
for line in hba:
p = line.parsed
if p and line.commented and line.tagged_as_disabled and p.type == 'hostnossl':
line.uncomment()
return hba
def check(hba):
scan_and_update(hba, report=True)
def usage():
pinf('''usage: %s [mode option] [other options]...
Mode Options:
--check Check status of the file.
--update-for-ssl Change ambigous "host" entries to "hostnossl" and
add required "hostssl" entries.
--disable-remote-non-ssl Comment out any entries that allow remote access
without SSL and cetificate-based authentication.
--enable-remote-non-ssl Restore entries marked as disabled by the
the --disable-remote-no-ssl mode.
Modes that modify the file create a backup: {file}.{date}[.{n}]
Other Options:
-f FILE Operate on FILE instead of /awips2/data/pg_hba.conf
-o Write modified contents to standard output instead of the
input file.
''' % (basename(sys.argv[0]),))
def write_hba(hba, f):
for line in hba:
f.write(line.text)
f.write('\n')
def store(hba, path):
"""Store authentication file contents to the specified path.
Stores to the specified path, making a backup of the original."""
backup_sfx = strftime('%Y-%m-%d', gmtime())
h, temp_path = mkstemp(dir=dirname(path))
try:
st = stat(path)
with fdopen(h, 'w') as f:
write_hba(hba, f)
chown(temp_path, st.st_uid, st.st_gid)
copymode(path, temp_path)
i = 0
def make_backup_path():
s = path + '.' + backup_sfx
if i > 0:
s += '.' + str(i)
return s
backup_path = make_backup_path()
while True:
if not exists(backup_path):
break
i += 1
backup_path = make_backup_path()
rename(path, backup_path)
rename(temp_path, path)
temp_path = None
finally:
if temp_path:
remove(temp_path)
func_for_mode = {
UPDATE_OPTION: update_for_ssl,
ENABLE_OPTION: enable_remote_non_ssl,
DISABLE_OPTION: disable_remote_non_ssl,
CHECK_OPTION: check
}
def main():
mode = None
path = '/awips2/data/pg_hba.conf'
to_stdout = False
try:
opts, args = getopt(sys.argv[1:], 'f:o', MODE_OPTIONS)
except GetoptError as e:
perr(str(e))
usage()
sys.exit(1)
for k, v in opts:
if k == '-f':
path = v
elif k == '-o':
to_stdout = True
elif k[0:2] == '--' and k[2:] in MODE_OPTIONS:
mode = k[2:]
if args:
usage()
sys.exit(1)
if mode:
hba = load(path)
hba = func_for_mode[mode](hba)
if mode != CHECK_OPTION:
if not to_stdout:
store(hba, path)
else:
write_hba(hba, sys.stdout)
else:
usage()
sys.exit(1)
if __name__ == '__main__':
main()