AMQ-5721 - Update ActiveMQ to use commons-pool2 instead of commons-pool.AMQ-5636 will need it. The JMS pool and other components should use it as well.

This commit is contained in:
Jeff Genender 2015-04-14 06:57:00 -06:00 committed by Dejan Bosanac
parent 31834ed1fb
commit 6d6ed4eaaf
14 changed files with 88 additions and 80 deletions

View File

@ -109,7 +109,7 @@ public class ActiveMQConfiguration extends JmsConfiguration {
* than the default with the Spring {@link JmsTemplate} which will create a new connection, session, producer * than the default with the Spring {@link JmsTemplate} which will create a new connection, session, producer
* for each message then close them all down again. * for each message then close them all down again.
* <p/> * <p/>
* The default value is true. Note that this requires an extra dependency on commons-pool. * The default value is true. Note that this requires an extra dependency on commons-pool2.
*/ */
public void setUsePooledConnection(boolean usePooledConnection) { public void setUsePooledConnection(boolean usePooledConnection) {
this.usePooledConnection = usePooledConnection; this.usePooledConnection = usePooledConnection;
@ -162,7 +162,7 @@ public class ActiveMQConfiguration extends JmsConfiguration {
} }
protected ConnectionFactory createPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) { protected ConnectionFactory createPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
// lets not use classes directly to avoid a runtime dependency on commons-pool // lets not use classes directly to avoid a runtime dependency on commons-pool2
// for folks not using this option // for folks not using this option
try { try {
Class type = loadClass("org.apache.activemq.pool.PooledConnectionFactory", getClass().getClassLoader()); Class type = loadClass("org.apache.activemq.pool.PooledConnectionFactory", getClass().getClassLoader());

View File

@ -60,8 +60,8 @@
<artifactId>geronimo-jms_1.1_spec</artifactId> <artifactId>geronimo-jms_1.1_spec</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-pool</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool</artifactId> <artifactId>commons-pool2</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.xbean</groupId> <groupId>org.apache.xbean</groupId>

View File

@ -69,8 +69,8 @@
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-pool</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool</artifactId> <artifactId>commons-pool2</artifactId>
</dependency> </dependency>
<!-- for testing use amq --> <!-- for testing use amq -->
<dependency> <dependency>

View File

@ -29,9 +29,13 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue; import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic; import javax.jms.TemporaryTopic;
import org.apache.commons.pool.KeyedPoolableObjectFactory; import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -67,30 +71,30 @@ public class ConnectionPool implements ExceptionListener {
// Create our internal Pool of session instances. // Create our internal Pool of session instances.
this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>( this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>(
new KeyedPoolableObjectFactory<SessionKey, SessionHolder>() { new KeyedPooledObjectFactory<SessionKey, SessionHolder>() {
@Override @Override
public void activateObject(SessionKey key, SessionHolder session) throws Exception { public PooledObject<SessionHolder> makeObject(SessionKey sessionKey) throws Exception {
return new DefaultPooledObject<SessionHolder>(new SessionHolder(makeSession(sessionKey)));
} }
@Override @Override
public void destroyObject(SessionKey key, SessionHolder session) throws Exception { public void destroyObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) throws Exception {
session.close(); ((SessionHolder)pooledObject.getObject()).close();
} }
@Override @Override
public SessionHolder makeObject(SessionKey key) throws Exception { public boolean validateObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) {
return new SessionHolder(makeSession(key));
}
@Override
public void passivateObject(SessionKey key, SessionHolder session) throws Exception {
}
@Override
public boolean validateObject(SessionKey key, SessionHolder session) {
return true; return true;
} }
@Override
public void activateObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) throws Exception {
}
@Override
public void passivateObject(SessionKey sessionKey, PooledObject<SessionHolder> pooledObject) throws Exception {
}
} }
); );
} }
@ -258,11 +262,11 @@ public class ConnectionPool implements ExceptionListener {
} }
public int getMaximumActiveSessionPerConnection() { public int getMaximumActiveSessionPerConnection() {
return this.sessionPool.getMaxActive(); return this.sessionPool.getMaxTotal();
} }
public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) { public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
this.sessionPool.setMaxActive(maximumActiveSessionPerConnection); this.sessionPool.setMaxTotal(maximumActiveSessionPerConnection);
} }
public boolean isUseAnonymousProducers() { public boolean isUseAnonymousProducers() {
@ -304,12 +308,11 @@ public class ConnectionPool implements ExceptionListener {
* Indicates whether blocking should be used to wait for more space to create a session. * Indicates whether blocking should be used to wait for more space to create a session.
*/ */
public void setBlockIfSessionPoolIsFull(boolean block) { public void setBlockIfSessionPoolIsFull(boolean block) {
this.sessionPool.setWhenExhaustedAction( this.sessionPool.setBlockWhenExhausted(block);
(block ? GenericObjectPool.WHEN_EXHAUSTED_BLOCK : GenericObjectPool.WHEN_EXHAUSTED_FAIL));
} }
public boolean isBlockIfSessionPoolIsFull() { public boolean isBlockIfSessionPoolIsFull() {
return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK; return this.sessionPool.getBlockWhenExhausted();
} }
/** /**
@ -319,7 +322,7 @@ public class ConnectionPool implements ExceptionListener {
* @see #setBlockIfSessionPoolIsFull(boolean) * @see #setBlockIfSessionPoolIsFull(boolean)
*/ */
public long getBlockIfSessionPoolIsFullTimeout() { public long getBlockIfSessionPoolIsFullTimeout() {
return this.sessionPool.getMaxWait(); return this.sessionPool.getMaxWaitMillis();
} }
/** /**
@ -337,7 +340,7 @@ public class ConnectionPool implements ExceptionListener {
* then use this setting to configure how long to block before retry * then use this setting to configure how long to block before retry
*/ */
public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) { public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
this.sessionPool.setMaxWait(blockIfSessionPoolIsFullTimeout); this.sessionPool.setMaxWaitMillis(blockIfSessionPoolIsFullTimeout);
} }
/** /**

View File

@ -28,8 +28,10 @@ import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection; import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory; import javax.jms.TopicConnectionFactory;
import org.apache.commons.pool.KeyedPoolableObjectFactory; import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -85,27 +87,10 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
public void initConnectionsPool() { public void initConnectionsPool() {
if (this.connectionsPool == null) { if (this.connectionsPool == null) {
this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>( this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() { new KeyedPooledObjectFactory<ConnectionKey, ConnectionPool>() {
@Override @Override
public void activateObject(ConnectionKey key, ConnectionPool connection) throws Exception { public PooledObject<ConnectionPool> makeObject(ConnectionKey connectionKey) throws Exception {
} Connection delegate = createConnection(connectionKey);
@Override
public void destroyObject(ConnectionKey key, ConnectionPool connection) throws Exception {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Destroying connection: {}", connection);
}
connection.close();
} catch (Exception e) {
LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
}
}
@Override
public ConnectionPool makeObject(ConnectionKey key) throws Exception {
Connection delegate = createConnection(key);
ConnectionPool connection = createConnectionPool(delegate); ConnectionPool connection = createConnectionPool(delegate);
connection.setIdleTimeout(getIdleTimeout()); connection.setIdleTimeout(getIdleTimeout());
@ -124,15 +109,25 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
PooledConnectionFactory.this.mostRecentlyCreated.set(connection); PooledConnectionFactory.this.mostRecentlyCreated.set(connection);
return connection; return new DefaultPooledObject<ConnectionPool>(connection);
} }
@Override @Override
public void passivateObject(ConnectionKey key, ConnectionPool connection) throws Exception { public void destroyObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) throws Exception {
ConnectionPool connection = pooledObject.getObject();
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Destroying connection: {}", connection);
}
connection.close();
} catch (Exception e) {
LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
}
} }
@Override @Override
public boolean validateObject(ConnectionKey key, ConnectionPool connection) { public boolean validateObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) {
ConnectionPool connection = pooledObject.getObject();
if (connection != null && connection.expiredCheck()) { if (connection != null && connection.expiredCheck()) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Connection has expired: {} and will be destroyed", connection); LOG.trace("Connection has expired: {} and will be destroyed", connection);
@ -143,10 +138,19 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
return true; return true;
} }
@Override
public void activateObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) throws Exception {
}
@Override
public void passivateObject(ConnectionKey connectionKey, PooledObject<ConnectionPool> pooledObject) throws Exception {
}
}); });
// Set max idle (not max active) since our connections always idle in the pool. // Set max idle (not max active) since our connections always idle in the pool.
this.connectionsPool.setMaxIdle(1); this.connectionsPool.setMaxIdlePerKey(1);
this.connectionsPool.setLifo(false); this.connectionsPool.setLifo(false);
// We always want our validate method to control when idle objects are evicted. // We always want our validate method to control when idle objects are evicted.
@ -378,7 +382,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
* @return the maxConnections that will be created for this pool. * @return the maxConnections that will be created for this pool.
*/ */
public int getMaxConnections() { public int getMaxConnections() {
return getConnectionsPool().getMaxIdle(); return getConnectionsPool().getMaxIdlePerKey();
} }
/** /**
@ -389,7 +393,8 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti
* @param maxConnections the maxConnections to set * @param maxConnections the maxConnections to set
*/ */
public void setMaxConnections(int maxConnections) { public void setMaxConnections(int maxConnections) {
getConnectionsPool().setMaxIdle(maxConnections); getConnectionsPool().setMaxIdlePerKey(maxConnections);
getConnectionsPool().setMaxTotalPerKey(maxConnections);
} }
/** /**

View File

@ -47,7 +47,7 @@ import javax.jms.TopicSubscriber;
import javax.jms.XASession; import javax.jms.XASession;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import org.apache.commons.pool.KeyedObjectPool; import org.apache.commons.pool2.KeyedObjectPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

View File

@ -30,7 +30,7 @@
<bundle dependency="true">mvn:org.apache.geronimo.specs/geronimo-j2ee-management_1.1_spec/1.0.1</bundle> <bundle dependency="true">mvn:org.apache.geronimo.specs/geronimo-j2ee-management_1.1_spec/1.0.1</bundle>
<bundle dependency="true">mvn:org.jvnet.jaxb2_commons/jaxb2-basics-runtime/${jaxb-basics-version}</bundle> <bundle dependency="true">mvn:org.jvnet.jaxb2_commons/jaxb2-basics-runtime/${jaxb-basics-version}</bundle>
<bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.jaxb-impl/${jaxb-bundle-version}</bundle> <bundle dependency='true'>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.jaxb-impl/${jaxb-bundle-version}</bundle>
<bundle dependency="false">mvn:commons-pool/commons-pool/${commons-pool-version}</bundle> <bundle dependency="false">mvn:org.apache.commons/commons-pool2/${commons-pool2-version}</bundle>
<bundle dependency="false">mvn:commons-net/commons-net/${commons-net-version}</bundle> <bundle dependency="false">mvn:commons-net/commons-net/${commons-net-version}</bundle>
<bundle dependency='true'>mvn:org.apache.zookeeper/zookeeper/${zookeeper-version}</bundle> <bundle dependency='true'>mvn:org.apache.zookeeper/zookeeper/${zookeeper-version}</bundle>
<!-- uber osgi bundle means client is not that lean, todo: introduce client osgi bundle --> <!-- uber osgi bundle means client is not that lean, todo: introduce client osgi bundle -->

View File

@ -74,8 +74,8 @@
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-pool</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool</artifactId> <artifactId>commons-pool2</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>

View File

@ -162,8 +162,8 @@
<artifactId>commons-primitives</artifactId> <artifactId>commons-primitives</artifactId>
</exclusion> </exclusion>
<exclusion> <exclusion>
<groupId>commons-pool</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool</artifactId> <artifactId>commons-pool2</artifactId>
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
@ -296,8 +296,8 @@
<artifactId>commons-primitives</artifactId> <artifactId>commons-primitives</artifactId>
</exclusion> </exclusion>
<exclusion> <exclusion>
<groupId>commons-pool</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool</artifactId> <artifactId>commons-pool2</artifactId>
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
@ -336,8 +336,8 @@
<artifactId>geronimo-j2ee-management_1.1_spec</artifactId> <artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-pool</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool</artifactId> <artifactId>commons-pool2</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>

View File

@ -149,8 +149,8 @@
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-pool</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool</artifactId> <artifactId>commons-pool2</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework</groupId> <groupId>org.springframework</groupId>

View File

@ -228,8 +228,8 @@
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-pool</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool</artifactId> <artifactId>commons-pool2</artifactId>
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -173,7 +173,7 @@
<include>commons-beanutils:commons-beanutils</include> <include>commons-beanutils:commons-beanutils</include>
<include>commons-collections:commons-collections</include> <include>commons-collections:commons-collections</include>
<include>org.apache.commons:commons-dbcp2</include> <include>org.apache.commons:commons-dbcp2</include>
<include>commons-pool:commons-pool</include> <include>org.apache.commons:commons-pool2</include>
<include>commons-codec:commons-codec</include> <include>commons-codec:commons-codec</include>
<include>commons-net:commons-net</include> <include>commons-net:commons-net</include>
<include>commons-lang:commons-lang</include> <include>commons-lang:commons-lang</include>

View File

@ -57,7 +57,7 @@
<commons-io-version>2.4</commons-io-version> <commons-io-version>2.4</commons-io-version>
<commons-lang-version>2.6</commons-lang-version> <commons-lang-version>2.6</commons-lang-version>
<commons-logging-version>1.1.3</commons-logging-version> <commons-logging-version>1.1.3</commons-logging-version>
<commons-pool-version>1.6</commons-pool-version> <commons-pool2-version>2.3</commons-pool2-version>
<commons-primitives-version>1.0</commons-primitives-version> <commons-primitives-version>1.0</commons-primitives-version>
<commons-net-version>3.3</commons-net-version> <commons-net-version>3.3</commons-net-version>
<directory-version>2.0.0-M6</directory-version> <directory-version>2.0.0-M6</directory-version>
@ -855,9 +855,9 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-pool</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool</artifactId> <artifactId>commons-pool2</artifactId>
<version>${commons-pool-version}</version> <version>${commons-pool2-version}</version>
</dependency> </dependency>
<!-- Optional Journal Implementation --> <!-- Optional Journal Implementation -->

View File

@ -93,8 +93,8 @@
<artifactId>httpclient</artifactId> <artifactId>httpclient</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-pool</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-pool</artifactId> <artifactId>commons-pool2</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>commons-collections</groupId> <groupId>commons-collections</groupId>