AMQ-1802: extract activemq pool in its own module

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@668559 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Guillaume Nodet 2008-06-17 08:16:50 +00:00
parent 702e2a0170
commit 2e57fb5fd9
29 changed files with 699 additions and 87 deletions

View File

@ -25,7 +25,6 @@ import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
/**
@ -42,8 +41,7 @@ public abstract class EmbeddedBrokerTestSupport extends TestCase {
protected boolean useTopic;
protected Destination destination;
protected JmsTemplate template;
private boolean usePooledConnectionWithTemplate = true;
protected void setUp() throws Exception {
if (broker == null) {
broker = createBroker();
@ -72,12 +70,7 @@ public abstract class EmbeddedBrokerTestSupport extends TestCase {
* @return a newly created JmsTemplate
*/
protected JmsTemplate createJmsTemplate() {
if (usePooledConnectionWithTemplate) {
// lets use a pool to avoid creating and closing producers
return new JmsTemplate(new PooledConnectionFactory(bindAddress));
} else {
return new JmsTemplate(connectionFactory);
}
return new JmsTemplate(connectionFactory);
}
/**

View File

@ -30,7 +30,6 @@ import javax.net.ssl.X509TrustManager;
import junit.framework.TestCase;
import org.apache.activemq.AMQDeadlockTest3;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.commons.logging.Log;

99
activemq-pool/pom.xml Executable file
View File

@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.2-SNAPSHOT</version>
</parent>
<artifactId>activemq-pool</artifactId>
<packaging>bundle</packaging>
<name>ActiveMQ :: Pool</name>
<description>ActiveMQ Pooled ConnectionFactory</description>
<properties>
<activemq.osgi.import.pkg>
javax.transaction*;resolution:=optional,
org.apache.activemq.ra*;resolution:=optional,
org.apache.geronimo.transaction.manager*;resolution:=optional,
org.springframework*;resolution:=optional,
*
</activemq.osgi.import.pkg>
<activemq.osgi.export>
org.apache.activemq.pool*;version=${project.version},
</activemq.osgi.export>
</properties>
<dependencies>
<!-- =============================== -->
<!-- Required Dependencies -->
<!-- =============================== -->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging-api</artifactId>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-core</artifactId>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-ra</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.geronimo.components</groupId>
<artifactId>geronimo-transaction</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jta_1.0.1B_spec</artifactId>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import java.io.IOException;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import javax.jms.JMSException;
import javax.transaction.TransactionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
import org.apache.geronimo.transaction.manager.NamedXAResource;
import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
/**
* This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
* in a way that will allow the transaction manager to correctly recover XA transactions.
*
* For example, it can be used the following way:
* <pre>
* <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
* <property name="brokerURL" value="tcp://localhost:61616" />
* </bean>
*
* <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean">
* <property name="maxConnections" value="8" />
* <property name="transactionManager" ref="transactionManager" />
* <property name="connectionFactory" ref="activemqConnectionFactory" />
* <property name="resourceName" value="activemq.broker" />
* </bean>
*
* <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
* <property name="transactionManager" ref="transactionManager" />
* <property name="connectionFactory" ref="activemqConnectionFactory" />
* <property name="resourceName" value="activemq.broker" />
* </bean>
* </pre>
*/
public class ActiveMQResourceManager {
private static final Log LOGGER = LogFactory.getLog(ActiveMQResourceManager.class);
private String resourceName;
private TransactionManager transactionManager;
private ConnectionFactory connectionFactory;
public void recoverResource() {
try {
if (!Recovery.recover(this)) {
LOGGER.info("Resource manager is unrecoverable");
}
} catch (NoClassDefFoundError e) {
LOGGER.info("Resource manager is unrecoverable due to missing classes: " + e);
} catch (Throwable e) {
LOGGER.warn("Error while recovering resource manager", e);
}
}
public String getResourceName() {
return resourceName;
}
public void setResourceName(String resourceName) {
this.resourceName = resourceName;
}
public TransactionManager getTransactionManager() {
return transactionManager;
}
public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
/**
* This class will ensure the broker is properly recovered when wired with
* the Geronimo transaction manager.
*/
public static class Recovery {
public static boolean isRecoverable(ActiveMQResourceManager rm) {
return rm.getConnectionFactory() instanceof ActiveMQConnectionFactory &&
rm.getTransactionManager() instanceof RecoverableTransactionManager &&
rm.getResourceName() != null && !"".equals(rm.getResourceName());
}
public static boolean recover(ActiveMQResourceManager rm) throws IOException {
if (isRecoverable(rm)) {
try {
ActiveMQConnectionFactory connFactory = (ActiveMQConnectionFactory) rm.getConnectionFactory();
ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
NamedXAResource namedXaResource = new WrapperNamedXAResource(session.getTransactionContext(), rm.getResourceName());
RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager();
rtxManager.recoverResourceManager(namedXaResource);
return true;
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
} else {
return false;
}
}
}
}

View File

@ -42,7 +42,6 @@ import org.apache.commons.pool.ObjectPoolFactory;
*/
public class ConnectionPool {
private TransactionManager transactionManager;
private ActiveMQConnection connection;
private Map<SessionKey, SessionPool> cache;
private AtomicBoolean started = new AtomicBoolean(false);
@ -53,9 +52,8 @@ public class ConnectionPool {
private boolean hasExpired;
private int idleTimeout = 30 * 1000;
public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory,
TransactionManager transactionManager) {
this(connection, new HashMap<SessionKey, SessionPool>(), poolFactory, transactionManager);
public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
this(connection, new HashMap<SessionKey, SessionPool>(), poolFactory);
// Add a transport Listener so that we can notice if this connection
// should be expired due to
// a connection failure.
@ -84,12 +82,10 @@ public class ConnectionPool {
}
}
public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory,
TransactionManager transactionManager) {
public ConnectionPool(ActiveMQConnection connection, Map<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
this.connection = connection;
this.cache = cache;
this.poolFactory = poolFactory;
this.transactionManager = transactionManager;
}
public void start() throws JMSException {
@ -103,35 +99,14 @@ public class ConnectionPool {
}
public Session createSession(boolean transacted, int ackMode) throws JMSException {
try {
boolean isXa = transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION;
if (isXa) {
transacted = true;
ackMode = Session.SESSION_TRANSACTED;
}
SessionKey key = new SessionKey(transacted, ackMode);
SessionPool pool = cache.get(key);
if (pool == null) {
pool = new SessionPool(this, key, poolFactory.createPool());
cache.put(key, pool);
}
PooledSession session = pool.borrowSession();
if (isXa) {
session.setIgnoreClose(true);
transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
incrementReferenceCount();
transactionManager.getTransaction().enlistResource(createXaResource(session));
}
return session;
} catch (RollbackException e) {
final JMSException jmsException = new JMSException("Rollback Exception");
jmsException.initCause(e);
throw jmsException;
} catch (SystemException e) {
final JMSException jmsException = new JMSException("System Exception");
jmsException.initCause(e);
throw jmsException;
SessionKey key = new SessionKey(transacted, ackMode);
SessionPool pool = cache.get(key);
if (pool == null) {
pool = createSessionPool(key);
cache.put(key, pool);
}
PooledSession session = pool.borrowSession();
return session;
}
public synchronized void close() {
@ -201,29 +176,8 @@ public class ConnectionPool {
this.idleTimeout = idleTimeout;
}
protected XAResource createXaResource(PooledSession session) throws JMSException {
return session.getSession().getTransactionContext();
protected SessionPool createSessionPool(SessionKey key) {
return new SessionPool(this, key, poolFactory.createPool());
}
protected class Synchronization implements javax.transaction.Synchronization {
private final PooledSession session;
protected Synchronization(PooledSession session) {
this.session = session;
}
public void beforeCompletion() {
}
public void afterCompletion(int status) {
try {
// This will return session to the pool.
session.setIgnoreClose(false);
session.close();
decrementReferenceCount();
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2006 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import javax.jms.JMSException;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ra.LocalAndXATransaction;
import org.apache.commons.pool.ObjectPoolFactory;
import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
public class JcaConnectionPool extends XaConnectionPool {
private String name;
public JcaConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) {
super(connection, poolFactory, transactionManager);
this.name = name;
}
protected XAResource createXaResource(PooledSession session) throws JMSException {
XAResource xares = new LocalAndXATransaction(session.getSession().getTransactionContext());
if (name != null) {
xares = new WrapperNamedXAResource(xares, name);
}
return xares;
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 2006 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*
*/
public class JcaPooledConnectionFactory extends XaPooledConnectionFactory {
private String name;
public JcaPooledConnectionFactory() {
super();
}
public JcaPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
super(connectionFactory);
}
public JcaPooledConnectionFactory(String brokerURL) {
super(brokerURL);
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
return new JcaConnectionPool(connection, getPoolFactory(), getTransactionManager(), getName());
}
}

View File

@ -51,7 +51,6 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
private ObjectPoolFactory poolFactory;
private int maximumActive = 500;
private int maxConnections = 1;
private TransactionManager transactionManager;
private int idleTimeout = 30 * 1000;
public PooledConnectionFactory() {
@ -74,14 +73,6 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
this.connectionFactory = connectionFactory;
}
public TransactionManager getTransactionManager() {
return transactionManager;
}
public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
public Connection createConnection() throws JMSException {
return createConnection(null, null);
}
@ -115,7 +106,7 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
}
protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
ConnectionPool result = new ConnectionPool(connection, getPoolFactory(), transactionManager);
ConnectionPool result = new ConnectionPool(connection, getPoolFactory());
result.setIdleTimeout(getIdleTimeout());
return result;
}
@ -124,8 +115,7 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
if (key.getUserName() == null && key.getPassword() == null) {
return (ActiveMQConnection)connectionFactory.createConnection();
} else {
return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key
.getPassword());
return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword());
}
}
@ -144,7 +134,7 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
LinkedList list = iter.next();
for (Iterator i = list.iterator(); i.hasNext();) {
ConnectionPool connection = (ConnectionPool)i.next();
ConnectionPool connection = (ConnectionPool) i.next();
connection.close();
}
}

View File

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import javax.jms.ConnectionFactory;
import javax.transaction.TransactionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool.ObjectPoolFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
/**
* Simple factory bean used to create a jencks connection pool.
* Depending on the properties set, it will create a simple pool,
* a transaction aware connection pool, or a jca aware connection pool.
*
* <pre class="code">
* <bean id="pooledConnectionFactory" class="javax.script.ScriptEngineFactory.PooledConnectionFactoryFactoryBean">
* <property name="connectionFactory" ref="connectionFactory" />
* <property name="transactionManager" ref="transactionManager" />
* <property name="resourceName" value="ResourceName" />
* </bean>
* </pre>
*
* The <code>resourceName</code> property should be used along with the {@link ActiveMQResourceManager} and have
* the same value than its <code>resourceName</code> property. This will make sure the transaction manager
* maps correctly the connection factory to the recovery process.
*
*/
public class PooledConnectionFactoryBean implements FactoryBean, InitializingBean {
private static final Log LOGGER = LogFactory.getLog(PooledConnectionFactoryBean.class);
private ConnectionFactory pooledConnectionFactory;
private ConnectionFactory connectionFactory;
private int maxConnections;
private int maximumActive;
private Object transactionManager;
private String resourceName;
private ObjectPoolFactory poolFactory;
public Object getObject() throws Exception {
return pooledConnectionFactory;
}
public Class getObjectType() {
return ConnectionFactory.class;
}
public boolean isSingleton() {
return true;
}
public int getMaxConnections() {
return maxConnections;
}
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}
public int getMaximumActive() {
return maximumActive;
}
public void setMaximumActive(int maximumActive) {
this.maximumActive = maximumActive;
}
public Object getTransactionManager() {
return transactionManager;
}
public void setTransactionManager(Object transactionManager) {
this.transactionManager = transactionManager;
}
public String getResourceName() {
return resourceName;
}
public void setResourceName(String resourceName) {
this.resourceName = resourceName;
}
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public ObjectPoolFactory getPoolFactory() {
return poolFactory;
}
public void setPoolFactory(ObjectPoolFactory poolFactory) {
this.poolFactory = poolFactory;
}
public void afterPropertiesSet() throws Exception {
if (pooledConnectionFactory == null && transactionManager != null && resourceName != null) {
try {
LOGGER.debug("Trying to build a JcaPooledConnectionFactory");
JcaPooledConnectionFactory f = new JcaPooledConnectionFactory();
f.setName(resourceName);
f.setTransactionManager((TransactionManager) transactionManager);
f.setMaxConnections(maxConnections);
f.setMaximumActive(maximumActive);
f.setConnectionFactory(connectionFactory);
f.setPoolFactory(poolFactory);
this.pooledConnectionFactory = f;
} catch (Throwable t) {
LOGGER.debug("Could not create JCA enabled connection factory: " + t, t);
}
}
if (pooledConnectionFactory == null && transactionManager != null) {
try {
LOGGER.debug("Trying to build a XaPooledConnectionFactory");
XaPooledConnectionFactory f = new XaPooledConnectionFactory();
f.setTransactionManager((TransactionManager) transactionManager);
f.setMaxConnections(maxConnections);
f.setMaximumActive(maximumActive);
f.setConnectionFactory(connectionFactory);
f.setPoolFactory(poolFactory);
this.pooledConnectionFactory = f;
} catch (Throwable t) {
LOGGER.debug("Could not create XA enabled connection factory: " + t, t);
}
}
if (pooledConnectionFactory == null) {
try {
LOGGER.debug("Trying to build a PooledConnectionFactory");
PooledConnectionFactory f = new PooledConnectionFactory();
f.setMaxConnections(maxConnections);
f.setMaximumActive(maximumActive);
f.setConnectionFactory(connectionFactory);
f.setPoolFactory(poolFactory);
this.pooledConnectionFactory = f;
} catch (Throwable t) {
LOGGER.debug("Could not create pooled connection factory: " + t, t);
}
}
if (pooledConnectionFactory == null) {
throw new IllegalStateException("Unable to create pooled connection factory. Enable DEBUG log level for more informations");
}
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.commons.pool.ObjectPoolFactory;
/**
* An XA-aware connection pool. When a session is created and an xa transaction is active,
* the session will automatically be enlisted in the current transaction.
*
* @author gnodet
*/
public class XaConnectionPool extends ConnectionPool {
private TransactionManager transactionManager;
public XaConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) {
super(connection, poolFactory);
this.transactionManager = transactionManager;
}
public Session createSession(boolean transacted, int ackMode) throws JMSException {
try {
boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
if (isXa) {
transacted = true;
ackMode = Session.SESSION_TRANSACTED;
}
PooledSession session = (PooledSession) super.createSession(transacted, ackMode);
if (isXa) {
session.setIgnoreClose(true);
transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
incrementReferenceCount();
transactionManager.getTransaction().enlistResource(createXaResource(session));
}
return session;
} catch (RollbackException e) {
final JMSException jmsException = new JMSException("Rollback Exception");
jmsException.initCause(e);
throw jmsException;
} catch (SystemException e) {
final JMSException jmsException = new JMSException("System Exception");
jmsException.initCause(e);
throw jmsException;
}
}
protected XAResource createXaResource(PooledSession session) throws JMSException {
return session.getSession().getTransactionContext();
}
protected class Synchronization implements javax.transaction.Synchronization {
private final PooledSession session;
private Synchronization(PooledSession session) {
this.session = session;
}
public void beforeCompletion() {
}
public void afterCompletion(int status) {
try {
// This will return session to the pool.
session.setIgnoreClose(false);
session.close();
decrementReferenceCount();
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import javax.transaction.TransactionManager;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* A pooled connection factory that automatically enlists
* sessions in the current active XA transaction if any.
*/
public class XaPooledConnectionFactory extends PooledConnectionFactory {
private TransactionManager transactionManager;
public XaPooledConnectionFactory() {
super();
}
public XaPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
super(connectionFactory);
}
public XaPooledConnectionFactory(String brokerURL) {
super(brokerURL);
}
public TransactionManager getTransactionManager() {
return transactionManager;
}
public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
return new XaConnectionPool(connection, getPoolFactory(), getTransactionManager());
}
}

View File

@ -23,12 +23,14 @@ import javax.jms.TopicSession;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.test.TestSupport;
import org.apache.activemq.command.ActiveMQTopic;
/**
* @version $Revision$
*/
public class PooledTopicPublisherTest extends TestCase {
public class PooledTopicPublisherTest extends TestSupport {
private TopicConnection connection;
public void testPooledConnectionFactory() throws Exception {

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.ArrayList;
@ -35,7 +35,6 @@ import javax.jms.MessageListener;
import javax.jms.Session;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
@ -45,13 +44,15 @@ import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class AMQDeadlockTest3 extends TestCase {
public class AMQDeadlockTest3 extends org.apache.activemq.test.TestSupport {
private static final transient Log LOG = LogFactory.getLog(AMQDeadlockTest3.class);
private static final String URL1 = "tcp://localhost:61616";

View File

@ -36,6 +36,7 @@ import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.test.*;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -50,7 +51,8 @@ import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class AMQDeadlockTestW4Brokers extends TestCase {
public class AMQDeadlockTestW4Brokers extends org.apache.activemq.test.TestSupport {
private static final transient Log LOG = LogFactory.getLog(AMQDeadlockTestW4Brokers.class);
private static final String BROKER_URL1 = "tcp://localhost:61616";

View File

@ -35,6 +35,7 @@ import javax.jms.Session;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.test.*;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -47,7 +48,7 @@ import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class AMQFailoverIssue extends TestCase {
public class AMQFailoverIssue extends org.apache.activemq.test.TestSupport {
private static final String URL1 = "tcp://localhost:61616";
private static final String QUEUE1_NAME = "test.queue.1";

View File

@ -50,6 +50,10 @@
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-optional</artifactId>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activemq-web</artifactId>

View File

@ -128,6 +128,7 @@
<includes>
<include>${pom.groupId}:activemq-rar</include>
<include>${pom.groupId}:activemq-optional</include>
<include>${pom.groupId}:activemq-pool</include>
<include>${pom.groupId}:activemq-xmpp</include>
<include>${pom.groupId}:activecluster</include>
<include>${pom.groupId}:activeio-core</include>

12
pom.xml
View File

@ -128,6 +128,7 @@
<module>activemq-jpa-store</module>
<module>activemq-openwire-generator</module>
<module>activemq-optional</module>
<module>activemq-pool</module>
<module>activemq-ra</module>
<module>activemq-rar</module>
<module>activemq-run</module>
@ -189,6 +190,11 @@
<artifactId>activemq-jmdns_1.0</artifactId>
<version>${activemq-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>${activemq-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-web</artifactId>
@ -897,6 +903,12 @@
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.components</groupId>
<artifactId>geronimo-transaction</artifactId>
<version>2.1</version>
</dependency>
</dependencies>
</dependencyManagement>