Issue #1868 Rewrite mergeAll in PluginDao
Change-Id: I5f5706229029c9fe11c8b6a057e871aaf10baf9d
parent d8bfcb53e7
commit 63b7319cef
4 changed files with 308 additions and 289 deletions
@@ -22,6 +22,7 @@ package com.raytheon.uf.common.dataplugin;
 import java.lang.reflect.Field;
 import java.util.Calendar;
+import java.util.HashMap;
 import java.util.Map;
 
 import javax.persistence.Column;
@@ -75,7 +76,9 @@ import com.raytheon.uf.common.util.ConvertUtil;
  * Apr 12, 2013 1857 bgonzale Changed to MappedSuperclass, named generator,
  *                            GenerationType SEQUENCE, moved Indexes to getter
  *                            methods.
  *
+ * Mar 29, 2013 1638 mschenke Added methods for loading from data map and creating data map from
+ *                            dataURI fields
+ * Apr 15, 2013 1868 bsteffen Improved performance of createDataURIMap
  * </pre>
  *
  */
@@ -206,6 +209,91 @@ public abstract class PluginDataObject extends PersistableDataObject implements
         return uriBuffer;
     }
 
+    /**
+     * Populates the record object from a data map
+     *
+     * @param dataMap
+     * @throws PluginException
+     */
+    public void populateFromMap(Map<String, Object> dataMap)
+            throws PluginException {
+        populateFromMap(this, dataMap);
+    }
+
+    /**
+     * Creates a mapping of dataURI fields to objects set in the record
+     *
+     * @return
+     * @throws PluginException
+     */
+    public Map<String, Object> createDataURIMap() throws PluginException {
+        try {
+            Class<? extends PluginDataObject> thisClass = this.getClass();
+            Map<String, Object> map = new HashMap<String, Object>();
+            map.put("pluginName", getPluginName());
+            int index = 0;
+            String fieldName = PluginDataObject.getDataURIFieldName(thisClass,
+                    index++);
+            while (fieldName != null) {
+                Object source = this;
+                int start = 0;
+                int end = fieldName.indexOf('.', start);
+                while (end >= 0) {
+                    source = PropertyUtils.getProperty(source,
+                            fieldName.substring(start, end));
+                    start = end + 1;
+                    end = fieldName.indexOf('.', start);
+                }
+                source = PropertyUtils.getProperty(source,
+                        fieldName.substring(start));
+                map.put(fieldName, source);
+                fieldName = PluginDataObject.getDataURIFieldName(thisClass,
+                        index++);
+            }
+            return map;
+        } catch (Exception e) {
+            throw new PluginException("Error constructing dataURI mapping", e);
+        }
+    }
+
+    /**
+     * Populates object from data mapping
+     *
+     * @param object
+     * @param dataMap
+     */
+    public static void populateFromMap(Object object,
+            Map<String, Object> dataMap) throws PluginException {
+        try {
+            for (String property : dataMap.keySet()) {
+                String[] nested = property.split("[.]");
+                if (nested.length > 0) {
+                    Object source = object;
+                    for (int i = 0; i < nested.length - 1; ++i) {
+                        String field = nested[i];
+                        Object obj = PropertyUtils.getProperty(source, field);
+                        if (obj == null) {
+                            obj = PropertyUtils.getPropertyType(source, field)
+                                    .newInstance();
+                            PropertyUtils.setProperty(source, field, obj);
+                        }
+                        source = obj;
+                    }
+                    String sourceProperty = nested[nested.length - 1];
+                    Object value = dataMap.get(property);
+                    if (value != null) {
+                        PropertyUtils
+                                .setProperty(source, sourceProperty, value);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new PluginException("Error populating record type: "
+                    + (object != null ? object.getClass() : null)
+                    + " from map: " + dataMap, e);
+        }
+    }
+
     /**
      * Recursive method to populate an object from the elements in a dataURI
      * string
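Taken together, the two additions form a round trip: createDataURIMap flattens every dataURI field, including nested ones such as "dataTime.refTime", into a single map, and populateFromMap writes such a map back onto a record, instantiating intermediate beans on demand. A minimal usage sketch, not part of the commit; MyRecord is a hypothetical concrete PluginDataObject subclass:

    // Sketch only: MyRecord stands in for any concrete PluginDataObject.
    MyRecord original = new MyRecord();
    // ... set pluginName and the other dataURI fields ...
    Map<String, Object> uriMap = original.createDataURIMap();

    MyRecord copy = new MyRecord();
    copy.populateFromMap(uriMap); // nested beans are instantiated as needed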
@@ -468,15 +556,14 @@ public abstract class PluginDataObject extends PersistableDataObject implements
     }
 
     /**
      * Used to determine if a given subclass exposes the IDecoderGettable
      * interface. Normally if the class does implement the interface then a
      * reference to "this" is returned. Otherwise a null reference indicates
      * that the interface is not implemented.
      * TODO: Rework non-PointDataContainer plots and remove
      *
      * @return The IDecoderGettable interface implementation. Null reference if
      *         not implemented.
-     * @return
      */
-    public abstract IDecoderGettable getDecoderGettable();
+    @Deprecated
+    public IDecoderGettable getDecoderGettable() {
+        return null;
+    }
 
     public void setDataURI(String dataURI) {
         this.dataURI = dataURI;

@@ -33,10 +33,8 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -70,7 +68,6 @@ import com.raytheon.uf.common.dataquery.db.QueryResult;
 import com.raytheon.uf.common.dataquery.db.QueryResultRow;
 import com.raytheon.uf.common.status.IUFStatusHandler;
 import com.raytheon.uf.common.status.UFStatus;
-import com.raytheon.uf.common.status.UFStatus.Priority;
 import com.raytheon.uf.edex.database.DataAccessLayerException;
 import com.raytheon.uf.edex.database.query.DatabaseQuery;
 
@@ -96,6 +93,7 @@ import com.raytheon.uf.edex.database.query.DatabaseQuery;
  * 7/24/07 353 bphillip Initial Check in
  * 5/14/08 1076 brockwoo Fix for distinct with multiple properties
  * Oct 10, 2012 1261 djohnson Incorporate changes to DaoConfig, add generic to {@link IPersistableDataObject}.
+ * Apr 15, 2013 1868 bsteffen Rewrite mergeAll in PluginDao.
  *
  * </pre>
  *
@@ -255,52 +253,6 @@ public class CoreDao extends HibernateDaoSupport {
         });
     }
 
-    private static final String mergeSqlFormat = "select id from awips.%s where dataURI=:dataURI";
-
-    public <T> List<PersistableDataObject<T>> mergeAll(
-            final List<PersistableDataObject<T>> obj) {
-        List<PersistableDataObject<T>> duplicates = new ArrayList<PersistableDataObject<T>>();
-        Session s = this.getHibernateTemplate().getSessionFactory()
-                .openSession();
-        Transaction tx = s.beginTransaction();
-        try {
-            Map<String, Query> pluginQueryMap = new HashMap<String, Query>();
-            for (PersistableDataObject<T> pdo : obj) {
-                if (pdo == null) {
-                    logger.error("Attempted to insert null PersistableDataObject");
-                    continue;
-                }
-                String plugin = ((PluginDataObject) pdo).getPluginName();
-                Query q = pluginQueryMap.get(plugin);
-                if (q == null) {
-                    q = s.createSQLQuery(String.format(mergeSqlFormat, plugin));
-                    pluginQueryMap.put(plugin, q);
-                }
-                q.setString("dataURI", (String) pdo.getIdentifier());
-                if (q.list().size() == 0) {
-                    s.persist(pdo);
-                } else {
-                    if (!pdo.isOverwriteAllowed()) {
-                        duplicates.add(pdo);
-                    } else {
-                        statusHandler.handle(Priority.DEBUG, "Overwriting "
-                                + pdo.getIdentifier());
-                    }
-                }
-            }
-            tx.commit();
-        } catch (Throwable e) {
-            // TODO
-            e.printStackTrace();
-            tx.rollback();
-        } finally {
-            if (s != null) {
-                s.close();
-            }
-        }
-        return duplicates;
-    }
-
     /**
      * Deletes an object from the database
      *

@@ -34,8 +34,16 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.geotools.geometry.jts.ReferencedEnvelope;
+import org.hibernate.Criteria;
+import org.hibernate.StatelessSession;
+import org.hibernate.Transaction;
+import org.hibernate.criterion.Projections;
+import org.hibernate.criterion.Restrictions;
+import org.hibernate.exception.ConstraintViolationException;
 import org.opengis.referencing.FactoryException;
 import org.opengis.referencing.crs.CoordinateReferenceSystem;
 import org.springframework.transaction.TransactionStatus;
@@ -69,6 +77,7 @@ import com.raytheon.uf.common.localization.LocalizationFile;
 import com.raytheon.uf.common.localization.PathManagerFactory;
 import com.raytheon.uf.common.serialization.SerializationException;
 import com.raytheon.uf.common.serialization.SerializationUtil;
+import com.raytheon.uf.common.status.UFStatus.Priority;
 import com.raytheon.uf.common.time.util.TimeUtil;
 import com.raytheon.uf.common.util.FileUtil;
 import com.raytheon.uf.edex.core.EdexException;
@@ -103,6 +112,7 @@ import com.vividsolutions.jts.geom.Polygon;
  * Mar 27, 2013 1821 bsteffen Remove extra store in persistToHDF5 for
  *                            replace only operations.
  * Apr 04, 2013 djohnson Remove formerly removed methods that won't compile.
+ * Apr 15, 2013 1868 bsteffen Rewrite mergeAll in PluginDao.
  *
  * </pre>
  *
@@ -120,6 +130,11 @@ public abstract class PluginDao extends CoreDao {
     /** The hdf5 file system suffix */
     public static final String HDF5_SUFFIX = ".h5";
 
+    // should match batch size in hibernate config
+    protected static final int COMMIT_INTERVAL = 100;
+
+    protected static final ConcurrentMap<Class<?>, DuplicateCheckStat> pluginDupCheckRate = new ConcurrentHashMap<Class<?>, DuplicateCheckStat>();
+
     /** The base path of the folder containing HDF5 data for the owning plugin */
     public final String PLUGIN_HDF5_DIR;
 
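The comment on COMMIT_INTERVAL refers to Hibernate's JDBC batch size. A hedged sketch of keeping the two values aligned programmatically; hibernate.jdbc.batch_size is the standard Hibernate property, though how this codebase actually wires its configuration may differ:

    // Sketch only: mirror COMMIT_INTERVAL in the Hibernate configuration so
    // each transaction's inserts flush as a single JDBC batch.
    org.hibernate.cfg.Configuration cfg = new org.hibernate.cfg.Configuration();
    cfg.setProperty("hibernate.jdbc.batch_size", "100");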
@@ -129,9 +144,6 @@ public abstract class PluginDao extends CoreDao {
     /** The owning plugin name */
     protected String pluginName;
 
-    /** The sql string used to check for duplicates in the database */
-    protected String dupCheckSql = "select id from awips.:tableName where dataURI=':dataURI'";
-
     protected static final String PURGE_VERSION_FIELD = "dataTime.refTime";
 
     /**
@@ -154,8 +166,6 @@ public abstract class PluginDao extends CoreDao {
 
         this.pluginName = pluginName;
         PLUGIN_HDF5_DIR = pluginName + File.separator;
-        dupCheckSql = dupCheckSql.replace(":tableName", PluginFactory
-                .getInstance().getPrimaryTable(pluginName));
         pathProvider = PluginFactory.getInstance().getPathProvider(pluginName);
     }
 
@@ -187,14 +197,13 @@ public abstract class PluginDao extends CoreDao {
         persistToDatabase(records);
     }
 
-    @SuppressWarnings("unchecked")
     public PluginDataObject[] persistToDatabase(PluginDataObject... records) {
-        List<PersistableDataObject<Object>> toPersist = new ArrayList<PersistableDataObject<Object>>();
+        List<PluginDataObject> toPersist = new ArrayList<PluginDataObject>();
         for (PluginDataObject record : records) {
             toPersist.add(record);
         }
-        List<PersistableDataObject<Object>> duplicates = mergeAll(toPersist);
-        for (PersistableDataObject<Object> pdo : duplicates) {
+        List<PluginDataObject> duplicates = mergeAll(toPersist);
+        for (PluginDataObject pdo : duplicates) {
             logger.info("Discarding duplicate: "
                     + ((PluginDataObject) (pdo)).getDataURI());
             toPersist.remove(pdo);
@@ -202,6 +211,166 @@ public abstract class PluginDao extends CoreDao {
         return toPersist.toArray(new PluginDataObject[toPersist.size()]);
     }
 
+    /**
+     * Commits(merges) a list of pdos into the database. Duplicates will not be
+     * committed unless the object allows override.
+     *
+     * @param objects
+     *            the objects to commit
+     * @return any duplicate objects which already existed in the db.
+     */
+    public List<PluginDataObject> mergeAll(final List<PluginDataObject> objects) {
+        if (objects == null || objects.isEmpty()) {
+            return objects;
+        }
+        List<PluginDataObject> duplicates = new ArrayList<PluginDataObject>();
+        Class<? extends PluginDataObject> pdoClass = objects.get(0).getClass();
+        DuplicateCheckStat dupStat = pluginDupCheckRate.get(pdoClass);
+        if (dupStat == null) {
+            dupStat = new DuplicateCheckStat();
+            pluginDupCheckRate.put(pdoClass, dupStat);
+        }
+        boolean duplicateCheck = dupStat.isDuplicateCheck();
+        int dupCommitCount = 0;
+        int noDupCommitCount = 0;
+
+        StatelessSession ss = null;
+        try {
+            ss = getHibernateTemplate().getSessionFactory()
+                    .openStatelessSession();
+            // process them all in fixed sized batches.
+            for (int i = 0; i < objects.size(); i += COMMIT_INTERVAL) {
+                List<PluginDataObject> subList = objects.subList(i,
+                        Math.min(i + COMMIT_INTERVAL, objects.size()));
+                List<PluginDataObject> subDuplicates = new ArrayList<PluginDataObject>();
+                boolean constraintViolation = false;
+                Transaction tx = null;
+                if (!duplicateCheck) {
+                    // First attempt is to just shove everything in the database
+                    // as fast as possible and assume no duplicates.
+                    try {
+                        tx = ss.beginTransaction();
+                        for (PluginDataObject object : subList) {
+                            if (object == null) {
+                                continue;
+                            }
+                            ss.insert(object);
+                        }
+                        tx.commit();
+                    } catch (ConstraintViolationException e) {
+                        tx.rollback();
+                        constraintViolation = true;
+                    }
+                }
+                if (constraintViolation || duplicateCheck) {
+                    // Second attempt will do duplicate checking, and possibly
+                    // overwrite.
+                    constraintViolation = false;
+                    try {
+                        tx = ss.beginTransaction();
+                        for (PluginDataObject object : subList) {
+                            if (object == null) {
+                                continue;
+                            }
+                            try {
+                                Criteria criteria = ss.createCriteria(pdoClass);
+                                populateDatauriCriteria(criteria, object);
+                                criteria.setProjection(Projections.id());
+                                Integer id = (Integer) criteria.uniqueResult();
+                                if (id != null) {
+                                    object.setId(id);
+                                    if (object.isOverwriteAllowed()) {
+                                        ss.update(object);
+                                    } else {
+                                        subDuplicates.add(object);
+                                    }
+                                } else {
+                                    ss.insert(object);
+                                }
+                            } catch (PluginException e) {
+                                statusHandler.handle(Priority.PROBLEM,
+                                        "Query failed: Unable to insert or update "
+                                                + object.getIdentifier(), e);
+                            }
+                        }
+                        tx.commit();
+                    } catch (ConstraintViolationException e) {
+                        constraintViolation = true;
+                        tx.rollback();
+                    }
+                }
+                if (constraintViolation) {
+                    // Third attempt will commit each pdo individually.
+                    subDuplicates.clear();
+                    for (PluginDataObject object : subList) {
+                        if (object == null) {
+                            continue;
+                        }
+                        try {
+                            tx = ss.beginTransaction();
+                            Criteria criteria = ss.createCriteria(pdoClass);
+                            populateDatauriCriteria(criteria, object);
+                            criteria.setProjection(Projections.id());
+                            Integer id = (Integer) criteria.uniqueResult();
+                            if (id != null) {
+                                object.setId(id);
+                                if (object.isOverwriteAllowed()) {
+                                    ss.update(object);
+                                } else {
+                                    subDuplicates.add(object);
+                                }
+                            } else {
+                                ss.insert(object);
+                            }
+                            tx.commit();
+                        } catch (ConstraintViolationException e) {
+                            subDuplicates.add(object);
+                            tx.rollback();
+                        } catch (PluginException e) {
+                            statusHandler.handle(Priority.PROBLEM,
+                                    "Query failed: Unable to insert or update "
+                                            + object.getIdentifier(), e);
+                        }
+                    }
+                }
+                if (subDuplicates.isEmpty()) {
+                    noDupCommitCount += 1;
+                } else {
+                    dupCommitCount += 1;
+                    duplicates.addAll(subDuplicates);
+                }
+            }
+            // cast to float so integer division does not truncate the rate
+            dupStat.updateRate(((float) noDupCommitCount)
+                    / (noDupCommitCount + dupCommitCount));
+        } finally {
+            if (ss != null) {
+                ss.close();
+            }
+        }
+        return duplicates;
+    }
+
+    private void populateDatauriCriteria(Criteria criteria, PluginDataObject pdo)
+            throws PluginException {
+        criteria.add(Restrictions.eq("dataURI", pdo.getDataURI()));
+
+        // TODO this code block can be used if we drop the dataURI column.
+
+        // for (Entry<String, Object> uriEntry :
+        // pdo.createDataURIMap().entrySet()) {
+        // String key = uriEntry.getKey();
+        // Object value = uriEntry.getValue();
+        // if (key.equals("pluginName")) {
+        // ;// this is not in the db, only used internally.
+        // } else if (value == null) {
+        // criteria.add(Restrictions.isNull(key));
+        // } else {
+        // criteria.add(Restrictions.eq(key, value));
+        // }
+        // }
+
+    }
+
     /**
      * Persists the HDF5 component of the records to the HDF5 repository
      *
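From a caller's perspective the new mergeAll is a single call: it batches the records, falls back through the three attempts on constraint violations, and returns only the true duplicates. An illustrative sketch, assuming dao is some concrete PluginDao and records is a List<PluginDataObject> for one plugin type:

    // Sketch only: everything not returned was inserted or updated in
    // batches of COMMIT_INTERVAL using a StatelessSession.
    List<PluginDataObject> duplicates = dao.mergeAll(records);
    for (PluginDataObject dup : duplicates) {
        System.out.println("Discarding duplicate: " + dup.getDataURI());
    }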
@@ -1626,4 +1795,36 @@ public abstract class PluginDao extends CoreDao {
         }
         return rval;
     }
+
+    protected static class DuplicateCheckStat {
+
+        // percentage of commits that need to succeed without duplicate checking
+        // for a plugin to attempt to skip duplicate checking.
+        protected static final float DUPLICATE_CHECK_THRESHOLD = 0.5f;
+
+        // This number controls the maximum number of transactions to
+        // "remember" which will affect how difficult it is to change the
+        // cumulative rate. The larger the number is, the more successful(or
+        // failed) attempts it will take to change the cumulativeRate.
+        protected static final int DUPLICATE_MEMORY = 5000;
+
+        protected boolean duplicateCheck = true;
+
+        protected float cumulativeRate = 1.0f;
+
+        protected int total = 0;
+
+        protected boolean isDuplicateCheck() {
+            return duplicateCheck;
+        }
+
+        protected void updateRate(float rate) {
+            cumulativeRate = (rate + cumulativeRate * total) / (total + 1);
+            duplicateCheck = cumulativeRate < DUPLICATE_CHECK_THRESHOLD;
+
+            if (total < DUPLICATE_MEMORY) {
+                total++;
+            }
+        }
+    }
 }

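A worked trace of the running average in updateRate (illustrative arithmetic only, starting from a fresh instance with cumulativeRate = 1.0 and total = 0):

    // updateRate(0.0f): cumulativeRate = (0.0 + 1.0 * 0) / 1 = 0.0, total = 1
    //                   duplicateCheck = (0.0 < 0.5) = true  -> keep dup checking
    // updateRate(1.0f): cumulativeRate = (1.0 + 0.0 * 1) / 2 = 0.5, total = 2
    //                   duplicateCheck = (0.5 < 0.5) = false -> try the fast path
    // Once total reaches DUPLICATE_MEMORY (5000) it stops growing, so each new
    // sample can move the average by at most about 1/5001.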
@@ -27,28 +27,19 @@ import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import javax.xml.bind.JAXBException;
 
 import net.sf.cglib.beans.BeanMap;
 
-import org.hibernate.HibernateException;
-import org.hibernate.Query;
-import org.hibernate.StatelessSession;
-import org.hibernate.Transaction;
 
 import com.raytheon.uf.common.dataplugin.PluginDataObject;
 import com.raytheon.uf.common.dataplugin.PluginException;
 import com.raytheon.uf.common.dataplugin.persist.DefaultPathProvider;
 import com.raytheon.uf.common.dataplugin.persist.IPersistable;
-import com.raytheon.uf.common.dataplugin.persist.PersistableDataObject;
 import com.raytheon.uf.common.datastorage.DataStoreFactory;
 import com.raytheon.uf.common.datastorage.IDataStore;
 import com.raytheon.uf.common.datastorage.IDataStore.StoreOp;
@@ -64,7 +55,6 @@ import com.raytheon.uf.common.pointdata.PointDataDescription;
 import com.raytheon.uf.common.pointdata.PointDataView;
 import com.raytheon.uf.common.status.IUFStatusHandler;
 import com.raytheon.uf.common.status.UFStatus;
-import com.raytheon.uf.common.status.UFStatus.Priority;
 import com.raytheon.uf.common.time.DataTime;
 import com.raytheon.uf.edex.core.dataplugin.PluginRegistry;
 import com.raytheon.uf.edex.database.plugin.PluginDao;
@@ -79,6 +69,7 @@ import com.raytheon.uf.edex.database.plugin.PluginDao;
  * ------------ ---------- ----------- --------------------------
  * Apr 13, 2009 chammack Initial creation
  * Jan 14, 2013 1469 bkowal Removed the hdf5 data directory.
+ * Apr 15, 2013 1868 bsteffen Rewrite mergeAll in PluginDao.
  *
  * </pre>
  *
@@ -91,43 +82,6 @@ public abstract class PointDataPluginDao<T extends PluginDataObject> extends
     private static final transient IUFStatusHandler statusHandler = UFStatus
             .getHandler(PointDataPluginDao.class);
 
-    // should match batch size in hibernate config
-    protected static final int COMMIT_INTERVAL = 100;
-
-    // after limit failures on one persistAll call, will switch to individual
-    // storage and dup check
-    protected static final int BULK_FAILURE_LIMIT = 3;
-
-    protected static final ConcurrentMap<String, DupCheckStat> pluginBulkSuccessRate = new ConcurrentHashMap<String, DupCheckStat>();
-
-    // percentage of bulk commits that need to succeed for a plugin to not use
-    // dup check
-    protected static final float DUP_CHECK_THRESHOLD = 0.5f;
-
-    protected static class DupCheckStat {
-        protected boolean checkDup = true;
-
-        protected float cumulativeRate = 0;
-
-        protected int total = 0;
-
-        protected boolean checkDup() {
-            return checkDup;
-        }
-
-        protected void updateRate(float rate) {
-            cumulativeRate = (rate + cumulativeRate * total) / (total + 1);
-            checkDup = cumulativeRate < DUP_CHECK_THRESHOLD;
-
-            // handle roll over... just incase, which at this point culmulative
-            // updates are almost pointless, 100 there simply for concurrency
-            // handling
-            if (total < Integer.MAX_VALUE - 100) {
-                total++;
-            }
-        }
-    }
-
     public static enum LevelRequest {
         ALL, NONE, SPECIFIC;
 
@@ -161,7 +115,7 @@ public abstract class PointDataPluginDao<T extends PluginDataObject> extends
     }
 
     };
 
 
     private final LinkedBlockingQueue<BeanMap> beanMapCache;
 
     protected PointDataDbDescription dbDataDescription;
@@ -174,169 +128,6 @@ public abstract class PointDataPluginDao<T extends PluginDataObject> extends
         this.beanMapCache = new LinkedBlockingQueue<BeanMap>();
     }
 
-    /**
-     * Persists all objects in collection
-     *
-     * @param obj
-     *            The object to be persisted to the database
-     */
-    public void persistAll(final List<? extends Object> objList) {
-        StatelessSession ss = null;
-        try {
-            ss = getHibernateTemplate().getSessionFactory()
-                    .openStatelessSession();
-            int index = 0;
-            int commitPoint = 0;
-
-            // intelligently choose whether to use bulk storage based on
-            // previous stores for this plugin type
-            DupCheckStat rate = pluginBulkSuccessRate.get(pluginName);
-            if (rate == null) {
-                rate = new DupCheckStat();
-                pluginBulkSuccessRate.put(pluginName, rate);
-            }
-
-            boolean bulkPersist = true;
-            Transaction tx = null;
-            int bulkDups = 0;
-            int bulkFailures = 0;
-            int bulkSuccess = 0;
-            boolean dupCheck = rate.checkDup();
-            boolean dupOccurred = false;
-            Query q = null;
-
-            while (commitPoint < objList.size()) {
-                if (bulkPersist) {
-                    Iterator<? extends Object> itr = objList
-                            .listIterator(commitPoint);
-                    index = commitPoint;
-                    dupOccurred = false;
-
-                    try {
-                        tx = ss.beginTransaction();
-                        while (itr.hasNext()) {
-                            PersistableDataObject pdo = (PersistableDataObject) itr
-                                    .next();
-
-                            if (dupCheck) {
-                                if (q == null) {
-                                    String sql = "select id from awips."
-                                            + ((PluginDataObject) pdo)
-                                                    .getPluginName()
-                                            + " where dataURI=:dataURI";
-                                    q = ss.createSQLQuery(sql);
-                                }
-                                q.setString("dataURI",
-                                        (String) pdo.getIdentifier());
-                                List<?> list = q.list();
-                                if ((list == null) || (list.size() == 0)) {
-                                    ss.insert(pdo);
-                                    index++;
-                                } else {
-                                    itr.remove();
-                                    dupOccurred = true;
-                                }
-                            } else {
-                                ss.insert(pdo);
-                                index++;
-                            }
-
-                            if (index % COMMIT_INTERVAL == 0) {
-                                tx.commit();
-                                q = null;
-                                commitPoint = index;
-                                if (dupOccurred) {
-                                    dupOccurred = false;
-                                    bulkDups++;
-                                } else {
-                                    bulkSuccess++;
-                                }
-                                tx = ss.beginTransaction();
-                            }
-                        }
-                        tx.commit();
-                        commitPoint = index;
-                        bulkSuccess++;
-                    } catch (Exception e) {
-                        bulkFailures++;
-                        statusHandler
-                                .handle(Priority.PROBLEM,
-                                        "Error storing pointdata batch to database, applying dup check and storing batch individually");
-
-                        bulkPersist = false;
-                        try {
-                            tx.rollback();
-                        } catch (HibernateException e1) {
-                            statusHandler.handle(Priority.PROBLEM,
-                                    "Rollback failed", e1);
-                        }
-                    }
-                } else {
-                    // persist records individually, using uri dup check
-                    Iterator<? extends Object> itr = objList
-                            .listIterator(commitPoint);
-                    index = 0;
-                    dupOccurred = false;
-
-                    // only persist individually through one commit interval
-                    while (itr.hasNext() && (index / COMMIT_INTERVAL == 0)) {
-                        try {
-                            tx = ss.beginTransaction();
-                            PersistableDataObject pdo = (PersistableDataObject) itr
-                                    .next();
-                            String sql = "select id from awips."
-                                    + ((PluginDataObject) pdo).getPluginName()
-                                    + " where dataURI=:dataURI";
-                            q = ss.createSQLQuery(sql);
-                            q.setString("dataURI", (String) pdo.getIdentifier());
-                            List<?> list = q.list();
-                            if ((list == null) || (list.size() == 0)) {
-                                ss.insert(pdo);
-                                tx.commit();
-                                index++;
-                            } else {
-                                tx.commit();
-                                itr.remove();
-                                dupOccurred = true;
-                            }
-                        } catch (Exception e) {
-                            statusHandler
-                                    .handle(Priority.PROBLEM,
-                                            "Error storing pointdata individually to database",
-                                            e);
-                            itr.remove();
-
-                            try {
-                                tx.rollback();
-                            } catch (HibernateException e1) {
-                                statusHandler.handle(Priority.PROBLEM,
-                                        "Rollback failed", e1);
-                            }
-                        }
-                    }
-
-                    if (dupOccurred) {
-                        bulkDups++;
-                    } else {
-                        bulkSuccess++;
-                    }
-                    commitPoint += index;
-                    if (bulkFailures < BULK_FAILURE_LIMIT) {
-                        bulkPersist = true;
-                    }
-                }
-            }
-
-            // calculate bulk success rate
-            float thisRate = bulkSuccess / (bulkSuccess + bulkDups);
-            rate.updateRate(thisRate);
-        } finally {
-            if (ss != null) {
-                ss.close();
-            }
-        }
-    }
-
     /*
      * (non-Javadoc)
      *
@@ -433,18 +224,6 @@ public abstract class PointDataPluginDao<T extends PluginDataObject> extends
 
     }
 
-    @Override
-    public PluginDataObject[] persistToDatabase(PluginDataObject... records) {
-        List<PersistableDataObject> persist = new ArrayList<PersistableDataObject>(
-                Arrays.asList(records));
-        persistAll(persist);
-        if (persist.size() != records.length) {
-            return persist.toArray(new PluginDataObject[persist.size()]);
-        } else {
-            return records;
-        }
-    }
-
     public File getFullFilePath(PluginDataObject p) {
         File file;
         String directory = p.getPluginName() + File.separator