Issue #1230: Update pypies logging

Change-Id: Ic990a5d04971f6374e68d87ad0ceb09991734000

Former-commit-id: b7bc9a0791 [formerly ebbd6dd6ba] [formerly da8796f655] [formerly da8796f655 [formerly 084b44e957]] [formerly 4463098852 [formerly da8796f655 [formerly 084b44e957] [formerly 4463098852 [formerly 2cda7c79a5f8f061a4e284bb7dd0fd4cd04a2245]]]]
Former-commit-id: 4463098852
Former-commit-id: 7b4f79747bfe7fec827f2d3277aade9a3e2aa0c7 [formerly 5c99aa95c38cb2da022dbeb4eb75b74a20cf2fdd] [formerly 8dc2f6c2c4 [formerly a2bcc8bffd]]
Former-commit-id: 8dc2f6c2c4
Former-commit-id: d9b2e3039a
This commit is contained in:
Richard Peter 2012-10-01 12:11:29 -05:00
parent 77613a0731
commit 3bf73ed5fc
9 changed files with 222 additions and 67 deletions

View file

@ -34,6 +34,7 @@
import fcntl, time, os, logging
from pypies import logger
from pypies import timeMap
MAX_TIME_TO_WAIT = 120 # seconds
@ -52,6 +53,8 @@ def dirCheck(filename):
os.close(fd)
def getLock(filename, mode):
t0 = time.time()
dirCheck(filename)
gotLock = False
startTime = time.time()
@ -82,12 +85,25 @@ def getLock(filename, mode):
if logger.isEnabledFor(logging.DEBUG):
logger.debug(str(os.getpid()) + " failed to get lock")
os.close(fd)
t1=time.time()
if timeMap.has_key('getLock'):
timeMap['getLock']+=t1-t0
else:
timeMap['getLock']=t1-t0
return gotLock, fd
def releaseLock(fd):
t0=time.time()
fcntl.lockf(fd, fcntl.LOCK_UN)
os.close(fd)
if logger.isEnabledFor(logging.DEBUG):
logger.debug('Released lock on ' + str(fd))
t1=time.time()
if timeMap.has_key('releaseLock'):
timeMap['releaseLock']+=t1-t0
else:
timeMap['releaseLock']=t1-t0

View file

@ -33,11 +33,12 @@
import time, os, logging
from pypies import logger
from pypies import timeMap
MAX_TIME_TO_WAIT = 120 # seconds
ORPHAN_TIMEOUT = 150 # seconds
MAX_SLEEP_TIME = 0.05
MIN_SLEEP_TIME = 0.01
MAX_SLEEP_TIME = 0.025
MIN_SLEEP_TIME = 0.005
readLockAppend = "_read"
writeLockAppend = "_write"
@ -52,6 +53,8 @@ def dirCheck(filename):
raise e
def getLock(filename, mode):
t0 = time.time()
dirCheck(filename)
gotLock, fpath = _getLockInternal(filename, mode)
if gotLock:
@ -60,6 +63,13 @@ def getLock(filename, mode):
else:
if logger.isEnabledFor(logging.DEBUG):
logger.debug(str(os.getpid()) + " failed to get lock")
t1=time.time()
if timeMap.has_key('getLock'):
timeMap['getLock']+=t1-t0
else:
timeMap['getLock']=t1-t0
return gotLock, fpath
@ -159,10 +169,18 @@ def _getSleepTime(timeWaiting):
sleepTime = MIN_SLEEP_TIME
elif sleepTime > MAX_SLEEP_TIME:
sleepTime = MAX_SLEEP_TIME
if timeMap.has_key('approxLockSleepTime'):
timeMap['approxLockSleepTime']+=sleepTime
else:
timeMap['approxLockSleepTime']=sleepTime
return sleepTime
def releaseLock(lockPath):
t0=time.time()
if lockPath.endswith('.pid'):
# it was a read
os.remove(lockPath)
@ -185,6 +203,12 @@ def releaseLock(lockPath):
if logger.isEnabledFor(logging.DEBUG):
logger.debug('Released lock on ' + str(lockPath))
t1=time.time()
if timeMap.has_key('releaseLock'):
timeMap['releaseLock']+=t1-t0
else:
timeMap['releaseLock']=t1-t0
def _checkForOrphans(filename):
if logger.isEnabledFor(logging.DEBUG):
logger.debug('Checking for orphan locks on ' + filename)
@ -233,6 +257,11 @@ def _checkForOrphans(filename):
# 2 indicates no such directory, assuming another process removed it
if e.errno != 2:
logger.error('Unable to remove orphaned lock: ' + str(e))
if timeMap.has_key('orphanCheck'):
timeMap['orphanCheck']+=(time.time() - nowTime)
else:
timeMap['orphanCheck']=(time.time() - nowTime)
return orphanRemoved

View file

@ -46,6 +46,7 @@ def getLogger():
return logger
logger = getLogger()
timeMap = {}
def pypiesWrapper(request):

View file

@ -42,6 +42,7 @@ from dynamicserialize.dstypes.com.raytheon.uf.common.pypies.request import *
from dynamicserialize.dstypes.com.raytheon.uf.common.pypies.response import *
logger = pypies.logger
timeMap = pypies.timeMap
from pypies.impl import H5pyDataStore
datastore = H5pyDataStore.H5pyDataStore()
@ -61,8 +62,9 @@ datastoreMap = {
@Request.application
def pypies_response(request):
timeMap.clear()
try:
t0 = time.time()
startTime = time.time()
try:
obj = dynamicserialize.deserialize(request.data)
except:
@ -71,6 +73,7 @@ def pypies_response(request):
resp = ErrorResponse()
resp.setError(msg)
return __prepareResponse(resp)
timeMap['deserialize']=time.time()-startTime
clz = obj.__class__
if logger.isEnabledFor(logging.DEBUG):
@ -90,11 +93,14 @@ def pypies_response(request):
logger.error(msg)
resp = ErrorResponse()
resp.setError(msg)
startSerialize = time.time()
httpResp = __prepareResponse(resp)
if success:
t1 = time.time()
logger.info({'request':datastoreMap[clz][1], 'time':t1-t0, 'file':obj.getFilename()})
endTime = time.time()
timeMap['serialize'] = endTime - startSerialize
timeMap['total'] = endTime - startTime
logger.info({'request':datastoreMap[clz][1], 'time':timeMap, 'file':obj.getFilename()})
#logger.info("pid=" + str(os.getpid()) + " " + datastoreMap[clz][1] + " on " + obj.getFilename() + " processed in " + ('%.3f' % (t1-t0)) + " seconds")
return httpResp
except:

View file

@ -32,10 +32,11 @@
#
#
import numpy, pypies, logging
import numpy, pypies, logging, time
from dynamicserialize.dstypes.com.raytheon.uf.common.datastorage import *
from dynamicserialize.dstypes.com.raytheon.uf.common.datastorage.records import *
logger = pypies.logger
timeMap = pypies.timeMap
typeToClassMap = {
numpy.int8: ByteDataRecord,
@ -48,6 +49,8 @@ typeToClassMap = {
}
def createStorageRecord(rawData, ds):
t0=time.time()
t = typeToClassMap[rawData.dtype.type]
inst = t()
name = ds.name
@ -98,4 +101,10 @@ def createStorageRecord(rawData, ds):
# TODO downscaled?
inst.setProps(props)
t1=time.time()
if timeMap.has_key('createRecord'):
timeMap['createRecord']+=t1-t0
else:
timeMap['createRecord']=t1-t0
return inst

View file

@ -46,6 +46,7 @@ from dynamicserialize.dstypes.com.raytheon.uf.common.datastorage.records import
from dynamicserialize.dstypes.com.raytheon.uf.common.pypies.response import *
logger = pypies.logger
timeMap = pypies.timeMap
vlen_str_type = h5py.new_vlen(str)
@ -82,6 +83,7 @@ class H5pyDataStore(IDataStore.IDataStore):
exc = []
failRecs = []
ss = None
t0=time.time()
for r in recs:
try:
if r.getProps() and r.getProps().getDownscaled():
@ -97,14 +99,18 @@ class H5pyDataStore(IDataStore.IDataStore):
status.setOperationPerformed(ss['op'])
if ss.has_key('index'):
status.setIndexOfAppend(ss['index'])
t1=time.time()
timeMap['store']=t1-t0
resp = StoreResponse()
resp.setStatus(status)
resp.setExceptions(exc)
resp.setFailedRecords(failRecs)
return resp
finally:
t0=time.time()
f.close()
t1=time.time()
timeMap['closeFile']=t1-t0
LockManager.releaseLock(lock)
@ -318,7 +324,10 @@ class H5pyDataStore(IDataStore.IDataStore):
grp = ds.parent
grp.id.unlink(ds.name)
finally:
t0=time.time()
f.close()
t1=time.time()
timeMap['closeFile']=t1-t0
LockManager.releaseLock(lock)
return resp
@ -330,8 +339,11 @@ class H5pyDataStore(IDataStore.IDataStore):
try:
group = request.getGroup()
req = request.getRequest()
if req:
if req:
t0=time.time()
grp = self.__getGroup(f, group)
t1=time.time()
timeMap['getGroup']=t1-t0
result = [self.__retrieveInternal(grp, request.getDataset(), req)]
else:
result = self.__retrieve(f, group, request.getIncludeInterpolated())
@ -339,8 +351,12 @@ class H5pyDataStore(IDataStore.IDataStore):
resp.setRecords(result)
return resp
finally:
t0=time.time()
f.close()
t1=time.time()
timeMap['closeFile']=t1-t0
LockManager.releaseLock(lock)
def __retrieve(self, f, group, includeInterpolated=False):
@ -427,7 +443,10 @@ class H5pyDataStore(IDataStore.IDataStore):
resp.setRecords(recs)
return resp
finally:
t0=time.time()
f.close()
t1=time.time()
timeMap['closeFile']=t1-t0
LockManager.releaseLock(lock)
def getDatasets(self, request):
@ -439,7 +458,10 @@ class H5pyDataStore(IDataStore.IDataStore):
ds = grp.keys()
return ds
finally:
t0=time.time()
f.close()
t1=time.time()
timeMap['closeFile']=t1-t0
LockManager.releaseLock(lock)
def deleteFiles(self, request):
@ -492,7 +514,10 @@ class H5pyDataStore(IDataStore.IDataStore):
resp = StoreResponse()
return resp
finally:
t0=time.time()
f.close()
t1=time.time()
timeMap['closeFile']=t1-t0
LockManager.releaseLock(lock)
def __createDatasetInternal(self, group, datasetName, dtype, szDims,
@ -566,10 +591,11 @@ class H5pyDataStore(IDataStore.IDataStore):
if gotLock:
LockManager.releaseLock(lock)
def __openFile(self, filename, mode='r'):
def __openFile(self, filename, mode='r'):
if mode == 'r' and not os.path.exists(filename):
raise StorageException('File ' + filename + ' does not exist')
gotLock, fd = LockManager.getLock(filename, mode)
t0=time.time()
if not gotLock:
raise StorageException('Unable to acquire lock on file ' + filename)
try:
@ -581,17 +607,21 @@ class H5pyDataStore(IDataStore.IDataStore):
logger.error(msg)
LockManager.releaseLock(fd)
raise e
t1=time.time()
timeMap['openFile']=t1-t0
return f, fd
def __getGroup(self, f, name, create=False):
t0=time.time()
parts = name.split('/')
grp = None
for s in parts:
if not grp:
if not s:
s = '/'
if s in f.keys() or s == '/':
if s == '/' or s in f.keys():
grp = f[s]
else:
if create:
@ -607,7 +637,13 @@ class H5pyDataStore(IDataStore.IDataStore):
grp = grp.create_group(s)
else:
raise StorageException("No group " + name + " found")
t1=time.time()
if timeMap.has_key('getGroup'):
timeMap['getGroup']+=t1-t0
else:
timeMap['getGroup']=t1-t0
return grp
def __link(self, group, linkName, dataset):
@ -649,6 +685,7 @@ class H5pyDataStore(IDataStore.IDataStore):
return results
def __doRepack(self, filepath, basePath, outDir, compression):
t0=time.time()
# call h5repack to repack the file
if outDir is None:
repackedFullPath = filepath + '.repacked'
@ -673,6 +710,8 @@ class H5pyDataStore(IDataStore.IDataStore):
# repack failed, but they wanted the data in a different
# directory, so just copy the original data without the repack
shutil.copy(filepath, repackedFullPath)
t1=time.time()
timeMap['repack']
return success
def __doFileAction(self, filepath, basePath, outputDir, fileAction, response, compression='NONE', timestampCheck=None):

View file

@ -32,12 +32,14 @@
#
#
import numpy, pypies, logging
import numpy, pypies, logging, time
import h5py.selections
from pypies import StorageException, NotImplementedException
logger = pypies.logger
timeMap = pypies.timeMap
def read(ds, request):
t0=time.time()
rt = request.getType()
if logger.isEnabledFor(logging.DEBUG):
logger.debug('requestType=' + rt)
@ -100,7 +102,13 @@ def read(ds, request):
else:
raise NotImplementedException('Only read requests supported are ' +
'ALL, POINT, XLINE, YLINE, and SLAB')
t1=time.time()
if timeMap.has_key('read'):
timeMap['read']+=t1-t0
else:
timeMap['read']=t1-t0
return result

View file

@ -35,6 +35,20 @@
import threading, time, logging
STORE_DIR = '/awips2/edex/data/hdf5/' # TODO this should be a config file
STORE_DIR_LEN = len(STORE_DIR)
SECTION_KEYS=['total',
' deserialize',
' getLock',
' approxLockSleepTime',
' orphanCheck',
' openFile',
' getGroup',
' repack',
' read',
' store',
' createRecord',
' closeFile',
' releaseLock',
' serialize']
class StatsThread(threading.Thread):
@ -77,7 +91,7 @@ class StatsThread(threading.Thread):
self.hourStats['lastOutput'] = time.time()
def addRecord(self, rec):
def addRecord(self, rec):
with self.lock:
self.minuteStats = self.__addNewStat(self.minuteStats, rec)
self.hourStats = self.__addNewStat(self.hourStats, rec)
@ -90,23 +104,37 @@ class StatsThread(threading.Thread):
plugin = pluginName[0:slashIndex]
else:
plugin = pluginName
req = rec['request']
recTime = rec['time']
if statDict.has_key(plugin):
pluginEntry = statDict[plugin]
pluginDict = statDict[plugin]
else:
pluginEntry = {}
if not pluginEntry.has_key(req):
pluginEntry[req] = {'count':0, 'time':0.0, 'slowest':0.0, 'fastest':9999.0}
requestEntry = pluginEntry[req]
requestEntry['count'] = requestEntry['count'] + 1
requestEntry['time'] = requestEntry['time'] + recTime
if recTime > requestEntry['slowest']:
requestEntry['slowest'] = recTime
if recTime < requestEntry['fastest']:
requestEntry['fastest'] = recTime
statDict[plugin] = pluginEntry
pluginDict = {}
statDict[plugin]=pluginDict
req = rec['request']
if pluginDict.has_key(req):
reqDict=pluginDict[req]
else:
reqDict={}
pluginDict[req] = reqDict
recTimes = rec['time']
for timeKey in recTimes.keys():
recTime=recTimes[timeKey]
if not reqDict.has_key(timeKey):
reqDict[timeKey] = {'count':0, 'time':0.0, 'slowest':0.0, 'fastest':9999.0}
requestEntry = reqDict[timeKey]
requestEntry['count'] += 1
requestEntry['time'] += recTime
if recTime > requestEntry['slowest']:
requestEntry['slowest'] = recTime
if recTime < requestEntry['fastest']:
requestEntry['fastest'] = recTime
return statDict
@ -120,34 +148,34 @@ class StatsThread(threading.Thread):
if len(statDict):
stmt += COL + 'plugin'.ljust(20)
stmt += 'request'.ljust(20) + COL
stmt += 'section'.ljust(25) + COL
stmt += 'count'.rjust(7) + COL
stmt += 'average'.rjust(8) + COL
stmt += 'min'.rjust(5) + COL
stmt += 'max'.rjust(5)
stmt += '\n'
stmt += ('-' * 85) + '\n'
stmt += ('-' * 114) + '\n'
pluginNames = statDict.keys()
pluginNames.sort()
for plugin in pluginNames:
pluginEntry = statDict[plugin]
reqNames = pluginEntry.keys()
pluginDict = statDict[plugin]
reqNames = pluginDict.keys()
reqNames.sort()
for req in reqNames:
stmt += COL + plugin.ljust(20)
entry = pluginEntry[req]
avg = '%.3f' % (entry['time'] / entry['count'])
fast = '%.3f' % (entry['fastest'])
slow = '%.3f' % (entry['slowest'])
stmt += req.ljust(20) + COL
stmt += str(entry['count']).rjust(7) + COL + avg.rjust(8) + COL
stmt += fast + COL + slow + '\n'
reqDict = pluginDict[req]
for section in SECTION_KEYS:
timeKey = section.strip()
if reqDict.has_key(timeKey):
stmt += COL + plugin.ljust(20)
entry = reqDict[timeKey]
avg = '%.3f' % (entry['time'] / entry['count'])
fast = '%.3f' % (entry['fastest'])
slow = '%.3f' % (entry['slowest'])
stmt += req.ljust(20) + COL
stmt += section.ljust(25) + COL
stmt += str(entry['count']).rjust(7) + COL + avg.rjust(8) + COL
stmt += fast + COL + slow + '\n'
stmt += '\n'
else:
stmt += COL + 'No transactions reported'
stmt += COL + 'No transactions reported'
return stmt

View file

@ -55,7 +55,8 @@ class LogRecordStreamHandler(SocketServer.StreamRequestHandler):
import StatsThread
statsThread = StatsThread.StatsThread(logCfg)
statsThread.start()
statsThread.start()
SECTION_KEYS = StatsThread.SECTION_KEYS
def handle(self):
"""
@ -64,24 +65,42 @@ class LogRecordStreamHandler(SocketServer.StreamRequestHandler):
according to whatever policy is configured locally.
"""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack(">L", chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
msg = obj['msg']
if type(msg) is str:
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
else:
self.statsThread.addRecord(msg)
if msg['time'] > LOG_THRESHOLD:
obj['msg'] = 'Processed ' + msg['request'] + ' on ' + msg['file'] + ' in ' + ('%.3f' % msg['time']) + ' seconds'
try:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack(">L", chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
msg = obj['msg']
if type(msg) is str:
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
else:
self.statsThread.addRecord(msg)
timeDict = msg['time']
if timeDict['total'] > LOG_THRESHOLD:
#obj['msg'] = 'Processed ' + msg['request'] + ' on ' + msg['file'] + ' in ' + ('%.3f' % msg['time']['total']) + ' seconds'
logMsg = 'Processed ' + msg['request'] + ' on ' + msg['file'] + '. Timing entries in seconds: '
addComma=False
for SECTION in self.SECTION_KEYS:
timeKey=SECTION.strip()
if timeDict.has_key(timeKey):
if addComma:
logMsg += ','
else:
addComma = True
logMsg += ' ' + timeKey + ' ' + ('%.3f' % timeDict[timeKey])
obj['msg'] = logMsg
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
except Exception, e:
import sys, traceback, string
t, v, tb = sys.exc_info()
print string.join(traceback.format_exception(t, v, tb))
def unPickle(self, data):