From 2edad74ca9150ee67b30b0b01a6ba6ef72f670a8 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 8 Nov 2007 15:39:51 +0000 Subject: [PATCH] The VMTransport now let's it's peer know when it's being stopped so that the Peer can give its' transport listener a peer disconnected exception. Otherwise a VM transport client could disconnect without the server side knowing it disconnected and the server side would not terminate it's side of the connection. This could be seen as a memory leak on when the static network config is setup and one of the static brokers is not up. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@593204 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/transport/vm/VMTransport.java | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 799fe24dbe..f7d33bcd1c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -43,6 +43,7 @@ import org.apache.activemq.util.IOExceptionSupport; */ public class VMTransport implements Transport, Task { + private static final Object DISCONNECT = new Object(); private static final AtomicLong NEXT_ID = new AtomicLong(0); private static final TaskRunnerFactory TASK_RUNNER_FACTORY = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000); protected VMTransport peer; @@ -91,7 +92,11 @@ public class VMTransport implements Transport, Task { peer.getMessageQueue().put(command); peer.wakeup(); } else { - peer.transportListener.onCommand(command); + if( command == DISCONNECT ) { + peer.transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); + } else { + peer.transportListener.onCommand(command); + } } enqueueValve.decrement(); } else { @@ -137,6 +142,12 @@ public class VMTransport implements Transport, Task { // If stop() is called while being start()ed.. then we can't stop until we return to the start() method. if( enqueueValve.isOn() ) { + // let the peer know that we are disconnecting.. + try { + oneway(DISCONNECT); + } catch (Exception ignore) { + } + TaskRunner tr = null; try { enqueueValve.turnOff(); @@ -183,9 +194,13 @@ public class VMTransport implements Transport, Task { } LinkedBlockingQueue mq = getMessageQueue(); - Command command = (Command)mq.poll(); + Object command = mq.poll(); if (command != null) { - tl.onCommand(command); + if( command == DISCONNECT ) { + tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); + } else { + tl.onCommand(command); + } return !mq.isEmpty(); } else { return false;