Issue #3102: Fix edexBridge connection leak

Change-Id: Ic0938d31d9ff607249e980c443f98f251501b823

Former-commit-id: 2668f09447 [formerly 501980904d] [formerly 2668f09447 [formerly 501980904d] [formerly b091a8827a [formerly 79c52892667d34abb651a36f7f5bd89f4143ffff]]]
Former-commit-id: b091a8827a
Former-commit-id: 8f1b5b647d [formerly d03c47719f]
Former-commit-id: 87c6348e61
This commit is contained in:
Richard Peter 2014-05-06 10:32:31 -05:00
parent da652d1c4a
commit e555144511
2 changed files with 18 additions and 9 deletions

View file

@ -5,6 +5,8 @@
* Author: brockwoo
* Updated on: June 21, 2013 (Re-written to work with the qpid messaging api)
* Author: bkowal
* Updated on: May 06, 2014 (Issue #3102: Updated to call cleanup if connect failed. Limit number of messages to be sent to QPID on a single send call)
* Author: rjpeter
*/
#include <qpid/messaging/Connection.h>
@ -44,21 +46,24 @@ private:
std::string password;
list<string> filenameList;
list<string> headerList;
int maxMessagesPerSend;
public:
LdmProducer(const std::string& brokerURI, int port = 5672,
const std::string& username = "guest",
const std::string& password = "guest",
bool useTopic = false, bool sessionTransacted = false)
bool useTopic = false, bool sessionTransacted = false,
int maxMessagesPerSend = 1000)
{
this->useTopic = useTopic;
this->sessionTransacted = sessionTransacted;
this->brokerURI = brokerURI;
this->isConnected = false;
this->portNumber = port;
this->username = username;
this->password = password;
this->username = username;
this->password = password;
this->maxMessagesPerSend = maxMessagesPerSend;
}
~LdmProducer() {
@ -87,7 +92,9 @@ public:
string fileHeader;
try {
while (!this->filenameList.empty()) {
// limit number of messages sent at one time so we don't miss messages from shared memory if
// a message build up occurred due to qpid being down (usual rate is under 100 a second)
while ((!this->filenameList.empty()) && (messagesProcessed < this->maxMessagesPerSend)) {
Message message;
fileLocation = this->filenameList.front();
@ -159,7 +166,7 @@ private:
{
uwarn(error.what());
}
}
}
this->isConnected = false;
}
@ -205,7 +212,7 @@ private:
catch (const std::exception& error)
{
uerror(error.what());
this->isConnected = false;
cleanup();
}
return this->isConnected;
}
@ -351,6 +358,8 @@ int main(int argc, char* argv[]) {
// createQueue to be used in both consumer an producer.
//============================================================
bool useTopics = false;
int maxMessagesPerSend = 1000;
bool sessionTransacted = false;
int shmid;
int semid;
@ -378,7 +387,7 @@ int main(int argc, char* argv[]) {
messageCursor = (edex_message *) shmat(shmid, (void *) 0, 0);
LdmProducer producer(brokerURI, port, username, password, useTopics);
LdmProducer producer(brokerURI, port, username, password, useTopics, sessionTransacted, maxMessagesPerSend);
for (;;) {
if (hupped) {
@ -425,7 +434,7 @@ int main(int argc, char* argv[]) {
sleep(1);
continue;
}
if (messagesSent != (queue_diff + endQueueDiff)) {
if ((messagesSent != maxMessagesPerSend) && (messagesSent != (queue_diff + endQueueDiff))) {
uerror(
"Only %d messages were sent out of an expected %d. Will store those not sent and try again.",
messagesSent, queue_diff);

View file

@ -9,7 +9,7 @@
Name: awips2-ldm
Summary: AWIPS II LDM Distribution
Version: %{_ldm_version}
Release: 10
Release: 11
Group: AWIPSII
BuildRoot: /tmp
BuildArch: noarch