Merge "Issue #1868 Rewrite mergeAll in PluginDao Change-Id: I5f5706229029c9fe11c8b6a057e871aaf10baf9d" into omaha_13.4.1

Richard Peter 2013-04-17 12:46:02 -05:00 committed by Gerrit Code Review
commit bd3d8d0a85
4 changed files with 308 additions and 289 deletions

View file

@ -22,6 +22,7 @@ package com.raytheon.uf.common.dataplugin;
import java.lang.reflect.Field;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;
import javax.persistence.Column;
@ -75,7 +76,9 @@ import com.raytheon.uf.common.util.ConvertUtil;
* Apr 12, 2013 1857 bgonzale Changed to MappedSuperclass, named generator,
* GenerationType SEQUENCE, moved Indexes to getter
* methods.
*
* Mar 29, 2013 1638 mschenke Added methods for loading from data map and creating data map from
* dataURI fields
* Apr 15, 2013 1868 bsteffen Improved performance of createDataURIMap
* </pre>
*
*/
@ -206,6 +209,91 @@ public abstract class PluginDataObject extends PersistableDataObject implements
return uriBuffer;
}
/**
     * Populates the record object from a data map.
     *
     * @param dataMap
     *            map of dataURI field names to values to set on this record
     * @throws PluginException
     *             if the record cannot be populated from the map
*/
public void populateFromMap(Map<String, Object> dataMap)
throws PluginException {
populateFromMap(this, dataMap);
}
/**
     * Creates a mapping of dataURI field names to the values set on this
     * record.
     *
     * @return map of dataURI field names to field values
     * @throws PluginException
     *             if a dataURI field cannot be read from the record
*/
public Map<String, Object> createDataURIMap() throws PluginException {
try {
Class<? extends PluginDataObject> thisClass = this.getClass();
Map<String, Object> map = new HashMap<String, Object>();
map.put("pluginName", getPluginName());
int index = 0;
String fieldName = PluginDataObject.getDataURIFieldName(thisClass,
index++);
while (fieldName != null) {
Object source = this;
int start = 0;
int end = fieldName.indexOf('.', start);
while (end >= 0) {
source = PropertyUtils.getProperty(source,
fieldName.substring(start, end));
start = end + 1;
end = fieldName.indexOf('.', start);
}
source = PropertyUtils.getProperty(source,
fieldName.substring(start));
map.put(fieldName, source);
fieldName = PluginDataObject.getDataURIFieldName(thisClass,
index++);
}
return map;
} catch (Exception e) {
throw new PluginException("Error constructing dataURI mapping", e);
}
}
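For illustration only (not part of this change; SampleRecord, its setter, and the field names below are hypothetical), the returned map is keyed by the dot-separated dataURI field names:

    // Hypothetical usage sketch (exception handling omitted).
    SampleRecord record = new SampleRecord();
    record.setStationId("KOMA");
    Map<String, Object> uriMap = record.createDataURIMap();
    // uriMap now holds, for example:
    //   "pluginName"        -> record.getPluginName()
    //   "stationId"         -> "KOMA"
    //   "location.latitude" -> nested value resolved via PropertyUtils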
/**
     * Populates an object from a data mapping. Intermediate objects in
     * dot-separated property names are instantiated as needed.
     *
     * @param object
     *            the object to populate
     * @param dataMap
     *            map of (possibly dot-separated) property names to values
     * @throws PluginException
     *             if a property cannot be set on the object
     */
public static void populateFromMap(Object object,
Map<String, Object> dataMap) throws PluginException {
try {
for (String property : dataMap.keySet()) {
String[] nested = property.split("[.]");
if (nested.length > 0) {
Object source = object;
for (int i = 0; i < nested.length - 1; ++i) {
String field = nested[i];
Object obj = PropertyUtils.getProperty(source, field);
if (obj == null) {
obj = PropertyUtils.getPropertyType(source, field)
.newInstance();
PropertyUtils.setProperty(source, field, obj);
}
source = obj;
}
String sourceProperty = nested[nested.length - 1];
Object value = dataMap.get(property);
if (value != null) {
PropertyUtils
.setProperty(source, sourceProperty, value);
}
}
}
} catch (Exception e) {
throw new PluginException("Error populating record type: "
+ (object != null ? object.getClass() : null)
+ " from map: " + dataMap, e);
}
}
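Conversely (again a hypothetical sketch building on the example above), such a map can be applied back onto a fresh record; for dot-separated keys, null intermediate beans are created with their no-arg constructors before the leaf value is set:

    // Hypothetical round-trip sketch (exception handling omitted).
    SampleRecord copy = new SampleRecord();
    PluginDataObject.populateFromMap(copy, uriMap);
    // A key such as "location.latitude" first instantiates copy's null
    // "location" bean via newInstance(), then sets "latitude" on it.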
/**
* Recursive method to populate an object from the elements in a dataURI
* string
@ -468,15 +556,14 @@ public abstract class PluginDataObject extends PersistableDataObject implements
}
/**
* Used to determine if a given subclass exposes the IDecoderGettable
* interface. Normally if the class does implement the interface then a
* reference to "this" is returned. Otherwise a null reference indicates
* that the interface is not implemented.
* TODO: Rework non-PointDataContainer plots and remove
*
* @return The IDecoderGettable interface implementation. Null reference if
* not implemented.
*/
public abstract IDecoderGettable getDecoderGettable();
@Deprecated
public IDecoderGettable getDecoderGettable() {
return null;
}
public void setDataURI(String dataURI) {
this.dataURI = dataURI;

View file

@ -33,10 +33,8 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@ -70,7 +68,6 @@ import com.raytheon.uf.common.dataquery.db.QueryResult;
import com.raytheon.uf.common.dataquery.db.QueryResultRow;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.edex.database.DataAccessLayerException;
import com.raytheon.uf.edex.database.query.DatabaseQuery;
@ -96,6 +93,7 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery;
* 7/24/07 353 bphillip Initial Check in
* 5/14/08 1076 brockwoo Fix for distinct with multiple properties
* Oct 10, 2012 1261 djohnson Incorporate changes to DaoConfig, add generic to {@link IPersistableDataObject}.
* Apr 15, 2013 1868 bsteffen Rewrite mergeAll in PluginDao.
*
* </pre>
*
@ -255,52 +253,6 @@ public class CoreDao extends HibernateDaoSupport {
});
}
private static final String mergeSqlFormat = "select id from awips.%s where dataURI=:dataURI";
public <T> List<PersistableDataObject<T>> mergeAll(
final List<PersistableDataObject<T>> obj) {
List<PersistableDataObject<T>> duplicates = new ArrayList<PersistableDataObject<T>>();
Session s = this.getHibernateTemplate().getSessionFactory()
.openSession();
Transaction tx = s.beginTransaction();
try {
Map<String, Query> pluginQueryMap = new HashMap<String, Query>();
for (PersistableDataObject<T> pdo : obj) {
if (pdo == null) {
logger.error("Attempted to insert null PersistableDataObject");
continue;
}
String plugin = ((PluginDataObject) pdo).getPluginName();
Query q = pluginQueryMap.get(plugin);
if (q == null) {
q = s.createSQLQuery(String.format(mergeSqlFormat, plugin));
pluginQueryMap.put(plugin, q);
}
q.setString("dataURI", (String) pdo.getIdentifier());
if (q.list().size() == 0) {
s.persist(pdo);
} else {
if (!pdo.isOverwriteAllowed()) {
duplicates.add(pdo);
} else {
statusHandler.handle(Priority.DEBUG, "Overwriting "
+ pdo.getIdentifier());
}
}
}
tx.commit();
} catch (Throwable e) {
// TODO
e.printStackTrace();
tx.rollback();
} finally {
if (s != null) {
s.close();
}
}
return duplicates;
}
/**
* Deletes an object from the database
*

View file

@ -34,8 +34,16 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.geotools.geometry.jts.ReferencedEnvelope;
import org.hibernate.Criteria;
import org.hibernate.StatelessSession;
import org.hibernate.Transaction;
import org.hibernate.criterion.Projections;
import org.hibernate.criterion.Restrictions;
import org.hibernate.exception.ConstraintViolationException;
import org.opengis.referencing.FactoryException;
import org.opengis.referencing.crs.CoordinateReferenceSystem;
import org.springframework.transaction.TransactionStatus;
@ -69,6 +77,7 @@ import com.raytheon.uf.common.localization.LocalizationFile;
import com.raytheon.uf.common.localization.PathManagerFactory;
import com.raytheon.uf.common.serialization.SerializationException;
import com.raytheon.uf.common.serialization.SerializationUtil;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.util.TimeUtil;
import com.raytheon.uf.common.util.FileUtil;
import com.raytheon.uf.edex.core.EdexException;
@ -103,6 +112,7 @@ import com.vividsolutions.jts.geom.Polygon;
* Mar 27, 2013 1821 bsteffen Remove extra store in persistToHDF5 for
* replace only operations.
* Apr 04, 2013 djohnson Remove formerly removed methods that won't compile.
* Apr 15, 2013 1868 bsteffen Rewrite mergeAll in PluginDao.
*
* </pre>
*
@ -120,6 +130,11 @@ public abstract class PluginDao extends CoreDao {
/** The hdf5 file system suffix */
public static final String HDF5_SUFFIX = ".h5";
// should match batch size in hibernate config
protected static final int COMMIT_INTERVAL = 100;
protected static final ConcurrentMap<Class<?>, DuplicateCheckStat> pluginDupCheckRate = new ConcurrentHashMap<Class<?>, DuplicateCheckStat>();
/** The base path of the folder containing HDF5 data for the owning plugin */
public final String PLUGIN_HDF5_DIR;
@ -129,9 +144,6 @@ public abstract class PluginDao extends CoreDao {
/** The owning plugin name */
protected String pluginName;
/** The sql string used to check for duplicates in the database */
protected String dupCheckSql = "select id from awips.:tableName where dataURI=':dataURI'";
protected static final String PURGE_VERSION_FIELD = "dataTime.refTime";
/**
@ -154,8 +166,6 @@ public abstract class PluginDao extends CoreDao {
this.pluginName = pluginName;
PLUGIN_HDF5_DIR = pluginName + File.separator;
dupCheckSql = dupCheckSql.replace(":tableName", PluginFactory
.getInstance().getPrimaryTable(pluginName));
pathProvider = PluginFactory.getInstance().getPathProvider(pluginName);
}
@ -187,14 +197,13 @@ public abstract class PluginDao extends CoreDao {
persistToDatabase(records);
}
@SuppressWarnings("unchecked")
public PluginDataObject[] persistToDatabase(PluginDataObject... records) {
List<PersistableDataObject<Object>> toPersist = new ArrayList<PersistableDataObject<Object>>();
List<PluginDataObject> toPersist = new ArrayList<PluginDataObject>();
for (PluginDataObject record : records) {
toPersist.add(record);
}
List<PersistableDataObject<Object>> duplicates = mergeAll(toPersist);
for (PersistableDataObject<Object> pdo : duplicates) {
List<PluginDataObject> duplicates = mergeAll(toPersist);
for (PluginDataObject pdo : duplicates) {
logger.info("Discarding duplicate: "
+ ((PluginDataObject) (pdo)).getDataURI());
toPersist.remove(pdo);
@ -202,6 +211,166 @@ public abstract class PluginDao extends CoreDao {
return toPersist.toArray(new PluginDataObject[toPersist.size()]);
}
/**
     * Commits (merges) a list of pdos into the database. Duplicates will not
     * be committed unless the object allows overwrite.
*
* @param objects
* the objects to commit
* @return any duplicate objects which already existed in the db.
*/
public List<PluginDataObject> mergeAll(final List<PluginDataObject> objects) {
if (objects == null || objects.isEmpty()) {
return objects;
}
List<PluginDataObject> duplicates = new ArrayList<PluginDataObject>();
Class<? extends PluginDataObject> pdoClass = objects.get(0).getClass();
DuplicateCheckStat dupStat = pluginDupCheckRate.get(pdoClass);
if (dupStat == null) {
dupStat = new DuplicateCheckStat();
pluginDupCheckRate.put(pdoClass, dupStat);
}
boolean duplicateCheck = dupStat.isDuplicateCheck();
int dupCommitCount = 0;
int noDupCommitCount = 0;
StatelessSession ss = null;
try {
ss = getHibernateTemplate().getSessionFactory()
.openStatelessSession();
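            // Overview of the strategy below: first try a plain bulk insert of
            // each batch; if it hits a constraint violation (or this plugin's
            // recent history suggests duplicates are likely), retry the batch
            // with a per-object duplicate check, updating or collecting
            // duplicates; if that transaction also fails, fall back to
            // committing each object individually.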
// process them all in fixed sized batches.
for (int i = 0; i < objects.size(); i += COMMIT_INTERVAL) {
List<PluginDataObject> subList = objects.subList(i,
Math.min(i + COMMIT_INTERVAL, objects.size()));
List<PluginDataObject> subDuplicates = new ArrayList<PluginDataObject>();
boolean constraintViolation = false;
Transaction tx = null;
if (!duplicateCheck) {
// First attempt is to just shove everything in the database
// as fast as possible and assume no duplicates.
try {
tx = ss.beginTransaction();
for (PluginDataObject object : subList) {
if (object == null) {
continue;
}
ss.insert(object);
}
tx.commit();
} catch (ConstraintViolationException e) {
tx.rollback();
constraintViolation = true;
}
}
if (constraintViolation || duplicateCheck) {
// Second attempt will do duplicate checking, and possibly
// overwrite.
constraintViolation = false;
try {
tx = ss.beginTransaction();
for (PluginDataObject object : subList) {
if (object == null) {
continue;
}
try {
Criteria criteria = ss.createCriteria(pdoClass);
populateDatauriCriteria(criteria, object);
criteria.setProjection(Projections.id());
Integer id = (Integer) criteria.uniqueResult();
if (id != null) {
object.setId(id);
if (object.isOverwriteAllowed()) {
ss.update(object);
} else {
subDuplicates.add(object);
}
} else {
ss.insert(object);
}
} catch (PluginException e) {
statusHandler.handle(Priority.PROBLEM,
"Query failed: Unable to insert or update "
+ object.getIdentifier(), e);
}
}
tx.commit();
} catch (ConstraintViolationException e) {
constraintViolation = true;
tx.rollback();
}
}
if (constraintViolation) {
// Third attempt will commit each pdo individually.
subDuplicates.clear();
for (PluginDataObject object : subList) {
if (object == null) {
continue;
}
try {
tx = ss.beginTransaction();
Criteria criteria = ss.createCriteria(pdoClass);
populateDatauriCriteria(criteria, object);
criteria.setProjection(Projections.id());
Integer id = (Integer) criteria.uniqueResult();
if (id != null) {
object.setId(id);
if (object.isOverwriteAllowed()) {
ss.update(object);
} else {
subDuplicates.add(object);
}
} else {
ss.insert(object);
}
tx.commit();
} catch (ConstraintViolationException e) {
subDuplicates.add(object);
tx.rollback();
} catch (PluginException e) {
statusHandler.handle(Priority.PROBLEM,
"Query failed: Unable to insert or update "
+ object.getIdentifier(), e);
}
}
}
if (subDuplicates.isEmpty()) {
noDupCommitCount += 1;
} else {
dupCommitCount += 1;
duplicates.addAll(subDuplicates);
}
}
            // fraction of batches in this call committed without hitting duplicates
            dupStat.updateRate(noDupCommitCount
                    / (float) (noDupCommitCount + dupCommitCount));
} finally {
if (ss != null) {
ss.close();
}
}
return duplicates;
}
private void populateDatauriCriteria(Criteria criteria, PluginDataObject pdo)
throws PluginException {
criteria.add(Restrictions.eq("dataURI", pdo.getDataURI()));
// TODO this code block can be used if we drop the dataURI column.
// for (Entry<String, Object> uriEntry :
// pdo.createDataURIMap().entrySet()) {
// String key = uriEntry.getKey();
// Object value = uriEntry.getValue();
// if (key.equals("pluginName")) {
// ;// this is not in the db, only used internally.
// } else if (value == null) {
// criteria.add(Restrictions.isNull(key));
// } else {
// criteria.add(Restrictions.eq(key, value));
// }
// }
}
/**
* Persists the HDF5 component of the records to the HDF5 repository
*
@ -1626,4 +1795,36 @@ public abstract class PluginDao extends CoreDao {
}
return rval;
}
protected static class DuplicateCheckStat {
// percentage of commits that need to succeed without duplicate checking
// for a plugin to attempt to skip duplicate checking.
protected static final float DUPLICATE_CHECK_THRESHOLD = 0.5f;
// This number controls the maximum number of transactions to
// "remember" which will affect how difficult it is to change the
        // cumulative rate. The larger the number is, the more successful (or
        // failed) attempts it will take to change the cumulativeRate.
protected static final int DUPLICATE_MEMORY = 5000;
protected boolean duplicateCheck = true;
protected float cumulativeRate = 1.0f;
protected int total = 0;
protected boolean isDuplicateCheck() {
return duplicateCheck;
}
protected void updateRate(float rate) {
cumulativeRate = (rate + cumulativeRate * total) / (total + 1);
duplicateCheck = cumulativeRate < DUPLICATE_CHECK_THRESHOLD;
if (total < DUPLICATE_MEMORY) {
total++;
}
}
}
}
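As a worked illustration of DuplicateCheckStat (hypothetical numbers, not part of the change): updateRate folds each call's batch-success fraction into a running average, and duplicate checking is skipped only while that average stays at or above DUPLICATE_CHECK_THRESHOLD:

// start: cumulativeRate = 1.0, total = 0, duplicateCheck = true
// updateRate(1.0f): (1.0 + 1.0 * 0) / 1 = 1.0  -> duplicateCheck = false
// updateRate(0.0f): (0.0 + 1.0 * 1) / 2 = 0.5  -> duplicateCheck = false
// updateRate(0.0f): (0.0 + 0.5 * 2) / 3 = 0.33 -> duplicateCheck = true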

View file

@ -27,28 +27,19 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import javax.xml.bind.JAXBException;
import net.sf.cglib.beans.BeanMap;
import org.hibernate.HibernateException;
import org.hibernate.Query;
import org.hibernate.StatelessSession;
import org.hibernate.Transaction;
import com.raytheon.uf.common.dataplugin.PluginDataObject;
import com.raytheon.uf.common.dataplugin.PluginException;
import com.raytheon.uf.common.dataplugin.persist.DefaultPathProvider;
import com.raytheon.uf.common.dataplugin.persist.IPersistable;
import com.raytheon.uf.common.dataplugin.persist.PersistableDataObject;
import com.raytheon.uf.common.datastorage.DataStoreFactory;
import com.raytheon.uf.common.datastorage.IDataStore;
import com.raytheon.uf.common.datastorage.IDataStore.StoreOp;
@ -64,7 +55,6 @@ import com.raytheon.uf.common.pointdata.PointDataDescription;
import com.raytheon.uf.common.pointdata.PointDataView;
import com.raytheon.uf.common.status.IUFStatusHandler;
import com.raytheon.uf.common.status.UFStatus;
import com.raytheon.uf.common.status.UFStatus.Priority;
import com.raytheon.uf.common.time.DataTime;
import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
import com.raytheon.uf.edex.database.plugin.PluginDao;
@ -79,6 +69,7 @@ import com.raytheon.uf.edex.database.plugin.PluginDao;
* ------------ ---------- ----------- --------------------------
* Apr 13, 2009 chammack Initial creation
* Jan 14, 2013 1469 bkowal Removed the hdf5 data directory.
* Apr 15, 2013 1868 bsteffen Rewrite mergeAll in PluginDao.
*
* </pre>
*
@ -91,43 +82,6 @@ public abstract class PointDataPluginDao<T extends PluginDataObject> extends
private static final transient IUFStatusHandler statusHandler = UFStatus
.getHandler(PointDataPluginDao.class);
// should match batch size in hibernate config
protected static final int COMMIT_INTERVAL = 100;
// after limit failures on one persistAll call, will switch to individual
// storage and dup check
protected static final int BULK_FAILURE_LIMIT = 3;
protected static final ConcurrentMap<String, DupCheckStat> pluginBulkSuccessRate = new ConcurrentHashMap<String, DupCheckStat>();
// percentage of bulk commits that need to succeed for a plugin to not use
// dup check
protected static final float DUP_CHECK_THRESHOLD = 0.5f;
protected static class DupCheckStat {
protected boolean checkDup = true;
protected float cumulativeRate = 0;
protected int total = 0;
protected boolean checkDup() {
return checkDup;
}
protected void updateRate(float rate) {
cumulativeRate = (rate + cumulativeRate * total) / (total + 1);
checkDup = cumulativeRate < DUP_CHECK_THRESHOLD;
            // handle roll over... just in case; at this point cumulative
            // updates are almost pointless, the 100 is there simply for
            // concurrency handling
if (total < Integer.MAX_VALUE - 100) {
total++;
}
}
}
public static enum LevelRequest {
ALL, NONE, SPECIFIC;
@ -174,169 +128,6 @@ public abstract class PointDataPluginDao<T extends PluginDataObject> extends
this.beanMapCache = new LinkedBlockingQueue<BeanMap>();
}
/**
* Persists all objects in collection
*
* @param obj
* The object to be persisted to the database
*/
public void persistAll(final List<? extends Object> objList) {
StatelessSession ss = null;
try {
ss = getHibernateTemplate().getSessionFactory()
.openStatelessSession();
int index = 0;
int commitPoint = 0;
// intelligently choose whether to use bulk storage based on
// previous stores for this plugin type
DupCheckStat rate = pluginBulkSuccessRate.get(pluginName);
if (rate == null) {
rate = new DupCheckStat();
pluginBulkSuccessRate.put(pluginName, rate);
}
boolean bulkPersist = true;
Transaction tx = null;
int bulkDups = 0;
int bulkFailures = 0;
int bulkSuccess = 0;
boolean dupCheck = rate.checkDup();
boolean dupOccurred = false;
Query q = null;
while (commitPoint < objList.size()) {
if (bulkPersist) {
Iterator<? extends Object> itr = objList
.listIterator(commitPoint);
index = commitPoint;
dupOccurred = false;
try {
tx = ss.beginTransaction();
while (itr.hasNext()) {
PersistableDataObject pdo = (PersistableDataObject) itr
.next();
if (dupCheck) {
if (q == null) {
String sql = "select id from awips."
+ ((PluginDataObject) pdo)
.getPluginName()
+ " where dataURI=:dataURI";
q = ss.createSQLQuery(sql);
}
q.setString("dataURI",
(String) pdo.getIdentifier());
List<?> list = q.list();
if ((list == null) || (list.size() == 0)) {
ss.insert(pdo);
index++;
} else {
itr.remove();
dupOccurred = true;
}
} else {
ss.insert(pdo);
index++;
}
if (index % COMMIT_INTERVAL == 0) {
tx.commit();
q = null;
commitPoint = index;
if (dupOccurred) {
dupOccurred = false;
bulkDups++;
} else {
bulkSuccess++;
}
tx = ss.beginTransaction();
}
}
tx.commit();
commitPoint = index;
bulkSuccess++;
} catch (Exception e) {
bulkFailures++;
statusHandler
.handle(Priority.PROBLEM,
"Error storing pointdata batch to database, applying dup check and storing batch individually");
bulkPersist = false;
try {
tx.rollback();
} catch (HibernateException e1) {
statusHandler.handle(Priority.PROBLEM,
"Rollback failed", e1);
}
}
} else {
// persist records individually, using uri dup check
Iterator<? extends Object> itr = objList
.listIterator(commitPoint);
index = 0;
dupOccurred = false;
// only persist individually through one commit interval
while (itr.hasNext() && (index / COMMIT_INTERVAL == 0)) {
try {
tx = ss.beginTransaction();
PersistableDataObject pdo = (PersistableDataObject) itr
.next();
String sql = "select id from awips."
+ ((PluginDataObject) pdo).getPluginName()
+ " where dataURI=:dataURI";
q = ss.createSQLQuery(sql);
q.setString("dataURI", (String) pdo.getIdentifier());
List<?> list = q.list();
if ((list == null) || (list.size() == 0)) {
ss.insert(pdo);
tx.commit();
index++;
} else {
tx.commit();
itr.remove();
dupOccurred = true;
}
} catch (Exception e) {
statusHandler
.handle(Priority.PROBLEM,
"Error storing pointdata individually to database",
e);
itr.remove();
try {
tx.rollback();
} catch (HibernateException e1) {
statusHandler.handle(Priority.PROBLEM,
"Rollback failed", e1);
}
}
}
if (dupOccurred) {
bulkDups++;
} else {
bulkSuccess++;
}
commitPoint += index;
if (bulkFailures < BULK_FAILURE_LIMIT) {
bulkPersist = true;
}
}
}
// calculate bulk success rate
float thisRate = bulkSuccess / (bulkSuccess + bulkDups);
rate.updateRate(thisRate);
} finally {
if (ss != null) {
ss.close();
}
}
}
/*
* (non-Javadoc)
*
@ -433,18 +224,6 @@ public abstract class PointDataPluginDao<T extends PluginDataObject> extends
}
@Override
public PluginDataObject[] persistToDatabase(PluginDataObject... records) {
List<PersistableDataObject> persist = new ArrayList<PersistableDataObject>(
Arrays.asList(records));
persistAll(persist);
if (persist.size() != records.length) {
return persist.toArray(new PluginDataObject[persist.size()]);
} else {
return records;
}
}
public File getFullFilePath(PluginDataObject p) {
File file;
String directory = p.getPluginName() + File.separator