git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1425162 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-12-21 22:05:48 +00:00
parent bc45bf8065
commit 49f16ce0eb
4 changed files with 274 additions and 17 deletions

View File

@ -62,6 +62,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
private volatile boolean stopped;
private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
/**
* Creates a new PooledConnection instance that uses the given ConnectionPool to create
@ -86,6 +87,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
@Override
public void close() throws JMSException {
this.cleanupConnectionTemporaryDestinations();
this.cleanupAllLoanedSessions();
if (this.pool != null) {
this.pool.decrementReferenceCount();
this.pool = null;
@ -104,8 +106,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
}
@Override
public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages)
throws JMSException {
public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
}
@ -115,8 +116,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
}
@Override
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i)
throws JMSException {
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
}
@ -173,10 +173,14 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
PooledSession result;
result = (PooledSession) pool.createSession(transacted, ackMode);
// Add a temporary destination event listener to the session that notifies us when
// the session creates temporary destinations.
result.addTempDestEventListener(this);
return (Session) result;
// Store the session so we can close the sessions that this PooledConnection
// created in order to ensure that consumers etc are closed per the JMS contract.
loanedSessions.add(result);
// Add a event listener to the session that notifies us when the session
// creates / destroys temporary destinations and closes etc.
result.addSessionEventListener(this);
return result;
}
// EnhancedCollection API
@ -190,14 +194,23 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
// Implementation methods
// -------------------------------------------------------------------------
@Override
public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
connTempQueues.add(tempQueue);
}
@Override
public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
connTempTopics.add(tempTopic);
}
@Override
public void onSessionClosed(PooledSession session) {
if (session != null) {
this.loanedSessions.remove(session);
}
}
public ActiveMQConnection getConnection() throws JMSException {
assertNotClosed();
return pool.getConnection();
@ -213,6 +226,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
}
@Override
public String toString() {
return "PooledConnection { " + pool + " }";
}
@ -246,6 +260,23 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
connTempTopics.clear();
}
/**
* The PooledSession tracks all Sessions that it created and now we close them. Closing the
* PooledSession will return the internal Session to the Pool of Session after cleaning up
* all the resources that the Session had allocated for this PooledConnection.
*/
protected void cleanupAllLoanedSessions() {
for (PooledSession session : loanedSessions) {
try {
session.close();
} catch (JMSException ex) {
LOG.info("failed to close laoned Session \"" + session + "\" on closing pooled connection: " + ex.getMessage());
}
}
loanedSessions.clear();
}
/**
* @return the total number of Pooled session including idle sessions that are not
* currently loaned out to any client.

View File

@ -63,7 +63,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners =
new CopyOnWriteArrayList<PooledSessionEventListener>();
private ActiveMQSession session;
@ -81,10 +81,10 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
this.transactional = session.isTransacted();
}
public void addTempDestEventListener(PooledSessionEventListener listener) {
public void addSessionEventListener(PooledSessionEventListener listener) {
// only add if really needed
if (!tempDestEventListeners.contains(listener)) {
this.tempDestEventListeners.add(listener);
if (!sessionEventListeners.contains(listener)) {
this.sessionEventListeners.add(listener);
}
}
@ -129,7 +129,10 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
} finally {
consumers.clear();
browsers.clear();
tempDestEventListeners.clear();
for (PooledSessionEventListener listener : this.sessionEventListeners) {
listener.onSessionClosed(this);
}
sessionEventListeners.clear();
}
if (invalidate) {
@ -205,7 +208,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
result = getInternalSession().createTemporaryQueue();
// Notify all of the listeners of the created temporary Queue.
for (PooledSessionEventListener listener : this.tempDestEventListeners) {
for (PooledSessionEventListener listener : this.sessionEventListeners) {
listener.onTemporaryQueueCreate(result);
}
@ -219,7 +222,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
result = getInternalSession().createTemporaryTopic();
// Notify all of the listeners of the created temporary Topic.
for (PooledSessionEventListener listener : this.tempDestEventListeners) {
for (PooledSessionEventListener listener : this.sessionEventListeners) {
listener.onTemporaryTopicCreate(result);
}

View File

@ -26,7 +26,7 @@ interface PooledSessionEventListener {
* Called on successful creation of a new TemporaryQueue.
*
* @param tempQueue
* The TemporaryQueue just created.
* The TemporaryQueue just created.
*/
void onTemporaryQueueCreate(TemporaryQueue tempQueue);
@ -34,8 +34,16 @@ interface PooledSessionEventListener {
* Called on successful creation of a new TemporaryTopic.
*
* @param tempTopic
* The TemporaryTopic just created.
* The TemporaryTopic just created.
*/
void onTemporaryTopicCreate(TemporaryTopic tempTopic);
/**
* Called when the PooledSession is closed.
*
* @param session
* The PooledSession that has been closed.
*/
void onSessionClosed(PooledSession session);
}

View File

@ -0,0 +1,215 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PooledConnectionSessionCleanupTest {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionSessionCleanupTest.class);
protected BrokerService service;
protected ActiveMQConnectionFactory directConnFact;
protected Connection directConn1;
protected Connection directConn2;
protected PooledConnectionFactory pooledConnFact;
protected Connection pooledConn1;
protected Connection pooledConn2;
private final ActiveMQQueue queue = new ActiveMQQueue("ContendedQueue");
private final int MESSAGE_COUNT = 50;
/**
* Prepare to run a test case: create, configure, and start the embedded
* broker, as well as creating the client connections to the broker.
*/
@Before
public void prepTest() throws java.lang.Exception {
service = new BrokerService();
service.setUseJmx(true);
service.setPersistent(false);
service.setSchedulerSupport(false);
service.start();
service.waitUntilStarted();
// Create the ActiveMQConnectionFactory and the PooledConnectionFactory.
// Set a long idle timeout on the pooled connections to better show the
// problem of holding onto created resources on close.
directConnFact = new ActiveMQConnectionFactory(service.getVmConnectorURI());
pooledConnFact = new PooledConnectionFactory(directConnFact);
pooledConnFact.setIdleTimeout((int)TimeUnit.MINUTES.toMillis(60));
pooledConnFact.setMaxConnections(1);
// Prepare the connections
directConn1 = directConnFact.createConnection();
directConn1.start();
directConn2 = directConnFact.createConnection();
directConn2.start();
// The pooled Connections should have the same underlying connection
pooledConn1 = pooledConnFact.createConnection();
pooledConn1.start();
pooledConn2 = pooledConnFact.createConnection();
pooledConn2.start();
}
@After
public void cleanupTest() throws java.lang.Exception {
try {
pooledConn1.close();
} catch (JMSException jms_exc) {
}
try {
pooledConn2.close();
} catch (JMSException jms_exc) {
}
try {
directConn1.close();
} catch (JMSException jms_exc) {
}
try {
directConn2.close();
} catch (JMSException jms_exc) {
}
try {
service.stop();
} catch (JMSException jms_exc) {
}
}
private void produceMessages() throws Exception {
Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < MESSAGE_COUNT; ++i) {
producer.send(session.createTextMessage("Test Message: " + i));
}
producer.close();
}
private QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+ ":Type=Queue,Destination=" + name
+ ",BrokerName=localhost");
QueueViewMBean proxy = (QueueViewMBean) service.getManagementContext()
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
return proxy;
}
@Test
public void testLingeringPooledSessionsHoldingPrefetchedMessages() throws Exception {
produceMessages();
Session pooledSession1 = pooledConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
pooledSession1.createConsumer(queue);
final QueueViewMBean view = getProxyToQueue(queue.getPhysicalName());
assertTrue("Should have all sent messages in flight:", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return view.getInFlightCount() == MESSAGE_COUNT;
}
}));
// While all the message are in flight we should get anything on this consumer.
Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
assertNull(consumer.receive(2000));
pooledConn1.close();
assertTrue("Should have only one consumer now:", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return view.getSubscriptions().length == 1;
}
}));
// Now we'd expect that the message stuck in the prefetch of the pooled session's
// consumer would be rerouted to the non-pooled session's consumer.
assertNotNull(consumer.receive(10000));
}
@Test
public void testNonPooledConnectionCloseNotHoldingPrefetchedMessages() throws Exception {
produceMessages();
Session directSession = directConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
directSession.createConsumer(queue);
final QueueViewMBean view = getProxyToQueue(queue.getPhysicalName());
assertTrue("Should have all sent messages in flight:", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return view.getInFlightCount() == MESSAGE_COUNT;
}
}));
// While all the message are in flight we should get anything on this consumer.
Session session = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queue);
assertNull(consumer.receive(2000));
directConn2.close();
assertTrue("Should have only one consumer now:", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return view.getSubscriptions().length == 1;
}
}));
// Now we'd expect that the message stuck in the prefetch of the first session's
// consumer would be rerouted to the alternate session's consumer.
assertNotNull(consumer.receive(10000));
}
}