awips2/edexOsgi/com.raytheon.uf.tools.cli/impl/a2dbauth

320 lines
13 KiB
Text
Raw Normal View History

2022-05-05 12:34:50 -05:00
#!/awips2/python/bin/python3
# dbauth - A wrapper for PostgreSQL command line programs to add
# certificate-based authentication parameters based on AWIPS
# conventions.
#
# The default SSL mode is taken from the FXA configuration file
# $FXA_DATA/nationalData/db.config. This file can also specify
# credential directory to override the default of ~/.postgresql/.
#
# Ideally, this would change the database connection string to
# specify sslcert, etc. However, it currently only sets the
# PGSSLCERT, etc. environment variables.
#
#
# SOFTWARE HISTORY
#
# Date Ticket# Engineer Description
# ------------- ------- -------------- --------------------------------------
# Nov 18, 2016 19512 David Friedman Initial creation
# Mar 08, 2017 19824 David Friedman Support foreign commands and
# running under EDEX.
# Mar 25, 2019 7767 tgurney Add support for psql "echo-errors"
# flag
# Feb 25, 2021 8376 Robert Blum Add support for pgsqlshp. Note a2dbauth
# now parses the -u option as username along
# with -U and --username.
from configparser import RawConfigParser
from io import StringIO
from getopt import getopt, gnu_getopt, GetoptError
import os
from os.path import basename, isdir, isfile, join, normpath
import sys
prog_name=basename(sys.argv[0])
def pmsg(level, msg, fp=None):
fp = fp or sys.stderr
fp.write('%s: %s: %s\n' % (prog_name, level, msg))
def perr(msg):
pmsg('error', msg)
def pfat(msg):
pmsg('error', msg)
sys.exit(1)
def pwrn(msg):
pmsg('warning', msg)
def usage(fp=None):
fp = fp or sys.stderr
fp.write('''usage: %s [options] COMMAND...
-f, --foreign Do not attempt to parse target command arguments.
-m MODE, --sslmode=MODE PostgreSQL SSL connection mode
[read from $FXA_DATA/nationalData/db.config, $PGSSLMODE]
-c DIR, --creddir=DIR Directory that contains certificates and keys
[$HOME/.postgresql]
-U USER, --username=USER Specify database user (for cases in which this script can not
deduce the user from the command arguments or $PGUSER.)
-h, --help Display this message
-n Only print command that would be run
-v Print the command that will be run to standard error
''' % (prog_name,))
def read_fxa_db_conf():
data_fxa = os.environ.get('FXA_DATA', '/data/fxa')
if normpath(data_fxa) == '/awips2/edex/data/fxa':
data_fxa = '/data/fxa'
config_path = join(data_fxa, 'nationalData', 'db.config')
try:
with open(config_path, 'r') as f:
text = f.read()
c = RawConfigParser()
# db.config should be mostly compatible enough with ConfigParser
# syntax, but we need to add a section header.
c.readfp(StringIO('[default]\n' + text))
o = 'db.sslMode'
if c.has_option('default', o):
return c.get('default', o)
except Exception as e:
pwrn('reading %s: %s' % (config_path, str(e)))
return None
prog_option_db = {
'psql': ('c:d:f:lv:VX1?abeEL:no:qsSAF:HP:R:tT:xz0h:p:U:wW',
['command=','dbname=','file=','list','set=',
'variable=','version','no-psqlrc','single-transaction',
'help','echo-all', 'echo-errors', 'echo-queries',
'echo-hidden', 'log-file=', 'no-readline',
'output=', 'quiet', 'single-step', 'single-line',
'no-align', 'field-separator=', 'html', 'pset=',
'record-separator=', 'tuples-only', 'table-attr=',
'expanded', 'field-separator-zero', 'record-separator-zero',
'host=', 'port=', 'username=',
'no-password', 'password']),
'createdb': ('D:eE:l:O:T:V?h:p:U:wW',
['tablespace=', 'echo', 'encoding=', 'locale=', 'lc-collate=',
'lc-ctype=', 'owner=', 'template=', 'version', 'help', 'host=',
'port=', 'username=', 'no-password', 'password', 'maintenance-db=']),
'dropdb': ('eV?h:p:U:wW',
['echo', 'version', 'if-exists', 'help', 'host=',
'port=', 'username=', 'no-password', 'password', 'maintenance-db=']),
'pg_basebackup': ('D:F:RxX:zZ:c:l:pvV?d:h:p:s:U:wW',
['pgdata=', 'format=', 'write-recovery-conf', 'xlog', 'xlog-method=',
'gzip', 'compress=', 'checkpoint=', 'label=', 'progress', 'verbose',
'version', 'help', 'dbname=', 'host=', 'port=', 'status-interval=',
'username=', 'no-password', 'password']),
'pg_dump': ('f:F:j:vVZ:?abcCE:n:N:oOsS:t:T:xd:h:p:U:wW',
['file=', 'format=', 'jobs=', 'verbose', 'version', 'compress=', 'lock-wait-timeout=',
'help', 'data-only', 'blobs', 'clean', 'create', 'encoding=', 'schema=',
'exclude-schema=', 'oids', 'no-owner', 'schema-only', 'superuser=', 'table=',
'exclude-table=', 'no-privileges', 'binary-upgrade', 'column-inserts', 'disable-dollar-quoting',
'disable-triggers', 'exclude-table-data=', 'inserts', 'no-security-labels',
'no-synchronized-snapshots', 'no-tablespaces', 'no-unlogged-table-data', 'quote-all-identifiers',
'section=', 'serializable-deferrable', 'use-set-session-authorization', 'dbname=',
'host=', 'port=', 'username=', 'no-password', 'password', 'role=']),
'pg_restore': ('d:f:F:lvV?acCeI:j:L:n:OP:sS:t:T:x1h:p:U:wW',
['dbname=', 'file=', 'format=', 'list', 'verbose', 'version', 'help',
'data-only', 'clean', 'create', 'exit-on-error', 'index=', 'jobs=',
'use-list=', 'schema=', 'no-owner', 'function=', 'schema-only',
'superuser=', 'table=', 'trigger=', 'no-privileges', 'single-transaction',
'disable-triggers', 'no-data-for-failed-tables', 'no-security-labels',
'no-tablespaces', 'section=', 'use-set-session-authorization', 'host=',
'port=', 'username=', 'no-password', 'password', 'role=']),
'pg_dumpall': ('f:V?acgoOrsS:txd:h:l:p:U:wW',
['file=', 'version=', 'lock-wait-timeout=', 'help=', 'data-only=',
'clean=', 'globals-only=', 'oids=', 'no-owner=', 'roles-only=',
'schema-only=', 'superuser=', 'tablespaces-only=', 'no-privileges=',
'binary-upgrade=', 'column-inserts=', 'disable-dollar-quoting=', 'disable-triggers=',
'inserts=', 'no-security-labels=', 'no-tablespaces=', 'no-unlogged-table-data=',
'quote-all-identifiers=', 'use-set-session-authorization', 'dbname=',
'host=', 'database=', 'port=', 'username=', 'no-password=', 'password=',
'role=']),
'vacuumdb': ('ad:efFqt:vVzZ?h:p:U:wW',
['all', 'dbname=' 'echo', 'full', 'freeze', 'quiet', 'table='
'verbose', 'version', 'analyze', 'analyze-only', 'help', 'host='
'port=' 'username=' 'no-password', 'password', 'maintenance-db=']),
'pgsql2shp': ('f:h:p:P:u:g:brkm:?', ['help'])
}
def fallback_getopt(args):
out_opts = []
out_args = []
ai = iter(args)
for arg in ai:
if arg == '--':
break
elif arg[0:2] in ['-u', '-U']:
if len(arg) > 2:
out_opts.append((arg[0:2], arg[2:]))
else:
try:
out_opts.append((arg[0:2], next(ai)))
except StopIteration:
break
elif arg[0:10] == '--username':
if len(arg) > 10 and arg[11] == '=':
out_opts.append(('--username', arg[12:]))
else:
try:
out_opts.append(('--username', next(ai)))
except StopIteration:
break
return out_opts, out_args
class Main(object):
def __init__(self):
self.ssl_mode = None
self.cred_dir = None
self.is_foreign = False
self.user = None
self.verbose = False
self.print_only = False
self.input_arguments = []
self.override_environment = {}
self.must_set_ssl_mode = False
self.warned_cred_dir_nexist = False
def init_defaults(self):
if self.ssl_mode is None:
self.ssl_mode = read_fxa_db_conf()
if self.ssl_mode is not None:
self.must_set_ssl_mode = True
else:
self.ssl_mode = os.environ.get('PGSSLMODE')
if self.cred_dir is None:
h = os.environ.get('HOME')
if h:
self.cred_dir = join(h, '.postgresql')
else:
pwrn('HOME is not set')
def validate_values(self):
if self.ssl_mode is not None and self.ssl_mode not in (
'disable','allow','prefer','require','verify-ca','verify-full'):
pwrn('invalid ssl mode "%s"' % (self.ssl_mode,))
if self.may_use_ssl() and not isdir(self.cred_dir):
pwrn('credential dir does not exist or is not a directory: ' + self.cred_dir)
self.warned_cred_dir_nexist = True
def may_use_ssl(self):
return self.ssl_mode is None or self.ssl_mode != 'disable'
def process(self):
self.init_defaults()
self.validate_values()
self.process_input_arguments()
if self.print_only:
self.print_command(sys.stdout)
else:
if self.verbose:
self.print_command(sys.stderr)
self.execute_command()
def process_input_arguments(self):
user = os.environ.get('PGUSER')
target_program = None
if self.input_arguments:
target_program = basename(self.input_arguments[0])
if not self.is_foreign:
opts, _ = None, None
try:
short_opts, long_opts = prog_option_db[target_program]
try:
opts, _ = gnu_getopt(self.input_arguments[1:], short_opts, long_opts)
except GetoptError as e:
pwrn('unable to parse target program options: ' + str(e))
except KeyError:
pwrn('unknown target program: ' + str(target_program))
if opts is None:
opts, _ = fallback_getopt(self.input_arguments[1:])
for k, v in opts:
if k in ('-u','-U','--username'):
user = v
if self.user is not None:
user = self.user
oo = self.override_environment
if self.must_set_ssl_mode:
oo['PGSSLMODE'] = self.ssl_mode
if self.may_use_ssl() and self.cred_dir:
if user:
oo['PGSSLCERT'] = self.cred_file(user + '.crt')
oo['PGSSLKEY'] = self.cred_file(user + '.key')
oo['PGSSLROOTCERT'] = self.cred_file('root.crt')
def cred_file(self, base_name):
cred_path = join(self.cred_dir, base_name)
if not isfile(cred_path) and not self.warned_cred_dir_nexist:
pwrn('credential path does not exist or is not file: ' + cred_path)
return cred_path
def print_command(self, fp):
cmd = ' '.join([ '%s=%s' % (k, v) for k, v in
self.override_environment.items()] +
list(self.input_arguments))
fp.write(cmd)
fp.write('\n')
def execute_command(self):
if not self.input_arguments:
perr('no command given')
usage()
sys.exit(1)
sys.stdout.flush()
sys.stderr.flush()
env_to_use = os.environ
if self.override_environment:
env_to_use = env_to_use.copy()
env_to_use.update(self.override_environment)
exe = self.input_arguments[0]
try:
os.execvpe(exe, self.input_arguments, env_to_use)
# Should never be here
sys.exit(0)
except Exception as e:
pfat('failed to execute target program %s: %s' % (exe, str(e)))
def run():
main = Main()
try:
opts, args = getopt(sys.argv[1:], 'c:fm:U:hnv', ['creddir=','foreign','sslmode=','username=','help'])
except GetoptError as e:
perr(str(e))
usage()
sys.exit(1)
for k, v in opts:
if k == '-n':
main.print_only = True
elif k == '-h' or k == '--help':
usage(sys.stdout)
sys.exit(0)
elif k == '-v':
main.verbose = True
elif k == '-m' or k == '--sslmode':
main.ssl_mode = v
main.must_set_ssl_mode = True
elif k == '-c' or k == '--creddir':
main.cred_dir = v
elif k in ['-u','-U','--username']:
main.user = v
elif k == '-f' or k == '--foreign':
main.is_foreign = True
main.input_arguments = args
main.process()
if __name__ == '__main__':
run()