diff --git a/rpms/awips2.upc/Installer.ldm/patch/bin/edexBridge b/rpms/awips2.upc/Installer.ldm/patch/bin/edexBridge new file mode 100644 index 0000000000..a5e0bed9b4 --- /dev/null +++ b/rpms/awips2.upc/Installer.ldm/patch/bin/edexBridge @@ -0,0 +1,418 @@ +#!/awips2/python/bin/python3 + +# +# edexBridge.py +# +# SOFTWARE HISTORY +# +# Date Ticket# Engineer Description +# ------------- -------- ------------ ------------------------------------------ +# Oct 08, 2009 brockwoo Initial creation +# Jun 21, 2013 bkowal Re-written to work with the qpid messaging +# api +# May 06, 2014 3102 rjpeter Updated to call cleanup if connect failed. +# Limit number of messages to be sent to +# QPID on a single send call +# Aug 05, 2014 3458 rjpeter Added logging of error when issue occurs +# on send +# Nov 04, 2014 2991 dlovely Updated to work with QPID 0.30 +# Feb 23, 2017 6082 bsteffen Updated to work with ssl certificates +# Jul 17, 2018 7143 bsteffen Reopen log files on HUP +# Jul 23, 2018 7235 bsterren Split producer and consumer into two threads +# Jul 10, 2019 7724 mrichardson Upgrade Qpid to Qpid Proton +# Jul 10, 2019 7724 dgilling Simplify multithreading +# Nov 04, 2019 7969 dgilling Ported to python +# Jul 07, 2020 8187 randerso Added qpid connection_id +# Jul 29, 2021 8612 dgilling Added command line options for pqact's +# semaphore and shared memory segment +# key codes. +#Jan 01, 2024 tiffanym@ucar Change ufpy call to awips + +import argparse +import ctypes +import ctypes.util +import datetime +import logging +import logging.handlers +import multiprocessing +import os +import pwd +import queue +import signal +import socket +import sys +import threading +import time + +import proton +import proton.utils +import proton.reactor + +from awips import UsageArgumentParser + + +# custom logging level +VERBOSE = 15 +logging.addLevelName(VERBOSE, "VERBOSE") + +logger = None + +class edex_message(ctypes.Structure): + _fields_ = [("filename", ctypes.c_char*4096), + ("ident", ctypes.c_char*256),] + + +def process_args(): + DEFAULT_QPID_HOST = "localhost" + DEFAULT_QPID_PORT = 5672 + DEFAULT_QPID_USER = "guest" + DEFAULT_CERT_PASSWORD = os.getenv("QPID_SSL_CERT_PASSWORD", "password") + DEFAULT_QPID_CERT_PATH = os.getenv("QPID_SSL_CERT_DB", os.path.join(pwd.getpwuid(os.getuid()).pw_dir,".qpid")) + DEFAULT_QPID_CERT_NAME = os.getenv("QPID_SSL_CERT_NAME", DEFAULT_QPID_USER) + DEFAULT_QPID_CERT_KEY = os.getenv("QPID_SSL_CERT_KEY_FILE", DEFAULT_QPID_USER) + DEFAULT_KEY_PROJ_ID = "R" + DEFAULT_SEMKEY_PROJ_ID = "e" + DESCRIPTION= ( + "EDEX Bridge is used by the LDM to post data available messages to Qpid, which " + "alerts the EDEX Ingest server(s) that a file is ready for processing." + ) + + parser = UsageArgumentParser.UsageArgumentParser(prog="edexBridge", description=DESCRIPTION) + parser.add_argument("-s", "--server", + dest="broker_uri", + action="store", + default=DEFAULT_QPID_HOST, + metavar="SERVER", + help="specify hostname of qpid") + parser.add_argument("-p", "--port", + dest="port", + action="store", + type=int, + default=DEFAULT_QPID_PORT, + metavar="PORT", + help="specify port to connect to qpid") + parser.add_argument("-l", "--log", + dest="log_fname", + required=True, + action="store", + metavar="LOGFILE", + help="log to a specific file") + log_level_group = parser.add_mutually_exclusive_group() + log_level_group.add_argument("-v", "--verbose", + dest="log_level", + action="store_const", + const=VERBOSE, + default=logging.INFO, + help="explain what is being done") + log_level_group.add_argument("-x", "--debug", + dest="log_level", + action="store_const", + const=logging.DEBUG, + default=logging.INFO, + help="include extra logging, useful for debugging") + parser.add_argument("--ssl-cert-key-file", + dest="cert_key", + action="store", + default=DEFAULT_QPID_CERT_KEY, + metavar="NAME", + help="Key file used to verify the certificate") + parser.add_argument("--ssl-cert-db", + dest="cert_path", + action="store", + default=DEFAULT_QPID_CERT_PATH, + metavar="PATH", + help="Path to directory containing the certificate and key files") + parser.add_argument("--ssl-cert-name", + dest="cert_name", + action="store", + default=DEFAULT_QPID_CERT_NAME, + metavar="NAME", + help="Name of the certificate to use") + parser.add_argument("--ssl-cert-password", + dest="cert_password", + action="store", + default=DEFAULT_CERT_PASSWORD, + metavar="PASSWORD", + help="Password used to access the certificate") + parser.add_argument("--key-proj-id", + dest="key_proj_id", + action="store", + default=DEFAULT_KEY_PROJ_ID, + metavar="CHAR", + help="Key code for the pqact shared memory segment") + parser.add_argument("--sem-proj-id", + dest="semkey_proj_id", + action="store", + default=DEFAULT_SEMKEY_PROJ_ID, + metavar="CHAR", + help="Key code for the pqact semaphore") + + args = parser.parse_args() + if len(args.key_proj_id) > 1: + parser.error("argument for --key-proj-id should be only a single character.") + if len(args.semkey_proj_id) > 1: + parser.error("argument for --sem-proj-id should be only a single character.") + return args + +def init_logging(file, level): + logging.basicConfig(level=level, + format="%(asctime)s %(module)s[%(thread)d] %(levelname)s: %(message)s", + datefmt="%b %d %H:%M:%S", + handlers=[logging.handlers.WatchedFileHandler(file)]) + # handlers=[logging.handlers.WatchedFileHandler(file), logging.StreamHandler()]) + global logger + logger = logging.getLogger("edexBridge") + +def send_to_qpid(host, port, cert_path, cert_name, key_name, cert_password, msg_queue, shutdown_signal): + url = f"amqps://{host}:{port}" + certfile = os.path.join(cert_path, f"{cert_name}.crt") + certkey = os.path.join(cert_path, f"{key_name}.key") + ssl_domain = proton.SSLDomain(mode=proton.SSLDomain.MODE_CLIENT) + ssl_domain.set_credentials(certfile, certkey, cert_password) + ADDRESS = "external.dropbox" + is_shutdown = False + + messages_sent = 0 + def print_sent_count(): + nonlocal shutdown_signal, messages_sent + while True: + shutdown_signal.wait(60) + logger.info("Sent %d messages in the last 60 seconds.", messages_sent) + messages_sent = 0 + if shutdown_signal.is_set(): + return + print_thread = threading.Thread(target=print_sent_count) + print_thread.start() + + clientID = ":".join([ + socket.gethostname(), + pwd.getpwuid(os.getuid()).pw_name, + "edexBridge", + str(os.getpid()), + ]) + + while True: + conn = None + sender = None + try: + if not is_shutdown: + is_shutdown = shutdown_signal.is_set() + if is_shutdown: + logger.info("Received shutdown signal.") + + container = proton.reactor.Container() + container.container_id = clientID + conn = proton.utils.BlockingConnection(url, container=container, ssl_domain=ssl_domain) + sender = conn.create_sender(ADDRESS) + logger.info("Connected to broker [%s].", url) + + while True: + try: + if not is_shutdown: + is_shutdown = shutdown_signal.is_set() + if is_shutdown: + logger.info("Received shutdown signal.") + + msg_args = msg_queue.get(True, 1) + if msg_args: + logger.debug("Sending file [%s] to EDEX with header [%s].", msg_args["body"], msg_args["subject"]) + msg = proton.Message(**msg_args) + msg.properties["enqueueTime"] = int(datetime.datetime.now().timestamp() * 1000) + sender.send(msg) + messages_sent += 1 + except queue.Empty: + logger.debug("Send queue is empty.") + if is_shutdown: + if messages_sent: + logger.info("Sent remaining %d messages.", messages_sent) + logger.info("Send process shutting down.") + return + except proton.ProtonException: + logger.error("Send failed for file [%s].", msg_args["body"]) + logger.warn("Lost connection to QPID broker.", exc_info=True) + msg_queue.put(msg_args) + break + except: + logger.exception("Send failed for file [%s].", msg_args["body"]) + msg_queue.put(msg_args) + except proton.ProtonException: + logger.error("Failed to connect to QPID broker", exc_info=True) + if is_shutdown: + if msg_queue.qsize(): + logger.warn("Exiting with %d unsent messages.", msg_queue.qsize()) + return + finally: + try: + if conn: + logger.info("Disconnecting from broker.") + conn.close() + except proton.ProtonException: + logger.warn("Exception trying to close connection", exc_info=True) + time.sleep(30) + print_thread.join() + +def queue_messages(start_index, end_index, message_pointer, msg_queue): + logger.debug("Preparing to queue %d messages.", (end_index - start_index)) + + for i in range(start_index, end_index): + file = message_pointer[i].filename.decode() + header = message_pointer[i].ident.decode() + msg_args = {"subject": header, + "body": file, + "durable": True, + "properties": {}, + } + + try: + msg_queue.put_nowait(msg_args) + except queue.Full: + logger.warning("Send queue is full. Discarding oldest messages.") + while True: + try: + msg_queue.get_nowait() + except queue.Empty: + pass + try: + msg_queue.put_nowait(msg_args) + except queue.Full: + continue + break + logger.debug("Queued message for file [%s] and WMO header [%s].", file, header) + +def main(): + args = process_args() + + init_logging(args.log_fname, args.log_level) + + logger.debug("Command-line args: %s", args) + + MAX_QUEUE_SIZE = 100000 + msg_queue = multiprocessing.Queue(MAX_QUEUE_SIZE) + shutdown_signal = threading.Event() + + qpid_proc = threading.Thread(target=send_to_qpid, + args=(args.broker_uri, args.port, args.cert_path, args.cert_name, + args.cert_key, args.cert_password, msg_queue, shutdown_signal)) + logger.info("Starting QPID container...") + qpid_proc.start() + + libc = ctypes.util.find_library("c") + if not libc: + logger.critical("Unable to load libc.so.") + return 1 + libc = ctypes.CDLL(libc, use_errno=True) + + key = libc.ftok(b"/etc/rc.d/rc.local", ord(args.key_proj_id)) + semkey = libc.ftok(b"/etc/rc.d/rc.local", ord(args.semkey_proj_id)) + + for i in range(5): + semid = libc.semget(semkey, 2, 0o666) + if semid != -1: + break + logger.warning("Could not attach to the semaphore created by pqact: %s", os.strerror(ctypes.get_errno())) + time.sleep(1) + else: + logger.critical("Could not attach to the semaphore created by pqact: %s", os.strerror(ctypes.get_errno())) + return 0 + + queue_size = libc.semctl(semid, 0, 12); + shmid = libc.shmget(key, ctypes.sizeof(edex_message) * queue_size, 0o666); + if shmid == -1: + logger.critical("Could not attach to the shared memory created by pqact: %s", os.strerror(ctypes.get_errno())) + return 0 + + libc.shmat.restype = ctypes.POINTER(edex_message) + messageCursor = libc.shmat(shmid, None, 0o10000) + + done = False + + def handle_SIGHUP(signal, frame): + # swallow this signal because it's no longer necessary to + # programmatically trigger the log rollover. + # logrotate.d and logging.WatchedFileHandler will do all the needed + # work + # still register for the signal just in case old configs are still + # sending it. + logger.log(VERBOSE, "Received SIGHUP.") + pass + signal.signal(signal.SIGHUP, handle_SIGHUP) + def handle_SIGINT(signal, frame): + nonlocal qpid_proc + logger.log(VERBOSE, "Received SIGINT.") + qpid_proc.terminate() + qpid_proc.join() + sys.exit(0) + signal.signal(signal.SIGINT, handle_SIGINT) + def handle_SIGTERM(signal, frame): + nonlocal done + logger.log(VERBOSE, "Received SIGTERM.") + done = True + signal.signal(signal.SIGTERM, handle_SIGTERM) + def handle_SIGUSR2(signal, frame): + logger.log(VERBOSE, "Received SIGUSR2.") + level = logger.getEffectiveLevel() + if logging.DEBUG == level: + logger.setLevel(logging.ERROR) + elif logging.INFO == level: + logger.setLevel(logging.DEBUG) + elif logging.ERROR == level: + logger.setLevel(logging.INFO) + signal.signal(signal.SIGUSR2, handle_SIGUSR2) + + queue_start = 0 + messages_sent = 0 + start_tick = datetime.datetime.utcnow() + while not done: + end_tick = datetime.datetime.utcnow() + elapsed_seconds = (end_tick - start_tick).total_seconds() + if elapsed_seconds >= 60: + logger.info("Queued %d messages in the last %d seconds. Current queue size: %d", + messages_sent, elapsed_seconds, msg_queue.qsize()) + messages_sent = 0 + start_tick = datetime.datetime.utcnow() + + queue_location = libc.semctl(semid, 1, 12) + queue_end = queue_location + 1 + logger.debug("queue_location: %d, queue_end: %d, queue_start: %d", queue_location, queue_end, queue_start) + + if queue_location < 0 or queue_end == queue_start: + logger.debug("No new messages to send.") + time.sleep(1) + else: + # + end_diff = 0 + + # Need to copy in the end of the queue before moving to front + if queue_start > queue_end: + if queue_start - queue_end < queue_size//2: + # Indicates a risk that the queue could wrap all the way + # around itself, missing messages. + logger.warn("Queue with size %d is wrapping from %d to %d.", + queue_size, queue_start, queue_end) + end_diff = queue_size - queue_start; + if end_diff > 0: + queue_messages(queue_start, queue_size, messageCursor, msg_queue) + messages_sent += (queue_size - queue_start); + queue_start = 0 + + queue_messages(queue_start, queue_end, messageCursor, msg_queue) + messages_sent += (queue_end - queue_start) + queue_start = queue_end + + logger.log(VERBOSE, "Sending shutdown signal to secondary process.") + shutdown_signal.set() + + logger.info("Stopped reading new messages, waiting for remaining messages to be sent."); + + libc.shmdt(messageCursor) + qpid_proc.join(60) + if qpid_proc.is_alive(): + logger.warn("QPID sender thread did not shutdown.") + qpid_proc.terminate() + qpid_proc.join() + logger.info("Shutting down.") + + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/rpms/awips2.upc/Installer.ldm/patch/bin/monitor_data_store.sh b/rpms/awips2.upc/Installer.ldm/patch/bin/monitor_data_store.sh new file mode 100755 index 0000000000..2b258f7a83 --- /dev/null +++ b/rpms/awips2.upc/Installer.ldm/patch/bin/monitor_data_store.sh @@ -0,0 +1,17 @@ +#!/bin/bash +myPID=$$ +echo -e "`date +%Y%m%d`\t`date +%H:%M:%S`\tStarting Script (pid = $myPID, parent = $PPID)" >> ~/logs/`basename $0 .sh` +if ps -wef|grep `basename $0` | grep -v grep | grep -v $myPID | grep -v $PPID +then + exit 0 +fi + +cd /data_store +while true +do + for _dir in `ls` + do + echo -e "`date +%Y%m%d`\t`date +%H:%M:%S`\t\tfind ${_dir} -mtime +0 -type f -exec rm -f {} \;" >> ~/logs/`basename $0 .sh` + find ${_dir} -mtime +0 -type f -exec rm -f {} \; + done +done diff --git a/rpms/awips2.upc/Installer.ldm/patch/bin/start_ldm b/rpms/awips2.upc/Installer.ldm/patch/bin/start_ldm new file mode 100644 index 0000000000..64e885e59b --- /dev/null +++ b/rpms/awips2.upc/Installer.ldm/patch/bin/start_ldm @@ -0,0 +1,25 @@ +#!/bin/bash + +# Check for necessary named pipes (FIFOs) + +myhost=`hostname -s | cut -d- -f1` + +echo "It seems like host(`hostname`) is NOT a downlink CP." +echo "Do you still want to start LDM (y/n) [default=n]?" +read ch +if [ $ch != 'y' -o $ch != 'Y' ];then + echo "Exiting." + exit 1 + +# Start LDM + +su - ldm -c "ldmadmin start" + +# Pass SIGUSR2 signal to pqact to enable logging into ldmd.log (INFO log level) + +echo "Fetching pid for pqact process" +pid=`ps --no-headers -C pqact |awk '{print $1}'` +su - ldm -c "kill -s USR2 $pid" + +echo "Done." + diff --git a/rpms/awips2.upc/Installer.ldm/patch/bin/stop_ldm b/rpms/awips2.upc/Installer.ldm/patch/bin/stop_ldm new file mode 100644 index 0000000000..5b80c79ff1 --- /dev/null +++ b/rpms/awips2.upc/Installer.ldm/patch/bin/stop_ldm @@ -0,0 +1,21 @@ +#!/bin/bash + + +if [ -f /etc/rc.config.d/AWIPS ];then + . /etc/rc.config.d/AWIPS +fi + +#echo "Halting retransmission process" +#pid=`ps -ef |grep start_sbn_retransmit|grep -v grep | awk '{print $2}'` +#kill -s SIGKILL $pid + +echo "Stopping ldm service" + +su - ldm -c "ldmadmin stop" + +echo "Releasing shared memory resources" + +su - ldm -c "acq_ldm_freeshm -m0" + +echo "Done." +