Issue #2009: Implement dual-domain GFE export_grids solution. Update MHS

emulator program to support new directory structure and some code cleanup and
refactoring.

Change-Id: Ib3cf5c0123cd87497517d4221edea5883db9157d

Former-commit-id: 2c4acfec0b [formerly 4702fd8a72 [formerly ae75638063] [formerly 2c4acfec0b [formerly baaefbd38040c69ba42feaa8b00e8ca4d69f4ba1]]]
Former-commit-id: 4702fd8a72 [formerly ae75638063]
Former-commit-id: 4702fd8a72
Former-commit-id: e3f47a588c
This commit is contained in:
David Gillingham 2013-07-12 13:49:42 -05:00
parent 770ebf3ce3
commit 7a27dc314e
8 changed files with 400 additions and 208 deletions

2
MHSEmulator/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
out/
*.jar

Binary file not shown.

Binary file not shown.

View file

@ -5,10 +5,30 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Library module for MHS emulator.
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* ??? ?? ???? bphillip Initial creation
* Jul 15, 2013 #2099 dgilling Use safer exception handling for file I/O.
*
* </pre>
*
* @author bphillip
* @version 1.0
*/
public class MhsUtil {
public static final SimpleDateFormat logDateFormat = new SimpleDateFormat(
@ -19,32 +39,51 @@ public class MhsUtil {
public static final String END_TOKEN = "------!!!!END!!!!------";
public static final File MY_MHS_FILE = new File(
"/awips2/.myMHS");
public static final File MY_MHS_FILE = new File("/awips2/.myMHS");
public static final File MSG_ID_FILE = new File(
"/awips2/.msgCount");
public static final File MSG_ID_FILE = new File("/awips2/.msgCount");
public static String getMsgId() throws Exception {
if (!MSG_ID_FILE.exists()) {
MSG_ID_FILE.createNewFile();
BufferedWriter out = new BufferedWriter(new FileWriter(MSG_ID_FILE));
out.write("0");
out.close();
private MhsUtil() {
throw new AssertionError();
}
public static String getMsgId() throws IOException {
if (MSG_ID_FILE.createNewFile()) {
BufferedWriter out = null;
try {
out = new BufferedWriter(new FileWriter(MSG_ID_FILE));
out.write("0");
} finally {
if (out != null) {
out.close();
}
}
}
BufferedReader in = null;
in = new BufferedReader(new FileReader(MSG_ID_FILE));
String msgId = in.readLine().trim();
int newMsgNumber = Integer.parseInt(msgId) + 1;
in.close();
BufferedWriter out = new BufferedWriter(new FileWriter(MSG_ID_FILE));
out.write(String.valueOf(newMsgNumber));
out.close();
for (int i = msgId.length(); i < 6; i++) {
msgId = "0" + msgId;
int newMsgNumber;
try {
in = new BufferedReader(new FileReader(MSG_ID_FILE));
String msgId = in.readLine().trim();
newMsgNumber = Integer.parseInt(msgId) + 1;
} finally {
if (in != null) {
in.close();
}
}
return msgId;
BufferedWriter out = null;
try {
out = new BufferedWriter(new FileWriter(MSG_ID_FILE));
out.write(String.valueOf(newMsgNumber));
} finally {
if (out != null) {
out.close();
}
}
NumberFormat formatter = new DecimalFormat("000000");
return formatter.format(newMsgNumber);
}
public static int byteArrayToInt(byte[] b, int offset) {
@ -73,26 +112,26 @@ public class MhsUtil {
logFile = new File(logDir
+ InetAddress.getLocalHost().getCanonicalHostName() + "-"
+ mode + "-" + MhsUtil.logDateFormat.format(new Date()));
logFile.createNewFile();
if (logFile != null) {
if (!logFile.exists()) {
logFile.createNewFile();
}
}
message += MhsUtil.logMsgFormat.format(new Date());
for (Object obj : msg) {
message += obj.toString() + " ";
}
message += "\n";
BufferedWriter out = new BufferedWriter(new FileWriter(logFile,
true));
out.write(message.trim());
out.write("\n");
out.close();
BufferedWriter out = null;
try {
out = new BufferedWriter(new FileWriter(logFile, true));
out.write(message.trim());
out.write("\n");
} finally {
if (out != null) {
out.close();
}
}
} catch (Exception e) {
// ignore
e.printStackTrace();
}
}

View file

@ -1,59 +1,68 @@
package mhs.core;
import java.io.File;
import java.io.FileFilter;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
public class RsyncThread extends Thread {
/**
* TODO Add Description
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* ??? ?? ???? bphillip Initial creation
* Jul 15, 2013 #2099 dgilling Modify to support recursive file listing
* since export grids dir structure uses
* multiple folders.
*
* </pre>
*
* @author bphillip
* @version 1.0
*/
public class RsyncThread implements Runnable {
private static final Map<String, Long> fileVersion = new HashMap<String, Long>();
private Properties props;
private Map<String, Long> fileVersion;
public RsyncThread(Properties props) {
this.props = props;
fileVersion = new HashMap<String, Long>();
this.setDaemon(true);
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
String exportGridsDir = props.getProperty("EXPORT_GRIDS");
String centralServerDir = props.getProperty("CENTRAL_SERVER");
String packScriptDir = props.getProperty("UTIL_DIR");
String exportGridsDir = (String) props.getProperty("EXPORT_GRIDS");
String centralServerDir = (String) props
.getProperty("CENTRAL_SERVER");
File[] fileList = new File(exportGridsDir).listFiles();
Collection<File> fileList = listCdfFiles(new File(exportGridsDir));
for (File file : fileList) {
if (file.isFile()) {
String currentFilePath = file.getPath();
String currentFilePath = null;
for (File file : fileList) {
if (file.isDirectory()) {
continue;
boolean copy = true;
if ((fileVersion.containsKey(currentFilePath))
&& (fileVersion.get(currentFilePath) >= file
.lastModified())) {
copy = false;
}
currentFilePath = file.getPath();
boolean copy = false;
if (fileVersion.containsKey(currentFilePath)) {
if (fileVersion.get(currentFilePath).longValue() != file
.lastModified()) {
copy = true;
}
} else {
copy = true;
}
if (copy) {
String[] copyCmd = new String[] {
centralServerDir + "/../util/packageFile",file.getPath(),
centralServerDir + "/../util/",
file.getName().substring(0, 3), centralServerDir };
packScriptDir + "/packageFile", currentFilePath,
packScriptDir, file.getName().substring(0, 3),
centralServerDir };
try {
Runtime.getRuntime().exec(copyCmd);
fileVersion.put(file.getPath(), file.lastModified());
fileVersion.put(currentFilePath, file.lastModified());
} catch (Exception e) {
e.printStackTrace();
}
@ -61,4 +70,34 @@ public class RsyncThread extends Thread {
}
}
}
private Collection<File> listCdfFiles(File path) {
Collection<File> fileList = new LinkedList<File>();
FileFilter cdfFilter = new FileFilter() {
@Override
public boolean accept(File pathname) {
return (pathname.isDirectory() || pathname.getName().endsWith(
".netcdf"));
}
};
innerListFiles(path, fileList, cdfFilter);
return fileList;
}
private void innerListFiles(File path, Collection<File> fileList,
FileFilter filter) {
try {
File[] matchingFiles = path.listFiles(filter);
for (File file : matchingFiles) {
if (file.isDirectory()) {
innerListFiles(file, fileList, filter);
} else if (file.isFile()) {
fileList.add(file);
}
}
} catch (SecurityException e) {
e.printStackTrace();
}
}
}

View file

@ -1,5 +1,6 @@
package mhs.core;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
@ -7,27 +8,63 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* TODO Add Description
*
* <pre>
*
* SOFTWARE HISTORY
*
* Date Ticket# Engineer Description
* ------------ ---------- ----------- --------------------------
* ??? ?? ???? bphillip Initial creation
* Jul 15, 2013 #2099 dgilling Code cleanup.
*
* </pre>
*
* @author bphillip
* @version 1.0
*/
public class SocketSrv {
private File propertiesFile;
private boolean runRsync;
private String myMHS;
private Properties serverProps;
private ExecutorService mhsRequestHandler;
private ScheduledExecutorService rsyncThread;
private int fileIndex = 0;
private String fileBase;
private Properties serverProps;
private String configDir;
private String centralServerDir;
@ -38,131 +75,188 @@ public class SocketSrv {
private Map<Integer, String> commandMap;
private RsyncThread rsync;
private String propertiesFile;
private String myMHS;
private int serverPort;
public static void main(String[] args) {
if (!System.getProperty("user.name").equals("root")) {
System.out
.println("Socket Server must be run as root user! Current user: "
+ System.getProperty("user.name"));
System.exit(1);
}
File propertiesFile = new File(args[0]);
String mhsId = args[1];
boolean startRsync = Boolean.parseBoolean(args[2]);
if (!propertiesFile.isFile()) {
System.out.println("Specified properties file ["
+ propertiesFile.toString() + "] does not exist. Exiting.");
System.exit(1);
}
try {
SocketSrv srv = null;
srv = new SocketSrv(args[0], args[1], args[2]);
srv.run();
final SocketSrv server = new SocketSrv(propertiesFile, mhsId,
startRsync);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
server.shutdown();
}
});
server.run();
} catch (Exception e) {
e.printStackTrace();
}
}
private SocketSrv(String propertiesFile, String myMHS, String startRsync)
throws Exception {
this.propertiesFile = propertiesFile;
this.myMHS = myMHS;
writeMyMHS(myMHS);
private SocketSrv(File propertiesFile, String mhsId, boolean startRsync)
throws UnknownHostException, IOException {
System.out.println("Setting up server ("
+ InetAddress.getLocalHost().getCanonicalHostName() + ")");
commandMap = new HashMap<Integer, String>();
configDir = propertiesFile.substring(0,
propertiesFile.lastIndexOf(File.separator) + 1);
loadProperties(true);
if (startRsync.equals("true")) {
System.out.println("Starting Rsync Thread...");
rsync = new RsyncThread(serverProps);
rsync.start();
System.out.println("Rsync Thread started!");
}
}
this.propertiesFile = propertiesFile;
this.myMHS = mhsId;
this.runRsync = startRsync;
private void writeMyMHS(String myMHS) throws Exception {
BufferedWriter out = new BufferedWriter(new FileWriter(
MhsUtil.MY_MHS_FILE));
out.write(myMHS + "\n");
out.write(this.propertiesFile);
out.close();
}
writeMyMHS(myMHS);
private void loadProperties(boolean print) throws Exception {
serverProps = new Properties();
FileInputStream fis = new FileInputStream(propertiesFile);
serverProps.load(fis);
fis.close();
fileBase = serverProps.getProperty("DATA_FOLDER");
centralServerDir = serverProps.getProperty("CENTRAL_SERVER");
binDir = serverProps.getProperty("UTIL_DIR");
this.commandMap = new HashMap<Integer, String>();
this.configDir = this.propertiesFile.getParent();
loadProperties();
loadRcvHandlerTable();
if (print) {
System.out.println(" Received Data directory: " + fileBase);
System.out.println("Central Server Data Directory: "
+ centralServerDir);
System.out.println(" Config directory: " + configDir);
this.fileBase = serverProps.getProperty("DATA_FOLDER");
this.centralServerDir = serverProps.getProperty("CENTRAL_SERVER");
this.binDir = serverProps.getProperty("UTIL_DIR");
this.serverPort = Integer.parseInt(serverProps
.getProperty("SERVER_PORT"));
System.out.println("\tReceived Data directory: " + fileBase);
System.out.println("\tCentral Server Data Directory: "
+ centralServerDir);
System.out.println("\tConfig directory: " + configDir);
this.mhsRequestHandler = Executors.newSingleThreadExecutor();
if (this.runRsync) {
System.out.println("Starting Rsync Thread...");
this.rsyncThread = Executors.newSingleThreadScheduledExecutor();
}
}
public void run() throws Exception {
public void run() throws IOException {
Runnable rsyncJob = new RsyncThread(serverProps);
rsyncThread.scheduleWithFixedDelay(rsyncJob, 1, 1, TimeUnit.SECONDS);
int port = Integer.parseInt((String) serverProps.get("SERVER_PORT"));
ServerSocket srv = new ServerSocket(port);
ServerSocket socket = new ServerSocket(serverPort);
while (!mhsRequestHandler.isShutdown()) {
try {
log("Waiting for connections...");
final Socket conn = socket.accept();
Runnable processTask = new Runnable() {
while (true) {
log("Waiting for connections...");
Socket socket = srv.accept();
InetSocketAddress client = (InetSocketAddress) socket
.getRemoteSocketAddress();
log("Connected to client: " + client.getHostName() + " at "
+ client);
loadProperties(false);
String sender = getMhsOfSender(client);
log("Message is from: " + sender);
InputStream in = socket.getInputStream();
byte[] message = null;
Map<String, String> params = new HashMap<String, String>();
String flag = "";
while (true) {
if (in.available() == 0) {
Thread.sleep(100);
continue;
}
message = getMessage(in);
if (message.length < 50) {
String strMessage = new String(message);
if (strMessage.equals(MhsUtil.END_TOKEN)) {
log("Disconnected from client: " + client);
if (params.containsKey("-c")) {
executeAction(sender, params);
files.clear();
params.clear();
flag = "";
@Override
public void run() {
try {
handleRequest(conn);
} catch (Exception e) {
e.printStackTrace();
}
break;
}
if (strMessage.startsWith("-")) {
flag = strMessage;
} else {
params.put(flag, strMessage);
}
} else {
log("File Received of size: " + message.length);
files.add(writeToFile(myMHS + "-" + params.get("-MSGID"),
message));
};
mhsRequestHandler.execute(processTask);
} catch (RejectedExecutionException e) {
if (!mhsRequestHandler.isShutdown()) {
e.printStackTrace();
}
}
}
}
public void shutdown() {
mhsRequestHandler.shutdown();
if (rsyncThread != null) {
rsyncThread.shutdown();
}
}
private void handleRequest(Socket connection) throws IOException,
InterruptedException {
InetSocketAddress client = (InetSocketAddress) connection
.getRemoteSocketAddress();
log("Connected to client: " + client.getHostName() + " at " + client);
loadProperties();
String sender = getMhsOfSender(client);
log("Message is from: " + sender);
InputStream in = connection.getInputStream();
byte[] message = null;
Map<String, String> params = new HashMap<String, String>();
String flag = "";
while (true) {
if (in.available() == 0) {
Thread.sleep(100);
continue;
}
message = getMessage(in);
if (message.length < 50) {
String strMessage = new String(message);
if (strMessage.equals(MhsUtil.END_TOKEN)) {
log("Disconnected from client: " + client);
if (params.containsKey("-c")) {
executeAction(sender, params);
files.clear();
params.clear();
flag = "";
}
break;
}
if (strMessage.startsWith("-")) {
flag = strMessage;
} else {
params.put(flag, strMessage);
}
} else {
log("File Received of size: " + message.length);
files.add(writeToFile(myMHS + "-" + params.get("-MSGID"),
message));
}
}
}
private void writeMyMHS(String myMHS) throws IOException {
BufferedWriter out = null;
try {
out = new BufferedWriter(new FileWriter(MhsUtil.MY_MHS_FILE));
out.write(myMHS + "\n");
out.write(propertiesFile.getPath());
} finally {
if (out != null) {
out.close();
}
}
}
private void loadProperties() throws IOException {
FileInputStream fis = null;
try {
fis = new FileInputStream(propertiesFile);
Properties newProps = new Properties();
newProps.load(fis);
serverProps = newProps;
} finally {
if (fis != null) {
fis.close();
}
}
}
private void executeAction(String sender, Map<String, String> params)
throws Exception {
throws IOException, InterruptedException {
int action = Integer.parseInt(params.get("-c"));
if (!commandMap.containsKey(action)) {
@ -200,31 +294,19 @@ public class SocketSrv {
command = command.replace("%SENDER", sender.toLowerCase());
String[] cmdArray = command.split(" ");
log("Executing: " + command);
//
// Map<String, String> sysEnv = System.getenv();
// Map<String, String> newEnv = new HashMap<String, String>();
// for (String key : sysEnv.keySet()) {
// newEnv.put(key, sysEnv.get(key));
// }
// newEnv.put("PATH", "/awips2/python/bin/:"+sysEnv.get("PATH"));
// newEnv.put("LD_PRELOAD", "/awips2/python/lib/libpython2.7.so");
// newEnv.put("LD_LIBRARY_PATH", "/awips2/python/lib");
// String[] envp = new String[newEnv.keySet().size()];
// int i = 0;
// for (String key : newEnv.keySet()) {
// envp[i] = key.trim() + "=" + newEnv.get(key).trim();
// i++;
// }
Process p = null;
try {
p = Runtime.getRuntime().exec(cmdArray);
p.waitFor();
} finally {
p.destroy();
if (p != null) {
p.destroy();
}
}
}
private byte[] getMessage(InputStream in) throws Exception {
private byte[] getMessage(InputStream in) throws IOException {
byte[] sizeBytes = new byte[4];
readBytes(in, sizeBytes);
int expectedSize = MhsUtil.byteArrayToInt(sizeBytes, 0);
@ -233,7 +315,7 @@ public class SocketSrv {
return message;
}
private void readBytes(InputStream in, byte[] bytes) throws Exception {
private void readBytes(InputStream in, byte[] bytes) throws IOException {
int expectedSize = bytes.length;
int bytesRead = 0;
int totalBytesRead = 0;
@ -245,13 +327,20 @@ public class SocketSrv {
}
private String writeToFile(String fileName, byte[] contents)
throws Exception {
throws IOException, InterruptedException {
String fileFQN = fileBase + fileName + "." + getFileIndex();
log("Writing file: " + fileFQN);
FileOutputStream fos = new FileOutputStream(new File(fileFQN));
fos.write(contents);
fos.flush();
fos.close();
BufferedOutputStream out = null;
try {
out = new BufferedOutputStream(new FileOutputStream(new File(
fileFQN)));
out.write(contents);
} finally {
if (out != null) {
out.close();
}
}
Process p = null;
try {
@ -259,7 +348,9 @@ public class SocketSrv {
new String[] { "/bin/chmod", "777", fileFQN });
p.waitFor();
} finally {
p.destroy();
if (p != null) {
p.destroy();
}
}
return fileFQN;
}
@ -269,40 +360,34 @@ public class SocketSrv {
if (fileIndex == 1000) {
fileIndex = 0;
}
String fileNumber = String.valueOf(fileIndex);
if (fileNumber.length() == 1) {
fileNumber = "00" + fileNumber;
} else if (fileNumber.length() == 2) {
fileNumber = "0" + fileNumber;
}
return fileNumber;
NumberFormat formatter = new DecimalFormat("000");
return formatter.format(fileIndex);
}
private void loadRcvHandlerTable() throws Exception {
private void loadRcvHandlerTable() throws IOException {
BufferedReader in = null;
try {
in = new BufferedReader(new FileReader(configDir
in = new BufferedReader(new FileReader(configDir + File.separator
+ "rcv_handler.tbl"));
String str = null;
String[] tokens = null;
String str = null;
while ((str = in.readLine()) != null) {
String command = "";
tokens = str.split(" ");
StringBuilder commandBuilder = new StringBuilder();
String[] tokens = str.split(" ");
for (int i = 1; i < tokens.length; i++) {
String cmd = tokens[i].trim();
if (!cmd.isEmpty()) {
if (i != 1) {
command += " ";
commandBuilder.append(' ');
}
command += cmd;
commandBuilder.append(cmd);
}
}
command = command.trim();
commandMap.put(Integer.parseInt(tokens[0]), command);
String commandString = commandBuilder.toString().trim();
commandMap.put(Integer.parseInt(tokens[0]), commandString);
}
in.close();
} finally {
if (in != null) {
in.close();
@ -311,20 +396,19 @@ public class SocketSrv {
}
private String getMhsOfSender(InetSocketAddress address) {
String hostAddress = address.getAddress().getHostAddress();
for (Object key : serverProps.keySet()) {
String value = serverProps.getProperty((String) key);
for (String mhsId : serverProps.stringPropertyNames()) {
String value = serverProps.getProperty(mhsId);
if (value.contains(",")) {
String[] addrs = value.split(",");
for (String addr : addrs) {
if (addr.contains(hostAddress)) {
return (String) key;
return mhsId;
}
}
} else {
if (value.contains(hostAddress)) {
return (String) key;
return mhsId;
}
}
}

View file

@ -28,6 +28,10 @@
<mkdir dir="${gfesuite.directory}/products/ISC" />
<mkdir dir="${gfesuite.directory}/products/ATBL" />
<!-- Create directories used by Service Backup export_grids script -->
<mkdir dir="${gfesuite.directory}/exportgrids/primary" />
<mkdir dir="${gfesuite.directory}/exportgrids/backup" />
<!-- Adjust GFESuite permissions -->
<chmod perm="ugo+rx">
<fileset dir="${gfesuite.directory}/bin">

View file

@ -10,6 +10,9 @@
# 04/29/13 #1761 dgilling Remove use of NATIONAL_CENTER,
# script caller will determine
# which sites to export.
# 05/16/13 #2009 dgilling New backup structure:
# PRIMARY_SITES go to exportgrids/primary/
# and all other go to exportgrids/backup/.
##############################################################################
if [ ${#AWIPS_HOME} = 0 ]
@ -114,9 +117,30 @@ else
fi
$LOGGER ${DB_NAME}
# use PRIMARY_SITES setting to determine NETCDF_PATH
IFS=',' read -ra PRI_SITES <<< "${PRIMARY_SITES}"
if [ ${#PRI_SITES[@]} -eq 0 ]
then
declare -a PRI_SITES=( "${AW_SITE_IDENTIFIER}" )
fi
EXPORT_FOR_PRIMARY=0
for primary_site in "${PRI_SITES[@]}"
do
primary_site=`echo $primary_site | tr [a-z] [A-Z]`
if [ "$primary_site" = "${CAPS_SITE}" ]
then
EXPORT_FOR_PRIMARY=1
break
fi
done
NETCDF_PATH=${GFESUITE_HOME}/exportgrids/primary
export NETCDF_TMP_PATH=/tmp/exportgrids
if [ $EXPORT_FOR_PRIMARY = 0 ]
then
NETCDF_PATH=${GFESUITE_HOME}/exportgrids/backup
fi
NETCDF_PATH=${GFESUITE_HOME}/exportgrids
export NETCDF_TMP_PATH=${NETCDF_PATH}/tmp
if [ ! -d ${NETCDF_TMP_PATH} ]; then
mkdir ${NETCDF_TMP_PATH}
chmod 777 ${NETCDF_TMP_PATH}