::iterator itr =
+ std::find(pool.begin(), pool.end(), thread);
- if(itr != pool.end())
- {
- pool.erase(itr);
- }
+ if(itr != pool.end())
+ {
+ pool.erase(itr);
+ }
- // Bye-Bye Thread Object
- delete thread;
+ // Bye-Bye Thread Object
+ delete thread;
- // Now allocate a replacement
- AllocateThreads(1);
- }
- }
- AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
- AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+ // Now allocate a replacement
+ AllocateThreads(1);
+ }
+ }
+ AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+ AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
}
diff --git a/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h b/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h
index 8743d56a03..1f9c64afba 100644
--- a/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h
+++ b/activemq-cpp/src/main/activemq/concurrent/ThreadPool.h
@@ -30,209 +30,202 @@
namespace activemq{
namespace concurrent{
- /**
- * Defines a Thread Pool object that implements the functionality
- * of pooling threads to perform user tasks. The Thread Poll has
- * max size that it will grow to. The thread pool allocates threads
- * in blocks. When there are no waiting worker threads and a task
- * is queued then a new batch is allocated. The user can specify
- * the size of the blocks, otherwise a default value is used.
- *
- * When the user queues a task they must also queue a listner to
- * be notified when the task has completed, this provides the user
- * with a mechanism to know when a task object can be freed.
- *
- * To have the Thread Pool perform a task, the user enqueue's an
- * object that implements the Runnable
insterface and
- * one of the worker threads will executing it in its thread context.
- */
- class ThreadPool : public PooledThreadListener
- {
- public:
+ /**
+ * Defines a Thread Pool object that implements the functionality
+ * of pooling threads to perform user tasks. The Thread Poll has
+ * max size that it will grow to. The thread pool allocates threads
+ * in blocks. When there are no waiting worker threads and a task
+ * is queued then a new batch is allocated. The user can specify
+ * the size of the blocks, otherwise a default value is used.
+ *
+ * When the user queues a task they must also queue a listner to
+ * be notified when the task has completed, this provides the user
+ * with a mechanism to know when a task object can be freed.
+ *
+ * To have the Thread Pool perform a task, the user enqueue's an
+ * object that implements the Runnable
insterface and
+ * one of the worker threads will executing it in its thread context.
+ */
+ class ThreadPool : public PooledThreadListener
+ {
+ public:
- // Constants
- static const size_t DEFAULT_MAX_POOL_SIZE = 10;
- static const size_t DEFAULT_MAX_BLOCK_SIZE = 3;
+ // Constants
+ static const size_t DEFAULT_MAX_POOL_SIZE = 10;
+ static const size_t DEFAULT_MAX_BLOCK_SIZE = 3;
- // Types
- typedef std::pair Task;
+ // Types
+ typedef std::pair Task;
- private:
+ private:
- // Vector of threads that this object has created for its pool.
- std::vector< PooledThread* > pool;
-
- // Queue of Task that are in need of completion
- util::Queue queue;
-
- // Max number of Threads this Pool can contian
- unsigned long maxThreads;
-
- // Max number of tasks that can be allocated at a time
- unsigned long blockSize;
-
- // boolean flag use to indocate that this object is shutting down.
- bool shutdown;
-
- // Count of threads that are currently free to perfom some work.
- unsigned long freeThreads;
-
- // Mutex for locking operations that affect the pool.
- Mutex poolLock;
+ // Vector of threads that this object has created for its pool.
+ std::vector< PooledThread* > pool;
- // Logger Init
- LOGCMS_DECLARE(logger);
- LOGCMS_DECLARE(marker);
+ // Queue of Task that are in need of completion
+ util::Queue queue;
- private: // Statics
+ // Max number of Threads this Pool can contian
+ unsigned long maxThreads;
+
+ // Max number of tasks that can be allocated at a time
+ unsigned long blockSize;
+
+ // boolean flag use to indocate that this object is shutting down.
+ bool shutdown;
+
+ // Count of threads that are currently free to perfom some work.
+ unsigned long freeThreads;
+
+ // Mutex for locking operations that affect the pool.
+ Mutex poolLock;
+
+ // Logger Init
+ LOGCMS_DECLARE(logger);
+ LOGCMS_DECLARE(marker);
+
+ private: // Statics
- // The singleton instance of this class
- static ThreadPool instance;
+ // The singleton instance of this class
+ static ThreadPool instance;
- public:
+ public:
- /**
- * Constructor
- */
- ThreadPool(void);
+ ThreadPool(void);
+ virtual ~ThreadPool(void);
- /**
- * Destructor
- */
- virtual ~ThreadPool(void);
+ /**
+ * Queue a task to be completed by one of the Pooled Threads.
+ * tasks are serviced as soon as a PooledThread
+ * is available to run it.
+ * @param object that derives from Runnable
+ * @throws ActiveMQException
+ */
+ virtual void queueTask(Task task)
+ throw ( exceptions::ActiveMQException );
- /**
- * Queue a task to be completed by one of the Pooled Threads.
- * tasks are serviced as soon as a PooledThread
- * is available to run it.
- * @param object that derives from Runnable
- * @throws ActiveMQException
- */
- virtual void queueTask(Task task)
- throw ( exceptions::ActiveMQException );
+ /**
+ * DeQueue a task to be completed by one of the Pooled Threads.
+ * A caller of this method will block until there is something
+ * in the tasks queue, therefore care must be taken when calling
+ * this function. Normally clients of ThreadPool don't use
+ * this, only the PooledThread
objects owned by
+ * this ThreadPool.
+ * @return object that derives from Runnable
+ * @throws ActiveMQException
+ */
+ virtual Task deQueueTask(void)
+ throw ( exceptions::ActiveMQException );
- /**
- * DeQueue a task to be completed by one of the Pooled Threads.
- * A caller of this method will block until there is something
- * in the tasks queue, therefore care must be taken when calling
- * this function. Normally clients of ThreadPool don't use
- * this, only the PooledThread
objects owned by
- * this ThreadPool.
- * @return object that derives from Runnable
- * @throws ActiveMQException
- */
- virtual Task deQueueTask(void)
- throw ( exceptions::ActiveMQException );
+ /**
+ * Returns the current number of Threads in the Pool, this is
+ * how many there are now, not how many are active or the max
+ * number that might exist.
+ * @return integer number of threads in existance.
+ */
+ virtual unsigned long getPoolSize(void) const { return pool.size(); }
+
+ /**
+ * Returns the current backlog of items in the tasks queue, this
+ * is how much work is still waiting to get done.
+ * @return number of outstanding tasks.
+ */
+ virtual unsigned long getBacklog(void) const { return queue.size(); }
+
+ /**
+ * Ensures that there is at least the specified number of Threads
+ * allocated to the pool. If the size is greater than the MAX
+ * number of threads in the pool, then only MAX threads are
+ * reservved. If the size is smaller than the number of threads
+ * currently in the pool, than nothing is done.
+ * @param number of threads to reserve.
+ */
+ virtual void reserve( unsigned long size );
+
+ /**
+ * Get the Max Number of Threads this Pool can contain
+ * @return max size
+ */
+ virtual unsigned long getMaxThreads(void) const { return maxThreads; }
- /**
- * Returns the current number of Threads in the Pool, this is
- * how many there are now, not how many are active or the max
- * number that might exist.
- * @return integer number of threads in existance.
- */
- virtual unsigned long getPoolSize(void) const { return pool.size(); }
+ /**
+ * Sets the Max number of threads this pool can contian.
+ * if this value is smaller than the current size of the
+ * pool nothing is done.
+ */
+ virtual void setMaxThreads( unsigned long maxThreads );
- /**
- * Returns the current backlog of items in the tasks queue, this
- * is how much work is still waiting to get done.
- * @return number of outstanding tasks.
- */
- virtual unsigned long getBacklog(void) const { return queue.size(); }
+ /**
+ * Gets the Max number of threads that can be allocated at a time
+ * when new threads are needed.
+ * @return max Thread Block Size
+ */
+ virtual unsigned long getBlockSize(void) const { return blockSize; }
- /**
- * Ensures that there is at least the specified number of Threads
- * allocated to the pool. If the size is greater than the MAX
- * number of threads in the pool, then only MAX threads are
- * reservved. If the size is smaller than the number of threads
- * currently in the pool, than nothing is done.
- * @param number of threads to reserve.
- */
- virtual void reserve(unsigned long size);
+ /**
+ * Sets the Max number of Threads that can be allocated at a time
+ * when the Thread Pool determines that more Threads are needed.
+ * @param Max Thread Block Size
+ */
+ virtual void setBlockSize( unsigned long blockSize );
- /**
- * Get the Max Number of Threads this Pool can contain
- * @return max size
- */
- virtual unsigned long getMaxThreads(void) const { return maxThreads; }
-
- /**
- * Sets the Max number of threads this pool can contian.
- * if this value is smaller than the current size of the
- * pool nothing is done.
- */
- virtual void setMaxThreads(unsigned long maxThreads);
-
- /**
- * Gets the Max number of threads that can be allocated at a time
- * when new threads are needed.
- * @return max Thread Block Size
- */
- virtual unsigned long getBlockSize(void) const { return blockSize; }
-
- /**
- * Sets the Max number of Threads that can be allocated at a time
- * when the Thread Pool determines that more Threads are needed.
- * @param Max Thread Block Size
- */
- virtual void setBlockSize(unsigned long blockSize);
-
- /**
- * Returns the current number of available threads in the pool, threads
- * that are performing a user task are considered unavailable. This value
- * could change immeadiately after calling as Threads could finish right
- * after and be available again. This is informational only.
- * @return totoal free threads
- */
- virtual unsigned long getFreeThreadCount(void) const { return freeThreads; }
+ /**
+ * Returns the current number of available threads in the pool, threads
+ * that are performing a user task are considered unavailable. This value
+ * could change immeadiately after calling as Threads could finish right
+ * after and be available again. This is informational only.
+ * @return totoal free threads
+ */
+ virtual unsigned long getFreeThreadCount(void) const { return freeThreads; }
- public: // PooledThreadListener Callbacks
+ public: // PooledThreadListener Callbacks
- /**
- * Called by a pooled thread when it is about to begin
- * executing a new task. This will decrement the available
- * threads counter so that this object knows when there are
- * no more free threads and must create new ones.
- * @param Pointer to the Pooled Thread that is making this call
- */
- virtual void onTaskStarted(PooledThread* thread);
+ /**
+ * Called by a pooled thread when it is about to begin
+ * executing a new task. This will decrement the available
+ * threads counter so that this object knows when there are
+ * no more free threads and must create new ones.
+ * @param Pointer to the Pooled Thread that is making this call
+ */
+ virtual void onTaskStarted( PooledThread* thread );
- /**
- * Called by a pooled thread when it has completed a task
- * and is going back to waiting for another task to run,
- * this will increment the free threads counter.
- * @param Pointer the the Pooled Thread that is making this call.
- */
- virtual void onTaskCompleted(PooledThread* thread);
+ /**
+ * Called by a pooled thread when it has completed a task
+ * and is going back to waiting for another task to run,
+ * this will increment the free threads counter.
+ * @param Pointer the the Pooled Thread that is making this call.
+ */
+ virtual void onTaskCompleted( PooledThread* thread );
- /**
- * Called by a pooled thread when it has encountered an exception
- * while running a user task, after receiving this notification
- * the callee should assume that the PooledThread is now no longer
- * running.
- * @param Pointer to the Pooled Thread that is making this call
- * @param The Exception that occured.
- */
- virtual void onTaskException(PooledThread* thread,
- exceptions::ActiveMQException& ex);
+ /**
+ * Called by a pooled thread when it has encountered an exception
+ * while running a user task, after receiving this notification
+ * the callee should assume that the PooledThread is now no longer
+ * running.
+ * @param Pointer to the Pooled Thread that is making this call
+ * @param The Exception that occured.
+ */
+ virtual void onTaskException( PooledThread* thread,
+ exceptions::ActiveMQException& ex);
- public: // Statics
+ public: // Statics
- /**
- * Return the one and only Thread Pool instance.
- * @return The Thread Pool Pointer
- */
- static ThreadPool* getInstance(void) { return &instance; }
+ /**
+ * Return the one and only Thread Pool instance.
+ * @return The Thread Pool Pointer
+ */
+ static ThreadPool* getInstance(void) { return &instance; }
- private:
+ private:
- /**
- * Allocates the requested ammount of Threads, won't exceed
- * maxThreads
.
- * @param the number of threads to create
- */
- void AllocateThreads(unsigned long count);
+ /**
+ * Allocates the requested ammount of Threads, won't exceed
+ * maxThreads
.
+ * @param the number of threads to create
+ */
+ void AllocateThreads( unsigned long count );
- };
+ };
}}
diff --git a/activemq-cpp/src/main/activemq/connector/Connector.h b/activemq-cpp/src/main/activemq/connector/Connector.h
index 484c2ebaf1..c04675925e 100644
--- a/activemq-cpp/src/main/activemq/connector/Connector.h
+++ b/activemq-cpp/src/main/activemq/connector/Connector.h
@@ -72,6 +72,20 @@ namespace connector{
*/
virtual std::string getClientId(void) const = 0;
+ /**
+ * Gets the Username for this connection, if this
+ * connection has been closed, then this method returns ""
+ * @return Username String
+ */
+ virtual std::string getUsername(void) const = 0;
+
+ /**
+ * Gets the Password for this connection, if this
+ * connection has been closed, then this method returns ""
+ * @return Password String
+ */
+ virtual std::string getPassword(void) const = 0;
+
/**
* Gets a reference to the Transport that this connection
* is using.
@@ -88,7 +102,7 @@ namespace connector{
* @throws ConnectorException
*/
virtual SessionInfo* createSession(
- cms::Session::AcknowledgeMode ackMode)
+ cms::Session::AcknowledgeMode ackMode )
throw( ConnectorException ) = 0;
/**
@@ -99,9 +113,9 @@ namespace connector{
* @throws ConnectorException
*/
virtual ConsumerInfo* createConsumer(
- cms::Destination* destination,
+ const cms::Destination* destination,
SessionInfo* session,
- const std::string& selector = "")
+ const std::string& selector = "" )
throw ( ConnectorException ) = 0;
/**
@@ -116,11 +130,11 @@ namespace connector{
* @throws ConnectorException
*/
virtual ConsumerInfo* createDurableConsumer(
- cms::Topic* topic,
+ const cms::Topic* topic,
SessionInfo* session,
const std::string& name,
const std::string& selector = "",
- bool noLocal = false)
+ bool noLocal = false )
throw ( ConnectorException ) = 0;
/**
@@ -131,8 +145,8 @@ namespace connector{
* @throws ConnectorException
*/
virtual ProducerInfo* createProducer(
- cms::Destination* destination,
- SessionInfo* session)
+ const cms::Destination* destination,
+ SessionInfo* session )
throw ( ConnectorException ) = 0;
/**
@@ -142,8 +156,8 @@ namespace connector{
* @return a newly created Topic Object
* @throws ConnectorException
*/
- virtual cms::Topic* createTopic(const std::string& name,
- SessionInfo* session)
+ virtual cms::Topic* createTopic( const std::string& name,
+ SessionInfo* session )
throw ( ConnectorException ) = 0;
/**
@@ -153,8 +167,8 @@ namespace connector{
* @return a newly created Queue Object
* @throws ConnectorException
*/
- virtual cms::Queue* createQueue(const std::string& name,
- SessionInfo* session)
+ virtual cms::Queue* createQueue( const std::string& name,
+ SessionInfo* session )
throw ( ConnectorException ) = 0;
/**
@@ -165,7 +179,7 @@ namespace connector{
* @throws ConnectorException
*/
virtual cms::TemporaryTopic* createTemporaryTopic(
- SessionInfo* session)
+ SessionInfo* session )
throw ( ConnectorException ) = 0;
/**
@@ -176,7 +190,7 @@ namespace connector{
* @throws ConnectorException
*/
virtual cms::TemporaryQueue* createTemporaryQueue(
- SessionInfo* session)
+ SessionInfo* session )
throw ( ConnectorException ) = 0;
/**
@@ -185,7 +199,7 @@ namespace connector{
* @param Producer Info for the sender of this message
* @throws ConnectorException
*/
- virtual void send(cms::Message* message, ProducerInfo* producerInfo)
+ virtual void send( cms::Message* message, ProducerInfo* producerInfo )
throw ( ConnectorException ) = 0;
/**
@@ -194,8 +208,8 @@ namespace connector{
* @param Producer Info for the sender of this message
* @throws ConnectorException
*/
- virtual void send(std::list& messages,
- ProducerInfo* producerInfo)
+ virtual void send( std::list& messages,
+ ProducerInfo* producerInfo)
throw ( ConnectorException ) = 0;
/**
@@ -203,9 +217,9 @@ namespace connector{
* @param An ActiveMQMessage to Ack.
* @throws ConnectorException
*/
- virtual void acknowledge(const SessionInfo* session,
- const cms::Message* message,
- AckType ackType = ConsumedAck)
+ virtual void acknowledge( const SessionInfo* session,
+ const cms::Message* message,
+ AckType ackType = ConsumedAck)
throw ( ConnectorException ) = 0;
/**
@@ -214,7 +228,7 @@ namespace connector{
* @throws ConnectorException
*/
virtual TransactionInfo* startTransaction(
- SessionInfo* session)
+ SessionInfo* session )
throw ( ConnectorException ) = 0;
/**
@@ -223,8 +237,8 @@ namespace connector{
* @param Session Information
* @throws ConnectorException
*/
- virtual void commit(TransactionInfo* transaction,
- SessionInfo* session)
+ virtual void commit( TransactionInfo* transaction,
+ SessionInfo* session )
throw ( ConnectorException ) = 0;
/**
@@ -233,8 +247,8 @@ namespace connector{
* @param Session Information
* @throws ConnectorException
*/
- virtual void rollback(TransactionInfo* transaction,
- SessionInfo* session)
+ virtual void rollback( TransactionInfo* transaction,
+ SessionInfo* session )
throw ( ConnectorException ) = 0;
/**
@@ -245,7 +259,7 @@ namespace connector{
*/
virtual cms::Message* createMessage(
SessionInfo* session,
- TransactionInfo* transaction)
+ TransactionInfo* transaction )
throw ( ConnectorException ) = 0;
/**
@@ -256,7 +270,7 @@ namespace connector{
*/
virtual cms::BytesMessage* createBytesMessage(
SessionInfo* session,
- TransactionInfo* transaction)
+ TransactionInfo* transaction )
throw ( ConnectorException ) = 0;
/**
@@ -267,7 +281,7 @@ namespace connector{
*/
virtual cms::TextMessage* createTextMessage(
SessionInfo* session,
- TransactionInfo* transaction)
+ TransactionInfo* transaction )
throw ( ConnectorException ) = 0;
/**
@@ -278,7 +292,7 @@ namespace connector{
*/
virtual cms::MapMessage* createMapMessage(
SessionInfo* session,
- TransactionInfo* transaction)
+ TransactionInfo* transaction )
throw ( ConnectorException ) = 0;
/**
@@ -286,7 +300,7 @@ namespace connector{
* @param name of the Subscription
* @throws ConnectorException
*/
- virtual void unsubscribe(const std::string& name)
+ virtual void unsubscribe( const std::string& name )
throw ( ConnectorException ) = 0;
/**
@@ -302,14 +316,14 @@ namespace connector{
* @param listener the observer.
*/
virtual void setConsumerMessageListener(
- ConsumerMessageListener* listener) = 0;
+ ConsumerMessageListener* listener ) = 0;
/**
* Sets the Listner of exceptions for this connector
* @param ExceptionListener the observer.
*/
virtual void setExceptionListener(
- cms::ExceptionListener* listener) = 0;
+ cms::ExceptionListener* listener ) = 0;
};
}}
diff --git a/activemq-cpp/src/main/activemq/connector/ConnectorException.h b/activemq-cpp/src/main/activemq/connector/ConnectorException.h
index cace625584..51359d7f21 100644
--- a/activemq-cpp/src/main/activemq/connector/ConnectorException.h
+++ b/activemq-cpp/src/main/activemq/connector/ConnectorException.h
@@ -22,42 +22,44 @@
namespace activemq{
namespace connector{
- /*
- * Signals that an Connector exception of some sort has occurred.
- */
- class ConnectorException : public exceptions::ActiveMQException
- {
- public:
+ /*
+ * Signals that an Connector exception of some sort has occurred.
+ */
+ class ConnectorException : public exceptions::ActiveMQException
+ {
+ public:
- ConnectorException() {}
- ConnectorException( const exceptions::ActiveMQException& ex ){
- *(ActiveMQException*)this = ex;
- }
- ConnectorException( const ConnectorException& ex ){
- *(exceptions::ActiveMQException*)this = ex;
- }
- ConnectorException(const char* file, const int lineNumber,
- const char* msg, ...)
- {
- va_list vargs ;
- va_start(vargs, msg) ;
- buildMessage(msg, vargs) ;
+ ConnectorException() {}
+ ConnectorException( const exceptions::ActiveMQException& ex ){
+ *(ActiveMQException*)this = ex;
+ }
+ ConnectorException( const ConnectorException& ex ){
+ *(exceptions::ActiveMQException*)this = ex;
+ }
+ ConnectorException( const char* file,
+ const int lineNumber,
+ const char* msg, ... )
+ {
+ va_list vargs;
+ va_start( vargs, msg );
+ buildMessage( msg, vargs );
- // Set the first mark for this exception.
- setMark( file, lineNumber );
- }
+ // Set the first mark for this exception.
+ setMark( file, lineNumber );
+ }
- /**
- * Clones this exception. This is useful for cases where you need
- * to preserve the type of the original exception as well as the message.
- * All subclasses should override.
- */
- virtual exceptions::ActiveMQException* clone() const{
- return new ConnectorException( *this );
- }
- virtual ~ConnectorException() {}
+ /**
+ * Clones this exception. This is useful for cases where you need
+ * to preserve the type of the original exception as well as the message.
+ * All subclasses should override.
+ */
+ virtual exceptions::ActiveMQException* clone() const{
+ return new ConnectorException( *this );
+ }
+
+ virtual ~ConnectorException() {}
- };
+ };
}}
diff --git a/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h b/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h
index aa652d5f47..d80a3e2489 100644
--- a/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h
+++ b/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h
@@ -36,10 +36,11 @@ namespace connector{
/**
* Creates a connector
* @param The Properties that the new connector is configured with
+ * @param the Transport that the connector should use
*/
virtual Connector* createConnector(
const activemq::util::Properties& properties,
- activemq::transport::Transport* transport) = 0;
+ activemq::transport::Transport* transport ) = 0;
};
diff --git a/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp b/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp
index b1514413bb..607ed7f463 100644
--- a/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp
+++ b/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp
@@ -31,25 +31,25 @@ ConnectorFactoryMap* ConnectorFactoryMap::getInstance(void)
}
////////////////////////////////////////////////////////////////////////////////
-void ConnectorFactoryMap::registerConnectorFactory(const std::string& name,
- ConnectorFactory* factory)
+void ConnectorFactoryMap::registerConnectorFactory( const std::string& name,
+ ConnectorFactory* factory )
{
factoryMap[name] = factory;
}
////////////////////////////////////////////////////////////////////////////////
-void ConnectorFactoryMap::unregisterConnectorFactory(const std::string& name)
+void ConnectorFactoryMap::unregisterConnectorFactory( const std::string& name )
{
- factoryMap.erase(name);
+ factoryMap.erase( name );
}
////////////////////////////////////////////////////////////////////////////////
-ConnectorFactory* ConnectorFactoryMap::lookup(const std::string& name)
+ConnectorFactory* ConnectorFactoryMap::lookup( const std::string& name )
{
std::map::const_iterator itr =
- factoryMap.find(name);
+ factoryMap.find( name );
- if(itr != factoryMap.end())
+ if( itr != factoryMap.end() )
{
return itr->second;
}
@@ -60,14 +60,14 @@ ConnectorFactory* ConnectorFactoryMap::lookup(const std::string& name)
////////////////////////////////////////////////////////////////////////////////
std::size_t ConnectorFactoryMap::getFactoryNames(
- std::vector& factoryList)
+ std::vector& factoryList )
{
std::map::const_iterator itr =
factoryMap.begin();
- for(; itr != factoryMap.end(); ++itr)
+ for( ; itr != factoryMap.end(); ++itr )
{
- factoryList.insert(factoryList.end(), itr->first);
+ factoryList.insert( factoryList.end(), itr->first );
}
return factoryMap.size();
diff --git a/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h b/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h
index 353596d4c5..fa3eb22e0e 100644
--- a/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h
+++ b/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h
@@ -27,66 +27,66 @@
namespace activemq{
namespace connector{
- /**
- * Lookup Map for Connector Factories. Use the Connector name to
- * find the associated factory. This class does not take ownership
- * of the stored factories, they must be deallocated somewhere.
- */
- class ConnectorFactoryMap
- {
- public:
+ /**
+ * Lookup Map for Connector Factories. Use the Connector name to
+ * find the associated factory. This class does not take ownership
+ * of the stored factories, they must be deallocated somewhere.
+ */
+ class ConnectorFactoryMap
+ {
+ public:
- /**
- * Gets a singleton instance of this class.
- */
- static ConnectorFactoryMap* getInstance(void);
+ /**
+ * Gets a singleton instance of this class.
+ */
+ static ConnectorFactoryMap* getInstance(void);
- /**
- * Registers a new Connector Factory with this map
- * @param name to associate the factory with
- * @param factory to store.
- */
- void registerConnectorFactory(const std::string& name,
- ConnectorFactory* factory);
+ /**
+ * Registers a new Connector Factory with this map
+ * @param name to associate the factory with
+ * @param factory to store.
+ */
+ void registerConnectorFactory( const std::string& name,
+ ConnectorFactory* factory );
- /**
- * Unregisters a Connector Factory with this map
- * @param name of the factory to remove
- */
- void unregisterConnectorFactory(const std::string& name);
+ /**
+ * Unregisters a Connector Factory with this map
+ * @param name of the factory to remove
+ */
+ void unregisterConnectorFactory( const std::string& name );
- /**
- * Lookup the named factory in the Map
- * @param the factory name to lookup
- * @return the factory assciated with the name, or NULL
- */
- ConnectorFactory* lookup(const std::string& name);
+ /**
+ * Lookup the named factory in the Map
+ * @param the factory name to lookup
+ * @return the factory assciated with the name, or NULL
+ */
+ ConnectorFactory* lookup( const std::string& name );
- /**
- * Fetch a list of factory names that this Map contains
- * @param vector object to receive the list
- * @returns count of factories.
- */
- std::size_t getFactoryNames(std::vector& factoryList);
+ /**
+ * Fetch a list of factory names that this Map contains
+ * @param vector object to receive the list
+ * @returns count of factories.
+ */
+ std::size_t getFactoryNames( std::vector< std::string >& factoryList );
- private:
+ private:
- // Hidden Contrustor, prevents instantiation
- ConnectorFactoryMap() {};
+ // Hidden Contrustor, prevents instantiation
+ ConnectorFactoryMap() {};
- // Hidden Destructor.
- virtual ~ConnectorFactoryMap() {};
+ // Hidden Destructor.
+ virtual ~ConnectorFactoryMap() {};
- // Hidden Copy Constructore
- ConnectorFactoryMap(const ConnectorFactoryMap& factoryMap);
+ // Hidden Copy Constructore
+ ConnectorFactoryMap( const ConnectorFactoryMap& factoryMap );
- // Hidden Assignment operator
- ConnectorFactoryMap operator=(const ConnectorFactoryMap& factoryMap);
+ // Hidden Assignment operator
+ ConnectorFactoryMap operator=( const ConnectorFactoryMap& factoryMap );
- // Map of Factories
- std::map factoryMap;
+ // Map of Factories
+ std::map< std::string, ConnectorFactory* > factoryMap;
- };
+ };
}}
diff --git a/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMapRegistrar.h b/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMapRegistrar.h
index 78e61b8042..62c15dc274 100644
--- a/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMapRegistrar.h
+++ b/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMapRegistrar.h
@@ -59,9 +59,9 @@ namespace connector{
{
// UnRegister it in the map.
ConnectorFactoryMap::getInstance()->
- unregisterConnectorFactory(name);
+ unregisterConnectorFactory( name );
- if(manageLifetime)
+ if( manageLifetime )
{
delete factory;
}
diff --git a/activemq-cpp/src/main/activemq/connector/ConnectorResource.h b/activemq-cpp/src/main/activemq/connector/ConnectorResource.h
index 1278354299..1d064421e9 100644
--- a/activemq-cpp/src/main/activemq/connector/ConnectorResource.h
+++ b/activemq-cpp/src/main/activemq/connector/ConnectorResource.h
@@ -14,10 +14,8 @@ namespace connector{
{
public:
- /**
- * Destructor
- */
virtual ~ConnectorResource() {}
+
};
}}
diff --git a/activemq-cpp/src/main/activemq/connector/ConsumerInfo.h b/activemq-cpp/src/main/activemq/connector/ConsumerInfo.h
index f5daf6678a..9d5c4e0926 100644
--- a/activemq-cpp/src/main/activemq/connector/ConsumerInfo.h
+++ b/activemq-cpp/src/main/activemq/connector/ConsumerInfo.h
@@ -29,9 +29,6 @@ namespace connector{
{
public:
- /**
- * Destructor
- */
virtual ~ConsumerInfo(void) {}
/**
diff --git a/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h b/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h
index 6e534be5d2..9021569552 100644
--- a/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h
+++ b/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h
@@ -37,8 +37,10 @@ namespace connector{
* @param consumer the target consumer of the dispatch.
* @param msg the message to be dispatched.
*/
- virtual void onConsumerMessage( ConsumerInfo* consumer,
+ virtual void onConsumerMessage(
+ ConsumerInfo* consumer,
core::ActiveMQMessage* msg ) = 0;
+
};
}}
diff --git a/activemq-cpp/src/main/activemq/connector/SessionInfo.h b/activemq-cpp/src/main/activemq/connector/SessionInfo.h
index fbd1662c6d..d6b45aaff3 100644
--- a/activemq-cpp/src/main/activemq/connector/SessionInfo.h
+++ b/activemq-cpp/src/main/activemq/connector/SessionInfo.h
@@ -28,9 +28,6 @@ namespace connector{
{
public:
- /**
- * Destructor
- */
virtual ~SessionInfo(void) {}
/**
diff --git a/activemq-cpp/src/main/activemq/connector/TransactionInfo.h b/activemq-cpp/src/main/activemq/connector/TransactionInfo.h
index 677674878c..75d75fb2eb 100644
--- a/activemq-cpp/src/main/activemq/connector/TransactionInfo.h
+++ b/activemq-cpp/src/main/activemq/connector/TransactionInfo.h
@@ -28,9 +28,6 @@ namespace connector{
{
public:
- /**
- * Destructor
- */
virtual ~TransactionInfo(void) {}
/**
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompCommandListener.h b/activemq-cpp/src/main/activemq/connector/stomp/StompCommandListener.h
index e83ba43d13..dc7be426f7 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompCommandListener.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompCommandListener.h
@@ -33,7 +33,7 @@ namespace stomp{
{
public:
- virtual ~StompCommandListener(void) {}
+ virtual ~StompCommandListener(void) {}
/**
* Process the Stomp Command
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp b/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp
index ff73064ebb..2ba0d9b164 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp
@@ -35,14 +35,14 @@ StompCommandReader::StompCommandReader(void)
}
////////////////////////////////////////////////////////////////////////////////
-StompCommandReader::StompCommandReader(InputStream* is)
+StompCommandReader::StompCommandReader( InputStream* is )
{
inputStream = is;
}
////////////////////////////////////////////////////////////////////////////////
Command* StompCommandReader::readCommand(void)
- throw (CommandIOException)
+ throw ( CommandIOException )
{
try
{
@@ -70,53 +70,53 @@ Command* StompCommandReader::readCommand(void)
void StompCommandReader::readStompCommand( StompFrame& frame )
throw ( StompConnectorException )
{
- while( true )
- {
- // Clean up the mess.
- buffer.clear();
+ while( true )
+ {
+ // Clean up the mess.
+ buffer.clear();
- // Read the command;
- readStompHeaderLine();
+ // Read the command;
+ readStompHeaderLine();
// Ignore all white space before the command.
int offset=-1;
for( size_t ix = 0; ix < buffer.size()-1; ++ix )
{
- // Find the first non space character
- char b = buffer[ix];
+ // Find the first non space character
+ char b = buffer[ix];
switch ( b )
{
- case '\n':
- case '\t':
- case '\r':
- break;
-
- default:
- offset = ix;
- break;
+ case '\n':
+ case '\t':
+ case '\r':
+ break;
+
+ default:
+ offset = ix;
+ break;
}
- if( offset != -1 )
- {
- break;
- }
+ if( offset != -1 )
+ {
+ break;
+ }
}
-
- if( offset >= 0 )
- {
- // Set the command in the frame - copy the memory.
- frame.setCommand( reinterpret_cast(&buffer[offset]) );
- break;
- }
-
- }
+
+ if( offset >= 0 )
+ {
+ // Set the command in the frame - copy the memory.
+ frame.setCommand( reinterpret_cast(&buffer[offset]) );
+ break;
+ }
+
+ }
// Clean up the mess.
buffer.clear();
}
////////////////////////////////////////////////////////////////////////////////
void StompCommandReader::readStompHeaders( StompFrame& frame )
- throw (StompConnectorException)
+ throw ( StompConnectorException )
{
// Read the command;
bool endOfHeaders = false;
@@ -173,7 +173,7 @@ void StompCommandReader::readStompHeaders( StompFrame& frame )
////////////////////////////////////////////////////////////////////////////////
int StompCommandReader::readStompHeaderLine(void)
- throw (StompConnectorException)
+ throw ( StompConnectorException )
{
int count = 0;
@@ -223,10 +223,10 @@ void StompCommandReader::readStompBody( StompFrame& frame )
content_length = strtoul(
length.c_str(),
&stopped_string,
- 10);
+ 10 );
}
- if(content_length != 0)
+ if( content_length != 0 )
{
// For this case its assumed that content length indicates how
// much to read. We reserve space in the buffer for it to
@@ -271,7 +271,7 @@ void StompCommandReader::readStompBody( StompFrame& frame )
content_length++;
- if(byte != '\0')
+ if( byte != '\0' )
{
continue;
}
@@ -283,7 +283,7 @@ void StompCommandReader::readStompBody( StompFrame& frame )
if( content_length != 0 )
{
char* cpyBody = new char[content_length];
- memcpy(cpyBody, &buffer[0], content_length);
+ memcpy( cpyBody, &buffer[0], content_length );
// Set the body contents in the frame - copy the memory
frame.setBody( cpyBody, content_length );
@@ -294,8 +294,8 @@ void StompCommandReader::readStompBody( StompFrame& frame )
}
////////////////////////////////////////////////////////////////////////////////
-int StompCommandReader::read(unsigned char* buffer, int count)
- throw(io::IOException)
+int StompCommandReader::read( unsigned char* buffer, int count )
+ throw( io::IOException )
{
if( inputStream == NULL )
{
@@ -312,9 +312,9 @@ int StompCommandReader::read(unsigned char* buffer, int count)
// pause in hopes that some more data will show up.
while( true )
{
- head += inputStream->read(&buffer[head], count - head);
+ head += inputStream->read( &buffer[head], count - head );
- if(head == count)
+ if( head == count )
{
return count;
}
@@ -325,7 +325,7 @@ int StompCommandReader::read(unsigned char* buffer, int count)
}
////////////////////////////////////////////////////////////////////////////////
-unsigned char StompCommandReader::readByte(void) throw(io::IOException)
+unsigned char StompCommandReader::readByte(void) throw( io::IOException )
{
if( inputStream == NULL )
{
@@ -335,6 +335,6 @@ unsigned char StompCommandReader::readByte(void) throw(io::IOException)
}
unsigned char c = 0;
- inputStream->read(&c, 1);
+ inputStream->read( &c, 1 );
return c;
}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h b/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h
index 385ebfe369..e4461e94af 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h
@@ -53,7 +53,7 @@ namespace stomp{
/**
* Deafult Constructor
*/
- StompCommandReader( void );
+ StompCommandReader( void );
/**
* Constructor.
@@ -61,24 +61,21 @@ namespace stomp{
*/
StompCommandReader( io::InputStream* is );
- /**
- * Destructor
- */
- virtual ~StompCommandReader(void) {}
+ virtual ~StompCommandReader(void) {}
/**
* Reads a command from the given input stream.
* @return The next command available on the stream.
* @throws CommandIOException if a problem occurs during the read.
*/
- virtual transport::Command* readCommand( void )
+ virtual transport::Command* readCommand(void)
throw ( transport::CommandIOException );
/**
* Sets the target input stream.
* @param Target Input Stream
*/
- virtual void setInputStream(io::InputStream* is){
+ virtual void setInputStream( io::InputStream* is ){
inputStream = is;
}
@@ -86,7 +83,7 @@ namespace stomp{
* Gets the target input stream.
* @return Target Input Stream
*/
- virtual io::InputStream* getInputStream( void ){
+ virtual io::InputStream* getInputStream(void){
return inputStream;
}
@@ -97,7 +94,7 @@ namespace stomp{
* @return The number of bytes read.
* @throws IOException thrown if an error occurs.
*/
- virtual int read(unsigned char* buffer, int count)
+ virtual int read( unsigned char* buffer, int count )
throw( io::IOException );
/**
@@ -130,7 +127,7 @@ namespace stomp{
* @return number of bytes read, zero if there was a problem.
* @throws StompConnectorException
*/
- int readStompHeaderLine( void ) throw ( StompConnectorException );
+ int readStompHeaderLine(void) throw ( StompConnectorException );
/**
* Reads the Stomp Body from the Wire and store it in the frame.
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.cpp b/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.cpp
index aa8ec59eee..be832fe6b5 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.cpp
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.cpp
@@ -35,7 +35,7 @@ StompCommandWriter::StompCommandWriter(void)
}
////////////////////////////////////////////////////////////////////////////////
-StompCommandWriter::StompCommandWriter(OutputStream* os)
+StompCommandWriter::StompCommandWriter( OutputStream* os )
{
outputStream = os;
}
@@ -103,8 +103,8 @@ void StompCommandWriter::writeCommand( const Command* command )
}
////////////////////////////////////////////////////////////////////////////////
-void StompCommandWriter::write(const unsigned char* buffer, int count)
- throw(IOException)
+void StompCommandWriter::write( const unsigned char* buffer, int count )
+ throw( IOException )
{
if( outputStream == NULL )
{
@@ -117,7 +117,7 @@ void StompCommandWriter::write(const unsigned char* buffer, int count)
}
////////////////////////////////////////////////////////////////////////////////
-void StompCommandWriter::writeByte(unsigned char v) throw(IOException)
+void StompCommandWriter::writeByte( unsigned char v ) throw( IOException )
{
if( outputStream == NULL )
{
@@ -130,8 +130,8 @@ void StompCommandWriter::writeByte(unsigned char v) throw(IOException)
}
////////////////////////////////////////////////////////////////////////////////
-void StompCommandWriter::write(const char* buffer, int count)
- throw(io::IOException)
+void StompCommandWriter::write( const char* buffer, int count )
+ throw( io::IOException )
{
- write(reinterpret_cast(buffer), count);
+ write( reinterpret_cast( buffer ), count );
}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.h b/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.h
index e98c827f6b..03f76789e2 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.h
@@ -48,7 +48,7 @@ namespace stomp{
/**
* Default Constructor
*/
- StompCommandWriter(void);
+ StompCommandWriter(void);
/**
* Constructor.
@@ -56,15 +56,12 @@ namespace stomp{
*/
StompCommandWriter( io::OutputStream* os );
- /**
- * Destructor
- */
- virtual ~StompCommandWriter(void) {}
+ virtual ~StompCommandWriter(void) {}
/**
* Sets the target output stream.
*/
- virtual void setOutputStream(io::OutputStream* os){
+ virtual void setOutputStream( io::OutputStream* os ){
outputStream = os;
}
@@ -90,7 +87,7 @@ namespace stomp{
* @param count the number of bytes in the array to write.
* @throws IOException thrown if an error occurs.
*/
- virtual void write(const unsigned char* buffer, int count)
+ virtual void write( const unsigned char* buffer, int count )
throw( io::IOException );
/**
@@ -98,7 +95,7 @@ namespace stomp{
* @param v The value to be written.
* @throws IOException thrown if an error occurs.
*/
- virtual void writeByte(unsigned char v) throw( io::IOException );
+ virtual void writeByte( unsigned char v ) throw( io::IOException );
private:
@@ -108,7 +105,7 @@ namespace stomp{
* @param count the number of bytes in the array to write.
* @throws IOException thrown if an error occurs.
*/
- virtual void write(const char* buffer, int count)
+ virtual void write( const char* buffer, int count )
throw( io::IOException );
};
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp b/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp
index cbfea839a2..e1102a7abb 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp
@@ -52,7 +52,7 @@ StompConnector::StompConnector( Transport* transport,
const util::Properties& properties )
throw ( IllegalArgumentException )
{
- if(transport == NULL)
+ if( transport == NULL )
{
throw IllegalArgumentException(
__FILE__, __LINE__,
@@ -64,8 +64,8 @@ StompConnector::StompConnector( Transport* transport,
this->exceptionListener = NULL;
this->messageListener = NULL;
this->sessionManager = NULL;
- this->nextProducerId = 0;
- this->nextTransactionId = 0;
+ this->nextProducerId = 1;
+ this->nextTransactionId = 1;
this->properties.copy( &properties );
// Observe the transport for events.
@@ -96,7 +96,7 @@ StompConnector::~StompConnector(void)
////////////////////////////////////////////////////////////////////////////////
unsigned int StompConnector::getNextProducerId(void)
{
- synchronized(&mutex)
+ synchronized( &mutex )
{
return nextProducerId++;
}
@@ -107,7 +107,7 @@ unsigned int StompConnector::getNextProducerId(void)
////////////////////////////////////////////////////////////////////////////////
unsigned int StompConnector::getNextTransactionId(void)
{
- synchronized(&mutex)
+ synchronized( &mutex )
{
return nextTransactionId++;
}
@@ -138,7 +138,7 @@ void StompConnector::addCmdListener(
void StompConnector::removeCmdListener(
commands::CommandConstants::CommandId commandId )
{
- cmdListenerMap.erase(commandId);
+ cmdListenerMap.erase( commandId );
}
////////////////////////////////////////////////////////////////////////////////
@@ -199,7 +199,7 @@ void StompConnector::connect(void)
ConnectCommand cmd;
// Encode User Name and Password and Client ID
- string login = getLogin();
+ string login = getUsername();
if( login.length() > 0 ){
cmd.setLogin( login );
}
@@ -216,6 +216,8 @@ void StompConnector::connect(void)
if( dynamic_cast< ExceptionResponse* >( response ) != NULL )
{
+ delete response;
+
throw StompConnectorException(
__FILE__, __LINE__,
"StompConnector::connect - Failed on Connect Request" );
@@ -226,6 +228,8 @@ void StompConnector::connect(void)
if( connected == NULL )
{
+ delete response;
+
throw StompConnectorException(
__FILE__, __LINE__,
"StompConnector::connect - "
@@ -281,7 +285,7 @@ void StompConnector::disconnect(void)
////////////////////////////////////////////////////////////////////////////////
SessionInfo* StompConnector::createSession(
- cms::Session::AcknowledgeMode ackMode)
+ cms::Session::AcknowledgeMode ackMode )
throw( ConnectorException )
{
try
@@ -296,9 +300,9 @@ SessionInfo* StompConnector::createSession(
////////////////////////////////////////////////////////////////////////////////
ConsumerInfo* StompConnector::createConsumer(
- cms::Destination* destination,
+ const cms::Destination* destination,
SessionInfo* session,
- const std::string& selector)
+ const std::string& selector )
throw ( ConnectorException )
{
try
@@ -314,11 +318,11 @@ ConsumerInfo* StompConnector::createConsumer(
////////////////////////////////////////////////////////////////////////////////
ConsumerInfo* StompConnector::createDurableConsumer(
- cms::Topic* topic,
+ const cms::Topic* topic,
SessionInfo* session,
const std::string& name,
const std::string& selector,
- bool noLocal)
+ bool noLocal )
throw ( ConnectorException )
{
try
@@ -334,8 +338,8 @@ ConsumerInfo* StompConnector::createDurableConsumer(
////////////////////////////////////////////////////////////////////////////////
ProducerInfo* StompConnector::createProducer(
- cms::Destination* destination,
- SessionInfo* session)
+ const cms::Destination* destination,
+ SessionInfo* session )
throw ( ConnectorException )
{
try
@@ -355,30 +359,30 @@ ProducerInfo* StompConnector::createProducer(
}
////////////////////////////////////////////////////////////////////////////////
-cms::Topic* StompConnector::createTopic(const std::string& name,
- SessionInfo* session)
+cms::Topic* StompConnector::createTopic( const std::string& name,
+ SessionInfo* session )
throw ( ConnectorException )
{
try
{
enforceConnected();
- return new StompTopic(name);
+ return new StompTopic( name );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
}
////////////////////////////////////////////////////////////////////////////////
-cms::Queue* StompConnector::createQueue(const std::string& name,
- SessionInfo* session)
+cms::Queue* StompConnector::createQueue( const std::string& name,
+ SessionInfo* session )
throw ( ConnectorException )
{
try
{
enforceConnected();
- return new StompQueue(name);
+ return new StompQueue( name );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCHALL_THROW( ConnectorException );
@@ -386,7 +390,7 @@ cms::Queue* StompConnector::createQueue(const std::string& name,
////////////////////////////////////////////////////////////////////////////////
cms::TemporaryTopic* StompConnector::createTemporaryTopic(
- SessionInfo* session)
+ SessionInfo* session )
throw ( ConnectorException )
{
try
@@ -401,7 +405,7 @@ cms::TemporaryTopic* StompConnector::createTemporaryTopic(
////////////////////////////////////////////////////////////////////////////////
cms::TemporaryQueue* StompConnector::createTemporaryQueue(
- SessionInfo* session)
+ SessionInfo* session )
throw ( ConnectorException )
{
try
@@ -416,7 +420,7 @@ cms::TemporaryQueue* StompConnector::createTemporaryQueue(
////////////////////////////////////////////////////////////////////////////////
void StompConnector::send(cms::Message* message,
- ProducerInfo* producerInfo)
+ ProducerInfo* producerInfo )
throw ( ConnectorException )
{
try
@@ -434,7 +438,7 @@ void StompConnector::send(cms::Message* message,
"Message is not a valid stomp type.");
}
- if( session->getAckMode() == cms::Session::Transactional )
+ if( session->getAckMode() == cms::Session::SESSION_TRANSACTED )
{
StompCommand* stompCommand =
dynamic_cast< StompCommand* >( message );
@@ -460,19 +464,19 @@ void StompConnector::send(cms::Message* message,
}
////////////////////////////////////////////////////////////////////////////////
-void StompConnector::send(std::list& messages,
- ProducerInfo* producerInfo)
+void StompConnector::send( std::list& messages,
+ ProducerInfo* producerInfo )
throw ( ConnectorException )
{
try
{
enforceConnected();
- list::const_iterator itr = messages.begin();
+ list< cms::Message* >::const_iterator itr = messages.begin();
- for(; itr != messages.end(); ++itr)
+ for( ; itr != messages.end(); ++itr )
{
- this->send(*itr, producerInfo);
+ this->send( *itr, producerInfo );
}
}
AMQ_CATCH_RETHROW( ConnectorException )
@@ -491,7 +495,7 @@ void StompConnector::acknowledge( const SessionInfo* session,
// Auto to Stomp means don't do anything, so we drop it here
// for client acknowledge we have to send and ack.
- if( session->getAckMode() == cms::Session::ClientAcknowledge )
+ if( session->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE )
{
AckCommand cmd;
@@ -505,7 +509,7 @@ void StompConnector::acknowledge( const SessionInfo* session,
cmd.setMessageId( message->getCMSMessageId() );
- if( session->getAckMode() == cms::Session::Transactional )
+ if( session->getAckMode() == cms::Session::SESSION_TRANSACTED )
{
cmd.setTransactionId(
Integer::toString(
@@ -521,7 +525,7 @@ void StompConnector::acknowledge( const SessionInfo* session,
////////////////////////////////////////////////////////////////////////////////
TransactionInfo* StompConnector::startTransaction(
- SessionInfo* session)
+ SessionInfo* session )
throw ( ConnectorException )
{
try
@@ -548,8 +552,8 @@ TransactionInfo* StompConnector::startTransaction(
}
////////////////////////////////////////////////////////////////////////////////
-void StompConnector::commit(TransactionInfo* transaction,
- SessionInfo* session)
+void StompConnector::commit( TransactionInfo* transaction,
+ SessionInfo* session )
throw ( ConnectorException )
{
try
@@ -568,8 +572,8 @@ void StompConnector::commit(TransactionInfo* transaction,
}
////////////////////////////////////////////////////////////////////////////////
-void StompConnector::rollback(TransactionInfo* transaction,
- SessionInfo* session)
+void StompConnector::rollback( TransactionInfo* transaction,
+ SessionInfo* session )
throw ( ConnectorException )
{
try
@@ -590,7 +594,7 @@ void StompConnector::rollback(TransactionInfo* transaction,
////////////////////////////////////////////////////////////////////////////////
cms::Message* StompConnector::createMessage(
SessionInfo* session,
- TransactionInfo* transaction)
+ TransactionInfo* transaction )
throw ( ConnectorException )
{
try
@@ -614,7 +618,7 @@ cms::Message* StompConnector::createMessage(
////////////////////////////////////////////////////////////////////////////////
cms::BytesMessage* StompConnector::createBytesMessage(
SessionInfo* session,
- TransactionInfo* transaction)
+ TransactionInfo* transaction )
throw ( ConnectorException )
{
try
@@ -638,7 +642,7 @@ cms::BytesMessage* StompConnector::createBytesMessage(
////////////////////////////////////////////////////////////////////////////////
cms::TextMessage* StompConnector::createTextMessage(
SessionInfo* session,
- TransactionInfo* transaction)
+ TransactionInfo* transaction )
throw ( ConnectorException )
{
try
@@ -662,7 +666,7 @@ cms::TextMessage* StompConnector::createTextMessage(
////////////////////////////////////////////////////////////////////////////////
cms::MapMessage* StompConnector::createMapMessage(
SessionInfo* session,
- TransactionInfo* transaction)
+ TransactionInfo* transaction )
throw ( ConnectorException )
{
try
@@ -676,7 +680,7 @@ cms::MapMessage* StompConnector::createMapMessage(
}
////////////////////////////////////////////////////////////////////////////////
-void StompConnector::unsubscribe(const std::string& name)
+void StompConnector::unsubscribe( const std::string& name )
throw ( ConnectorException )
{
try
@@ -723,7 +727,7 @@ void StompConnector::onCommand( transport::Command* command )
{
StompCommand* stompCommand = dynamic_cast< StompCommand* >(command);
- if(stompCommand == NULL)
+ if( stompCommand == NULL )
{
fire( ConnectorException(
__FILE__, __LINE__,
@@ -777,18 +781,21 @@ void StompConnector::onStompCommand( commands::StompCommand* command )
try
{
ErrorCommand* error =
- dynamic_cast(command);
+ dynamic_cast( command );
- if(error != NULL)
+ if( error != NULL )
{
fire( StompConnectorException(
- __FILE__, __LINE__,
- (string( "StompConnector::onStompCommand - " ) +
- error->getErrorMessage() ).c_str() ) );
+ __FILE__, __LINE__,
+ ( string( "StompConnector::onStompCommand - " ) +
+ error->getErrorMessage() ).c_str() ) );
// Shutdown
close();
}
+
+ // command is done here, delete it.
+ delete command;
}
AMQ_CATCH_RETHROW( StompConnectorException )
AMQ_CATCHALL_THROW( StompConnectorException );
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h b/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
index a8a2fd5633..a81c3c7ea3 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
@@ -29,6 +29,7 @@
#include
#include
#include
+#include
#include
namespace activemq{
@@ -57,7 +58,7 @@ namespace stomp{
// Maps Command Ids to listener that are interested
typedef std::map< commands::CommandConstants::CommandId,
- StompCommandListener*> CmdListenerMap;
+ StompCommandListener* > CmdListenerMap;
private:
@@ -192,20 +193,30 @@ namespace stomp{
*/
virtual std::string getClientId(void) const {
return properties.getProperty(
- commands::CommandConstants::toString(
- commands::CommandConstants::HEADER_CLIENT_ID ), "" );
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::PARAM_CLIENTID ), "" );
}
- virtual std::string getLogin(void) const {
+ /**
+ * Gets the Username for this connection, if this
+ * connection has been closed, then this method returns ""
+ * @return Username String
+ */
+ virtual std::string getUsername(void) const {
return properties.getProperty(
- commands::CommandConstants::toString(
- commands::CommandConstants::HEADER_LOGIN ), "" );
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::PARAM_USERNAME ), "" );
}
+ /**
+ * Gets the Password for this connection, if this
+ * connection has been closed, then this method returns ""
+ * @return Password String
+ */
virtual std::string getPassword(void) const {
return properties.getProperty(
- commands::CommandConstants::toString(
- commands::CommandConstants::HEADER_PASSWORD ), "" );
+ core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::PARAM_PASSWORD ), "" );
}
/**
@@ -215,7 +226,7 @@ namespace stomp{
* @throws InvalidStateException if the Transport is not set
*/
virtual transport::Transport& getTransport(void) const
- throw (exceptions::InvalidStateException ) {
+ throw ( exceptions::InvalidStateException ) {
if( transport == NULL ) {
throw exceptions::InvalidStateException(
@@ -234,7 +245,7 @@ namespace stomp{
* @throws ConnectorException
*/
virtual SessionInfo* createSession(
- cms::Session::AcknowledgeMode ackMode)
+ cms::Session::AcknowledgeMode ackMode )
throw( ConnectorException );
/**
@@ -245,9 +256,9 @@ namespace stomp{
* @throws ConnectorException
*/
virtual ConsumerInfo* createConsumer(
- cms::Destination* destination,
+ const cms::Destination* destination,
SessionInfo* session,
- const std::string& selector = "")
+ const std::string& selector = "" )
throw ( ConnectorException );
/**
@@ -262,11 +273,11 @@ namespace stomp{
* @throws ConnectorException
*/
virtual ConsumerInfo* createDurableConsumer(
- cms::Topic* topic,
+ const cms::Topic* topic,
SessionInfo* session,
const std::string& name,
const std::string& selector = "",
- bool noLocal = false)
+ bool noLocal = false )
throw ( ConnectorException );
/**
@@ -277,7 +288,7 @@ namespace stomp{
* @throws ConnectorException
*/
virtual ProducerInfo* createProducer(
- cms::Destination* destination,
+ const cms::Destination* destination,
SessionInfo* session)
throw ( ConnectorException );
@@ -311,7 +322,7 @@ namespace stomp{
* @throws ConnectorException
*/
virtual cms::TemporaryTopic* createTemporaryTopic(
- SessionInfo* session)
+ SessionInfo* session )
throw ( ConnectorException );
/**
@@ -322,7 +333,7 @@ namespace stomp{
* @throws ConnectorException
*/
virtual cms::TemporaryQueue* createTemporaryQueue(
- SessionInfo* session)
+ SessionInfo* session )
throw ( ConnectorException );
/**
@@ -351,7 +362,7 @@ namespace stomp{
*/
virtual void acknowledge( const SessionInfo* session,
const cms::Message* message,
- AckType ackType)
+ AckType ackType )
throw ( ConnectorException );
/**
@@ -360,7 +371,7 @@ namespace stomp{
* @throws ConnectorException
*/
virtual TransactionInfo* startTransaction(
- SessionInfo* session)
+ SessionInfo* session )
throw ( ConnectorException );
/**
@@ -369,8 +380,8 @@ namespace stomp{
* @param Session Information
* @throws ConnectorException
*/
- virtual void commit(TransactionInfo* transaction,
- SessionInfo* session)
+ virtual void commit( TransactionInfo* transaction,
+ SessionInfo* session )
throw ( ConnectorException );
/**
@@ -379,8 +390,8 @@ namespace stomp{
* @param Session Information
* @throws ConnectorException
*/
- virtual void rollback(TransactionInfo* transaction,
- SessionInfo* session)
+ virtual void rollback( TransactionInfo* transaction,
+ SessionInfo* session )
throw ( ConnectorException );
/**
@@ -391,7 +402,7 @@ namespace stomp{
*/
virtual cms::Message* createMessage(
SessionInfo* session,
- TransactionInfo* transaction)
+ TransactionInfo* transaction )
throw ( ConnectorException );
/**
@@ -402,7 +413,7 @@ namespace stomp{
*/
virtual cms::BytesMessage* createBytesMessage(
SessionInfo* session,
- TransactionInfo* transaction)
+ TransactionInfo* transaction )
throw ( ConnectorException );
/**
@@ -413,7 +424,7 @@ namespace stomp{
*/
virtual cms::TextMessage* createTextMessage(
SessionInfo* session,
- TransactionInfo* transaction)
+ TransactionInfo* transaction )
throw ( ConnectorException );
/**
@@ -424,7 +435,7 @@ namespace stomp{
*/
virtual cms::MapMessage* createMapMessage(
SessionInfo* session,
- TransactionInfo* transaction)
+ TransactionInfo* transaction )
throw ( ConnectorException );
/**
@@ -448,7 +459,7 @@ namespace stomp{
* @param listener the observer.
*/
virtual void setConsumerMessageListener(
- ConsumerMessageListener* listener)
+ ConsumerMessageListener* listener )
{
this->messageListener = listener;
@@ -463,7 +474,7 @@ namespace stomp{
* @param ExceptionListener the observer.
*/
virtual void setExceptionListener(
- cms::ExceptionListener* listener)
+ cms::ExceptionListener* listener )
{
this->exceptionListener = listener;
}
@@ -520,8 +531,8 @@ namespace stomp{
private:
- unsigned int getNextProducerId( void );
- unsigned int getNextTransactionId( void );
+ unsigned int getNextProducerId(void);
+ unsigned int getNextTransactionId(void);
// Check for Connected State and Throw an exception if not.
void enforceConnected( void ) throw ( ConnectorException );
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorException.h b/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorException.h
index 155e0a991b..09a866e87b 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorException.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorException.h
@@ -40,9 +40,9 @@ namespace stomp{
StompConnectorException(const char* file, const int lineNumber,
const char* msg, ...)
{
- va_list vargs ;
- va_start(vargs, msg) ;
- buildMessage(msg, vargs) ;
+ va_list vargs;
+ va_start( vargs, msg );
+ buildMessage( msg, vargs );
// Set the first mark for this exception.
setMark( file, lineNumber );
@@ -56,6 +56,7 @@ namespace stomp{
virtual exceptions::ActiveMQException* clone() const{
return new StompConnectorException( *this );
}
+
virtual ~StompConnectorException() {}
};
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.cpp b/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.cpp
index 80b43821be..c293f71864 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.cpp
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.cpp
@@ -31,10 +31,10 @@ using namespace activemq::connector::stomp;
////////////////////////////////////////////////////////////////////////////////
Connector* StompConnectorFactory::createConnector(
const activemq::util::Properties& properties,
- activemq::transport::Transport* transport)
+ activemq::transport::Transport* transport )
{
return dynamic_cast(
- new StompConnector(transport, properties));
+ new StompConnector( transport, properties ) );
}
////////////////////////////////////////////////////////////////////////////////
@@ -43,7 +43,7 @@ ConnectorFactory& StompConnectorFactory::getInstance(void)
// Create a static instance of the registrar and return a reference to
// its internal instance of this class.
static ConnectorFactoryMapRegistrar registrar(
- "stomp", new StompConnectorFactory());
+ "stomp", new StompConnectorFactory() );
return registrar.getFactory();
}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.h b/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.h
index 0cd6d8a886..091e11d7a9 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.h
@@ -26,9 +26,6 @@ namespace stomp{
class StompConnectorFactory : public connector::ConnectorFactory
{
- private:
-
-
public:
virtual ~StompConnectorFactory(void) {}
@@ -39,7 +36,7 @@ namespace stomp{
*/
virtual Connector* createConnector(
const activemq::util::Properties& properties,
- activemq::transport::Transport* transport);
+ activemq::transport::Transport* transport );
/**
* Returns an instance of this Factory by reference
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h b/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h
index b043a4cf14..20a2306aa5 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h
@@ -45,7 +45,8 @@ namespace stomp{
consumerId = 0;
destination = NULL;
}
- virtual ~StompConsumerInfo(void) { delete destination; }
+
+ virtual ~StompConsumerInfo(void) { delete destination; }
/**
* Gets this message consumer's message selector expression.
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompDestination.h b/activemq-cpp/src/main/activemq/connector/stomp/StompDestination.h
index b899cf23b4..ca9fb1dcdf 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompDestination.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompDestination.h
@@ -20,7 +20,7 @@
#include
-#include
+#include
namespace activemq{
namespace connector{
@@ -32,26 +32,25 @@ namespace stomp{
* one of Topic, Queue, TemporaryTopic, or TemporaryQueue.
*/
template
- class StompDestination : public T
+ class StompDestination : public core::ActiveMQDestination
{
- private:
-
- // Destination type
- cms::Destination::DestinationType destType;
-
- // Name of the Destination
- std::string name;
-
public:
- StompDestination(void) {}
+ /**
+ * Copy Consturctor
+ * @param CMS Dest to Copy, must be a compatible type
+ */
+ StompDestination( const cms::Destination* source ) :
+ core::ActiveMQDestination( source ) {}
+ /**
+ * Custom Constructor
+ * @param string destination name plus any params
+ * @param type of destination this represents.
+ */
StompDestination( const std::string& name,
- cms::Destination::DestinationType type )
- {
- this->name = name;
- this->destType = type;
- }
+ cms::Destination::DestinationType type ) :
+ core::ActiveMQDestination( name, type ){}
virtual ~StompDestination(void) {}
@@ -61,15 +60,7 @@ namespace stomp{
* @return name
*/
virtual std::string toProviderString(void) const {
- return getPrefix() + name;
- }
-
- /**
- * Retrieve the Destination Type for this Destination
- * @return The Destination Type
- */
- virtual cms::Destination::DestinationType getDestinationType(void) const {
- return destType;
+ return getPrefix() + core::ActiveMQDestination::getName();
}
/**
@@ -78,16 +69,7 @@ namespace stomp{
* @return string name
*/
virtual std::string toString(void) const {
- return name;
- }
-
- /**
- * Copies the contents of the given Destinastion object to this one.
- * @param source The source Destination object.
- */
- virtual void copy( const cms::Destination& source ) {
- this->destType = source.getDestinationType();
- this->name = source.toString();
+ return core::ActiveMQDestination::getName();
}
protected:
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h b/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h
index ac235856f6..ad4d011d52 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompQueue.h
@@ -26,17 +26,27 @@ namespace activemq{
namespace connector{
namespace stomp{
- class StompQueue : public StompDestination
+ class StompQueue : public StompDestination< cms::Queue >
{
public:
- StompQueue(void) : StompDestination< cms::Queue >() {}
+ /**
+ * Copy Consturctor
+ * @param CMS Dest to Copy, must be a compatible type
+ */
+ StompQueue( const cms::Destination* source ) :
+ StompDestination< cms::Queue >( source ) {}
+ /**
+ * Custom Constructor
+ * @param string destination name plus any params
+ * @param type of destination this represents.
+ */
StompQueue(const std::string& name) :
StompDestination< cms::Queue >( name, cms::Destination::QUEUE )
{}
- virtual ~StompQueue(void) {}
+ virtual ~StompQueue(void) {}
/**
* Gets the name of this queue.
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompSessionInfo.h b/activemq-cpp/src/main/activemq/connector/stomp/StompSessionInfo.h
index 3449255acb..f75398f8aa 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompSessionInfo.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompSessionInfo.h
@@ -44,18 +44,12 @@ namespace stomp{
public:
- /**
- * Constructor
- */
StompSessionInfo(void)
{
sessionId = 0;
- ackMode = cms::Session::AutoAcknowledge;
+ ackMode = cms::Session::AUTO_ACKNOWLEDGE;
}
- /**
- * Destructor
- */
virtual ~StompSessionInfo(void) {}
/**
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp b/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
index 7c424584dc..faa6c486a6 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
@@ -18,16 +18,19 @@
#include "StompSessionManager.h"
#include
+#include
#include
#include
#include
#include
#include
#include
+#include
using namespace std;
using namespace activemq;
using namespace activemq::core;
+using namespace activemq::util;
using namespace activemq::exceptions;
using namespace activemq::transport;
using namespace activemq::connector;
@@ -65,7 +68,7 @@ StompSessionManager::~StompSessionManager(void)
////////////////////////////////////////////////////////////////////////////////
unsigned int StompSessionManager::getNextSessionId(void)
{
- synchronized(&mutex)
+ synchronized( &mutex )
{
return nextSessionId++;
}
@@ -76,7 +79,7 @@ unsigned int StompSessionManager::getNextSessionId(void)
////////////////////////////////////////////////////////////////////////////////
unsigned int StompSessionManager::getNextConsumerId(void)
{
- synchronized(&mutex)
+ synchronized( &mutex )
{
return nextConsumerId++;
}
@@ -86,7 +89,7 @@ unsigned int StompSessionManager::getNextConsumerId(void)
////////////////////////////////////////////////////////////////////////////////
connector::SessionInfo* StompSessionManager::createSession(
- cms::Session::AcknowledgeMode ackMode)
+ cms::Session::AcknowledgeMode ackMode )
throw ( exceptions::ActiveMQException )
{
try
@@ -94,7 +97,7 @@ connector::SessionInfo* StompSessionManager::createSession(
SessionInfo* session = new StompSessionInfo();
// Init data
- session->setAckMode(ackMode);
+ session->setAckMode( ackMode );
session->setConnectionId( connectionId );
session->setSessionId( getNextSessionId() );
@@ -114,10 +117,10 @@ void StompSessionManager::removeSession(
////////////////////////////////////////////////////////////////////////////////
connector::ConsumerInfo* StompSessionManager::createConsumer(
- cms::Destination* destination,
+ const cms::Destination* destination,
SessionInfo* session,
- const std::string& selector)
- throw( ConnectorException )
+ const std::string& selector )
+ throw( StompConnectorException )
{
try
{
@@ -127,46 +130,54 @@ connector::ConsumerInfo* StompSessionManager::createConsumer(
return createDurableConsumer(
destination, session, "", selector, false );
}
- AMQ_CATCH_RETHROW( ConnectorException )
- AMQ_CATCHALL_THROW( ConnectorException )
+ AMQ_CATCH_RETHROW( StompConnectorException )
+ AMQ_CATCHALL_THROW( StompConnectorException )
}
////////////////////////////////////////////////////////////////////////////////
connector::ConsumerInfo* StompSessionManager::createDurableConsumer(
- cms::Destination* destination,
+ const cms::Destination* destination,
SessionInfo* session,
const std::string& name,
const std::string& selector,
bool noLocal )
- throw ( ConnectorException )
+ throw ( StompConnectorException )
{
try
{
- synchronized(&mutex)
+ synchronized( &mutex )
{
// Find the right mapping to consumers
ConsumerMap& consumerMap =
destinationMap[ destination->toString() ];
-
+
// We only need to send a sub request if there are no active
// consumers on this destination.
if( consumerMap.empty() )
{
// Send the request to the Broker
SubscribeCommand cmd;
-
- if( session->getAckMode() == cms::Session::ClientAcknowledge )
+
+ if( session->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE )
{
cmd.setAckMode( CommandConstants::ACK_CLIENT );
}
cmd.setDestination( destination->toProviderString() );
- cmd.setNoLocal( noLocal );
+
+ if( noLocal == true )
+ {
+ cmd.setNoLocal( noLocal );
+ }
if( name != "" )
{
cmd.setSubscriptionName( name );
}
+ // Grab any options from the destination and set them
+ // for this subscription.
+ setSubscribeOptions( destination, cmd );
+
// The Selector is set on the first subscribe on this dest,
// and if another consumer is created on this destination
// that specifies a selector it will be ignored. While
@@ -198,18 +209,18 @@ connector::ConsumerInfo* StompSessionManager::createDurableConsumer(
return NULL;
}
- AMQ_CATCH_RETHROW( ConnectorException )
- AMQ_CATCHALL_THROW( ConnectorException )
+ AMQ_CATCH_RETHROW( StompConnectorException )
+ AMQ_CATCHALL_THROW( StompConnectorException )
}
////////////////////////////////////////////////////////////////////////////////
void StompSessionManager::removeConsumer(
- connector::ConsumerInfo* consumer)
- throw( ConnectorException )
+ connector::ConsumerInfo* consumer )
+ throw( StompConnectorException )
{
try
{
- synchronized(&mutex)
+ synchronized( &mutex )
{
DestinationMap::iterator itr =
destinationMap.find( consumer->getDestination().toString() );
@@ -238,8 +249,8 @@ void StompSessionManager::removeConsumer(
}
}
}
- AMQ_CATCH_RETHROW( ConnectorException )
- AMQ_CATCHALL_THROW( ConnectorException )
+ AMQ_CATCH_RETHROW( StompConnectorException )
+ AMQ_CATCHALL_THROW( StompConnectorException )
}
////////////////////////////////////////////////////////////////////////////////
@@ -265,10 +276,10 @@ void StompSessionManager::onStompCommand( commands::StompCommand* command )
"No Message Listener Registered." );
}
- synchronized(&mutex)
+ synchronized( &mutex )
{
DestinationMap::iterator itr =
- destinationMap.find( message->getCMSDestination().toString() );
+ destinationMap.find( message->getCMSDestination()->toString() );
if( itr == destinationMap.end() )
{
@@ -280,7 +291,7 @@ void StompSessionManager::onStompCommand( commands::StompCommand* command )
// If we only have 1 consumer, we don't need to clone the original
// message.
- if(itr->second.size() == 1)
+ if( itr->second.size() == 1 )
{
ConsumerInfo* consumerInfo = itr->second.begin()->second;
@@ -301,7 +312,7 @@ void StompSessionManager::onStompCommand( commands::StompCommand* command )
// message.
ConsumerMap::iterator c_itr = itr->second.begin();
- for(; c_itr != itr->second.end(); ++c_itr )
+ for( ; c_itr != itr->second.end(); ++c_itr )
{
ConsumerInfo* consumerInfo = c_itr->second;
@@ -324,3 +335,110 @@ void StompSessionManager::onStompCommand( commands::StompCommand* command )
AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, StompConnectorException )
AMQ_CATCHALL_THROW( StompConnectorException )
}
+
+void StompSessionManager::setSubscribeOptions( const cms::Destination* dest,
+ SubscribeCommand& command )
+ throw ( StompConnectorException )
+{
+ try
+ {
+ // Get the properties of this destination
+ const Properties& destProperties = dest->getProperties();
+
+ if( destProperties.isEmpty() )
+ {
+ // Nothing to do, so save some work and quit now.
+ return;
+ }
+
+ std::string noLocalStr =
+ ActiveMQConstants::toString(
+ ActiveMQConstants::CONSUMER_NOLOCAL );
+
+ if( destProperties.getProperty( noLocalStr, "false" ) == "true" )
+ {
+ command.setNoLocal(
+ Boolean::parseBoolean(
+ destProperties.getProperty( noLocalStr ) ) );
+ }
+
+ std::string selectorStr =
+ ActiveMQConstants::toString(
+ ActiveMQConstants::CONSUMER_SELECTOR );
+
+ if( destProperties.hasProperty( selectorStr ) )
+ {
+ command.setMessageSelector(
+ destProperties.getProperty( selectorStr ) );
+ }
+
+ std::string priorityStr =
+ ActiveMQConstants::toString(
+ ActiveMQConstants::CONSUMER_PRIORITY );
+
+ if( destProperties.hasProperty( priorityStr ) )
+ {
+ command.setPriority(
+ Integer::parseInt(
+ destProperties.getProperty( priorityStr ) ) );
+ }
+
+ std::string dispatchAsyncStr =
+ ActiveMQConstants::toString(
+ ActiveMQConstants::CONSUMER_DISPATCHASYNC );
+
+ if( destProperties.hasProperty( dispatchAsyncStr ) )
+ {
+ command.setDispatchAsync(
+ Boolean::parseBoolean(
+ destProperties.getProperty( dispatchAsyncStr ) ) );
+ }
+
+ std::string exclusiveStr =
+ ActiveMQConstants::toString(
+ ActiveMQConstants::CONSUMER_EXCLUSIVE );
+
+ if( destProperties.hasProperty( exclusiveStr ) )
+ {
+ command.setExclusive(
+ Boolean::parseBoolean(
+ destProperties.getProperty( exclusiveStr ) ) );
+ }
+
+ std::string maxPendingMsgLimitStr =
+ ActiveMQConstants::toString(
+ ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT );
+
+ if( destProperties.hasProperty( maxPendingMsgLimitStr ) )
+ {
+ command.setMaxPendingMsgLimit(
+ Integer::parseInt(
+ destProperties.getProperty( maxPendingMsgLimitStr ) ) );
+ }
+
+ std::string prefetchSizeStr =
+ ActiveMQConstants::toString(
+ ActiveMQConstants::CONSUMER_PREFECTCHSIZE );
+
+ if( destProperties.hasProperty( prefetchSizeStr ) )
+ {
+ command.setPrefetchSize(
+ Integer::parseInt(
+ destProperties.getProperty( prefetchSizeStr ) ) );
+ }
+
+ std::string retroactiveStr =
+ ActiveMQConstants::toString(
+ ActiveMQConstants::CONSUMER_RETROACTIVE );
+
+ if( destProperties.hasProperty( retroactiveStr ) )
+ {
+ command.setRetroactive(
+ Boolean::parseBoolean(
+ destProperties.getProperty( retroactiveStr ) ) );
+ }
+ }
+ AMQ_CATCH_RETHROW( StompConnectorException )
+ AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, StompConnectorException )
+ AMQ_CATCHALL_THROW( StompConnectorException )
+}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h b/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h
index 725be89ebf..c015183cea 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.h
@@ -25,6 +25,7 @@
#include
#include
#include
+#include
namespace activemq{
namespace connector{
@@ -44,8 +45,8 @@ namespace stomp{
private:
// Map Types
- typedef std::map ConsumerMap;
- typedef std::map DestinationMap;
+ typedef std::map< unsigned int, ConsumerInfo* > ConsumerMap;
+ typedef std::map< std::string, ConsumerMap > DestinationMap;
private:
@@ -85,7 +86,7 @@ namespace stomp{
* @return new SessionInfo object
*/
virtual connector::SessionInfo* createSession(
- cms::Session::AcknowledgeMode ackMode)
+ cms::Session::AcknowledgeMode ackMode )
throw ( exceptions::ActiveMQException );
/**
@@ -108,10 +109,10 @@ namespace stomp{
* @return new ConsumerInfo object.
*/
virtual connector::ConsumerInfo* createConsumer(
- cms::Destination* destination,
+ const cms::Destination* destination,
SessionInfo* session,
- const std::string& selector)
- throw( ConnectorException );
+ const std::string& selector )
+ throw( StompConnectorException );
/**
* Creates a new durable consumer to the specified session, will
@@ -126,12 +127,12 @@ namespace stomp{
* @return new ConsumerInfo object.
*/
virtual connector::ConsumerInfo* createDurableConsumer(
- cms::Destination* destination,
+ const cms::Destination* destination,
SessionInfo* session,
const std::string& name,
const std::string& selector,
bool noLocal )
- throw ( ConnectorException );
+ throw ( StompConnectorException );
/**
* Removes the Consumer from the session, will unsubscrive if the
@@ -142,7 +143,7 @@ namespace stomp{
* @throws ConnectorException
*/
virtual void removeConsumer( connector::ConsumerInfo* consumer )
- throw( ConnectorException );
+ throw( StompConnectorException );
/**
* Sets the listener of consumer messages.
@@ -162,7 +163,19 @@ namespace stomp{
* @throw ConnterException
*/
virtual void onStompCommand( commands::StompCommand* command )
- throw ( StompConnectorException );
+ throw ( StompConnectorException );
+
+ protected:
+
+ /**
+ * Sets Subscribe Command options from the properties of a
+ * destination object.
+ * @param The destination that we are subscribing to.
+ * @param The pending Subscribe command
+ */
+ virtual void setSubscribeOptions( const cms::Destination* dest,
+ commands::SubscribeCommand& command )
+ throw ( StompConnectorException );
protected:
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h b/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h
index d9a0f64cd9..488183ea20 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompTopic.h
@@ -30,8 +30,18 @@ namespace stomp{
{
public:
- StompTopic(void) : StompDestination() {}
+ /**
+ * Copy Consturctor
+ * @param CMS Dest to Copy, must be a compatible type
+ */
+ StompTopic( const cms::Destination* source ) :
+ StompDestination< cms::Topic >( source ) {}
+ /**
+ * Custom Constructor
+ * @param string destination name plus any params
+ * @param type of destination this represents.
+ */
StompTopic(const std::string& name) :
StompDestination< cms::Topic >( name, cms::Destination::TOPIC )
{}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/StompTransactionInfo.h b/activemq-cpp/src/main/activemq/connector/stomp/StompTransactionInfo.h
index 9c05b03e55..4f44b674cb 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/StompTransactionInfo.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/StompTransactionInfo.h
@@ -37,16 +37,13 @@ namespace stomp{
public:
/**
- * TransactionInfo Constructor
+ * Default Constructor
*/
StompTransactionInfo(void) {
transactionId = 0;
session = NULL;
}
- /**
- * Destructor
- */
virtual ~StompTransactionInfo(void) {}
/**
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/AbstractCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/AbstractCommand.h
index 855bf98040..8d7cd79177 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/AbstractCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/AbstractCommand.h
@@ -113,13 +113,24 @@ namespace commands{
AbstractCommand(void){
frame = new StompFrame;
}
- AbstractCommand(StompFrame* frame){
+ AbstractCommand( StompFrame* frame ){
this->frame = frame;
}
virtual ~AbstractCommand(void){
destroyFrame();
}
+ /**
+ * Gets the properties map for this command.
+ * @return Reference to a Properties object
+ */
+ virtual util::Properties& getProperties(void){
+ return getFrame().getProperties();
+ }
+ virtual const util::Properties& getProperties(void) const{
+ return getFrame().getProperties();
+ }
+
/**
* Sets the Command Id of this Message
* @param Command Id
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/AckCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/AckCommand.h
index 370caa0b65..fbf6269770 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/AckCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/AckCommand.h
@@ -44,7 +44,7 @@ namespace commands{
initialize( getFrame() );
}
AckCommand( StompFrame* frame ) :
- AbstractCommand(frame) {
+ AbstractCommand( frame ) {
validate( getFrame() );
}
virtual ~AckCommand(void) {}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/BeginCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/BeginCommand.h
index 3deb4465eb..06db5dca64 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/BeginCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/BeginCommand.h
@@ -45,7 +45,7 @@ namespace commands{
initialize( getFrame() );
}
BeginCommand( StompFrame* frame ) :
- AbstractCommand(frame) {
+ AbstractCommand( frame ) {
validate( getFrame() );
}
virtual ~BeginCommand(void) {}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h
index 4a24c9c9da..3058e1ae98 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/BytesMessageCommand.h
@@ -41,7 +41,7 @@ namespace commands{
initialize( getFrame() );
}
BytesMessageCommand( StompFrame* frame ) :
- StompMessage< cms::BytesMessage >(frame) {
+ StompMessage< cms::BytesMessage >( frame ) {
validate( getFrame() );
}
virtual ~BytesMessageCommand(void) {}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.cpp b/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.cpp
index baa45bee5c..d5b27d62cd 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.cpp
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.cpp
@@ -62,7 +62,8 @@ CommandConstants::StaticInitializer::StaticInitializer(){
stompHeaders[HEADER_RESPONSEID] = "response-id";
stompHeaders[HEADER_EXPIRES] = "expires";
stompHeaders[HEADER_PERSISTANT] = "persistent";
- stompHeaders[HEADER_PRIORITY] = "priority";
+ stompHeaders[HEADER_JMSPRIORITY] = "priority";
+ stompHeaders[HEADER_CONSUMERPRIORITY] = "activemq.priority";
stompHeaders[HEADER_REPLYTO] = "reply-to";
stompHeaders[HEADER_TYPE] = "type";
stompHeaders[HEADER_AMQMSGTYPE] = "amq-msg-type";
@@ -74,7 +75,7 @@ CommandConstants::StaticInitializer::StaticInitializer(){
stompHeaders[HEADER_MAXPENDINGMSGLIMIT] = "activemq.maximumPendingMessageLimit";
stompHeaders[HEADER_NOLOCAL] = "activemq.noLocal";
stompHeaders[HEADER_PREFETCHSIZE] = "activemq.prefetchSize";
- stompHeaders[HEADER_PRIORITY] = "activemq.priority";
+ stompHeaders[HEADER_CONSUMERPRIORITY] = "activemq.priority";
stompHeaders[HEADER_RETROACTIVE] = "activemq.retroactive";
stompHeaders[HEADER_SUBSCRIPTIONNAME] = "activemq.subscriptionName";
stompHeaders[HEADER_TIMESTAMP] = "timestamp";
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.h
index 5a6ee768c1..a96cca4bf0 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/CommandConstants.h
@@ -76,7 +76,8 @@ namespace commands{
HEADER_MAXPENDINGMSGLIMIT,
HEADER_NOLOCAL,
HEADER_PREFETCHSIZE,
- HEADER_PRIORITY,
+ HEADER_JMSPRIORITY,
+ HEADER_CONSUMERPRIORITY,
HEADER_RETROACTIVE,
HEADER_SUBSCRIPTIONNAME,
HEADER_TIMESTAMP,
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/CommitCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/CommitCommand.h
index 5f1bc64885..df184fa37a 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/CommitCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/CommitCommand.h
@@ -40,7 +40,7 @@ namespace commands{
initialize( getFrame() );
}
CommitCommand( StompFrame* frame ) :
- AbstractCommand(frame) {
+ AbstractCommand( frame ) {
validate( getFrame() );
}
virtual ~CommitCommand(void) {}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/ConnectCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/ConnectCommand.h
index a0ef70b905..c8afa2ffc5 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/ConnectCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/ConnectCommand.h
@@ -39,7 +39,7 @@ namespace commands{
initialize( getFrame() );
}
ConnectCommand( StompFrame* frame ) :
- AbstractCommand(frame) {
+ AbstractCommand( frame ) {
validate( getFrame() );
}
virtual ~ConnectCommand(void) {};
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/DisconnectCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/DisconnectCommand.h
index 3423dc2052..f9fe4ba2c4 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/DisconnectCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/DisconnectCommand.h
@@ -40,7 +40,7 @@ namespace commands{
initialize( getFrame() );
}
DisconnectCommand( StompFrame* frame ) :
- AbstractCommand(frame) {
+ AbstractCommand( frame ) {
validate( getFrame() );
}
virtual ~DisconnectCommand(void){};
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/ErrorCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/ErrorCommand.h
index f17671a9e8..0b3cc4c1b5 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/ErrorCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/ErrorCommand.h
@@ -40,7 +40,7 @@ namespace commands{
initialize( getFrame() );
}
ErrorCommand( StompFrame* frame ) :
- AbstractCommand(frame) {
+ AbstractCommand( frame ) {
validate( getFrame() );
}
virtual ~ErrorCommand(void) {};
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/ReceiptCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/ReceiptCommand.h
index 053191010f..f3a1b47fbd 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/ReceiptCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/ReceiptCommand.h
@@ -40,7 +40,7 @@ namespace commands{
initialize( getFrame() );
}
ReceiptCommand( StompFrame* frame ) :
- AbstractCommand(frame) {
+ AbstractCommand( frame ) {
validate( getFrame() );
}
virtual ~ReceiptCommand(void) {}
@@ -60,7 +60,7 @@ namespace commands{
virtual void setReceiptId( const std::string& id ){
setPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_RECEIPTID),
+ CommandConstants::HEADER_RECEIPTID ),
id );
}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/StompCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/StompCommand.h
index b56a6b1a13..c31a0880ca 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/StompCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/StompCommand.h
@@ -103,6 +103,13 @@ namespace commands{
* @return Stomp CommandId enum
*/
virtual CommandConstants::CommandId getStompCommandId(void) const = 0;
+
+ /**
+ * Retrieves the Properties that are part of this command
+ * @return const reference to a properties object
+ */
+ virtual util::Properties& getProperties(void) = 0;
+ virtual const util::Properties& getProperties(void) const = 0;
};
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h
index 9d38fb9d78..f0d3761796 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h
@@ -62,7 +62,7 @@ namespace commands{
StompMessage(void) :
AbstractCommand< transport::Command >(),
- ackHandler( NULL ) { dest = new StompTopic(); }
+ ackHandler( NULL ) { dest = NULL; }
StompMessage( StompFrame* frame ) :
AbstractCommand< transport::Command >( frame ),
ackHandler( NULL )
@@ -112,55 +112,57 @@ namespace commands{
* of this consumed message.
*/
virtual void acknowledge(void) const throw( cms::CMSException ) {
- if(ackHandler != NULL) ackHandler->acknowledgeMessage(this);
+ if(ackHandler != NULL) ackHandler->acknowledgeMessage( this );
}
/**
* Sets the DeliveryMode for this message
* @return DeliveryMode enumerated value.
*/
- virtual cms::Message::DeliveryMode getCMSDeliveryMode(void) const {
+ virtual int getCMSDeliveryMode(void) const {
if(!getFrame().getProperties().hasProperty(
CommandConstants::toString(
CommandConstants::HEADER_PERSISTANT ) ) ) {
- return cms::Message::PERSISTANT;
+ return cms::DeliveryMode::PERSISTANT;
}
- return (cms::Message::DeliveryMode)(
- util::Integer::parseInt( getPropertyValue(
- CommandConstants::toString(
- CommandConstants::HEADER_PERSISTANT ) ) ) );
+ return util::Integer::parseInt( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_PERSISTANT ) ) );
}
/**
* Sets the DeliveryMode for this message
* @param DeliveryMode enumerated value.
*/
- virtual void setCMSDeliveryMode(cms::Message::DeliveryMode mode) {
+ virtual void setCMSDeliveryMode( int mode ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_PERSISTANT ) ,
- util::Integer::toString((int)mode) );
+ util::Integer::toString( mode ) );
}
/**
* Gets the Destination for this Message
- * @return Destination object
+ * @return Destination object can be NULL
*/
- virtual const cms::Destination& getCMSDestination(void) const{
- return *dest;
+ virtual const cms::Destination* getCMSDestination(void) const{
+ return dest;
}
/**
* Sets the Destination for this message
* @param Destination Object
*/
- virtual void setCMSDestination(const cms::Destination& destination) {
- dest->copy( destination );
- setPropertyValue(
- CommandConstants::toString(
- CommandConstants::HEADER_DESTINATION ),
- dest->toProviderString() );
+ virtual void setCMSDestination( const cms::Destination* destination ) {
+ if( destination != NULL )
+ {
+ dest = destination->clone();
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_DESTINATION ),
+ dest->toProviderString() );
+ }
}
/**
@@ -177,7 +179,7 @@ namespace commands{
* Sets the Expiration Time for this message
* @param time value
*/
- virtual void setCMSExpiration(long expireTime) {
+ virtual void setCMSExpiration( long expireTime ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_EXPIRES) ,
@@ -198,7 +200,7 @@ namespace commands{
* Sets the CMS Message Id for this message
* @param time value
*/
- virtual void setCMSMessageId(const std::string& id) {
+ virtual void setCMSMessageId( const std::string& id ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_MESSAGEID ),
@@ -212,17 +214,17 @@ namespace commands{
virtual int getCMSPriority(void) const {
return util::Integer::parseInt( getPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_PRIORITY ), "0" ) );
+ CommandConstants::HEADER_JMSPRIORITY ), "0" ) );
}
/**
* Sets the Priority Value for this message
* @param priority value
*/
- virtual void setCMSPriority(int priority) {
+ virtual void setCMSPriority( int priority ) {
setPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_PRIORITY),
+ CommandConstants::HEADER_JMSPRIORITY),
util::Integer::toString( priority ) );
}
@@ -241,7 +243,7 @@ namespace commands{
* Sets the Redelivered Flag for this message
* @param redelivered value
*/
- virtual void setCMSRedelivered(bool redelivered) {
+ virtual void setCMSRedelivered( bool redelivered ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_REDELIVERED ),
@@ -262,7 +264,7 @@ namespace commands{
* Sets the CMS Reply To Address for this message
* @param Reply To value
*/
- virtual void setCMSReplyTo(const std::string& id) {
+ virtual void setCMSReplyTo( const std::string& id ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_REPLYTO ),
@@ -283,7 +285,7 @@ namespace commands{
* Sets the Time Stamp for this message
* @param time stamp value
*/
- virtual void setCMSTimeStamp(long timeStamp) {
+ virtual void setCMSTimeStamp( long timeStamp ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_TIMESTAMP ),
@@ -304,7 +306,7 @@ namespace commands{
* Sets the CMS Message Type for this message
* @param type value
*/
- virtual void setCMSMessageType(const std::string& type) {
+ virtual void setCMSMessageType( const std::string& type ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_TYPE ),
@@ -318,7 +320,7 @@ namespace commands{
* when the Acknowledge method is called.
* @param ActiveMQAckHandler
*/
- virtual void setAckHandler(core::ActiveMQAckHandler* handler) {
+ virtual void setAckHandler( core::ActiveMQAckHandler* handler ) {
this->ackHandler = handler;
}
@@ -338,7 +340,7 @@ namespace commands{
* redelivered
* @param redelivery count
*/
- virtual void setRedeliveryCount(int count) {
+ virtual void setRedeliveryCount( int count ) {
setPropertyValue(
CommandConstants::toString(
CommandConstants::HEADER_REDELIVERYCOUNT ),
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/SubscribeCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/SubscribeCommand.h
index d23e9a0d21..918d2860a7 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/SubscribeCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/SubscribeCommand.h
@@ -53,7 +53,7 @@ namespace commands{
virtual const char* getDestination(void) const{
return getPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_DESTINATION) );
+ CommandConstants::HEADER_DESTINATION ) );
}
/**
@@ -63,7 +63,7 @@ namespace commands{
virtual void setDestination( const std::string& dest ){
setPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_DESTINATION),
+ CommandConstants::HEADER_DESTINATION ),
dest );
}
@@ -74,7 +74,7 @@ namespace commands{
virtual void setAckMode( const CommandConstants::AckMode mode ){
setPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_ACK),
+ CommandConstants::HEADER_ACK ),
CommandConstants::toString( mode ) );
}
@@ -86,7 +86,7 @@ namespace commands{
return CommandConstants::toAckMode(
getPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_ACK) ) );
+ CommandConstants::HEADER_ACK ) ) );
}
/**
@@ -97,7 +97,7 @@ namespace commands{
virtual void setMessageSelector( const std::string& selector ) {
setPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_SELECTOR),
+ CommandConstants::HEADER_SELECTOR ),
selector );
}
@@ -109,7 +109,7 @@ namespace commands{
virtual const char* getMessageSelector(void) const{
return getPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_SELECTOR) );
+ CommandConstants::HEADER_SELECTOR ) );
}
/**
@@ -120,7 +120,7 @@ namespace commands{
virtual void setSubscriptionName( const std::string& name ) {
setPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_SUBSCRIPTIONNAME),
+ CommandConstants::HEADER_SUBSCRIPTIONNAME ),
name );
}
@@ -132,11 +132,11 @@ namespace commands{
virtual const char* getSubscriptionName(void) const{
return getPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_SUBSCRIPTIONNAME) );
+ CommandConstants::HEADER_SUBSCRIPTIONNAME ) );
}
/**
- * Gets hether or not locally sent messages should be ignored for
+ * Gets whether or not locally sent messages should be ignored for
* subscriptions. Set to true to filter out locally sent messages
* @return NoLocal value
*/
@@ -148,7 +148,7 @@ namespace commands{
}
/**
- * Gets hether or not locally sent messages should be ignored for
+ * Sets whether or not locally sent messages should be ignored for
* subscriptions. Set to true to filter out locally sent messages
* @param NoLocal value
*/
@@ -159,6 +159,162 @@ namespace commands{
util::Boolean::toString( noLocal ) );
}
+ /**
+ * Sets whether or not the broker is to dispatch messages in an
+ * asynchronous manner. Set to true if you want Async.
+ * @return true if in dispatch async mode
+ */
+ virtual bool getDispatchAsync(void) const {
+ return util::Boolean::parseBoolean( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_DISPATCH_ASYNC ),
+ "false" ) );
+ }
+
+ /**
+ * Sets whether or not the broker is to dispatch messages in an
+ * asynchronous manner. Set to true if you want Async.
+ * @param true for async dispatch mode
+ */
+ virtual void setDispatchAsync( bool dispatchAsync ) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_DISPATCH_ASYNC ),
+ util::Boolean::toString( dispatchAsync ) );
+ }
+
+ /**
+ * Gets whether or not this consumer is an exclusive consumer for
+ * this destination.
+ * @return true for exclusive mode
+ */
+ virtual bool getExclusive(void) const {
+ return util::Boolean::parseBoolean( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_EXCLUSIVE ),
+ "false" ) );
+ }
+
+ /**
+ * Sets whether or not this consumer is an exclusive consumer for
+ * this destination.
+ * @param true if in exclusive mode
+ */
+ virtual void setExclusive( bool exclusive ) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_EXCLUSIVE ),
+ util::Boolean::toString( exclusive ) );
+ }
+
+ /**
+ * Get the max number of pending messages on a destination
+ * For Slow Consumer Handlingon non-durable topics by dropping old
+ * messages - we can set a maximum pending limit which once a slow
+ * consumer backs up to this high water mark we begin to discard
+ * old messages
+ * @return Max value
+ */
+ virtual int getMaxPendingMsgLimit(void) const {
+ return util::Integer::parseInt( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_MAXPENDINGMSGLIMIT ),
+ "0" ) );
+ }
+
+ /**
+ * Set the max number of pending messages on a destination
+ * For Slow Consumer Handlingon non-durable topics by dropping old
+ * messages - we can set a maximum pending limit which once a slow
+ * consumer backs up to this high water mark we begin to discard
+ * old messages
+ * @param Max value
+ */
+ virtual void setMaxPendingMsgLimit( int limit ) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_MAXPENDINGMSGLIMIT ),
+ util::Integer::toString( limit ) );
+ }
+
+ /**
+ * Get the maximum number of pending messages that will be
+ * dispatched to the client. Once this maximum is reached no more
+ * messages are dispatched until the client acknowledges a message.
+ * Set to 1 for very fair distribution of messages across consumers
+ * where processing messages can be slow
+ * @return prefetch size value
+ */
+ virtual int getPrefetchSize(void) const {
+ return util::Integer::parseInt( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_PREFETCHSIZE ),
+ "1000" ) );
+ }
+
+ /**
+ * Set the maximum number of pending messages that will be
+ * dispatched to the client. Once this maximum is reached no more
+ * messages are dispatched until the client acknowledges a message.
+ * Set to 1 for very fair distribution of messages across consumers
+ * where processing messages can be slow
+ * @param prefetch size value
+ */
+ virtual void setPrefetchSize( int size ) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_PREFETCHSIZE ),
+ util::Integer::toString( size ) );
+ }
+
+ /**
+ * Gets the priority of the consumer so that dispatching can be
+ * weighted in priority order
+ * @return priority level
+ */
+ virtual int getPriority(void) const {
+ return util::Integer::parseInt( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_CONSUMERPRIORITY ),
+ "0" ) );
+ }
+
+ /**
+ * Sets the priority of the consumer so that dispatching can be
+ * weighted in priority order
+ * @param prioirty level
+ */
+ virtual void setPriority( int priority ) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_CONSUMERPRIORITY ),
+ util::Integer::toString( priority ) );
+ }
+
+ /**
+ * Get For non-durable topics if this subscription is set to be
+ * retroactive
+ * @return true for retroactive mode
+ */
+ virtual bool getRetroactive(void) const {
+ return util::Boolean::parseBoolean( getPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_RETROACTIVE ),
+ "false" ) );
+ }
+
+ /**
+ * Set For non-durable topics if this subscription is set to be
+ * retroactive
+ * @param true if in retroactive mode
+ */
+ virtual void setRetroactive( bool retroactive ) {
+ setPropertyValue(
+ CommandConstants::toString(
+ CommandConstants::HEADER_RETROACTIVE ),
+ util::Boolean::toString( retroactive ) );
+ }
+
protected:
/**
@@ -173,7 +329,7 @@ namespace commands{
setPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_ACK),
+ CommandConstants::HEADER_ACK ),
CommandConstants::toString(
CommandConstants::ACK_AUTO ) );
}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/TextMessageCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/TextMessageCommand.h
index e354558b8f..2a11e9ec27 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/TextMessageCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/TextMessageCommand.h
@@ -68,6 +68,14 @@ namespace commands{
setBytes( msg, strlen(msg) + 1, false );
}
+ /**
+ * Sets the message contents.
+ * @param msg The message buffer.
+ */
+ virtual void setText( const std::string& msg ) throw( cms::CMSException ) {
+ setBytes( msg.c_str(), msg.length() + 1, false );
+ }
+
};
}}}}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h b/activemq-cpp/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h
index 2a9a4a7528..65cd1aec2b 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/commands/UnsubscribeCommand.h
@@ -51,7 +51,7 @@ namespace commands{
virtual const char* getDestination(void) const{
return getPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_DESTINATION) );
+ CommandConstants::HEADER_DESTINATION ) );
}
/**
@@ -60,7 +60,7 @@ namespace commands{
virtual void setDestination( const std::string& dest ){
setPropertyValue(
CommandConstants::toString(
- CommandConstants::HEADER_DESTINATION) ,
+ CommandConstants::HEADER_DESTINATION ),
dest );
}
diff --git a/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshalable.h b/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshalable.h
index b2b8cc7a52..f7270ffeab 100644
--- a/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshalable.h
+++ b/activemq-cpp/src/main/activemq/connector/stomp/marshal/Marshalable.h
@@ -30,7 +30,7 @@ namespace marshal{
{
public:
- virtual ~Marshalable(void) {}
+ virtual ~Marshalable(void) {}
/**
* Marshals the command to a stomp frame.
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h b/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h
index 529c66b61c..9b8f2f0fab 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQAckHandler.h
@@ -32,7 +32,7 @@ namespace core{
{
public:
- virtual ~ActiveMQAckHandler(void) {};
+ virtual ~ActiveMQAckHandler(void) {};
/**
* Method called to acknowledge the message passed
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
index 8d25c7bc78..05c11f677e 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
@@ -34,6 +34,7 @@ ActiveMQConnection::ActiveMQConnection(ActiveMQConnectionData* connectionData)
{
this->connectionData = connectionData;
this->started = false;
+ this->closed = false;
this->exceptionListener = NULL;
// We want to be the sink for all messages from the Connector
@@ -57,7 +58,7 @@ cms::Session* ActiveMQConnection::createSession(void)
{
try
{
- return this->createSession( Session::AutoAcknowledge );
+ return this->createSession( Session::AUTO_ACKNOWLEDGE );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -90,10 +91,20 @@ void ActiveMQConnection::close(void) throw ( cms::CMSException )
{
try
{
+ if( closed )
+ {
+ return;
+ }
+
// Once current deliveries are done this stops the delivery
// of any new messages.
started = false;
-
+ closed = true;
+
+ // Shutdown connector and transport
+ connectionData->getConnector()->close();
+ connectionData->getTransport()->close();
+
// Destroy the connection data
delete connectionData;
connectionData = NULL;
@@ -124,7 +135,7 @@ void ActiveMQConnection::addMessageListener( const unsigned int consumerId,
ActiveMQMessageListener* listener )
{
// Place in Map
- synchronized(&mutex)
+ synchronized( &mutex )
{
consumers[consumerId] = listener;
}
@@ -134,7 +145,7 @@ void ActiveMQConnection::addMessageListener( const unsigned int consumerId,
void ActiveMQConnection::removeMessageListener( const unsigned int consumerId )
{
// Remove from Map
- synchronized(&mutex)
+ synchronized( &mutex )
{
consumers.erase( consumerId );
}
@@ -175,9 +186,9 @@ void ActiveMQConnection::onConsumerMessage( connector::ConsumerInfo* consumer,
}
// Started, so lock map and dispatch the message.
- synchronized(&mutex)
+ synchronized( &mutex )
{
- if(consumers.find(consumer->getConsumerId()) != consumers.end())
+ if(consumers.find( consumer->getConsumerId()) != consumers.end() )
{
consumers[consumer->getConsumerId()]->
onActiveMQMessage( message );
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
index 516179dde2..56ea1327cb 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
@@ -32,147 +32,149 @@
namespace activemq{
namespace core{
- class cms::Session;
- class ActiveMQConsumer;
+ class cms::Session;
+ class ActiveMQConsumer;
- class ActiveMQConnection :
- public cms::Connection,
- public connector::ConsumerMessageListener
- {
- private:
+ class ActiveMQConnection :
+ public cms::Connection,
+ public connector::ConsumerMessageListener
+ {
+ private:
- // the registered exception listener
- cms::ExceptionListener* exceptionListener;
+ // the registered exception listener
+ cms::ExceptionListener* exceptionListener;
- // All the data that is used to connect this Connection
- ActiveMQConnectionData* connectionData;
+ // All the data that is used to connect this Connection
+ ActiveMQConnectionData* connectionData;
- // Indicates if this Connection is started
- bool started;
-
- // Map of Consumer Ids to ActiveMQMessageListeners
- std::map consumers;
-
- // Mutex to lock the Consumers Map
- concurrent::Mutex mutex;
-
- public:
+ // Indicates if this Connection is started
+ bool started;
- /**
- * Constructor
- */
- ActiveMQConnection(ActiveMQConnectionData* connectionData);
-
- /**
- * Destructor
- */
- virtual ~ActiveMQConnection(void);
-
- public: // Connection Interface Methods
-
- /**
- * Creates a new Session to work for this Connection
- */
- virtual cms::Session* createSession(void) throw ( cms::CMSException );
+ // Indicates that this connection has been closed, it is no longer
+ // usable after this becomes true
+ bool closed;
- /**
- * Creates a new Session to work for this Connection using the
- * specified acknowledgment mode
- * @param the Acknowledgement Mode to use.
- */
- virtual cms::Session* createSession(cms::Session::AcknowledgeMode ackMode)
- throw ( cms::CMSException );
+ // Map of Consumer Ids to ActiveMQMessageListeners
+ std::map< unsigned int, ActiveMQMessageListener* > consumers;
+
+ // Mutex to lock the Consumers Map
+ concurrent::Mutex mutex;
+
+ public:
+
+ /**
+ * Constructor
+ * @param Pointer to an ActiveMQConnectionData object, owned here
+ */
+ ActiveMQConnection( ActiveMQConnectionData* connectionData );
+
+ virtual ~ActiveMQConnection(void);
+
+ public: // Connection Interface Methods
+
+ /**
+ * Creates a new Session to work for this Connection
+ */
+ virtual cms::Session* createSession(void) throw ( cms::CMSException );
+
+ /**
+ * Creates a new Session to work for this Connection using the
+ * specified acknowledgment mode
+ * @param the Acknowledgement Mode to use.
+ */
+ virtual cms::Session* createSession( cms::Session::AcknowledgeMode ackMode )
+ throw ( cms::CMSException );
- /**
- * Get the Client Id for this session
- * @return string version of Client Id
- */
- virtual std::string getClientId(void) const;
+ /**
+ * Get the Client Id for this session
+ * @return string version of Client Id
+ */
+ virtual std::string getClientId(void) const;
- /**
- * Retrieves the Connection Data object for this object.
- * @return pointer to a connection data object.
- */
- virtual ActiveMQConnectionData* getConnectionData(void){
- return connectionData;
- }
+ /**
+ * Retrieves the Connection Data object for this object.
+ * @return pointer to a connection data object.
+ */
+ virtual ActiveMQConnectionData* getConnectionData(void){
+ return connectionData;
+ }
- /**
- * Gets the registered Exception Listener for this connection
- * @return pointer to an exception listnener or NULL
- */
- virtual cms::ExceptionListener* getExceptionListener(void) const{
- return exceptionListener; };
+ /**
+ * Gets the registered Exception Listener for this connection
+ * @return pointer to an exception listnener or NULL
+ */
+ virtual cms::ExceptionListener* getExceptionListener(void) const{
+ return exceptionListener; };
- /**
- * Sets the registed Exception Listener for this connection
- * @param pointer to and ExceptionListener
- */
- virtual void setExceptionListener(cms::ExceptionListener* listener){
- exceptionListener = listener; };
+ /**
+ * Sets the registed Exception Listener for this connection
+ * @param pointer to and ExceptionListener
+ */
+ virtual void setExceptionListener( cms::ExceptionListener* listener ){
+ exceptionListener = listener; };
- /**
- * Close the currently open connection
- * @throws CMSException
- */
- virtual void close(void) throw ( cms::CMSException );
+ /**
+ * Close the currently open connection
+ * @throws CMSException
+ */
+ virtual void close(void) throw ( cms::CMSException );
- /**
- * Starts or (restarts) a connections delivery of incoming messages
- * @throws CMSException
- */
- virtual void start(void) throw ( cms::CMSException );
+ /**
+ * Starts or (restarts) a connections delivery of incoming messages
+ * @throws CMSException
+ */
+ virtual void start(void) throw ( cms::CMSException );
- /**
- * Stop the flow of incoming messages
- * @throws CMSException
- */
- virtual void stop(void) throw ( cms::CMSException );
+ /**
+ * Stop the flow of incoming messages
+ * @throws CMSException
+ */
+ virtual void stop(void) throw ( cms::CMSException );
- public: // ActiveMQConnection Methods
+ public: // ActiveMQConnection Methods
- /**
- * Adds the ActiveMQMessageListener to the Mapping of Consumer Id's
- * to listeners, all message to that id will be routed to the given
- * listener
- * @param Consumer Id String
- * @param ActiveMQMessageListener Pointer
- */
- virtual void addMessageListener(const unsigned int consumerId,
- ActiveMQMessageListener* listener);
+ /**
+ * Adds the ActiveMQMessageListener to the Mapping of Consumer Id's
+ * to listeners, all message to that id will be routed to the given
+ * listener
+ * @param Consumer Id String
+ * @param ActiveMQMessageListener Pointer
+ */
+ virtual void addMessageListener( const unsigned int consumerId,
+ ActiveMQMessageListener* listener );
- /**
- * Remove the Listener for the specified Consumer Id
- * @param Consumer Id string
- */
- virtual void removeMessageListener(const unsigned int consumerId);
+ /**
+ * Remove the Listener for the specified Consumer Id
+ * @param Consumer Id string
+ */
+ virtual void removeMessageListener( const unsigned int consumerId );
- private:
+ private:
- /**
- * Notify the excpetion listener
- */
- void fire( exceptions::ActiveMQException& ex )
- {
- if( exceptionListener != NULL )
- {
- try
+ /**
+ * Notify the excpetion listener
+ */
+ void fire( exceptions::ActiveMQException& ex )
+ {
+ if( exceptionListener != NULL )
{
- exceptionListener->onException( ex );
+ try
+ {
+ exceptionListener->onException( ex );
+ }
+ catch(...){}
}
- catch(...){}
- }
- }
+ }
- /**
- * Called to dispatch a message to a particular consumer.
- * @param consumer the target consumer of the dispatch.
- * @param msg the message to be dispatched.
- */
- virtual void onConsumerMessage( connector::ConsumerInfo* consumer,
- core::ActiveMQMessage* message );
+ /**
+ * Called to dispatch a message to a particular consumer.
+ * @param consumer the target consumer of the dispatch.
+ * @param msg the message to be dispatched.
+ */
+ virtual void onConsumerMessage( connector::ConsumerInfo* consumer,
+ core::ActiveMQMessage* message );
- };
+ };
}}
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
index a3ed6e8bd5..9219866f28 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
@@ -25,6 +25,7 @@
#include
#include
#include
+#include
#include
#include
@@ -48,10 +49,11 @@ ActiveMQConnectionFactory::ActiveMQConnectionFactory(void)
}
////////////////////////////////////////////////////////////////////////////////
-ActiveMQConnectionFactory::ActiveMQConnectionFactory(const std::string& url,
- const std::string& username,
- const std::string& password,
- const std::string& clientId)
+ActiveMQConnectionFactory::ActiveMQConnectionFactory(
+ const std::string& url,
+ const std::string& username,
+ const std::string& password,
+ const std::string& clientId )
{
brokerURL = url;
@@ -64,7 +66,7 @@ ActiveMQConnectionFactory::ActiveMQConnectionFactory(const std::string& url,
cms::Connection* ActiveMQConnectionFactory::createConnection(void)
throw ( cms::CMSException )
{
- return createConnection(username, password);
+ return createConnection( username, password, clientId );
}
////////////////////////////////////////////////////////////////////////////////
@@ -96,9 +98,18 @@ cms::Connection* ActiveMQConnectionFactory::createConnection(
}
// Store login data in the properties
- properties->setProperty( "username", this->username );
- properties->setProperty( "password", this->password );
- properties->setProperty( "clientId", this->clientId );
+ properties->setProperty(
+ ActiveMQConstants::toString(
+ ActiveMQConstants::PARAM_USERNAME ),
+ this->username );
+ properties->setProperty(
+ ActiveMQConstants::toString(
+ ActiveMQConstants::PARAM_PASSWORD ),
+ this->password );
+ properties->setProperty(
+ ActiveMQConstants::toString(
+ ActiveMQConstants::PARAM_CLIENTID ),
+ this->clientId );
// Parse out the properties from the URI
parseURL( brokerURL, *properties );
@@ -112,7 +123,7 @@ cms::Connection* ActiveMQConnectionFactory::createConnection(
throw ActiveMQException(
__FILE__, __LINE__,
"ActiveMQConnectionFactory::createConnection - "
- "unknown transport factory");
+ "unknown transport factory" );
}
// Create the transport.
@@ -121,7 +132,7 @@ cms::Connection* ActiveMQConnectionFactory::createConnection(
throw ActiveMQException(
__FILE__, __LINE__,
"ActiveMQConnectionFactory::createConnection - "
- "failed creating new Transport");
+ "failed creating new Transport" );
}
// What wire format are we using, defaults to Stomp
@@ -137,18 +148,18 @@ cms::Connection* ActiveMQConnectionFactory::createConnection(
throw NullPointerException(
__FILE__, __LINE__,
"ActiveMQConnectionFactory::createConnection - "
- "Connector for Wire Format not registered in Map");
+ "Connector for Wire Format not registered in Map" );
}
// Create the Connector.
connector = connectorfactory->createConnector( *properties, transport );
- if(connector == NULL)
+ if( connector == NULL )
{
throw NullPointerException(
__FILE__, __LINE__,
"ActiveMQConnectionFactory::createConnection - "
- "Failed to Create the Connector");
+ "Failed to Create the Connector" );
}
// Start the Connector
@@ -177,9 +188,9 @@ cms::Connection* ActiveMQConnectionFactory::createConnection(
catch( ... )
{
exceptions::ActiveMQException ex(
- __FILE__, __LINE__,
- "ActiveMQConnectionFactory::create - "
- "caught unknown exception" );
+ __FILE__, __LINE__,
+ "ActiveMQConnectionFactory::create - "
+ "caught unknown exception" );
delete connection;
delete connector;
@@ -191,8 +202,8 @@ cms::Connection* ActiveMQConnectionFactory::createConnection(
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnectionFactory::parseURL(const std::string& URI,
- Properties& properties)
+void ActiveMQConnectionFactory::parseURL( const std::string& URI,
+ Properties& properties )
throw ( exceptions::IllegalArgumentException )
{
try
@@ -203,14 +214,14 @@ void ActiveMQConnectionFactory::parseURL(const std::string& URI,
// Require that there be three tokens at the least, these are
// transport, url, port.
- if(tokenizer.countTokens() < 3)
+ if( tokenizer.countTokens() < 3 )
{
throw exceptions::IllegalArgumentException(
__FILE__, __LINE__,
(string("ActiveMQConnectionFactory::parseURL - "
"Marlformed URI: ") + URI).c_str());
}
-
+
// First element should be the Transport Type, following that is the
// URL and any params.
properties.setProperty( "transport", tokenizer.nextToken() );
@@ -219,24 +230,29 @@ void ActiveMQConnectionFactory::parseURL(const std::string& URI,
// and then each param set is delimited with & we extract first
// three chars as they are the left over ://
properties.setProperty( "uri", tokenizer.nextToken("&?").substr(3) );
-
+
// Now get all the optional parameters and store them as properties
int count = tokenizer.toArray(tokens);
- for(int i = 0; i < count; ++i)
+ for( int i = 0; i < count; ++i )
{
- tokenizer.reset(tokens[i], "=");
+ tokenizer.reset( tokens[i], "=" );
- if(tokenizer.countTokens() != 2)
+ if( tokenizer.countTokens() != 2 )
{
throw exceptions::IllegalArgumentException(
__FILE__, __LINE__,
- (string("ActiveMQConnectionFactory::parseURL - "
- "Marlformed Parameter = ") + tokens[i]).c_str());
+ ( string( "ActiveMQConnectionFactory::parseURL - "
+ "Marlformed Parameter = " ) + tokens[i] ).c_str() );
}
+
+ // Get them in order, passing both as nextToken calls in the
+ // set Property can cause reversed order.
+ string key = tokenizer.nextToken();
+ string value = tokenizer.nextToken();
// Store this param as a property
- properties.setProperty(tokenizer.nextToken(), tokenizer.nextToken());
+ properties.setProperty( key, value );
}
}
AMQ_CATCH_RETHROW( IllegalArgumentException )
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
index 47b22fb2f3..b64336de34 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
@@ -25,153 +25,150 @@
namespace activemq{
namespace core{
- class util::Properties;
+ class util::Properties;
- class ActiveMQConnectionFactory : public cms::ConnectionFactory
- {
- private:
+ class ActiveMQConnectionFactory : public cms::ConnectionFactory
+ {
+ private:
- // The user name this factory will use to connect
- std::string username;
+ // The user name this factory will use to connect
+ std::string username;
- // The password this factory will use to connect
- std::string password;
+ // The password this factory will use to connect
+ std::string password;
- // The client id to assign to the connection created
- std::string clientId;
+ // The client id to assign to the connection created
+ std::string clientId;
- // The URL of the Broker, the default is:
- // "tcp://localhost:61616"
- std::string brokerURL;
+ // The URL of the Broker, the default is:
+ // "tcp://localhost:61616"
+ std::string brokerURL;
- public:
+ public:
- /**
- * Constructor
- */
- ActiveMQConnectionFactory(void);
+ /**
+ * Constructor
+ */
+ ActiveMQConnectionFactory(void);
- /**
- * Constructor
- * @param the URL of the Broker we are connecting to.
- * @param username to authenticate with, defaults to ""
- * @param password to authenticate with, defaults to ""
- * @param client Id to assign to connection, defaults to ""
- */
- ActiveMQConnectionFactory(const std::string& url,
- const std::string& username = "",
- const std::string& password = "",
- const std::string& clientId = "");
+ /**
+ * Constructor
+ * @param the URL of the Broker we are connecting to.
+ * @param username to authenticate with, defaults to ""
+ * @param password to authenticate with, defaults to ""
+ * @param client Id to assign to connection, defaults to ""
+ */
+ ActiveMQConnectionFactory( const std::string& url,
+ const std::string& username = "",
+ const std::string& password = "",
+ const std::string& clientId = "" );
- /**
- * Destructor
- */
- virtual ~ActiveMQConnectionFactory(void) {}
+ virtual ~ActiveMQConnectionFactory(void) {}
- /**
- * Creates a connection with the default user identity. The
- * connection is created in stopped mode. No messages will be
- * delivered until the Connection.start method is explicitly
- * called.
- * @throws CMSException
- */
- virtual cms::Connection* createConnection(void) throw ( cms::CMSException );
+ /**
+ * Creates a connection with the default user identity. The
+ * connection is created in stopped mode. No messages will be
+ * delivered until the Connection.start method is explicitly
+ * called.
+ * @throws CMSException
+ */
+ virtual cms::Connection* createConnection(void) throw ( cms::CMSException );
- /**
- * Creates a connection with the specified user identity. The
- * connection is created in stopped mode. No messages will be
- * delivered until the Connection.start method is explicitly called.
- * @throw CMSException.
- */
- virtual cms::Connection* createConnection(const std::string& username,
- const std::string& password,
- const std::string& clientId = "")
- throw ( cms::CMSException );
+ /**
+ * Creates a connection with the specified user identity. The
+ * connection is created in stopped mode. No messages will be
+ * delivered until the Connection.start method is explicitly called.
+ * @throw CMSException.
+ */
+ virtual cms::Connection* createConnection( const std::string& username,
+ const std::string& password,
+ const std::string& clientId = "" )
+ throw ( cms::CMSException );
- /**
- * Sets the username that should be used when creating a new connection
- * @param username string
- */
- virtual void setUsername(const std::string& username){
- this->username = username;
- }
+ /**
+ * Sets the username that should be used when creating a new connection
+ * @param username string
+ */
+ virtual void setUsername( const std::string& username ){
+ this->username = username;
+ }
- /**
- * Gets the username that this factory will use when creating a new
- * connection instance.
- * @return username string, "" for default credentials
- */
- virtual const std::string& getUsername(void) const {
- return username;
- }
+ /**
+ * Gets the username that this factory will use when creating a new
+ * connection instance.
+ * @return username string, "" for default credentials
+ */
+ virtual const std::string& getUsername(void) const {
+ return username;
+ }
- /**
- * Sets the password that should be used when creating a new connection
- * @param password string
- */
- virtual void setPassword(const std::string& password){
- this->password = password;
- }
+ /**
+ * Sets the password that should be used when creating a new connection
+ * @param password string
+ */
+ virtual void setPassword( const std::string& password ){
+ this->password = password;
+ }
- /**
- * Gets the password that this factory will use when creating a new
- * connection instance.
- * @return password string, "" for default credentials
- */
- virtual const std::string& getPassword(void) const {
- return password;
- }
+ /**
+ * Gets the password that this factory will use when creating a new
+ * connection instance.
+ * @return password string, "" for default credentials
+ */
+ virtual const std::string& getPassword(void) const {
+ return password;
+ }
- /**
- * Sets the Broker URL that should be used when creating a new
- * connection instance
- * @param brokerURL string
- */
- virtual void setBrokerURL(const std::string& brokerURL){
- this->brokerURL = brokerURL;
- }
+ /**
+ * Sets the Broker URL that should be used when creating a new
+ * connection instance
+ * @param brokerURL string
+ */
+ virtual void setBrokerURL( const std::string& brokerURL ){
+ this->brokerURL = brokerURL;
+ }
- /**
- * Gets the Broker URL that this factory will use when creating a new
- * connection instance.
- * @return brokerURL string
- */
- virtual const std::string& getBrokerURL(void) const {
- return brokerURL;
- }
+ /**
+ * Gets the Broker URL that this factory will use when creating a new
+ * connection instance.
+ * @return brokerURL string
+ */
+ virtual const std::string& getBrokerURL(void) const {
+ return brokerURL;
+ }
- /**
- * Sets the Client Id that should be used when creating a new
- * connection instance
- * @param clientId string
- */
- virtual void setClientId(const std::string& clientId){
- this->clientId = clientId;
- }
+ /**
+ * Sets the Client Id that should be used when creating a new
+ * connection instance
+ * @param clientId string
+ */
+ virtual void setClientId( const std::string& clientId ){
+ this->clientId = clientId;
+ }
- /**
- * Gets the Client Id that this factory will use when creating a new
- * connection instance.
- * @return clientId string
- */
- virtual const std::string& getClientId(void) const {
- return clientId;
- }
+ /**
+ * Gets the Client Id that this factory will use when creating a new
+ * connection instance.
+ * @return clientId string
+ */
+ virtual const std::string& getClientId(void) const {
+ return clientId;
+ }
- protected:
+ protected:
- /**
- * Parses the properties out of the provided Broker URI and sets
- * them in the passed Properties Object.
- * @param a Broker URI to parse
- * @param a Properties object to set the parsed values in
- * @throws IllegalArgumentException if the passed URI is invalid
- */
- virtual void parseURL(const std::string& URI,
- util::Properties& properties)
- throw ( exceptions::IllegalArgumentException );
+ /**
+ * Parses the properties out of the provided Broker URI and sets
+ * them in the passed Properties Object.
+ * @param a Broker URI to parse
+ * @param a Properties object to set the parsed values in
+ * @throws IllegalArgumentException if the passed URI is invalid
+ */
+ virtual void parseURL( const std::string& URI,
+ util::Properties& properties )
+ throw ( exceptions::IllegalArgumentException );
- };
+ };
}}
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConstants.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConstants.cpp
new file mode 100644
index 0000000000..b11e479918
--- /dev/null
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConstants.cpp
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQConstants.h"
+#include
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+
+////////////////////////////////////////////////////////////////////////////////
+string ActiveMQConstants::StaticInitializer::destOptions[NUM_OPTIONS];
+string ActiveMQConstants::StaticInitializer::uriParams[NUM_PARAMS];
+
+map< std::string, ActiveMQConstants::DestinationOption >
+ ActiveMQConstants::StaticInitializer::destOptionMap;
+map< std::string, ActiveMQConstants::URIParam >
+ ActiveMQConstants::StaticInitializer::uriParamsMap;
+
+ActiveMQConstants::StaticInitializer ActiveMQConstants::staticInits;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConstants::StaticInitializer::StaticInitializer(){
+
+ destOptions[CONSUMER_PREFECTCHSIZE] = "consumer.prefetchSize";
+ destOptions[CUNSUMER_MAXPENDINGMSGLIMIT] = "consumer.maximumPendingMessageLimit";
+ destOptions[CONSUMER_NOLOCAL] = "consumer.noLocal";
+ destOptions[CONSUMER_DISPATCHASYNC] = "consumer.dispatchAsync";
+ destOptions[CONSUMER_RETROACTIVE] = "consumer.retroactive";
+ destOptions[CONSUMER_SELECTOR] = "consumer.selector";
+ destOptions[CONSUMER_EXCLUSIVE] = "consumer.exclusive";
+ destOptions[CONSUMER_PRIORITY] = "consumer.priority";
+
+ uriParams[PARAM_USERNAME] = "username";
+ uriParams[PARAM_PASSWORD] = "password";
+ uriParams[PARAM_CLIENTID] = "client-id";
+
+ for( int ix=0; ix
+
+#include
+#include