awips2/edexOsgi/com.raytheon.uf.tools.cli/impl/lf
2022-05-05 12:34:50 -05:00

292 lines
11 KiB
Python

#!/awips2/python/bin/python3
##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
#
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
#
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
#
# Utility for providing easy access to localization files from the command line.
#
# SOFTWARE HISTORY
#
# Date Ticket# Engineer Description
# --------- -------- --------- --------------------------
# 08/09/17 5731 bsteffen Initial Creation.
# 04/27/20 7883 tgurney Python 3 fixes
# 01/12/21 8735 mapeters Python 3 fixes for binary files
import sys
import argparse
from ufpy.localization.LocalizationFileManager import (LocalizationFileManager,
NON_EXISTENT_CHECKSUM,
LocalizationFileIsNotDirectoryException)
def getManager(args):
"""Translate the host, site, and type args into a LocalizationFileManager
Args:
args: the result of ArgumentParser.parse_args() from main()
Returns:
a LocalizationFileManager
"""
manager_args = {}
if args.host is not None:
manager_args["host"] = args.host
if args.site is not None:
manager_args["site"] = args.site
if args.type is not None:
manager_args["localizationType"] = args.type
if args.port is not None:
manager_args["port"] = args.port
return LocalizationFileManager(**manager_args)
def getLocalizationFile(manager, args):
"""Translate the level and path args into a LocalizationFile
Args:
manager: a LocalizationFileManager
args: the result of ArgumentParser.parse_args() from main()
Returns:
a LocalizationFile
"""
if args.level:
return manager.getSpecific(args.level.lower(), args.path)
else:
f = manager.getAbsolute(args.path)
if f is None:
raise RuntimeError(args.path + " does not exist")
return f
def openLocalFile(args, mode):
"""Translate the file args into an open file.
Args:
args: the result of ArgumentParser.parse_args() from main()
mode: 'r' or 'w', just like the builtin open
Returns:
a file object
"""
if args.file == 1:
f = args.path
if '/' in f:
f = f.rsplit('/',1)[1]
return open(f, mode);
else:
return open(args.file, mode);
def validateChange(f, args):
"""Translate the checksum and overwrite args into an error if the file shouldn't be changed.
Args:
f: a LocalizationFile
args: the result of ArgumentParser.parse_args() from main()
"""
if args.checksum:
match = False
if args.checksum == f.checksum:
match = True
elif len(args.checksum) > 5 and len(f.checksum) > len(args.checksum):
# Allow partial checksums, kinda like git, as ong as there are at least 5 characters.
match = args.checksum == f.checksum[:len(args.checksum)]
if not(match):
raise RuntimeError("Checksum mismatch: " + f.checksum)
elif not(args.overwrite) and f.checksum != NON_EXISTENT_CHECKSUM:
raise RuntimeError("--overwrite or --checksum " + f.checksum + " must be used to " + args.command + " an existing file")
def formatContext(context, width):
"""Convert context to user friendly string
Args:
context: a Localization Context
width: the width of the context column
Return:
A string representing the context
"""
if context.isBase():
return context.level.ljust(width)
else:
return (context.level + '(' + context.name + ')').ljust(width)
ls_help = "list all the localization files in a directory."
def ls(args):
"""list all the localization files in a directory.
Args:
args: the result of ArgumentParser.parse_args() from main()
"""
manager = getManager(args)
path = args.path
try:
if args.all or args.level:
file_sets = manager.listIncremental(path)
files = []
for s in file_sets:
for f in s:
if not(args.level) or f.context.level == args.level.lower():
files.append(f)
else:
files = manager.listAbsolute(path)
except LocalizationFileIsNotDirectoryException:
if args.all or args.level:
files = manager.getIncremental(path)
if(args.level):
new_files = []
for f in files:
if f.context.level == args.level.lower():
newfiles.append(f)
files = new_files
else:
files = [manager.getAbsolute(path)]
if files:
contextWidth = max((len(formatContext(f.context, 1)) for f in files))
checksumWidth = max((len(f.checksum) for f in files))
for f in files:
if args.long:
print(formatContext(f.context, contextWidth), \
f.checksum.ljust(checksumWidth), \
f.timestamp.strftime("%Y-%m-%d %H:%M:%S"), \
f.path, \
sep=" ")
elif args.all:
print(formatContext(f.context, contextWidth), f.path, sep=" ")
else:
print(f.path)
cat_help = "read the contents of a localization file."
def cat(args):
"""read the contents of a localization file.
Args:
args: the result of ArgumentParser.parse_args() from main()
"""
manager = getManager(args)
f = getLocalizationFile(manager, args)
with f.open('r') as src:
if args.file:
with openLocalFile(args, 'wb') as dest:
dest.write(src.read())
else:
sys.stdout.buffer.write(src.read())
write_help = "write the contents of a localization file."
def write(args):
"""write the contents of a localization file.
Args:
args: the result of ArgumentParser.parse_args() from main()
"""
manager = getManager(args)
if args.level:
f = manager.getSpecific(args.level.lower(), args.path)
else:
f = manager.getSpecific('user', args.path)
validateChange(f, args)
with f.open('w') as dest:
if args.file:
with openLocalFile(args, 'rb') as src:
dest.write(src.read())
else:
dest.write(sys.stdin.buffer.read())
rm_help="delete a localization file."
def rm(args):
"""delete a localization file.
Args:
args: the result of ArgumentParser.parse_args() from main()
"""
manager = getManager(args)
f = getLocalizationFile(manager, args);
validateChange(f, args)
f.delete();
def main():
parser = argparse.ArgumentParser(description="Interact with localization files")
l_help = "Restrict operations to a single specific localization level. " + \
"By default most commands only use the highest version of any particular file."
parser.add_argument("-r", "--level", help=l_help)
parser.add_argument("-t", "--type", help='Localization type, default is common_static')
parser.add_argument("--host", help='Localization server hostname')
parser.add_argument("--port", help='Localization server port')
parser.add_argument("-s", "--site", help='')
subparsers = parser.add_subparsers(help='Specific operation to perform', dest="command")
# Python 3 adds real support for subcommand aliases.
ls_commands = ("ls", "list")
for alias in ls_commands:
help = ls_help
if alias != ls_commands[0]:
help = "alias for " + ls_commands[0]
list_parser = subparsers.add_parser(alias, description=ls_help, help=help)
list_parser.add_argument("-a", "--all", action='store_true', help='Show files at all levels, default is only the highest level.')
list_parser.add_argument("-l", "--long", action='store_true', help='Longer listing including checksum and timestamp')
list_parser.set_defaults(func=ls)
list_parser.add_argument("path", help='localization path to operate on', nargs='?', default='.')
cat_commands = ("cat", "read")
for alias in ("cat", "read"):
help = cat_help
if alias != cat_commands[0]:
help = "alias for " + cat_commands[0]
read_parser = subparsers.add_parser(alias, description=cat_help, help=help);
f_help = "Write the file to the local filesystem instead of printing to stdout. " + \
"If this option is used without specifying a file than the name of the localization file is used."
# -f doesn't always work with no arg, see http://bugs.python.org/issue9338
read_parser.add_argument("-f", "--file", nargs='?', const=1, help=f_help)
read_parser.set_defaults(func=cat)
read_parser.add_argument("path", help='localization path to operate on')
write_parser = subparsers.add_parser("write", description=write_help, help=write_help);
f_help = "Read a file from the local filesystem instead of reading stdin. " + \
"If this option is used without specifying a file than the name of the localization file is used."
# -f doesn't always work with no arg, see http://bugs.python.org/issue9338
write_parser.add_argument("-f", "--file", nargs='?', const=1, help=f_help)
consistency_group = write_parser.add_mutually_exclusive_group();
consistency_group.add_argument("-o", "--overwrite", action='store_true', help='Write a new file, ignoring any existing file.')
consistency_group.add_argument("-c", "--checksum", help='The checksum of an existing file that should be overwritten')
write_parser.set_defaults(func=write)
write_parser.add_argument("path", help='localization path to operate on')
rm_commands = ("rm", "delete")
for alias in ("rm","delete"):
help = rm_help
if alias != rm_commands[0]:
help = "alias for " + rm_commands[0]
delete_parser = subparsers.add_parser(alias, description=rm_help, help=help);
delete_parser.add_argument("-o", "--overwrite", action='store_true', help='Delete regardless of which version of the file is on the server.')
delete_parser.add_argument("-c", "--checksum", help='Delete only if the file has the specified checksum.')
delete_parser.set_defaults(func=rm)
delete_parser.add_argument("path", help='localization path to operate on')
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main()