Merge pull request #666 from tiffanycmeyer13/unidata_20.3.2

Added ldm bin directory to be tracked - which included update to edex…
This commit is contained in:
srcarter3 2024-02-13 11:59:23 -07:00 committed by GitHub
commit a656a7173a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 481 additions and 0 deletions

View file

@ -0,0 +1,418 @@
#!/awips2/python/bin/python3
#
# edexBridge.py
#
# SOFTWARE HISTORY
#
# Date Ticket# Engineer Description
# ------------- -------- ------------ ------------------------------------------
# Oct 08, 2009 brockwoo Initial creation
# Jun 21, 2013 bkowal Re-written to work with the qpid messaging
# api
# May 06, 2014 3102 rjpeter Updated to call cleanup if connect failed.
# Limit number of messages to be sent to
# QPID on a single send call
# Aug 05, 2014 3458 rjpeter Added logging of error when issue occurs
# on send
# Nov 04, 2014 2991 dlovely Updated to work with QPID 0.30
# Feb 23, 2017 6082 bsteffen Updated to work with ssl certificates
# Jul 17, 2018 7143 bsteffen Reopen log files on HUP
# Jul 23, 2018 7235 bsterren Split producer and consumer into two threads
# Jul 10, 2019 7724 mrichardson Upgrade Qpid to Qpid Proton
# Jul 10, 2019 7724 dgilling Simplify multithreading
# Nov 04, 2019 7969 dgilling Ported to python
# Jul 07, 2020 8187 randerso Added qpid connection_id
# Jul 29, 2021 8612 dgilling Added command line options for pqact's
# semaphore and shared memory segment
# key codes.
#Jan 01, 2024 tiffanym@ucar Change ufpy call to awips
import argparse
import ctypes
import ctypes.util
import datetime
import logging
import logging.handlers
import multiprocessing
import os
import pwd
import queue
import signal
import socket
import sys
import threading
import time
import proton
import proton.utils
import proton.reactor
from awips import UsageArgumentParser
# custom logging level
VERBOSE = 15
logging.addLevelName(VERBOSE, "VERBOSE")
logger = None
class edex_message(ctypes.Structure):
_fields_ = [("filename", ctypes.c_char*4096),
("ident", ctypes.c_char*256),]
def process_args():
DEFAULT_QPID_HOST = "localhost"
DEFAULT_QPID_PORT = 5672
DEFAULT_QPID_USER = "guest"
DEFAULT_CERT_PASSWORD = os.getenv("QPID_SSL_CERT_PASSWORD", "password")
DEFAULT_QPID_CERT_PATH = os.getenv("QPID_SSL_CERT_DB", os.path.join(pwd.getpwuid(os.getuid()).pw_dir,".qpid"))
DEFAULT_QPID_CERT_NAME = os.getenv("QPID_SSL_CERT_NAME", DEFAULT_QPID_USER)
DEFAULT_QPID_CERT_KEY = os.getenv("QPID_SSL_CERT_KEY_FILE", DEFAULT_QPID_USER)
DEFAULT_KEY_PROJ_ID = "R"
DEFAULT_SEMKEY_PROJ_ID = "e"
DESCRIPTION= (
"EDEX Bridge is used by the LDM to post data available messages to Qpid, which "
"alerts the EDEX Ingest server(s) that a file is ready for processing."
)
parser = UsageArgumentParser.UsageArgumentParser(prog="edexBridge", description=DESCRIPTION)
parser.add_argument("-s", "--server",
dest="broker_uri",
action="store",
default=DEFAULT_QPID_HOST,
metavar="SERVER",
help="specify hostname of qpid")
parser.add_argument("-p", "--port",
dest="port",
action="store",
type=int,
default=DEFAULT_QPID_PORT,
metavar="PORT",
help="specify port to connect to qpid")
parser.add_argument("-l", "--log",
dest="log_fname",
required=True,
action="store",
metavar="LOGFILE",
help="log to a specific file")
log_level_group = parser.add_mutually_exclusive_group()
log_level_group.add_argument("-v", "--verbose",
dest="log_level",
action="store_const",
const=VERBOSE,
default=logging.INFO,
help="explain what is being done")
log_level_group.add_argument("-x", "--debug",
dest="log_level",
action="store_const",
const=logging.DEBUG,
default=logging.INFO,
help="include extra logging, useful for debugging")
parser.add_argument("--ssl-cert-key-file",
dest="cert_key",
action="store",
default=DEFAULT_QPID_CERT_KEY,
metavar="NAME",
help="Key file used to verify the certificate")
parser.add_argument("--ssl-cert-db",
dest="cert_path",
action="store",
default=DEFAULT_QPID_CERT_PATH,
metavar="PATH",
help="Path to directory containing the certificate and key files")
parser.add_argument("--ssl-cert-name",
dest="cert_name",
action="store",
default=DEFAULT_QPID_CERT_NAME,
metavar="NAME",
help="Name of the certificate to use")
parser.add_argument("--ssl-cert-password",
dest="cert_password",
action="store",
default=DEFAULT_CERT_PASSWORD,
metavar="PASSWORD",
help="Password used to access the certificate")
parser.add_argument("--key-proj-id",
dest="key_proj_id",
action="store",
default=DEFAULT_KEY_PROJ_ID,
metavar="CHAR",
help="Key code for the pqact shared memory segment")
parser.add_argument("--sem-proj-id",
dest="semkey_proj_id",
action="store",
default=DEFAULT_SEMKEY_PROJ_ID,
metavar="CHAR",
help="Key code for the pqact semaphore")
args = parser.parse_args()
if len(args.key_proj_id) > 1:
parser.error("argument for --key-proj-id should be only a single character.")
if len(args.semkey_proj_id) > 1:
parser.error("argument for --sem-proj-id should be only a single character.")
return args
def init_logging(file, level):
logging.basicConfig(level=level,
format="%(asctime)s %(module)s[%(thread)d] %(levelname)s: %(message)s",
datefmt="%b %d %H:%M:%S",
handlers=[logging.handlers.WatchedFileHandler(file)])
# handlers=[logging.handlers.WatchedFileHandler(file), logging.StreamHandler()])
global logger
logger = logging.getLogger("edexBridge")
def send_to_qpid(host, port, cert_path, cert_name, key_name, cert_password, msg_queue, shutdown_signal):
url = f"amqps://{host}:{port}"
certfile = os.path.join(cert_path, f"{cert_name}.crt")
certkey = os.path.join(cert_path, f"{key_name}.key")
ssl_domain = proton.SSLDomain(mode=proton.SSLDomain.MODE_CLIENT)
ssl_domain.set_credentials(certfile, certkey, cert_password)
ADDRESS = "external.dropbox"
is_shutdown = False
messages_sent = 0
def print_sent_count():
nonlocal shutdown_signal, messages_sent
while True:
shutdown_signal.wait(60)
logger.info("Sent %d messages in the last 60 seconds.", messages_sent)
messages_sent = 0
if shutdown_signal.is_set():
return
print_thread = threading.Thread(target=print_sent_count)
print_thread.start()
clientID = ":".join([
socket.gethostname(),
pwd.getpwuid(os.getuid()).pw_name,
"edexBridge",
str(os.getpid()),
])
while True:
conn = None
sender = None
try:
if not is_shutdown:
is_shutdown = shutdown_signal.is_set()
if is_shutdown:
logger.info("Received shutdown signal.")
container = proton.reactor.Container()
container.container_id = clientID
conn = proton.utils.BlockingConnection(url, container=container, ssl_domain=ssl_domain)
sender = conn.create_sender(ADDRESS)
logger.info("Connected to broker [%s].", url)
while True:
try:
if not is_shutdown:
is_shutdown = shutdown_signal.is_set()
if is_shutdown:
logger.info("Received shutdown signal.")
msg_args = msg_queue.get(True, 1)
if msg_args:
logger.debug("Sending file [%s] to EDEX with header [%s].", msg_args["body"], msg_args["subject"])
msg = proton.Message(**msg_args)
msg.properties["enqueueTime"] = int(datetime.datetime.now().timestamp() * 1000)
sender.send(msg)
messages_sent += 1
except queue.Empty:
logger.debug("Send queue is empty.")
if is_shutdown:
if messages_sent:
logger.info("Sent remaining %d messages.", messages_sent)
logger.info("Send process shutting down.")
return
except proton.ProtonException:
logger.error("Send failed for file [%s].", msg_args["body"])
logger.warn("Lost connection to QPID broker.", exc_info=True)
msg_queue.put(msg_args)
break
except:
logger.exception("Send failed for file [%s].", msg_args["body"])
msg_queue.put(msg_args)
except proton.ProtonException:
logger.error("Failed to connect to QPID broker", exc_info=True)
if is_shutdown:
if msg_queue.qsize():
logger.warn("Exiting with %d unsent messages.", msg_queue.qsize())
return
finally:
try:
if conn:
logger.info("Disconnecting from broker.")
conn.close()
except proton.ProtonException:
logger.warn("Exception trying to close connection", exc_info=True)
time.sleep(30)
print_thread.join()
def queue_messages(start_index, end_index, message_pointer, msg_queue):
logger.debug("Preparing to queue %d messages.", (end_index - start_index))
for i in range(start_index, end_index):
file = message_pointer[i].filename.decode()
header = message_pointer[i].ident.decode()
msg_args = {"subject": header,
"body": file,
"durable": True,
"properties": {},
}
try:
msg_queue.put_nowait(msg_args)
except queue.Full:
logger.warning("Send queue is full. Discarding oldest messages.")
while True:
try:
msg_queue.get_nowait()
except queue.Empty:
pass
try:
msg_queue.put_nowait(msg_args)
except queue.Full:
continue
break
logger.debug("Queued message for file [%s] and WMO header [%s].", file, header)
def main():
args = process_args()
init_logging(args.log_fname, args.log_level)
logger.debug("Command-line args: %s", args)
MAX_QUEUE_SIZE = 100000
msg_queue = multiprocessing.Queue(MAX_QUEUE_SIZE)
shutdown_signal = threading.Event()
qpid_proc = threading.Thread(target=send_to_qpid,
args=(args.broker_uri, args.port, args.cert_path, args.cert_name,
args.cert_key, args.cert_password, msg_queue, shutdown_signal))
logger.info("Starting QPID container...")
qpid_proc.start()
libc = ctypes.util.find_library("c")
if not libc:
logger.critical("Unable to load libc.so.")
return 1
libc = ctypes.CDLL(libc, use_errno=True)
key = libc.ftok(b"/etc/rc.d/rc.local", ord(args.key_proj_id))
semkey = libc.ftok(b"/etc/rc.d/rc.local", ord(args.semkey_proj_id))
for i in range(5):
semid = libc.semget(semkey, 2, 0o666)
if semid != -1:
break
logger.warning("Could not attach to the semaphore created by pqact: %s", os.strerror(ctypes.get_errno()))
time.sleep(1)
else:
logger.critical("Could not attach to the semaphore created by pqact: %s", os.strerror(ctypes.get_errno()))
return 0
queue_size = libc.semctl(semid, 0, 12);
shmid = libc.shmget(key, ctypes.sizeof(edex_message) * queue_size, 0o666);
if shmid == -1:
logger.critical("Could not attach to the shared memory created by pqact: %s", os.strerror(ctypes.get_errno()))
return 0
libc.shmat.restype = ctypes.POINTER(edex_message)
messageCursor = libc.shmat(shmid, None, 0o10000)
done = False
def handle_SIGHUP(signal, frame):
# swallow this signal because it's no longer necessary to
# programmatically trigger the log rollover.
# logrotate.d and logging.WatchedFileHandler will do all the needed
# work
# still register for the signal just in case old configs are still
# sending it.
logger.log(VERBOSE, "Received SIGHUP.")
pass
signal.signal(signal.SIGHUP, handle_SIGHUP)
def handle_SIGINT(signal, frame):
nonlocal qpid_proc
logger.log(VERBOSE, "Received SIGINT.")
qpid_proc.terminate()
qpid_proc.join()
sys.exit(0)
signal.signal(signal.SIGINT, handle_SIGINT)
def handle_SIGTERM(signal, frame):
nonlocal done
logger.log(VERBOSE, "Received SIGTERM.")
done = True
signal.signal(signal.SIGTERM, handle_SIGTERM)
def handle_SIGUSR2(signal, frame):
logger.log(VERBOSE, "Received SIGUSR2.")
level = logger.getEffectiveLevel()
if logging.DEBUG == level:
logger.setLevel(logging.ERROR)
elif logging.INFO == level:
logger.setLevel(logging.DEBUG)
elif logging.ERROR == level:
logger.setLevel(logging.INFO)
signal.signal(signal.SIGUSR2, handle_SIGUSR2)
queue_start = 0
messages_sent = 0
start_tick = datetime.datetime.utcnow()
while not done:
end_tick = datetime.datetime.utcnow()
elapsed_seconds = (end_tick - start_tick).total_seconds()
if elapsed_seconds >= 60:
logger.info("Queued %d messages in the last %d seconds. Current queue size: %d",
messages_sent, elapsed_seconds, msg_queue.qsize())
messages_sent = 0
start_tick = datetime.datetime.utcnow()
queue_location = libc.semctl(semid, 1, 12)
queue_end = queue_location + 1
logger.debug("queue_location: %d, queue_end: %d, queue_start: %d", queue_location, queue_end, queue_start)
if queue_location < 0 or queue_end == queue_start:
logger.debug("No new messages to send.")
time.sleep(1)
else:
#
end_diff = 0
# Need to copy in the end of the queue before moving to front
if queue_start > queue_end:
if queue_start - queue_end < queue_size//2:
# Indicates a risk that the queue could wrap all the way
# around itself, missing messages.
logger.warn("Queue with size %d is wrapping from %d to %d.",
queue_size, queue_start, queue_end)
end_diff = queue_size - queue_start;
if end_diff > 0:
queue_messages(queue_start, queue_size, messageCursor, msg_queue)
messages_sent += (queue_size - queue_start);
queue_start = 0
queue_messages(queue_start, queue_end, messageCursor, msg_queue)
messages_sent += (queue_end - queue_start)
queue_start = queue_end
logger.log(VERBOSE, "Sending shutdown signal to secondary process.")
shutdown_signal.set()
logger.info("Stopped reading new messages, waiting for remaining messages to be sent.");
libc.shmdt(messageCursor)
qpid_proc.join(60)
if qpid_proc.is_alive():
logger.warn("QPID sender thread did not shutdown.")
qpid_proc.terminate()
qpid_proc.join()
logger.info("Shutting down.")
if __name__ == '__main__':
sys.exit(main())

View file

@ -0,0 +1,17 @@
#!/bin/bash
myPID=$$
echo -e "`date +%Y%m%d`\t`date +%H:%M:%S`\tStarting Script (pid = $myPID, parent = $PPID)" >> ~/logs/`basename $0 .sh`
if ps -wef|grep `basename $0` | grep -v grep | grep -v $myPID | grep -v $PPID
then
exit 0
fi
cd /data_store
while true
do
for _dir in `ls`
do
echo -e "`date +%Y%m%d`\t`date +%H:%M:%S`\t\tfind ${_dir} -mtime +0 -type f -exec rm -f {} \;" >> ~/logs/`basename $0 .sh`
find ${_dir} -mtime +0 -type f -exec rm -f {} \;
done
done

View file

@ -0,0 +1,25 @@
#!/bin/bash
# Check for necessary named pipes (FIFOs)
myhost=`hostname -s | cut -d- -f1`
echo "It seems like host(`hostname`) is NOT a downlink CP."
echo "Do you still want to start LDM (y/n) [default=n]?"
read ch
if [ $ch != 'y' -o $ch != 'Y' ];then
echo "Exiting."
exit 1
# Start LDM
su - ldm -c "ldmadmin start"
# Pass SIGUSR2 signal to pqact to enable logging into ldmd.log (INFO log level)
echo "Fetching pid for pqact process"
pid=`ps --no-headers -C pqact |awk '{print $1}'`
su - ldm -c "kill -s USR2 $pid"
echo "Done."

View file

@ -0,0 +1,21 @@
#!/bin/bash
if [ -f /etc/rc.config.d/AWIPS ];then
. /etc/rc.config.d/AWIPS
fi
#echo "Halting retransmission process"
#pid=`ps -ef |grep start_sbn_retransmit|grep -v grep | awk '{print $2}'`
#kill -s SIGKILL $pid
echo "Stopping ldm service"
su - ldm -c "ldmadmin stop"
echo "Releasing shared memory resources"
su - ldm -c "acq_ldm_freeshm -m0"
echo "Done."