mirror of https://github.com/apache/activemq.git
Apply Fix For: https://issues.apache.org/jira/browse/AMQ-3680
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1325943 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8d2c079e0c
commit
6687d567c7
|
@ -53,8 +53,7 @@ public class ConnectionPool {
|
|||
public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
|
||||
this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory);
|
||||
// Add a transport Listener so that we can notice if this connection
|
||||
// should be expired due to
|
||||
// a connection failure.
|
||||
// should be expired due to a connection failure.
|
||||
connection.addTransportListener(new TransportListener() {
|
||||
public void onCommand(Object command) {
|
||||
}
|
||||
|
@ -71,10 +70,9 @@ public class ConnectionPool {
|
|||
public void transportResumed() {
|
||||
}
|
||||
});
|
||||
//
|
||||
|
||||
// make sure that we set the hasFailed flag, in case the transport already failed
|
||||
// prior to the addition of our new TransportListener
|
||||
//
|
||||
if(connection.isTransportFailed()) {
|
||||
hasFailed = true;
|
||||
}
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.pool;
|
||||
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionConsumer;
|
||||
import javax.jms.ConnectionMetaData;
|
||||
|
@ -27,6 +29,8 @@ import javax.jms.QueueConnection;
|
|||
import javax.jms.QueueSession;
|
||||
import javax.jms.ServerSessionPool;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.Topic;
|
||||
import javax.jms.TopicConnection;
|
||||
import javax.jms.TopicSession;
|
||||
|
@ -36,6 +40,8 @@ import org.apache.activemq.ActiveMQSession;
|
|||
import org.apache.activemq.AlreadyClosedException;
|
||||
import org.apache.activemq.EnhancedConnection;
|
||||
import org.apache.activemq.advisory.DestinationSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
|
||||
|
@ -47,12 +53,14 @@ import org.apache.activemq.advisory.DestinationSource;
|
|||
* library like <a href="http://jencks.org/">Jencks</a> such as in <a
|
||||
* href="http://jencks.org/Message+Driven+POJOs">this example</a>
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection {
|
||||
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
|
||||
|
||||
private ConnectionPool pool;
|
||||
private boolean stopped;
|
||||
private final CopyOnWriteArrayList<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
|
||||
private final CopyOnWriteArrayList<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
|
||||
|
||||
public PooledConnection(ConnectionPool pool) {
|
||||
this.pool = pool;
|
||||
|
@ -67,6 +75,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
|
|||
}
|
||||
|
||||
public void close() throws JMSException {
|
||||
this.cleanupConnectionTemporaryDestinations();
|
||||
if (this.pool != null) {
|
||||
this.pool.decrementReferenceCount();
|
||||
this.pool = null;
|
||||
|
@ -82,22 +91,17 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
|
|||
stopped = true;
|
||||
}
|
||||
|
||||
public ConnectionConsumer createConnectionConsumer(Destination destination, String selector,
|
||||
ServerSessionPool serverSessionPool, int maxMessages)
|
||||
throws JMSException {
|
||||
return getConnection()
|
||||
.createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
|
||||
public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages)
|
||||
throws JMSException {
|
||||
return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
|
||||
}
|
||||
|
||||
public ConnectionConsumer createConnectionConsumer(Topic topic, String s,
|
||||
ServerSessionPool serverSessionPool, int maxMessages)
|
||||
throws JMSException {
|
||||
public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
|
||||
return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
|
||||
}
|
||||
|
||||
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1,
|
||||
ServerSessionPool serverSessionPool, int i)
|
||||
throws JMSException {
|
||||
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i)
|
||||
throws JMSException {
|
||||
return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
|
||||
}
|
||||
|
||||
|
@ -118,34 +122,49 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
|
|||
}
|
||||
|
||||
public void setClientID(String clientID) throws JMSException {
|
||||
|
||||
// ignore repeated calls to setClientID() with the same client id
|
||||
// this could happen when a JMS component such as Spring that uses a
|
||||
// PooledConnectionFactory shuts down and reinitializes.
|
||||
//
|
||||
|
||||
// ignore repeated calls to setClientID() with the same client id
|
||||
// this could happen when a JMS component such as Spring that uses a
|
||||
// PooledConnectionFactory shuts down and reinitializes.
|
||||
if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) {
|
||||
getConnection().setClientID(clientID);
|
||||
getConnection().setClientID(clientID);
|
||||
}
|
||||
}
|
||||
|
||||
public ConnectionConsumer createConnectionConsumer(Queue queue, String selector,
|
||||
ServerSessionPool serverSessionPool, int maxMessages)
|
||||
throws JMSException {
|
||||
public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
|
||||
return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
|
||||
}
|
||||
|
||||
// Session factory methods
|
||||
// -------------------------------------------------------------------------
|
||||
public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
|
||||
return (QueueSession)createSession(transacted, ackMode);
|
||||
return (QueueSession) createSession(transacted, ackMode);
|
||||
}
|
||||
|
||||
public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
|
||||
return (TopicSession)createSession(transacted, ackMode);
|
||||
return (TopicSession) createSession(transacted, ackMode);
|
||||
}
|
||||
|
||||
public Session createSession(boolean transacted, int ackMode) throws JMSException {
|
||||
return pool.createSession(transacted, ackMode);
|
||||
PooledSession result;
|
||||
result = (PooledSession) pool.createSession(transacted, ackMode);
|
||||
|
||||
// Add a temporary destination event listener to the session that notifies us when
|
||||
// the session creates temporary destinations.
|
||||
result.addTempDestEventListener(new PooledSessionEventListener() {
|
||||
|
||||
@Override
|
||||
public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
|
||||
connTempQueues.add(tempQueue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
|
||||
connTempTopics.add(tempTopic);
|
||||
}
|
||||
});
|
||||
|
||||
return (Session) result;
|
||||
}
|
||||
|
||||
// EnhancedCollection API
|
||||
|
@ -170,10 +189,39 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
|
|||
}
|
||||
|
||||
protected ActiveMQSession createSession(SessionKey key) throws JMSException {
|
||||
return (ActiveMQSession)getConnection().createSession(key.isTransacted(), key.getAckMode());
|
||||
return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "PooledConnection { " + pool + " }";
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove all of the temporary destinations created for this connection.
|
||||
* This is important since the underlying connection may be reused over a
|
||||
* long period of time, accumulating all of the temporary destinations from
|
||||
* each use. However, from the perspective of the lifecycle from the
|
||||
* client's view, close() closes the connection and, therefore, deletes all
|
||||
* of the temporary destinations created.
|
||||
*/
|
||||
protected void cleanupConnectionTemporaryDestinations() {
|
||||
|
||||
for (TemporaryQueue tempQueue : connTempQueues) {
|
||||
try {
|
||||
tempQueue.delete();
|
||||
} catch (JMSException ex) {
|
||||
LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
|
||||
}
|
||||
}
|
||||
connTempQueues.clear();
|
||||
|
||||
for (TemporaryTopic tempTopic : connTempTopics) {
|
||||
try {
|
||||
tempTopic.delete();
|
||||
} catch (JMSException ex) {
|
||||
LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
|
||||
}
|
||||
}
|
||||
connTempTopics.clear();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import javax.jms.QueueBrowser;
|
|||
import javax.jms.QueueReceiver;
|
||||
import javax.jms.QueueSender;
|
||||
import javax.jms.QueueSession;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
|
@ -43,7 +44,6 @@ import javax.jms.TopicPublisher;
|
|||
import javax.jms.TopicSession;
|
||||
import javax.jms.TopicSubscriber;
|
||||
import javax.jms.XASession;
|
||||
import javax.jms.Session;
|
||||
import javax.transaction.xa.XAResource;
|
||||
|
||||
import org.apache.activemq.ActiveMQMessageProducer;
|
||||
|
@ -54,9 +54,6 @@ import org.apache.activemq.AlreadyClosedException;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class PooledSession implements Session, TopicSession, QueueSession, XASession {
|
||||
private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
|
||||
|
||||
|
@ -70,6 +67,8 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
|
|||
|
||||
private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
|
||||
private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
|
||||
private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
|
||||
new CopyOnWriteArrayList<PooledSessionEventListener>();
|
||||
private boolean isXa;
|
||||
|
||||
public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
|
||||
|
@ -78,6 +77,10 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
|
|||
this.transactional = session.isTransacted();
|
||||
}
|
||||
|
||||
public void addTempDestEventListener(PooledSessionEventListener listener) {
|
||||
this.tempDestEventListeners.add(listener);
|
||||
}
|
||||
|
||||
protected boolean isIgnoreClose() {
|
||||
return ignoreClose;
|
||||
}
|
||||
|
@ -121,6 +124,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
|
|||
consumers.clear();
|
||||
browsers.clear();
|
||||
}
|
||||
|
||||
if (invalidate) {
|
||||
// lets close the session and not put the session back into
|
||||
// the pool
|
||||
|
@ -172,11 +176,29 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
|
|||
}
|
||||
|
||||
public TemporaryQueue createTemporaryQueue() throws JMSException {
|
||||
return getInternalSession().createTemporaryQueue();
|
||||
TemporaryQueue result;
|
||||
|
||||
result = getInternalSession().createTemporaryQueue();
|
||||
|
||||
// Notify all of the listeners of the created temporary Queue.
|
||||
for (PooledSessionEventListener listener : this.tempDestEventListeners) {
|
||||
listener.onTemporaryQueueCreate(result);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public TemporaryTopic createTemporaryTopic() throws JMSException {
|
||||
return getInternalSession().createTemporaryTopic();
|
||||
TemporaryTopic result;
|
||||
|
||||
result = getInternalSession().createTemporaryTopic();
|
||||
|
||||
// Notify all of the listeners of the created temporary Topic.
|
||||
for (PooledSessionEventListener listener : this.tempDestEventListeners) {
|
||||
listener.onTemporaryTopicCreate(result);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public void unsubscribe(String s) throws JMSException {
|
||||
|
@ -299,10 +321,12 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
|
|||
/**
|
||||
* Callback invoked when the consumer is closed.
|
||||
* <p/>
|
||||
* This is used to keep track of an explicit closed consumer created by this session,
|
||||
* by which we know do not need to keep track of the consumer, as its already closed.
|
||||
* This is used to keep track of an explicit closed consumer created by this
|
||||
* session, by which we know do not need to keep track of the consumer, as
|
||||
* its already closed.
|
||||
*
|
||||
* @param consumer the consumer which is being closed
|
||||
* @param consumer
|
||||
* the consumer which is being closed
|
||||
*/
|
||||
protected void onConsumerClose(MessageConsumer consumer) {
|
||||
consumers.remove(consumer);
|
||||
|
@ -343,8 +367,10 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
|
|||
|
||||
private MessageConsumer addConsumer(MessageConsumer consumer) {
|
||||
consumers.add(consumer);
|
||||
// must wrap in PooledMessageConsumer to ensure the onConsumerClose method is invoked
|
||||
// when the returned consumer is closed, to avoid memory leak in this session class
|
||||
// must wrap in PooledMessageConsumer to ensure the onConsumerClose
|
||||
// method is invoked
|
||||
// when the returned consumer is closed, to avoid memory leak in this
|
||||
// session class
|
||||
// in case many consumers is created
|
||||
return new PooledMessageConsumer(this, consumer);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.pool;
|
||||
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
|
||||
interface PooledSessionEventListener {
|
||||
|
||||
/**
|
||||
* Called on successful creation of a new TemporaryQueue.
|
||||
*
|
||||
* @param tempQueue
|
||||
* The TemporaryQueue just created.
|
||||
*/
|
||||
void onTemporaryQueueCreate(TemporaryQueue tempQueue);
|
||||
|
||||
/**
|
||||
* Called on successful creation of a new TemporaryTopic.
|
||||
*
|
||||
* @param tempTopic
|
||||
* The TemporaryTopic just created.
|
||||
*/
|
||||
void onTemporaryTopicCreate(TemporaryTopic tempTopic);
|
||||
|
||||
}
|
|
@ -0,0 +1,222 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.pool;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TemporaryQueue;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Test of lingering temporary destinations on pooled connections when the
|
||||
* underlying connections are reused. Also tests that closing one
|
||||
* PooledConnection does not delete the temporary destinations of another
|
||||
* PooledConnection that uses the same underlying ConnectionPool.
|
||||
*
|
||||
* jira: AMQ-3457
|
||||
*/
|
||||
public class PooledConnectionTempDestCleanupTest {
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionTempDestCleanupTest.class);
|
||||
|
||||
protected BrokerService embeddedBroker;
|
||||
|
||||
protected ActiveMQConnectionFactory directConnFact;
|
||||
protected Connection directConn1;
|
||||
protected Connection directConn2;
|
||||
|
||||
protected PooledConnectionFactory pooledConnFact;
|
||||
protected Connection pooledConn1;
|
||||
protected Connection pooledConn2;
|
||||
|
||||
protected TemporaryQueue tempDest;
|
||||
protected TemporaryQueue otherTempDest;
|
||||
|
||||
/**
|
||||
* Prepare to run a test case: create, configure, and start the embedded
|
||||
* broker, as well as creating the client connections to the broker.
|
||||
*/
|
||||
@Before
|
||||
public void prepTest() throws java.lang.Exception {
|
||||
embeddedBroker = new BrokerService();
|
||||
configureBroker(embeddedBroker);
|
||||
embeddedBroker.start();
|
||||
embeddedBroker.waitUntilStarted();
|
||||
|
||||
// Create the ActiveMQConnectionFactory and the PooledConnectionFactory.
|
||||
directConnFact = new ActiveMQConnectionFactory(embeddedBroker.getVmConnectorURI());
|
||||
pooledConnFact = new PooledConnectionFactory(directConnFact);
|
||||
|
||||
// Prepare the connections
|
||||
directConn1 = directConnFact.createConnection();
|
||||
directConn1.start();
|
||||
directConn2 = directConnFact.createConnection();
|
||||
directConn2.start();
|
||||
|
||||
pooledConn1 = pooledConnFact.createConnection();
|
||||
pooledConn1.start();
|
||||
pooledConn2 = pooledConnFact.createConnection();
|
||||
pooledConn2.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanupTest() throws java.lang.Exception {
|
||||
try {
|
||||
pooledConn1.stop();
|
||||
} catch (JMSException jms_exc) {
|
||||
}
|
||||
try {
|
||||
pooledConn2.stop();
|
||||
} catch (JMSException jms_exc) {
|
||||
}
|
||||
try {
|
||||
directConn1.stop();
|
||||
} catch (JMSException jms_exc) {
|
||||
}
|
||||
try {
|
||||
directConn2.stop();
|
||||
} catch (JMSException jms_exc) {
|
||||
}
|
||||
|
||||
try {
|
||||
embeddedBroker.stop();
|
||||
} catch (JMSException jms_exc) {
|
||||
}
|
||||
}
|
||||
|
||||
protected void configureBroker(BrokerService broker_svc) throws Exception {
|
||||
broker_svc.setBrokerName("testbroker1");
|
||||
broker_svc.setUseJmx(false);
|
||||
broker_svc.setPersistent(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for lingering temporary destinations after closing a
|
||||
* PooledConnection. Here are the steps:
|
||||
*
|
||||
* 1. create a session on the first pooled connection 2. create a session on
|
||||
* the second pooled connection 3. create a temporary destination on the
|
||||
* first session 4. confirm the temporary destination exists in the broker
|
||||
* 5. close the first connection 6. check that the temporary destination no
|
||||
* longer exists in the broker
|
||||
*/
|
||||
@Test
|
||||
public void testPooledLingeringTempDests() throws java.lang.Exception {
|
||||
Session session1;
|
||||
Session session2;
|
||||
|
||||
session1 = pooledConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session2 = pooledConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
tempDest = session1.createTemporaryQueue();
|
||||
|
||||
assertTrue("TEST METHOD FAILURE - NEW TEMP DESTINATION DOES NOT EXIST", destinationExists(tempDest));
|
||||
|
||||
pooledConn1.close();
|
||||
|
||||
assertTrue("FAILED: temp dest from closed pooled connection is lingering", !destinationExists(tempDest));
|
||||
|
||||
session2.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that closing one PooledConnection does not delete the temporary
|
||||
* destinations of another.
|
||||
*
|
||||
* 1. create a session on the first pooled connection 2. create a session on
|
||||
* the second pooled connection 3. create a temporary destination on the
|
||||
* first session 4. create a temporary destination on the second session 5.
|
||||
* confirm both temporary destinations exist in the broker 6. close the
|
||||
* first connection 7. check that the first temporary destination no longer
|
||||
* exists in the broker 8. check that the second temporary destination does
|
||||
* still exist in the broker
|
||||
*/
|
||||
@Test
|
||||
public void testPooledTempDestsCleanupOverzealous() throws java.lang.Exception {
|
||||
Session session1;
|
||||
Session session2;
|
||||
|
||||
session1 = pooledConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session2 = pooledConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
tempDest = session1.createTemporaryQueue();
|
||||
otherTempDest = session2.createTemporaryQueue();
|
||||
|
||||
assertTrue("TEST METHOD FAILURE - NEW TEMP DESTINATION DOES NOT EXIST", destinationExists(tempDest));
|
||||
assertTrue("TEST METHOD FAILURE - NEW TEMP DESTINATION DOES NOT EXIST", destinationExists(otherTempDest));
|
||||
|
||||
pooledConn1.close();
|
||||
|
||||
// Now confirm the first temporary destination no longer exists and the
|
||||
// second does.
|
||||
assertTrue("FAILED: temp dest from closed pooled connection is lingering", !destinationExists(tempDest));
|
||||
assertTrue("FAILED: second PooledConnectin's temporary destination was incorrectly deleted", destinationExists(otherTempDest));
|
||||
}
|
||||
|
||||
/**
|
||||
* CONTROL CASE
|
||||
*
|
||||
* Test for lingering temporary destinations after closing a Connection that
|
||||
* is NOT pooled. This demonstrates the standard JMS operation and helps to
|
||||
* validate the test methodology.
|
||||
*
|
||||
* 1. create a session on the first direct connection 2. create a session on
|
||||
* the second direct connection 3. create a temporary destination on the
|
||||
* first session 4. confirm the destination exists in the broker 5. close
|
||||
* the first connection 6. check that the destination no longer exists in
|
||||
* the broker
|
||||
*/
|
||||
@Test
|
||||
public void testDirectLingeringTempDests() throws java.lang.Exception {
|
||||
Session session1;
|
||||
Session session2;
|
||||
|
||||
session1 = directConn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session2 = directConn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
tempDest = session1.createTemporaryQueue();
|
||||
|
||||
assertTrue("TEST METHOD FAILURE - NEW TEMP DESTINATION DOES NOT EXIST", destinationExists(tempDest));
|
||||
|
||||
directConn1.close();
|
||||
|
||||
// Now confirm the temporary destination no longer exists.
|
||||
assertTrue("CONTROL TEST FAILURE - TEST METHOD IS SUSPECT", (!destinationExists(tempDest)));
|
||||
|
||||
session2.close();
|
||||
}
|
||||
|
||||
private boolean destinationExists(Destination dest) throws Exception {
|
||||
RegionBroker rb = (RegionBroker) embeddedBroker.getBroker().getAdaptor(RegionBroker.class);
|
||||
return rb.getTopicRegion().getDestinationMap().containsKey(dest) || rb.getQueueRegion().getDestinationMap().containsKey(dest)
|
||||
|| rb.getTempTopicRegion().getDestinationMap().containsKey(dest) || rb.getTempQueueRegion().getDestinationMap().containsKey(dest);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue