+
+namespace activemq{
+namespace concurrent{
+
+ /**
+ * Defines a Thread Pool object that implements the functionality
+ * of pooling threads to perform user tasks. The Thread Poll has
+ * max size that it will grow to. The thread pool allocates threads
+ * in blocks. When there are no waiting worker threads and a task
+ * is queued then a new batch is allocated. The user can specify
+ * the size of the blocks, otherwise a default value is used.
+ *
+ * When the user queues a task they must also queue a listner to
+ * be notified when the task has completed, this provides the user
+ * with a mechanism to know when a task object can be freed.
+ *
+ * To have the Thread Pool perform a task, the user enqueue's an
+ * object that implements the Runnable
insterface and
+ * one of the worker threads will executing it in its thread context.
+ */
+ class ThreadPool : public PooledThreadListener
+ {
+ public:
+
+ // Constants
+ static const size_t DEFAULT_MAX_POOL_SIZE = 10;
+ static const size_t DEFAULT_MAX_BLOCK_SIZE = 3;
+
+ // Types
+ typedef std::pair Task;
+
+ private:
+
+ // Vector of threads that this object has created for its pool.
+ std::vector< PooledThread* > pool;
+
+ // Queue of Task that are in need of completion
+ util::Queue queue;
+
+ // Max number of Threads this Pool can contian
+ unsigned long maxThreads;
+
+ // Max number of tasks that can be allocated at a time
+ unsigned long blockSize;
+
+ // boolean flag use to indocate that this object is shutting down.
+ bool shutdown;
+
+ // Count of threads that are currently free to perfom some work.
+ unsigned long freeThreads;
+
+ // Mutex for locking operations that affect the pool.
+ Mutex poolLock;
+
+ // Logger Init
+ LOGCMS_DECLARE(logger);
+ LOGCMS_DECLARE(marker);
+
+ private: // Statics
+
+ // The singleton instance of this class
+ static ThreadPool instance;
+
+ public:
+
+ /**
+ * Constructor
+ */
+ ThreadPool(void);
+
+ /**
+ * Destructor
+ */
+ virtual ~ThreadPool(void);
+
+ /**
+ * Queue a task to be completed by one of the Pooled Threads.
+ * tasks are serviced as soon as a PooledThread
+ * is available to run it.
+ * @param object that derives from Runnable
+ * @throws ActiveMQException
+ */
+ virtual void queueTask(Task task)
+ throw ( exceptions::ActiveMQException );
+
+ /**
+ * DeQueue a task to be completed by one of the Pooled Threads.
+ * A caller of this method will block until there is something
+ * in the tasks queue, therefore care must be taken when calling
+ * this function. Normally clients of ThreadPool don't use
+ * this, only the PooledThread
objects owned by
+ * this ThreadPool.
+ * @return object that derives from Runnable
+ * @throws ActiveMQException
+ */
+ virtual Task deQueueTask(void)
+ throw ( exceptions::ActiveMQException );
+
+ /**
+ * Returns the current number of Threads in the Pool, this is
+ * how many there are now, not how many are active or the max
+ * number that might exist.
+ * @return integer number of threads in existance.
+ */
+ virtual unsigned long getPoolSize(void) const { return pool.size(); }
+
+ /**
+ * Returns the current backlog of items in the tasks queue, this
+ * is how much work is still waiting to get done.
+ * @return number of outstanding tasks.
+ */
+ virtual unsigned long getBacklog(void) const { return queue.size(); }
+
+ /**
+ * Ensures that there is at least the specified number of Threads
+ * allocated to the pool. If the size is greater than the MAX
+ * number of threads in the pool, then only MAX threads are
+ * reservved. If the size is smaller than the number of threads
+ * currently in the pool, than nothing is done.
+ * @param number of threads to reserve.
+ */
+ virtual void reserve(unsigned long size);
+
+ /**
+ * Get the Max Number of Threads this Pool can contain
+ * @return max size
+ */
+ virtual unsigned long getMaxThreads(void) const { return maxThreads; }
+
+ /**
+ * Sets the Max number of threads this pool can contian.
+ * if this value is smaller than the current size of the
+ * pool nothing is done.
+ */
+ virtual void setMaxThreads(unsigned long maxThreads);
+
+ /**
+ * Gets the Max number of threads that can be allocated at a time
+ * when new threads are needed.
+ * @return max Thread Block Size
+ */
+ virtual unsigned long getBlockSize(void) const { return blockSize; }
+
+ /**
+ * Sets the Max number of Threads that can be allocated at a time
+ * when the Thread Pool determines that more Threads are needed.
+ * @param Max Thread Block Size
+ */
+ virtual void setBlockSize(unsigned long blockSize);
+
+ /**
+ * Returns the current number of available threads in the pool, threads
+ * that are performing a user task are considered unavailable. This value
+ * could change immeadiately after calling as Threads could finish right
+ * after and be available again. This is informational only.
+ * @return totoal free threads
+ */
+ virtual unsigned long getFreeThreadCount(void) const { return freeThreads; }
+
+ public: // PooledThreadListener Callbacks
+
+ /**
+ * Called by a pooled thread when it is about to begin
+ * executing a new task. This will decrement the available
+ * threads counter so that this object knows when there are
+ * no more free threads and must create new ones.
+ * @param Pointer to the Pooled Thread that is making this call
+ */
+ virtual void onTaskStarted(PooledThread* thread);
+
+ /**
+ * Called by a pooled thread when it has completed a task
+ * and is going back to waiting for another task to run,
+ * this will increment the free threads counter.
+ * @param Pointer the the Pooled Thread that is making this call.
+ */
+ virtual void onTaskCompleted(PooledThread* thread);
+
+ /**
+ * Called by a pooled thread when it has encountered an exception
+ * while running a user task, after receiving this notification
+ * the callee should assume that the PooledThread is now no longer
+ * running.
+ * @param Pointer to the Pooled Thread that is making this call
+ * @param The Exception that occured.
+ */
+ virtual void onTaskException(PooledThread* thread,
+ exceptions::ActiveMQException& ex);
+
+ public: // Statics
+
+ /**
+ * Return the one and only Thread Pool instance.
+ * @return The Thread Pool Pointer
+ */
+ static ThreadPool* getInstance(void) { return &instance; }
+
+ private:
+
+ /**
+ * Allocates the requested ammount of Threads, won't exceed
+ * maxThreads
.
+ * @param the number of threads to create
+ */
+ void AllocateThreads(unsigned long count);
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CONCURRENT_THREADPOOL_H_*/
diff --git a/activemq-cpp/src/main/activemq/connector/Connector.h b/activemq-cpp/src/main/activemq/connector/Connector.h
new file mode 100644
index 0000000000..4c1b8edae6
--- /dev/null
+++ b/activemq-cpp/src/main/activemq/connector/Connector.h
@@ -0,0 +1,317 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_CONNECTOR_H_
+#define _ACTIVEMQ_CONNECTOR_CONNECTOR_H_
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace activemq{
+namespace connector{
+
+ // Forward declarations.
+ class Connector
+ :
+ public cms::Startable,
+ public cms::Closeable
+ {
+ public: // Connector Types
+
+ enum AckType
+ {
+ DeliveredAck = 0, // Message delivered but not consumed
+ PoisonAck = 1, // Message could not be processed due to
+ // poison pill but discard anyway
+ ConsumedAck = 2 // Message consumed, discard
+ };
+
+ public:
+
+ virtual ~Connector(void) {};
+
+ /**
+ * Gets the Client Id for this connection, if this
+ * connection has been closed, then this method returns ""
+ * @return Client Id String
+ */
+ virtual std::string getClientId(void) const = 0;
+
+ /**
+ * Gets a reference to the Transport that this connection
+ * is using.
+ * @param reference to a transport
+ * @throws InvalidStateException if the Transport is not set
+ */
+ virtual transport::Transport& getTransport(void) const
+ throw (exceptions::InvalidStateException ) = 0;
+
+ /**
+ * Creates a Session Info object for this connector
+ * @param Acknowledgement Mode of the Session
+ * @returns Session Info Object
+ * @throws ConnectorException
+ */
+ virtual SessionInfo* createSession(
+ cms::Session::AcknowledgeMode ackMode)
+ throw( ConnectorException ) = 0;
+
+ /**
+ * Create a Consumer for the given Session
+ * @param Destination to Subscribe to.
+ * @param Session Information.
+ * @return Consumer Information
+ * @throws ConnectorException
+ */
+ virtual ConsumerInfo* createConsumer(
+ cms::Destination* destination,
+ SessionInfo* session,
+ const std::string& selector = "")
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Create a Durable Consumer for the given Session
+ * @param Topic to Subscribe to.
+ * @param Session Information.
+ * @param name of the Durable Topic
+ * @param Selector
+ * @param if set, inhibits the delivery of messages
+ * published by its own connection
+ * @return Consumer Information
+ * @throws ConnectorException
+ */
+ virtual ConsumerInfo* createDurableConsumer(
+ cms::Topic* topic,
+ SessionInfo* session,
+ const std::string& name,
+ const std::string& selector = "",
+ bool noLocal = false)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Create a Consumer for the given Session
+ * @param Destination to Subscribe to.
+ * @param Session Information.
+ * @return Producer Information
+ * @throws ConnectorException
+ */
+ virtual ProducerInfo* createProducer(
+ cms::Destination* destination,
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a Topic given a name and session info
+ * @param Topic Name
+ * @param Session Information
+ * @return a newly created Topic Object
+ * @throws ConnectorException
+ */
+ virtual cms::Topic* createTopic(const std::string& name,
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a Queue given a name and session info
+ * @param Queue Name
+ * @param Session Information
+ * @return a newly created Queue Object
+ * @throws ConnectorException
+ */
+ virtual cms::Queue* createQueue(const std::string& name,
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a Temporary Topic given a name and session info
+ * @param Temporary Topic Name
+ * @param Session Information
+ * @return a newly created Temporary Topic Object
+ * @throws ConnectorException
+ */
+ virtual cms::TemporaryTopic* createTemporaryTopic(
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a Temporary Queue given a name and session info
+ * @param Temporary Queue Name
+ * @param Session Information
+ * @return a newly created Temporary Queue Object
+ * @throws ConnectorException
+ */
+ virtual cms::TemporaryQueue* createTemporaryQueue(
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Sends a Message
+ * @param The Message to send.
+ * @param Producer Info for the sender of this message
+ * @throws ConnectorException
+ */
+ virtual void send(cms::Message* message, ProducerInfo* producerInfo)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Sends a set of Messages
+ * @param List of Messages to send.
+ * @param Producer Info for the sender of this message
+ * @throws ConnectorException
+ */
+ virtual void send(std::list& messages,
+ ProducerInfo* producerInfo)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Acknowledges a Message
+ * @param An ActiveMQMessage to Ack.
+ * @throws ConnectorException
+ */
+ virtual void acknowledge(const SessionInfo* session,
+ const cms::Message* message,
+ AckType ackType = ConsumedAck)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Starts a new Transaction.
+ * @param Session Information
+ * @throws ConnectorException
+ */
+ virtual TransactionInfo* startTransaction(
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Commits a Transaction.
+ * @param The Transaction information
+ * @param Session Information
+ * @throws ConnectorException
+ */
+ virtual void commit(TransactionInfo* transaction,
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Rolls back a Transaction.
+ * @param The Transaction information
+ * @param Session Information
+ * @throws ConnectorException
+ */
+ virtual void rollback(TransactionInfo* transaction,
+ SessionInfo* session)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a new Message.
+ * @param Session Information
+ * @param Transaction Info for this Message
+ * @throws ConnectorException
+ */
+ virtual cms::Message* createMessage(
+ SessionInfo* session,
+ TransactionInfo* transaction)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a new BytesMessage.
+ * @param Session Information
+ * @param Transaction Info for this Message
+ * @throws ConnectorException
+ */
+ virtual cms::BytesMessage* createBytesMessage(
+ SessionInfo* session,
+ TransactionInfo* transaction)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a new TextMessage.
+ * @param Session Information
+ * @param Transaction Info for this Message
+ * @throws ConnectorException
+ */
+ virtual cms::TextMessage* createTextMessage(
+ SessionInfo* session,
+ TransactionInfo* transaction)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Creates a new MapMessage.
+ * @param Session Information
+ * @param Transaction Info for this Message
+ * @throws ConnectorException
+ */
+ virtual cms::MapMessage* createMapMessage(
+ SessionInfo* session,
+ TransactionInfo* transaction)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Unsubscribe from a givenDurable Subscription
+ * @param name of the Subscription
+ * @throws ConnectorException
+ */
+ virtual void unsubscribe(const std::string& name)
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Destroys the given connector resource.
+ * @param resource the resource to be destroyed.
+ * @throws ConnectorException
+ */
+ virtual void destroyResource( ConnectorResource* resource )
+ throw ( ConnectorException ) = 0;
+
+ /**
+ * Sets the listener of consumer messages.
+ * @param listener the observer.
+ */
+ virtual void setConsumerMessageListener(
+ ConsumerMessageListener* listener) = 0;
+
+ /**
+ * Sets the Listner of exceptions for this connector
+ * @param ExceptionListener the observer.
+ */
+ virtual void setExceptionListener(
+ cms::ExceptionListener* listener) = 0;
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_CONNECTOR_H_*/
diff --git a/activemq-cpp/src/main/activemq/connector/ConnectorException.h b/activemq-cpp/src/main/activemq/connector/ConnectorException.h
new file mode 100644
index 0000000000..86f60a3a36
--- /dev/null
+++ b/activemq-cpp/src/main/activemq/connector/ConnectorException.h
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONNECTOREXCEPTION_H_
+#define CONNECTOREXCEPTION_H_
+
+#include
+
+namespace activemq{
+namespace connector{
+
+ /*
+ * Signals that an Connector exception of some sort has occurred.
+ */
+ class ConnectorException : public exceptions::ActiveMQException
+ {
+ public:
+
+ ConnectorException() {}
+ ConnectorException( const exceptions::ActiveMQException& ex ){
+ *(ActiveMQException*)this = ex;
+ }
+ ConnectorException( const ConnectorException& ex ){
+ *(exceptions::ActiveMQException*)this = ex;
+ }
+ ConnectorException(const char* file, const int lineNumber,
+ const char* msg, ...)
+ {
+ va_list vargs ;
+ va_start(vargs, msg) ;
+ buildMessage(msg, vargs) ;
+
+ // Set the first mark for this exception.
+ setMark( file, lineNumber );
+ }
+
+ /**
+ * Clones this exception. This is useful for cases where you need
+ * to preserve the type of the original exception as well as the message.
+ * All subclasses should override.
+ */
+ virtual exceptions::ActiveMQException* clone() const{
+ return new ConnectorException( *this );
+ }
+ virtual ~ConnectorException() {}
+
+ };
+
+}}
+
+#endif /*CONNECTOREXCEPTION_H_*/
diff --git a/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h b/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h
new file mode 100644
index 0000000000..629b9af572
--- /dev/null
+++ b/activemq-cpp/src/main/activemq/connector/ConnectorFactory.h
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONNECTORFACTORY_H_
+#define CONNECTORFACTORY_H_
+
+#include
+#include
+#include
+
+namespace activemq{
+namespace connector{
+
+ /**
+ * Interface class for all Connector Factory Classes
+ */
+ class ConnectorFactory
+ {
+ public:
+
+ virtual ~ConnectorFactory(void) {};
+
+ /**
+ * Creates a connector
+ * @param The Properties that the new connector is configured with
+ */
+ virtual Connector* createConnector(
+ const activemq::util::Properties& properties,
+ activemq::transport::Transport* transport) = 0;
+
+ };
+
+}}
+
+#endif /*CONNECTORFACTORY_H_*/
diff --git a/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp b/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp
new file mode 100644
index 0000000000..09cb417384
--- /dev/null
+++ b/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.cpp
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+
+using namespace activemq;
+using namespace activemq::connector;
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectorFactoryMap* ConnectorFactoryMap::getInstance(void)
+{
+ // Static instance of this Map, create here so that one will
+ // always exist, the one and only Connector Map.
+ static ConnectorFactoryMap instance;
+
+ return &instance;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectorFactoryMap::registerConnectorFactory(const std::string& name,
+ ConnectorFactory* factory)
+{
+ factoryMap[name] = factory;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectorFactoryMap::unregisterConnectorFactory(const std::string& name)
+{
+ factoryMap.erase(name);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectorFactory* ConnectorFactoryMap::lookup(const std::string& name)
+{
+ std::map::const_iterator itr =
+ factoryMap.find(name);
+
+ if(itr != factoryMap.end())
+ {
+ return itr->second;
+ }
+
+ // Didn't find it, return nothing, not a single thing.
+ return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::size_t ConnectorFactoryMap::getFactoryNames(
+ std::vector& factoryList)
+{
+ std::map::const_iterator itr =
+ factoryMap.begin();
+
+ for(; itr != factoryMap.end(); ++itr)
+ {
+ factoryList.insert(factoryList.end(), itr->first);
+ }
+
+ return factoryMap.size();
+}
diff --git a/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h b/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h
new file mode 100644
index 0000000000..b3269abece
--- /dev/null
+++ b/activemq-cpp/src/main/activemq/connector/ConnectorFactoryMap.h
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef CONNECTORFACTORYMAP_H_
+#define CONNECTORFACTORYMAP_H_
+
+#include