Issue #3102: Fix edexBridge connection leak

Change-Id: Ic0938d31d9ff607249e980c443f98f251501b823

Former-commit-id: 79c52892667d34abb651a36f7f5bd89f4143ffff
This commit is contained in:
Richard Peter 2014-05-06 10:32:31 -05:00
parent aa78ed1b69
commit 501980904d
2 changed files with 18 additions and 9 deletions

View file

@ -5,6 +5,8 @@
* Author: brockwoo * Author: brockwoo
* Updated on: June 21, 2013 (Re-written to work with the qpid messaging api) * Updated on: June 21, 2013 (Re-written to work with the qpid messaging api)
* Author: bkowal * Author: bkowal
* Updated on: May 06, 2014 (Issue #3102: Updated to call cleanup if connect failed. Limit number of messages to be sent to QPID on a single send call)
* Author: rjpeter
*/ */
#include <qpid/messaging/Connection.h> #include <qpid/messaging/Connection.h>
@ -44,21 +46,24 @@ private:
std::string password; std::string password;
list<string> filenameList; list<string> filenameList;
list<string> headerList; list<string> headerList;
int maxMessagesPerSend;
public: public:
LdmProducer(const std::string& brokerURI, int port = 5672, LdmProducer(const std::string& brokerURI, int port = 5672,
const std::string& username = "guest", const std::string& username = "guest",
const std::string& password = "guest", const std::string& password = "guest",
bool useTopic = false, bool sessionTransacted = false) bool useTopic = false, bool sessionTransacted = false,
int maxMessagesPerSend = 1000)
{ {
this->useTopic = useTopic; this->useTopic = useTopic;
this->sessionTransacted = sessionTransacted; this->sessionTransacted = sessionTransacted;
this->brokerURI = brokerURI; this->brokerURI = brokerURI;
this->isConnected = false; this->isConnected = false;
this->portNumber = port; this->portNumber = port;
this->username = username; this->username = username;
this->password = password; this->password = password;
this->maxMessagesPerSend = maxMessagesPerSend;
} }
~LdmProducer() { ~LdmProducer() {
@ -87,7 +92,9 @@ public:
string fileHeader; string fileHeader;
try { try {
while (!this->filenameList.empty()) { // limit number of messages sent at one time so we don't miss messages from shared memory if
// a message build up occurred due to qpid being down (usual rate is under 100 a second)
while ((!this->filenameList.empty()) && (messagesProcessed < this->maxMessagesPerSend)) {
Message message; Message message;
fileLocation = this->filenameList.front(); fileLocation = this->filenameList.front();
@ -159,7 +166,7 @@ private:
{ {
uwarn(error.what()); uwarn(error.what());
} }
} }
this->isConnected = false; this->isConnected = false;
} }
@ -205,7 +212,7 @@ private:
catch (const std::exception& error) catch (const std::exception& error)
{ {
uerror(error.what()); uerror(error.what());
this->isConnected = false; cleanup();
} }
return this->isConnected; return this->isConnected;
} }
@ -351,6 +358,8 @@ int main(int argc, char* argv[]) {
// createQueue to be used in both consumer an producer. // createQueue to be used in both consumer an producer.
//============================================================ //============================================================
bool useTopics = false; bool useTopics = false;
int maxMessagesPerSend = 1000;
bool sessionTransacted = false;
int shmid; int shmid;
int semid; int semid;
@ -378,7 +387,7 @@ int main(int argc, char* argv[]) {
messageCursor = (edex_message *) shmat(shmid, (void *) 0, 0); messageCursor = (edex_message *) shmat(shmid, (void *) 0, 0);
LdmProducer producer(brokerURI, port, username, password, useTopics); LdmProducer producer(brokerURI, port, username, password, useTopics, sessionTransacted, maxMessagesPerSend);
for (;;) { for (;;) {
if (hupped) { if (hupped) {
@ -425,7 +434,7 @@ int main(int argc, char* argv[]) {
sleep(1); sleep(1);
continue; continue;
} }
if (messagesSent != (queue_diff + endQueueDiff)) { if ((messagesSent != maxMessagesPerSend) && (messagesSent != (queue_diff + endQueueDiff))) {
uerror( uerror(
"Only %d messages were sent out of an expected %d. Will store those not sent and try again.", "Only %d messages were sent out of an expected %d. Will store those not sent and try again.",
messagesSent, queue_diff); messagesSent, queue_diff);

View file

@ -9,7 +9,7 @@
Name: awips2-ldm Name: awips2-ldm
Summary: AWIPS II LDM Distribution Summary: AWIPS II LDM Distribution
Version: %{_ldm_version} Version: %{_ldm_version}
Release: 10 Release: 11
Group: AWIPSII Group: AWIPSII
BuildRoot: /tmp BuildRoot: /tmp
BuildArch: noarch BuildArch: noarch