diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index b853cd91cd..4a3dbee461 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -17,14 +17,17 @@ package org.apache.activemq.transport.vm; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.URI; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.command.Command; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.thread.Valve; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; @@ -54,23 +57,17 @@ public class VMTransport implements Transport, Task { protected final URI location; protected final long id; private TaskRunner taskRunner; - private final Object mutex = new Object(); - + private final Object lazyInitMutext = new Object(); + private final Valve enqueueValve = new Valve(true); + private final AtomicBoolean stopping = new AtomicBoolean(); + public VMTransport(URI location) { this.location = location; this.id = NEXT_ID.getAndIncrement(); } - public VMTransport getPeer() { - synchronized (mutex) { - return peer; - } - } - public void setPeer(VMTransport peer) { - synchronized (mutex) { - this.peer = peer; - } + this.peer = peer; } public void oneway(Object command) throws IOException { @@ -81,34 +78,131 @@ public class VMTransport implements Transport, Task { throw new IOException("Peer not connected."); } - TransportListener tl = null; - synchronized (peer.mutex) { - if (peer.disposed) { + try { + // Disable the peer from changing his state while we try to enqueue onto him. + peer.enqueueValve.increment(); + + if (peer.disposed || peer.stopping.get()) { throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."); } + if (peer.started) { if (peer.async) { - peer.enqueue(command); + peer.getMessageQueue().put(command); peer.wakeup(); } else { - tl = peer.transportListener; + peer.transportListener.onCommand(command); } + enqueueValve.decrement(); } else { - peer.enqueue(command); + peer.getMessageQueue().put(command); } - } - - if (tl != null) { - tl.onCommand(command); + + } catch (InterruptedException e) { + throw IOExceptionSupport.create(e); + } finally { + // Allow the peer to change state again... + peer.enqueueValve.decrement(); } } - private void enqueue(Object command) throws IOException { + public void start() throws Exception { + if (transportListener == null) { + throw new IOException("TransportListener not set."); + } try { - getMessageQueue().put(command); - } catch (final InterruptedException e) { - throw IOExceptionSupport.create(e); + enqueueValve.turnOff(); + if (messageQueue != null && !async) { + Object command; + while ((command = messageQueue.poll()) != null) { + transportListener.onCommand(command); + } + } + started = true; + wakeup(); + } finally { + enqueueValve.turnOn(); + } + } + + public void stop() throws Exception { + TaskRunner tr = null; + try { + stopping.set(true); + enqueueValve.turnOff(); + if (!disposed) { + started = false; + disposed = true; + if (taskRunner != null) { + tr = taskRunner; + taskRunner = null; + } + } + } finally { + stopping.set(false); + enqueueValve.turnOn(); + } + if (tr != null) { + tr.shutdown(1000); + } + } + + /** + * @see org.apache.activemq.thread.Task#iterate() + */ + public boolean iterate() { + + final TransportListener tl; + try { + // Disable changing the state variables while we are running... + enqueueValve.increment(); + tl = transportListener; + if (!started || disposed || tl == null || stopping.get()) { + if( stopping.get() ) { + // drain the queue it since folks could be blocked putting on to + // it and that would not allow the stop() method for finishing up. + getMessageQueue().clear(); + } + return false; + } + } catch (InterruptedException e) { + return false; + } finally { + enqueueValve.decrement(); + } + + LinkedBlockingQueue mq = getMessageQueue(); + Command command = (Command)mq.poll(); + if (command != null) { + tl.onCommand(command); + return !mq.isEmpty(); + } else { + return false; + } + + } + + public void setTransportListener(TransportListener commandListener) { + try { + try { + enqueueValve.turnOff(); + this.transportListener = commandListener; + wakeup(); + } finally { + enqueueValve.turnOn(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private LinkedBlockingQueue getMessageQueue() { + synchronized (lazyInitMutext) { + if (messageQueue == null) { + messageQueue = new LinkedBlockingQueue(this.asyncQueueDepth); + } + return messageQueue; } } @@ -125,59 +219,7 @@ public class VMTransport implements Transport, Task { } public TransportListener getTransportListener() { - synchronized (mutex) { - return transportListener; - } - } - - public void setTransportListener(TransportListener commandListener) { - synchronized (mutex) { - this.transportListener = commandListener; - wakeup(); - } - } - - private LinkedBlockingQueue getMessageQueue() { - synchronized (mutex) { - if (messageQueue == null) { - messageQueue = new LinkedBlockingQueue(this.asyncQueueDepth); - } - return messageQueue; - } - } - - public void start() throws Exception { - if (transportListener == null) { - throw new IOException("TransportListener not set."); - } - - synchronized (mutex) { - if (messageQueue != null) { - Object command; - while ((command = messageQueue.poll()) != null) { - transportListener.onCommand(command); - } - } - started = true; - wakeup(); - } - } - - public void stop() throws Exception { - TaskRunner tr = null; - synchronized (mutex) { - if (!disposed) { - started = false; - disposed = true; - if (taskRunner != null) { - tr = taskRunner; - taskRunner = null; - } - } - } - if (tr != null) { - tr.shutdown(1000); - } + return transportListener; } public T narrow(Class target) { @@ -214,28 +256,6 @@ public class VMTransport implements Transport, Task { return null; } - /** - * @see org.apache.activemq.thread.Task#iterate() - */ - public boolean iterate() { - final TransportListener tl; - synchronized (mutex) { - tl = transportListener; - if (!started || disposed || tl == null) { - return false; - } - } - - LinkedBlockingQueue mq = getMessageQueue(); - final Command command = (Command)mq.poll(); - if (command != null) { - tl.onCommand(command); - return !mq.isEmpty(); - } else { - return false; - } - } - /** * @return the async */ @@ -266,7 +286,7 @@ public class VMTransport implements Transport, Task { protected void wakeup() { if (async) { - synchronized (mutex) { + synchronized (lazyInitMutext) { if (taskRunner == null) { taskRunner = TASK_RUNNER_FACTORY.createTaskRunner(this, "VMTransport: " + toString()); }