diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index e2cd6df0f6..82e403be42 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -540,10 +540,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon if (!closed.get()) { closing.set(true); - if (advisoryConsumer != null) { - advisoryConsumer.dispose(); - advisoryConsumer = null; - } + this.factoryStats.removeConnection(this); + + if( advisoryConsumer!=null ) { + advisoryConsumer.dispose(); + advisoryConsumer=null; + } for (Iterator i = this.sessions.iterator(); i.hasNext();) { ActiveMQSession s = (ActiveMQSession) i.next(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java index 7acb17bc37..b95635a7f3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java @@ -389,6 +389,13 @@ public abstract class AbstractConnection implements Service, Connection, Task, C } public Response processBrokerInfo(BrokerInfo info) { + + // We only expect to get one broker info command per connection + if( this.brokerInfo!=null ) { + log.warn("Unexpected extra broker info command received: "+info); + } + + this.brokerInfo = info; broker.addBroker(this, info); return null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java index adad07e524..bc20cbf576 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java @@ -146,6 +146,7 @@ public class IndirectMessageReference implements MessageReference { synchronized public void drop() { dropped=true; + lockOwner = null; if( !persistent && message!=null ) { message.decrementReferenceCount(); message=null; @@ -156,7 +157,7 @@ public class IndirectMessageReference implements MessageReference { if( !regionDestination.lock(this, subscription) ) return false; synchronized (this) { - if( lockOwner!=null && lockOwner!=subscription ) + if( dropped || (lockOwner!=null && lockOwner!=subscription) ) return false; lockOwner = subscription; return true; diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java b/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java index 0c866f159c..236a6cf474 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java @@ -51,6 +51,7 @@ public class Scheduler { ScheduledFuture ticket = (ScheduledFuture) clockTickets.remove(task); if( ticket!=null ) { ticket.cancel(true); + clockDaemon.remove(task); } } diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml b/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml index cef196af0f..0c50a0df12 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml @@ -19,6 +19,18 @@ + + + + + + + + + + + + diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml b/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml index a0c2256615..40d26c3a79 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml @@ -19,6 +19,18 @@ + + + + + + + + + + + + diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml b/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml index c412c156a4..73dfe232ff 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml @@ -19,6 +19,18 @@ + + + + + + + + + + + + diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml b/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml index c9c97c6570..1d646eb157 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml @@ -19,6 +19,18 @@ + + + + + + + + + + + +