From 600610444c444d446c092804b575f5f3eeb49917 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Fri, 21 May 2010 12:49:22 +0000 Subject: [PATCH 01/24] Added a lock to protect MessageList in MessageStoreImpl and the static variables in JournalImpl; Switched all locks at this level to qpid::sys::Mutex and qpid::sys::ScopedLock for consistency. git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3980 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/JournalImpl.cpp | 41 +++++++++++++++++++++++++---------------- lib/JournalImpl.h | 13 ++++++------- lib/MessageStoreImpl.cpp | 32 ++++++++++++++++++++++++-------- lib/MessageStoreImpl.h | 1 + 4 files changed, 56 insertions(+), 31 deletions(-) diff --git a/lib/JournalImpl.cpp b/lib/JournalImpl.cpp index eafd807..ed1c334 100644 --- a/lib/JournalImpl.cpp +++ b/lib/JournalImpl.cpp @@ -40,12 +40,13 @@ using namespace mrg::journal; using qpid::management::ManagementAgent; namespace _qmf = qmf::com::redhat::rhm::store; +qpid::sys::Mutex JournalImpl::_static_lock; qpid::sys::Timer* JournalImpl::journalTimerPtr = 0; u_int32_t JournalImpl::cnt = 0; -void InactivityFireEvent::fire() { slock s(_ife_mutex); if (_parent) _parent->flushFire(); } +void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); } -void GetEventsFireEvent::fire() { slock s(_gefe_mutex); if (_parent) _parent->getEventsFire(); } +void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); } JournalImpl::JournalImpl(const std::string& journalId, const std::string& journalDirectory, @@ -68,12 +69,15 @@ JournalImpl::JournalImpl(const std::string& journalId, { getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout); inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout); - if (journalTimerPtr == 0) - journalTimerPtr = new qpid::sys::Timer; - assert (journalTimerPtr != 0); - cnt++; - journalTimerPtr->start(); - 
journalTimerPtr->add(inactivityFireEventPtr); + { + qpid::sys::Mutex::ScopedLock sl(_static_lock); + if (journalTimerPtr == 0) + journalTimerPtr = new qpid::sys::Timer; + assert (journalTimerPtr != 0); + cnt++; + journalTimerPtr->start(); + journalTimerPtr->add(inactivityFireEventPtr); + } if (_agent != 0) { @@ -112,11 +116,13 @@ JournalImpl::~JournalImpl() inactivityFireEventPtr->cancel(); free_read_buffers(); - // TODO: Make this if() thread-safe - if (journalTimerPtr && --cnt == 0) { - delete journalTimerPtr; - journalTimerPtr = 0; + qpid::sys::Mutex::ScopedLock sl(_static_lock); + if (journalTimerPtr && --cnt == 0) + { + delete journalTimerPtr; + journalTimerPtr = 0; + } } if (_mgmtObject != 0) { @@ -503,7 +509,7 @@ JournalImpl::flush(const bool block_till_aio_cmpl) { const iores res = jcntl::flush(block_till_aio_cmpl); { - slock s(_getf_mutex); + qpid::sys::Mutex::ScopedLock sl(_getf_lock); if (_wmgr.get_aio_evt_rem() && !getEventsTimerSetFlag) { setGetEventTimer(); } } return res; @@ -533,7 +539,7 @@ JournalImpl::log(mrg::journal::log_level ll, const char* const log_stmt) const void JournalImpl::getEventsFire() { - slock s(_getf_mutex); + qpid::sys::Mutex::ScopedLock sl(_getf_lock); getEventsTimerSetFlag = false; if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); } if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); } @@ -552,8 +558,11 @@ JournalImpl::flushFire() } } inactivityFireEventPtr->setupNextFire(); - assert(journalTimerPtr != 0); - journalTimerPtr->add(inactivityFireEventPtr); + { + qpid::sys::Mutex::ScopedLock sl(_static_lock); + assert(journalTimerPtr != 0); + journalTimerPtr->add(inactivityFireEventPtr); + } } void diff --git a/lib/JournalImpl.h b/lib/JournalImpl.h index 2d1b869..a3f5a17 100644 --- a/lib/JournalImpl.h +++ b/lib/JournalImpl.h @@ -27,8 +27,6 @@ #include #include "jrnl/enums.hpp" #include "jrnl/jcntl.hpp" -#include "jrnl/slock.hpp" -#include "jrnl/smutex.hpp" #include "DataTokenImpl.h" #include "PreparedTransaction.h" #include @@ 
-47,38 +45,39 @@ namespace mrg { class InactivityFireEvent : public qpid::sys::TimerTask { JournalImpl* _parent; - mrg::journal::smutex _ife_mutex; + qpid::sys::Mutex _ife_lock; public: InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout): qpid::sys::TimerTask(timeout), _parent(p) {} virtual ~InactivityFireEvent() {} void fire(); - inline void cancel() { mrg::journal::slock s(_ife_mutex); _parent = 0; } + inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; } }; class GetEventsFireEvent : public qpid::sys::TimerTask { JournalImpl* _parent; - mrg::journal::smutex _gefe_mutex; + qpid::sys::Mutex _gefe_lock; public: GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout): qpid::sys::TimerTask(timeout), _parent(p) {} virtual ~GetEventsFireEvent() {} void fire(); - inline void cancel() { mrg::journal::slock s(_gefe_mutex); _parent = 0; } + inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; } }; class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback { private: + static qpid::sys::Mutex _static_lock; static qpid::sys::Timer* journalTimerPtr; static u_int32_t cnt; bool getEventsTimerSetFlag; boost::intrusive_ptr getEventsFireEventsPtr; - mrg::journal::smutex _getf_mutex; + qpid::sys::Mutex _getf_lock; u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests std::vector oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index 9b4bf25..ed3975d 100644 --- a/lib/MessageStoreImpl.cpp +++ b/lib/MessageStoreImpl.cpp @@ -362,10 +362,13 @@ void MessageStoreImpl::init() void MessageStoreImpl::finalize() { if (tplStorePtr->is_ready()) tplStorePtr->stop(true); - for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++) { - JournalImpl* jQueue = 
i->second; - if (jQueue->is_ready()) jQueue->stop(true); + qpid::sys::Mutex::ScopedLock sl(journalListLock); + for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++) + { + JournalImpl* jQueue = i->second; + if (jQueue->is_ready()) jQueue->stop(true); + } } if (mgmtObject != 0) { @@ -377,10 +380,13 @@ void MessageStoreImpl::finalize() void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles) { if (isInit) { - if (journalList.size()) { // check no queues exist - std::ostringstream oss; - oss << "truncateInit() called with " << journalList.size() << " queues still in existence"; - THROW_STORE_EXCEPTION(oss.str()); + { + qpid::sys::Mutex::ScopedLock sl(journalListLock); + if (journalList.size()) { // check no queues exist + std::ostringstream oss; + oss << "truncateInit() called with " << journalList.size() << " queues still in existence"; + THROW_STORE_EXCEPTION(oss.str()); + } } for (std::list::iterator i = dbs.begin(); i != dbs.end(); i++) { (*i)->close(0); @@ -402,6 +408,7 @@ void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles) void MessageStoreImpl::chkTplStoreInit() { + // Don't take lock unless necessary if (!tplStorePtr->is_ready()) { qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock); if (!tplStorePtr->is_ready()) { @@ -480,6 +487,9 @@ void MessageStoreImpl::create(PersistableQueue& queue, jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent); + } + { + qpid::sys::Mutex::ScopedLock sl(journalListLock); journalList[queue.getName()]=jQueue; } @@ -517,7 +527,10 @@ void MessageStoreImpl::destroy(PersistableQueue& queue) JournalImpl* jQueue = static_cast(eqs); jQueue->delete_jrnl_files(); queue.setExternalQueueStore(0); // will delete the journal if exists - journalList.erase(journalList.find(queue.getName())); + { + qpid::sys::Mutex::ScopedLock sl(journalListLock); + journalList.erase(journalList.find(queue.getName())); + } } } @@ 
-759,6 +772,9 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, { qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock); jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent); + } + { + qpid::sys::Mutex::ScopedLock sl(journalListLock); journalList[queueName] = jQueue; } queue->setExternalQueueStore(dynamic_cast(jQueue)); diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h index 076a0ca..136659f 100644 --- a/lib/MessageStoreImpl.h +++ b/lib/MessageStoreImpl.h @@ -126,6 +126,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem boost::shared_ptr tplStorePtr; TplRecoverMap tplRecoverMap; JournalListMap journalList; + qpid::sys::Mutex journalListLock; IdSequence queueIdSequence; IdSequence exchangeIdSequence; -- 1.7.3.4 From 1392d6fdbf4625bcbcadc16ba323af467305e4d8 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Fri, 21 May 2010 16:46:34 +0000 Subject: [PATCH 02/24] Removed redundant locks; the previous checkin installed the correct lock in JournalImpl::JournalImpl. 
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3982 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/MessageStoreImpl.cpp | 24 +++++++----------------- lib/MessageStoreImpl.h | 2 -- 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index ed3975d..8cedb51 100644 --- a/lib/MessageStoreImpl.cpp +++ b/lib/MessageStoreImpl.cpp @@ -410,12 +410,9 @@ void MessageStoreImpl::chkTplStoreInit() { // Don't take lock unless necessary if (!tplStorePtr->is_ready()) { - qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock); - if (!tplStorePtr->is_ready()) { - journal::jdir::create_dir(getTplBaseDir()); - tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks); - if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true); - } + journal::jdir::create_dir(getTplBaseDir()); + tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks); + if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true); } } @@ -481,13 +478,8 @@ void MessageStoreImpl::create(PersistableQueue& queue, return; } - { - // TODO: Is this mutex necessary? 
- qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock); - jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), - std::string("JournalData"), defJournalGetEventsTimeout, - defJournalFlushTimeout, agent); - } + jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), std::string("JournalData"), + defJournalGetEventsTimeout, defJournalFlushTimeout, agent); { qpid::sys::Mutex::ScopedLock sl(journalListLock); journalList[queue.getName()]=jQueue; @@ -769,10 +761,8 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue."); break; } - { - qpid::sys::Mutex::ScopedLock sl(jrnlCreateLock); - jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent); - } + jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"), + defJournalGetEventsTimeout, defJournalFlushTimeout, agent); { qpid::sys::Mutex::ScopedLock sl(journalListLock); journalList[queueName] = jQueue; diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h index 136659f..12e1d97 100644 --- a/lib/MessageStoreImpl.h +++ b/lib/MessageStoreImpl.h @@ -93,7 +93,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem typedef std::map TplRecoverMap; typedef TplRecoverMap::const_iterator TplRecoverMapCitr; - typedef std::pair JournalListPair; typedef std::map JournalListMap; typedef JournalListMap::iterator JournalListMapItr; @@ -149,7 +148,6 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem const char* envPath; qmf::com::redhat::rhm::store::Store* mgmtObject; - qpid::sys::Mutex jrnlCreateLock; qpid::management::ManagementAgent* agent; // Parameter validation and calculation -- 1.7.3.4 From e62a68ecde439e5a98576cbbae65bbc3f3005449 Mon Sep 17 00:00:00 2001 From: aconway Date: Thu, 27 May 2010 18:06:48 +0000 Subject: [PATCH 03/24] Bug 596765: 
Remove global shared_ptr to store in store plugin. The global shared_ptr delays destruction of the store till after the broker is deleted causing core dumps when unregistering management objects. https://bugzilla.redhat.com/show_bug.cgi?id=596765 git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3995 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/MessageStoreImpl.cpp | 2 +- lib/StorePlugin.cpp | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index 8cedb51..e7cb405 100644 --- a/lib/MessageStoreImpl.cpp +++ b/lib/MessageStoreImpl.cpp @@ -428,11 +428,11 @@ void MessageStoreImpl::open(db_ptr db, MessageStoreImpl::~MessageStoreImpl() { + finalize(); try { for (std::list::iterator i = dbs.begin(); i != dbs.end(); i++) { (*i)->close(0); } -// if (tplStorePtr->is_ready()) tplStorePtr->stop(true); } catch (const DbException& e) { QPID_LOG(error, "Error closing BDB databases: " << e.what()); } catch (const journal::jexception& e) { diff --git a/lib/StorePlugin.cpp b/lib/StorePlugin.cpp index 1cbdbff..0fb3512 100644 --- a/lib/StorePlugin.cpp +++ b/lib/StorePlugin.cpp @@ -36,16 +36,15 @@ using namespace std; struct StorePlugin : public Plugin { mrg::msgstore::MessageStoreImpl::StoreOptions options; - boost::shared_ptr store; Options* getOptions() { return &options; } void earlyInitialize (Plugin::Target& target) { Broker* broker = dynamic_cast(&target); - store.reset(new mrg::msgstore::MessageStoreImpl ()); + if (!broker) return; + boost::shared_ptr store(new mrg::msgstore::MessageStoreImpl ()); DataDir& dataDir = broker->getDataDir (); - if (options.storeDir.empty ()) { if (!dataDir.isEnabled ()) @@ -67,8 +66,7 @@ struct StorePlugin : public Plugin { void finalize() { - MessageStore* sp = store.get(); - static_cast(sp)->finalize(); + // This function intentionally left blank } const char* id() {return "StorePlugin";} -- 1.7.3.4 From 91dcd499e54fd82b0a808f705b18c2b6c5775bd0 Mon Sep 17 
00:00:00 2001 From: aconway Date: Mon, 31 May 2010 14:08:28 +0000 Subject: [PATCH 04/24] Skip cluster_tests.ShortTests.test_sasl as it depends on a SASL database not available in the store build environment. git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3996 06e15bec-b515-0410-bef0-cc27a458cf48 --- tests/cluster/run_python_cluster_tests | 7 ++++++- 1 files changed, 6 insertions(+), 1 deletions(-) diff --git a/tests/cluster/run_python_cluster_tests b/tests/cluster/run_python_cluster_tests index 4bd2126..ce96152 100755 --- a/tests/cluster/run_python_cluster_tests +++ b/tests/cluster/run_python_cluster_tests @@ -28,8 +28,13 @@ func_check_qpid_python || exit 0 # A warning, not a failure. echo "Running Python cluster tests..." OUTDIR=brokertest.tmp rm -rf $OUTDIR -# Ignore tests requiring a store by default. + +# Ignore tests known to fail. CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:-"-I ${CLUSTER_TESTS_FAIL}"} +# Ignore tests that don't work in the store environment +# SASL test needs sasl test database which is not installed. +CLUSTER_TESTS_IGNORE="${CLUSTER_TESTS_IGNORE} -i cluster_tests.ShortTests.test_sasl" + CLUSTER_TESTS=${CLUSTER_TESTS:-$*} TEST_CMD="${QPID_PYTHON_TEST} -m cluster_tests ${CLUSTER_TESTS_IGNORE} ${CLUSTER_TESTS} -DOUTDIR=$OUTDIR" -- 1.7.3.4 From d4795a9796726dbdb1f911c81e3b2e899fbfc40e Mon Sep 17 00:00:00 2001 From: aconway Date: Mon, 31 May 2010 19:31:45 +0000 Subject: [PATCH 05/24] Fix valgrind errors caused by order of destruction issue. Added a callback so that MessageStoreImpl is informed when JournalImpl instances are deleted and can remove them from its map. 
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3997 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/JournalImpl.cpp | 7 +- lib/JournalImpl.h | 467 +++++++++++++++++++++++----------------------- lib/MessageStoreImpl.cpp | 17 ++- lib/MessageStoreImpl.h | 5 + 4 files changed, 261 insertions(+), 235 deletions(-) diff --git a/lib/JournalImpl.cpp b/lib/JournalImpl.cpp index ed1c334..a660d3c 100644 --- a/lib/JournalImpl.cpp +++ b/lib/JournalImpl.cpp @@ -53,7 +53,8 @@ JournalImpl::JournalImpl(const std::string& journalId, const std::string& journalBaseFilename, const qpid::sys::Duration getEventsTimeout, const qpid::sys::Duration flushTimeout, - qpid::management::ManagementAgent* a): + qpid::management::ManagementAgent* a, + DeleteCallback onDelete): jcntl(journalId, journalDirectory, journalBaseFilename), getEventsTimerSetFlag(false), lastReadRid(0), @@ -65,7 +66,8 @@ JournalImpl::JournalImpl(const std::string& journalId, _dtok(), _external(false), _agent(a), - _mgmtObject(0) + _mgmtObject(0), + deleteCallback(onDelete) { getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout); inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout); @@ -108,6 +110,7 @@ JournalImpl::JournalImpl(const std::string& journalId, JournalImpl::~JournalImpl() { + if (deleteCallback) deleteCallback(*this); if (_init_flag && !_stop_flag){ try { stop(true); } // NOTE: This will *block* until all outstanding disk aio calls are complete! catch (const jexception& e) { log(LOG_ERROR, e.what()); } diff --git a/lib/JournalImpl.h b/lib/JournalImpl.h index a3f5a17..aab8467 100644 --- a/lib/JournalImpl.h +++ b/lib/JournalImpl.h @@ -1,25 +1,25 @@ /* - Copyright (c) 2007, 2008, 2009 Red Hat, Inc. + Copyright (c) 2007, 2008, 2009 Red Hat, Inc. - This file is part of the Qpid async store library msgstore.so. + This file is part of the Qpid async store library msgstore.so. 
- This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 2.1 of the License, or (at your option) any later version. + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 - USA + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 + USA - The GNU Lesser General Public License is available in the file COPYING. - */ + The GNU Lesser General Public License is available in the file COPYING. 
+*/ #ifndef _JournalImpl_ #define _JournalImpl_ @@ -38,219 +38,228 @@ #include "qmf/com/redhat/rhm/store/Journal.h" namespace mrg { - namespace msgstore { - - class JournalImpl; - - class InactivityFireEvent : public qpid::sys::TimerTask - { - JournalImpl* _parent; - qpid::sys::Mutex _ife_lock; - - public: - InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout): - qpid::sys::TimerTask(timeout), _parent(p) {} - virtual ~InactivityFireEvent() {} - void fire(); - inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; } - }; - - class GetEventsFireEvent : public qpid::sys::TimerTask - { - JournalImpl* _parent; - qpid::sys::Mutex _gefe_lock; - - public: - GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout): - qpid::sys::TimerTask(timeout), _parent(p) {} - virtual ~GetEventsFireEvent() {} - void fire(); - inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; } - }; - - class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback - { - private: - static qpid::sys::Mutex _static_lock; - static qpid::sys::Timer* journalTimerPtr; - static u_int32_t cnt; - - bool getEventsTimerSetFlag; - boost::intrusive_ptr getEventsFireEventsPtr; - qpid::sys::Mutex _getf_lock; - - u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests - std::vector oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence - - bool writeActivityFlag; - bool flushTriggeredFlag; - boost::intrusive_ptr inactivityFireEventPtr; - - // temp local vars for loadMsgContent below - void* _xidp; - void* _datap; - size_t _dlen; - mrg::journal::data_tok _dtok; - bool _external; - - qpid::management::ManagementAgent* _agent; - qmf::com::redhat::rhm::store::Journal* _mgmtObject; - - public: - JournalImpl(const std::string& journalId, - const std::string& journalDirectory, - const 
std::string& journalBaseFilename, - const qpid::sys::Duration getEventsTimeout, - const qpid::sys::Duration flushTimeout, - qpid::management::ManagementAgent* agent); - - virtual ~JournalImpl(); - - void initialize(const u_int16_t num_jfiles, - const bool auto_expand, - const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks, - const u_int16_t wcache_num_pages, - const u_int32_t wcache_pgsize_sblks, - mrg::journal::aio_callback* const cbp); - - inline void initialize(const u_int16_t num_jfiles, - const bool auto_expand, - const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks, - const u_int16_t wcache_num_pages, - const u_int32_t wcache_pgsize_sblks) { - initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, - this); - } - - void recover(const u_int16_t num_jfiles, - const bool auto_expand, - const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks, - const u_int16_t wcache_num_pages, - const u_int32_t wcache_pgsize_sblks, - mrg::journal::aio_callback* const cbp, - boost::ptr_list* prep_tx_list_ptr, - u_int64_t& highest_rid, - u_int64_t queue_id); - - inline void recover(const u_int16_t num_jfiles, - const bool auto_expand, - const u_int16_t ae_max_jfiles, - const u_int32_t jfsize_sblks, - const u_int16_t wcache_num_pages, - const u_int32_t wcache_pgsize_sblks, - boost::ptr_list* prep_tx_list_ptr, - u_int64_t& highest_rid, - u_int64_t queue_id) { - recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, - this, prep_tx_list_ptr, highest_rid, queue_id); - } - - void recover_complete(); - - // Temporary fn to read and save last msg read from journal so it can be assigned - // in chunks. To be replaced when coding to do this direct from the journal is ready. - // Returns true if the record is extern, false if local. 
- bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0); - - // Overrides for write inactivity timer - void enqueue_data_record(const void* const data_buff, const size_t tot_data_len, - const size_t this_data_len, mrg::journal::data_tok* dtokp, - const bool transient = false); - - void enqueue_extern_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp, - const bool transient = false); - - void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len, - const size_t this_data_len, mrg::journal::data_tok* dtokp, const std::string& xid, - const bool transient = false); - - void enqueue_extern_txn_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp, - const std::string& xid, const bool transient = false); - - void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool txn_coml_commit = false); - - void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false); - - mrg::journal::iores read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff, - size_t& xid_len, bool& transient, bool& external, mrg::journal::data_tok* const dtokp, - bool ignore_pending_txns = false); - - void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid); - - void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid); - - void stop(bool block_till_aio_cmpl = false); - - // Logging - void log(mrg::journal::log_level level, const std::string& log_stmt) const; - void log(mrg::journal::log_level level, const char* const log_stmt) const; - - // Overrides for get_events timer - mrg::journal::iores flush(const bool block_till_aio_cmpl = false); - - // TimerTask callback - void getEventsFire(); - void flushFire(); - - // AIO callbacks - virtual void wr_aio_cb(std::vector& dtokl); - virtual void rd_aio_cb(std::vector& pil); - - qpid::management::ManagementObject* 
GetManagementObject (void) const - { return _mgmtObject; } - - qpid::management::Manageable::status_t ManagementMethod (uint32_t, - qpid::management::Args&, - std::string&); - - private: - void free_read_buffers(); - - inline void setGetEventTimer() - { - assert(journalTimerPtr != 0); - getEventsFireEventsPtr->setupNextFire(); - journalTimerPtr->add(getEventsFireEventsPtr); - getEventsTimerSetFlag = true; - } - void handleIoResult(const mrg::journal::iores r); - - // Management instrumentation callbacks overridden from jcntl - inline void instr_incr_outstanding_aio_cnt() { - if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs(); - } - inline void instr_decr_outstanding_aio_cnt() { - if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs(); - } - }; // class JournalImpl - - class TplJournalImpl : public JournalImpl - { - public: - TplJournalImpl(const std::string& journalId, - const std::string& journalDirectory, - const std::string& journalBaseFilename, - const qpid::sys::Duration getEventsTimeout, - const qpid::sys::Duration flushTimeout, - qpid::management::ManagementAgent* agent) : - JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent) - {} - - ~TplJournalImpl() {} - - // Special version of read_data_record that ignores transactions - needed when reading the TPL - inline mrg::journal::iores read_data_record(void** const datapp, std::size_t& dsize, - void** const xidpp, std::size_t& xidsize, bool& transient, bool& external, - mrg::journal::data_tok* const dtokp) { - return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true); - } - inline void read_reset() { _rmgr.invalidate(); } - }; // class TplJournalImpl - - } // namespace msgstore +namespace msgstore { + +class JournalImpl; + +class InactivityFireEvent : public qpid::sys::TimerTask +{ + JournalImpl* _parent; + qpid::sys::Mutex _ife_lock; + + public: + InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration 
timeout): + qpid::sys::TimerTask(timeout), _parent(p) {} + virtual ~InactivityFireEvent() {} + void fire(); + inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); _parent = 0; } +}; + +class GetEventsFireEvent : public qpid::sys::TimerTask +{ + JournalImpl* _parent; + qpid::sys::Mutex _gefe_lock; + + public: + GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout): + qpid::sys::TimerTask(timeout), _parent(p) {} + virtual ~GetEventsFireEvent() {} + void fire(); + inline void cancel() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); _parent = 0; } +}; + +class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal::jcntl, public mrg::journal::aio_callback +{ + public: + typedef boost::function DeleteCallback; + + private: + static qpid::sys::Mutex _static_lock; + static qpid::sys::Timer* journalTimerPtr; + static u_int32_t cnt; + + bool getEventsTimerSetFlag; + boost::intrusive_ptr getEventsFireEventsPtr; + qpid::sys::Mutex _getf_lock; + + u_int64_t lastReadRid; // rid of last read msg for loadMsgContent() - detects out-of-order read requests + std::vector oooRidList; // list of out-of-order rids (greater than current rid) encountered during read sequence + + bool writeActivityFlag; + bool flushTriggeredFlag; + boost::intrusive_ptr inactivityFireEventPtr; + + // temp local vars for loadMsgContent below + void* _xidp; + void* _datap; + size_t _dlen; + mrg::journal::data_tok _dtok; + bool _external; + + qpid::management::ManagementAgent* _agent; + qmf::com::redhat::rhm::store::Journal* _mgmtObject; + DeleteCallback deleteCallback; + + public: + + JournalImpl(const std::string& journalId, + const std::string& journalDirectory, + const std::string& journalBaseFilename, + const qpid::sys::Duration getEventsTimeout, + const qpid::sys::Duration flushTimeout, + qpid::management::ManagementAgent* agent, + DeleteCallback deleteCallback=DeleteCallback() ); + + virtual ~JournalImpl(); + + void initialize(const u_int16_t num_jfiles, 
+ const bool auto_expand, + const u_int16_t ae_max_jfiles, + const u_int32_t jfsize_sblks, + const u_int16_t wcache_num_pages, + const u_int32_t wcache_pgsize_sblks, + mrg::journal::aio_callback* const cbp); + + inline void initialize(const u_int16_t num_jfiles, + const bool auto_expand, + const u_int16_t ae_max_jfiles, + const u_int32_t jfsize_sblks, + const u_int16_t wcache_num_pages, + const u_int32_t wcache_pgsize_sblks) { + initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, + this); + } + + void recover(const u_int16_t num_jfiles, + const bool auto_expand, + const u_int16_t ae_max_jfiles, + const u_int32_t jfsize_sblks, + const u_int16_t wcache_num_pages, + const u_int32_t wcache_pgsize_sblks, + mrg::journal::aio_callback* const cbp, + boost::ptr_list* prep_tx_list_ptr, + u_int64_t& highest_rid, + u_int64_t queue_id); + + inline void recover(const u_int16_t num_jfiles, + const bool auto_expand, + const u_int16_t ae_max_jfiles, + const u_int32_t jfsize_sblks, + const u_int16_t wcache_num_pages, + const u_int32_t wcache_pgsize_sblks, + boost::ptr_list* prep_tx_list_ptr, + u_int64_t& highest_rid, + u_int64_t queue_id) { + recover(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, + this, prep_tx_list_ptr, highest_rid, queue_id); + } + + void recover_complete(); + + // Temporary fn to read and save last msg read from journal so it can be assigned + // in chunks. To be replaced when coding to do this direct from the journal is ready. + // Returns true if the record is extern, false if local. 
+ bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0); + + // Overrides for write inactivity timer + void enqueue_data_record(const void* const data_buff, const size_t tot_data_len, + const size_t this_data_len, mrg::journal::data_tok* dtokp, + const bool transient = false); + + void enqueue_extern_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp, + const bool transient = false); + + void enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len, + const size_t this_data_len, mrg::journal::data_tok* dtokp, const std::string& xid, + const bool transient = false); + + void enqueue_extern_txn_data_record(const size_t tot_data_len, mrg::journal::data_tok* dtokp, + const std::string& xid, const bool transient = false); + + void dequeue_data_record(mrg::journal::data_tok* const dtokp, const bool txn_coml_commit = false); + + void dequeue_txn_data_record(mrg::journal::data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit = false); + + mrg::journal::iores read_data_record(void** const data_buff, size_t& tot_data_len, void** const xid_buff, + size_t& xid_len, bool& transient, bool& external, mrg::journal::data_tok* const dtokp, + bool ignore_pending_txns = false); + + void txn_abort(mrg::journal::data_tok* const dtokp, const std::string& xid); + + void txn_commit(mrg::journal::data_tok* const dtokp, const std::string& xid); + + void stop(bool block_till_aio_cmpl = false); + + // Logging + void log(mrg::journal::log_level level, const std::string& log_stmt) const; + void log(mrg::journal::log_level level, const char* const log_stmt) const; + + // Overrides for get_events timer + mrg::journal::iores flush(const bool block_till_aio_cmpl = false); + + // TimerTask callback + void getEventsFire(); + void flushFire(); + + // AIO callbacks + virtual void wr_aio_cb(std::vector& dtokl); + virtual void rd_aio_cb(std::vector& pil); + + qpid::management::ManagementObject* 
GetManagementObject (void) const + { return _mgmtObject; } + + qpid::management::Manageable::status_t ManagementMethod (uint32_t, + qpid::management::Args&, + std::string&); + + void resetDeleteCallback() { deleteCallback = DeleteCallback(); } + + private: + void free_read_buffers(); + + inline void setGetEventTimer() + { + assert(journalTimerPtr != 0); + getEventsFireEventsPtr->setupNextFire(); + journalTimerPtr->add(getEventsFireEventsPtr); + getEventsTimerSetFlag = true; + } + void handleIoResult(const mrg::journal::iores r); + + // Management instrumentation callbacks overridden from jcntl + inline void instr_incr_outstanding_aio_cnt() { + if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs(); + } + inline void instr_decr_outstanding_aio_cnt() { + if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs(); + } + +}; // class JournalImpl + +class TplJournalImpl : public JournalImpl +{ + public: + TplJournalImpl(const std::string& journalId, + const std::string& journalDirectory, + const std::string& journalBaseFilename, + const qpid::sys::Duration getEventsTimeout, + const qpid::sys::Duration flushTimeout, + qpid::management::ManagementAgent* agent) : + JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent) + {} + + ~TplJournalImpl() {} + + // Special version of read_data_record that ignores transactions - needed when reading the TPL + inline mrg::journal::iores read_data_record(void** const datapp, std::size_t& dsize, + void** const xidpp, std::size_t& xidsize, bool& transient, bool& external, + mrg::journal::data_tok* const dtokp) { + return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true); + } + inline void read_reset() { _rmgr.invalidate(); } +}; // class TplJournalImpl + +} // namespace msgstore } // namespace mrg #endif diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index e7cb405..04297e8 100644 --- a/lib/MessageStoreImpl.cpp +++ 
b/lib/MessageStoreImpl.cpp @@ -367,6 +367,7 @@ void MessageStoreImpl::finalize() for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++) { JournalImpl* jQueue = i->second; + jQueue->resetDeleteCallback(); if (jQueue->is_ready()) jQueue->stop(true); } } @@ -479,7 +480,8 @@ void MessageStoreImpl::create(PersistableQueue& queue, } jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), std::string("JournalData"), - defJournalGetEventsTimeout, defJournalFlushTimeout, agent); + defJournalGetEventsTimeout, defJournalFlushTimeout, agent, + boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { qpid::sys::Mutex::ScopedLock sl(journalListLock); journalList[queue.getName()]=jQueue; @@ -521,7 +523,7 @@ void MessageStoreImpl::destroy(PersistableQueue& queue) queue.setExternalQueueStore(0); // will delete the journal if exists { qpid::sys::Mutex::ScopedLock sl(journalListLock); - journalList.erase(journalList.find(queue.getName())); + journalList.erase(queue.getName()); } } } @@ -762,7 +764,8 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, break; } jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"), - defJournalGetEventsTimeout, defJournalFlushTimeout, agent); + defJournalGetEventsTimeout, defJournalFlushTimeout, agent, + boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { qpid::sys::Mutex::ScopedLock sl(journalListLock); journalList[queueName] = jQueue; @@ -1644,6 +1647,11 @@ std::string MessageStoreImpl::getJrnlHashDir(const std::string& queueName) //for std::string MessageStoreImpl::getStoreDir() const { return storeDir; } +void MessageStoreImpl::journalDeleted(JournalImpl& j) { + qpid::sys::Mutex::ScopedLock sl(journalListLock); + journalList.erase(j.id()); +} + MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : qpid::Options(name), numJrnlFiles(defNumJrnlFiles), @@ -1668,7 +1676,7 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : // "If 
no|false|0, the number of journal files will remain fixed (num-jfiles).") // ("max-auto-expand-jfiles", qpid::optValue(autoJrnlExpandMaxFiles, "N"), // "Maximum number of journal files allowed from auto-expanding; must be greater than --num-jfiles parameter.") - ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"), + ("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"), "Default size for each journal file in multiples of read pages (1 read page = 64kiB)") ("truncate", qpid::optValue(truncateFlag, "yes|no"), "If yes|true|1, will truncate the store (discard any existing records). If no|false|0, will preserve " @@ -1687,3 +1695,4 @@ MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name) : "Lower values decrease latency at the expense of throughput.") ; } + diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h index 12e1d97..d650020 100644 --- a/lib/MessageStoreImpl.h +++ b/lib/MessageStoreImpl.h @@ -149,6 +149,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem qmf::com::redhat::rhm::store::Store* mgmtObject; qpid::management::ManagementAgent* agent; + // Parameter validation and calculation static u_int16_t chkJrnlNumFilesParam(const u_int16_t param, @@ -359,6 +360,10 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem { return qpid::management::Manageable::STATUS_OK; } std::string getStoreDir() const; + + private: + void journalDeleted(JournalImpl&); + }; // class MessageStoreImpl } // namespace msgstore -- 1.7.3.4 From 821aca95eeb79c7c83bc6d47176d188d8a8a1ba1 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Tue, 1 Jun 2010 16:02:36 +0000 Subject: [PATCH 06/24] Fix for Bug 598557: "qpidd --no-data dir with store loaded segfaults". 
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@3998 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/MessageStoreImpl.cpp | 2 +- 1 files changed, 1 insertions(+), 1 deletions(-) diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index 04297e8..e3b2599 100644 --- a/lib/MessageStoreImpl.cpp +++ b/lib/MessageStoreImpl.cpp @@ -361,7 +361,7 @@ void MessageStoreImpl::init() void MessageStoreImpl::finalize() { - if (tplStorePtr->is_ready()) tplStorePtr->stop(true); + if (tplStorePtr.get() && tplStorePtr->is_ready()) tplStorePtr->stop(true); { qpid::sys::Mutex::ScopedLock sl(journalListLock); for (JournalListMapItr i = journalList.begin(); i != journalList.end(); i++) -- 1.7.3.4 From 3e2438975882993c66f4f8b9fb40dd37da3fe95e Mon Sep 17 00:00:00 2001 From: gordonsim Date: Wed, 2 Jun 2010 19:33:45 +0000 Subject: [PATCH 07/24] Set reliability for link to prevent auto-delete being set on subscription queues (broker now cleans these up even for connections that are open when broker is shutdown) git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4005 06e15bec-b515-0410-bef0-cc27a458cf48 --- tests/python_tests/client_persistence.py | 6 +++--- tests/python_tests/store_test.py | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/python_tests/client_persistence.py b/tests/python_tests/client_persistence.py index f16e548..dc197dc 100644 --- a/tests/python_tests/client_persistence.py +++ b/tests/python_tests/client_persistence.py @@ -103,9 +103,9 @@ class ExchangeQueueTests(StoreTest): broker = self.broker(store_args(), name="testFanout", expect=EXPECT_EXIT_OK) ssn = broker.connect().session() snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic, x-declare: {type: fanout}}}") - ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True}}") - ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True}}") - ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: 
True}}") + ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True, reliability:at-least-once}}") + ssn.receiver("TestFanoutExchange; {link: {name: \"q2\", durable: True, reliability:at-least-once}}") + ssn.receiver("TestFanoutExchange; {link: {name: \"q3\", durable: True, reliability:at-least-once}}") msg1 = Message("Msg1", durable=True, correlation_id="Msg0001") snd.send(msg1) msg2 = Message("Msg2", durable=True, correlation_id="Msg0002") diff --git a/tests/python_tests/store_test.py b/tests/python_tests/store_test.py index 61d3687..87dcefa 100644 --- a/tests/python_tests/store_test.py +++ b/tests/python_tests/store_test.py @@ -301,9 +301,11 @@ class StoreTest(BrokerTest): x_bindings_list = [] for binding in binding_list: x_bindings_list.append("{exchange: %s, key: %s}" % binding) + if durable: reliability = 'at-least-once' + else: reliability = None return self.addr_fmt(node_name, create_policy=create_policy, delete_policy=delete_policy, mode=mode, link=True, link_name=link_name, durable=durable, x_declare_list=x_declare_list, - x_bindings_list=x_bindings_list) + x_bindings_list=x_bindings_list, link_reliability=reliability) def check_message(self, broker, queue, exp_msg, transactional=False, empty=False, ack=True, browse=False): """Check that a message is on a queue by dequeuing it and comparing it to the expected message""" -- 1.7.3.4 From 22c33fd72f2e8e32a6537b1891934834dc7f2d95 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Fri, 4 Jun 2010 17:37:23 +0000 Subject: [PATCH 08/24] Fixes for various Coverity-indicated problems: 11689(MessageStoreImpl.cpp), 11691(jdir.cpp) and 11688(JournalImpl.cpp). 
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4008 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/JournalImpl.cpp | 4 +++- lib/MessageStoreImpl.cpp | 7 +++---- lib/jrnl/jdir.cpp | 1 + 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/JournalImpl.cpp b/lib/JournalImpl.cpp index a660d3c..5e1ed7a 100644 --- a/lib/JournalImpl.cpp +++ b/lib/JournalImpl.cpp @@ -498,7 +498,9 @@ JournalImpl::txn_commit(data_tok* const dtokp, const std::string& xid) void JournalImpl::stop(bool block_till_aio_cmpl) { - (dynamic_cast(inactivityFireEventPtr.get()))->cancel(); + InactivityFireEvent* ifep = dynamic_cast(inactivityFireEventPtr.get()); + assert(ifep); // dynamic_cast can return null if the cast fails + ifep->cancel(); jcntl::stop(block_till_aio_cmpl); if (_mgmtObject != 0) { diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index e3b2599..2262b0d 100644 --- a/lib/MessageStoreImpl.cpp +++ b/lib/MessageStoreImpl.cpp @@ -193,8 +193,7 @@ void MessageStoreImpl::chkJrnlAutoExpandOptions(const StoreOptions* opts, << JRNL_MAX_NUM_FILES << "); changing this parameter to maximum value."); return; } - u_int16_t q = opts->autoJrnlExpandMaxFiles; - if (q && q == defAutoJrnlExpandMaxFiles && numJrnlFiles != defTplNumJrnlFiles) { + if (p && p == defAutoJrnlExpandMaxFiles && numJrnlFiles != defTplNumJrnlFiles) { // num-jfiles is different from the default AND max-auto-expand-jfiles is still at default // change value of max-auto-expand-jfiles autoJrnlExpand = true; @@ -1327,10 +1326,10 @@ void MessageStoreImpl::store(const PersistableQueue* queue, } } } else { - THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: queue NULL."); + THROW_STORE_EXCEPTION(std::string("MessageStoreImpl::store() failed: queue NULL.")); } } catch (const journal::jexception& e) { - THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": store() failed: " + + THROW_STORE_EXCEPTION(std::string("Queue ") + queue->getName() + ": 
MessageStoreImpl::store() failed: " + e.what()); } } diff --git a/lib/jrnl/jdir.cpp b/lib/jrnl/jdir.cpp index 74651bd..d26cef0 100644 --- a/lib/jrnl/jdir.cpp +++ b/lib/jrnl/jdir.cpp @@ -202,6 +202,7 @@ jdir::push_down(const std::string& dirname, const std::string& target_dir, const break; } } + close_dir(dir, dirname, "push_down"); return bak_dir_name; } -- 1.7.3.4 From fbd21b64d3caa4c6a1937dcc560973de1c173b2d Mon Sep 17 00:00:00 2001 From: kpvdr Date: Fri, 4 Jun 2010 18:43:28 +0000 Subject: [PATCH 09/24] Further tidy-up: closing directory handles in exception paths git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4009 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/jrnl/jdir.cpp | 5 +++++ 1 files changed, 5 insertions(+), 0 deletions(-) diff --git a/lib/jrnl/jdir.cpp b/lib/jrnl/jdir.cpp index d26cef0..b718f74 100644 --- a/lib/jrnl/jdir.cpp +++ b/lib/jrnl/jdir.cpp @@ -152,6 +152,7 @@ jdir::clear_dir(const std::string& dirname, const std::string& newname << bak_dir << "/" << entry->d_name; if (::rename(oldname.str().c_str(), newname.str().c_str())) { + ::closedir(dir); std::ostringstream oss; oss << "file=\"" << oldname.str() << "\" dest=\"" << newname.str() << "\"" << FORMAT_SYSERR(errno); @@ -195,6 +196,7 @@ jdir::push_down(const std::string& dirname, const std::string& target_dir, const newname << bak_dir_name << "/" << target_dir; if (::rename(oldname.str().c_str(), newname.str().c_str())) { + ::closedir(dir); std::ostringstream oss; oss << "file=\"" << oldname.str() << "\" dest=\"" << newname.str() << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "jdir", "push_down"); @@ -284,6 +286,7 @@ jdir::delete_dir(const std::string& dirname, bool children_only) std::string full_name(dirname + "/" + entry->d_name); if (::stat(full_name.c_str(), &s)) { + ::closedir(dir); std::ostringstream oss; oss << "stat: file=\"" << full_name << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JDIR_STAT, oss.str(), "jdir", 
"delete_dir"); @@ -294,6 +297,7 @@ jdir::delete_dir(const std::string& dirname, bool children_only) { if(::unlink(full_name.c_str())) { + ::closedir(dir); std::ostringstream oss; oss << "unlink: file=\"" << entry->d_name << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JDIR_UNLINK, oss.str(), "jdir", "delete_dir"); @@ -305,6 +309,7 @@ jdir::delete_dir(const std::string& dirname, bool children_only) } else // all other types, throw up! { + ::closedir(dir); std::ostringstream oss; oss << "file=\"" << entry->d_name << "\" is not a dir, file or slink."; oss << " (mode=0x" << std::hex << s.st_mode << std::dec << ")"; -- 1.7.3.4 From 7d09142721e18ff5769b9c35c446eab723793d44 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Tue, 8 Jun 2010 19:11:00 +0000 Subject: [PATCH 10/24] Fix for a recent regression in r.3982 in which a lock which protects the TPL from being initialized by multiple threads was erroneously removed. The lock is now replaced. git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4017 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/MessageStoreImpl.cpp | 3 ++- lib/MessageStoreImpl.h | 1 + 2 files changed, 3 insertions(+), 1 deletions(-) diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index 2262b0d..5f98055 100644 --- a/lib/MessageStoreImpl.cpp +++ b/lib/MessageStoreImpl.cpp @@ -408,7 +408,8 @@ void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles) void MessageStoreImpl::chkTplStoreInit() { - // Don't take lock unless necessary + // Prevent multiple threads from late-initializing the TPL + qpid::sys::Mutex::ScopedLock sl(tplInitLock); if (!tplStorePtr->is_ready()) { journal::jdir::create_dir(getTplBaseDir()); tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks); diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h index d650020..2659f32 100644 --- a/lib/MessageStoreImpl.h +++ b/lib/MessageStoreImpl.h @@ -124,6 +124,7 @@ class MessageStoreImpl :
public qpid::broker::MessageStore, public qpid::managem // Pointer to Transaction Prepared List (TPL) journal instance boost::shared_ptr tplStorePtr; TplRecoverMap tplRecoverMap; + qpid::sys::Mutex tplInitLock; JournalListMap journalList; qpid::sys::Mutex journalListLock; -- 1.7.3.4 From 1bb317d8e88c910b5247b54a9530a5505fb67168 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Thu, 17 Jun 2010 18:58:04 +0000 Subject: [PATCH 11/24] Added variable MSGSTORE_VERSION_INFO to control msgstore.so.x.x.x lib version numbers git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4034 06e15bec-b515-0410-bef0-cc27a458cf48 --- configure.ac | 9 +-------- lib/Makefile.am | 6 ++++-- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/configure.ac b/configure.ac index 9a32097..3c014d9 100644 --- a/configure.ac +++ b/configure.ac @@ -21,7 +21,7 @@ dnl The GNU Lesser General Public License is available in the file COPYING. dnl dnl Process this file with autoconf to produce a configure script. -AC_INIT([msg-store], [0.6], [rhemrg-users-list@redhat.com]) +AC_INIT([msg-store], [0.7], [rhemrg-users-list@redhat.com]) AC_CONFIG_AUX_DIR([build-aux]) AM_INIT_AUTOMAKE([dist-bzip2]) @@ -201,13 +201,6 @@ if test x$DB_CXX_HEADER_PREFIX = x; then fi AC_SUBST(DB_CXX_HEADER_PREFIX) -# Set the argument to be used in "libtool -version-info ARG". -QPID_CURRENT=1 -QPID_REVISION=0 -QPID_AGE=1 -LIBTOOL_VERSION_INFO_ARG=$QPID_CURRENT:$QPID_REVISION:$QPID_AGE -AC_SUBST(LIBTOOL_VERSION_INFO_ARG) - gl_CLOCK_TIME # We use valgrind for the tests. See if it's available. 
diff --git a/lib/Makefile.am b/lib/Makefile.am index ab72d96..8f0301b 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -34,8 +34,10 @@ msgstore_la_LIBADD = \ $(LIB_CLOCK_GETTIME) \ $(QPID_LIBS) -msgstore_la_LDFLAGS = \ - $(PLUGINLDFLAGS) +MSGSTORE_VERSION_INFO = 1:0:0 +msgstore_la_LDFLAGS = \ + $(PLUGINLDFLAGS) \ + -version-info $(MSGSTORE_VERSION_INFO) msgstore_la_SOURCES = \ StorePlugin.cpp \ -- 1.7.3.4 From 01305c0b44a6167ca587ddd940361bd623677564 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Fri, 18 Jun 2010 14:06:28 +0000 Subject: [PATCH 12/24] Removed the lib version info from the previous checkin git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4036 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/Makefile.am | 10 ++++------ 1 files changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/Makefile.am b/lib/Makefile.am index 8f0301b..95428f1 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -27,19 +27,17 @@ PLUGINLDFLAGS=-no-undefined -module -avoid-version dmoduledir=$(libdir)/qpid/daemon dmodule_LTLIBRARIES = msgstore.la -msgstore_la_LIBADD = \ +msgstore_la_LIBADD = \ $(APR_LIBS) \ $(LIB_DLOPEN) \ $(LIB_BERKELEY_DB) \ $(LIB_CLOCK_GETTIME) \ $(QPID_LIBS) -MSGSTORE_VERSION_INFO = 1:0:0 -msgstore_la_LDFLAGS = \ - $(PLUGINLDFLAGS) \ - -version-info $(MSGSTORE_VERSION_INFO) +msgstore_la_LDFLAGS = \ + $(PLUGINLDFLAGS) -msgstore_la_SOURCES = \ +msgstore_la_SOURCES = \ StorePlugin.cpp \ BindingDbt.cpp \ BufferValue.cpp \ -- 1.7.3.4 From fe4143cc7226143cb3eb025efcf0e6a8d873866d Mon Sep 17 00:00:00 2001 From: aconway Date: Mon, 28 Jun 2010 18:18:31 +0000 Subject: [PATCH 13/24] Bug 607748 - Crash on exit in store cluster tests. This is an order-of-static-destructors problem. This is an order-of-static-destructors problem. Fixed by having the store use the broker's Timer. This ensures orderly shut down as the broker's destructor will destroy the store first and then the timer.
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4053 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/JournalImpl.cpp | 31 ++++++---------------------- lib/JournalImpl.h | 17 ++++++++++----- lib/MessageStoreImpl.cpp | 9 ++++--- lib/MessageStoreImpl.h | 7 +++++- lib/StorePlugin.cpp | 2 +- tests/OrderingTest.cpp | 7 ++++- tests/SimpleTest.cpp | 45 ++++++++++++++++++++++------------------- tests/TransactionalTest.cpp | 9 +++++-- tests/TwoPhaseCommitTest.cpp | 9 +++++-- 9 files changed, 71 insertions(+), 65 deletions(-) diff --git a/lib/JournalImpl.cpp b/lib/JournalImpl.cpp index 5e1ed7a..962125b 100644 --- a/lib/JournalImpl.cpp +++ b/lib/JournalImpl.cpp @@ -33,6 +33,7 @@ #include "qmf/com/redhat/rhm/store/EventFull.h" #include "qmf/com/redhat/rhm/store/EventRecovered.h" #include "qpid/sys/Monitor.h" +#include "qpid/sys/Timer.h" #include "StoreException.h" using namespace mrg::msgstore; @@ -40,15 +41,12 @@ using namespace mrg::journal; using qpid::management::ManagementAgent; namespace _qmf = qmf::com::redhat::rhm::store; -qpid::sys::Mutex JournalImpl::_static_lock; -qpid::sys::Timer* JournalImpl::journalTimerPtr = 0; -u_int32_t JournalImpl::cnt = 0; - void InactivityFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_ife_lock); if (_parent) _parent->flushFire(); } void GetEventsFireEvent::fire() { qpid::sys::Mutex::ScopedLock sl(_gefe_lock); if (_parent) _parent->getEventsFire(); } -JournalImpl::JournalImpl(const std::string& journalId, +JournalImpl::JournalImpl(qpid::sys::Timer& timer_, + const std::string& journalId, const std::string& journalDirectory, const std::string& journalBaseFilename, const qpid::sys::Duration getEventsTimeout, @@ -56,6 +54,7 @@ JournalImpl::JournalImpl(const std::string& journalId, qpid::management::ManagementAgent* a, DeleteCallback onDelete): jcntl(journalId, journalDirectory, journalBaseFilename), + timer(timer_), getEventsTimerSetFlag(false), lastReadRid(0), writeActivityFlag(false), @@ -72,13 +71,8 @@ 
JournalImpl::JournalImpl(const std::string& journalId, getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout); inactivityFireEventPtr = new InactivityFireEvent(this, flushTimeout); { - qpid::sys::Mutex::ScopedLock sl(_static_lock); - if (journalTimerPtr == 0) - journalTimerPtr = new qpid::sys::Timer; - assert (journalTimerPtr != 0); - cnt++; - journalTimerPtr->start(); - journalTimerPtr->add(inactivityFireEventPtr); + timer.start(); + timer.add(inactivityFireEventPtr); } if (_agent != 0) @@ -119,15 +113,6 @@ JournalImpl::~JournalImpl() inactivityFireEventPtr->cancel(); free_read_buffers(); - { - qpid::sys::Mutex::ScopedLock sl(_static_lock); - if (journalTimerPtr && --cnt == 0) - { - delete journalTimerPtr; - journalTimerPtr = 0; - } - } - if (_mgmtObject != 0) { _mgmtObject->resourceDestroy(); _mgmtObject = 0; @@ -564,9 +549,7 @@ JournalImpl::flushFire() } inactivityFireEventPtr->setupNextFire(); { - qpid::sys::Mutex::ScopedLock sl(_static_lock); - assert(journalTimerPtr != 0); - journalTimerPtr->add(inactivityFireEventPtr); + timer.add(inactivityFireEventPtr); } } diff --git a/lib/JournalImpl.h b/lib/JournalImpl.h index aab8467..b85cf02 100644 --- a/lib/JournalImpl.h +++ b/lib/JournalImpl.h @@ -37,6 +37,10 @@ #include "qpid/management/Manageable.h" #include "qmf/com/redhat/rhm/store/Journal.h" +namespace qpid { namespace sys { +class Timer; +}} + namespace mrg { namespace msgstore { @@ -75,9 +79,9 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal private: static qpid::sys::Mutex _static_lock; - static qpid::sys::Timer* journalTimerPtr; static u_int32_t cnt; + qpid::sys::Timer& timer; bool getEventsTimerSetFlag; boost::intrusive_ptr getEventsFireEventsPtr; qpid::sys::Mutex _getf_lock; @@ -102,7 +106,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal public: - JournalImpl(const std::string& journalId, + JournalImpl(qpid::sys::Timer& timer, + const std::string& journalId, const 
std::string& journalDirectory, const std::string& journalBaseFilename, const qpid::sys::Duration getEventsTimeout, @@ -219,9 +224,8 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal inline void setGetEventTimer() { - assert(journalTimerPtr != 0); getEventsFireEventsPtr->setupNextFire(); - journalTimerPtr->add(getEventsFireEventsPtr); + timer.add(getEventsFireEventsPtr); getEventsTimerSetFlag = true; } void handleIoResult(const mrg::journal::iores r); @@ -239,13 +243,14 @@ class JournalImpl : public qpid::broker::ExternalQueueStore, public mrg::journal class TplJournalImpl : public JournalImpl { public: - TplJournalImpl(const std::string& journalId, + TplJournalImpl(qpid::sys::Timer& timer, + const std::string& journalId, const std::string& journalDirectory, const std::string& journalBaseFilename, const qpid::sys::Duration getEventsTimeout, const qpid::sys::Duration flushTimeout, qpid::management::ManagementAgent* agent) : - JournalImpl(journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent) + JournalImpl(timer, journalId, journalDirectory, journalBaseFilename, getEventsTimeout, flushTimeout, agent) {} ~TplJournalImpl() {} diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index 5f98055..e4f98b5 100644 --- a/lib/MessageStoreImpl.cpp +++ b/lib/MessageStoreImpl.cpp @@ -62,7 +62,7 @@ MessageStoreImpl::TplRecoverStruct::TplRecoverStruct(const u_int64_t _rid, tpc_flag(_tpc_flag) {} -MessageStoreImpl::MessageStoreImpl(const char* envpath) : +MessageStoreImpl::MessageStoreImpl(qpid::sys::Timer& timer_, const char* envpath) : numJrnlFiles(0), autoJrnlExpand(false), autoJrnlExpandMaxFiles(0), @@ -77,6 +77,7 @@ MessageStoreImpl::MessageStoreImpl(const char* envpath) : highestRid(0), isInit(false), envPath(envpath), + timer(timer_), mgmtObject(0), agent(0) {} @@ -339,7 +340,7 @@ void MessageStoreImpl::init() open(mappingDb, txn.get(), "mappings.db", true); open(bindingDb, txn.get(), "bindings.db", 
true); open(generalDb, txn.get(), "general.db", false); - tplStorePtr.reset(new TplJournalImpl("TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent)); + tplStorePtr.reset(new TplJournalImpl(timer, "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent)); txn.commit(); } catch (const journal::jexception& e) { QPID_LOG(error, "Journal Exception occurred while initializing store: " << e); @@ -479,7 +480,7 @@ void MessageStoreImpl::create(PersistableQueue& queue, return; } - jQueue = new JournalImpl(queue.getName(), getJrnlDir(queue), std::string("JournalData"), + jQueue = new JournalImpl(timer, queue.getName(), getJrnlDir(queue), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent, boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { @@ -763,7 +764,7 @@ void MessageStoreImpl::recoverQueues(TxnCtxt& txn, QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue."); break; } - jQueue = new JournalImpl(queueName, getJrnlHashDir(queueName), std::string("JournalData"), + jQueue = new JournalImpl(timer, queueName, getJrnlHashDir(queueName), std::string("JournalData"), defJournalGetEventsTimeout, defJournalFlushTimeout, agent, boost::bind(&MessageStoreImpl::journalDeleted, this, _1)); { diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h index 2659f32..8e46dd2 100644 --- a/lib/MessageStoreImpl.h +++ b/lib/MessageStoreImpl.h @@ -45,6 +45,10 @@ #define DB_BUFFER_SMALL ENOMEM #endif +namespace qpid { namespace sys { +class Timer; +}} + namespace mrg { namespace msgstore { @@ -147,6 +151,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem u_int64_t highestRid; bool isInit; const char* envPath; + qpid::sys::Timer& timer; qmf::com::redhat::rhm::store::Store* mgmtObject; qpid::management::ManagementAgent* agent; @@ -266,7 +271,7 @@ class MessageStoreImpl : public 
qpid::broker::MessageStore, public qpid::managem public: typedef boost::shared_ptr shared_ptr; - MessageStoreImpl(const char* envpath = 0); + MessageStoreImpl(qpid::sys::Timer& timer, const char* envpath = 0); virtual ~MessageStoreImpl(); diff --git a/lib/StorePlugin.cpp b/lib/StorePlugin.cpp index 0fb3512..8231bd6 100644 --- a/lib/StorePlugin.cpp +++ b/lib/StorePlugin.cpp @@ -43,7 +43,7 @@ struct StorePlugin : public Plugin { { Broker* broker = dynamic_cast(&target); if (!broker) return; - boost::shared_ptr store(new mrg::msgstore::MessageStoreImpl ()); + boost::shared_ptr store(new mrg::msgstore::MessageStoreImpl (broker->getTimer())); DataDir& dataDir = broker->getDataDir (); if (options.storeDir.empty ()) { diff --git a/tests/OrderingTest.cpp b/tests/OrderingTest.cpp index 16f88d0..10fda1d 100644 --- a/tests/OrderingTest.cpp +++ b/tests/OrderingTest.cpp @@ -30,6 +30,9 @@ #include #include #include "qpid/log/Logger.h" +#include "qpid/sys/Timer.h" + +qpid::sys::Timer timer; #define SET_LOG_LEVEL(level) \ qpid::log::Options opts(""); \ @@ -59,7 +62,7 @@ int counter = 1; void setup() { - store = std::auto_ptr(new MessageStoreImpl()); + store = std::auto_ptr(new MessageStoreImpl(timer)); store->init(test_dir, 4, 1, true); // truncate store queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0)); @@ -98,7 +101,7 @@ void restart() queue.reset(); store.reset(); - store = std::auto_ptr(new MessageStoreImpl()); + store = std::auto_ptr(new MessageStoreImpl(timer)); store->init(test_dir, 4, 1); ExchangeRegistry exchanges; LinkRegistry links; diff --git a/tests/SimpleTest.cpp b/tests/SimpleTest.cpp index 4d5f155..c62869d 100644 --- a/tests/SimpleTest.cpp +++ b/tests/SimpleTest.cpp @@ -32,6 +32,9 @@ #include #include #include "qpid/log/Logger.h" +#include "qpid/sys/Timer.h" + +qpid::sys::Timer timer; #define SET_LOG_LEVEL(level) \ qpid::log::Options opts(""); \ @@ -92,7 +95,7 @@ void bindAndUnbind(const string& exchangeName, const string& queueName, const string& key, 
const FieldTable& args) { { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1, true); // truncate store Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args)); Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0)); @@ -102,7 +105,7 @@ void bindAndUnbind(const string& exchangeName, const string& queueName, store.bind(*exchange, *queue, key, args); }//db will be closed { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1); ExchangeRegistry exchanges; QueueRegistry queues; @@ -121,7 +124,7 @@ void bindAndUnbind(const string& exchangeName, const string& queueName, store.unbind(*exchange, *queue, key, args); } { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1); ExchangeRegistry exchanges; QueueRegistry queues; @@ -148,7 +151,7 @@ QPID_AUTO_TEST_CASE(CreateDelete) SET_LOG_LEVEL("error+"); // This only needs to be set once. cout << test_filename << ".CreateDelete: " << flush; - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1, true); // truncate store string name("CreateDeleteQueue"); Queue queue(name, 0, &store, 0); @@ -164,7 +167,7 @@ QPID_AUTO_TEST_CASE(CreateDelete) QPID_AUTO_TEST_CASE(EmptyRecover) { cout << test_filename << ".EmptyRecover: " << flush; - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1, true); // truncate store QueueRegistry registry; registry.setStore (&store); @@ -181,7 +184,7 @@ QPID_AUTO_TEST_CASE(QueueCreate) uint64_t id(0); string name("MyDurableQueue"); { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1, true); // truncate store Queue queue(name, 0, &store, 0); store.create(queue, qpid::framing::FieldTable()); @@ -189,7 +192,7 @@ QPID_AUTO_TEST_CASE(QueueCreate) id = queue.getPersistenceId(); }//db will be closed { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1); 
QueueRegistry registry; registry.setStore (&store); @@ -209,7 +212,7 @@ QPID_AUTO_TEST_CASE(QueueCreateWithSettings) std::auto_ptr policy( QueuePolicy::createQueuePolicy(101, 202)); string name("MyDurableQueue"); { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1, true); // truncate store Queue queue(name, 0, &store, 0); FieldTable settings; @@ -218,7 +221,7 @@ QPID_AUTO_TEST_CASE(QueueCreateWithSettings) BOOST_REQUIRE(queue.getPersistenceId()); }//db will be closed { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1); QueueRegistry registry; registry.setStore (&store); @@ -239,14 +242,14 @@ QPID_AUTO_TEST_CASE(QueueDestroy) string name("MyDurableQueue"); { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1, true); // truncate store Queue queue(name, 0, &store, 0); store.create(queue, qpid::framing::FieldTable()); store.destroy(queue); }//db will be closed { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1); QueueRegistry registry; registry.setStore (&store); @@ -272,7 +275,7 @@ QPID_AUTO_TEST_CASE(Enqueue) string data1("abcdefg"); string data2("hijklmn"); { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1, true); // truncate store Queue::shared_ptr queue(new Queue(name, 0, &store, 0)); FieldTable settings; @@ -290,7 +293,7 @@ QPID_AUTO_TEST_CASE(Enqueue) queue->enqueue(0, msg); }//db will be closed { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1); QueueRegistry registry; registry.setStore (&store); @@ -331,7 +334,7 @@ QPID_AUTO_TEST_CASE(Dequeue) string routingKey("MyRoutingKey"); Uuid messageId(true); string data("abcdefg"); - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1, true); // truncate store Queue::shared_ptr queue(new Queue(name, 0, &store, 0)); FieldTable settings; @@ -347,7 +350,7 @@ 
QPID_AUTO_TEST_CASE(Dequeue) queue->dequeue(0, qm); }//db will be closed { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1); QueueRegistry registry; registry.setStore (&store); @@ -370,7 +373,7 @@ QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy) FieldTable args; args.setString("a", "A"); { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1, true); // truncate store ExchangeRegistry registry; Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first; @@ -379,7 +382,7 @@ QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy) BOOST_REQUIRE(id); }//db will be closed { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1); ExchangeRegistry registry; @@ -393,7 +396,7 @@ QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy) store.destroy(*exchange); } { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1); ExchangeRegistry registry; @@ -441,7 +444,7 @@ QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind) string key("my-routing-key"); FieldTable args; { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1, true); // truncate store Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args)); Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0)); @@ -455,7 +458,7 @@ QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind) store.destroy(*queue1); }//db will be closed { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1); ExchangeRegistry exchanges; QueueRegistry queues; @@ -472,7 +475,7 @@ QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind) store.destroy(*exchange); } { - MessageStoreImpl store; + MessageStoreImpl store(timer); store.init(test_dir, 4, 1); ExchangeRegistry exchanges; QueueRegistry queues; diff --git a/tests/TransactionalTest.cpp b/tests/TransactionalTest.cpp index d6f6d7f..ac5a6b6 100644 --- a/tests/TransactionalTest.cpp +++ b/tests/TransactionalTest.cpp @@ -32,6 
+32,9 @@ #include "qpid/framing/AMQHeaderBody.h" #include "qpid/log/Statement.h" #include "qpid/log/Logger.h" +#include "qpid/sys/Timer.h" + +qpid::sys::Timer timer; #define SET_LOG_LEVEL(level) \ qpid::log::Options opts(""); \ @@ -69,7 +72,7 @@ class TestTxnCtxt : public TxnCtxt class TestMessageStore: public MessageStoreImpl { public: - TestMessageStore(const char* envpath = 0) : MessageStoreImpl(envpath) {} + TestMessageStore(qpid::sys::Timer& timer, const char* envpath = 0) : MessageStoreImpl(timer, envpath) {} std::auto_ptr begin() { checkInit(); // pass sequence number for c/a @@ -109,7 +112,7 @@ Queue::shared_ptr queueB; template void setup() { - store = std::auto_ptr(new T()); + store = std::auto_ptr(new T(timer)); store->init(test_dir, 4, 1, true); // truncate store //create two queues: @@ -128,7 +131,7 @@ void restart() queues.reset(); store.reset(); - store = std::auto_ptr(new T()); + store = std::auto_ptr(new T(timer)); store->init(test_dir, 4, 1); queues = std::auto_ptr(new QueueRegistry); ExchangeRegistry exchanges; diff --git a/tests/TwoPhaseCommitTest.cpp b/tests/TwoPhaseCommitTest.cpp index 86d3976..f442310 100644 --- a/tests/TwoPhaseCommitTest.cpp +++ b/tests/TwoPhaseCommitTest.cpp @@ -32,6 +32,9 @@ #include "qpid/log/Statement.h" #include "TxnCtxt.h" #include "qpid/log/Logger.h" +#include "qpid/sys/Timer.h" + +qpid::sys::Timer timer; #define SET_LOG_LEVEL(level) \ qpid::log::Options opts(""); \ @@ -182,7 +185,7 @@ class TwoPhaseCommitTest class TestMessageStore: public MessageStoreImpl { public: - TestMessageStore(const char* envpath = 0) : MessageStoreImpl(envpath) {} + TestMessageStore(qpid::sys::Timer& timer, const char* envpath = 0) : MessageStoreImpl(timer, envpath) {} std::auto_ptr begin(const std::string& xid) { checkInit(); IdSequence* jtx = &messageIdSequence; @@ -325,7 +328,7 @@ class TwoPhaseCommitTest template void setup() { - store = std::auto_ptr(new T()); + store = std::auto_ptr(new T(timer)); store->init(test_dir, 4, 1, true); // 
truncate store //create two queues: @@ -353,7 +356,7 @@ class TwoPhaseCommitTest queues.reset(); links.reset(); - store = std::auto_ptr(new T()); + store = std::auto_ptr(new T(timer)); store->init(test_dir, 4, 1); sys::Timer t; ExchangeRegistry exchanges; -- 1.7.3.4 From fe15a7208f3659c06b0992ea9fcc1eac6b2d1d18 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Tue, 20 Jul 2010 17:07:24 +0000 Subject: [PATCH 14/24] Bug 614944 - "qpidd broker crash in mrg::msgstore::TxnCtxt::abort() -> DbTxn::abort()": Fix part 1, which improves the exception handling so that there is no throw within a catch. This will not change the probability of occurrence of this bug, but the logs and exception message outcome will be different. git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4133 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/MessageStoreImpl.cpp | 126 ++++++++++++++++++++++++--------------------- lib/MessageStoreImpl.h | 1 + 2 files changed, 68 insertions(+), 59 deletions(-) diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index e4f98b5..37456ea 100644 --- a/lib/MessageStoreImpl.cpp +++ b/lib/MessageStoreImpl.cpp @@ -306,57 +306,61 @@ bool MessageStoreImpl::init(const std::string& dir, void MessageStoreImpl::init() { - journal::jdir::create_dir(getBdbBaseDir()); - - try { - dbenv.reset(new DbEnv(0)); - dbenv->set_errpfx("msgstore"); - dbenv->set_lg_regionmax(256000); // default = 65000 - dbenv->open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON | DB_RECOVER, 0); - } catch (const DbException& e) { - if (e.get_errno() == DB_VERSION_MISMATCH) - THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of bd4 does not match that which created the store database. " - "(If recovery is not important, delete the contents of the store directory.
Otherwise, try upgrading the database using " - "db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e); - THROW_STORE_EXCEPTION_2("Error opening environment", e); - } - - TxnCtxt txn; - try { - // Databases are constructed here instead of the constructor so that the DB_RECOVER flag can be used - // against the database environment. Recover can only be performed if no databases have been created - // against the environment at the time of recovery, as recovery invalidates the environment. - queueDb.reset(new Db(dbenv.get(), 0)); - configDb.reset(new Db(dbenv.get(), 0)); - exchangeDb.reset(new Db(dbenv.get(), 0)); - mappingDb.reset(new Db(dbenv.get(), 0)); - bindingDb.reset(new Db(dbenv.get(), 0)); - generalDb.reset(new Db(dbenv.get(), 0)); - - txn.begin(dbenv.get(), false); - open(queueDb, txn.get(), "queues.db", false); - open(configDb, txn.get(), "config.db", false); - open(exchangeDb, txn.get(), "exchanges.db", false); - open(mappingDb, txn.get(), "mappings.db", true); - open(bindingDb, txn.get(), "bindings.db", true); - open(generalDb, txn.get(), "general.db", false); - tplStorePtr.reset(new TplJournalImpl(timer, "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent)); - txn.commit(); - } catch (const journal::jexception& e) { - QPID_LOG(error, "Journal Exception occurred while initializing store: " << e); - txn.abort(); - THROW_STORE_EXCEPTION_2("Error opening tplStore instance", e.what()); - } catch (const DbException& e) { - QPID_LOG(error, "BDB exception occurred while initializing store: " << e.what()); - txn.abort(); - THROW_STORE_EXCEPTION_2("Error opening databases", e); - } catch (...) 
{ - QPID_LOG(error, "Unknown exception occurred while initializing store."); - txn.abort(); - throw; - } - - isInit = true; + try { + journal::jdir::create_dir(getBdbBaseDir()); + + dbenv.reset(new DbEnv(0)); + dbenv->set_errpfx("msgstore"); + dbenv->set_lg_regionmax(256000); // default = 65000 + dbenv->open(getBdbBaseDir().c_str(), DB_THREAD | DB_CREATE | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_USE_ENVIRON | DB_RECOVER, 0); + + // Databases are constructed here instead of the constructor so that the DB_RECOVER flag can be used + // against the database environment. Recover can only be performed if no databases have been created + // against the environment at the time of recovery, as recovery invalidates the environment. + queueDb.reset(new Db(dbenv.get(), 0)); + dbs.push_back(queueDb); + configDb.reset(new Db(dbenv.get(), 0)); + dbs.push_back(configDb); + exchangeDb.reset(new Db(dbenv.get(), 0)); + dbs.push_back(exchangeDb); + mappingDb.reset(new Db(dbenv.get(), 0)); + dbs.push_back(mappingDb); + bindingDb.reset(new Db(dbenv.get(), 0)); + dbs.push_back(bindingDb); + generalDb.reset(new Db(dbenv.get(), 0)); + dbs.push_back(generalDb); + + TxnCtxt txn; + txn.begin(dbenv.get(), false); + try { + open(queueDb, txn.get(), "queues.db", false); + open(configDb, txn.get(), "config.db", false); + open(exchangeDb, txn.get(), "exchanges.db", false); + open(mappingDb, txn.get(), "mappings.db", true); + open(bindingDb, txn.get(), "bindings.db", true); + open(generalDb, txn.get(), "general.db", false); + txn.commit(); + } catch (...) 
{ txn.abort(); throw; } + + tplStorePtr.reset(new TplJournalImpl(timer, "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, agent)); + isInit = true; + } catch (const DbException& e) { + if (e.get_errno() == DB_VERSION_MISMATCH) + { + QPID_LOG(error, "Database environment mismatch: This version of db4 does not match that which created the store database.: " << e.what()); + THROW_STORE_EXCEPTION_2("Database environment mismatch: This version of db4 does not match that which created the store database. " + "(If recovery is not important, delete the contents of the store directory. Otherwise, try upgrading the database using " + "db_upgrade or using db_recover - but the db4-utils package must also be installed to use these utilities.)", e); + } + QPID_LOG(error, "BDB exception occurred while initializing store: " << e.what()); + THROW_STORE_EXCEPTION_2("BDB exception occurred while initializing store", e); + } catch (const journal::jexception& e) { + QPID_LOG(error, "Journal Exception occurred while initializing store: " << e); + THROW_STORE_EXCEPTION_2("Journal Exception occurred while initializing store", e.what()); + } catch (...) 
{ + QPID_LOG(error, "Unknown exception occurred while initializing store."); + throw; + } } void MessageStoreImpl::finalize() @@ -389,20 +393,19 @@ void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles) THROW_STORE_EXCEPTION(oss.str()); } } - for (std::list::iterator i = dbs.begin(); i != dbs.end(); i++) { - (*i)->close(0); - } + closeDbs(); dbs.clear(); if (tplStorePtr->is_ready()) tplStorePtr->stop(true); dbenv->close(0); + isInit = false; } std::ostringstream oss; oss << storeDir << "/" << storeTopLevelDir; if (pushDownStoreFiles) { QPID_LOG(notice, "Store directory " << oss.str() << " was pushed down into directory " << mrg::journal::jdir::push_down(storeDir, storeTopLevelDir, "cluster") << "."); } else { - QPID_LOG(notice, "Store directory " << oss.str() << " was truncated."); mrg::journal::jdir::delete_dir(oss.str().c_str()); + QPID_LOG(notice, "Store directory " << oss.str() << " was truncated."); } init(); } @@ -425,16 +428,21 @@ void MessageStoreImpl::open(db_ptr db, { if(dupKey) db->set_flags(DB_DUPSORT); db->open(txn, file, 0, DB_BTREE, DB_CREATE | DB_THREAD, 0); - dbs.push_back(db); +} + +void MessageStoreImpl::closeDbs() +{ + for (std::list::iterator i = dbs.begin(); i != dbs.end(); i++) { + (*i)->close(0); + } + dbs.clear(); } MessageStoreImpl::~MessageStoreImpl() { finalize(); try { - for (std::list::iterator i = dbs.begin(); i != dbs.end(); i++) { - (*i)->close(0); - } + closeDbs(); } catch (const DbException& e) { QPID_LOG(error, "Error closing BDB databases: " << e.what()); } catch (const journal::jexception& e) { diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h index 8e46dd2..99d8ed2 100644 --- a/lib/MessageStoreImpl.h +++ b/lib/MessageStoreImpl.h @@ -240,6 +240,7 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem DbTxn* txn, const char* file, bool dupKey); + void closeDbs(); // journal functions void createJrnlQueue(const qpid::broker::PersistableQueue& queue); -- 1.7.3.4 From 
5fd176ac826097d7485e896dcf8cfe919dd89a5c Mon Sep 17 00:00:00 2001 From: kpvdr Date: Tue, 10 Aug 2010 17:33:04 +0000 Subject: [PATCH 15/24] Fix for BZ 620676 - "Store resize operation fails with large messages (greater than store file size)". Also included new resize tests that would catch this bug. git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4196 06e15bec-b515-0410-bef0-cc27a458cf48 --- tests/python_tests/__init__.py | 1 + tests/python_tests/client_persistence.py | 30 +++--- tests/python_tests/resize.py | 169 ++++++++++++++++++++++++++++++ tests/python_tests/store_test.py | 6 +- tests/run_python_tests | 2 +- tools/janal.py | 6 +- tools/resize | 2 +- 7 files changed, 194 insertions(+), 22 deletions(-) create mode 100644 tests/python_tests/resize.py diff --git a/tests/python_tests/__init__.py b/tests/python_tests/__init__.py index d40cb00..917dc79 100644 --- a/tests/python_tests/__init__.py +++ b/tests/python_tests/__init__.py @@ -23,3 +23,4 @@ from client_persistence import * from flow_to_disk import * +from resize import * diff --git a/tests/python_tests/client_persistence.py b/tests/python_tests/client_persistence.py index dc197dc..e7a253d 100644 --- a/tests/python_tests/client_persistence.py +++ b/tests/python_tests/client_persistence.py @@ -33,20 +33,20 @@ class ExchangeQueueTests(StoreTest): def test_direct_exchange(self): """Test Direct exchange.""" - broker = self.broker(store_args(), name="testDirectExchange", expect=EXPECT_EXIT_OK) + broker = self.broker(store_args(), name="test_direct_exchange", expect=EXPECT_EXIT_OK) msg1 = Message("A_Message1", durable=True, correlation_id="Msg0001") msg2 = Message("B_Message1", durable=True, correlation_id="Msg0002") broker.send_message("a", msg1) broker.send_message("b", msg2) broker.terminate() - broker = self.broker(store_args(), name="testDirectExchange") + broker = self.broker(store_args(), name="test_direct_exchange") self.check_message(broker, "a", msg1, True) self.check_message(broker, 
"b", msg2, True) def test_topic_exchange(self): """Test Topic exchange.""" - broker = self.broker(store_args(), name="testTopicExchange", expect=EXPECT_EXIT_OK) + broker = self.broker(store_args(), name="test_topic_exchange", expect=EXPECT_EXIT_OK) ssn = broker.connect().session() snd1 = ssn.sender("abc/key1; {create:always, node:{type:topic, durable:True}}") snd2 = ssn.sender("abc/key2; {create:always, node:{type:topic, durable:True}}") @@ -62,7 +62,7 @@ class ExchangeQueueTests(StoreTest): snd2.send(msg2) broker.terminate() - broker = self.broker(store_args(), name="testTopicExchange") + broker = self.broker(store_args(), name="test_topic_exchange") self.check_message(broker, "a", msg1, True) self.check_message(broker, "b", msg1, True) self.check_messages(broker, "c", [msg1, msg2], True) @@ -72,7 +72,7 @@ class ExchangeQueueTests(StoreTest): def test_lvq(self): """Test LVQ.""" - broker = self.broker(store_args(), name="testLVQ", expect=EXPECT_EXIT_OK) + broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK) ma1 = Message("A1", durable=True, correlation_id="Msg0005", properties={"qpid.LVQ_key":"A"}) ma2 = Message("A2", durable=True, correlation_id="Msg0006", properties={"qpid.LVQ_key":"A"}) mb1 = Message("B1", durable=True, correlation_id="Msg0007", properties={"qpid.LVQ_key":"B"}) @@ -83,7 +83,7 @@ class ExchangeQueueTests(StoreTest): xprops="arguments:{\"qpid.last_value_queue\":True}") broker.terminate() - broker = self.broker(store_args(), name="testLVQ", expect=EXPECT_EXIT_OK) + broker = self.broker(store_args(), name="test_lvq", expect=EXPECT_EXIT_OK) ssn = self.check_messages(broker, "lvq-test", [ma2, mb3, mc1], empty=True, ack=False) # Add more messages while subscriber is active (no replacement): ma3 = Message("A3", durable=True, correlation_id="Msg0011", properties={"qpid.LVQ_key":"A"}) @@ -95,12 +95,12 @@ class ExchangeQueueTests(StoreTest): ssn.acknowledge() broker.terminate() - broker = self.broker(store_args(), name="testLVQ") + 
broker = self.broker(store_args(), name="test_lvq") self.check_messages(broker, "lvq-test", [mc4, ma4], True) def test_fanout_exchange(self): """Test Fanout Exchange""" - broker = self.broker(store_args(), name="testFanout", expect=EXPECT_EXIT_OK) + broker = self.broker(store_args(), name="test_fanout_exchange", expect=EXPECT_EXIT_OK) ssn = broker.connect().session() snd = ssn.sender("TestFanoutExchange; {create: always, node: {type: topic, x-declare: {type: fanout}}}") ssn.receiver("TestFanoutExchange; {link: {name: \"q1\", durable: True, reliability:at-least-once}}") @@ -112,7 +112,7 @@ class ExchangeQueueTests(StoreTest): snd.send(msg2) broker.terminate() - broker = self.broker(store_args(), name="testFanout") + broker = self.broker(store_args(), name="test_fanout_exchange") self.check_messages(broker, "q1", [msg1, msg2], True) self.check_messages(broker, "q2", [msg1, msg2], True) self.check_messages(broker, "q3", [msg1, msg2], True) @@ -125,14 +125,14 @@ class AlternateExchagePropertyTests(StoreTest): def test_exchange(self): """Exchange alternate exchange property persistence test""" - broker = self.broker(store_args(), name="testExchangeBroker", expect=EXPECT_EXIT_OK) + broker = self.broker(store_args(), name="test_exchange", expect=EXPECT_EXIT_OK) qmf = Qmf(broker) qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance qmf.add_exchange("testExch", "direct", durable=True, alt_exchange_name="altExch") qmf.close() broker.terminate() - broker = self.broker(store_args(), name="testExchangeBroker") + broker = self.broker(store_args(), name="test_exchange") qmf = Qmf(broker) try: qmf.add_exchange("altExch", "direct", passive=True) @@ -148,14 +148,14 @@ class AlternateExchagePropertyTests(StoreTest): def test_queue(self): """Queue alternate exchange property persistexchangeNamece test""" - broker = self.broker(store_args(), name="testQueueBroker", expect=EXPECT_EXIT_OK) + broker = self.broker(store_args(), name="test_queue", 
expect=EXPECT_EXIT_OK) qmf = Qmf(broker) qmf.add_exchange("altExch", "direct", durable=True) # Serves as alternate exchange instance qmf.add_queue("testQueue", durable=True, alt_exchange_name="altExch") qmf.close() broker.terminate() - broker = self.broker(store_args(), name="testQueueBroker") + broker = self.broker(store_args(), name="test_queue") qmf = Qmf(broker) try: qmf.add_exchange("altExch", "direct", passive=True) @@ -177,13 +177,13 @@ class RedeliveredTests(StoreTest): def test_broker_recovery(self): """Test that the redelivered flag is set on messages after recovery of broker""" - broker = self.broker(store_args(), name="testAfterRecover", expect=EXPECT_EXIT_OK) + broker = self.broker(store_args(), name="test_broker_recovery", expect=EXPECT_EXIT_OK) msg_content = "xyz"*100 msg = Message(msg_content, durable=True) broker.send_message("testQueue", msg) broker.terminate() - broker = self.broker(store_args(), name="testAfterRecover") + broker = self.broker(store_args(), name="test_broker_recovery") rcv_msg = broker.get_message("testQueue") self.assertEqual(msg_content, rcv_msg.content) self.assertTrue(rcv_msg.redelivered) diff --git a/tests/python_tests/resize.py b/tests/python_tests/resize.py new file mode 100644 index 0000000..acef3b7 --- /dev/null +++ b/tests/python_tests/resize.py @@ -0,0 +1,169 @@ +""" +Copyright (c) 2008 Red Hat, Inc. + +This file is part of the Qpid async store library msgstore.so. + +This library is free software; you can redistribute it and/or +modify it under the terms of the GNU Lesser General Public +License as published by the Free Software Foundation; either +version 2.1 of the License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +Lesser General Public License for more details. 
+ +You should have received a copy of the GNU Lesser General Public +License along with this library; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 + USA + +The GNU Lesser General Public License is available in the file COPYING. +""" + +import glob +import os +import subprocess + +from qpid.brokertest import EXPECT_EXIT_OK +from qpid.datatypes import uuid4 +from store_test import StoreTest, store_args +from qpid.messaging import Message + +class ResizeTest(StoreTest): + + resize_tool = os.getenv("RESIZE_TOOL", "../../../tools/resize") + + def _resize_store(self, store_dir, queue_name, resize_num_files, resize_file_size, exp_fail): + for f in glob.glob(os.path.join(store_dir, "*")): + final_store_dir = os.path.join(f, queue_name) + p = subprocess.Popen([self.resize_tool, final_store_dir, "--num-jfiles", str(resize_num_files), + "--jfile-size-pgs", str(resize_file_size), "--quiet"], stdout = subprocess.PIPE, + stderr = subprocess.STDOUT) + res = p.wait() + err_found = False + try: + for l in p.stdout: + if exp_fail: + err_found = True + print "[Expected error]:", + print l, + finally: + p.stdout.close() + return res + + def _resize_test(self, queue_name, num_msgs, msg_size, resize_num_files, resize_file_size, init_num_files = 8, + init_file_size = 24, exp_fail = False, wait_time = None): + # Using a sender will force the creation of an empty persistent queue which is needed for some tests + broker = self.broker(store_args(), name="broker", expect=EXPECT_EXIT_OK, wait=wait_time) + ssn = broker.connect().session() + snd = ssn.sender("%s; {create:always, node:{durable:True}}" % queue_name) + + msgs = [] + for index in range(0, num_msgs): + msg = Message(self.make_message(index, msg_size), durable=True, id=uuid4(), correlation_id="msg-%04d"%index) + msgs.append(msg) + snd.send(msg) + broker.terminate() + + res = self._resize_store(os.path.join(self.dir, "broker", "rhm", "jrnl"), queue_name, resize_num_files, 
+ resize_file_size, exp_fail) + if res != 0: + if exp_fail: + return + self.fail("ERROR: Resize operation failed with return code %d" % res) + elif exp_fail: + self.fail("ERROR: Resize operation succeeded, but a failure was expected") + + broker = self.broker(store_args(), name="broker") + self.check_messages(broker, queue_name, msgs, True) + + +class SimpleTest(ResizeTest): + """ + Simple tests of the resize utility for resizing a journal to larger and smaller sizes. + """ + + def test_empty_store_same(self): + self._resize_test(queue_name = "empty_store_same", + num_msgs = 0, msg_size = 0, + init_num_files = 8, init_file_size = 24, + resize_num_files = 8, resize_file_size = 24) + + def test_empty_store_up(self): + self._resize_test(queue_name = "empty_store_up", + num_msgs = 0, msg_size = 0, + init_num_files = 8, init_file_size = 24, + resize_num_files = 16, resize_file_size = 48) + + def test_empty_store_down(self): + self._resize_test(queue_name = "empty_store_down", + num_msgs = 0, msg_size = 0, + init_num_files = 8, init_file_size = 24, + resize_num_files = 6, resize_file_size = 12) + +# Put into long tests, make sure there is > 128GB free disk space +# def test_empty_store_max(self): +# self._resize_test(queue_name = "empty_store_max", +# num_msgs = 0, msg_size = 0, +# init_num_files = 8, init_file_size = 24, +# resize_num_files = 64, resize_file_size = 32768, +# wait_time = 120) + + def test_empty_store_min(self): + self._resize_test(queue_name = "empty_store_min", + num_msgs = 0, msg_size = 0, + init_num_files = 8, init_file_size = 24, + resize_num_files = 4, resize_file_size = 1) + + def test_basic_up(self): + self._resize_test(queue_name = "basic_up", + num_msgs = 100, msg_size = 10000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 16, resize_file_size = 48) + + def test_basic_down(self): + self._resize_test(queue_name = "basic_down", + num_msgs = 100, msg_size = 10000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 4, 
resize_file_size = 15) + + def test_basic_low(self): + self._resize_test(queue_name = "basic_low", + num_msgs = 100, msg_size = 10000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 4, resize_file_size = 4, + exp_fail = True) + + def test_basic_under(self): + self._resize_test(queue_name = "basic_under", + num_msgs = 100, msg_size = 10000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 4, resize_file_size = 3, + exp_fail = True) + + def test_very_large_msg_up(self): + self._resize_test(queue_name = "very_large_msg_up", + num_msgs = 4, msg_size = 2000000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 16, resize_file_size = 48) + + def test_very_large_msg_down(self): + self._resize_test(queue_name = "very_large_msg_down", + num_msgs = 4, msg_size = 2000000, + init_num_files = 16, init_file_size = 64, + resize_num_files = 16, resize_file_size = 48) + + def test_very_large_msg_low(self): + self._resize_test(queue_name = "very_large_msg_low", + num_msgs = 4, msg_size = 2000000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 7, resize_file_size = 20, + exp_fail = True) + + def test_very_large_msg_under(self): + self._resize_test(queue_name = "very_large_msg_under", + num_msgs = 4, msg_size = 2000000, + init_num_files = 8, init_file_size = 24, + resize_num_files = 6, resize_file_size = 8, + exp_fail = True) diff --git a/tests/python_tests/store_test.py b/tests/python_tests/store_test.py index 87dcefa..d44d3f3 100644 --- a/tests/python_tests/store_test.py +++ b/tests/python_tests/store_test.py @@ -27,10 +27,12 @@ from qpid.messaging import Empty from qmf.console import Session -def store_args(): +def store_args(store_dir = None): """Return the broker args necessary to load the async store""" assert BrokerTest.store_lib - return ["--load-module", BrokerTest.store_lib] + if store_dir == None: + return ["--load-module", BrokerTest.store_lib] + return ["--load-module", BrokerTest.store_lib, "--store-dir", 
store_dir] class Qmf: """ diff --git a/tests/run_python_tests b/tests/run_python_tests index cee0919..a9d1355 100755 --- a/tests/run_python_tests +++ b/tests/run_python_tests @@ -46,7 +46,7 @@ case x$1 in xLONG_TEST) DEFAULT_PYTHON_TESTS= ;; x) - DEFAULT_PYTHON_TESTS="*.client_persistence.* *.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1 *.flow_to_disk.MultiQueue*.test_mixed_limit_1" ;; + DEFAULT_PYTHON_TESTS="*.client_persistence.* *.flow_to_disk.SimpleMaxSizeCountTest.* *.flow_to_disk.MultiDurableQueue*.test_mixed_limit_1 *.flow_to_disk.MultiQueue*.test_mixed_limit_1 *.resize.SimpleTest.*" ;; *) DEFAULT_PYTHON_TESTS=$1 esac diff --git a/tools/janal.py b/tools/janal.py index 8d01d5e..8b5f687 100644 --- a/tools/janal.py +++ b/tools/janal.py @@ -408,11 +408,11 @@ class JrnlReader(object): return self._txn_msg_cnt def txn_obj_list(self): - """Get a cululative list of transaction objects (commits and aborts)""" + """Get a cumulative list of transaction objects (commits and aborts)""" return self._txn_obj_list def _advance_jrnl_file(self, *oldest_file_info): - """Rotate to using the next journal file. Return False if the operation was sucessful, True if there are no + """Rotate to using the next journal file. Return False if the operation was successful, True if there are no more files to read.""" fro_seek_flag = False if len(oldest_file_info) > 0: @@ -454,7 +454,7 @@ class JrnlReader(object): def _check_owi(self, hdr): """Return True if the header's owi indicator matches that of the file header record; False otherwise. 
This can - indicate wheher the last record in a file has been read and now older records which have not yet been + indicate whether the last record in a file has been read and now older records which have not yet been overwritten are now being read.""" return self._file_hdr_owi == hdr.owi() diff --git a/tools/resize b/tools/resize index ccaa1bc..3ac69e3 100755 --- a/tools/resize +++ b/tools/resize @@ -155,7 +155,7 @@ class Resize(object): if self._file == None: rid = hdr.rid elif len(rid_list) == 0: - rid = None + rid = 0 else: rid = rid_list[0] if not self._rotate_file(rid, fro): -- 1.7.3.4 From f1d76e8f480d83ffa6a1279dfe16614e204faad6 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Wed, 11 Aug 2010 17:07:24 +0000 Subject: [PATCH 16/24] Fix for Bug 622889 - "Store resize operation causes Qpid broker recovery to fail with JERR_FCNTL_RDOFFSOVFL" A partly written journal (caused by a failure in the resize utility - fixed under another BZ) was recovered, and caused this failure. This showed up a corner case in which a large single record spanning more than one journal file was partly written.
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4199 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/jrnl/jcntl.hpp | 1 + lib/jrnl/rmgr.cpp | 3 +++ tests/python_tests/resize.py | 4 +++- 3 files changed, 7 insertions(+), 1 deletions(-) diff --git a/lib/jrnl/jcntl.hpp b/lib/jrnl/jcntl.hpp index e87e2dd..f048a87 100644 --- a/lib/jrnl/jcntl.hpp +++ b/lib/jrnl/jcntl.hpp @@ -648,6 +648,7 @@ namespace journal void chk_wr_frot(); inline u_int32_t unflushed_dblks() { return _wmgr.unflushed_dblks(); } void fhdr_wr_sync(const u_int16_t lid); + inline u_int32_t wr_subm_cnt_dblks(const u_int16_t lfid) const { return _lpmgr.get_fcntlp(lfid)->wr_subm_cnt_dblks(); } // Management instrumentation callbacks inline virtual void instr_incr_outstanding_aio_cnt() {} diff --git a/lib/jrnl/rmgr.cpp b/lib/jrnl/rmgr.cpp index 928e8d1..49bf0bb 100644 --- a/lib/jrnl/rmgr.cpp +++ b/lib/jrnl/rmgr.cpp @@ -289,6 +289,9 @@ rmgr::get_events(page_state state) _rrfc.add_cmpl_cnt_dblks(JRNL_SBLK_SIZE); u_int32_t fro_dblks = (_fhdr._fro / JRNL_DBLK_SIZE) - JRNL_SBLK_SIZE; + // Check fro_dblks does not exceed the write pointers which can happen in some corrupted journal recoveries + if (fro_dblks > _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE) + fro_dblks = _jc->wr_subm_cnt_dblks(_fhdr._pfid) - JRNL_SBLK_SIZE; _pg_cntr = fro_dblks / (JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE); u_int32_t tot_pg_offs_dblks = _pg_cntr * JRNL_RMGR_PAGE_SIZE * JRNL_SBLK_SIZE; _pg_index = _pg_cntr % JRNL_RMGR_PAGES; diff --git a/tests/python_tests/resize.py b/tests/python_tests/resize.py index acef3b7..8ffaefd 100644 --- a/tests/python_tests/resize.py +++ b/tests/python_tests/resize.py @@ -77,6 +77,8 @@ class ResizeTest(StoreTest): broker = self.broker(store_args(), name="broker") self.check_messages(broker, queue_name, msgs, True) + + # TODO: Check the physical files to check number and size are as expected. 
class SimpleTest(ResizeTest): @@ -102,7 +104,7 @@ class SimpleTest(ResizeTest): init_num_files = 8, init_file_size = 24, resize_num_files = 6, resize_file_size = 12) -# Put into long tests, make sure there is > 128GB free disk space +# TODO: Put into long tests, make sure there is > 128GB free disk space # def test_empty_store_max(self): # self._resize_test(queue_name = "empty_store_max", # num_msgs = 0, msg_size = 0, -- 1.7.3.4 From e8c4e5a1ee329a4229d997fb7c6f9130ff021cfc Mon Sep 17 00:00:00 2001 From: kpvdr Date: Mon, 9 Aug 2010 19:01:28 +0000 Subject: [PATCH 17/24] Minor change to tools which speeds up (somewhat) the resize and check_jrnl utilities git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4195 06e15bec-b515-0410-bef0-cc27a458cf48 --- tools/janal.py | 14 +++++++------- tools/resize | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tools/janal.py b/tools/janal.py index 8b5f687..7ced467 100644 --- a/tools/janal.py +++ b/tools/janal.py @@ -40,17 +40,17 @@ class EnqMap(object): def add(self, fid, hdr, lock = False): """Add a new record into the map""" - if hdr.rid in self.__map.keys(): + if hdr.rid in self.__map: raise jerr.DuplicateRidError(hdr.rid) self.__map[hdr.rid] = (fid, hdr, lock) def contains(self, rid): """Return True if the map contains the given rid""" - return rid in self.__map.keys() + return rid in self.__map def delete(self, rid): """Delete the rid and its associated data from the map""" - if rid in self.__map.keys(): + if rid in self.__map: if self.get_lock(rid): raise jerr.DeleteLockedRecordError(rid) del self.__map[rid] @@ -87,7 +87,7 @@ class EnqMap(object): def lock(self, rid): """Set the transaction lock for a given rid to True""" - if rid in self.__map.keys(): + if rid in self.__map: tup = self.__map[rid] if not tup[2]: self.__map[rid] = (tup[0], tup[1], True) @@ -126,7 +126,7 @@ class EnqMap(object): def unlock(self, rid): """Set the transaction lock for a given rid to False""" - if rid in 
self.__map.keys(): + if rid in self.__map: tup = self.__map[rid] if tup[2]: self.__map[rid] = (tup[0], tup[1], False) @@ -154,14 +154,14 @@ class TxnMap(object): """Add a new transactional record into the map""" if isinstance(hdr, jrnl.DeqRec): self.__emap.lock(hdr.deq_rid) - if hdr.xid in self.__map.keys(): + if hdr.xid in self.__map: self.__map[hdr.xid].append((fid, hdr)) # append to existing list else: self.__map[hdr.xid] = [(fid, hdr)] # create new list def contains(self, xid): """Return True if the xid exists in the map; False otherwise""" - return xid in self.__map.keys() + return xid in self.__map def delete(self, hdr): """Remove a transaction record from the map using either a commit or abort header""" diff --git a/tools/resize b/tools/resize index 3ac69e3..631e798 100755 --- a/tools/resize +++ b/tools/resize @@ -118,7 +118,7 @@ class Resize(object): hdr = tup[1] hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi master_record_list[long(hdr.rid)] = hdr - if hdr.xidsize > 0 and hdr.xid in txn_record_list.keys(): + if hdr.xidsize > 0 and hdr.xid in txn_record_list: txn_hdr = txn_record_list[hdr.xid] del(txn_record_list[hdr.xid]) txn_hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi -- 1.7.3.4 From 4f09175c0d8ca20a26c4cc1f58985814436c2803 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Thu, 12 Aug 2010 14:57:55 +0000 Subject: [PATCH 18/24] Fix for BZ623653 - "resize and store_chk tools do not correctly analyze some transactional records" git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4204 06e15bec-b515-0410-bef0-cc27a458cf48 --- tools/janal.py | 50 ++++++++++++++++++++++++++++++++++---------------- tools/jerr.py | 6 ++++++ tools/resize | 4 ++-- tools/store_chk | 10 +++++----- 4 files changed, 47 insertions(+), 23 deletions(-) diff --git a/tools/janal.py b/tools/janal.py index 7ced467..b722f4f 100644 --- a/tools/janal.py +++ b/tools/janal.py @@ -42,7 +42,7 @@ class EnqMap(object): """Add a new record into the map""" if hdr.rid in self.__map: raise 
jerr.DuplicateRidError(hdr.rid) - self.__map[hdr.rid] = (fid, hdr, lock) + self.__map[hdr.rid] = [fid, hdr, lock] def contains(self, rid): """Return True if the map contains the given rid""" @@ -58,7 +58,7 @@ class EnqMap(object): raise jerr.JWarning("ERROR: Deleting non-existent rid from EnqMap: rid=0x%x" % rid) def get(self, rid): - """Return a tuple (fid, hdr, lock) for the given rid""" + """Return a list [fid, hdr, lock] for the given rid""" if self.contains(rid): return self.__map[rid] return None @@ -88,9 +88,8 @@ class EnqMap(object): def lock(self, rid): """Set the transaction lock for a given rid to True""" if rid in self.__map: - tup = self.__map[rid] - if not tup[2]: - self.__map[rid] = (tup[0], tup[1], True) + if not self.__map[rid][2]: # locked + self.__map[rid][2] = True else: raise jerr.AlreadyLockedError(rid) else: @@ -106,8 +105,7 @@ class EnqMap(object): rid_list = self.__map.keys() rid_list.sort() for rid in rid_list: - rec = self.__map[rid] - if rec[2]: + if self.__map[rid][2]: lock_str = " [LOCKED]" else: lock_str = "" @@ -127,9 +125,8 @@ class EnqMap(object): def unlock(self, rid): """Set the transaction lock for a given rid to False""" if rid in self.__map: - tup = self.__map[rid] - if tup[2]: - self.__map[rid] = (tup[0], tup[1], False) + if self.__map[rid][2]: + self.__map[rid][2] = False else: raise jerr.NotLockedError(rid) else: @@ -153,11 +150,20 @@ class TxnMap(object): def add(self, fid, hdr): """Add a new transactional record into the map""" if isinstance(hdr, jrnl.DeqRec): - self.__emap.lock(hdr.deq_rid) + try: + self.__emap.lock(hdr.deq_rid) + except jerr.JWarning: + # Not in emap, look for rid in tmap + l = self.find_rid(hdr.deq_rid, hdr.xid) + if l == None: + raise jerr.DequeueNonExistentEnqueueError(hdr.deq_rid) + if l[2]: + raise jerr.AlreadyLockedError(hdr.deq_rid) + l[2] = True if hdr.xid in self.__map: - self.__map[hdr.xid].append((fid, hdr)) # append to existing list + self.__map[hdr.xid].append([fid, hdr, False]) # append to 
existing list else: - self.__map[hdr.xid] = [(fid, hdr)] # create new list + self.__map[hdr.xid] = [[fid, hdr, False]] # create new list def contains(self, xid): """Return True if the xid exists in the map; False otherwise""" @@ -171,6 +177,18 @@ class TxnMap(object): self._abort(hdr.xid) else: raise jerr.InvalidRecordTypeError("delete from TxnMap", hdr.magic, hdr.rid) + + def find_rid(self, rid, xid_hint = None): + """ Search for and return map list with supplied rid. If xid_hint is supplied, try that xid first""" + if xid_hint != None and self.contains(xid_hint): + for l in self.__map[xid_hint]: + if l[1].rid == rid: + return l + for xid in self.__map.iterkeys(): + if xid_hint == None or xid != xid_hint: + for l in self.__map[xid]: + if l[1].rid == rid: + return l def get(self, xid): """Return a list of operations for the given xid""" @@ -210,9 +228,9 @@ class TxnMap(object): def _commit(self, xid): """Perform a commit operation for the given xid record""" mismatch_list = [] - for fid, hdr in self.__map[xid]: - if isinstance(hdr, jrnl.EnqRec): - self.__emap.add(fid, hdr) # Transfer enq to emap + for fid, hdr, lock in self.__map[xid]: + if isinstance(hdr, jrnl.EnqRec): + self.__emap.add(fid, hdr, lock) # Transfer enq to emap else: if self.__emap.contains(hdr.deq_rid): self.__emap.unlock(hdr.deq_rid) diff --git a/tools/jerr.py b/tools/jerr.py index 05fed9f..a0d3b2f 100644 --- a/tools/jerr.py +++ b/tools/jerr.py @@ -63,6 +63,12 @@ class DeleteLockedRecordError(Exception): """Constructor""" Exception.__init__(self, "Deleting locked record from EnqMap: rid=0x%s" % rid) +class DequeueNonExistentEnqueueError(Exception): + """Error class for attempting to dequeue a non-existent enqueue record (rid)""" + def __init__(self, deq_rid): + """Constructor""" + Exception.__init__(self, "Dequeuing non-existent rnqueue record: rid=0x%s" % deq_rid) + class DuplicateRidError(Exception): """Error class for placing duplicate rid into enqueue map""" def __init__(self, rid): diff --git 
a/tools/resize b/tools/resize index 631e798..e3fcd3e 100755 --- a/tools/resize +++ b/tools/resize @@ -126,8 +126,8 @@ class Resize(object): if self._opts.vflag and self._jrnl_reader.tmap().size() > 0: print "* Assembling %d records from tmap" % self._jrnl_reader.tmap().size() for xid in self._jrnl_reader.tmap().xids(): - for tup in self._jrnl_reader.tmap().get(xid): - hdr = tup[1] + for l in self._jrnl_reader.tmap().get(xid): + hdr = l[1] hdr.flags &= ~jrnl.Hdr.OWI_MASK # Turn off owi master_record_list[hdr.rid] = hdr rid_list = master_record_list.keys() diff --git a/tools/store_chk b/tools/store_chk index c5faf92..262eebf 100755 --- a/tools/store_chk +++ b/tools/store_chk @@ -84,11 +84,11 @@ class StoreChk(object): rid_list = self.jrnl_rdr.emap().rids() rid_list.sort() for rid in rid_list: - tup = self.jrnl_rdr.emap().get(rid) + l = self.jrnl_rdr.emap().get(rid) locked = "" - if tup[2]: + if l[2]: locked += " (locked)" - print " fid=%d %s%s" % (tup[0], tup[1], locked) + print " fid=%d %s%s" % (l[0], l[1], locked) print "WARNING: Enqueue-Dequeue mismatch, %d enqueued records remain." % self.jrnl_rdr.emap().size() else: print "No remaining enqueued records found (emap empty)." 
@@ -99,8 +99,8 @@ class StoreChk(object): for xid in self.jrnl_rdr.tmap().xids(): jrnl.Utils.format_xid(xid) recs = self.jrnl_rdr.tmap().get(xid) - for tup in recs: - print " fid=%d %s" % (tup[0], tup[1]) + for l in recs: + print " fid=%d %s" % (l[0], l[1]) print " Total: %d records for %s" % (len(recs), jrnl.Utils.format_xid(xid)) print txn_rec_cnt += len(recs) -- 1.7.3.4 From 7eb54b7f02c545a1d5aabc775a6b70fdbe8389e4 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Fri, 13 Aug 2010 13:31:44 +0000 Subject: [PATCH 19/24] Fix for BZ 624033 - "Store tools regression - "Dequeuing non-existent rnqueue record"" git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4209 06e15bec-b515-0410-bef0-cc27a458cf48 --- tools/janal.py | 9 ++++----- tools/jerr.py | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/tools/janal.py b/tools/janal.py index b722f4f..7293467 100644 --- a/tools/janal.py +++ b/tools/janal.py @@ -155,11 +155,10 @@ class TxnMap(object): except jerr.JWarning: # Not in emap, look for rid in tmap l = self.find_rid(hdr.deq_rid, hdr.xid) - if l == None: - raise jerr.DequeueNonExistentEnqueueError(hdr.deq_rid) - if l[2]: - raise jerr.AlreadyLockedError(hdr.deq_rid) - l[2] = True + if l != None: + if l[2]: + raise jerr.AlreadyLockedError(hdr.deq_rid) + l[2] = True if hdr.xid in self.__map: self.__map[hdr.xid].append([fid, hdr, False]) # append to existing list else: diff --git a/tools/jerr.py b/tools/jerr.py index a0d3b2f..813033c 100644 --- a/tools/jerr.py +++ b/tools/jerr.py @@ -67,7 +67,7 @@ class DequeueNonExistentEnqueueError(Exception): """Error class for attempting to dequeue a non-existent enqueue record (rid)""" def __init__(self, deq_rid): """Constructor""" - Exception.__init__(self, "Dequeuing non-existent rnqueue record: rid=0x%s" % deq_rid) + Exception.__init__(self, "Dequeuing non-existent enqueue record: rid=0x%s" % deq_rid) class DuplicateRidError(Exception): """Error class for placing duplicate rid into enqueue map""" -- 
1.7.3.4 From 8433e9525b81262f325acdf3bdb7f7402ac45874 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Wed, 18 Aug 2010 15:40:28 +0000 Subject: [PATCH 20/24] BZ 614944 - "qpidd broker crash in mrg::msgstore::TxnCtxt::abort() -> DbTxn::abort()". Added additional exception handling for store BDB initialization in TxnCtxt::begin() after QE came across another unhandled case. git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4212 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/MessageStoreImpl.cpp | 2 ++ lib/TxnCtxt.cpp | 11 +++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index 37456ea..a505861 100644 --- a/lib/MessageStoreImpl.cpp +++ b/lib/MessageStoreImpl.cpp @@ -354,6 +354,8 @@ void MessageStoreImpl::init() } QPID_LOG(error, "BDB exception occurred while initializing store: " << e.what()); THROW_STORE_EXCEPTION_2("BDB exception occurred while initializing store", e); + } catch (const StoreException&) { + throw; } catch (const journal::jexception& e) { QPID_LOG(error, "Journal Exception occurred while initializing store: " << e); THROW_STORE_EXCEPTION_2("Journal Exception occurred while initializing store", e.what()); diff --git a/lib/TxnCtxt.cpp b/lib/TxnCtxt.cpp index 1fcb0e0..e522d37 100644 --- a/lib/TxnCtxt.cpp +++ b/lib/TxnCtxt.cpp @@ -74,7 +74,7 @@ TxnCtxt::TxnCtxt(IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTok TxnCtxt::TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {} -TxnCtxt::~TxnCtxt() { if(txn) abort(); } +TxnCtxt::~TxnCtxt() { abort(); } #define MAX_SYNC_SLEEPS 5000 // ~1 second #define SYNC_SLEEP_TIME 200 // 0.2 ms @@ -113,7 +113,14 @@ void TxnCtxt::sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) { } void TxnCtxt::begin(DbEnv* env, bool sync) { - env->txn_begin(0, &txn, 0); + int err; + try { err = env->txn_begin(0, &txn, 0); } + catch (const 
DbException&) { txn = 0; throw; } + if (err != 0) { + std::ostringstream oss; + oss << "Error: Env::txn_begin() returned error code: " << err; + THROW_STORE_EXCEPTION(oss.str()); + } if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser)); } -- 1.7.3.4 From a87f07db6ef9ec869777dc9e8bedf54fb1568225 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Thu, 12 Aug 2010 19:58:56 +0000 Subject: [PATCH 21/24] Fix for BZ620742 - "Qpid starts with faulty value --num-jfiles 1". git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4205 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/MessageStoreImpl.cpp | 4 ++-- 1 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index a505861..1cce89b 100644 --- a/lib/MessageStoreImpl.cpp +++ b/lib/MessageStoreImpl.cpp @@ -478,11 +478,11 @@ void MessageStoreImpl::create(PersistableQueue& queue, value = args.get("qpid.file_count"); if (value.get() != 0 && !value->empty() && value->convertsTo()) - localFileCount = (u_int16_t) value->get(); + localFileCount = chkJrnlNumFilesParam((u_int16_t) value->get(), "qpid.file_count"); value = args.get("qpid.file_size"); if (value.get() != 0 && !value->empty() && value->convertsTo()) - localFileSizeSblks = (u_int32_t) value->get() * JRNL_RMGR_PAGE_SIZE; + localFileSizeSblks = chkJrnlFileSizeParam((u_int32_t) value->get(), "qpid.file_size") * JRNL_RMGR_PAGE_SIZE; if (queue.getName().size() == 0) { -- 1.7.3.4 From 831a21e6dd3206ecffcb16c8c20dccd820d63db6 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 30 Sep 2010 14:58:50 -0400 Subject: [PATCH 22/24] Bug 632188 - Broker restart fails without removal of store directory. The old store was not being pushed down if "notice" level logging was disabled. 
A broker trying to re-join an active cluster would fail to start with: Daemon startup failed: Exchange already exists: amq.topic (MessageStoreImpl.cpp:564) git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4363 06e15bec-b515-0410-bef0-cc27a458cf48 Conflicts: lib/MessageStoreImpl.cpp --- lib/MessageStoreImpl.cpp | 6 ++++-- 1 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index 1cce89b..c1b61a1 100644 --- a/lib/MessageStoreImpl.cpp +++ b/lib/MessageStoreImpl.cpp @@ -48,7 +48,8 @@ using qpid::management::ManagementAgent; namespace _qmf = qmf::com::redhat::rhm::store; const std::string MessageStoreImpl::storeTopLevelDir("rhm"); // Sets the top-level store dir name -qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(10 * qpid::sys::TIME_MSEC); // 10ms +// FIXME aconway 2010-03-09: was 10 +qpid::sys::Duration MessageStoreImpl::defJournalGetEventsTimeout(1 * qpid::sys::TIME_MSEC); // 1ms qpid::sys::Duration MessageStoreImpl::defJournalFlushTimeout(500 * qpid::sys::TIME_MSEC); // 0.5s qpid::sys::Mutex TxnCtxt::globalSerialiser; @@ -404,7 +405,8 @@ void MessageStoreImpl::truncateInit(const bool pushDownStoreFiles) std::ostringstream oss; oss << storeDir << "/" << storeTopLevelDir; if (pushDownStoreFiles) { - QPID_LOG(notice, "Store directory " << oss.str() << " was pushed down into directory " << mrg::journal::jdir::push_down(storeDir, storeTopLevelDir, "cluster") << "."); + string dir = mrg::journal::jdir::push_down(storeDir, storeTopLevelDir, "cluster"); + QPID_LOG(notice, "Store directory " << oss.str() << " was pushed down (saved) into directory " << dir << "."); } else { mrg::journal::jdir::delete_dir(oss.str().c_str()); QPID_LOG(notice, "Store directory " << oss.str() << " was truncated."); -- 1.7.3.4 From 21fa87eea11245bf8977f6fa2dbd01a0a2bac635 Mon Sep 17 00:00:00 2001 From: kpvdr Date: Wed, 1 Dec 2010 14:13:40 +0000 Subject: [PATCH 23/24] Fix for BZ656385 "Data store 
can become corrupt with small store file size and large wcache-page-size". This is an undetected illegal combination of parameters, allowing the write cache to be larger than the journal file. Fixed by adding detection for this condition. git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4418 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/MessageStoreImpl.cpp | 19 ++++++++++++++----- lib/MessageStoreImpl.h | 6 ++++-- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/lib/MessageStoreImpl.cpp b/lib/MessageStoreImpl.cpp index c1b61a1..2f31ecd 100644 --- a/lib/MessageStoreImpl.cpp +++ b/lib/MessageStoreImpl.cpp @@ -96,7 +96,7 @@ u_int16_t MessageStoreImpl::chkJrnlNumFilesParam(const u_int16_t param, const st return p; } -u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName) +u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const std::string paramName, const u_int32_t wCachePgSizeSblks) { u_int32_t p = param; u_int32_t min = JRNL_MIN_FILE_SIZE / JRNL_RMGR_PAGE_SIZE; @@ -108,10 +108,15 @@ u_int32_t MessageStoreImpl::chkJrnlFileSizeParam(const u_int32_t param, const st p = max; QPID_LOG(warning, "parameter " << paramName << " (" << param << ") is above allowable maximum (" << max << "); changing this parameter to maximum value."); } + if (wCachePgSizeSblks > p * JRNL_RMGR_PAGE_SIZE) { + std::ostringstream oss; + oss << "Cannot create store with file size less than write page cache size. 
[file size = " << p << " (" << (p * JRNL_RMGR_PAGE_SIZE / 2) << " kB); write page cache = " << (wCachePgSizeSblks / 2) << " kB]"; + THROW_STORE_EXCEPTION(oss.str()); + } return p; } -u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName) +u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const std::string paramName, const u_int16_t jrnlFsizePgs) { u_int32_t p = param; switch (p) @@ -124,6 +129,10 @@ u_int32_t MessageStoreImpl::chkJrnlWrPageCacheSize(const u_int32_t param, const case 32: case 64: case 128: + if (jrnlFsizePgs == 1) { + p = 64; + QPID_LOG(warning, "parameter " << paramName << " (" << param << ") cannot set a page size greater than the journal file size; changing this parameter to the journal file size (" << p << ")"); + } break; default: if (p == 0) { @@ -245,10 +254,10 @@ bool MessageStoreImpl::init(const qpid::Options* options) const StoreOptions* opts = static_cast(options); u_int16_t numJrnlFiles = chkJrnlNumFilesParam(opts->numJrnlFiles, "num-jfiles"); u_int32_t jrnlFsizePgs = chkJrnlFileSizeParam(opts->jrnlFsizePgs, "jfile-size-pgs"); - u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size"); + u_int32_t jrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->wCachePageSizeKib, "wcache-page-size", jrnlFsizePgs); u_int16_t tplNumJrnlFiles = chkJrnlNumFilesParam(opts->tplNumJrnlFiles, "tpl-num-jfiles"); u_int32_t tplJrnlFSizePgs = chkJrnlFileSizeParam(opts->tplJrnlFsizePgs, "tpl-jfile-size-pgs"); - u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size"); + u_int32_t tplJrnlWrCachePageSizeKib = chkJrnlWrPageCacheSize(opts->tplWCachePageSizeKib, "tpl-wcache-page-size", tplJrnlFSizePgs); bool autoJrnlExpand; u_int16_t autoJrnlExpandMaxFiles; chkJrnlAutoExpandOptions(opts, autoJrnlExpand, autoJrnlExpandMaxFiles, "auto-expand-max-jfiles", numJrnlFiles, "num-jfiles"); @@ -484,7 
+493,7 @@ void MessageStoreImpl::create(PersistableQueue& queue, value = args.get("qpid.file_size"); if (value.get() != 0 && !value->empty() && value->convertsTo()) - localFileSizeSblks = chkJrnlFileSizeParam((u_int32_t) value->get(), "qpid.file_size") * JRNL_RMGR_PAGE_SIZE; + localFileSizeSblks = chkJrnlFileSizeParam((u_int32_t) value->get(), "qpid.file_size", wCachePgSizeSblks) * JRNL_RMGR_PAGE_SIZE; if (queue.getName().size() == 0) { diff --git a/lib/MessageStoreImpl.h b/lib/MessageStoreImpl.h index 99d8ed2..aabefc6 100644 --- a/lib/MessageStoreImpl.h +++ b/lib/MessageStoreImpl.h @@ -161,9 +161,11 @@ class MessageStoreImpl : public qpid::broker::MessageStore, public qpid::managem static u_int16_t chkJrnlNumFilesParam(const u_int16_t param, const std::string paramName); static u_int32_t chkJrnlFileSizeParam(const u_int32_t param, - const std::string paramName); + const std::string paramName, + const u_int32_t wCachePgSizeSblks = 0); static u_int32_t chkJrnlWrPageCacheSize(const u_int32_t param, - const std::string paramName); + const std::string paramName, + const u_int16_t jrnlFsizePgs); static u_int16_t getJrnlWrNumPages(const u_int32_t wrPageSizeKib); void chkJrnlAutoExpandOptions(const MessageStoreImpl::StoreOptions* opts, bool& autoJrnlExpand, -- 1.7.3.4 From ce91ccbdb5756712407e683c7581c0d321f9d6c6 Mon Sep 17 00:00:00 2001 From: tedross Date: Wed, 10 Nov 2010 00:46:08 +0000 Subject: [PATCH 24/24] Re-generated qmf files. 
git-svn-id: https://svn.jboss.org/repos/rhmessaging/store/trunk/cpp@4412 06e15bec-b515-0410-bef0-cc27a458cf48 --- lib/gen/qmf/com/redhat/rhm/store/Journal.cpp | 16 ++++++++++++---- lib/gen/qmf/com/redhat/rhm/store/Journal.h | 6 ++++-- lib/gen/qmf/com/redhat/rhm/store/Store.cpp | 4 ++-- lib/gen/qmf/com/redhat/rhm/store/Store.h | 6 ++++-- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/lib/gen/qmf/com/redhat/rhm/store/Journal.cpp b/lib/gen/qmf/com/redhat/rhm/store/Journal.cpp index 1e95f4a..599599b 100644 --- a/lib/gen/qmf/com/redhat/rhm/store/Journal.cpp +++ b/lib/gen/qmf/com/redhat/rhm/store/Journal.cpp @@ -689,7 +689,7 @@ void Journal::writeStatistics (std::string& _sBuf, bool skipHeaders) buf.getRawData(_sBuf, _bufLen); } -void Journal::doMethod (string& methodName, const string& inStr, string& outStr) +void Journal::doMethod (string& methodName, const string& inStr, string& outStr, const string& userId) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; std::string text; @@ -709,7 +709,11 @@ void Journal::doMethod (string& methodName, const string& inStr, string& outStr) _matched = true; ArgsJournalExpand ioArgs; ioArgs.i_by = inBuf.getLong(); - status = coreObject->ManagementMethod (METHOD_EXPAND, ioArgs, text); + bool allow = coreObject->AuthorizeMethod(METHOD_EXPAND, ioArgs, userId); + if (allow) + status = coreObject->ManagementMethod (METHOD_EXPAND, ioArgs, text); + else + status = Manageable::STATUS_FORBIDDEN; outBuf.putLong (status); outBuf.putMediumString(::qpid::management::Manageable::StatusText (status, text)); } @@ -868,7 +872,7 @@ void Journal::mapDecodeValues (const ::qpid::types::Variant::Map& _map) } -void Journal::doMethod (string& methodName, const ::qpid::types::Variant::Map& inMap, ::qpid::types::Variant::Map& outMap) +void Journal::doMethod (string& methodName, const ::qpid::types::Variant::Map& inMap, ::qpid::types::Variant::Map& outMap, const string& userId) { Manageable::status_t status = 
Manageable::STATUS_UNKNOWN_METHOD; std::string text; @@ -880,7 +884,11 @@ void Journal::doMethod (string& methodName, const ::qpid::types::Variant::Map& i if ((_i = inMap.find("by")) != inMap.end()) { ioArgs.i_by = _i->second; } - status = coreObject->ManagementMethod (METHOD_EXPAND, ioArgs, text); + bool allow = coreObject->AuthorizeMethod(METHOD_EXPAND, ioArgs, userId); + if (allow) + status = coreObject->ManagementMethod (METHOD_EXPAND, ioArgs, text); + else + status = Manageable::STATUS_FORBIDDEN; outMap["_status_code"] = (uint32_t) status; outMap["_status_text"] = ::qpid::management::Manageable::StatusText(status, text); return; diff --git a/lib/gen/qmf/com/redhat/rhm/store/Journal.h b/lib/gen/qmf/com/redhat/rhm/store/Journal.h index f64214a..18e861d 100644 --- a/lib/gen/qmf/com/redhat/rhm/store/Journal.h +++ b/lib/gen/qmf/com/redhat/rhm/store/Journal.h @@ -134,7 +134,8 @@ class Journal : public ::qpid::management::ManagementObject void mapDecodeValues(const ::qpid::types::Variant::Map& map); void doMethod(std::string& methodName, const ::qpid::types::Variant::Map& inMap, - ::qpid::types::Variant::Map& outMap); + ::qpid::types::Variant::Map& outMap, + const std::string& userId); std::string getKey() const; uint32_t writePropertiesSize() const; @@ -143,7 +144,8 @@ class Journal : public ::qpid::management::ManagementObject void writeStatistics(std::string& buf, bool skipHeaders = false); void doMethod(std::string& methodName, const std::string& inBuf, - std::string& outBuf); + std::string& outBuf, + const std::string& userId); writeSchemaCall_t getWriteSchemaCall() { return writeSchema; } diff --git a/lib/gen/qmf/com/redhat/rhm/store/Store.cpp b/lib/gen/qmf/com/redhat/rhm/store/Store.cpp index 1cf32a3..4027621 100644 --- a/lib/gen/qmf/com/redhat/rhm/store/Store.cpp +++ b/lib/gen/qmf/com/redhat/rhm/store/Store.cpp @@ -452,7 +452,7 @@ void Store::writeStatistics (std::string& _sBuf, bool skipHeaders) buf.getRawData(_sBuf, _bufLen); } -void Store::doMethod 
(string&, const string&, string& outStr) +void Store::doMethod (string&, const string&, string& outStr, const string&) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; std::string text; @@ -580,7 +580,7 @@ void Store::mapDecodeValues (const ::qpid::types::Variant::Map& _map) } -void Store::doMethod (string&, const ::qpid::types::Variant::Map&, ::qpid::types::Variant::Map& outMap) +void Store::doMethod (string&, const ::qpid::types::Variant::Map&, ::qpid::types::Variant::Map& outMap, const string&) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; std::string text; diff --git a/lib/gen/qmf/com/redhat/rhm/store/Store.h b/lib/gen/qmf/com/redhat/rhm/store/Store.h index 7c050b5..20dced2 100644 --- a/lib/gen/qmf/com/redhat/rhm/store/Store.h +++ b/lib/gen/qmf/com/redhat/rhm/store/Store.h @@ -104,7 +104,8 @@ class Store : public ::qpid::management::ManagementObject void mapDecodeValues(const ::qpid::types::Variant::Map& map); void doMethod(std::string& methodName, const ::qpid::types::Variant::Map& inMap, - ::qpid::types::Variant::Map& outMap); + ::qpid::types::Variant::Map& outMap, + const std::string& userId); std::string getKey() const; uint32_t writePropertiesSize() const; @@ -113,7 +114,8 @@ class Store : public ::qpid::management::ManagementObject void writeStatistics(std::string& buf, bool skipHeaders = false); void doMethod(std::string& methodName, const std::string& inBuf, - std::string& outBuf); + std::string& outBuf, + const std::string& userId); writeSchemaCall_t getWriteSchemaCall() { return writeSchema; } -- 1.7.3.4