Hiram R. Chirino 2006-07-27 00:44:17 +00:00
parent 91156e667e
commit 3372966d20
8 changed files with 60 additions and 1 deletions

View File

@ -535,6 +535,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
if (!closed.get()) {
closing.set(true);
this.factoryStats.removeConnection(this);
if( advisoryConsumer!=null ) {
advisoryConsumer.dispose();
advisoryConsumer=null;

View File

@ -389,6 +389,13 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processBrokerInfo(BrokerInfo info) {
// We only expect to get one broker info command per connection
if( this.brokerInfo!=null ) {
log.warn("Unexpected extra broker info command received: "+info);
}
this.brokerInfo = info;
broker.addBroker(this, info);
return null;
}

View File

@ -146,6 +146,7 @@ public class IndirectMessageReference implements MessageReference {
synchronized public void drop() {
dropped=true;
lockOwner = null;
if( !persistent && message!=null ) {
message.decrementReferenceCount();
message=null;
@ -156,7 +157,7 @@ public class IndirectMessageReference implements MessageReference {
if( !regionDestination.lock(this, subscription) )
return false;
synchronized (this) {
if( lockOwner!=null && lockOwner!=subscription )
if( dropped || (lockOwner!=null && lockOwner!=subscription) )
return false;
lockOwner = subscription;
return true;

View File

@ -51,6 +51,7 @@ public class Scheduler {
ScheduledFuture ticket = (ScheduledFuture) clockTickets.remove(task);
if( ticket!=null ) {
ticket.cancel(true);
clockDaemon.remove(task);
}
}

View File

@ -19,6 +19,18 @@
<broker brokerName="broker1" persistent="false" useShutdownHook="false" useJmx="false">
<!-- disabling the subscription recovery policy allows us take memory leaks easier -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<noSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries></policyMap>
</destinationPolicy>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
<transportConnector uri="vm://broker1"/>

View File

@ -19,6 +19,18 @@
<broker brokerName="broker2" persistent="false" useShutdownHook="false" useJmx="false">
<!-- disabling the subscription recovery policy allows us take memory leaks easier -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<noSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries></policyMap>
</destinationPolicy>
<transportConnectors>
<transportConnector uri="tcp://localhost:61617"/>
<transportConnector uri="vm://broker2"/>

View File

@ -19,6 +19,18 @@
<broker brokerName="broker1" persistent="false" useShutdownHook="false" useJmx="false">
<!-- disabling the subscription recovery policy allows us take memory leaks easier -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<noSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries></policyMap>
</destinationPolicy>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
<transportConnector uri="vm://broker1"/>

View File

@ -19,6 +19,18 @@
<broker brokerName="broker2" persistent="false" useShutdownHook="false" useJmx="false">
<!-- disabling the subscription recovery policy allows us take memory leaks easier -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<noSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries></policyMap>
</destinationPolicy>
<transportConnectors>
<transportConnector uri="tcp://localhost:61617"/>
<transportConnector uri="vm://broker2"/>