Omaha #3352 Better logging and threading added.

Change-Id: I0011085bed8f758b151526683c57ea2ad5ee7c69

Former-commit-id: 934d96b72a [formerly 6c5675cb2d] [formerly 80fe6531b4 [formerly 638cb3e45b8f75f251b4de4b8670c530721615ff]]
Former-commit-id: 80fe6531b4
Former-commit-id: 55a1680a40
This commit is contained in:
Roger Ferrel 2014-07-31 15:16:22 -05:00
parent b673013d52
commit c762e53d74
4 changed files with 431 additions and 105 deletions

View file

@ -1,2 +1,6 @@
# Schedule for checking/updating warngen geometries
geospatial.updater.cron=0+0+0/1+*+*+?
geospatial.updater.cron=0+0+0/1+*+*+?
# Maximum number of threads to determine a site's features geometry.
#geospatial.geometry.threads=5

View file

@ -0,0 +1,186 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.edex.plugin.warning.gis;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryCollection;
import com.vividsolutions.jts.geom.GeometryCollectionIterator;
import com.vividsolutions.jts.geom.GeometryFactory;
import com.vividsolutions.jts.geom.LineString;
import com.vividsolutions.jts.geom.LinearRing;
import com.vividsolutions.jts.geom.MultiPolygon;
import com.vividsolutions.jts.geom.Point;
import com.vividsolutions.jts.geom.Polygon;
/**
* This generates the clipped geometry between feature and CWA features.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jul 16, 2014 3352 rferrel Initial creation
*
* </pre>
*
* @author rferrel
* @version 1.0
*/
public class FeatureCallable implements Callable<Geometry> {
private final IUFStatusHandler statusHandler;
/**
* Assumed to be GeomIntersectonCallable futures for getting the
* intersections between a feature and associated cwaFeatures.
*/
private final List<Future<Geometry>> geomFutures;
/**
* Constructor.
*
* @param jobNo
* @param queue
* @param jobQueue
*/
public FeatureCallable(List<Future<Geometry>> geomFutures,
IUFStatusHandler statusHandler) {
super();
this.geomFutures = geomFutures;
this.statusHandler = statusHandler;
}
@Override
public Geometry call() throws InterruptedException, ExecutionException {
Geometry multiPolygon = null;
Geometry[] clippedGeoms = null;
multiPolygon = null;
clippedGeoms = new Geometry[geomFutures.size()];
// Wait for all intersections to finish.
for (int index = 0; index < clippedGeoms.length; ++index) {
Future<Geometry> future = geomFutures.get(index);
Geometry clippedGeom = null;
clippedGeom = future.get();
clippedGeoms[index] = clippedGeom;
if (clippedGeom instanceof GeometryCollection) {
GeometryCollection gc = (GeometryCollection) clippedGeom;
if (multiPolygon != null) {
multiPolygon = multiPolygon
.union(convertToMultiPolygon(gc));
} else {
multiPolygon = convertToMultiPolygon(gc);
}
}
}
Geometry result = null;
if (multiPolygon != null) {
result = multiPolygon;
} else if (clippedGeoms[clippedGeoms.length - 1] != null) {
result = clippedGeoms[clippedGeoms.length - 1];
}
return result;
}
/**
* Convert a GeometryCollection to a MultiPolygon.
*
* @param gc
*/
private MultiPolygon convertToMultiPolygon(GeometryCollection gc) {
GeometryCollectionIterator iter = new GeometryCollectionIterator(gc);
Set<Polygon> polygons = new HashSet<Polygon>();
MultiPolygon mp = null;
iter.next();
while (iter.hasNext()) {
Object o = iter.next();
if (o instanceof MultiPolygon) {
if (mp == null) {
mp = (MultiPolygon) o;
} else {
mp = (MultiPolygon) mp.union((MultiPolygon) o);
}
} else if (o instanceof Polygon) {
polygons.add((Polygon) o);
} else if ((o instanceof LineString) || (o instanceof Point)) {
LinearRing lr = null;
Coordinate[] coords = null;
if (o instanceof LineString) {
Coordinate[] cs = ((LineString) o).getCoordinates();
if (cs.length < 4) {
coords = new Coordinate[4];
for (int j = 0; j < cs.length; j++) {
coords[j] = new Coordinate(cs[j]);
}
for (int j = cs.length; j < 4; j++) {
coords[j] = new Coordinate(cs[3 - j]);
}
} else {
coords = new Coordinate[cs.length + 1];
for (int j = 0; j < cs.length; j++) {
coords[j] = new Coordinate(cs[j]);
}
coords[cs.length] = new Coordinate(cs[0]);
}
} else {
coords = new Coordinate[4];
for (int i = 0; i < 4; i++) {
coords[i] = ((Point) o).getCoordinate();
}
}
lr = (((Geometry) o).getFactory()).createLinearRing(coords);
Polygon poly = (new GeometryFactory()).createPolygon(lr, null);
polygons.add(poly);
} else {
statusHandler.handle(Priority.WARN,
"Unprocessed Geometry object: "
+ o.getClass().getName());
}
}
if ((mp == null) && (polygons.size() == 0)) {
return null;
}
if (polygons.size() > 0) {
Polygon[] p = polygons.toArray(new Polygon[0]);
if (mp != null) {
mp = (MultiPolygon) mp.union(new MultiPolygon(p, gc
.getFactory()));
} else {
mp = new MultiPolygon(p, gc.getFactory());
}
}
return mp;
}
}

View file

@ -0,0 +1,59 @@
/**
* This software was developed and / or modified by Raytheon Company,
* pursuant to Contract DG133W-05-CQ-1067 with the US Government.
*
* U.S. EXPORT CONTROLLED TECHNICAL DATA
* This software product contains export-restricted data whose
* export/transfer/disclosure is restricted by U.S. law. Dissemination
* to non-U.S. persons whether in the United States or abroad requires
* an export license or other authorization.
*
* Contractor Name: Raytheon Company
* Contractor Address: 6825 Pine Street, Suite 340
* Mail Stop B8
* Omaha, NE 68106
* 402.291.0100
*
* See the AWIPS II Master Rights File ("Master Rights File.pdf") for
* further licensing information.
**/
package com.raytheon.edex.plugin.warning.gis;
import java.util.concurrent.Callable;
import com.vividsolutions.jts.geom.Geometry;
/**
* This generates the clipped geometry intersections between a feature and a CWA
* Feature.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* Jul 16, 2014 3352 rferrel Initial creation
*
* </pre>
*
* @author rferrel
* @version 1.0
*/
public class GeomIntersectionCallable implements Callable<Geometry> {
private final Geometry feature;
private final Geometry cwaFeature;
public GeomIntersectionCallable(Geometry feature, Geometry cwaFeature) {
super();
this.feature = feature;
this.cwaFeature = cwaFeature;
}
@Override
public Geometry call() {
return feature.intersection(cwaFeature);
}
}

View file

@ -19,6 +19,7 @@
**/
package com.raytheon.edex.plugin.warning.gis;
import java.io.File;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@ -30,6 +31,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.xml.bind.JAXBException;
@ -65,6 +73,7 @@ import com.raytheon.uf.common.serialization.SingleTypeJAXBManager;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.edex.core.EDEXUtil;
import com.raytheon.uf.edex.core.EdexException;
import com.raytheon.uf.edex.database.cluster.ClusterLockUtils;
@ -72,16 +81,10 @@ import com.raytheon.uf.edex.database.cluster.ClusterLockUtils.LockState;
import com.raytheon.uf.edex.database.cluster.ClusterTask;
import com.raytheon.uf.edex.database.dao.CoreDao;
import com.raytheon.uf.edex.database.dao.DaoConfig;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryCollection;
import com.vividsolutions.jts.geom.GeometryCollectionIterator;
import com.vividsolutions.jts.geom.GeometryFactory;
import com.vividsolutions.jts.geom.LineString;
import com.vividsolutions.jts.geom.LinearRing;
import com.vividsolutions.jts.geom.MultiPolygon;
import com.vividsolutions.jts.geom.Point;
import com.vividsolutions.jts.geom.Polygon;
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
import com.vividsolutions.jts.geom.prep.PreparedGeometryFactory;
import com.vividsolutions.jts.simplify.TopologyPreservingSimplifier;
@ -104,6 +107,7 @@ import com.vividsolutions.jts.simplify.TopologyPreservingSimplifier;
* Feb 07, 2014 16090 mgamazaychikov Changed visibility of some methods
* Mar 19, 2014 2726 rjpeter Made singleton instance.
* Apr 29, 2014 3033 jsanchez Properly handled site and back up site files.
* Jul 15, 2014 3352 rferrel Better logging and threading added.
* </pre>
*
* @author rjpeter
@ -119,8 +123,71 @@ public class GeospatialDataGenerator {
private final String updaterEndpoint;
/**
* Pool to service the callable Geometry.
*/
private final ExecutorService pool;
/**
* Property in the warning.properties to determine the maximum number of
* geometry threads.
*/
private final String THREAD_COUNT_PROPERTY = "geospatial.geometry.threads";
/** Default thread count must be a valid positive number string. */
private final String DEFAULT_THREAD_COUNT = "5";
/** Cluster task name. */
private final static String CLUSTER_NAME = "WarngenGeometryGenerator";
/** Time out lock after one minute. */
private final static long TIME_OUT = TimeUtil.MILLIS_PER_MINUTE;
/** Task to update the lock time for the locked plugin cluster task. */
private static final class LockUpdateTask extends TimerTask {
/** The locked cluster task's details. */
private final String details;
public LockUpdateTask(String details) {
this.details = details;
}
@Override
public void run() {
long currentTime = System.currentTimeMillis();
ClusterLockUtils.updateLockTime(CLUSTER_NAME, details, currentTime);
}
}
/**
* The constructor.
*/
public GeospatialDataGenerator(String updaterEndpoint) {
this.updaterEndpoint = updaterEndpoint;
this.pool = initPool(THREAD_COUNT_PROPERTY, DEFAULT_THREAD_COUNT);
}
/**
* Parse property for thread count and return a fixed thread pool.
*
* @param propKey
* @param defaultStr
* @return pool
*/
private ExecutorService initPool(String propKey, String defaultStr) {
String maxThreadStr = System.getProperty(propKey, defaultStr);
int maxThreads = -1;
try {
maxThreads = Integer.parseInt(maxThreadStr);
if (maxThreads <= 0) {
maxThreads = Integer.parseInt(defaultStr);
}
} catch (NumberFormatException ex) {
maxThreads = Integer.parseInt(defaultStr);
}
return Executors.newFixedThreadPool(maxThreads);
}
public void generateUniqueGeospatialMetadataGeometries() {
@ -139,19 +206,28 @@ public class GeospatialDataGenerator {
List<String> templates = getTemplates(dialogConfig);
Set<GeospatialMetadata> metaDataSet = getMetaDataSet(sites, templates);
for (String site : sites) {
statusHandler.handle(Priority.INFO,
"Checking warngen geometries for site: " + site);
for (GeospatialMetadata md : metaDataSet) {
for (final String site : sites) {
long start = System.currentTimeMillis();
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
statusHandler.handle(Priority.INFO,
"Checking warngen geometries for site: " + site);
}
for (final GeospatialMetadata md : metaDataSet) {
try {
generateGeoSpatialList(site, md);
} catch (Exception e) {
statusHandler
.handle(Priority.ERROR,
"Failed to generate geospatial data for warngen",
e);
} catch (SpatialException e) {
statusHandler.handle(Priority.PROBLEM,
e.getLocalizedMessage(), e);
}
}
if (statusHandler.isPriorityEnabled(Priority.INFO)) {
long time = System.currentTimeMillis() - start;
statusHandler.handle(Priority.INFO, String.format(
"Checking warngen geometries for site: %s, took: %s",
site, TimeUtil.prettyDuration(time)));
}
}
}
@ -222,17 +298,36 @@ public class GeospatialDataGenerator {
return rval;
}
private String getClusterDetails(String site, GeospatialMetadata metaData) {
String fileName = generateGeoDataFilename(metaData);
return String.format("%s%s%s", site, File.separator, fileName);
}
public GeospatialDataSet generateGeoSpatialList(String site,
GeospatialMetadata metaData) throws SpatialException {
GeospatialDataSet dataSet = null;
String file = generateGeoDataFilename(metaData);
String details = getClusterDetails(site, metaData);
ClusterTask ct = null;
Timer lockUpdateTimer = null;
long start = 0L;
try {
do {
ct = ClusterLockUtils.lock("WarngenGeometryGenerator", file,
60000, true);
ct = ClusterLockUtils.lock(CLUSTER_NAME, details, TIME_OUT,
true);
} while (!LockState.SUCCESSFUL.equals(ct.getLockState()));
start = System.currentTimeMillis();
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
String message = String
.format("Start generate Geo Spatial data for site: %s, lock: %s.",
site, ct.getId().getDetails());
statusHandler.handle(Priority.DEBUG, message);
}
lockUpdateTimer = new Timer(CLUSTER_NAME + " timer", true);
TimerTask tt = new LockUpdateTask(ct.getId().getDetails());
lockUpdateTimer.schedule(tt, TIME_OUT / 2L, TIME_OUT / 2L);
GeospatialTime curTime = null;
try {
@ -339,8 +434,21 @@ public class GeospatialDataGenerator {
}
}
} finally {
if (lockUpdateTimer != null) {
lockUpdateTimer.cancel();
lockUpdateTimer = null;
}
if (ct != null) {
ClusterLockUtils.unlock(ct, false);
if (statusHandler.isPriorityEnabled(Priority.DEBUG)) {
long time = System.currentTimeMillis() - start;
String message = String
.format("Generated Geo Spatial data for site: %s, lock: %s, total time: %s.",
site, ct.getId().getDetails(),
TimeUtil.prettyDuration(time));
statusHandler.handle(Priority.DEBUG, message);
}
}
}
return dataSet;
@ -417,7 +525,8 @@ public class GeospatialDataGenerator {
}
private GeospatialData[] queryGeospatialData(String site,
GeospatialMetadata metaData) throws SpatialException {
GeospatialMetadata metaData) throws SpatialException,
InterruptedException, ExecutionException {
String areaSource = metaData.getAreaSource();
HashMap<String, RequestConstraint> map = new HashMap<String, RequestConstraint>(
@ -445,29 +554,7 @@ public class GeospatialDataGenerator {
.query(cwaSource,
cwaAreaFields.toArray(new String[cwaAreaFields
.size()]), null, cwaMap, SearchMode.WITHIN);
Geometry multiPolygon = null;
Geometry clippedGeom = null;
for (int i = 0; i < features.length; i++) {
multiPolygon = null;
for (SpatialQueryResult cwaFeature : cwaFeatures) {
clippedGeom = features[i].geometry
.intersection(cwaFeature.geometry);
if (clippedGeom instanceof GeometryCollection) {
GeometryCollection gc = (GeometryCollection) clippedGeom;
if (multiPolygon != null) {
multiPolygon = multiPolygon
.union(convertToMultiPolygon(gc));
} else {
multiPolygon = convertToMultiPolygon(gc);
}
}
}
if (multiPolygon != null) {
features[i].geometry = multiPolygon;
} else if (clippedGeom != null) {
features[i].geometry = clippedGeom;
}
}
updateFeatures(features, cwaFeatures);
}
topologySimplifyQueryResults(features);
@ -487,73 +574,63 @@ public class GeospatialDataGenerator {
}
/**
* Convert a GeometryCollection to a MultiPolygon.
* Queue request for each feature and wait for the results.
*
* @param gc
* @param features
* @param cwaFeatures
* @throws InterruptedException
* @throws ExecutionException
*/
private MultiPolygon convertToMultiPolygon(GeometryCollection gc) {
GeometryCollectionIterator iter = new GeometryCollectionIterator(gc);
Set<Polygon> polygons = new HashSet<Polygon>();
MultiPolygon mp = null;
iter.next();
while (iter.hasNext()) {
Object o = iter.next();
if (o instanceof MultiPolygon) {
if (mp == null) {
mp = (MultiPolygon) o;
} else {
mp = (MultiPolygon) mp.union((MultiPolygon) o);
}
} else if (o instanceof Polygon) {
polygons.add((Polygon) o);
} else if ((o instanceof LineString) || (o instanceof Point)) {
LinearRing lr = null;
Coordinate[] coords = null;
if (o instanceof LineString) {
Coordinate[] cs = ((LineString) o).getCoordinates();
if (cs.length < 4) {
coords = new Coordinate[4];
for (int j = 0; j < cs.length; j++) {
coords[j] = new Coordinate(cs[j]);
}
for (int j = cs.length; j < 4; j++) {
coords[j] = new Coordinate(cs[3 - j]);
}
} else {
coords = new Coordinate[cs.length + 1];
for (int j = 0; j < cs.length; j++) {
coords[j] = new Coordinate(cs[j]);
}
coords[cs.length] = new Coordinate(cs[0]);
}
} else {
coords = new Coordinate[4];
for (int i = 0; i < 4; i++) {
coords[i] = ((Point) o).getCoordinate();
}
}
lr = (((Geometry) o).getFactory()).createLinearRing(coords);
Polygon poly = (new GeometryFactory()).createPolygon(lr, null);
polygons.add(poly);
} else {
statusHandler.handle(Priority.WARN,
"Unprocessed Geometry object: "
+ o.getClass().getName());
}
private void updateFeatures(SpatialQueryResult[] features,
SpatialQueryResult[] cwaFeatures) throws InterruptedException,
ExecutionException {
List<Future<Geometry>> featureFutures = new ArrayList<Future<Geometry>>(
features.length);
Geometry[] cwaFeturesGeoms = new Geometry[cwaFeatures.length];
for (int index = 0; index < cwaFeturesGeoms.length; ++index) {
cwaFeturesGeoms[index] = cwaFeatures[index].geometry;
}
if ((mp == null) && (polygons.size() == 0)) {
return null;
List<Future<Geometry>> geomFutures = null;
List<Callable<Geometry>> featuresCallable = new ArrayList<Callable<Geometry>>(
featureFutures.size());
// Queue all intersections.
for (int index = 0; index < features.length; ++index) {
geomFutures = submitGeomIntersection(features[index].geometry,
cwaFeturesGeoms);
Callable<Geometry> callable = new FeatureCallable(geomFutures,
statusHandler);
featuresCallable.add(callable);
}
if (polygons.size() > 0) {
Polygon[] p = polygons.toArray(new Polygon[0]);
if (mp != null) {
mp = (MultiPolygon) mp.union(new MultiPolygon(p, gc
.getFactory()));
} else {
mp = new MultiPolygon(p, gc.getFactory());
}
// Finally queue all features.
featureFutures = pool.invokeAll(featuresCallable);
for (int index = 0; index < features.length; ++index) {
Future<Geometry> future = featureFutures.get(index);
features[index].geometry = future.get();
}
return mp;
}
/**
* Get future's list for a feature's geometry.
*
* @param featureGeom
* @param cwaFeaturesGeoms
* @return geomFutures
*/
private List<Future<Geometry>> submitGeomIntersection(Geometry featureGeom,
Geometry[] cwaFeaturesGeoms) {
List<Future<Geometry>> geomFutures = new ArrayList<Future<Geometry>>(
cwaFeaturesGeoms.length);
for (Geometry cwaGeom : cwaFeaturesGeoms) {
Callable<Geometry> callable = new GeomIntersectionCallable(
featureGeom, cwaGeom);
geomFutures.add(pool.submit(callable));
}
return geomFutures;
}
/**