awips2/cave/com.raytheon.viz.avnconfig/localization/aviation/python/TextThreadP.py
Max Schenkelberg 6f60751ec6 Issue #2033 moved avnfps and text workstation files into respective plugins.
Change-Id: If95cb839ad81ca2a842ff7f6926847ac3928d8f2

Former-commit-id: 77e1a4d8f5237e5fae930c1e00589c752f8b3738
2013-08-15 12:21:43 -05:00

480 lines
18 KiB
Python

##
# This software was developed and / or modified by Raytheon Company,
# pursuant to Contract DG133W-05-CQ-1067 with the US Government.
#
# U.S. EXPORT CONTROLLED TECHNICAL DATA
# This software product contains export-restricted data whose
# export/transfer/disclosure is restricted by U.S. law. Dissemination
# to non-U.S. persons whether in the United States or abroad requires
# an export license or other authorization.
#
# Contractor Name: Raytheon Company
# Contractor Address: 6825 Pine Street, Suite 340
# Mail Stop B8
# Omaha, NE 68106
# 402.291.0100
#
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
# further licensing information.
##
#
# Name:
# TextThreadP.py
# GFS1-NHD:A8833.0000-SCRIPT;17
#
# Status:
# DELIVERED
#
# History:
# Revision 17 (DELIVERED)
# Created: 15-AUG-2008 13:54:57 OBERFIEL
# Fixed logic
#
# Revision 16 (DELIVERED)
# Created: 01-AUG-2008 15:44:47 OBERFIEL
# Synch'd up with changes in OB8.3.1
#
# Revision 15 (REVIEW)
# Created: 19-JUN-2008 14:33:24 GILMOREDM
# Fixed code causing an error while testing for chiming
#
# Revision 14 (DELIVERED)
# Created: 11-JUN-2008 14:53:55 OBERFIEL
# Fixed CCFP logic to ignore LINE in file.
#
# Revision 13 (REVIEW)
# Created: 16-MAY-2008 13:34:40 GILMOREDM
# Added import of MetarDecoder
#
# Revision 12 (REVIEW)
# Created: 15-MAY-2008 15:22:41 GILMOREDM
# Changed so that if a new METAR is received with the same
# timestamp, it is checked against the current METAR and if
# there is any difference in the wind, wx, vis, or cig the
# METAR is updated with the new values. This will hopefully
# reduce the chiming when duplicate METARs are received.
#
# Revision 11 (DELIVERED)
# Created: 17-MAR-2008 13:41:55 OBERFIEL
# Fixed typo with _findCCFPArea function that stopped
# monitoring of CCFP products.
#
# Revision 10 (DELIVERED)
# Created: 06-MAR-2008 17:41:38 OBERFIEL
# Fixed line-crossing bug that generated spurious alerts at
# benign sites.
#
# Revision 9 (DELIVERED)
# Created: 14-APR-2006 13:25:18 TROJAN
# spr 7117
#
# Revision 8 (DELIVERED)
# Created: 14-APR-2006 13:17:51 TROJAN
# spr 7118
#
# Revision 7 (DELIVERED)
# Created: 09-MAR-2006 13:41:55 TROJAN
# spr 7105 - CCFP fix
#
# Revision 6 (DELIVERED)
# Created: 09-MAR-2006 13:08:03 TROJAN
# fixed handling of reports without convection
#
# Revision 5 (DELIVERED)
# Created: 07-SEP-2005 12:47:29 TROJAN
# spr 7011
#
# Revision 4 (DELIVERED)
# Created: 29-AUG-2005 19:56:25 TROJAN
# fixed Metar regular expression
#
# Revision 3 (APPROVED)
# Created: 26-AUG-2005 18:10:53 TROJAN
# spr 7006
#
# Revision 2 (APPROVED)
# Created: 18-AUG-2005 15:32:19 TROJAN
# spr 6993
#
# Revision 1 (DELIVERED)
# Created: 10-JUL-2005 18:12:29 TROJAN
# spr 6548
#
# Change Document History:
# 1:
# Change Document: GFS1-NHD_SPR_7382
# Action Date: 11-OCT-2008 12:55:41
# Relationship Type: In Response to
# Status: CLOSED
# Title: AvnFPS: Persistent chiming on AvnWatch
#
#
import logging, os, Queue, re, shutil, stat, string, time
import Avn, AvnParser, MetarDecoder
# Module-level logger, named after this module; the helper functions and the
# Server class below all report through it.
_Logger = logging.getLogger(__name__)
##############################################################################
def _addmtrheader(kind, rpt):
    """Return one report that is guaranteed to start with its report type.

    Needed to deal with collectives, where the type may be given once for
    the whole bulletin instead of on every report.

    kind -- type to prepend when the report has none; may be empty
    rpt  -- text of a single report

    Returns rpt unchanged when it already starts with METAR or SPECI, the
    text prefixed with kind when both are non-empty, and None otherwise.
    """
    if rpt.startswith(('METAR', 'SPECI')):
        return rpt
    if not (rpt and kind):
        # nothing to label, or no label available
        return None
    # prepend METAR or SPECI
    return '%s %s' % (kind, rpt)
def _cleanup(direct, nhours):
    """Remove files in direct whose name encodes a time older than nhours.

    The age of a file is taken from its name through Avn.string2time, not
    from the file system.  A name that cannot be converted (ValueError) or a
    file that cannot be deleted (OSError) is logged and skipped.

    direct -- directory to scan
    nhours -- maximum age in hours
    """
    oldest = time.time() - nhours * 3600.0
    for name in os.listdir(direct):
        try:
            target = os.path.join(direct, name)
            stamp = Avn.string2time(name)
            if stamp < oldest:
                os.unlink(target)
                _Logger.info('Removed %s', target)
        except (ValueError, OSError):
            _Logger.exception('Cannot remove %s', target)
def _cleanupBad(nhours=6):
    """Remove files from the "bad" directory that are older than nhours.

    The directory is data/bad relative to the current working directory and
    the age of a file is its modification time.

    nhours -- maximum age in hours (default 6)

    A file that cannot be examined or deleted is logged and skipped.  An
    OSError raised by os.listdir (for instance when the directory does not
    exist) is not handled here and reaches the caller.
    """
    direct = os.path.join('data', 'bad')
    cutoff = time.time() - nhours * 3600.0
    for f in os.listdir(direct):
        path = os.path.join(direct, f)
        try:
            # The stat call is inside the try as well: the entry may vanish
            # between listdir() and here, and that must not abort the sweep.
            if os.path.getmtime(path) < cutoff:
                os.unlink(path)
        except OSError:
            _Logger.exception('Cannot remove %s', path)
def _gettype(header):
    """Return the forecast type, or "=" for routine forecasts.

    The type is the first character of the fourth whitespace-separated
    field of header.  "=" is returned when that field is missing or when
    its first character is "R".
    """
    fields = header.split()
    if len(fields) < 4:
        return '='
    code = fields[3][0]
    if code == 'R':
        return '='
    return code
def _mktstamp(day, hour, minute, year=None, month=None):
    """Create a file name in the format used by AvnFPS (yymmddHHMM).

    day, hour, minute -- report time; anything int() accepts
    year, month       -- optional; when both are given they are used as is

    When year and month are not both supplied they are taken from the
    current UTC date.  If the resulting time lies more than 3 hours
    (10800 s) ahead of now, the month is moved back by one, wrapping to
    December of the previous year.

    Returns the timestamp as a 10-character string.
    """
    import calendar     # only needed here

    ts = list(time.gmtime())
    if year and month:
        ts[:5] = int(year), int(month), int(day), int(hour), int(minute)
    else:
        ts[2:5] = int(day), int(hour), int(minute)
        # ts holds UTC fields, so it has to be converted with timegm();
        # mktime() would read it as local time and skew the comparison by
        # the host's UTC offset.
        if calendar.timegm(tuple(ts)) > time.time() + 10800.0:
            if ts[1] > 1:
                ts[1] -= 1
            else:
                ts[0] -= 1
                ts[1] = 12
    return time.strftime('%y%m%d%H%M', tuple(ts))
def _mkCCFPReport(ident, vtime, area):
    """Format one line of a CCFP report for a single site.

    ident -- site identifier, placed first on the line
    vtime -- time in seconds since the epoch; written in UTC as yymmddHH00
    area  -- mapping with integer 'tops', 'growth', 'conf' and 'coverage'

    Returns the fields joined by single spaces.
    """
    fields = [ident, time.strftime('%y%m%d%H00', time.gmtime(vtime))]
    fields.append('TOPS %d' % area['tops'])
    fields.append('GWTH %d' % area['growth'])
    fields.append('CONF %d' % area['conf'])
    fields.append('CVRG %d' % area['coverage'])
    return ' '.join(fields)
def _findCCFPArea(xtaf, ytaf, arealist):
    """Return the area from arealist that contains the point (xtaf, ytaf).

    Areas can overlap.  Among the areas that contain the point, the one
    whose (conf, coverage) pair sorts lowest is returned.
    NOTE(review): the original comment says this selects the highest
    confidence and coverage, so lower codes presumably mean higher
    confidence/coverage -- confirm against the CCFP coding.

    xtaf, ytaf -- position of the TAF site, in the same units as 'latlon'
    arealist   -- sequence of mappings with 'latlon', 'conf', 'coverage'

    Returns the selected mapping, or None when no area contains the point.
    """
    def _contains(area):
        # Toggle the flag once for every outline segment that passes the
        # test below.  Avn.window presumably yields consecutive point pairs.
        hit = False
        for (xa, ya), (xb, yb) in Avn.window(area['latlon']):
            #
            # Orient the segment so that xb - xa >= 0
            if xb < xa:
                xa, ya, xb, yb = xb, yb, xa, ya
            #
            # If segment endpoints lie on either side and
            # y intercept lies 'north' of TAF position, then
            # count it.
            #
            straddles = (xb < xtaf) == (xtaf <= xa)
            if straddles and (ya - ytaf) * (xb - xa) > (xa - xtaf) * (yb - ya):
                hit = not hit
        return hit

    candidates = [a for a in arealist if _contains(a)]
    if not candidates:
        return None
    ranked = sorted([(a['conf'], a['coverage'], a) for a in candidates])
    return ranked[0][-1]
def _decodeCCFP(data):
    """Decode the lines of one CCFP product into a dictionary.

    data -- iterable of text lines

    A line starting with 'CCFP' supplies 'itime' and 'vtime' from its second
    and third fields.  Every line starting with 'AREA' adds one mapping with
    the keys 'coverage', 'conf', 'growth', 'tops', 'speed', 'direction'
    (the first six integers after the keyword) and 'latlon', built by
    Avn.pairs from the integers at index 7 up to, but not including, the
    last two.  Other lines are ignored.

    Returns a dictionary whose 'area' entry is always present (possibly
    empty) and sorted by coverage value, highest first; 'itime' and 'vtime'
    are floats in seconds since the epoch and are present only when a
    'CCFP' line was seen.

    Raises IndexError or ValueError on malformed 'CCFP' or 'AREA' lines.
    """
    import calendar     # only needed here

    def _gettime(s):
        # Digits are taken from fixed columns: YYYYMMDD, one skipped
        # character, then HH.  The fields are treated as UTC, matching the
        # time.gmtime() calls applied to the result elsewhere in this
        # module; time.mktime() would interpret them as local time.
        y, m, d, h = int(s[:4]), int(s[4:6]), int(s[6:8]), int(s[9:11])
        return float(calendar.timegm((y, m, d, h, 0, 0, 0, 0, 0)))

    dcd = {}
    for line in data:
        if line.startswith('CCFP'):
            tok = line.split()
            dcd['itime'] = _gettime(tok[1])
            dcd['vtime'] = _gettime(tok[2])
        elif line.startswith('AREA'):
            tok = [int(x) for x in line.split()[1:]]
            d = {'coverage': tok[0], 'conf': tok[1], 'growth': tok[2],
                 'tops': tok[3], 'speed': tok[4], 'direction': tok[5]}
            d['latlon'] = Avn.pairs(tok[7:-2])
            dcd.setdefault('area', []).append(d)
    # sorted w.r. coverage, highest first; a key function keeps the sort
    # from ever having to compare two dictionaries when coverage ties
    dcd['area'] = sorted(dcd.get('area', []),
                         key=lambda area: area['coverage'], reverse=True)
    return dcd
##############################################################################
class Server(object):
    """Processes TAFs, TWEBs, METARs and CCFP reports

    Expects files from acqserver. File name determines data type.

    Requests are read from inqueue as (code, fname, direct) tuples and
    notifications are posted to outqueue as Avn.Bunch(src=..., ident=...).
    run() calls self.onStartup() and self.processFile(); neither is defined
    in this class body, so they have to be provided elsewhere (for example
    by a subclass).  getSites() must be called before doMetar(), doTaf() or
    doCCFP(), because it creates the site tables those methods read.
    """
    # Timeout (seconds) passed to inqueue.get() in run().  The original note
    # described it as the minimum age of a file before processing.
    _Tmout = 5.0
    _Pat = re.compile(r'[=\n]\n', re.MULTILINE)     # to split collective
    # Identity translation table, needed to remove chars below.  This is the
    # same 256-character string that string.maketrans('', '') returns on
    # Python 2, built without relying on that function.
    _Ident = ''.join([chr(code) for code in range(256)])
    _Delchars = '\003\r'    # characters to remove from data files
    _MtrRe = re.compile(r'(METAR |SPECI )?\w{4} .+', re.DOTALL)
    _TafRe = re.compile(r'[A-Z][A-Z0-9]{3} \d{6}Z .+', re.DOTALL)

    def __init__(self, inqueue, outqueue, info):
        """Store the queues and configuration.

        inqueue  -- queue of (code, fname, direct) requests
        outqueue -- queue that receives Avn.Bunch notifications
        info     -- mapping with 'source' (data path) and 'nhours' (number
                    of hours files are kept, see _cleanup)
        """
        self.inqueue = inqueue
        self.outqueue = outqueue
        self.datapath = info['source']
        self.numhours = int(info['nhours'])
        self._tafinfo = []
        self.metarDecoder = MetarDecoder.Decoder()

    def __bodyStart(self, bulletin, candidates, isBody):
        # Returns the first index in candidates whose bulletin line is
        # accepted by isBody, or None when there is no such line.
        for n in candidates:
            if n < len(bulletin) and isBody(bulletin[n]):
                return n
        return None

    def __writeFile(self, text, directory, fname, checkSize=True):
        """Write text to directory/fname in the AvnFPS data directory.

        The directory is created when missing; otherwise files older than
        self.numhours are purged from it first.  With checkSize set, an
        existing file that is at least as long as text is treated as a
        possible duplicate and left untouched.

        Always returns None.
        """
        if not os.path.isdir(directory):
            os.mkdir(directory)
        else:
            _cleanup(directory, self.numhours)
        path = os.path.join(directory, fname)
        if checkSize and os.path.exists(path):
            size = os.stat(path)[stat.ST_SIZE]
            if size >= len(text):
                _Logger.info('Skipping possible duplicate %s', path)
                return None
        # close explicitly so the data is on disk before anyone is notified
        out = open(path, 'w')
        try:
            out.write(text)
        finally:
            out.close()

    def dispose(self, rcode, directory, fname):
        """Remove a file after processing.

        If there was an error (rcode is true) the file is first copied to
        the "bad" directory.  Failures are logged, not raised.
        """
        path = os.path.join(directory, fname)
        try:
            if rcode:
                shutil.copy2(path, os.path.join('data', 'bad'))
            os.unlink(path)
        except (IOError, OSError):
            # on Python 2 shutil raises IOError, os.unlink raises OSError
            _Logger.exception('Cannot remove %s', path)

    def doMetar(self, bulletin):
        """Process METARs from a collective.

        bulletin -- list of text lines; the first line is the heading and
                    the first report must start on line 1, 2 or 3

        Each accepted report is written to data/mtrs/<ident>/<timestamp>
        and announced on outqueue.  When a file with the same timestamp
        already exists, the two reports are decoded and compared with
        differentMetars() to decide whether to rewrite and/or announce.

        Returns 0 on success, 1 if the bulletin was not recognized or any
        report failed.
        """
        n = self.__bodyStart(bulletin, (1, 2, 3), self._MtrRe.match)
        if n is None:
            # join of a 0- or 1-element slice: heading line, or '' if none
            _Logger.warning('Cannot recognize %s', ''.join(bulletin[:1]))
            return 1
        head = bulletin[:n]
        ids = self._ids['metar']
        if head[-1] in ['METAR', 'SPECI']:
            # a collective
            kind = head[-1]
        else:
            kind = ''
        reports = [m for m in self._Pat.split('\n'.join(bulletin[n:]))
                   if len(m) > 6]
        mtrlist = [m for m in [_addmtrheader(kind, r) for r in reports] if m]
        errflag = 0
        for mtr in mtrlist:
            try:
                ident, tstamp = mtr.split(None, 3)[1:3]
                if tstamp[0] in 'SN':   # SA, SP or NIL
                    continue
                if ids and ident not in ids:
                    continue
                day, hour, minute = tstamp[:2], tstamp[2:4], tstamp[4:6]
                fname = _mktstamp(day, hour, minute)
                directory = os.path.join('data', 'mtrs', ident)
                path = os.path.join(directory, fname)
                data = '\n'.join([bulletin[0], mtr, ''])
                if os.path.exists(path):
                    #
                    # A file with this timestamp exists, let's compare it
                    # to the incoming
                    incoming = self.metarDecoder(mtr)
                    stored = open(path)
                    try:
                        # first line of the file is the bulletin heading
                        oldlines = stored.readlines()[1:]
                    finally:
                        stored.close()
                    previous = self.metarDecoder(
                        ' '.join([el.strip() for el in oldlines]))
                    #
                    # Compare aviation elements for differences
                    doWrite, doAlert = self.differentMetars(incoming,
                                                            previous)
                    # write first, so the file is current when the
                    # notification is picked up
                    if doWrite:
                        self.__writeFile(data, directory, fname, False)
                        _Logger.info('Processed %s from %s', ident,
                                     bulletin[0])
                    if doAlert:
                        self.outqueue.put(Avn.Bunch(src='mtrs', ident=ident))
                else:
                    self.__writeFile(data, directory, fname, False)
                    self.outqueue.put(Avn.Bunch(src='mtrs', ident=ident))
            except Exception:
                _Logger.exception('Cannot process %s:\n%s', bulletin[0], mtr)
                errflag = 1
        return errflag

    def differentMetars(self, newmetar, oldmetar):
        """Compare new observation with the old and decide whether to write and alert

        newmetar, oldmetar -- decoded reports: mappings of element name to
                              a mapping of attributes

        Elements are checked in the order sky/str, sky/cig, vsby/value,
        wind/str, pcp/str, obv/str.  A difference in the sky string alone
        requests a write but no alert; any other difference, or an element
        present in only one of the reports, requests both and ends the
        comparison.

        Returns the tuple (write, alert).
        Raises KeyError if newmetar has no ['ident']['str'] entry.
        """
        write, alert = False, False
        ident = newmetar['ident']['str']
        #
        # Differences in the sky string insufficient for alerts
        checks = (('sky', 'str'), ('sky', 'cig'), ('vsby', 'value'),
                  ('wind', 'str'), ('pcp', 'str'), ('obv', 'str'))
        for n, (element, attribute) in enumerate(checks):
            if (element in newmetar) != (element in oldmetar):
                _Logger.debug('Significant differences found in %s METAR: %s',
                              ident, element)
                write = alert = True
                break
            try:
                newvalue = newmetar[element][attribute]
                oldvalue = oldmetar[element][attribute]
            except KeyError:
                # element or attribute absent from both reports
                continue
            if newvalue != oldvalue:
                write = True
                if n > 0:
                    alert = True
                    _Logger.debug(
                        'Significant differences in %s METAR: (%s,%s)',
                        ident, oldvalue, newvalue)
                    break
        else:
            _Logger.debug('No significant differences found in %s METAR',
                          ident)
        return write, alert

    def doTaf(self, bulletin):
        """Process TAFs from a collective.

        bulletin -- list of text lines; the first line is the heading and
                    the first forecast must start on line 1, 2 or 3

        Each accepted forecast is written to
        data/tafs/<ident>/<timestamp><type> and announced on outqueue.

        Returns 0 on success, 1 if the bulletin was not recognized or any
        forecast failed.
        """
        n = self.__bodyStart(bulletin, (1, 2, 3), self._TafRe.match)
        if n is None:
            # join of a 0- or 1-element slice: heading line, or '' if none
            _Logger.warning('Cannot recognize %s', ''.join(bulletin[:1]))
            return 1
        head = bulletin[:n]
        taflist = [taf+[''] for taf in [t.split('\n') for t in
                   self._Pat.split('\n'.join(bulletin[n:]))] if taf[0]]
        ids = self._ids['taf']
        typ = _gettype(bulletin[0])
        errflag = 0
        for taf in taflist:
            try:
                ident, tstamp = taf[0].split(None, 2)[:2]
                if ids and ident not in ids:
                    continue
                day, hour, minute = tstamp[:2], tstamp[2:4], tstamp[4:6]
                fname = _mktstamp(day, hour, minute) + typ
                directory = os.path.join('data', 'tafs', ident)
                data = '\n'.join(head+taf)
                self.__writeFile(data, directory, fname)
                self.outqueue.put(Avn.Bunch(src='tafs', ident=ident))
                _Logger.info('Processed %s from %s', ident, bulletin[0])
            except Exception:
                _Logger.exception('Cannot process %s\n%s', head, taf)
                errflag = 1
        return errflag

    def doCCFP(self, bulletin):
        """Process a CCFP bulletin.

        bulletin -- list of text lines; the first line is the heading and
                    the product must start on line 1 or 2

        For every product one file data/ccfp/<issue time>+<forecast hour>
        is written.  It holds the heading followed by one line for each
        site from getSites() that lies inside a CCFP area.  A single
        notification with ident 'ALL' is then posted on outqueue.

        Returns 0 on success, 1 if the bulletin was not recognized or any
        product failed.
        """
        n = self.__bodyStart(bulletin, (1, 2),
                             lambda line: line[:4] == 'CCFP')
        if n is None:
            # join of a 0- or 1-element slice: heading line, or '' if none
            _Logger.warning('Cannot recognize %s', ''.join(bulletin[:1]))
            return 1
        head = bulletin[:n]
        cfplist = [cfp+[''] for cfp in [t.split('\n') for t in
                   self._Pat.split('\n'.join(bulletin[n:]))] if cfp[0]]
        directory = os.path.join('data', 'ccfp')
        errflag = 0
        for cfp in cfplist:
            try:
                dcd = _decodeCCFP(cfp)
                year, month, day, hour = time.gmtime(dcd['itime'])[:4]
                # the 10 s margin guards against rounding just below an hour
                fhour = int((dcd['vtime']-dcd['itime']+10.0)//3600.0)
                fname = _mktstamp(day, hour, 0, year, month) + \
                    ('+%02d' % fhour)
                report = [head[0]]
                for info in self._tafinfo:
                    xsi0, eta0 = info['lat']*10.0, -info['lon']*10.0
                    area = _findCCFPArea(xsi0, eta0, dcd['area'])
                    if area is None:
                        continue
                    report.append(_mkCCFPReport(info['id'], dcd['vtime'],
                                                area))
                report.append('')
                data = '\n'.join(report)
                self.__writeFile(data, directory, fname)
                self.outqueue.put(Avn.Bunch(src='ccfp', ident='ALL'))
                # The product covers all sites, so log the same ident that
                # was queued.  The loop variable must not be used here: it
                # is undefined when no sites are configured.
                _Logger.info('Processed %s from %s', 'ALL', head[0])
            except Exception:
                _Logger.exception('Cannot process %s', cfp)
                errflag = 1
        return errflag

    def getSites(self):
        """(Re)load the site tables from the AvnFPS configuration.

        Sets self._ids from AvnParser.getAllSiteIds() and rebuilds
        self._tafinfo with the id, latitude and longitude of every TAF site
        that has a configuration.  The list is rebuilt from scratch so that
        calling this method again does not duplicate sites.
        """
        self._ids = AvnParser.getAllSiteIds()
        tafinfo = []
        for ident in self._ids['taf']:
            info = AvnParser.getTafSiteCfg(ident)
            if info is None:
                continue
            lat = float(info['geography']['lat'])
            lon = float(info['geography']['lon'])
            tafinfo.append({'id': ident, 'lat': lat, 'lon': lon})
        self._tafinfo = tafinfo

    def run(self):
        """Main loop.

        Cleans the "bad" directory, calls self.onStartup(), then serves
        requests from inqueue until a request with code 0 arrives (which
        raises SystemExit) or an unexpected error occurs (which is logged
        and ends the loop).  After a request has been processed, the "bad"
        directory is cleaned again if six hours have passed since the last
        sweep.
        """
        _cleanupBad()
        self.onStartup()
        lasttime = time.time()
        while True:
            try:
                code, fname, direct = self.inqueue.get(True, self._Tmout)
                if code == 0:   # end thread
                    _Logger.info('Got exit request')
                    raise SystemExit
                self.processFile(code, direct, fname)
                now = time.time()
                if now > lasttime + 21600:  # every 6 hours
                    _cleanupBad()
                    lasttime = now
            except Queue.Empty:
                # nothing arrived within _Tmout seconds; keep waiting
                pass
            except Exception:
                _Logger.exception('Unexpected error')
                break