#!/awips2/python/bin/python3 ## # This software was developed and / or modified by Raytheon Company, # pursuant to Contract DG133W-05-CQ-1067 with the US Government. # # U.S. EXPORT CONTROLLED TECHNICAL DATA # This software product contains export-restricted data whose # export/transfer/disclosure is restricted by U.S. law. Dissemination # to non-U.S. persons whether in the United States or abroad requires # an export license or other authorization. # # Contractor Name: Raytheon Company # Contractor Address: 6825 Pine Street, Suite 340 # Mail Stop B8 # Omaha, NE 68106 # 402.291.0100 # # See the AWIPS II Master Rights File ("Master Rights File.pdf") for # further licensing information. ## # # Utility for providing easy access to localization files from the command line. # # SOFTWARE HISTORY # # Date Ticket# Engineer Description # --------- -------- --------- -------------------------- # 08/09/17 5731 bsteffen Initial Creation. # 04/27/20 7883 tgurney Python 3 fixes # 01/12/21 8735 mapeters Python 3 fixes for binary files import sys import argparse from ufpy.localization.LocalizationFileManager import (LocalizationFileManager, NON_EXISTENT_CHECKSUM, LocalizationFileIsNotDirectoryException) def getManager(args): """Translate the host, site, and type args into a LocalizationFileManager Args: args: the result of ArgumentParser.parse_args() from main() Returns: a LocalizationFileManager """ manager_args = {} if args.host is not None: manager_args["host"] = args.host if args.site is not None: manager_args["site"] = args.site if args.type is not None: manager_args["localizationType"] = args.type if args.port is not None: manager_args["port"] = args.port return LocalizationFileManager(**manager_args) def getLocalizationFile(manager, args): """Translate the level and path args into a LocalizationFile Args: manager: a LocalizationFileManager args: the result of ArgumentParser.parse_args() from main() Returns: a LocalizationFile """ if args.level: return manager.getSpecific(args.level.lower(), args.path) else: f = manager.getAbsolute(args.path) if f is None: raise RuntimeError(args.path + " does not exist") return f def openLocalFile(args, mode): """Translate the file args into an open file. Args: args: the result of ArgumentParser.parse_args() from main() mode: 'r' or 'w', just like the builtin open Returns: a file object """ if args.file == 1: f = args.path if '/' in f: f = f.rsplit('/',1)[1] return open(f, mode); else: return open(args.file, mode); def validateChange(f, args): """Translate the checksum and overwrite args into an error if the file shouldn't be changed. Args: f: a LocalizationFile args: the result of ArgumentParser.parse_args() from main() """ if args.checksum: match = False if args.checksum == f.checksum: match = True elif len(args.checksum) > 5 and len(f.checksum) > len(args.checksum): # Allow partial checksums, kinda like git, as ong as there are at least 5 characters. match = args.checksum == f.checksum[:len(args.checksum)] if not(match): raise RuntimeError("Checksum mismatch: " + f.checksum) elif not(args.overwrite) and f.checksum != NON_EXISTENT_CHECKSUM: raise RuntimeError("--overwrite or --checksum " + f.checksum + " must be used to " + args.command + " an existing file") def formatContext(context, width): """Convert context to user friendly string Args: context: a Localization Context width: the width of the context column Return: A string representing the context """ if context.isBase(): return context.level.ljust(width) else: return (context.level + '(' + context.name + ')').ljust(width) ls_help = "list all the localization files in a directory." def ls(args): """list all the localization files in a directory. Args: args: the result of ArgumentParser.parse_args() from main() """ manager = getManager(args) path = args.path try: if args.all or args.level: file_sets = manager.listIncremental(path) files = [] for s in file_sets: for f in s: if not(args.level) or f.context.level == args.level.lower(): files.append(f) else: files = manager.listAbsolute(path) except LocalizationFileIsNotDirectoryException: if args.all or args.level: files = manager.getIncremental(path) if(args.level): new_files = [] for f in files: if f.context.level == args.level.lower(): newfiles.append(f) files = new_files else: files = [manager.getAbsolute(path)] if files: contextWidth = max((len(formatContext(f.context, 1)) for f in files)) checksumWidth = max((len(f.checksum) for f in files)) for f in files: if args.long: print(formatContext(f.context, contextWidth), \ f.checksum.ljust(checksumWidth), \ f.timestamp.strftime("%Y-%m-%d %H:%M:%S"), \ f.path, \ sep=" ") elif args.all: print(formatContext(f.context, contextWidth), f.path, sep=" ") else: print(f.path) cat_help = "read the contents of a localization file." def cat(args): """read the contents of a localization file. Args: args: the result of ArgumentParser.parse_args() from main() """ manager = getManager(args) f = getLocalizationFile(manager, args) with f.open('r') as src: if args.file: with openLocalFile(args, 'wb') as dest: dest.write(src.read()) else: sys.stdout.buffer.write(src.read()) write_help = "write the contents of a localization file." def write(args): """write the contents of a localization file. Args: args: the result of ArgumentParser.parse_args() from main() """ manager = getManager(args) if args.level: f = manager.getSpecific(args.level.lower(), args.path) else: f = manager.getSpecific('user', args.path) validateChange(f, args) with f.open('w') as dest: if args.file: with openLocalFile(args, 'rb') as src: dest.write(src.read()) else: dest.write(sys.stdin.buffer.read()) rm_help="delete a localization file." def rm(args): """delete a localization file. Args: args: the result of ArgumentParser.parse_args() from main() """ manager = getManager(args) f = getLocalizationFile(manager, args); validateChange(f, args) f.delete(); def main(): parser = argparse.ArgumentParser(description="Interact with localization files") l_help = "Restrict operations to a single specific localization level. " + \ "By default most commands only use the highest version of any particular file." parser.add_argument("-r", "--level", help=l_help) parser.add_argument("-t", "--type", help='Localization type, default is common_static') parser.add_argument("--host", help='Localization server hostname') parser.add_argument("--port", help='Localization server port') parser.add_argument("-s", "--site", help='') subparsers = parser.add_subparsers(help='Specific operation to perform', dest="command") # Python 3 adds real support for subcommand aliases. ls_commands = ("ls", "list") for alias in ls_commands: help = ls_help if alias != ls_commands[0]: help = "alias for " + ls_commands[0] list_parser = subparsers.add_parser(alias, description=ls_help, help=help) list_parser.add_argument("-a", "--all", action='store_true', help='Show files at all levels, default is only the highest level.') list_parser.add_argument("-l", "--long", action='store_true', help='Longer listing including checksum and timestamp') list_parser.set_defaults(func=ls) list_parser.add_argument("path", help='localization path to operate on', nargs='?', default='.') cat_commands = ("cat", "read") for alias in ("cat", "read"): help = cat_help if alias != cat_commands[0]: help = "alias for " + cat_commands[0] read_parser = subparsers.add_parser(alias, description=cat_help, help=help); f_help = "Write the file to the local filesystem instead of printing to stdout. " + \ "If this option is used without specifying a file than the name of the localization file is used." # -f doesn't always work with no arg, see http://bugs.python.org/issue9338 read_parser.add_argument("-f", "--file", nargs='?', const=1, help=f_help) read_parser.set_defaults(func=cat) read_parser.add_argument("path", help='localization path to operate on') write_parser = subparsers.add_parser("write", description=write_help, help=write_help); f_help = "Read a file from the local filesystem instead of reading stdin. " + \ "If this option is used without specifying a file than the name of the localization file is used." # -f doesn't always work with no arg, see http://bugs.python.org/issue9338 write_parser.add_argument("-f", "--file", nargs='?', const=1, help=f_help) consistency_group = write_parser.add_mutually_exclusive_group(); consistency_group.add_argument("-o", "--overwrite", action='store_true', help='Write a new file, ignoring any existing file.') consistency_group.add_argument("-c", "--checksum", help='The checksum of an existing file that should be overwritten') write_parser.set_defaults(func=write) write_parser.add_argument("path", help='localization path to operate on') rm_commands = ("rm", "delete") for alias in ("rm","delete"): help = rm_help if alias != rm_commands[0]: help = "alias for " + rm_commands[0] delete_parser = subparsers.add_parser(alias, description=rm_help, help=help); delete_parser.add_argument("-o", "--overwrite", action='store_true', help='Delete regardless of which version of the file is on the server.') delete_parser.add_argument("-c", "--checksum", help='Delete only if the file has the specified checksum.') delete_parser.set_defaults(func=rm) delete_parser.add_argument("path", help='localization path to operate on') args = parser.parse_args() args.func(args) if __name__ == "__main__": main()