From c0bc3e00c13a5a5565744ab340768424a3da340b Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Tue, 14 Nov 2006 14:12:03 +0000 Subject: [PATCH] http://issues.apache.org/activemq/browse/AMQ-1045 we now evict failed connections from a connection pool. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@474799 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/pool/ConnectionPool.java | 101 ++++++++++++++---- .../activemq/pool/PooledConnection.java | 8 +- .../pool/PooledConnectionFactory.java | 36 +++---- .../pool/ConnectionFailureEvictsFromPool.java | 79 ++++++++++++++ 4 files changed, 182 insertions(+), 42 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java diff --git a/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java b/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java index 399a5c1495..6cc42c6974 100644 --- a/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java +++ b/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java @@ -17,18 +17,19 @@ */ package org.apache.activemq.pool; -import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.util.JMSExceptionSupport; -import org.apache.commons.pool.ObjectPoolFactory; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; import javax.jms.JMSException; import javax.jms.Session; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.transport.TransportListener; +import org.apache.commons.pool.ObjectPoolFactory; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; /** * Holds a real JMS connection along with the session pools associated with it. @@ -36,13 +37,33 @@ import java.util.Map; * @version $Revision$ */ public class ConnectionPool { + private ActiveMQConnection connection; private Map cache; private AtomicBoolean started = new AtomicBoolean(false); + private int referenceCount; private ObjectPoolFactory poolFactory; + private long lastUsed; + private boolean hasFailed; + private int idleTimeout = 30*1000; public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) { this(connection, new HashMap(), poolFactory); + // Add a transport Listener so that we can notice if this connection should be expired due to + // a connection failure. + connection.addTransportListener(new TransportListener(){ + public void onCommand(Object command) { + } + public void onException(IOException error) { + synchronized(ConnectionPool.this) { + hasFailed = true; + } + } + public void transportInterupted() { + } + public void transportResumed() { + } + }); } public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory) { @@ -57,7 +78,7 @@ public class ConnectionPool { } } - public ActiveMQConnection getConnection() { + synchronized public ActiveMQConnection getConnection() { return connection; } @@ -71,20 +92,58 @@ public class ConnectionPool { return pool.borrowSession(); } - public void close() throws JMSException { - Iterator i = cache.values().iterator(); - while (i.hasNext()) { - SessionPool pool = (SessionPool) i.next(); - i.remove(); + synchronized public void close() { + if( connection!=null ) { + Iterator i = cache.values().iterator(); + while (i.hasNext()) { + SessionPool pool = (SessionPool) i.next(); + i.remove(); + try { + pool.close(); + } catch (Exception e) { + } + } try { - pool.close(); + connection.close(); + } catch (Exception e) { } - catch (Exception e) { - throw JMSExceptionSupport.create(e); - } - } - connection.close(); - connection = null; + connection = null; + } } + synchronized public void incrementReferenceCount() { + referenceCount++; + } + + synchronized public void decrementReferenceCount() { + referenceCount--; + if( referenceCount == 0 ) { + lastUsed = System.currentTimeMillis(); + expiredCheck(); + } + } + + /** + * @return true if this connection has expired. + */ + synchronized public boolean expiredCheck() { + if( connection == null ) + return true; + if( hasFailed || idleTimeout> 0 && System.currentTimeMillis() > lastUsed+idleTimeout ) { + if( referenceCount == 0 ) { + close(); + } + return true; + } + return false; + } + + public int getIdleTimeout() { + return idleTimeout; + } + + public void setIdleTimeout(int idleTimeout) { + this.idleTimeout = idleTimeout; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java b/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java index 69d5aad728..7d7db793e4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java @@ -56,6 +56,7 @@ public class PooledConnection implements TopicConnection, QueueConnection { public PooledConnection(ConnectionPool pool) { this.pool = pool; + this.pool.incrementReferenceCount(); } /** @@ -66,7 +67,10 @@ public class PooledConnection implements TopicConnection, QueueConnection { } public void close() throws JMSException { - pool = null; + if( this.pool!=null ) { + this.pool.decrementReferenceCount(); + this.pool = null; + } } public void start() throws JMSException { @@ -133,7 +137,7 @@ public class PooledConnection implements TopicConnection, QueueConnection { // Implementation methods // ------------------------------------------------------------------------- - protected ActiveMQConnection getConnection() throws JMSException { + ActiveMQConnection getConnection() throws JMSException { assertNotClosed(); return pool.getConnection(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java index b071192f70..f53bda2f7b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java @@ -17,22 +17,20 @@ */ package org.apache.activemq.pool; -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.Service; -import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.ServiceStopper; -import org.apache.commons.pool.ObjectPoolFactory; -import org.apache.commons.pool.impl.GenericObjectPoolFactory; -import org.apache.commons.pool.impl.GenericObjectPool.Config; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.Service; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.commons.pool.ObjectPoolFactory; +import org.apache.commons.pool.impl.GenericObjectPoolFactory; /** * A JMS provider which pools Connection, Session and MessageProducer instances @@ -79,6 +77,13 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { public synchronized Connection createConnection(String userName, String password) throws JMSException { ConnectionKey key = new ConnectionKey(userName, password); ConnectionPool connection = (ConnectionPool) cache.get(key); + + // Now.. we might get a connection, but it might be that we need to + // dump it.. + if( connection!=null && connection.expiredCheck() ) { + connection=null; + } + if (connection == null) { ActiveMQConnection delegate = createConnection(key); connection = new ConnectionPool(delegate, getPoolFactory()); @@ -109,17 +114,10 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { } public void stop() throws Exception { - ServiceStopper stopper = new ServiceStopper(); for (Iterator iter = cache.values().iterator(); iter.hasNext();) { ConnectionPool connection = (ConnectionPool) iter.next(); - try { - connection.close(); - } - catch (JMSException e) { - stopper.onException(this, e); - } + connection.close(); } - stopper.throwFirstException(); } public ObjectPoolFactory getPoolFactory() { diff --git a/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java b/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java new file mode 100644 index 0000000000..523d7c5b88 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java @@ -0,0 +1,79 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.pool; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.test.TestSupport; +import org.apache.activemq.transport.mock.MockTransport; + +public class ConnectionFailureEvictsFromPool extends TestSupport { + + private BrokerService broker; + private ActiveMQConnectionFactory factory; + private PooledConnectionFactory pooledFactory; + + protected void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + TransportConnector connector = broker.addConnector("tcp://localhost:0"); + broker.start(); + factory = new ActiveMQConnectionFactory("mock:"+connector.getConnectUri()); + pooledFactory = new PooledConnectionFactory(factory); + } + + public void testEviction() throws Exception { + Connection connection = pooledFactory.createConnection(); + sendMessage(connection); + createConnectionFailure(connection); + try { + sendMessage(connection); + fail("Expected Error"); + } catch ( JMSException e) { + } + + // If we get another connection now it should be a new connection that works. + Connection connection2 = pooledFactory.createConnection(); + sendMessage(connection2); + } + + private void createConnectionFailure(Connection connection) throws Exception { + ActiveMQConnection c = ((PooledConnection)connection).getConnection(); + MockTransport t = (MockTransport) c.getTransportChannel().narrow(MockTransport.class); + t.stop(); + } + + private void sendMessage(Connection connection) throws JMSException { + Session session = connection.createSession(false, 0); + MessageProducer producer = session.createProducer(new ActiveMQQueue("FOO")); + producer.send(session.createTextMessage("Test")); + session.close(); + } + + protected void tearDown() throws Exception { + broker.stop(); + } +}