Jonas B. Lim 2007-03-05 17:03:36 +00:00
parent 49ea0eddf6
commit b743552137
6 changed files with 68 additions and 8 deletions

View File

@ -18,6 +18,7 @@
package org.apache.activemq.broker;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.region.MessageReference;
@ -55,6 +56,7 @@ public class ConnectionContext {
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private AtomicInteger referenceCounter = new AtomicInteger();
private boolean networkConnection;
private final AtomicBoolean stopping = new AtomicBoolean();
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
public ConnectionContext() {
@ -253,4 +255,9 @@ public class ConnectionContext {
public synchronized void setNetworkConnection(boolean networkConnection) {
this.networkConnection = networkConnection;
}
public AtomicBoolean getStopping() {
return stopping;
}
}

View File

@ -111,7 +111,8 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
private boolean starting;
private boolean pendingStop;
private long timeStamp=0;
private AtomicBoolean stopped=new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final AtomicBoolean transportDisposed = new AtomicBoolean();
private final AtomicBoolean disposed=new AtomicBoolean(false);
private CountDownLatch stopLatch=new CountDownLatch(1);
private final AtomicBoolean asyncException=new AtomicBoolean(false);
@ -846,8 +847,24 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
transport.stop();
active=false;
if(disposed.compareAndSet(false,true)){
taskRunner.wakeup();
dispatchStoppedLatch.await();
// Let all the connection contexts know we are shutting down
// so that in progress operations can notice and unblock.
ArrayList l=new ArrayList(localConnectionStates.values());
for(Iterator iter=l.iterator();iter.hasNext();){
ConnectionState cs=(ConnectionState) iter.next();
cs.getContext().getStopping().set(true);
}
if( taskRunner!=null ) {
taskRunner.wakeup();
// Give it a change to stop gracefully.
dispatchStoppedLatch.await(5, TimeUnit.SECONDS);
disposeTransport();
taskRunner.shutdown();
} else {
disposeTransport();
}
if( taskRunner!=null )
taskRunner.shutdown();
@ -868,7 +885,7 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
// Remove all logical connection associated with this connection
// from the broker.
if(!broker.isStopped()){
ArrayList l=new ArrayList(localConnectionStates.keySet());
l=new ArrayList(localConnectionStates.keySet());
for(Iterator iter=l.iterator();iter.hasNext();){
ConnectionId connectionId=(ConnectionId)iter.next();
try{
@ -884,7 +901,6 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
}
stopLatch.countDown();
}
log.debug("Stopped connection: "+transport.getRemoteAddress());
}
}
@ -1122,4 +1138,16 @@ public class TransportConnection implements Service,Connection,Task,CommandVisit
consumerExchanges.remove(id);
}
}
protected void disposeTransport() {
if( transportDisposed.compareAndSet(false, true) ) {
try {
transport.stop();
active = false;
log.debug("Stopped connection: "+transport.getRemoteAddress());
} catch (Exception e) {
log.debug("Could not stop transport: "+e,e);
}
}
}
}

View File

@ -329,7 +329,10 @@ public class Queue implements Destination, Task {
if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
}else{
usageManager.waitForSpace();
while( !usageManager.waitForSpace(1000) ) {
if( context.getStopping().get() )
throw new IOException("Connection closed, send aborted.");
}
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if(message.isExpired()){

View File

@ -247,6 +247,10 @@ public class Topic implements Destination {
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
} else {
while( !usageManager.waitForSpace(1000) ) {
if( context.getStopping().get() )
throw new IOException("Connection closed, send aborted.");
}
usageManager.waitForSpace();
// The usage manager could have delayed us by the time

View File

@ -175,7 +175,7 @@ public class TopicSubscription extends AbstractSubscription{
public void afterCommit() throws Exception{
synchronized(TopicSubscription.this){
if(singleDestination){
if( singleDestination && destination!=null) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
}
@ -184,7 +184,7 @@ public class TopicSubscription extends AbstractSubscription{
}
});
}else{
if(singleDestination){
if( singleDestination && destination!=null) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
dequeueCounter.addAndGet(ack.getMessageCount());

View File

@ -116,6 +116,24 @@ public class UsageManager implements Service{
}
}
}
/**
* @throws InterruptedException
*
* @param timeout
*/
public boolean waitForSpace(long timeout) throws InterruptedException {
if(parent!=null) {
if( !parent.waitForSpace(timeout) )
return false;
}
synchronized (usageMutex) {
if( percentUsage >= 100 ) {
usageMutex.wait(timeout);
}
return percentUsage < 100;
}
}
/**
* Increases the usage by the value amount.