diff --git a/activemq-core/project.xml b/activemq-core/project.xml index 4d19326b18..3042a7a2f4 100755 --- a/activemq-core/project.xml +++ b/activemq-core/project.xml @@ -172,7 +172,7 @@ xbean xbean-spring ${xbean_spring_version} - http://www.gbean.org + http://www.xbean.org true diff --git a/activemq-core/src/main/java/org/activemq/pool/ConnectionPool.java b/activemq-core/src/main/java/org/activemq/pool/ConnectionPool.java new file mode 100644 index 0000000000..1e4fbcf981 --- /dev/null +++ b/activemq-core/src/main/java/org/activemq/pool/ConnectionPool.java @@ -0,0 +1,87 @@ +/** + * + * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.pool; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; + +import org.activemq.ActiveMQConnection; +import org.activemq.util.JMSExceptionSupport; + +import javax.jms.JMSException; +import javax.jms.Session; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * Holds a real JMS connection along with the session pools associated with it. + * + * @version $Revision$ + */ +public class ConnectionPool { + private ActiveMQConnection connection; + private Map cache; + private AtomicBoolean started = new AtomicBoolean(false); + + public ConnectionPool(ActiveMQConnection connection) { + this(connection, new HashMap()); + } + + public ConnectionPool(ActiveMQConnection connection, Map cache) { + this.connection = connection; + this.cache = cache; + } + + public void start() throws JMSException { + if (started.compareAndSet(false, true)) { + connection.start(); + } + } + + public ActiveMQConnection getConnection() { + return connection; + } + + public Session createSession(boolean transacted, int ackMode) throws JMSException { + SessionKey key = new SessionKey(transacted, ackMode); + SessionPool pool = (SessionPool) cache.get(key); + if (pool == null) { + pool = new SessionPool(this, key); + cache.put(key, pool); + } + return pool.borrowSession(); + } + + public void close() throws JMSException { + Iterator i = cache.values().iterator(); + while (i.hasNext()) { + SessionPool pool = (SessionPool) i.next(); + i.remove(); + try { + pool.close(); + } + catch (Exception e) { + throw JMSExceptionSupport.create(e); + } + } + connection.close(); + connection = null; + } + +} diff --git a/activemq-core/src/main/java/org/activemq/pool/PooledConnection.java b/activemq-core/src/main/java/org/activemq/pool/PooledConnection.java index 8624006a8a..6ff5a51da9 100755 --- a/activemq-core/src/main/java/org/activemq/pool/PooledConnection.java +++ b/activemq-core/src/main/java/org/activemq/pool/PooledConnection.java @@ -18,9 +18,9 @@ **/ package org.activemq.pool; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; +import org.activemq.ActiveMQConnection; +import org.activemq.ActiveMQSession; +import org.activemq.AlreadyClosedException; import javax.jms.Connection; import javax.jms.ConnectionConsumer; @@ -37,11 +37,6 @@ import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; -import org.activemq.ActiveMQConnection; -import org.activemq.ActiveMQSession; -import org.activemq.AlreadyClosedException; -import org.activemq.util.JMSExceptionSupport; - /** * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and * {@link QueueConnection} which is pooled and on {@link #close()} will return @@ -51,44 +46,27 @@ import org.activemq.util.JMSExceptionSupport; */ public class PooledConnection implements TopicConnection, QueueConnection { - private ActiveMQConnection connection; - private Map cache; + private ConnectionPool pool; private boolean stopped; - public PooledConnection(ActiveMQConnection connection) { - this(connection, new HashMap()); - } - - public PooledConnection(ActiveMQConnection connection, Map cache) { - this.connection = connection; - this.cache = cache; + public PooledConnection(ConnectionPool pool) { + this.pool = pool; } /** * Factory method to create a new instance. */ public PooledConnection newInstance() { - return new PooledConnection(connection, cache); + return new PooledConnection(pool); } public void close() throws JMSException { - connection = null; - Iterator i = cache.values().iterator(); - while (i.hasNext()) { - SessionPool pool = (SessionPool) i.next(); - i.remove(); - try { - pool.close(); - } - catch (Exception e) { - throw JMSExceptionSupport.create(e); - } - } + pool = null; } public void start() throws JMSException { - // TODO should we start connections first before pooling them? - getConnection().start(); + assertNotClosed(); + pool.start(); } public void stop() throws JMSException { @@ -144,22 +122,21 @@ public class PooledConnection implements TopicConnection, QueueConnection { } public Session createSession(boolean transacted, int ackMode) throws JMSException { - SessionKey key = new SessionKey(transacted, ackMode); - SessionPool pool = (SessionPool) cache.get(key); - if (pool == null) { - pool = new SessionPool(getConnection(), key); - cache.put(key, pool); - } - return pool.borrowSession(); + return pool.createSession(transacted, ackMode); } // Implementation methods // ------------------------------------------------------------------------- + protected ActiveMQConnection getConnection() throws JMSException { - if (stopped || connection == null) { + assertNotClosed(); + return pool.getConnection(); + } + + protected void assertNotClosed() throws AlreadyClosedException { + if (stopped || pool == null) { throw new AlreadyClosedException(); } - return connection; } protected ActiveMQSession createSession(SessionKey key) throws JMSException { diff --git a/activemq-core/src/main/java/org/activemq/pool/PooledConnectionFactory.java b/activemq-core/src/main/java/org/activemq/pool/PooledConnectionFactory.java index c1e269506f..32cdc048c6 100644 --- a/activemq-core/src/main/java/org/activemq/pool/PooledConnectionFactory.java +++ b/activemq-core/src/main/java/org/activemq/pool/PooledConnectionFactory.java @@ -33,10 +33,12 @@ import java.util.Iterator; import java.util.Map; /** - * A JMS provider which pools Connection, Session and MessageProducer instances so it can be used with tools like - * Spring's JmsTemplate. + * A JMS provider which pools Connection, Session and MessageProducer instances + * so it can be used with tools like Spring's JmsTemplate. * - * NOTE this implementation is only intended for use when sending messages. + * NOTE this implementation is only intended for use when sending + * messages. * * @version $Revision: 1.1 $ */ @@ -70,13 +72,13 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { public synchronized Connection createConnection(String userName, String password) throws JMSException { ConnectionKey key = new ConnectionKey(userName, password); - PooledConnection connection = (PooledConnection) cache.get(key); + ConnectionPool connection = (ConnectionPool) cache.get(key); if (connection == null) { ActiveMQConnection delegate = createConnection(key); - connection = new PooledConnection(delegate); + connection = new ConnectionPool(delegate); cache.put(key, connection); } - return connection.newInstance(); + return new PooledConnection(connection); } protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException { @@ -103,9 +105,8 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { public void stop() throws Exception { ServiceStopper stopper = new ServiceStopper(); for (Iterator iter = cache.values().iterator(); iter.hasNext();) { - PooledConnection connection = (PooledConnection) iter.next(); + ConnectionPool connection = (ConnectionPool) iter.next(); try { - connection.getConnection().close(); connection.close(); } catch (JMSException e) { diff --git a/activemq-core/src/main/java/org/activemq/pool/PooledProducer.java b/activemq-core/src/main/java/org/activemq/pool/PooledProducer.java index 034754b431..7d2f6145e8 100644 --- a/activemq-core/src/main/java/org/activemq/pool/PooledProducer.java +++ b/activemq-core/src/main/java/org/activemq/pool/PooledProducer.java @@ -18,13 +18,13 @@ **/ package org.activemq.pool; +import org.activemq.ActiveMQMessageProducer; + import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; -import org.activemq.ActiveMQMessageProducer; - /** * A pooled {@link MessageProducer} * diff --git a/activemq-core/src/main/java/org/activemq/pool/PooledSession.java b/activemq-core/src/main/java/org/activemq/pool/PooledSession.java index 9e898f5064..4a335cd0f5 100644 --- a/activemq-core/src/main/java/org/activemq/pool/PooledSession.java +++ b/activemq-core/src/main/java/org/activemq/pool/PooledSession.java @@ -60,13 +60,13 @@ public class PooledSession implements TopicSession, QueueSession { private static final transient Log log = LogFactory.getLog(PooledSession.class); private ActiveMQSession session; - private ObjectPool sessionPool; + private SessionPool sessionPool; private ActiveMQMessageProducer messageProducer; private ActiveMQQueueSender queueSender; private ActiveMQTopicPublisher topicPublisher; private boolean transactional = true; - public PooledSession(ActiveMQSession aSession, ObjectPool sessionPool) { + public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) { this.session = aSession; this.sessionPool = sessionPool; this.transactional = session.isTransacted(); @@ -99,12 +99,7 @@ public class PooledSession implements TopicSession, QueueSession { } } - try { - sessionPool.returnObject(this); - } - catch (Exception e) { - throw JMSExceptionSupport.create("Failed to return session to pool: " + e, e); - } + sessionPool.returnSession(this); } public void commit() throws JMSException { diff --git a/activemq-core/src/main/java/org/activemq/pool/SessionPool.java b/activemq-core/src/main/java/org/activemq/pool/SessionPool.java index 90e8c7dea9..774b181ce5 100644 --- a/activemq-core/src/main/java/org/activemq/pool/SessionPool.java +++ b/activemq-core/src/main/java/org/activemq/pool/SessionPool.java @@ -1,21 +1,21 @@ /** -* ActiveMQ: The Open Source Message Fabric -* -* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -**/ + * ActiveMQ: The Open Source Message Fabric + * + * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ package org.activemq.pool; import org.activemq.ActiveMQConnection; @@ -30,32 +30,35 @@ import javax.jms.JMSException; /** * Represents the session pool for a given JMS connection. - * + * * @version $Revision: 1.1 $ */ public class SessionPool implements PoolableObjectFactory { - private ActiveMQConnection connection; + private ConnectionPool connectionPool; private SessionKey key; private ObjectPool sessionPool; - public SessionPool(ActiveMQConnection connection, SessionKey key) { - this(connection, key, new GenericObjectPool()); + public SessionPool(ConnectionPool connectionPool, SessionKey key) { + this(connectionPool, key, new GenericObjectPool()); } - public SessionPool(ActiveMQConnection connection, SessionKey key, ObjectPool sessionPool) { - this.connection = connection; + public SessionPool(ConnectionPool connectionPool, SessionKey key, ObjectPool sessionPool) { + this.connectionPool = connectionPool; this.key = key; this.sessionPool = sessionPool; sessionPool.setFactory(this); } public void close() throws Exception { - sessionPool.close(); + if (sessionPool != null) { + sessionPool.close(); + } + sessionPool = null; } - + public PooledSession borrowSession() throws JMSException { try { - Object object = sessionPool.borrowObject(); + Object object = getSessionPool().borrowObject(); return (PooledSession) object; } catch (JMSException e) { @@ -66,10 +69,21 @@ public class SessionPool implements PoolableObjectFactory { } } + public void returnSession(PooledSession session) throws JMSException { + // lets check if we are already closed + getConnection(); + try { + getSessionPool().returnObject(this); + } + catch (Exception e) { + throw JMSExceptionSupport.create("Failed to return session to pool: " + e, e); + } + } + // PoolableObjectFactory methods - //------------------------------------------------------------------------- + // ------------------------------------------------------------------------- public Object makeObject() throws Exception { - return new PooledSession(createSession(), sessionPool); + return new PooledSession(createSession(), this); } public void destroyObject(Object o) throws Exception { @@ -88,17 +102,20 @@ public class SessionPool implements PoolableObjectFactory { } // Implemention methods - //------------------------------------------------------------------------- - protected ActiveMQConnection getConnection() throws JMSException { - if (connection == null) { + // ------------------------------------------------------------------------- + protected ObjectPool getSessionPool() throws AlreadyClosedException { + if (sessionPool == null) { throw new AlreadyClosedException(); } - return connection; + return sessionPool; + } + + protected ActiveMQConnection getConnection() throws JMSException { + return connectionPool.getConnection(); } protected ActiveMQSession createSession() throws JMSException { return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode()); } - } diff --git a/activemq-core/src/test/java/org/activemq/pool/JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest.java b/activemq-core/src/test/java/org/activemq/pool/JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest.java new file mode 100755 index 0000000000..90e2440403 --- /dev/null +++ b/activemq-core/src/test/java/org/activemq/pool/JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest.java @@ -0,0 +1,46 @@ +/** + * ActiveMQ: The Open Source Message Fabric + * + * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + **/ +package org.activemq.pool; + +import org.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest; + +import javax.jms.Connection; + +/** + * @version $Revision$ + */ +public class JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest extends JmsTopicSendReceiveWithTwoConnectionsTest { + + protected PooledConnectionFactory senderConnectionFactory = new PooledConnectionFactory("vm://localhost?broker.persistent=false"); + + protected Connection createSendConnection() throws Exception { + return senderConnectionFactory.createConnection(); + } + + protected void setUp() throws Exception { + verbose = true; + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + senderConnectionFactory.stop(); + } + +}