Omaha #3296 Added performance logging. Moved backup and purging to separate java thread. Cached PythonScript.
Change-Id: I9814345b512674269f17f4bf61bd615f0a50b059 (cherry picked from commit a11bcadd76eb9f9597e23ee609fd9cb364ad26ac) Former-commit-id: 49638ba1e9561dea7b318b7adc0f1ab9571170f5
This commit is contained in:
parent
399c06820a
commit
beee08c32b
10 changed files with 510 additions and 99 deletions
|
@ -1,4 +1,3 @@
|
|||
com.raytheon.edex.plugin.gfe.wcl.WclInfo
|
||||
com.raytheon.edex.plugin.gfe.isc.IscSendRecord
|
||||
com.raytheon.edex.plugin.gfe.smartinit.SmartInitRecord
|
||||
com.raytheon.edex.plugin.gfe.smartinit.SmartInitRecordPK
|
|
@ -31,7 +31,6 @@ import javax.xml.bind.annotation.XmlAccessorType;
|
|||
import javax.xml.bind.annotation.XmlAttribute;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
|
||||
import com.raytheon.uf.common.serialization.ISerializableObject;
|
||||
import com.raytheon.uf.common.serialization.annotations.DynamicSerialize;
|
||||
import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
|
||||
|
||||
|
@ -43,9 +42,7 @@ import com.raytheon.uf.common.serialization.annotations.DynamicSerializeElement;
|
|||
*/
|
||||
@XmlAccessorType(XmlAccessType.NONE)
|
||||
@DynamicSerialize
|
||||
public class WclInfo implements ISerializableObject {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
public class WclInfo {
|
||||
|
||||
@XmlAttribute
|
||||
@DynamicSerializeElement
|
||||
|
|
|
@ -2,7 +2,7 @@ Manifest-Version: 1.0
|
|||
Bundle-ManifestVersion: 2
|
||||
Bundle-Name: Activetablesrv Plug-in
|
||||
Bundle-SymbolicName: com.raytheon.uf.edex.activetable
|
||||
Bundle-Version: 1.12.1174.qualifier
|
||||
Bundle-Version: 1.13.0.qualifier
|
||||
Bundle-Vendor: RAYTHEON
|
||||
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
|
||||
Import-Package: com.raytheon.uf.common.activetable,
|
||||
|
@ -24,6 +24,7 @@ Require-Bundle: com.raytheon.uf.common.localization;bundle-version="1.11.1",
|
|||
com.raytheon.uf.common.activetable;bundle-version="1.12.1174",
|
||||
com.raytheon.uf.edex.site;bundle-version="1.0.0",
|
||||
com.google.guava;bundle-version="1.0.0",
|
||||
org.apache.log4j;bundle-version="1.0.0"
|
||||
org.apache.log4j;bundle-version="1.0.0",
|
||||
org.apache.commons.io;bundle-version="2.4.0"
|
||||
Eclipse-RegisterBuddy: com.raytheon.uf.common.serialization
|
||||
Export-Package: com.raytheon.uf.edex.activetable
|
||||
|
|
|
@ -24,14 +24,15 @@ import java.util.Arrays;
|
|||
import java.util.Calendar;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import jep.JepException;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.raytheon.edex.site.SiteUtil;
|
||||
import com.raytheon.edex.util.Util;
|
||||
import com.raytheon.uf.common.activetable.ActiveTableMode;
|
||||
import com.raytheon.uf.common.activetable.ActiveTableRecord;
|
||||
|
@ -49,9 +50,13 @@ import com.raytheon.uf.common.localization.PathManagerFactory;
|
|||
import com.raytheon.uf.common.python.PyUtil;
|
||||
import com.raytheon.uf.common.python.PythonScript;
|
||||
import com.raytheon.uf.common.site.SiteMap;
|
||||
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.PerformanceStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.common.time.util.ITimer;
|
||||
import com.raytheon.uf.common.time.util.TimeUtil;
|
||||
import com.raytheon.uf.common.util.FileUtil;
|
||||
import com.raytheon.uf.edex.core.EDEXUtil;
|
||||
import com.raytheon.uf.edex.database.DataAccessLayerException;
|
||||
|
@ -82,6 +87,9 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery;
|
|||
* PRACTICE active table.
|
||||
* Jun 11, 2013 2083 randerso Log active table changes
|
||||
* Mar 06, 2014 2883 randerso Pass siteId into python code
|
||||
* Jun 17, 2014 3296 randerso Cached PythonScript. Moved active table
|
||||
* backup and purging to a separate thread.
|
||||
* Added performance logging
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -98,9 +106,44 @@ public class ActiveTable {
|
|||
|
||||
private static final String NEXT_ETN_LOCK = "ActiveTableNextEtn";
|
||||
|
||||
private static String filePath;
|
||||
private static ThreadLocal<PythonScript> threadLocalPythonScript = new ThreadLocal<PythonScript>() {
|
||||
|
||||
private static String includePath;
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see java.lang.ThreadLocal#initialValue()
|
||||
*/
|
||||
@Override
|
||||
protected PythonScript initialValue() {
|
||||
try {
|
||||
ITimer timer = TimeUtil.getTimer();
|
||||
timer.start();
|
||||
IPathManager pathMgr = PathManagerFactory.getPathManager();
|
||||
LocalizationContext commonCx = pathMgr.getContext(
|
||||
LocalizationType.COMMON_STATIC, LocalizationLevel.BASE);
|
||||
String filePath = pathMgr.getFile(commonCx,
|
||||
"vtec" + File.separator + "ActiveTable.py").getPath();
|
||||
String siteId = pathMgr.getContext(
|
||||
LocalizationType.COMMON_STATIC, LocalizationLevel.SITE)
|
||||
.getContextName();
|
||||
String includePath = PyUtil.buildJepIncludePath(
|
||||
ActiveTablePyIncludeUtil.getCommonPythonIncludePath(),
|
||||
ActiveTablePyIncludeUtil.getVtecIncludePath(siteId),
|
||||
ActiveTablePyIncludeUtil
|
||||
.getGfeConfigIncludePath(siteId));
|
||||
|
||||
PythonScript python = new PythonScript(filePath, includePath,
|
||||
ActiveTable.class.getClassLoader());
|
||||
timer.stop();
|
||||
PerformanceStatus.getHandler("ActiveTable").logDuration(
|
||||
"create PythonScript", timer.getElapsedTime());
|
||||
return python;
|
||||
} catch (JepException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
private static CoreDao practiceDao = new CoreDao(
|
||||
DaoConfig.forClass(PracticeActiveTableRecord.class));
|
||||
|
@ -108,25 +151,9 @@ public class ActiveTable {
|
|||
private static CoreDao operationalDao = new CoreDao(
|
||||
DaoConfig.forClass(OperationalActiveTableRecord.class));
|
||||
|
||||
private PythonScript python;
|
||||
|
||||
static {
|
||||
IPathManager pathMgr = PathManagerFactory.getPathManager();
|
||||
LocalizationContext commonCx = pathMgr.getContext(
|
||||
LocalizationType.COMMON_STATIC, LocalizationLevel.BASE);
|
||||
filePath = pathMgr.getFile(commonCx,
|
||||
"vtec" + File.separator + "ActiveTable.py").getPath();
|
||||
String siteId = pathMgr.getContext(LocalizationType.COMMON_STATIC,
|
||||
LocalizationLevel.SITE).getContextName();
|
||||
String pythonPath = ActiveTablePyIncludeUtil
|
||||
.getCommonPythonIncludePath();
|
||||
String vtecPath = ActiveTablePyIncludeUtil.getVtecIncludePath(siteId);
|
||||
String configPath = ActiveTablePyIncludeUtil
|
||||
.getGfeConfigIncludePath(siteId);
|
||||
includePath = PyUtil.buildJepIncludePath(pythonPath, vtecPath,
|
||||
configPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Default constructor
|
||||
*/
|
||||
public ActiveTable() {
|
||||
}
|
||||
|
||||
|
@ -155,6 +182,10 @@ public class ActiveTable {
|
|||
* the active table mode (PRACTICE or OPERATIONAL)
|
||||
* @param phensigList
|
||||
* phensigs to include. If null, all phensigs will be included.
|
||||
* @param act
|
||||
* the VTEC action. If null all actions will be included
|
||||
* @param etn
|
||||
* the ETN. If null all ETNs will be included
|
||||
* @param requestValidTimes
|
||||
* true if only valid times are to be returned
|
||||
* @return the active table corresponding to the input parameters
|
||||
|
@ -174,6 +205,10 @@ public class ActiveTable {
|
|||
* the active table mode (PRACTICE or OPERATIONAL)
|
||||
* @param phensigList
|
||||
* phensigs to include. If null, all phensigs will be included.
|
||||
* @param act
|
||||
* the VTEC action. If null all actions will be included
|
||||
* @param etn
|
||||
* the ETN. If null all ETNs will be included
|
||||
* @param requestValidTimes
|
||||
* true if only valid times are to be returned
|
||||
* @param wfos
|
||||
|
@ -187,32 +222,18 @@ public class ActiveTable {
|
|||
String[] wfos) {
|
||||
|
||||
if (wfos == null || !Arrays.asList(wfos).contains("all")) {
|
||||
SiteMap siteMap = SiteMap.getInstance();
|
||||
|
||||
if (wfos == null || wfos.length == 0) {
|
||||
// default to WFOs from VTECPartners
|
||||
|
||||
// Use the 3-char site or VTEC_DECODER_SITES will be empty
|
||||
Set<String> site3s = siteMap.getSite3LetterIds(siteId);
|
||||
Set<String> wfoSet = new TreeSet<String>();
|
||||
for (String site3 : site3s) {
|
||||
VTECPartners vtecPartners = VTECPartners.getInstance(site3);
|
||||
List<String> wfoList = (List<String>) vtecPartners
|
||||
.getattr("VTEC_DECODER_SITES");
|
||||
wfoSet.addAll(wfoList);
|
||||
String spcSite = (String) vtecPartners
|
||||
.getattr("VTEC_SPC_SITE");
|
||||
wfoSet.add(spcSite);
|
||||
String tpcSite = (String) vtecPartners
|
||||
.getattr("VTEC_TPC_SITE");
|
||||
wfoSet.add(tpcSite);
|
||||
}
|
||||
Set<String> wfoSet = getDecoderSites(siteId);
|
||||
wfoSet.add(siteId);
|
||||
wfos = wfoSet.toArray(new String[0]);
|
||||
}
|
||||
|
||||
// We have an array of 3- or 4-char WFOs to filter against.
|
||||
// We need a String "KMFL,KTBW,..." for the query.
|
||||
SiteMap siteMap = SiteMap.getInstance();
|
||||
StringBuilder wfosb = new StringBuilder();
|
||||
String sep = "";
|
||||
for (String wfo : wfos) {
|
||||
|
@ -229,6 +250,36 @@ public class ActiveTable {
|
|||
false);
|
||||
}
|
||||
|
||||
private static Set<String> getDecoderSites(String siteId) {
|
||||
SiteMap siteMap = SiteMap.getInstance();
|
||||
|
||||
// Use the 3-char site or VTEC_DECODER_SITES will be empty
|
||||
Set<String> site3s = siteMap.getSite3LetterIds(siteId);
|
||||
Set<String> wfoSet = new HashSet<String>();
|
||||
for (String site3 : site3s) {
|
||||
VTECPartners vtecPartners = VTECPartners.getInstance(site3);
|
||||
@SuppressWarnings("unchecked")
|
||||
List<String> wfoList = (List<String>) vtecPartners
|
||||
.getattr("VTEC_DECODER_SITES");
|
||||
wfoSet.addAll(wfoList);
|
||||
String spcSite = (String) vtecPartners.getattr("VTEC_SPC_SITE");
|
||||
wfoSet.add(spcSite);
|
||||
String tpcSite = (String) vtecPartners.getattr("VTEC_TPC_SITE");
|
||||
wfoSet.add(tpcSite);
|
||||
}
|
||||
return wfoSet;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get next ETN for a specific site and phensig
|
||||
*
|
||||
* @param siteId
|
||||
* @param mode
|
||||
* @param phensig
|
||||
* @param currentTime
|
||||
* @param isLock
|
||||
* @return next ETN
|
||||
*/
|
||||
public static Integer getNextEtn(String siteId, ActiveTableMode mode,
|
||||
String phensig, Calendar currentTime, boolean isLock) {
|
||||
String lockName = getEtnClusterLockName(siteId, mode);
|
||||
|
@ -312,11 +363,38 @@ public class ActiveTable {
|
|||
mode = ActiveTableMode.OPERATIONAL;
|
||||
}
|
||||
|
||||
MergeResult result = filterTable(siteId,
|
||||
getActiveTable(siteId, mode), newRecords, mode, offsetSecs);
|
||||
IPerformanceStatusHandler perfStat = PerformanceStatus
|
||||
.getHandler("ActiveTable");
|
||||
ITimer timer = TimeUtil.getTimer();
|
||||
timer.start();
|
||||
List<ActiveTableRecord> activeTable = getActiveTable(siteId, mode);
|
||||
timer.stop();
|
||||
perfStat.logDuration("getActiveTable", timer.getElapsedTime());
|
||||
|
||||
// get decoder sites to see if we need to backup active table
|
||||
Set<String> decoderSites = getDecoderSites(siteId);
|
||||
|
||||
// if any new record is from one of the decoder sites
|
||||
// we need to queue a backup
|
||||
for (ActiveTableRecord rec : newRecords) {
|
||||
if (decoderSites.contains(rec.getOfficeid())) {
|
||||
ActiveTableBackup.queue(mode, activeTable);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
timer.reset();
|
||||
timer.start();
|
||||
MergeResult result = filterTable(siteId, activeTable, newRecords,
|
||||
mode, offsetSecs);
|
||||
timer.stop();
|
||||
perfStat.logDuration("filterTable", timer.getElapsedTime());
|
||||
|
||||
timer.reset();
|
||||
timer.start();
|
||||
updateTable(siteId, result, mode);
|
||||
|
||||
timer.stop();
|
||||
perfStat.logDuration("updateTable", timer.getElapsedTime());
|
||||
if (result.changeList.size() > 0) {
|
||||
sendNotification(mode, result.changeList, "VTECDecoder");
|
||||
}
|
||||
|
@ -349,25 +427,16 @@ public class ActiveTable {
|
|||
args.put("offsetSecs", offsetSecs);
|
||||
MergeResult result = null;
|
||||
try {
|
||||
PythonScript python = threadLocalPythonScript.get();
|
||||
try {
|
||||
python = new PythonScript(filePath, includePath,
|
||||
ActiveTable.class.getClassLoader());
|
||||
try {
|
||||
result = (MergeResult) python
|
||||
.execute("mergeFromJava", args);
|
||||
} catch (JepException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Error updating active table", e);
|
||||
}
|
||||
result = (MergeResult) python.execute("mergeFromJava", args);
|
||||
} catch (JepException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Error initializing active table python", e);
|
||||
}
|
||||
} finally {
|
||||
if (python != null) {
|
||||
python.dispose();
|
||||
python = null;
|
||||
"Error updating active table", e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Error initializing active table python", e);
|
||||
}
|
||||
|
||||
return result;
|
||||
|
@ -483,15 +552,32 @@ public class ActiveTable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge new records into the active table
|
||||
*
|
||||
* @param newRecords
|
||||
* records to be merged
|
||||
* @return Exception if any occurs during merge
|
||||
*/
|
||||
public Exception merge(List<ActiveTableRecord> newRecords) {
|
||||
return merge(newRecords, 0.0f);
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge new records into the active table
|
||||
*
|
||||
* @param newRecords
|
||||
* records to be merged
|
||||
* @param timeOffset
|
||||
* time offset for practice mode in displaced real time mode
|
||||
* @return Exception if any occurs during merge
|
||||
*/
|
||||
public Exception merge(List<ActiveTableRecord> newRecords, float timeOffset) {
|
||||
Exception exc = null;
|
||||
try {
|
||||
if (newRecords != null) {
|
||||
String siteId = newRecords.get(0).getOfficeid();
|
||||
String siteId = SiteUtil.getSite();
|
||||
siteId = SiteMap.getInstance().getSite4LetterId(siteId);
|
||||
updateActiveTable(siteId, newRecords, timeOffset);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
|
@ -596,10 +682,15 @@ public class ActiveTable {
|
|||
}
|
||||
}
|
||||
|
||||
public void dispose() {
|
||||
python.dispose();
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear the practice active table for the requested site
|
||||
*
|
||||
* @param requestedSiteId
|
||||
* site ID
|
||||
* @param mode
|
||||
* unused (removed in later build)
|
||||
* @throws DataAccessLayerException
|
||||
*/
|
||||
public static void clearPracticeTable(String requestedSiteId,
|
||||
ActiveTableMode mode) throws DataAccessLayerException {
|
||||
CoreDao dao = practiceDao;
|
||||
|
@ -612,6 +703,13 @@ public class ActiveTable {
|
|||
dao.executeNativeSql(sql);
|
||||
}
|
||||
|
||||
/**
|
||||
* Dump product text to temp file
|
||||
*
|
||||
* @param productText
|
||||
* product text
|
||||
* @return the temp file
|
||||
*/
|
||||
public static File dumpProductToTempFile(String productText) {
|
||||
File file = Util.createTempFile(productText.getBytes(), "vtec");
|
||||
file.deleteOnExit();
|
||||
|
|
|
@ -0,0 +1,231 @@
|
|||
/**
|
||||
* This software was developed and / or modified by Raytheon Company,
|
||||
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
|
||||
*
|
||||
* U.S. EXPORT CONTROLLED TECHNICAL DATA
|
||||
* This software product contains export-restricted data whose
|
||||
* export/transfer/disclosure is restricted by U.S. law. Dissemination
|
||||
* to non-U.S. persons whether in the United States or abroad requires
|
||||
* an export license or other authorization.
|
||||
*
|
||||
* Contractor Name: Raytheon Company
|
||||
* Contractor Address: 6825 Pine Street, Suite 340
|
||||
* Mail Stop B8
|
||||
* Omaha, NE 68106
|
||||
* 402.291.0100
|
||||
*
|
||||
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
* further licensing information.
|
||||
**/
|
||||
package com.raytheon.uf.edex.activetable;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import jep.JepException;
|
||||
|
||||
import org.apache.commons.io.filefilter.WildcardFileFilter;
|
||||
|
||||
import com.raytheon.uf.common.activetable.ActiveTableMode;
|
||||
import com.raytheon.uf.common.activetable.ActiveTableRecord;
|
||||
import com.raytheon.uf.common.activetable.VTECPartners;
|
||||
import com.raytheon.uf.common.localization.IPathManager;
|
||||
import com.raytheon.uf.common.localization.LocalizationContext;
|
||||
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationLevel;
|
||||
import com.raytheon.uf.common.localization.LocalizationContext.LocalizationType;
|
||||
import com.raytheon.uf.common.localization.PathManagerFactory;
|
||||
import com.raytheon.uf.common.python.PyUtil;
|
||||
import com.raytheon.uf.common.python.PythonScript;
|
||||
import com.raytheon.uf.common.status.IPerformanceStatusHandler;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.PerformanceStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.common.time.util.ITimer;
|
||||
import com.raytheon.uf.common.time.util.TimeUtil;
|
||||
import com.raytheon.uf.common.util.FileUtil;
|
||||
|
||||
/**
|
||||
* Perform Active Table Backup
|
||||
*
|
||||
* <pre>
|
||||
*
|
||||
* SOFTWARE HISTORY
|
||||
*
|
||||
* Date Ticket# Engineer Description
|
||||
* ------------ ---------- ----------- --------------------------
|
||||
* Jun 17, 2014 #3296 randerso Initial creation
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
* @author randerso
|
||||
* @version 1.0
|
||||
*/
|
||||
|
||||
public class ActiveTableBackup {
|
||||
private static final transient IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(ActiveTableBackup.class);
|
||||
|
||||
private static class BackupRequest {
|
||||
ActiveTableMode activeTableMode;
|
||||
|
||||
List<ActiveTableRecord> activeTable;
|
||||
|
||||
BackupRequest(ActiveTableMode activeTableMode,
|
||||
List<ActiveTableRecord> activeTable) {
|
||||
this.activeTableMode = activeTableMode;
|
||||
this.activeTable = activeTable;
|
||||
}
|
||||
}
|
||||
|
||||
private static int QUEUE_LIMIT = 10;
|
||||
|
||||
private static BlockingQueue<BackupRequest> queue = new LinkedBlockingQueue<BackupRequest>(
|
||||
QUEUE_LIMIT);
|
||||
|
||||
private static ThreadLocal<PythonScript> threadLocalPythonScript = new ThreadLocal<PythonScript>() {
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see java.lang.ThreadLocal#initialValue()
|
||||
*/
|
||||
@Override
|
||||
protected PythonScript initialValue() {
|
||||
try {
|
||||
IPathManager pathMgr = PathManagerFactory.getPathManager();
|
||||
LocalizationContext commonCx = pathMgr.getContext(
|
||||
LocalizationType.COMMON_STATIC, LocalizationLevel.BASE);
|
||||
String filePath = pathMgr.getFile(commonCx,
|
||||
"vtec" + File.separator + "VTECTableUtil.py").getPath();
|
||||
String siteId = pathMgr.getContext(
|
||||
LocalizationType.COMMON_STATIC, LocalizationLevel.SITE)
|
||||
.getContextName();
|
||||
String includePath = PyUtil.buildJepIncludePath(
|
||||
ActiveTablePyIncludeUtil.getCommonPythonIncludePath(),
|
||||
ActiveTablePyIncludeUtil.getVtecIncludePath(siteId));
|
||||
|
||||
PythonScript python = new PythonScript(filePath, includePath,
|
||||
ActiveTableBackup.class.getClassLoader());
|
||||
return python;
|
||||
} catch (JepException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
private static Runnable target = new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
IPerformanceStatusHandler perfStat = PerformanceStatus
|
||||
.getHandler("ActiveTable");
|
||||
ITimer timer = TimeUtil.getTimer();
|
||||
long lastPurge = 0;
|
||||
while (true) {
|
||||
try {
|
||||
BackupRequest request = queue.take();
|
||||
timer.reset();
|
||||
timer.start();
|
||||
IPathManager pathMgr = PathManagerFactory.getPathManager();
|
||||
LocalizationContext ctx = pathMgr.getContext(
|
||||
LocalizationType.EDEX_STATIC,
|
||||
LocalizationLevel.SITE);
|
||||
String siteId = ctx.getContextName();
|
||||
File backupDir = pathMgr.getFile(ctx,
|
||||
FileUtil.join("vtec", "backup")).getAbsoluteFile();
|
||||
|
||||
try {
|
||||
PythonScript python = threadLocalPythonScript.get();
|
||||
HashMap<String, Object> args = new HashMap<String, Object>(
|
||||
4, 1.0f);
|
||||
args.put("activeTable", request.activeTable);
|
||||
args.put("activeTableMode",
|
||||
request.activeTableMode.toString());
|
||||
args.put("filePath", backupDir.getParent());
|
||||
args.put("siteId", siteId);
|
||||
try {
|
||||
python.execute("backupActiveTable", args);
|
||||
} catch (JepException e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Error backing up active table", e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Error initializing active table python", e);
|
||||
}
|
||||
timer.stop();
|
||||
perfStat.logDuration("backup activeTable",
|
||||
timer.getElapsedTime());
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
// don't run purge more than once a day
|
||||
if ((now - lastPurge) > TimeUtil.MILLIS_PER_DAY) {
|
||||
timer.reset();
|
||||
timer.start();
|
||||
lastPurge = now;
|
||||
// get purge age in hours
|
||||
long purgeAge = ((Number) VTECPartners.getInstance(
|
||||
siteId).getattr("VTEC_BACKUP_TABLE_PURGE_TIME",
|
||||
168 * 4)).longValue();
|
||||
|
||||
// compute purge time
|
||||
long purgeTime = now
|
||||
- (purgeAge * TimeUtil.MILLIS_PER_HOUR);
|
||||
|
||||
// file filter for backup files for the requested mode
|
||||
FilenameFilter filter = new WildcardFileFilter("*"
|
||||
+ request.activeTableMode.toString() + "*.gz");
|
||||
|
||||
// purge any backup file older than purge time;
|
||||
for (File file : backupDir.listFiles(filter)) {
|
||||
if (file.lastModified() < purgeTime) {
|
||||
file.delete();
|
||||
}
|
||||
}
|
||||
|
||||
timer.stop();
|
||||
perfStat.logDuration("purge activeTable backups",
|
||||
timer.getElapsedTime());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
statusHandler.handle(Priority.PROBLEM,
|
||||
e.getLocalizedMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
private static Thread backupJob = new Thread(target, "activeTableBackup");
|
||||
|
||||
/**
|
||||
* Queue an active table backup request
|
||||
*
|
||||
* @param activeTableMode
|
||||
* @param activeTable
|
||||
*/
|
||||
public static void queue(ActiveTableMode activeTableMode,
|
||||
List<ActiveTableRecord> activeTable) {
|
||||
BackupRequest req = new BackupRequest(activeTableMode, activeTable);
|
||||
try {
|
||||
queue.add(req);
|
||||
} catch (IllegalStateException e) {
|
||||
// discard a backup request due to queue full
|
||||
BackupRequest discard = queue.poll();
|
||||
queue.add(req);
|
||||
if (discard != null) {
|
||||
statusHandler
|
||||
.warn("ActiveTable backup request discarded, queue full");
|
||||
}
|
||||
}
|
||||
if (!backupJob.isAlive()) {
|
||||
backupJob.start();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -19,17 +19,18 @@
|
|||
**/
|
||||
package com.raytheon.uf.edex.activetable;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.raytheon.edex.esb.Headers;
|
||||
import com.raytheon.uf.common.activetable.ActiveTableMode;
|
||||
import com.raytheon.uf.common.activetable.ActiveTableRecord;
|
||||
import com.raytheon.uf.common.dataplugin.warning.AbstractWarningRecord;
|
||||
import com.raytheon.uf.common.status.IUFStatusHandler;
|
||||
import com.raytheon.uf.common.status.PerformanceStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus;
|
||||
import com.raytheon.uf.common.status.UFStatus.Priority;
|
||||
import com.raytheon.uf.common.time.util.ITimer;
|
||||
import com.raytheon.uf.common.time.util.TimeUtil;
|
||||
|
||||
/**
|
||||
* Service for the VTEC active table. Determines if the VTEC product corresponds
|
||||
|
@ -43,6 +44,7 @@ import com.raytheon.uf.common.status.UFStatus.Priority;
|
|||
* Mar 17, 2009 njensen Initial creation
|
||||
* Jul 14, 2009 #2950 njensen Multiple site support
|
||||
* Dec 21, 2009 #4055 njensen No site filtering
|
||||
* Jun 17, 2014 3296 randerso Added performance logging
|
||||
*
|
||||
* </pre>
|
||||
*
|
||||
|
@ -54,16 +56,30 @@ public class ActiveTableSrv {
|
|||
private static final transient IUFStatusHandler statusHandler = UFStatus
|
||||
.getHandler(ActiveTableSrv.class);
|
||||
|
||||
private static Map<Long, ActiveTable> activeTableMap = new HashMap<Long, ActiveTable>();
|
||||
private static ThreadLocal<ActiveTable> threadLocalActiveTable = new ThreadLocal<ActiveTable>() {
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
* @see java.lang.ThreadLocal#initialValue()
|
||||
*/
|
||||
@Override
|
||||
protected ActiveTable initialValue() {
|
||||
return new ActiveTable();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* Merge VTEC info from new warning records into the active table
|
||||
*
|
||||
* @param records
|
||||
*/
|
||||
public void vtecArrived(List<AbstractWarningRecord> records) {
|
||||
ITimer timer = TimeUtil.getTimer();
|
||||
timer.start();
|
||||
try {
|
||||
long threadId = Thread.currentThread().getId();
|
||||
ActiveTable activeTable = activeTableMap.get(threadId);
|
||||
if (activeTable == null) {
|
||||
activeTable = new ActiveTable();
|
||||
activeTableMap.put(threadId, activeTable);
|
||||
}
|
||||
ActiveTable activeTable = threadLocalActiveTable.get();
|
||||
if (records != null && records.size() > 0) {
|
||||
activeTable.merge(ActiveTableRecord.transformFromWarnings(
|
||||
records, ActiveTableMode.OPERATIONAL));
|
||||
|
@ -72,8 +88,18 @@ public class ActiveTableSrv {
|
|||
statusHandler.handle(Priority.PROBLEM,
|
||||
"Error merging active table", t);
|
||||
}
|
||||
timer.stop();
|
||||
PerformanceStatus.getHandler("ActiveTable").logDuration(
|
||||
"Total time to process " + records.size() + " records",
|
||||
timer.getElapsedTime());
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge new warning records into the practice active table
|
||||
*
|
||||
* @param records
|
||||
* @param headers
|
||||
*/
|
||||
public void practiceVtecArrived(List<AbstractWarningRecord> records,
|
||||
Headers headers) {
|
||||
Integer offsetSeconds = null;
|
||||
|
@ -84,12 +110,7 @@ public class ActiveTableSrv {
|
|||
offsetSeconds = Integer.valueOf(0);
|
||||
}
|
||||
if (records != null && records.size() > 0) {
|
||||
long threadId = Thread.currentThread().getId();
|
||||
ActiveTable activeTable = activeTableMap.get(threadId);
|
||||
if (activeTable == null) {
|
||||
activeTable = new ActiveTable();
|
||||
activeTableMap.put(threadId, activeTable);
|
||||
}
|
||||
ActiveTable activeTable = threadLocalActiveTable.get();
|
||||
try {
|
||||
activeTable.merge(ActiveTableRecord.transformFromWarnings(
|
||||
records, ActiveTableMode.PRACTICE), offsetSeconds
|
||||
|
|
|
@ -28,6 +28,9 @@
|
|||
# 06/11/13 #2083 randerso Log active table changes, save backups
|
||||
# 03/06/14 #2883 randerso Pass siteId into mergeFromJava
|
||||
# 03/25/14 #2884 randerso Added xxxid to VTECChange
|
||||
# 06/17/13 #3296 randerso Moved active table backup and purging
|
||||
# to a separate thread in java.
|
||||
# Added performance logging
|
||||
#
|
||||
|
||||
import time
|
||||
|
@ -40,6 +43,10 @@ from com.raytheon.uf.common.localization import PathManagerFactory
|
|||
from com.raytheon.uf.common.localization import LocalizationContext_LocalizationType as LocalizationType
|
||||
from com.raytheon.uf.common.localization import LocalizationContext_LocalizationLevel as LocalizationLevel
|
||||
|
||||
from com.raytheon.uf.common.time.util import TimeUtil
|
||||
from com.raytheon.uf.common.status import PerformanceStatus
|
||||
perfStat = PerformanceStatus.getHandler("ActiveTable")
|
||||
|
||||
class ActiveTable(VTECTableUtil.VTECTableUtil):
|
||||
|
||||
def __init__(self, activeTableMode):
|
||||
|
@ -60,6 +67,8 @@ class ActiveTable(VTECTableUtil.VTECTableUtil):
|
|||
changedFlag = False
|
||||
|
||||
#delete "obsolete" records from the old table.
|
||||
timer = TimeUtil.getTimer()
|
||||
timer.start()
|
||||
vts = VTECTableSqueeze.VTECTableSqueeze(self._time+offsetSecs)
|
||||
activeTable, tossRecords = vts.squeeze(activeTable)
|
||||
for r in tossRecords:
|
||||
|
@ -67,9 +76,13 @@ class ActiveTable(VTECTableUtil.VTECTableUtil):
|
|||
del vts
|
||||
if len(tossRecords):
|
||||
changedFlag = True
|
||||
timer.stop();
|
||||
perfStat.logDuration("updateActiveTable squeeze", timer.getElapsedTime());
|
||||
|
||||
#expand out any 000 UGC codes, such as FLC000, to indicate all
|
||||
#zones.
|
||||
timer.reset()
|
||||
timer.start()
|
||||
newRecExpanded = []
|
||||
compare1 = ['phen', 'sig', 'officeid', 'etn', 'pil']
|
||||
for newR in newRecords:
|
||||
|
@ -85,11 +98,15 @@ class ActiveTable(VTECTableUtil.VTECTableUtil):
|
|||
else:
|
||||
newRecExpanded.append(newR)
|
||||
newRecords = newRecExpanded
|
||||
timer.stop();
|
||||
perfStat.logDuration("updateActiveTable expand", timer.getElapsedTime());
|
||||
|
||||
# match new records with old records, with issue time is different
|
||||
# years and event times overlap. Want to reassign ongoing events
|
||||
# from last year's issueTime to be 12/31/2359z, rather than the
|
||||
# real issuetime (which is this year's).
|
||||
timer.reset()
|
||||
timer.start()
|
||||
compare = ['phen', 'sig', 'officeid', 'pil', 'etn']
|
||||
for newR in newRecords:
|
||||
cyear = time.gmtime(newR['issueTime'])[0] #current year issuance time
|
||||
|
@ -106,9 +123,13 @@ class ActiveTable(VTECTableUtil.VTECTableUtil):
|
|||
"\nNewRec: ", self.printEntry(newR),
|
||||
"OldRec: ", self.printEntry(oldR))
|
||||
newR['issueTime'] = lastYearIssueTime
|
||||
timer.stop();
|
||||
perfStat.logDuration("updateActiveTable match", timer.getElapsedTime());
|
||||
|
||||
|
||||
# split records out by issuance year for processing
|
||||
timer.reset()
|
||||
timer.start()
|
||||
newRecDict = {} #key is issuance year
|
||||
oldRecDict = {}
|
||||
years = []
|
||||
|
@ -126,8 +147,12 @@ class ActiveTable(VTECTableUtil.VTECTableUtil):
|
|||
oldRecDict[issueYear] = records
|
||||
if issueYear not in years:
|
||||
years.append(issueYear)
|
||||
timer.stop();
|
||||
perfStat.logDuration("updateActiveTable split", timer.getElapsedTime());
|
||||
|
||||
# process each year
|
||||
timer.reset()
|
||||
timer.start()
|
||||
compare = ['id', 'phen', 'sig', 'officeid', 'pil']
|
||||
|
||||
for year in years:
|
||||
|
@ -166,8 +191,12 @@ class ActiveTable(VTECTableUtil.VTECTableUtil):
|
|||
oldR['state'] = "Replaced"
|
||||
changedFlag = True
|
||||
updatedTable.append(oldR)
|
||||
timer.stop();
|
||||
perfStat.logDuration("updateActiveTable process", timer.getElapsedTime());
|
||||
|
||||
#always add in the new records (except for ROU)
|
||||
timer.reset()
|
||||
timer.start()
|
||||
compare = ['id', 'phen', 'sig', 'officeid', 'pil', 'etn']
|
||||
for year in newRecDict.keys():
|
||||
newRecords = newRecDict[year]
|
||||
|
@ -199,8 +228,12 @@ class ActiveTable(VTECTableUtil.VTECTableUtil):
|
|||
rec = (newR['officeid'], newR['pil'], newR['phensig'], newR['xxxid'])
|
||||
if rec not in changes:
|
||||
changes.append(rec)
|
||||
timer.stop();
|
||||
perfStat.logDuration("updateActiveTable add", timer.getElapsedTime());
|
||||
|
||||
#filter out any captured text and overviewText if not in the categories
|
||||
timer.reset()
|
||||
timer.start()
|
||||
cats = self._getTextCaptureCategories()
|
||||
if cats is not None:
|
||||
for rec in updatedTable:
|
||||
|
@ -210,6 +243,8 @@ class ActiveTable(VTECTableUtil.VTECTableUtil):
|
|||
if rec.has_key('overviewText'):
|
||||
del rec['overviewText']
|
||||
|
||||
timer.stop();
|
||||
perfStat.logDuration("updateActiveTable filter", timer.getElapsedTime());
|
||||
return updatedTable, tossRecords, changes, changedFlag
|
||||
|
||||
# time overlaps, if tr1 overlaps tr2 (adjacent is not an overlap)
|
||||
|
@ -250,35 +285,35 @@ class ActiveTable(VTECTableUtil.VTECTableUtil):
|
|||
return outTable, purgedRecords, changes, changedFlag
|
||||
|
||||
def mergeFromJava(siteId, activeTable, newRecords, logger, mode, offsetSecs=0):
|
||||
perfStat.log("mergeFromJava called for site: %s, activeTable: %d , newRecords: %d" %
|
||||
(siteId, activeTable.size(), newRecords.size()))
|
||||
timer = TimeUtil.getTimer()
|
||||
timer.start()
|
||||
pyActive = []
|
||||
szActive = activeTable.size()
|
||||
for i in range(szActive):
|
||||
pyActive.append(ActiveTableRecord.ActiveTableRecord(activeTable.get(i)))
|
||||
|
||||
decoderSites = VTECPartners.VTEC_DECODER_SITES
|
||||
decoderSites.append(VTECPartners.get4ID(siteId))
|
||||
decoderSites.append(VTECPartners.VTEC_SPC_SITE)
|
||||
decoderSites.append(VTECPartners.VTEC_TPC_SITE)
|
||||
|
||||
backup = False
|
||||
pyNew = []
|
||||
szNew = newRecords.size()
|
||||
for i in range(szNew):
|
||||
rec = ActiveTableRecord.ActiveTableRecord(newRecords.get(i))
|
||||
if rec['officeid'] in decoderSites:
|
||||
backup = True
|
||||
pyNew.append(rec)
|
||||
|
||||
active = ActiveTable(mode)
|
||||
|
||||
if backup:
|
||||
oldActiveTable = active._convertTableToPurePython(pyActive, siteId)
|
||||
active.saveOldActiveTable(oldActiveTable)
|
||||
pTime = getattr(VTECPartners, "VTEC_BACKUP_TABLE_PURGE_TIME",168)
|
||||
active.purgeOldSavedTables(pTime)
|
||||
logger.info("Updating " + mode + " Active Table: new records\n" +
|
||||
active.printActiveTable(pyNew, combine=1))
|
||||
|
||||
timer.stop()
|
||||
perfStat.logDuration("mergeFromJava preprocess", timer.getElapsedTime());
|
||||
|
||||
updatedTable, purgeRecords, changes, changedFlag = active.activeTableMerge(pyActive, pyNew, offsetSecs)
|
||||
perfStat.log("mergeFromJava activeTableMerge returned updateTable: %d, purgeRecords: %d, changes: %d" %
|
||||
(len(updatedTable), len(purgeRecords), len(changes)))
|
||||
|
||||
timer.reset()
|
||||
timer.start()
|
||||
logger.info("Updated " + mode + " Active Table: purged\n" +
|
||||
active.printActiveTable(purgeRecords, combine=1))
|
||||
|
||||
|
@ -314,4 +349,6 @@ def mergeFromJava(siteId, activeTable, newRecords, logger, mode, offsetSecs=0):
|
|||
|
||||
from com.raytheon.uf.common.activetable import MergeResult
|
||||
result = MergeResult(updatedList, purgedList, changeList)
|
||||
timer.stop()
|
||||
perfStat.logDuration("mergeFromJava postprocess", timer.getElapsedTime());
|
||||
return result
|
||||
|
|
|
@ -43,6 +43,7 @@ import logging
|
|||
import os
|
||||
import sys
|
||||
import time
|
||||
import errno
|
||||
|
||||
import ActiveTableRecord
|
||||
import siteConfig
|
||||
|
|
|
@ -17,6 +17,14 @@
|
|||
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
# further licensing information.
|
||||
##
|
||||
#
|
||||
# SOFTWARE HISTORY
|
||||
#
|
||||
# Date Ticket# Engineer Description
|
||||
# ------------ ---------- ----------- --------------------------
|
||||
# 06/17/13 #3296 randerso Default debug to False to avoid logging overhead
|
||||
#
|
||||
|
||||
import os, sys, time, copy, LogStream
|
||||
|
||||
# This class takes a VTEC active table and eliminates unnecessary
|
||||
|
@ -27,7 +35,7 @@ import os, sys, time, copy, LogStream
|
|||
class VTECTableSqueeze:
|
||||
|
||||
#constructor
|
||||
def __init__(self, currentTime, debug=True):
|
||||
def __init__(self, currentTime, debug=False):
|
||||
self.__ctime = currentTime
|
||||
self.__thisYear = time.gmtime(self.__ctime)[0]
|
||||
self.__debug = debug
|
||||
|
|
|
@ -17,7 +17,14 @@
|
|||
# See the AWIPS II Master Rights File ("Master Rights File.pdf") for
|
||||
# further licensing information.
|
||||
##
|
||||
# Utility classes for the VTEC active table
|
||||
# Utility classes for the VTEC util table
|
||||
#
|
||||
# SOFTWARE HISTORY
|
||||
#
|
||||
# Date Ticket# Engineer Description
|
||||
# ------------ ---------- ----------- --------------------------
|
||||
# 06/17/13 #3296 randerso Moved active table backup and purging
|
||||
# to a separate thread in java.
|
||||
|
||||
import copy
|
||||
import cPickle
|
||||
|
@ -318,7 +325,7 @@ class VTECTableUtil:
|
|||
os.makedirs(directory)
|
||||
except OSError as e:
|
||||
if e.errno != errno.EEXIST:
|
||||
LogStream.logProblem("Could not create active table backup directory:",
|
||||
LogStream.logProblem("Could not create util table backup directory:",
|
||||
directory, LogStream.exc())
|
||||
raise e
|
||||
|
||||
|
@ -384,3 +391,14 @@ class VTECTableUtil:
|
|||
|
||||
return JUtil.javaObjToPyVal(javaDictFormat)
|
||||
|
||||
def backupActiveTable(activeTable, activeTableMode, filePath, siteId):
|
||||
import ActiveTableRecord
|
||||
pyActive = []
|
||||
szActive = activeTable.size()
|
||||
for i in range(szActive):
|
||||
pyActive.append(ActiveTableRecord.ActiveTableRecord(activeTable.get(i)))
|
||||
|
||||
# create a dummy name to simplify the file access code in VTECTableUtil
|
||||
util = VTECTableUtil(os.path.join(filePath, activeTableMode + ".tbl"))
|
||||
oldActiveTable = util._convertTableToPurePython(pyActive, siteId)
|
||||
util.saveOldActiveTable(oldActiveTable)
|
||||
|
|
Loading…
Add table
Reference in a new issue