diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 066a934b07..1efb9c7755 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -25,6 +25,9 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -137,6 +140,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br private TransportConnection duplexInitiatingConnection; private BrokerService brokerService = null; private ObjectName mbeanObjectName; + private ExecutorService serialExecutor = Executors.newSingleThreadExecutor(); public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { this.configuration = configuration; @@ -355,6 +359,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br brokerService.getTaskRunnerFactory().execute(new Runnable() { public void run() { try { + serialExecutor.shutdown(); + if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + List pendingTasks = serialExecutor.shutdownNow(); + LOG.info("pending tasks on stop" + pendingTasks); + } localBroker.oneway(new ShutdownInfo()); remoteBroker.oneway(new ShutdownInfo()); } catch (Throwable e) { @@ -594,7 +603,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } else if (data.getClass() == DestinationInfo.class) { // It's a destination info - we want to pass up information about temporary destinations - DestinationInfo destInfo = (DestinationInfo) data; + final DestinationInfo destInfo = (DestinationInfo) data; BrokerId[] path = destInfo.getBrokerPath(); if (path != null && path.length >= networkTTL) { if (LOG.isDebugEnabled()) { @@ -619,7 +628,20 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br if (LOG.isTraceEnabled()) { LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo); } - localBroker.oneway(destInfo); + if (destInfo.isRemoveOperation()) { + // serialise with removeSub operations such that all removeSub advisories are generated + serialExecutor.execute(new Runnable() { + public void run() { + try { + localBroker.oneway(destInfo); + } catch (IOException e) { + LOG.warn("failed to deliver remove command for destination:" + destInfo.getDestination(), e); + } + } + }); + } else { + localBroker.oneway(destInfo); + } } else if (data.getClass() == RemoveInfo.class) { ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); removeDemandSubscription(id); @@ -658,7 +680,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); // continue removal in separate thread to free up this thread for outstanding responses - brokerService.getTaskRunnerFactory().execute(new Runnable() { + // serialise with removeDestination operations so that removeSubs are serialised with removeDestinations + // such that all removeSub advisories are generated + serialExecutor.execute(new Runnable() { public void run() { sub.waitForCompletion(); try { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 8fd3b4c427..3767290ea3 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -662,10 +662,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon c.dispose(); } - // As TemporaryQueue and TemporaryTopic instances are bound - // to a connection we should just delete them after the connection - // is closed to free up memory - cleanUpTempDestinations(); + this.activeTempDestinations.clear(); if (isConnectionInfoSentToBroker) { // If we announced ourselves to the broker.. Try to let the broker @@ -2527,6 +2524,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * Removes any TempDestinations that this connection has cached, ignoring * any exceptions generated because the destination is in use as they should * not be removed. + * Used from a pooled connection, b/c it will not be explicitly closed. */ public void cleanUpTempDestinations() { @@ -2612,7 +2610,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that * have been configured with optimizeAcknowledge enabled. * - * @param scheduledOptimizedAckInterval the scheduledOptimizedAckInterval to set + * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set */ public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) { this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;