mirror of https://github.com/apache/activemq.git
r245@34: chirino | 2007-02-23 14:49:44 -0500
We now can shutdown a connection that is blocked on send due to queue limits. Stats tracking code added recently was causing a NPE at times. Added a gaurd against the NPE git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-4.1@511089 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b1783a21ab
commit
f56d9221ad
|
@ -18,6 +18,7 @@
|
|||
package org.apache.activemq.broker;
|
||||
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
|
@ -56,6 +57,7 @@ public class ConnectionContext {
|
|||
private AtomicInteger referenceCounter = new AtomicInteger();
|
||||
private boolean dontSendReponse;
|
||||
private boolean networkConnection;
|
||||
private final AtomicBoolean stopping = new AtomicBoolean();
|
||||
|
||||
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
|
||||
|
||||
|
@ -264,4 +266,8 @@ public class ConnectionContext {
|
|||
this.networkConnection = networkConnection;
|
||||
}
|
||||
|
||||
public AtomicBoolean getStopping() {
|
||||
return stopping;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -920,6 +920,16 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
|
|||
|
||||
if(disposed.compareAndSet(false, true)) {
|
||||
|
||||
|
||||
// Let all the connection contexts know we are shutting down
|
||||
// so that in progress operations can notice and unblock.
|
||||
ArrayList l=new ArrayList(localConnectionStates.values());
|
||||
for(Iterator iter=l.iterator();iter.hasNext();){
|
||||
ConnectionState cs=(ConnectionState) iter.next();
|
||||
cs.getContext().getStopping().set(true);
|
||||
}
|
||||
|
||||
|
||||
taskRunner.wakeup();
|
||||
dispatchStoppedLatch.await();
|
||||
|
||||
|
@ -943,7 +953,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
|
|||
// Remove all logical connection associated with this connection
|
||||
// from the broker.
|
||||
if(!broker.isStopped()){
|
||||
ArrayList l=new ArrayList(localConnectionStates.keySet());
|
||||
l=new ArrayList(localConnectionStates.keySet());
|
||||
for(Iterator iter=l.iterator();iter.hasNext();){
|
||||
ConnectionId connectionId=(ConnectionId) iter.next();
|
||||
try{
|
||||
|
|
|
@ -315,7 +315,10 @@ public class Queue implements Destination {
|
|||
if (usageManager.isSendFailIfNoSpace() ) {
|
||||
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
|
||||
} else {
|
||||
usageManager.waitForSpace();
|
||||
while( !usageManager.waitForSpace(1000) ) {
|
||||
if( context.getStopping().get() )
|
||||
throw new IOException("Connection closed, send aborted.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -236,6 +236,10 @@ public class Topic implements Destination {
|
|||
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
|
||||
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
|
||||
} else {
|
||||
while( !usageManager.waitForSpace(1000) ) {
|
||||
if( context.getStopping().get() )
|
||||
throw new IOException("Connection closed, send aborted.");
|
||||
}
|
||||
usageManager.waitForSpace();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -162,7 +162,7 @@ public class TopicSubscription extends AbstractSubscription{
|
|||
context.getTransaction().addSynchronization(new Synchronization(){
|
||||
public void afterCommit() throws Exception{
|
||||
synchronized( TopicSubscription.this ) {
|
||||
if( singleDestination ) {
|
||||
if( singleDestination && destination!=null) {
|
||||
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
|
||||
}
|
||||
}
|
||||
|
@ -172,7 +172,7 @@ public class TopicSubscription extends AbstractSubscription{
|
|||
});
|
||||
}else{
|
||||
|
||||
if( singleDestination ) {
|
||||
if( singleDestination && destination!=null ) {
|
||||
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
|
||||
}
|
||||
|
||||
|
|
|
@ -95,6 +95,19 @@ public class UsageManager {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean waitForSpace(long timeout) throws InterruptedException {
|
||||
if(parent!=null) {
|
||||
if( !parent.waitForSpace(timeout) )
|
||||
return false;
|
||||
}
|
||||
synchronized (usageMutex) {
|
||||
if( percentUsage >= 100 ) {
|
||||
usageMutex.wait(timeout);
|
||||
}
|
||||
return percentUsage < 100;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param callback
|
||||
* @return true if the UsageManager was full. The callback will only be called if this method returns true.
|
||||
|
|
Loading…
Reference in New Issue