#!/usr/bin/python
# This script registers the gridcoverage plugin, which was previously part of grib.
#
# This needs to be performed with build ????
# create_grid_tables.sh must be run before running this script.
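#
# Usage sketch (the script name below is illustrative; invoke this file by whatever
# name it is installed under on the EDEX server):
#   python convert_grid_data.py                  # prompt, then convert all eligible models
#   python convert_grid_data.py AK-RTMA GFS160   # convert only the listed models
# With no arguments the script asks for confirmation and skips the blacklisted grids;
# passing model names converts exactly those models.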
from shutil import copytree, move, copy
from subprocess import Popen, PIPE
from thread import start_new_thread, allocate_lock
import sys
from os.path import exists, isdir
from os import mkdir
from time import sleep, time
import h5py
import struct
# This is generally a disk-bound process, so more threads won't help unless the disk is fast.
numThreads = 1
# Setting too many records at once causes the exec of psql to fail because of the long arg list.
maxRecords = 200
postgresCmd = "psql -U awips -d metadata -t -q -A -c "
hdf5loc = "/awips2/edex/data/hdf5/"
quadrantGrids = ["ENSEMBLE37", "ENSEMBLE38", "ENSEMBLE39", "ENSEMBLE40",
                 "ECMF1", "ECMF2", "ECMF3", "ECMF4", "ECMF5", "ECMF6", "ECMF7", "ECMF8",
                 "UKMET40", "UKMET39", "UKMET38", "UKMET37"]
akGrids = ["MOSGuide-AK", "AK-NamDNG5", "AK-RTMA", "AKWAVE239", "AKwave10", "AKwave4", "HiResW-NMM-AK", "HiResW-ARW-AK",
           "ETA242", "mesoEta217", "mesoEta216", "ETA207", "AVN203", "MRF203", "GFS160"]
prGrids = ["HiResW-NMM-SJU", "HiResW-ARW-SJU", "PR-NamDNG5", "PR-RTMA", "MRF205", "GFS161", "mesoEta237"]
hiGrids = ["HI-NamDNG5", "HI-RTMA", "HiResW-NMM-HI", "HiResW-ARW-HI", "MRF204", "AVN225", "GFS254", "SREF243"]
guGrids = ["HiResW-NMM-GU", "HiResW-ARW-GU"]
blacklistGrids = {"quadrant grids which have already been converted in an assembled format": quadrantGrids,
                  "grids over Alaska": akGrids, "grids over Puerto Rico": prGrids,
                  "grids over Hawaii and the Pacific Region": hiGrids, "grids over Guam": guGrids}
parameters = {}
models = []
models_lock = allocate_lock()
def queryPostgres(sql):
    # Run the SQL through psql and return the result as a list of rows,
    # each row being the '|'-delimited fields of one output line.
    result = Popen(postgresCmd + "\"" + sql + "\"", stdout=PIPE, shell=True)
    retVal = []
    for line in result.stdout:
        retVal.append(line.strip().split("|"))
    return retVal
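# For example, queryPostgres("select distinct modelname from grib_models") shells out to
#   psql -U awips -d metadata -t -q -A -c "select distinct modelname from grib_models"
# and returns rows such as [['GFS160'], ['AK-RTMA'], ...] (values shown are illustrative).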
def convertModel(modelName):
    hdfTime = 0
    totTime = 0
    totTime -= time()
    print modelName, "Loading existing grid_info"
    print modelName, "Querying grib database"
    rows = queryPostgres("select grib.forecasttime, grib.reftime, grib.datauri, gridcoverage.id from grib, grib_models, gridcoverage, level where grib.modelinfo_id = grib_models.id and grib_models.location_id = gridcoverage.id and grib_models.level_id = level.id and grib_models.modelName = '%s' order by grib.forecasttime, grib.reftime, level.masterlevel_name" % modelName)
    print modelName, "Converting %d records" % len(rows)
    gridSql = None
    lastFile = None
    gribFiles = hdf5loc + "grib/" + modelName + "/"
    gridFiles = hdf5loc + "grid/" + modelName + "/"
    if not isdir(hdf5loc + "grib/"):
        mkdir(hdf5loc + "grib/")
    if not isdir(gribFiles):
        mkdir(gribFiles)
    count = 0
    for row in rows:
        gribforecasttime = row[0]
        gribreftime = row[1]
        gribdatauri = row[2]
        gridcoverageid = row[3]
        # Pull the fields we need out of the slash-delimited grib datauri;
        # the indices below correspond to the positions used by the old grib datauri format.
        datauriparts = gribdatauri.split("/")
        datatime = datauriparts[2]
        paramabbrev = datauriparts[4]
        masterlevel = datauriparts[5]
        levelone = datauriparts[6]
        leveltwo = datauriparts[7]
        pert = datauriparts[9]
        version = datauriparts[10]
        secondaryId = "null"
        if version != "0":
            secondaryId = "Version" + version
        ensembleId = convertPert(pert)
        # Rebuild the equivalent grid datauri from the extracted pieces.
        newdatauri = "/grid/" + datatime + "/" + modelName + "/" + secondaryId + "/" + ensembleId + "/" + gridcoverageid + "/" + paramabbrev + "/" + masterlevel + "/" + levelone + "/" + leveltwo
        hdfTime -= time()
        try:
            forecast = int(gribforecasttime) / 3600
            prevgrp = gribdatauri
            newgrp = newdatauri
            dataset = "Data"
            if paramabbrev.startswith("static"):
                prevgrp = "/" + gridcoverageid
                newgrp = "/"
                dataset = paramabbrev
            if not paramabbrev.startswith("static") or forecast == 0:
                filebase = "/%s-%s-FH-%.3d.h5" % (modelName, gribreftime.split(":")[0].replace(" ", "-"), forecast)
                hdf5file = gribFiles + masterlevel + filebase
                if lastFile is not None and lastFile.filename != hdf5file:
                    #print "Closing", lastFile.filename
                    lastFile.close()
                    lastFile = None
                if lastFile is None:
                    if not exists(hdf5file):
                        t0 = time()
                        if not isdir(gribFiles + masterlevel):
                            mkdir(gribFiles + masterlevel)
                        move(gridFiles + masterlevel + filebase, gribFiles + masterlevel)
                        hdfTime -= (time() - t0)
                    #print "Opening", hdf5file
                    lastFile = h5py.File(hdf5file)
                copyH5(lastFile, newgrp, prevgrp, dataset)
        except:
            print modelName, "Error", gribdatauri
            print sys.exc_info()[1]
            hdfTime += time()
            continue
        hdfTime += time()
        count += 1
        if count % maxRecords == 0:
            print modelName, "Processed %d grid records %d%%" % (maxRecords, 100 * count / len(rows))
    totTime += time()
    print modelName, "Time in hdf5 links = %ds" % (hdfTime)
    print modelName, "Total process Time = %ds" % (totTime)
def convertPert(pert):
    # Map the grib perturbation number to the grid ensemble id.
    pertMap = {"1": "ctl1", "2": "ctl2",
               "3": "n1", "4": "p1",
               "5": "n2", "6": "p2",
               "7": "n3", "8": "p3",
               "9": "n4", "10": "p4",
               "11": "n5", "12": "p5"}
    return pertMap.get(pert, "null")
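# For example, convertPert("4") returns "p1", while any unmapped perturbation
# number falls through to "null".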
def copyH5(h5, gribdatauri, griddatauri, dataset="Data"):
    # Walk down to the existing dataset under the first datauri, create the group
    # hierarchy for the second datauri, and hard-link the dataset into it.
    gribgrp = h5['/']
    gridgrp = gribgrp
    for part in gribdatauri.split('/'):
        if part:
            gribgrp = gribgrp[part]
    for part in griddatauri.split('/'):
        if part:
            gridgrp = gridgrp.require_group(part)
    if dataset not in gridgrp.keys():
        plists = {'lcpl': gribgrp[dataset]._lcpl, 'lapl': gribgrp[dataset]._lapl}
        plists['lcpl'].set_create_intermediate_group(False)
        h5py.h5o.link(gribgrp[dataset].id, gridgrp.id, dataset, **plists)
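# Note: h5py.h5o.link creates an HDF5 hard link, so the dataset becomes reachable
# from both group paths without duplicating the gridded data on disk.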
def processModels():
    while True:
        models_lock.acquire()
        if len(models) == 0:
            global numThreads
            numThreads -= 1
            models_lock.release()
            break
        model = models.pop()
        models_lock.release()
        try:
            convertModel(model)
        except:
            print model, "Error, model aborted"
            print sys.exc_info()[1]
def loadAll():
    global models
    print "This script will convert grid data in edex to use the old grib format."
    print "You provided no arguments, so this will convert almost all data."
    print "To convert only specific models you can cancel and list models as arguments."
    print ""
    for row in queryPostgres("select distinct modelname from grib_models"):
        models.append(row[0])
    print "To save time some grid models will be skipped; these grids will not be"
    print "available until the next model run is ingested. If you would like to convert any"
    print "of these models simply run the conversion script again with a list of models as arguments."
    print ""
    bad = []
    good = []
    for model in models:
        if model.startswith("UnknownModel"):
            bad.append(model)
        else:
            good.append(model)
    if len(bad) > 0:
        print "These Unknown Models will not be converted:",
        for model in bad:
            print "\"" + model + "\"",
        print ""
        print ""
    models = good
    for key in blacklistGrids:
        blacklist = blacklistGrids[key]
        bad = []
        good = []
        for model in models:
            if model in blacklist:
                bad.append(model)
            else:
                good.append(model)
        if len(bad) > 0:
            print "These " + key + " will not be converted:",
            for model in bad:
                print "\"" + model + "\"",
            print ""
            print ""
        models = good
    print "To continue converting the data press Enter, or Ctrl-C to cancel."
    raw_input()
def check_table(tablename):
    rows = queryPostgres("SELECT count(*) FROM information_schema.tables WHERE table_name = '" + tablename + "';")
    if rows[0][0] != "1":
        print tablename, "table does not exist, please create tables"
        sys.exit(1)
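# For example, check_table("grib") aborts the script with exit code 1 when the
# grib metadata table is missing.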
if __name__ == '__main__':
    t = time()
    check_table("grib")
    if len(sys.argv) == 1:
        loadAll()
    else:
        for i in range(1, len(sys.argv)):
            models.append(sys.argv[i])
    print "Starting %d threads to process models" % (numThreads)
    for i in range(numThreads - 1):
        start_new_thread(processModels, ())
    processModels()
    while numThreads > 0:
        sleep(5)
    print "Total Conversion time %ds" % (time() - t)