fix for: https://issues.apache.org/jira/browse/AMQ-2716

When a PooledConnection is closed the Temp Destinations of the contained Connection should be removed.

Applied patch from AMQ-2349 with modifications to prevent NullPointerExceptions and some other small cleanups.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1142267 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-07-02 18:59:33 +00:00
parent f24b4f7569
commit d226f5089f
3 changed files with 254 additions and 131 deletions

View File

@ -196,7 +196,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Construct an <code>ActiveMQConnection</code>
*
*
* @param transport
* @param factoryStats
* @throws Exception
@ -243,7 +243,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* A static helper method to create a new connection
*
*
* @return an ActiveMQConnection
* @throws JMSException
*/
@ -254,7 +254,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* A static helper method to create a new connection
*
*
* @param uri
* @return and ActiveMQConnection
* @throws JMSException
@ -266,7 +266,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* A static helper method to create a new connection
*
*
* @param user
* @param password
* @param uri
@ -287,7 +287,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Creates a <CODE>Session</CODE> object.
*
*
* @param transacted indicates whether the session is transacted
* @param acknowledgeMode indicates whether the consumer or the client will
* acknowledge any messages it receives; ignored if the
@ -334,7 +334,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
* dynamically by the application by calling the <code>setClientID</code>
* method.
*
*
* @return the unique client identifier
* @throws JMSException if the JMS provider fails to return the client ID
* for this connection due to some internal error.
@ -372,7 +372,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* If another connection with the same <code>clientID</code> is already
* running when this method is called, the JMS provider should detect the
* duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
*
*
* @param newClientID the unique client identifier
* @throws JMSException if the JMS provider fails to set the client ID for
* this connection due to some internal error.
@ -409,7 +409,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Gets the metadata for this connection.
*
*
* @return the connection metadata
* @throws JMSException if the JMS provider fails to get the connection
* metadata for this connection.
@ -424,7 +424,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
* every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
* associated with it.
*
*
* @return the <CODE>ExceptionListener</CODE> for this connection, or
* null, if no <CODE>ExceptionListener</CODE> is associated with
* this connection.
@ -455,7 +455,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* <P>
* A JMS provider should attempt to resolve connection problems itself
* before it notifies the client of them.
*
*
* @param listener the exception listener
* @throws JMSException if the JMS provider fails to set the exception
* listener for this connection.
@ -469,7 +469,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* Gets the <code>ClientInternalExceptionListener</code> object for this connection.
* Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
* associated with it.
*
*
* @return the listener or <code>null</code> if no listener is registered with the connection.
*/
public ClientInternalExceptionListener getClientInternalExceptionListener()
@ -483,19 +483,19 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
* It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
* describing the problem.
*
*
* @param listener the exception listener
*/
public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
{
this.clientInternalExceptionListener = listener;
}
/**
* Starts (or restarts) a connection's delivery of incoming messages. A call
* to <CODE>start</CODE> on a connection that has already been started is
* ignored.
*
*
* @throws JMSException if the JMS provider fails to start message delivery
* due to some internal error.
* @see javax.jms.Connection#stop()
@ -537,7 +537,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* <CODE>stop</CODE> call must wait until all of them have returned before
* it may return. While these message listeners are completing, they must
* have the full services of the connection available to them.
*
*
* @throws JMSException if the JMS provider fails to stop message delivery
* due to some internal error.
* @see javax.jms.Connection#start()
@ -591,7 +591,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* a closed connection's session must throw an
* <CODE>IllegalStateException</CODE>. Closing a closed connection must
* NOT throw an exception.
*
*
* @throws JMSException if the JMS provider fails to close the connection
* due to some internal error. For example, a failure to
* release resources or to close a socket connection can
@ -651,7 +651,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
ActiveMQTempDestination c = i.next();
c.delete();
}
if (isConnectionInfoSentToBroker) {
// If we announced ourselfs to the broker.. Try to let
// the broker
@ -706,7 +706,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Create a durable connection consumer for this connection (optional
* operation). This is an expert facility not used by regular JMS clients.
*
*
* @param topic topic to access
* @param subscriptionName durable subscription name
* @param messageSelector only messages with properties matching the message
@ -737,7 +737,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Create a durable connection consumer for this connection (optional
* operation). This is an expert facility not used by regular JMS clients.
*
*
* @param topic topic to access
* @param subscriptionName durable subscription name
* @param messageSelector only messages with properties matching the message
@ -788,7 +788,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Returns true if this connection has been started
*
*
* @return true if this Connection is started
*/
public boolean isStarted() {
@ -936,7 +936,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* Enables or disables whether or not queue consumers should be exclusive or
* not for example to preserve ordering when not using <a
* href="http://activemq.apache.org/message-groups.html">Message Groups</a>
*
*
* @param exclusiveConsumer
*/
public void setExclusiveConsumer(boolean exclusiveConsumer) {
@ -958,7 +958,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public boolean isUseDedicatedTaskRunner() {
return useDedicatedTaskRunner;
}
public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
this.useDedicatedTaskRunner = useDedicatedTaskRunner;
}
@ -1023,7 +1023,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Used internally for adding Sessions to the Connection
*
*
* @param session
* @throws JMSException
* @throws JMSException
@ -1037,7 +1037,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Used interanlly for removing Sessions from a Connection
*
*
* @param session
*/
protected void removeSession(ActiveMQSession session) {
@ -1047,7 +1047,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Add a ConnectionConsumer
*
*
* @param connectionConsumer
* @throws JMSException
*/
@ -1057,7 +1057,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Remove a ConnectionConsumer
*
*
* @param connectionConsumer
*/
protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
@ -1067,7 +1067,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Creates a <CODE>TopicSession</CODE> object.
*
*
* @param transacted indicates whether the session is transacted
* @param acknowledgeMode indicates whether the consumer or the client will
* acknowledge any messages it receives; ignored if the
@ -1091,7 +1091,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
*
*
* @param topic the topic to access
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
@ -1119,7 +1119,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
*
*
* @param queue the queue to access
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
@ -1147,7 +1147,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
*
*
* @param destination the destination to access
* @param messageSelector only messages with properties matching the message
* selector expression are delivered. A value of null or an
@ -1212,7 +1212,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Creates a <CODE>QueueSession</CODE> object.
*
*
* @param transacted indicates whether the session is transacted
* @param acknowledgeMode indicates whether the consumer or the client will
* acknowledge any messages it receives; ignored if the
@ -1238,7 +1238,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* If the clientID was not specified this method will throw an exception.
* This method is used to ensure that the clientID + durableSubscriber name
* are used correctly.
*
*
* @throws JMSException
*/
public void checkClientIDWasManuallySpecified() throws JMSException {
@ -1249,7 +1249,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* send a Packet through the Connection - for internal use only
*
*
* @param command
* @throws JMSException
*/
@ -1261,17 +1261,17 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
private void doAsyncSendPacket(Command command) throws JMSException {
try {
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
private void doAsyncSendPacket(Command command) throws JMSException {
try {
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
/**
* Send a packet through a Connection - for internal use only
*
*
* @param command
* @return
* @throws JMSException
@ -1311,7 +1311,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Send a packet through a Connection - for internal use only
*
*
* @param command
* @return
* @throws JMSException
@ -1324,25 +1324,25 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
private Response doSyncSendPacket(Command command, int timeout)
throws JMSException {
try {
Response response = (Response) (timeout > 0
? this.transport.request(command, timeout)
private Response doSyncSendPacket(Command command, int timeout)
throws JMSException {
try {
Response response = (Response) (timeout > 0
? this.transport.request(command, timeout)
: this.transport.request(command));
if (response != null && response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException();
} else {
throw JMSExceptionSupport.create(er.getException());
}
}
return response;
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
if (response != null && response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException();
} else {
throw JMSExceptionSupport.create(er.getException());
}
}
return response;
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
/**
* @return statistics for this Connection
@ -1354,7 +1354,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* simply throws an exception if the Connection is already closed or the
* Transport has failed
*
*
* @throws JMSException
*/
protected synchronized void checkClosedOrFailed() throws JMSException {
@ -1366,7 +1366,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* simply throws an exception if the Connection is already closed
*
*
* @throws JMSException
*/
protected synchronized void checkClosed() throws JMSException {
@ -1377,7 +1377,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Send the ConnectionInfo to the Broker
*
*
* @throws JMSException
*/
protected void ensureConnectionInfoSent() throws JMSException {
@ -1391,12 +1391,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
info.setClientId(clientIdGenerator.generateId());
}
syncSendPacket(info.copy());
this.isConnectionInfoSentToBroker = true;
// Add a temp destination advisory consumer so that
// We know what the valid temporary destinations are on the
// broker without having to do an RPC to the broker.
ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
if (watchTopicAdvisories) {
advisoryConsumer = new AdvisoryConsumer(this, consumerId);
@ -1439,13 +1439,13 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Set true if always require messages to be sync sent
*
*
* @param alwaysSyncSend
*/
public void setAlwaysSyncSend(boolean alwaysSyncSend) {
this.alwaysSyncSend = alwaysSyncSend;
}
/**
* @return the messagePrioritySupported
*/
@ -1509,7 +1509,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* Changes the associated username/password that is associated with this
* connection. If the connection has been used, you must called cleanup()
* before calling this method.
*
*
* @throws IllegalStateException if the connection is in used.
*/
public void changeUserInfo(String userName, String password) throws JMSException {
@ -1614,7 +1614,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Enables an optimised acknowledgement mode where messages are acknowledged
* in batches rather than individually
*
*
* @param optimizeAcknowledge The optimizeAcknowledge to set.
*/
public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
@ -1650,7 +1650,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
}
/**
* @return the sendTimeout
*/
@ -1664,7 +1664,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void setSendTimeout(int sendTimeout) {
this.sendTimeout = sendTimeout;
}
/**
* @return the sendAcksAsync
*/
@ -1824,7 +1824,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* message that does not affect the connection itself.
* This method notifies the <code>ClientInternalExceptionListener</code> by invoking
* its <code>onException</code> method, if one has been registered with this connection.
*
*
* @param error the exception that the problem
*/
public void onClientInternalException(final Throwable error) {
@ -1836,14 +1836,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
});
} else {
LOG.debug("Async client internal exception occurred with no exception listener registered: "
LOG.debug("Async client internal exception occurred with no exception listener registered: "
+ error, error);
}
}
}
/**
* Used for handling async exceptions
*
*
* @param error
*/
public void onAsyncException(Throwable error) {
@ -1868,27 +1868,27 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
public void onException(final IOException error) {
onAsyncException(error);
if (!closing.get() && !closed.get()) {
executor.execute(new Runnable() {
public void run() {
transportFailed(error);
ServiceSupport.dispose(ActiveMQConnection.this.transport);
brokerInfoReceived.countDown();
try {
cleanup();
} catch (JMSException e) {
LOG.warn("Exception during connection cleanup, " + e, e);
}
for (Iterator<TransportListener> iter = transportListeners
.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onException(error);
}
}
});
}
}
onAsyncException(error);
if (!closing.get() && !closed.get()) {
executor.execute(new Runnable() {
public void run() {
transportFailed(error);
ServiceSupport.dispose(ActiveMQConnection.this.transport);
brokerInfoReceived.countDown();
try {
cleanup();
} catch (JMSException e) {
LOG.warn("Exception during connection cleanup, " + e, e);
}
for (Iterator<TransportListener> iter = transportListeners
.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.onException(error);
}
}
});
}
}
public void transportInterupted() {
this.transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
@ -1901,11 +1901,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
ActiveMQSession s = i.next();
s.clearMessagesInProgress();
}
for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
connectionConsumer.clearMessagesInProgress();
connectionConsumer.clearMessagesInProgress();
}
for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
TransportListener listener = iter.next();
listener.transportInterupted();
@ -1921,7 +1921,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* Create the DestinationInfo object for the temporary destination.
*
*
* @param topic - if its true topic, else queue.
* @return DestinationInfo
* @throws JMSException
@ -2029,7 +2029,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* minimize context switches which boost performance. However sometimes its
* better to go slower to ensure that a single blocked consumer socket does
* not block delivery to other consumers.
*
*
* @param asyncDispatch If true then consumers created on this connection
* will default to having their messages dispatched
* asynchronously. The default value is false.
@ -2069,7 +2069,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, null, timeout);
}
public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
return createInputStream(dest, null, false);
}
@ -2085,7 +2085,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal, long timeout) throws JMSException {
return doCreateInputStream(dest, messageSelector, noLocal, name, timeout);
}
private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName, long timeout) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
@ -2112,7 +2112,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* Creates an output stream allowing full control over the delivery mode,
* the priority and time to live of the messages and the properties added to
* messages on the stream.
*
*
* @param streamProperties defines a map of key-value pairs where the keys
* are strings and the values are primitive values (numbers
* and strings) which are appended to the messages similarly
@ -2137,7 +2137,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
* message is part of a pending transaction or has not been acknowledged in
* the session.
*
*
* @param name the name used to identify this subscription
* @throws JMSException if the session fails to unsubscribe to the durable
* subscription due to some internal error.
@ -2290,11 +2290,11 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void setAuditDepth(int auditDepth) {
connectionAudit.setAuditDepth(auditDepth);
}
}
public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
}
}
protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
connectionAudit.removeDispatcher(dispatcher);
@ -2308,13 +2308,13 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
connectionAudit.rollbackDuplicate(dispatcher, message);
}
public IOException getFirstFailureError() {
return firstFailureError;
}
protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
if (cdl != null) {
public IOException getFirstFailureError() {
return firstFailureError;
}
protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
if (cdl != null) {
if (!closed.get() && !transportFailed.get() && cdl.getCount()>0) {
LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + cdl.getCount() + ") to complete..");
cdl.await(10, TimeUnit.SECONDS);
@ -2322,16 +2322,16 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
signalInterruptionProcessingComplete();
}
}
protected void transportInterruptionProcessingComplete() {
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
if (cdl != null) {
cdl.countDown();
try {
signalInterruptionProcessingComplete();
} catch (InterruptedException ignored) {}
}
}
protected void transportInterruptionProcessingComplete() {
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
if (cdl != null) {
cdl.countDown();
try {
signalInterruptionProcessingComplete();
} catch (InterruptedException ignored) {}
}
}
private void signalInterruptionProcessingComplete() throws InterruptedException {
CountDownLatch cdl = this.transportInterruptionProcessingComplete;
@ -2372,15 +2372,15 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
}
public long getConsumerFailoverRedeliveryWaitPeriod() {
return consumerFailoverRedeliveryWaitPeriod;
}
protected Scheduler getScheduler() {
return this.scheduler;
}
protected ThreadPoolExecutor getExecutor() {
return this.executor;
}
@ -2399,4 +2399,28 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.checkForDuplicates = checkForDuplicates;
}
/**
* Removes any TempDestinations that this connection has cached, ignoring
* any exceptions generated because the destination is in use as they should
* not be removed.
*/
public void cleanUpTempDestinations() {
if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
return;
}
Iterator<ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
= this.activeTempDestinations.entrySet().iterator();
while(entries.hasNext()) {
ConcurrentHashMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
try {
this.deleteTempDestination(entry.getValue());
} catch (Exception ex) {
// the temp dest is in use so it can not be deleted.
// it is ok to leave it to connection tear down phase
}
}
}
}

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.pool;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@ -36,18 +39,19 @@ import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.AlreadyClosedException;
import org.apache.activemq.EnhancedConnection;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.command.ActiveMQTempDestination;
/**
* Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
* {@link QueueConnection} which is pooled and on {@link #close()} will return
* itself to the sessionPool.
*
*
* <b>NOTE</b> this implementation is only intended for use when sending
* messages. It does not deal with pooling of consumers; for that look at a
* library like <a href="http://jencks.org/">Jencks</a> such as in <a
* href="http://jencks.org/Message+Driven+POJOs">this example</a>
*
*
*
*
*/
public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection {
@ -69,6 +73,9 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
public void close() throws JMSException {
if (this.pool != null) {
this.pool.decrementReferenceCount();
if (this.pool.getConnection() != null) {
this.pool.getConnection().cleanUpTempDestinations();
}
this.pool = null;
}
}
@ -143,7 +150,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
// EnhancedCollection API
// -------------------------------------------------------------------------
public DestinationSource getDestinationSource() throws JMSException {
return getConnection().getDestinationSource();
}
@ -169,5 +176,4 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
public String toString() {
return "PooledConnection { " + pool + " }";
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.Connection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.test.TestSupport;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @version $Revision: 1.1 $
*/
public class PooledConnectionFactoryWithTemporaryDestinationsTest extends TestSupport {
private static final Logger LOG = LoggerFactory.getLogger(PooledConnectionFactoryWithTemporaryDestinationsTest.class);
private BrokerService broker;
private ActiveMQConnectionFactory factory;
private PooledConnectionFactory pooledFactory;
protected void setUp() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
TransportConnector connector = broker.addConnector("tcp://localhost:0");
broker.start();
factory = new ActiveMQConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false");
pooledFactory = new PooledConnectionFactory(factory);
}
protected void tearDown() throws Exception {
broker.stop();
}
public void testTemporaryQueueLeakAfterConnectionClose() throws Exception {
Connection pooledConnection = null;
Session session = null;
Queue tempQueue = null;
for (int i = 0; i < 2; i++) {
pooledConnection = pooledFactory.createConnection();
session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
tempQueue = session.createTemporaryQueue();
LOG.info("Created queue named: " + tempQueue.getQueueName());
pooledConnection.close();
}
assertEquals(0, countBrokerTemporaryQueues());
}
public void testTemporaryTopicLeakAfterConnectionClose() throws Exception {
Connection pooledConnection = null;
Session session = null;
Topic tempTopic = null;
for (int i = 0; i < 2; i++) {
pooledConnection = pooledFactory.createConnection();
session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
tempTopic = session.createTemporaryTopic();
LOG.info("Created topic named: " + tempTopic.getTopicName());
pooledConnection.close();
}
assertEquals(0, countBrokerTemporaryTopics());
}
private int countBrokerTemporaryQueues() throws Exception {
return ((RegionBroker) broker.getRegionBroker()).getTempQueueRegion().getDestinationMap().size();
}
private int countBrokerTemporaryTopics() throws Exception {
return ((RegionBroker) broker.getRegionBroker()).getTempTopicRegion().getDestinationMap().size();
}
}