Added ldm bin directory to be tracked - which included update to edexBridge to use awips instead of ufpy
This commit is contained in:
parent
fbd407a4c4
commit
972a54cf77
4 changed files with 481 additions and 0 deletions
418
rpms/awips2.upc/Installer.ldm/patch/bin/edexBridge
Normal file
418
rpms/awips2.upc/Installer.ldm/patch/bin/edexBridge
Normal file
|
@ -0,0 +1,418 @@
|
|||
#!/awips2/python/bin/python3
|
||||
|
||||
#
|
||||
# edexBridge.py
|
||||
#
|
||||
# SOFTWARE HISTORY
|
||||
#
|
||||
# Date Ticket# Engineer Description
|
||||
# ------------- -------- ------------ ------------------------------------------
|
||||
# Oct 08, 2009 brockwoo Initial creation
|
||||
# Jun 21, 2013 bkowal Re-written to work with the qpid messaging
|
||||
# api
|
||||
# May 06, 2014 3102 rjpeter Updated to call cleanup if connect failed.
|
||||
# Limit number of messages to be sent to
|
||||
# QPID on a single send call
|
||||
# Aug 05, 2014 3458 rjpeter Added logging of error when issue occurs
|
||||
# on send
|
||||
# Nov 04, 2014 2991 dlovely Updated to work with QPID 0.30
|
||||
# Feb 23, 2017 6082 bsteffen Updated to work with ssl certificates
|
||||
# Jul 17, 2018 7143 bsteffen Reopen log files on HUP
|
||||
# Jul 23, 2018 7235 bsterren Split producer and consumer into two threads
|
||||
# Jul 10, 2019 7724 mrichardson Upgrade Qpid to Qpid Proton
|
||||
# Jul 10, 2019 7724 dgilling Simplify multithreading
|
||||
# Nov 04, 2019 7969 dgilling Ported to python
|
||||
# Jul 07, 2020 8187 randerso Added qpid connection_id
|
||||
# Jul 29, 2021 8612 dgilling Added command line options for pqact's
|
||||
# semaphore and shared memory segment
|
||||
# key codes.
|
||||
#Jan 01, 2024 tiffanym@ucar Change ufpy call to awips
|
||||
|
||||
import argparse
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
import datetime
|
||||
import logging
|
||||
import logging.handlers
|
||||
import multiprocessing
|
||||
import os
|
||||
import pwd
|
||||
import queue
|
||||
import signal
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
import proton
|
||||
import proton.utils
|
||||
import proton.reactor
|
||||
|
||||
from awips import UsageArgumentParser
|
||||
|
||||
|
||||
# custom logging level
|
||||
VERBOSE = 15
|
||||
logging.addLevelName(VERBOSE, "VERBOSE")
|
||||
|
||||
logger = None
|
||||
|
||||
class edex_message(ctypes.Structure):
|
||||
_fields_ = [("filename", ctypes.c_char*4096),
|
||||
("ident", ctypes.c_char*256),]
|
||||
|
||||
|
||||
def process_args():
|
||||
DEFAULT_QPID_HOST = "localhost"
|
||||
DEFAULT_QPID_PORT = 5672
|
||||
DEFAULT_QPID_USER = "guest"
|
||||
DEFAULT_CERT_PASSWORD = os.getenv("QPID_SSL_CERT_PASSWORD", "password")
|
||||
DEFAULT_QPID_CERT_PATH = os.getenv("QPID_SSL_CERT_DB", os.path.join(pwd.getpwuid(os.getuid()).pw_dir,".qpid"))
|
||||
DEFAULT_QPID_CERT_NAME = os.getenv("QPID_SSL_CERT_NAME", DEFAULT_QPID_USER)
|
||||
DEFAULT_QPID_CERT_KEY = os.getenv("QPID_SSL_CERT_KEY_FILE", DEFAULT_QPID_USER)
|
||||
DEFAULT_KEY_PROJ_ID = "R"
|
||||
DEFAULT_SEMKEY_PROJ_ID = "e"
|
||||
DESCRIPTION= (
|
||||
"EDEX Bridge is used by the LDM to post data available messages to Qpid, which "
|
||||
"alerts the EDEX Ingest server(s) that a file is ready for processing."
|
||||
)
|
||||
|
||||
parser = UsageArgumentParser.UsageArgumentParser(prog="edexBridge", description=DESCRIPTION)
|
||||
parser.add_argument("-s", "--server",
|
||||
dest="broker_uri",
|
||||
action="store",
|
||||
default=DEFAULT_QPID_HOST,
|
||||
metavar="SERVER",
|
||||
help="specify hostname of qpid")
|
||||
parser.add_argument("-p", "--port",
|
||||
dest="port",
|
||||
action="store",
|
||||
type=int,
|
||||
default=DEFAULT_QPID_PORT,
|
||||
metavar="PORT",
|
||||
help="specify port to connect to qpid")
|
||||
parser.add_argument("-l", "--log",
|
||||
dest="log_fname",
|
||||
required=True,
|
||||
action="store",
|
||||
metavar="LOGFILE",
|
||||
help="log to a specific file")
|
||||
log_level_group = parser.add_mutually_exclusive_group()
|
||||
log_level_group.add_argument("-v", "--verbose",
|
||||
dest="log_level",
|
||||
action="store_const",
|
||||
const=VERBOSE,
|
||||
default=logging.INFO,
|
||||
help="explain what is being done")
|
||||
log_level_group.add_argument("-x", "--debug",
|
||||
dest="log_level",
|
||||
action="store_const",
|
||||
const=logging.DEBUG,
|
||||
default=logging.INFO,
|
||||
help="include extra logging, useful for debugging")
|
||||
parser.add_argument("--ssl-cert-key-file",
|
||||
dest="cert_key",
|
||||
action="store",
|
||||
default=DEFAULT_QPID_CERT_KEY,
|
||||
metavar="NAME",
|
||||
help="Key file used to verify the certificate")
|
||||
parser.add_argument("--ssl-cert-db",
|
||||
dest="cert_path",
|
||||
action="store",
|
||||
default=DEFAULT_QPID_CERT_PATH,
|
||||
metavar="PATH",
|
||||
help="Path to directory containing the certificate and key files")
|
||||
parser.add_argument("--ssl-cert-name",
|
||||
dest="cert_name",
|
||||
action="store",
|
||||
default=DEFAULT_QPID_CERT_NAME,
|
||||
metavar="NAME",
|
||||
help="Name of the certificate to use")
|
||||
parser.add_argument("--ssl-cert-password",
|
||||
dest="cert_password",
|
||||
action="store",
|
||||
default=DEFAULT_CERT_PASSWORD,
|
||||
metavar="PASSWORD",
|
||||
help="Password used to access the certificate")
|
||||
parser.add_argument("--key-proj-id",
|
||||
dest="key_proj_id",
|
||||
action="store",
|
||||
default=DEFAULT_KEY_PROJ_ID,
|
||||
metavar="CHAR",
|
||||
help="Key code for the pqact shared memory segment")
|
||||
parser.add_argument("--sem-proj-id",
|
||||
dest="semkey_proj_id",
|
||||
action="store",
|
||||
default=DEFAULT_SEMKEY_PROJ_ID,
|
||||
metavar="CHAR",
|
||||
help="Key code for the pqact semaphore")
|
||||
|
||||
args = parser.parse_args()
|
||||
if len(args.key_proj_id) > 1:
|
||||
parser.error("argument for --key-proj-id should be only a single character.")
|
||||
if len(args.semkey_proj_id) > 1:
|
||||
parser.error("argument for --sem-proj-id should be only a single character.")
|
||||
return args
|
||||
|
||||
def init_logging(file, level):
|
||||
logging.basicConfig(level=level,
|
||||
format="%(asctime)s %(module)s[%(thread)d] %(levelname)s: %(message)s",
|
||||
datefmt="%b %d %H:%M:%S",
|
||||
handlers=[logging.handlers.WatchedFileHandler(file)])
|
||||
# handlers=[logging.handlers.WatchedFileHandler(file), logging.StreamHandler()])
|
||||
global logger
|
||||
logger = logging.getLogger("edexBridge")
|
||||
|
||||
def send_to_qpid(host, port, cert_path, cert_name, key_name, cert_password, msg_queue, shutdown_signal):
|
||||
url = f"amqps://{host}:{port}"
|
||||
certfile = os.path.join(cert_path, f"{cert_name}.crt")
|
||||
certkey = os.path.join(cert_path, f"{key_name}.key")
|
||||
ssl_domain = proton.SSLDomain(mode=proton.SSLDomain.MODE_CLIENT)
|
||||
ssl_domain.set_credentials(certfile, certkey, cert_password)
|
||||
ADDRESS = "external.dropbox"
|
||||
is_shutdown = False
|
||||
|
||||
messages_sent = 0
|
||||
def print_sent_count():
|
||||
nonlocal shutdown_signal, messages_sent
|
||||
while True:
|
||||
shutdown_signal.wait(60)
|
||||
logger.info("Sent %d messages in the last 60 seconds.", messages_sent)
|
||||
messages_sent = 0
|
||||
if shutdown_signal.is_set():
|
||||
return
|
||||
print_thread = threading.Thread(target=print_sent_count)
|
||||
print_thread.start()
|
||||
|
||||
clientID = ":".join([
|
||||
socket.gethostname(),
|
||||
pwd.getpwuid(os.getuid()).pw_name,
|
||||
"edexBridge",
|
||||
str(os.getpid()),
|
||||
])
|
||||
|
||||
while True:
|
||||
conn = None
|
||||
sender = None
|
||||
try:
|
||||
if not is_shutdown:
|
||||
is_shutdown = shutdown_signal.is_set()
|
||||
if is_shutdown:
|
||||
logger.info("Received shutdown signal.")
|
||||
|
||||
container = proton.reactor.Container()
|
||||
container.container_id = clientID
|
||||
conn = proton.utils.BlockingConnection(url, container=container, ssl_domain=ssl_domain)
|
||||
sender = conn.create_sender(ADDRESS)
|
||||
logger.info("Connected to broker [%s].", url)
|
||||
|
||||
while True:
|
||||
try:
|
||||
if not is_shutdown:
|
||||
is_shutdown = shutdown_signal.is_set()
|
||||
if is_shutdown:
|
||||
logger.info("Received shutdown signal.")
|
||||
|
||||
msg_args = msg_queue.get(True, 1)
|
||||
if msg_args:
|
||||
logger.debug("Sending file [%s] to EDEX with header [%s].", msg_args["body"], msg_args["subject"])
|
||||
msg = proton.Message(**msg_args)
|
||||
msg.properties["enqueueTime"] = int(datetime.datetime.now().timestamp() * 1000)
|
||||
sender.send(msg)
|
||||
messages_sent += 1
|
||||
except queue.Empty:
|
||||
logger.debug("Send queue is empty.")
|
||||
if is_shutdown:
|
||||
if messages_sent:
|
||||
logger.info("Sent remaining %d messages.", messages_sent)
|
||||
logger.info("Send process shutting down.")
|
||||
return
|
||||
except proton.ProtonException:
|
||||
logger.error("Send failed for file [%s].", msg_args["body"])
|
||||
logger.warn("Lost connection to QPID broker.", exc_info=True)
|
||||
msg_queue.put(msg_args)
|
||||
break
|
||||
except:
|
||||
logger.exception("Send failed for file [%s].", msg_args["body"])
|
||||
msg_queue.put(msg_args)
|
||||
except proton.ProtonException:
|
||||
logger.error("Failed to connect to QPID broker", exc_info=True)
|
||||
if is_shutdown:
|
||||
if msg_queue.qsize():
|
||||
logger.warn("Exiting with %d unsent messages.", msg_queue.qsize())
|
||||
return
|
||||
finally:
|
||||
try:
|
||||
if conn:
|
||||
logger.info("Disconnecting from broker.")
|
||||
conn.close()
|
||||
except proton.ProtonException:
|
||||
logger.warn("Exception trying to close connection", exc_info=True)
|
||||
time.sleep(30)
|
||||
print_thread.join()
|
||||
|
||||
def queue_messages(start_index, end_index, message_pointer, msg_queue):
|
||||
logger.debug("Preparing to queue %d messages.", (end_index - start_index))
|
||||
|
||||
for i in range(start_index, end_index):
|
||||
file = message_pointer[i].filename.decode()
|
||||
header = message_pointer[i].ident.decode()
|
||||
msg_args = {"subject": header,
|
||||
"body": file,
|
||||
"durable": True,
|
||||
"properties": {},
|
||||
}
|
||||
|
||||
try:
|
||||
msg_queue.put_nowait(msg_args)
|
||||
except queue.Full:
|
||||
logger.warning("Send queue is full. Discarding oldest messages.")
|
||||
while True:
|
||||
try:
|
||||
msg_queue.get_nowait()
|
||||
except queue.Empty:
|
||||
pass
|
||||
try:
|
||||
msg_queue.put_nowait(msg_args)
|
||||
except queue.Full:
|
||||
continue
|
||||
break
|
||||
logger.debug("Queued message for file [%s] and WMO header [%s].", file, header)
|
||||
|
||||
def main():
|
||||
args = process_args()
|
||||
|
||||
init_logging(args.log_fname, args.log_level)
|
||||
|
||||
logger.debug("Command-line args: %s", args)
|
||||
|
||||
MAX_QUEUE_SIZE = 100000
|
||||
msg_queue = multiprocessing.Queue(MAX_QUEUE_SIZE)
|
||||
shutdown_signal = threading.Event()
|
||||
|
||||
qpid_proc = threading.Thread(target=send_to_qpid,
|
||||
args=(args.broker_uri, args.port, args.cert_path, args.cert_name,
|
||||
args.cert_key, args.cert_password, msg_queue, shutdown_signal))
|
||||
logger.info("Starting QPID container...")
|
||||
qpid_proc.start()
|
||||
|
||||
libc = ctypes.util.find_library("c")
|
||||
if not libc:
|
||||
logger.critical("Unable to load libc.so.")
|
||||
return 1
|
||||
libc = ctypes.CDLL(libc, use_errno=True)
|
||||
|
||||
key = libc.ftok(b"/etc/rc.d/rc.local", ord(args.key_proj_id))
|
||||
semkey = libc.ftok(b"/etc/rc.d/rc.local", ord(args.semkey_proj_id))
|
||||
|
||||
for i in range(5):
|
||||
semid = libc.semget(semkey, 2, 0o666)
|
||||
if semid != -1:
|
||||
break
|
||||
logger.warning("Could not attach to the semaphore created by pqact: %s", os.strerror(ctypes.get_errno()))
|
||||
time.sleep(1)
|
||||
else:
|
||||
logger.critical("Could not attach to the semaphore created by pqact: %s", os.strerror(ctypes.get_errno()))
|
||||
return 0
|
||||
|
||||
queue_size = libc.semctl(semid, 0, 12);
|
||||
shmid = libc.shmget(key, ctypes.sizeof(edex_message) * queue_size, 0o666);
|
||||
if shmid == -1:
|
||||
logger.critical("Could not attach to the shared memory created by pqact: %s", os.strerror(ctypes.get_errno()))
|
||||
return 0
|
||||
|
||||
libc.shmat.restype = ctypes.POINTER(edex_message)
|
||||
messageCursor = libc.shmat(shmid, None, 0o10000)
|
||||
|
||||
done = False
|
||||
|
||||
def handle_SIGHUP(signal, frame):
|
||||
# swallow this signal because it's no longer necessary to
|
||||
# programmatically trigger the log rollover.
|
||||
# logrotate.d and logging.WatchedFileHandler will do all the needed
|
||||
# work
|
||||
# still register for the signal just in case old configs are still
|
||||
# sending it.
|
||||
logger.log(VERBOSE, "Received SIGHUP.")
|
||||
pass
|
||||
signal.signal(signal.SIGHUP, handle_SIGHUP)
|
||||
def handle_SIGINT(signal, frame):
|
||||
nonlocal qpid_proc
|
||||
logger.log(VERBOSE, "Received SIGINT.")
|
||||
qpid_proc.terminate()
|
||||
qpid_proc.join()
|
||||
sys.exit(0)
|
||||
signal.signal(signal.SIGINT, handle_SIGINT)
|
||||
def handle_SIGTERM(signal, frame):
|
||||
nonlocal done
|
||||
logger.log(VERBOSE, "Received SIGTERM.")
|
||||
done = True
|
||||
signal.signal(signal.SIGTERM, handle_SIGTERM)
|
||||
def handle_SIGUSR2(signal, frame):
|
||||
logger.log(VERBOSE, "Received SIGUSR2.")
|
||||
level = logger.getEffectiveLevel()
|
||||
if logging.DEBUG == level:
|
||||
logger.setLevel(logging.ERROR)
|
||||
elif logging.INFO == level:
|
||||
logger.setLevel(logging.DEBUG)
|
||||
elif logging.ERROR == level:
|
||||
logger.setLevel(logging.INFO)
|
||||
signal.signal(signal.SIGUSR2, handle_SIGUSR2)
|
||||
|
||||
queue_start = 0
|
||||
messages_sent = 0
|
||||
start_tick = datetime.datetime.utcnow()
|
||||
while not done:
|
||||
end_tick = datetime.datetime.utcnow()
|
||||
elapsed_seconds = (end_tick - start_tick).total_seconds()
|
||||
if elapsed_seconds >= 60:
|
||||
logger.info("Queued %d messages in the last %d seconds. Current queue size: %d",
|
||||
messages_sent, elapsed_seconds, msg_queue.qsize())
|
||||
messages_sent = 0
|
||||
start_tick = datetime.datetime.utcnow()
|
||||
|
||||
queue_location = libc.semctl(semid, 1, 12)
|
||||
queue_end = queue_location + 1
|
||||
logger.debug("queue_location: %d, queue_end: %d, queue_start: %d", queue_location, queue_end, queue_start)
|
||||
|
||||
if queue_location < 0 or queue_end == queue_start:
|
||||
logger.debug("No new messages to send.")
|
||||
time.sleep(1)
|
||||
else:
|
||||
#
|
||||
end_diff = 0
|
||||
|
||||
# Need to copy in the end of the queue before moving to front
|
||||
if queue_start > queue_end:
|
||||
if queue_start - queue_end < queue_size//2:
|
||||
# Indicates a risk that the queue could wrap all the way
|
||||
# around itself, missing messages.
|
||||
logger.warn("Queue with size %d is wrapping from %d to %d.",
|
||||
queue_size, queue_start, queue_end)
|
||||
end_diff = queue_size - queue_start;
|
||||
if end_diff > 0:
|
||||
queue_messages(queue_start, queue_size, messageCursor, msg_queue)
|
||||
messages_sent += (queue_size - queue_start);
|
||||
queue_start = 0
|
||||
|
||||
queue_messages(queue_start, queue_end, messageCursor, msg_queue)
|
||||
messages_sent += (queue_end - queue_start)
|
||||
queue_start = queue_end
|
||||
|
||||
logger.log(VERBOSE, "Sending shutdown signal to secondary process.")
|
||||
shutdown_signal.set()
|
||||
|
||||
logger.info("Stopped reading new messages, waiting for remaining messages to be sent.");
|
||||
|
||||
libc.shmdt(messageCursor)
|
||||
qpid_proc.join(60)
|
||||
if qpid_proc.is_alive():
|
||||
logger.warn("QPID sender thread did not shutdown.")
|
||||
qpid_proc.terminate()
|
||||
qpid_proc.join()
|
||||
logger.info("Shutting down.")
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
17
rpms/awips2.upc/Installer.ldm/patch/bin/monitor_data_store.sh
Executable file
17
rpms/awips2.upc/Installer.ldm/patch/bin/monitor_data_store.sh
Executable file
|
@ -0,0 +1,17 @@
|
|||
#!/bin/bash
|
||||
myPID=$$
|
||||
echo -e "`date +%Y%m%d`\t`date +%H:%M:%S`\tStarting Script (pid = $myPID, parent = $PPID)" >> ~/logs/`basename $0 .sh`
|
||||
if ps -wef|grep `basename $0` | grep -v grep | grep -v $myPID | grep -v $PPID
|
||||
then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
cd /data_store
|
||||
while true
|
||||
do
|
||||
for _dir in `ls`
|
||||
do
|
||||
echo -e "`date +%Y%m%d`\t`date +%H:%M:%S`\t\tfind ${_dir} -mtime +0 -type f -exec rm -f {} \;" >> ~/logs/`basename $0 .sh`
|
||||
find ${_dir} -mtime +0 -type f -exec rm -f {} \;
|
||||
done
|
||||
done
|
25
rpms/awips2.upc/Installer.ldm/patch/bin/start_ldm
Normal file
25
rpms/awips2.upc/Installer.ldm/patch/bin/start_ldm
Normal file
|
@ -0,0 +1,25 @@
|
|||
#!/bin/bash
|
||||
|
||||
# Check for necessary named pipes (FIFOs)
|
||||
|
||||
myhost=`hostname -s | cut -d- -f1`
|
||||
|
||||
echo "It seems like host(`hostname`) is NOT a downlink CP."
|
||||
echo "Do you still want to start LDM (y/n) [default=n]?"
|
||||
read ch
|
||||
if [ $ch != 'y' -o $ch != 'Y' ];then
|
||||
echo "Exiting."
|
||||
exit 1
|
||||
|
||||
# Start LDM
|
||||
|
||||
su - ldm -c "ldmadmin start"
|
||||
|
||||
# Pass SIGUSR2 signal to pqact to enable logging into ldmd.log (INFO log level)
|
||||
|
||||
echo "Fetching pid for pqact process"
|
||||
pid=`ps --no-headers -C pqact |awk '{print $1}'`
|
||||
su - ldm -c "kill -s USR2 $pid"
|
||||
|
||||
echo "Done."
|
||||
|
21
rpms/awips2.upc/Installer.ldm/patch/bin/stop_ldm
Normal file
21
rpms/awips2.upc/Installer.ldm/patch/bin/stop_ldm
Normal file
|
@ -0,0 +1,21 @@
|
|||
#!/bin/bash
|
||||
|
||||
|
||||
if [ -f /etc/rc.config.d/AWIPS ];then
|
||||
. /etc/rc.config.d/AWIPS
|
||||
fi
|
||||
|
||||
#echo "Halting retransmission process"
|
||||
#pid=`ps -ef |grep start_sbn_retransmit|grep -v grep | awk '{print $2}'`
|
||||
#kill -s SIGKILL $pid
|
||||
|
||||
echo "Stopping ldm service"
|
||||
|
||||
su - ldm -c "ldmadmin stop"
|
||||
|
||||
echo "Releasing shared memory resources"
|
||||
|
||||
su - ldm -c "acq_ldm_freeshm -m0"
|
||||
|
||||
echo "Done."
|
||||
|
Loading…
Add table
Reference in a new issue