Hiram R. Chirino 2006-07-27 16:52:50 +00:00
parent 8abb655e82
commit acd599606e
8 changed files with 64 additions and 5 deletions

View File

@ -540,10 +540,12 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
if (!closed.get()) {
closing.set(true);
if (advisoryConsumer != null) {
advisoryConsumer.dispose();
advisoryConsumer = null;
}
this.factoryStats.removeConnection(this);
if( advisoryConsumer!=null ) {
advisoryConsumer.dispose();
advisoryConsumer=null;
}
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession) i.next();

View File

@ -389,6 +389,13 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
}
public Response processBrokerInfo(BrokerInfo info) {
// We only expect to get one broker info command per connection
if( this.brokerInfo!=null ) {
log.warn("Unexpected extra broker info command received: "+info);
}
this.brokerInfo = info;
broker.addBroker(this, info);
return null;
}

View File

@ -146,6 +146,7 @@ public class IndirectMessageReference implements MessageReference {
synchronized public void drop() {
dropped=true;
lockOwner = null;
if( !persistent && message!=null ) {
message.decrementReferenceCount();
message=null;
@ -156,7 +157,7 @@ public class IndirectMessageReference implements MessageReference {
if( !regionDestination.lock(this, subscription) )
return false;
synchronized (this) {
if( lockOwner!=null && lockOwner!=subscription )
if( dropped || (lockOwner!=null && lockOwner!=subscription) )
return false;
lockOwner = subscription;
return true;

View File

@ -51,6 +51,7 @@ public class Scheduler {
ScheduledFuture ticket = (ScheduledFuture) clockTickets.remove(task);
if( ticket!=null ) {
ticket.cancel(true);
clockDaemon.remove(task);
}
}

View File

@ -19,6 +19,18 @@
<broker brokerName="broker1" persistent="false" useShutdownHook="false" useJmx="false" xmlns="http://activemq.org/config/1.0">
<!-- disabling the subscription recovery policy allows us take memory leaks easier -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<noSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries></policyMap>
</destinationPolicy>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
<transportConnector uri="vm://broker1"/>

View File

@ -19,6 +19,18 @@
<broker brokerName="broker2" persistent="false" useShutdownHook="false" useJmx="false" xmlns="http://activemq.org/config/1.0">
<!-- disabling the subscription recovery policy allows us take memory leaks easier -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<noSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries></policyMap>
</destinationPolicy>
<transportConnectors>
<transportConnector uri="tcp://localhost:61617"/>
<transportConnector uri="vm://broker2"/>

View File

@ -19,6 +19,18 @@
<broker brokerName="broker1" persistent="false" useShutdownHook="false" useJmx="false" xmlns="http://activemq.org/config/1.0">
<!-- disabling the subscription recovery policy allows us take memory leaks easier -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<noSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries></policyMap>
</destinationPolicy>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
<transportConnector uri="vm://broker1"/>

View File

@ -19,6 +19,18 @@
<broker brokerName="broker2" persistent="false" useShutdownHook="false" useJmx="false" xmlns="http://activemq.org/config/1.0">
<!-- disabling the subscription recovery policy allows us take memory leaks easier -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<subscriptionRecoveryPolicy>
<noSubscriptionRecoveryPolicy />
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries></policyMap>
</destinationPolicy>
<transportConnectors>
<transportConnector uri="tcp://localhost:61617"/>
<transportConnector uri="vm://broker2"/>