added a test case and fixes for the pooled connection factory. AMQ-449

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@358280 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2005-12-21 12:53:30 +00:00
parent 75a6b834b9
commit bd60590273
8 changed files with 216 additions and 93 deletions

View File

@ -172,7 +172,7 @@
<groupId>xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>${xbean_spring_version}</version>
<url>http://www.gbean.org</url>
<url>http://www.xbean.org</url>
<properties>
<war.bundle>true</war.bundle>
</properties>

View File

@ -0,0 +1,87 @@
/**
*
* Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.pool;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.activemq.ActiveMQConnection;
import org.activemq.util.JMSExceptionSupport;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* Holds a real JMS connection along with the session pools associated with it.
*
* @version $Revision$
*/
public class ConnectionPool {
private ActiveMQConnection connection;
private Map cache;
private AtomicBoolean started = new AtomicBoolean(false);
public ConnectionPool(ActiveMQConnection connection) {
this(connection, new HashMap());
}
public ConnectionPool(ActiveMQConnection connection, Map cache) {
this.connection = connection;
this.cache = cache;
}
public void start() throws JMSException {
if (started.compareAndSet(false, true)) {
connection.start();
}
}
public ActiveMQConnection getConnection() {
return connection;
}
public Session createSession(boolean transacted, int ackMode) throws JMSException {
SessionKey key = new SessionKey(transacted, ackMode);
SessionPool pool = (SessionPool) cache.get(key);
if (pool == null) {
pool = new SessionPool(this, key);
cache.put(key, pool);
}
return pool.borrowSession();
}
public void close() throws JMSException {
Iterator i = cache.values().iterator();
while (i.hasNext()) {
SessionPool pool = (SessionPool) i.next();
i.remove();
try {
pool.close();
}
catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
connection.close();
connection = null;
}
}

View File

@ -18,9 +18,9 @@
**/
package org.activemq.pool;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQSession;
import org.activemq.AlreadyClosedException;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
@ -37,11 +37,6 @@ import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import org.activemq.ActiveMQConnection;
import org.activemq.ActiveMQSession;
import org.activemq.AlreadyClosedException;
import org.activemq.util.JMSExceptionSupport;
/**
* Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
* {@link QueueConnection} which is pooled and on {@link #close()} will return
@ -51,44 +46,27 @@ import org.activemq.util.JMSExceptionSupport;
*/
public class PooledConnection implements TopicConnection, QueueConnection {
private ActiveMQConnection connection;
private Map cache;
private ConnectionPool pool;
private boolean stopped;
public PooledConnection(ActiveMQConnection connection) {
this(connection, new HashMap());
}
public PooledConnection(ActiveMQConnection connection, Map cache) {
this.connection = connection;
this.cache = cache;
public PooledConnection(ConnectionPool pool) {
this.pool = pool;
}
/**
* Factory method to create a new instance.
*/
public PooledConnection newInstance() {
return new PooledConnection(connection, cache);
return new PooledConnection(pool);
}
public void close() throws JMSException {
connection = null;
Iterator i = cache.values().iterator();
while (i.hasNext()) {
SessionPool pool = (SessionPool) i.next();
i.remove();
try {
pool.close();
}
catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
pool = null;
}
public void start() throws JMSException {
// TODO should we start connections first before pooling them?
getConnection().start();
assertNotClosed();
pool.start();
}
public void stop() throws JMSException {
@ -144,22 +122,21 @@ public class PooledConnection implements TopicConnection, QueueConnection {
}
public Session createSession(boolean transacted, int ackMode) throws JMSException {
SessionKey key = new SessionKey(transacted, ackMode);
SessionPool pool = (SessionPool) cache.get(key);
if (pool == null) {
pool = new SessionPool(getConnection(), key);
cache.put(key, pool);
}
return pool.borrowSession();
return pool.createSession(transacted, ackMode);
}
// Implementation methods
// -------------------------------------------------------------------------
protected ActiveMQConnection getConnection() throws JMSException {
if (stopped || connection == null) {
assertNotClosed();
return pool.getConnection();
}
protected void assertNotClosed() throws AlreadyClosedException {
if (stopped || pool == null) {
throw new AlreadyClosedException();
}
return connection;
}
protected ActiveMQSession createSession(SessionKey key) throws JMSException {

View File

@ -33,10 +33,12 @@ import java.util.Iterator;
import java.util.Map;
/**
* A JMS provider which pools Connection, Session and MessageProducer instances so it can be used with tools like
* Spring's <a href="http://activemq.org/Spring+Support">JmsTemplate</a>.
* A JMS provider which pools Connection, Session and MessageProducer instances
* so it can be used with tools like Spring's <a
* href="http://activemq.org/Spring+Support">JmsTemplate</a>.
*
* <b>NOTE</b> this implementation is only intended for use when sending messages.
* <b>NOTE</b> this implementation is only intended for use when sending
* messages.
*
* @version $Revision: 1.1 $
*/
@ -70,13 +72,13 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
public synchronized Connection createConnection(String userName, String password) throws JMSException {
ConnectionKey key = new ConnectionKey(userName, password);
PooledConnection connection = (PooledConnection) cache.get(key);
ConnectionPool connection = (ConnectionPool) cache.get(key);
if (connection == null) {
ActiveMQConnection delegate = createConnection(key);
connection = new PooledConnection(delegate);
connection = new ConnectionPool(delegate);
cache.put(key, connection);
}
return connection.newInstance();
return new PooledConnection(connection);
}
protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
@ -103,9 +105,8 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
public void stop() throws Exception {
ServiceStopper stopper = new ServiceStopper();
for (Iterator iter = cache.values().iterator(); iter.hasNext();) {
PooledConnection connection = (PooledConnection) iter.next();
ConnectionPool connection = (ConnectionPool) iter.next();
try {
connection.getConnection().close();
connection.close();
}
catch (JMSException e) {

View File

@ -18,13 +18,13 @@
**/
package org.activemq.pool;
import org.activemq.ActiveMQMessageProducer;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import org.activemq.ActiveMQMessageProducer;
/**
* A pooled {@link MessageProducer}
*

View File

@ -60,13 +60,13 @@ public class PooledSession implements TopicSession, QueueSession {
private static final transient Log log = LogFactory.getLog(PooledSession.class);
private ActiveMQSession session;
private ObjectPool sessionPool;
private SessionPool sessionPool;
private ActiveMQMessageProducer messageProducer;
private ActiveMQQueueSender queueSender;
private ActiveMQTopicPublisher topicPublisher;
private boolean transactional = true;
public PooledSession(ActiveMQSession aSession, ObjectPool sessionPool) {
public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
this.session = aSession;
this.sessionPool = sessionPool;
this.transactional = session.isTransacted();
@ -99,12 +99,7 @@ public class PooledSession implements TopicSession, QueueSession {
}
}
try {
sessionPool.returnObject(this);
}
catch (Exception e) {
throw JMSExceptionSupport.create("Failed to return session to pool: " + e, e);
}
sessionPool.returnSession(this);
}
public void commit() throws JMSException {

View File

@ -1,21 +1,21 @@
/**
* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
*
* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
*
* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.pool;
import org.activemq.ActiveMQConnection;
@ -30,32 +30,35 @@ import javax.jms.JMSException;
/**
* Represents the session pool for a given JMS connection.
*
*
* @version $Revision: 1.1 $
*/
public class SessionPool implements PoolableObjectFactory {
private ActiveMQConnection connection;
private ConnectionPool connectionPool;
private SessionKey key;
private ObjectPool sessionPool;
public SessionPool(ActiveMQConnection connection, SessionKey key) {
this(connection, key, new GenericObjectPool());
public SessionPool(ConnectionPool connectionPool, SessionKey key) {
this(connectionPool, key, new GenericObjectPool());
}
public SessionPool(ActiveMQConnection connection, SessionKey key, ObjectPool sessionPool) {
this.connection = connection;
public SessionPool(ConnectionPool connectionPool, SessionKey key, ObjectPool sessionPool) {
this.connectionPool = connectionPool;
this.key = key;
this.sessionPool = sessionPool;
sessionPool.setFactory(this);
}
public void close() throws Exception {
sessionPool.close();
if (sessionPool != null) {
sessionPool.close();
}
sessionPool = null;
}
public PooledSession borrowSession() throws JMSException {
try {
Object object = sessionPool.borrowObject();
Object object = getSessionPool().borrowObject();
return (PooledSession) object;
}
catch (JMSException e) {
@ -66,10 +69,21 @@ public class SessionPool implements PoolableObjectFactory {
}
}
public void returnSession(PooledSession session) throws JMSException {
// lets check if we are already closed
getConnection();
try {
getSessionPool().returnObject(this);
}
catch (Exception e) {
throw JMSExceptionSupport.create("Failed to return session to pool: " + e, e);
}
}
// PoolableObjectFactory methods
//-------------------------------------------------------------------------
// -------------------------------------------------------------------------
public Object makeObject() throws Exception {
return new PooledSession(createSession(), sessionPool);
return new PooledSession(createSession(), this);
}
public void destroyObject(Object o) throws Exception {
@ -88,17 +102,20 @@ public class SessionPool implements PoolableObjectFactory {
}
// Implemention methods
//-------------------------------------------------------------------------
protected ActiveMQConnection getConnection() throws JMSException {
if (connection == null) {
// -------------------------------------------------------------------------
protected ObjectPool getSessionPool() throws AlreadyClosedException {
if (sessionPool == null) {
throw new AlreadyClosedException();
}
return connection;
return sessionPool;
}
protected ActiveMQConnection getConnection() throws JMSException {
return connectionPool.getConnection();
}
protected ActiveMQSession createSession() throws JMSException {
return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
}
}

View File

@ -0,0 +1,46 @@
/**
* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
*
* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.pool;
import org.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;
import javax.jms.Connection;
/**
* @version $Revision$
*/
public class JmsSendReceiveTwoConnectionsWithSenderUsingPoolTest extends JmsTopicSendReceiveWithTwoConnectionsTest {
protected PooledConnectionFactory senderConnectionFactory = new PooledConnectionFactory("vm://localhost?broker.persistent=false");
protected Connection createSendConnection() throws Exception {
return senderConnectionFactory.createConnection();
}
protected void setUp() throws Exception {
verbose = true;
super.setUp();
}
protected void tearDown() throws Exception {
super.tearDown();
senderConnectionFactory.stop();
}
}