From bc8441bebc92483c2b1ea7298022a4bccb1403bf Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 10 Feb 2012 20:19:44 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3684 - resolve deadlock on blocked oneway, revise sync and lazy init, remove use of valve git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1242912 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/transport/vm/VMTransport.java | 228 +++++++----------- .../transport/vm/VMTransportServer.java | 2 +- 2 files changed, 85 insertions(+), 145 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 1c51205d20..320e501365 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -26,20 +26,14 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; -import org.apache.activemq.thread.TaskRunnerFactory; -import org.apache.activemq.thread.Valve; import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportListener; -import org.apache.activemq.util.IOExceptionSupport; - /** * A Transport implementation that uses direct method invocations. - * - * */ public class VMTransport implements Transport, Task { @@ -47,21 +41,23 @@ public class VMTransport implements Transport, Task { private static final AtomicLong NEXT_ID = new AtomicLong(0); protected VMTransport peer; protected TransportListener transportListener; - protected boolean disposed; protected boolean marshal; protected boolean network; protected boolean async = true; protected int asyncQueueDepth = 2000; - protected LinkedBlockingQueue messageQueue; - protected boolean started; protected final URI location; protected final long id; - private TaskRunner taskRunner; - private final Object lazyInitMutext = new Object(); - private final Valve enqueueValve = new Valve(true); - protected final AtomicBoolean stopping = new AtomicBoolean(); + protected LinkedBlockingQueue messageQueue = new LinkedBlockingQueue(this.asyncQueueDepth); + private TaskRunner taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString()); + private volatile int receiveCounter; - + + // Managed Sate access protected by locks. + protected final AtomicBoolean stopping = new AtomicBoolean(); + protected final AtomicBoolean started = new AtomicBoolean(); + protected final AtomicBoolean starting = new AtomicBoolean(); + protected final AtomicBoolean disposed = new AtomicBoolean(); + public VMTransport(URI location) { this.location = location; this.id = NEXT_ID.getAndIncrement(); @@ -72,50 +68,52 @@ public class VMTransport implements Transport, Task { } public void oneway(Object command) throws IOException { - if (disposed) { + if (disposed.get()) { throw new TransportDisposedIOException("Transport disposed."); } if (peer == null) { throw new IOException("Peer not connected."); } - - TransportListener transportListener=null; + TransportListener transportListener = null; try { - // Disable the peer from changing his state while we try to enqueue onto him. - peer.enqueueValve.increment(); - - if (peer.disposed || peer.stopping.get()) { + if (peer.disposed.get() || peer.stopping.get()) { throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."); } - - if (peer.started) { + + if (peer.started.get()) { if (peer.async) { - peer.getMessageQueue().put(command); + peer.messageQueue.put(command); peer.wakeup(); } else { transportListener = peer.transportListener; } } else { - peer.getMessageQueue().put(command); + peer.messageQueue.put(command); + synchronized (peer.starting) { + if (peer.started.get() && !peer.messageQueue.isEmpty()) { + // we missed the pending dispatch during start + if (peer.async) { + peer.wakeup(); + } else { + transportListener = peer.transportListener; + } + } + } } - } catch (InterruptedException e) { InterruptedIOException iioe = new InterruptedIOException(e.getMessage()); iioe.initCause(e); throw iioe; - } finally { - // Allow the peer to change state again... - peer.enqueueValve.decrement(); } - dispatch(peer, transportListener, command); } - + public void dispatch(VMTransport transport, TransportListener transportListener, Object command) { - if( transportListener!=null ) { - if( command == DISCONNECT ) { - transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); + if (transportListener != null) { + if (command == DISCONNECT) { + transportListener.onException( + new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); } else { transport.receiveCounter++; transportListener.onCommand(command); @@ -124,135 +122,81 @@ public class VMTransport implements Transport, Task { } public void start() throws Exception { - if (transportListener == null) { - throw new IOException("TransportListener not set."); - } - try { - enqueueValve.turnOff(); - if (messageQueue != null && !async) { + + if (starting.compareAndSet(false, true)) { + + if (transportListener == null) { + throw new IOException("TransportListener not set."); + } + + // ensure there is no missed dispatch during start, sync with oneway + synchronized (peer.starting) { Object command; - while ((command = messageQueue.poll()) != null && !stopping.get() ) { - receiveCounter++; + while ((command = messageQueue.poll()) != null && !stopping.get()) { dispatch(this, transportListener, command); } + + if (!disposed.get()) { + + started.set(true); + + if (async) { + taskRunner.wakeup(); + } else { + messageQueue.clear(); + messageQueue = null; + taskRunner.shutdown(); + taskRunner = null; + } + } } - started = true; - wakeup(); - } finally { - enqueueValve.turnOn(); - } - // If we get stopped while starting up, then do the actual stop now - // that the enqueueValve is back on. - if( stopping.get() ) { - stop(); } } public void stop() throws Exception { - stopping.set(true); - - // If stop() is called while being start()ed.. then we can't stop until we return to the start() method. - if( enqueueValve.isOn() ) { - + if (disposed.compareAndSet(false, true)) { + stopping.set(true); // let the peer know that we are disconnecting.. try { peer.transportListener.onCommand(new ShutdownInfo()); } catch (Exception ignore) { } - - - TaskRunner tr = null; - try { - enqueueValve.turnOff(); - if (!disposed) { - started = false; - disposed = true; - if (taskRunner != null) { - tr = taskRunner; - taskRunner = null; - } - } - } finally { - stopping.set(false); - enqueueValve.turnOn(); - } - if (tr != null) { - tr.shutdown(1000); - } - + if (messageQueue != null) { + messageQueue.clear(); + } + if (taskRunner != null) { + taskRunner.shutdown(1000); + taskRunner = null; + } } - } - + /** * @see org.apache.activemq.thread.Task#iterate() */ public boolean iterate() { - - final TransportListener tl; - try { - // Disable changing the state variables while we are running... - enqueueValve.increment(); - tl = transportListener; - if (!started || disposed || tl == null || stopping.get()) { - if( stopping.get() ) { - // drain the queue it since folks could be blocked putting on to - // it and that would not allow the stop() method for finishing up. - getMessageQueue().clear(); - } - return false; - } - } catch (InterruptedException e) { + + if (disposed.get() || stopping.get()) { return false; - } finally { - enqueueValve.decrement(); } - LinkedBlockingQueue mq = getMessageQueue(); + LinkedBlockingQueue mq = messageQueue; Object command = mq.poll(); if (command != null) { - if( command == DISCONNECT ) { - tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); + if (command == DISCONNECT) { + transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.")); } else { - tl.onCommand(command); + transportListener.onCommand(command); } return !mq.isEmpty(); } else { return false; } - } public void setTransportListener(TransportListener commandListener) { - try { - // enqueue can block on blocking queue, preventing turnOff - // so avoid in that case: https://issues.apache.org/jira/browse/AMQ-3684 - if (async && getMessageQueue().remainingCapacity() == 0) { - // enqueue blocked or will be - this.transportListener = commandListener; - wakeup(); - } else { - try { - enqueueValve.turnOff(); - this.transportListener = commandListener; - wakeup(); - } finally { - enqueueValve.turnOn(); - } - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - private LinkedBlockingQueue getMessageQueue() { - synchronized (lazyInitMutext) { - if (messageQueue == null) { - messageQueue = new LinkedBlockingQueue(this.asyncQueueDepth); - } - return messageQueue; - } + this.transportListener = commandListener; } public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { @@ -336,11 +280,6 @@ public class VMTransport implements Transport, Task { protected void wakeup() { if (async) { - synchronized (lazyInitMutext) { - if (taskRunner == null) { - taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString()); - } - } try { taskRunner.wakeup(); } catch (InterruptedException e) { @@ -353,16 +292,16 @@ public class VMTransport implements Transport, Task { return false; } - public boolean isDisposed() { - return disposed; - } - - public boolean isConnected() { - return started; - } + public boolean isDisposed() { + return disposed.get(); + } - public void reconnect(URI uri) throws IOException { - throw new IOException("Not supported"); + public boolean isConnected() { + return started.get(); + } + + public void reconnect(URI uri) throws IOException { + throw new IOException("reconnection Not supported by this transport."); } public boolean isReconnectSupported() { @@ -372,7 +311,8 @@ public class VMTransport implements Transport, Task { public boolean isUpdateURIsSupported() { return false; } - public void updateURIs(boolean reblance,URI[] uris) throws IOException { + + public void updateURIs(boolean reblance, URI[] uris) throws IOException { throw new IOException("Not supported"); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java index 1313427a4f..0ffa3fd986 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java @@ -75,7 +75,7 @@ public class VMTransportServer implements TransportServer { connectionCount.incrementAndGet(); VMTransport client = new VMTransport(location) { public void stop() throws Exception { - if (stopping.compareAndSet(false, true) && !disposed) { + if (stopping.compareAndSet(false, true) && !disposed.get()) { super.stop(); if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {