diff --git a/pythonPackages/pypies/pypies/LockManager.py b/pythonPackages/pypies/pypies/LockManager.py index 26a3c6927c..d49b9bf953 100644 --- a/pythonPackages/pypies/pypies/LockManager.py +++ b/pythonPackages/pypies/pypies/LockManager.py @@ -34,6 +34,7 @@ import fcntl, time, os, logging from pypies import logger +from pypies import timeMap MAX_TIME_TO_WAIT = 120 # seconds @@ -52,6 +53,8 @@ def dirCheck(filename): os.close(fd) def getLock(filename, mode): + t0 = time.time() + dirCheck(filename) gotLock = False startTime = time.time() @@ -82,12 +85,25 @@ def getLock(filename, mode): if logger.isEnabledFor(logging.DEBUG): logger.debug(str(os.getpid()) + " failed to get lock") os.close(fd) + + t1=time.time() + if timeMap.has_key('getLock'): + timeMap['getLock']+=t1-t0 + else: + timeMap['getLock']=t1-t0 + return gotLock, fd def releaseLock(fd): + t0=time.time() fcntl.lockf(fd, fcntl.LOCK_UN) os.close(fd) if logger.isEnabledFor(logging.DEBUG): logger.debug('Released lock on ' + str(fd)) + t1=time.time() + if timeMap.has_key('releaseLock'): + timeMap['releaseLock']+=t1-t0 + else: + timeMap['releaseLock']=t1-t0 \ No newline at end of file diff --git a/pythonPackages/pypies/pypies/MkDirLockManager.py b/pythonPackages/pypies/pypies/MkDirLockManager.py index db5493cb19..206b353883 100644 --- a/pythonPackages/pypies/pypies/MkDirLockManager.py +++ b/pythonPackages/pypies/pypies/MkDirLockManager.py @@ -33,11 +33,12 @@ import time, os, logging from pypies import logger +from pypies import timeMap MAX_TIME_TO_WAIT = 120 # seconds ORPHAN_TIMEOUT = 150 # seconds -MAX_SLEEP_TIME = 0.05 -MIN_SLEEP_TIME = 0.01 +MAX_SLEEP_TIME = 0.025 +MIN_SLEEP_TIME = 0.005 readLockAppend = "_read" writeLockAppend = "_write" @@ -52,6 +53,8 @@ def dirCheck(filename): raise e def getLock(filename, mode): + t0 = time.time() + dirCheck(filename) gotLock, fpath = _getLockInternal(filename, mode) if gotLock: @@ -60,6 +63,13 @@ def getLock(filename, mode): else: if logger.isEnabledFor(logging.DEBUG): logger.debug(str(os.getpid()) + " failed to get lock") + + t1=time.time() + if timeMap.has_key('getLock'): + timeMap['getLock']+=t1-t0 + else: + timeMap['getLock']=t1-t0 + return gotLock, fpath @@ -159,10 +169,18 @@ def _getSleepTime(timeWaiting): sleepTime = MIN_SLEEP_TIME elif sleepTime > MAX_SLEEP_TIME: sleepTime = MAX_SLEEP_TIME + + if timeMap.has_key('approxLockSleepTime'): + timeMap['approxLockSleepTime']+=sleepTime + else: + timeMap['approxLockSleepTime']=sleepTime + return sleepTime def releaseLock(lockPath): + t0=time.time() + if lockPath.endswith('.pid'): # it was a read os.remove(lockPath) @@ -185,6 +203,12 @@ def releaseLock(lockPath): if logger.isEnabledFor(logging.DEBUG): logger.debug('Released lock on ' + str(lockPath)) + t1=time.time() + if timeMap.has_key('releaseLock'): + timeMap['releaseLock']+=t1-t0 + else: + timeMap['releaseLock']=t1-t0 + def _checkForOrphans(filename): if logger.isEnabledFor(logging.DEBUG): logger.debug('Checking for orphan locks on ' + filename) @@ -233,6 +257,11 @@ def _checkForOrphans(filename): # 2 indicates no such directory, assuming another process removed it if e.errno != 2: logger.error('Unable to remove orphaned lock: ' + str(e)) - + + if timeMap.has_key('orphanCheck'): + timeMap['orphanCheck']+=(time.time() - nowTime) + else: + timeMap['orphanCheck']=(time.time() - nowTime) + return orphanRemoved \ No newline at end of file diff --git a/pythonPackages/pypies/pypies/__init__.py b/pythonPackages/pypies/pypies/__init__.py index cd3a59be96..380a0bae2d 100644 --- a/pythonPackages/pypies/pypies/__init__.py +++ b/pythonPackages/pypies/pypies/__init__.py @@ -46,6 +46,7 @@ def getLogger(): return logger logger = getLogger() +timeMap = {} def pypiesWrapper(request): diff --git a/pythonPackages/pypies/pypies/handlers.py b/pythonPackages/pypies/pypies/handlers.py index 97b6ce121f..6acba0b470 100644 --- a/pythonPackages/pypies/pypies/handlers.py +++ b/pythonPackages/pypies/pypies/handlers.py @@ -42,6 +42,7 @@ from dynamicserialize.dstypes.com.raytheon.uf.common.pypies.request import * from dynamicserialize.dstypes.com.raytheon.uf.common.pypies.response import * logger = pypies.logger +timeMap = pypies.timeMap from pypies.impl import H5pyDataStore datastore = H5pyDataStore.H5pyDataStore() @@ -61,8 +62,9 @@ datastoreMap = { @Request.application def pypies_response(request): + timeMap.clear() try: - t0 = time.time() + startTime = time.time() try: obj = dynamicserialize.deserialize(request.data) except: @@ -71,6 +73,7 @@ def pypies_response(request): resp = ErrorResponse() resp.setError(msg) return __prepareResponse(resp) + timeMap['deserialize']=time.time()-startTime clz = obj.__class__ if logger.isEnabledFor(logging.DEBUG): @@ -90,11 +93,14 @@ def pypies_response(request): logger.error(msg) resp = ErrorResponse() resp.setError(msg) - + + startSerialize = time.time() httpResp = __prepareResponse(resp) if success: - t1 = time.time() - logger.info({'request':datastoreMap[clz][1], 'time':t1-t0, 'file':obj.getFilename()}) + endTime = time.time() + timeMap['serialize'] = endTime - startSerialize + timeMap['total'] = endTime - startTime + logger.info({'request':datastoreMap[clz][1], 'time':timeMap, 'file':obj.getFilename()}) #logger.info("pid=" + str(os.getpid()) + " " + datastoreMap[clz][1] + " on " + obj.getFilename() + " processed in " + ('%.3f' % (t1-t0)) + " seconds") return httpResp except: diff --git a/pythonPackages/pypies/pypies/impl/DataStoreFactory.py b/pythonPackages/pypies/pypies/impl/DataStoreFactory.py index 0b0a042dc7..f2a00e2266 100644 --- a/pythonPackages/pypies/pypies/impl/DataStoreFactory.py +++ b/pythonPackages/pypies/pypies/impl/DataStoreFactory.py @@ -32,10 +32,11 @@ # # -import numpy, pypies, logging +import numpy, pypies, logging, time from dynamicserialize.dstypes.com.raytheon.uf.common.datastorage import * from dynamicserialize.dstypes.com.raytheon.uf.common.datastorage.records import * logger = pypies.logger +timeMap = pypies.timeMap typeToClassMap = { numpy.int8: ByteDataRecord, @@ -48,6 +49,8 @@ typeToClassMap = { } def createStorageRecord(rawData, ds): + t0=time.time() + t = typeToClassMap[rawData.dtype.type] inst = t() name = ds.name @@ -98,4 +101,10 @@ def createStorageRecord(rawData, ds): # TODO downscaled? inst.setProps(props) + t1=time.time() + if timeMap.has_key('createRecord'): + timeMap['createRecord']+=t1-t0 + else: + timeMap['createRecord']=t1-t0 + return inst diff --git a/pythonPackages/pypies/pypies/impl/H5pyDataStore.py b/pythonPackages/pypies/pypies/impl/H5pyDataStore.py index afa5e0d05a..10480427a8 100644 --- a/pythonPackages/pypies/pypies/impl/H5pyDataStore.py +++ b/pythonPackages/pypies/pypies/impl/H5pyDataStore.py @@ -46,6 +46,7 @@ from dynamicserialize.dstypes.com.raytheon.uf.common.datastorage.records import from dynamicserialize.dstypes.com.raytheon.uf.common.pypies.response import * logger = pypies.logger +timeMap = pypies.timeMap vlen_str_type = h5py.new_vlen(str) @@ -82,6 +83,7 @@ class H5pyDataStore(IDataStore.IDataStore): exc = [] failRecs = [] ss = None + t0=time.time() for r in recs: try: if r.getProps() and r.getProps().getDownscaled(): @@ -97,14 +99,18 @@ class H5pyDataStore(IDataStore.IDataStore): status.setOperationPerformed(ss['op']) if ss.has_key('index'): status.setIndexOfAppend(ss['index']) - + t1=time.time() + timeMap['store']=t1-t0 resp = StoreResponse() resp.setStatus(status) resp.setExceptions(exc) resp.setFailedRecords(failRecs) return resp finally: + t0=time.time() f.close() + t1=time.time() + timeMap['closeFile']=t1-t0 LockManager.releaseLock(lock) @@ -318,7 +324,10 @@ class H5pyDataStore(IDataStore.IDataStore): grp = ds.parent grp.id.unlink(ds.name) finally: + t0=time.time() f.close() + t1=time.time() + timeMap['closeFile']=t1-t0 LockManager.releaseLock(lock) return resp @@ -330,8 +339,11 @@ class H5pyDataStore(IDataStore.IDataStore): try: group = request.getGroup() req = request.getRequest() - if req: + if req: + t0=time.time() grp = self.__getGroup(f, group) + t1=time.time() + timeMap['getGroup']=t1-t0 result = [self.__retrieveInternal(grp, request.getDataset(), req)] else: result = self.__retrieve(f, group, request.getIncludeInterpolated()) @@ -339,8 +351,12 @@ class H5pyDataStore(IDataStore.IDataStore): resp.setRecords(result) return resp finally: + t0=time.time() f.close() + t1=time.time() + timeMap['closeFile']=t1-t0 LockManager.releaseLock(lock) + def __retrieve(self, f, group, includeInterpolated=False): @@ -427,7 +443,10 @@ class H5pyDataStore(IDataStore.IDataStore): resp.setRecords(recs) return resp finally: + t0=time.time() f.close() + t1=time.time() + timeMap['closeFile']=t1-t0 LockManager.releaseLock(lock) def getDatasets(self, request): @@ -439,7 +458,10 @@ class H5pyDataStore(IDataStore.IDataStore): ds = grp.keys() return ds finally: + t0=time.time() f.close() + t1=time.time() + timeMap['closeFile']=t1-t0 LockManager.releaseLock(lock) def deleteFiles(self, request): @@ -492,7 +514,10 @@ class H5pyDataStore(IDataStore.IDataStore): resp = StoreResponse() return resp finally: + t0=time.time() f.close() + t1=time.time() + timeMap['closeFile']=t1-t0 LockManager.releaseLock(lock) def __createDatasetInternal(self, group, datasetName, dtype, szDims, @@ -566,10 +591,11 @@ class H5pyDataStore(IDataStore.IDataStore): if gotLock: LockManager.releaseLock(lock) - def __openFile(self, filename, mode='r'): + def __openFile(self, filename, mode='r'): if mode == 'r' and not os.path.exists(filename): raise StorageException('File ' + filename + ' does not exist') gotLock, fd = LockManager.getLock(filename, mode) + t0=time.time() if not gotLock: raise StorageException('Unable to acquire lock on file ' + filename) try: @@ -581,17 +607,21 @@ class H5pyDataStore(IDataStore.IDataStore): logger.error(msg) LockManager.releaseLock(fd) raise e - + + t1=time.time() + timeMap['openFile']=t1-t0 + return f, fd def __getGroup(self, f, name, create=False): + t0=time.time() parts = name.split('/') grp = None for s in parts: if not grp: if not s: s = '/' - if s in f.keys() or s == '/': + if s == '/' or s in f.keys(): grp = f[s] else: if create: @@ -607,7 +637,13 @@ class H5pyDataStore(IDataStore.IDataStore): grp = grp.create_group(s) else: raise StorageException("No group " + name + " found") - + + t1=time.time() + if timeMap.has_key('getGroup'): + timeMap['getGroup']+=t1-t0 + else: + timeMap['getGroup']=t1-t0 + return grp def __link(self, group, linkName, dataset): @@ -649,6 +685,7 @@ class H5pyDataStore(IDataStore.IDataStore): return results def __doRepack(self, filepath, basePath, outDir, compression): + t0=time.time() # call h5repack to repack the file if outDir is None: repackedFullPath = filepath + '.repacked' @@ -673,6 +710,8 @@ class H5pyDataStore(IDataStore.IDataStore): # repack failed, but they wanted the data in a different # directory, so just copy the original data without the repack shutil.copy(filepath, repackedFullPath) + t1=time.time() + timeMap['repack'] return success def __doFileAction(self, filepath, basePath, outputDir, fileAction, response, compression='NONE', timestampCheck=None): diff --git a/pythonPackages/pypies/pypies/impl/HDF5OpManager.py b/pythonPackages/pypies/pypies/impl/HDF5OpManager.py index 90629ade5b..ac5f7a3d41 100644 --- a/pythonPackages/pypies/pypies/impl/HDF5OpManager.py +++ b/pythonPackages/pypies/pypies/impl/HDF5OpManager.py @@ -32,12 +32,14 @@ # # -import numpy, pypies, logging +import numpy, pypies, logging, time import h5py.selections from pypies import StorageException, NotImplementedException logger = pypies.logger +timeMap = pypies.timeMap def read(ds, request): + t0=time.time() rt = request.getType() if logger.isEnabledFor(logging.DEBUG): logger.debug('requestType=' + rt) @@ -100,7 +102,13 @@ def read(ds, request): else: raise NotImplementedException('Only read requests supported are ' + 'ALL, POINT, XLINE, YLINE, and SLAB') - + t1=time.time() + + if timeMap.has_key('read'): + timeMap['read']+=t1-t0 + else: + timeMap['read']=t1-t0 + return result diff --git a/pythonPackages/pypies/pypies/logging/StatsThread.py b/pythonPackages/pypies/pypies/logging/StatsThread.py index deb75bb278..d1b98f1a6a 100644 --- a/pythonPackages/pypies/pypies/logging/StatsThread.py +++ b/pythonPackages/pypies/pypies/logging/StatsThread.py @@ -35,6 +35,20 @@ import threading, time, logging STORE_DIR = '/awips2/edex/data/hdf5/' # TODO this should be a config file STORE_DIR_LEN = len(STORE_DIR) +SECTION_KEYS=['total', + ' deserialize', + ' getLock', + ' approxLockSleepTime', + ' orphanCheck', + ' openFile', + ' getGroup', + ' repack', + ' read', + ' store', + ' createRecord', + ' closeFile', + ' releaseLock', + ' serialize'] class StatsThread(threading.Thread): @@ -77,7 +91,7 @@ class StatsThread(threading.Thread): self.hourStats['lastOutput'] = time.time() - def addRecord(self, rec): + def addRecord(self, rec): with self.lock: self.minuteStats = self.__addNewStat(self.minuteStats, rec) self.hourStats = self.__addNewStat(self.hourStats, rec) @@ -90,23 +104,37 @@ class StatsThread(threading.Thread): plugin = pluginName[0:slashIndex] else: plugin = pluginName - req = rec['request'] - recTime = rec['time'] - + if statDict.has_key(plugin): - pluginEntry = statDict[plugin] + pluginDict = statDict[plugin] else: - pluginEntry = {} - if not pluginEntry.has_key(req): - pluginEntry[req] = {'count':0, 'time':0.0, 'slowest':0.0, 'fastest':9999.0} - requestEntry = pluginEntry[req] - requestEntry['count'] = requestEntry['count'] + 1 - requestEntry['time'] = requestEntry['time'] + recTime - if recTime > requestEntry['slowest']: - requestEntry['slowest'] = recTime - if recTime < requestEntry['fastest']: - requestEntry['fastest'] = recTime - statDict[plugin] = pluginEntry + pluginDict = {} + statDict[plugin]=pluginDict + + req = rec['request'] + + if pluginDict.has_key(req): + reqDict=pluginDict[req] + else: + reqDict={} + pluginDict[req] = reqDict + + recTimes = rec['time'] + + for timeKey in recTimes.keys(): + recTime=recTimes[timeKey] + + if not reqDict.has_key(timeKey): + reqDict[timeKey] = {'count':0, 'time':0.0, 'slowest':0.0, 'fastest':9999.0} + + requestEntry = reqDict[timeKey] + requestEntry['count'] += 1 + requestEntry['time'] += recTime + if recTime > requestEntry['slowest']: + requestEntry['slowest'] = recTime + if recTime < requestEntry['fastest']: + requestEntry['fastest'] = recTime + return statDict @@ -120,34 +148,34 @@ class StatsThread(threading.Thread): if len(statDict): stmt += COL + 'plugin'.ljust(20) stmt += 'request'.ljust(20) + COL + stmt += 'section'.ljust(25) + COL stmt += 'count'.rjust(7) + COL stmt += 'average'.rjust(8) + COL stmt += 'min'.rjust(5) + COL stmt += 'max'.rjust(5) stmt += '\n' - stmt += ('-' * 85) + '\n' + stmt += ('-' * 114) + '\n' pluginNames = statDict.keys() pluginNames.sort() for plugin in pluginNames: - pluginEntry = statDict[plugin] - reqNames = pluginEntry.keys() + pluginDict = statDict[plugin] + reqNames = pluginDict.keys() reqNames.sort() for req in reqNames: - stmt += COL + plugin.ljust(20) - entry = pluginEntry[req] - avg = '%.3f' % (entry['time'] / entry['count']) - fast = '%.3f' % (entry['fastest']) - slow = '%.3f' % (entry['slowest']) - stmt += req.ljust(20) + COL - stmt += str(entry['count']).rjust(7) + COL + avg.rjust(8) + COL - stmt += fast + COL + slow + '\n' + reqDict = pluginDict[req] + for section in SECTION_KEYS: + timeKey = section.strip() + if reqDict.has_key(timeKey): + stmt += COL + plugin.ljust(20) + entry = reqDict[timeKey] + avg = '%.3f' % (entry['time'] / entry['count']) + fast = '%.3f' % (entry['fastest']) + slow = '%.3f' % (entry['slowest']) + stmt += req.ljust(20) + COL + stmt += section.ljust(25) + COL + stmt += str(entry['count']).rjust(7) + COL + avg.rjust(8) + COL + stmt += fast + COL + slow + '\n' stmt += '\n' else: - stmt += COL + 'No transactions reported' + stmt += COL + 'No transactions reported' return stmt - - - - - - diff --git a/pythonPackages/pypies/pypies/logging/logProcess.py b/pythonPackages/pypies/pypies/logging/logProcess.py index f3670e492c..42fbf3539b 100644 --- a/pythonPackages/pypies/pypies/logging/logProcess.py +++ b/pythonPackages/pypies/pypies/logging/logProcess.py @@ -55,7 +55,8 @@ class LogRecordStreamHandler(SocketServer.StreamRequestHandler): import StatsThread statsThread = StatsThread.StatsThread(logCfg) - statsThread.start() + statsThread.start() + SECTION_KEYS = StatsThread.SECTION_KEYS def handle(self): """ @@ -64,24 +65,42 @@ class LogRecordStreamHandler(SocketServer.StreamRequestHandler): according to whatever policy is configured locally. """ while True: - chunk = self.connection.recv(4) - if len(chunk) < 4: - break - slen = struct.unpack(">L", chunk)[0] - chunk = self.connection.recv(slen) - while len(chunk) < slen: - chunk = chunk + self.connection.recv(slen - len(chunk)) - obj = self.unPickle(chunk) - msg = obj['msg'] - if type(msg) is str: - record = logging.makeLogRecord(obj) - self.handleLogRecord(record) - else: - self.statsThread.addRecord(msg) - if msg['time'] > LOG_THRESHOLD: - obj['msg'] = 'Processed ' + msg['request'] + ' on ' + msg['file'] + ' in ' + ('%.3f' % msg['time']) + ' seconds' + try: + chunk = self.connection.recv(4) + if len(chunk) < 4: + break + slen = struct.unpack(">L", chunk)[0] + chunk = self.connection.recv(slen) + while len(chunk) < slen: + chunk = chunk + self.connection.recv(slen - len(chunk)) + obj = self.unPickle(chunk) + msg = obj['msg'] + if type(msg) is str: record = logging.makeLogRecord(obj) self.handleLogRecord(record) + else: + self.statsThread.addRecord(msg) + timeDict = msg['time'] + if timeDict['total'] > LOG_THRESHOLD: + #obj['msg'] = 'Processed ' + msg['request'] + ' on ' + msg['file'] + ' in ' + ('%.3f' % msg['time']['total']) + ' seconds' + logMsg = 'Processed ' + msg['request'] + ' on ' + msg['file'] + '. Timing entries in seconds: ' + addComma=False + for SECTION in self.SECTION_KEYS: + timeKey=SECTION.strip() + if timeDict.has_key(timeKey): + if addComma: + logMsg += ',' + else: + addComma = True + logMsg += ' ' + timeKey + ' ' + ('%.3f' % timeDict[timeKey]) + + obj['msg'] = logMsg + record = logging.makeLogRecord(obj) + self.handleLogRecord(record) + except Exception, e: + import sys, traceback, string + t, v, tb = sys.exc_info() + print string.join(traceback.format_exception(t, v, tb)) def unPickle(self, data):