#!/usr/bin/python
# This script will add register the gridcoverage plugin, which was previosly part of grib
#
# This needs to be performed with build ????
# create_grid_tables.sh must be run before running this script.
from shutil import copytree, move, copy
from subprocess import Popen, PIPE
from thread import start_new_thread, allocate_lock
import sys
from os.path import exists, isdir
from os import mkdir
from time import sleep, time
import h5py
import struct
# this is generally a disk bound process so more threads won't help unless the disk is fast
numThreads = 1
# setting too many records at once causes the exec to psql to fail because of the long arg list
maxRecords = 200
# psql command prefix: tuples-only (-t), quiet (-q), unaligned (-A) output,
# so each result row comes back as one pipe-delimited line
postgresCmd = "psql -U awips -d metadata -t -q -A -c "
# root of the edex hdf5 data store; grib/ and grid/ model trees live under it
hdf5loc = "/awips2/edex/data/hdf5/"
# model-name lists that loadAll() skips by default; the blacklistGrids keys
# below are the human-readable descriptions printed for each skipped group
quadrantGrids = ["ENSEMBLE37", "ENSEMBLE38", "ENSEMBLE39", "ENSEMBLE40",
"ECMF1", "ECMF2", "ECMF3", "ECMF4", "ECMF5", "ECMF6", "ECMF7", "ECMF8",
"UKMET40", "UKMET39", "UKMET38", "UKMET37"]
akGrids = ["MOSGuide-AK", "AK-NamDNG5","AK-RTMA", "AKWAVE239", "AKwave10", "AKwave4", "HiResW-NMM-AK", "HiResW-ARW-AK",
"AK-NAM11", "AK-NAM22", "AK-NAM45","AK-NAM95", "AVN203", "MRF203", "GFS160"]
prGrids = ["HiResW-NMM-SJU", "HiResW-ARW-SJU", "PR-NamDNG5", "PR-RTMA", "MRF205", "GFS161", "PR-NAM"]
hiGrids = ["HI-NamDNG5", "HI-RTMA", "HiResW-NMM-HI", "HiResW-ARW-HI", "MRF204", "AVN225", "GFS254", "SREF243"]
guGrids = ["HiResW-NMM-GU", "HiResW-ARW-GU"]
blacklistGrids = {"quadrant grids which have already been converted in an assembled format":quadrantGrids,
"grids over Alaska":akGrids, "grids over Puerto Rico":prGrids,
"grids over Hawaii and the Pacific Region":hiGrids, "grids over Guam":guGrids}
parameters = {}  # NOTE(review): never referenced in this file -- appears unused
models = []  # shared work queue of model names consumed by the worker threads
models_lock = allocate_lock()  # guards models (and the numThreads decrement in processModels)
def queryPostgres(sql):
    """Run *sql* through psql and return the result rows.

    Each row is a list of column values, obtained by splitting psql's
    unaligned (-A) pipe-delimited output lines.
    """
    proc = Popen(postgresCmd + "\"" + sql + "\"", stdout=PIPE, shell=True)
    return [line.strip().split("|") for line in proc.stdout]
def convertModel(modelName):
hdfTime = 0
totTime = 0
totTime -= time()
print modelName, "Loading existing grid_info"
print modelName, "Querying grib database"
rows = queryPostgres("select grib.forecasttime, grib.reftime, grib.datauri, gridcoverage.id from grib, grib_models, gridcoverage, level where grib.modelinfo_id = grib_models.id and grib_models.location_id = gridcoverage.id and grib_models.level_id = level.id and grib_models.modelName = '%s' order by grib.forecasttime, grib.reftime, level.masterlevel_name" % modelName)
print modelName, "Converting %d records" % len(rows)
gridSql = None
lastFile = None
gribFiles = hdf5loc + "grib/" + modelName + "/"
gridFiles = hdf5loc + "grid/" + modelName + "/"
if not(isdir(hdf5loc + "grib/")):
mkdir(hdf5loc + "grib/")
if not(isdir(gribFiles)):
mkdir(gribFiles)
count = 0;
for row in rows:
gribforecasttime = row[0]
gribreftime = row[1]
gribdatauri = row[2]
gridcoverageid = row[3]
datauriparts = gribdatauri.split("/")
datatime = datauriparts[2]
paramabbrev = datauriparts[4]
masterlevel = datauriparts[5]
levelone = datauriparts[6]
leveltwo = datauriparts[7]
pert = datauriparts[9]
version = datauriparts[10]
secondaryId = "null"
if version != "0":
secondaryId = "Version" + version
ensembleId = convertPert(pert)
newdatauri = "/grid/" + datatime + "/" + modelName + "/" + secondaryId + "/" + ensembleId + "/" + gridcoverageid + "/" + paramabbrev + "/" + masterlevel + "/" + levelone + "/" + leveltwo
hdfTime -= time()
try:
forecast = int(gribforecasttime)/3600
prevgrp = gribdatauri
newgrp = newdatauri
dataset="Data"
if paramabbrev.startswith("static"):
prevgrp = "/"+ gridcoverageid
newgrp = "/"
dataset=paramabbrev
if not(paramabbrev.startswith("static")) or forecast == 0:
filebase = "/%s-%s-FH-%.3d.h5" % (modelName, gribreftime.split(":")[0].replace(" ", "-"), forecast)
hdf5file = gribFiles + masterlevel + filebase
if lastFile != None and lastFile.filename != hdf5file:
#print "Closing", lastFile.filename
lastFile.close()
lastFile = None
if lastFile == None:
if not(exists(hdf5file)):
t0 = time()
if not(isdir(gribFiles + masterlevel)):
mkdir(gribFiles + masterlevel)
move(gridFiles + masterlevel + filebase, gribFiles + masterlevel)
hdfTime -= (time() - t0)
#print "Opening", hdf5file
lastFile = h5py.File(hdf5file)
copyH5(lastFile, newgrp, prevgrp, dataset)
except:
print modelName, "Error", gribdatauri
print sys.exc_info()[1]
hdfTime += time()
continue
hdfTime += time()
count += 1
if count % maxRecords == 0:
print modelName, "Processed %d grid records %d%%" % (maxRecords,100*count/len(rows))
totTime += time()
print modelName, "Time in hdf5 links = %ds" % (hdfTime)
print modelName, "Total process Time = %ds" % (totTime)
def convertPert(pert):
    """Map a grib perturbation number (as a string) to its grid ensemble id.

    Returns the string "null" for any unrecognized perturbation number.
    """
    ensembleIds = {
        "1": "ctl1", "2": "ctl2",
        "3": "n1", "4": "p1",
        "5": "n2", "6": "p2",
        "7": "n3", "8": "p3",
        "9": "n4", "10": "p4",
        "11": "n5", "12": "p5",
    }
    return ensembleIds.get(pert, "null")
def copyH5(h5, gribdatauri, griddatauri, dataset="Data"):
    """Hard-link *dataset* from the group at *gribdatauri* into the group
    at *griddatauri* within open h5py file *h5*.

    The source group path (gribdatauri) must already exist -- it is walked
    with plain indexing; the destination path (griddatauri) is created on
    demand via require_group.  If the destination already has *dataset*,
    nothing is done.

    NOTE(review): at the call site in convertModel the first path argument
    is the NEW grid datauri and the second is the OLD grib datauri, so the
    parameter names read as swapped; behavior documented as-is.
    """
    gribgrp = h5['/']
    gridgrp = gribgrp
    # walk down the existing source group path
    for part in gribdatauri.split('/'):
        if part:
            gribgrp = gribgrp[part]
    # create/walk down the destination group path
    for part in griddatauri.split('/'):
        if part:
            gridgrp = gridgrp.require_group(part)
    if not(dataset in gridgrp.keys()):
        # reuse the source dataset's link-creation/access property lists
        # (_lcpl/_lapl are h5py PRIVATE attributes -- low-level API usage)
        plists = {'lcpl': gribgrp[dataset]._lcpl, 'lapl': gribgrp[dataset]._lapl}
        plists['lcpl'].set_create_intermediate_group(False)
        # hard-link the dataset into the destination instead of copying data
        h5py.h5o.link(gribgrp[dataset].id, gridgrp.id, dataset, **plists)
def processModels():
while(True):
models_lock.acquire()
if len(models) == 0:
global numThreads
numThreads -= 1
models_lock.release()
break
model = models.pop()
models_lock.release()
try:
convertModel(model)
except:
print model, "Error model aborted"
print sys.exc_info()[1]
def loadAll():
global models
print "This script will convert grid data in edex to use the old grib format"
print "You provided no arguments so this will convert almost all data."
print "To convert only specific models you can cancel and list models as arguments"
print ""
for row in queryPostgres("select distinct modelname from grib_models"):
models.append(row[0])
print "To save time some grid models will be skipped, these grids will not be"
print "available until the next model run is ingested. If you would like to convert any"
print "of these models simply run the conversion script again with a list of models as arguments."
print ""
bad = []
good = []
for model in models:
if model.startswith("UnknownModel"):
bad.append(model)
else:
good.append(model)
if len(bad) > 0:
print "These Unknown Models will not be converted:",
for model in bad:
print "\"" + model + "\"",
print ""
print ""
models = good
for key in blacklistGrids:
blacklist = blacklistGrids[key]
bad = []
good = []
for model in models:
if model in blacklist:
bad.append(model)
else:
good.append(model)
if len(bad) > 0:
print "These " + key + " will not be converted:",
for model in bad:
print "\"" + model + "\"",
print ""
print ""
models = good
print "To continue converting the data Press Enter or Ctrl-C to cancel."
raw_input()
def check_table(tablename):
rows = queryPostgres("SELECT count(*) FROM information_schema.tables WHERE table_name = '" + tablename + "';")
if(rows[0][0] != "1"):
print tablename, "table does not exist, please create tables"
sys.exit(1)
if __name__ == '__main__':
t = time()
check_table("grib")
if len(sys.argv) == 1:
loadAll()
else:
for i in range(1,len(sys.argv)):
models.append(sys.argv[i])
print "Starting %d threads to process models" % (numThreads)
for i in range(numThreads-1):
start_new_thread(processModels, ())
processModels()
while numThreads > 0:
sleep(5)
print "Total Conversion time %ds" % (time() - t)