The VMTransport now let's it's peer know when it's being stopped so that the Peer can give its' transport listener a peer disconnected exception. Otherwise a VM transport client could disconnect without the server side knowing it disconnected and the server side would not terminate it's side of the connection. This could be seen as a memory leak on when the static network config is setup and one of the static brokers is not up.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@593204 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-11-08 15:39:51 +00:00
parent 5c45b00b1c
commit 2edad74ca9
1 changed files with 18 additions and 3 deletions

View File

@ -43,6 +43,7 @@ import org.apache.activemq.util.IOExceptionSupport;
*/ */
public class VMTransport implements Transport, Task { public class VMTransport implements Transport, Task {
private static final Object DISCONNECT = new Object();
private static final AtomicLong NEXT_ID = new AtomicLong(0); private static final AtomicLong NEXT_ID = new AtomicLong(0);
private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000); private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000);
protected VMTransport peer; protected VMTransport peer;
@ -91,7 +92,11 @@ public class VMTransport implements Transport, Task {
peer.getMessageQueue().put(command); peer.getMessageQueue().put(command);
peer.wakeup(); peer.wakeup();
} else { } else {
peer.transportListener.onCommand(command); if( command == DISCONNECT ) {
peer.transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
} else {
peer.transportListener.onCommand(command);
}
} }
enqueueValve.decrement(); enqueueValve.decrement();
} else { } else {
@ -137,6 +142,12 @@ public class VMTransport implements Transport, Task {
// If stop() is called while being start()ed.. then we can't stop until we return to the start() method. // If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
if( enqueueValve.isOn() ) { if( enqueueValve.isOn() ) {
// let the peer know that we are disconnecting..
try {
oneway(DISCONNECT);
} catch (Exception ignore) {
}
TaskRunner tr = null; TaskRunner tr = null;
try { try {
enqueueValve.turnOff(); enqueueValve.turnOff();
@ -183,9 +194,13 @@ public class VMTransport implements Transport, Task {
} }
LinkedBlockingQueue<Object> mq = getMessageQueue(); LinkedBlockingQueue<Object> mq = getMessageQueue();
Command command = (Command)mq.poll(); Object command = mq.poll();
if (command != null) { if (command != null) {
tl.onCommand(command); if( command == DISCONNECT ) {
tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
} else {
tl.onCommand(command);
}
return !mq.isEmpty(); return !mq.isEmpty();
} else { } else {
return false; return false;