r244@34: chirino | 2007-02-23 14:49:32 -0500

Fix for Memory limits for topics was not returning to normal after it's consumers are disconnected
 
 


git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-4.1@511088 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-02-23 20:25:14 +00:00
parent 043edbe4a0
commit b1783a21ab
1 changed files with 86 additions and 45 deletions

View File

@ -77,6 +77,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicReference;
/** /**
* @version $Revision: 1.8 $ * @version $Revision: 1.8 $
@ -105,7 +106,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
// Used to do async dispatch.. this should perhaps be pushed down into the transport layer.. // Used to do async dispatch.. this should perhaps be pushed down into the transport layer..
protected final List dispatchQueue = Collections.synchronizedList(new LinkedList()); protected final List dispatchQueue = Collections.synchronizedList(new LinkedList());
protected final TaskRunner taskRunner; protected final TaskRunner taskRunner;
protected IOException transportException; protected final AtomicReference transportException = new AtomicReference();
private boolean inServiceException=false; private boolean inServiceException=false;
private ConnectionStatistics statistics = new ConnectionStatistics(); private ConnectionStatistics statistics = new ConnectionStatistics();
@ -126,7 +127,8 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
protected final AtomicBoolean asyncException = new AtomicBoolean(false); protected final AtomicBoolean asyncException = new AtomicBoolean(false);
private ConnectionContext context; private ConnectionContext context;
private boolean networkConnection; private boolean networkConnection;
private CountDownLatch dispatchStopped = new CountDownLatch(1); private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
static class ConnectionState extends org.apache.activemq.state.ConnectionState { static class ConnectionState extends org.apache.activemq.state.ConnectionState {
private final ConnectionContext context; private final ConnectionContext context;
@ -180,7 +182,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
Command command = (Command) o; Command command = (Command) o;
Response response = service(command); Response response = service(command);
if (response != null) { if (response != null) {
dispatch(response); dispatchSync(response);
} }
} }
@ -206,7 +208,7 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
public void serviceTransportException(IOException e) { public void serviceTransportException(IOException e) {
if( !disposed.get() ) { if( !disposed.get() ) {
transportException = e; transportException.set(e);
if( transportLog.isDebugEnabled() ) if( transportLog.isDebugEnabled() )
transportLog.debug("Transport failed: "+e,e); transportLog.debug("Transport failed: "+e,e);
ServiceSupport.dispose(this); ServiceSupport.dispose(this);
@ -766,26 +768,46 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
public void dispatchSync(Command message) { public void dispatchSync(Command message) {
getStatistics().getEnqueues().increment(); getStatistics().getEnqueues().increment();
processDispatch(message); try {
processDispatch(message);
} catch (IOException e) {
serviceExceptionAsync(e);
}
} }
public void dispatchAsync(Command message) { public void dispatchAsync(Command message) {
getStatistics().getEnqueues().increment(); if( !disposed.get() ) {
if( taskRunner==null ) { getStatistics().getEnqueues().increment();
dispatchSync( message ); if( taskRunner==null ) {
} else { dispatchSync( message );
dispatchQueue.add(message); } else {
try { dispatchQueue.add(message);
taskRunner.wakeup(); try {
} catch (InterruptedException e) { taskRunner.wakeup();
Thread.currentThread().interrupt(); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} }
} } else {
if(message.isMessageDispatch()) {
MessageDispatch md=(MessageDispatch) message;
Runnable sub=(Runnable) md.getConsumer();
broker.processDispatch(md);
if(sub!=null){
sub.run();
}
}
}
} }
protected void processDispatch(Command command){ protected void processDispatch(Command command) throws IOException {
try { try {
if( !disposed.get() ) {
dispatch(command);
}
} finally {
if(command.isMessageDispatch()){ if(command.isMessageDispatch()){
MessageDispatch md=(MessageDispatch) command; MessageDispatch md=(MessageDispatch) command;
Runnable sub=(Runnable) md.getConsumer(); Runnable sub=(Runnable) md.getConsumer();
@ -793,25 +815,43 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
if(sub!=null){ if(sub!=null){
sub.run(); sub.run();
} }
dispatch(command);
} else if( command.isShutdownInfo() ) {
dispatch(command);
dispatchStopped.countDown();
} else {
dispatch(command);
} }
} finally {
getStatistics().getDequeues().increment(); getStatistics().getDequeues().increment();
} }
} }
public boolean iterate() { public boolean iterate() {
if( dispatchQueue.isEmpty() || broker.isStopped()) { try {
return false; if( disposed.get() ) {
} else { if( dispatchStopped.compareAndSet(false, true)) {
Command command = (Command) dispatchQueue.remove(0); if( transportException.get()==null ) {
processDispatch( command ); dispatch(new ShutdownInfo());
return true; }
dispatchStoppedLatch.countDown();
}
return false;
}
if( !dispatchStopped.get() ) {
if( dispatchQueue.isEmpty() ) {
return false;
} else {
Command command = (Command) dispatchQueue.remove(0);
processDispatch( command );
return true;
}
} else {
return false;
}
} catch (IOException e) {
if( dispatchStopped.compareAndSet(false, true)) {
dispatchStoppedLatch.countDown();
}
serviceExceptionAsync(e);
return false;
} }
} }
@ -880,22 +920,25 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
if(disposed.compareAndSet(false, true)) { if(disposed.compareAndSet(false, true)) {
// Clear out what's on the queue so that we can send the Shutdown command quicker. taskRunner.wakeup();
dispatchQueue.clear(); dispatchStoppedLatch.await();
if( transportException==null ) {
// Wait up to 10 seconds for the shutdown command to be sent to
// the client.
dispatchAsync(new ShutdownInfo());
dispatchStopped.await(10, TimeUnit.SECONDS);
}
if( taskRunner!=null ) if( taskRunner!=null )
taskRunner.shutdownNoWait(); taskRunner.shutdown();
// Clear out the dispatch queue to release any memory that
// is being held on to.
dispatchQueue.clear();
// Run the MessageDispatch callbacks so that message references get cleaned up.
for (Iterator iter = dispatchQueue.iterator(); iter.hasNext();) {
Command command = (Command) iter.next();
if(command.isMessageDispatch()) {
MessageDispatch md=(MessageDispatch) command;
Runnable sub=(Runnable) md.getConsumer();
broker.processDispatch(md);
if(sub!=null){
sub.run();
}
}
}
// //
// Remove all logical connection associated with this connection // Remove all logical connection associated with this connection
// from the broker. // from the broker.
@ -1077,12 +1120,10 @@ public class TransportConnection implements Service, Connection, Task, CommandVi
return null; return null;
} }
protected void dispatch(Command command) { protected void dispatch(Command command) throws IOException {
try { try {
setMarkedCandidate(true); setMarkedCandidate(true);
transport.oneway(command); transport.oneway(command);
} catch(IOException e){
serviceExceptionAsync(e);
} finally{ } finally{
setMarkedCandidate(false); setMarkedCandidate(false);
} }