https://issues.apache.org/jira/browse/AMQ-3684 - resolve deadlock on blocked oneway, revise sync and lazy init, remove use of valve

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1242912 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-02-10 20:19:44 +00:00
parent 93a379fe3c
commit bc8441bebc
2 changed files with 85 additions and 145 deletions

View File

@ -26,20 +26,14 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport;
/**
* A Transport implementation that uses direct method invocations.
*
*
*/
public class VMTransport implements Transport, Task {
@ -47,21 +41,23 @@ public class VMTransport implements Transport, Task {
private static final AtomicLong NEXT_ID = new AtomicLong(0);
protected VMTransport peer;
protected TransportListener transportListener;
protected boolean disposed;
protected boolean marshal;
protected boolean network;
protected boolean async = true;
protected int asyncQueueDepth = 2000;
protected LinkedBlockingQueue<Object> messageQueue;
protected boolean started;
protected final URI location;
protected final long id;
private TaskRunner taskRunner;
private final Object lazyInitMutext = new Object();
private final Valve enqueueValve = new Valve(true);
protected final AtomicBoolean stopping = new AtomicBoolean();
protected LinkedBlockingQueue<Object> messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
private TaskRunner taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
private volatile int receiveCounter;
// Managed Sate access protected by locks.
protected final AtomicBoolean stopping = new AtomicBoolean();
protected final AtomicBoolean started = new AtomicBoolean();
protected final AtomicBoolean starting = new AtomicBoolean();
protected final AtomicBoolean disposed = new AtomicBoolean();
public VMTransport(URI location) {
this.location = location;
this.id = NEXT_ID.getAndIncrement();
@ -72,50 +68,52 @@ public class VMTransport implements Transport, Task {
}
public void oneway(Object command) throws IOException {
if (disposed) {
if (disposed.get()) {
throw new TransportDisposedIOException("Transport disposed.");
}
if (peer == null) {
throw new IOException("Peer not connected.");
}
TransportListener transportListener=null;
TransportListener transportListener = null;
try {
// Disable the peer from changing his state while we try to enqueue onto him.
peer.enqueueValve.increment();
if (peer.disposed || peer.stopping.get()) {
if (peer.disposed.get() || peer.stopping.get()) {
throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
}
if (peer.started) {
if (peer.started.get()) {
if (peer.async) {
peer.getMessageQueue().put(command);
peer.messageQueue.put(command);
peer.wakeup();
} else {
transportListener = peer.transportListener;
}
} else {
peer.getMessageQueue().put(command);
peer.messageQueue.put(command);
synchronized (peer.starting) {
if (peer.started.get() && !peer.messageQueue.isEmpty()) {
// we missed the pending dispatch during start
if (peer.async) {
peer.wakeup();
} else {
transportListener = peer.transportListener;
}
}
}
}
} catch (InterruptedException e) {
InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
iioe.initCause(e);
throw iioe;
} finally {
// Allow the peer to change state again...
peer.enqueueValve.decrement();
}
dispatch(peer, transportListener, command);
}
public void dispatch(VMTransport transport, TransportListener transportListener, Object command) {
if( transportListener!=null ) {
if( command == DISCONNECT ) {
transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
if (transportListener != null) {
if (command == DISCONNECT) {
transportListener.onException(
new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
} else {
transport.receiveCounter++;
transportListener.onCommand(command);
@ -124,135 +122,81 @@ public class VMTransport implements Transport, Task {
}
public void start() throws Exception {
if (transportListener == null) {
throw new IOException("TransportListener not set.");
}
try {
enqueueValve.turnOff();
if (messageQueue != null && !async) {
if (starting.compareAndSet(false, true)) {
if (transportListener == null) {
throw new IOException("TransportListener not set.");
}
// ensure there is no missed dispatch during start, sync with oneway
synchronized (peer.starting) {
Object command;
while ((command = messageQueue.poll()) != null && !stopping.get() ) {
receiveCounter++;
while ((command = messageQueue.poll()) != null && !stopping.get()) {
dispatch(this, transportListener, command);
}
if (!disposed.get()) {
started.set(true);
if (async) {
taskRunner.wakeup();
} else {
messageQueue.clear();
messageQueue = null;
taskRunner.shutdown();
taskRunner = null;
}
}
}
started = true;
wakeup();
} finally {
enqueueValve.turnOn();
}
// If we get stopped while starting up, then do the actual stop now
// that the enqueueValve is back on.
if( stopping.get() ) {
stop();
}
}
public void stop() throws Exception {
stopping.set(true);
// If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
if( enqueueValve.isOn() ) {
if (disposed.compareAndSet(false, true)) {
stopping.set(true);
// let the peer know that we are disconnecting..
try {
peer.transportListener.onCommand(new ShutdownInfo());
} catch (Exception ignore) {
}
TaskRunner tr = null;
try {
enqueueValve.turnOff();
if (!disposed) {
started = false;
disposed = true;
if (taskRunner != null) {
tr = taskRunner;
taskRunner = null;
}
}
} finally {
stopping.set(false);
enqueueValve.turnOn();
}
if (tr != null) {
tr.shutdown(1000);
}
if (messageQueue != null) {
messageQueue.clear();
}
if (taskRunner != null) {
taskRunner.shutdown(1000);
taskRunner = null;
}
}
}
/**
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
final TransportListener tl;
try {
// Disable changing the state variables while we are running...
enqueueValve.increment();
tl = transportListener;
if (!started || disposed || tl == null || stopping.get()) {
if( stopping.get() ) {
// drain the queue it since folks could be blocked putting on to
// it and that would not allow the stop() method for finishing up.
getMessageQueue().clear();
}
return false;
}
} catch (InterruptedException e) {
if (disposed.get() || stopping.get()) {
return false;
} finally {
enqueueValve.decrement();
}
LinkedBlockingQueue<Object> mq = getMessageQueue();
LinkedBlockingQueue<Object> mq = messageQueue;
Object command = mq.poll();
if (command != null) {
if( command == DISCONNECT ) {
tl.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
if (command == DISCONNECT) {
transportListener.onException(new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."));
} else {
tl.onCommand(command);
transportListener.onCommand(command);
}
return !mq.isEmpty();
} else {
return false;
}
}
public void setTransportListener(TransportListener commandListener) {
try {
// enqueue can block on blocking queue, preventing turnOff
// so avoid in that case: https://issues.apache.org/jira/browse/AMQ-3684
if (async && getMessageQueue().remainingCapacity() == 0) {
// enqueue blocked or will be
this.transportListener = commandListener;
wakeup();
} else {
try {
enqueueValve.turnOff();
this.transportListener = commandListener;
wakeup();
} finally {
enqueueValve.turnOn();
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private LinkedBlockingQueue<Object> getMessageQueue() {
synchronized (lazyInitMutext) {
if (messageQueue == null) {
messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
}
return messageQueue;
}
this.transportListener = commandListener;
}
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
@ -336,11 +280,6 @@ public class VMTransport implements Transport, Task {
protected void wakeup() {
if (async) {
synchronized (lazyInitMutext) {
if (taskRunner == null) {
taskRunner = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(this, "VMTransport: " + toString());
}
}
try {
taskRunner.wakeup();
} catch (InterruptedException e) {
@ -353,16 +292,16 @@ public class VMTransport implements Transport, Task {
return false;
}
public boolean isDisposed() {
return disposed;
}
public boolean isConnected() {
return started;
}
public boolean isDisposed() {
return disposed.get();
}
public void reconnect(URI uri) throws IOException {
throw new IOException("Not supported");
public boolean isConnected() {
return started.get();
}
public void reconnect(URI uri) throws IOException {
throw new IOException("reconnection Not supported by this transport.");
}
public boolean isReconnectSupported() {
@ -372,7 +311,8 @@ public class VMTransport implements Transport, Task {
public boolean isUpdateURIsSupported() {
return false;
}
public void updateURIs(boolean reblance,URI[] uris) throws IOException {
public void updateURIs(boolean reblance, URI[] uris) throws IOException {
throw new IOException("Not supported");
}

View File

@ -75,7 +75,7 @@ public class VMTransportServer implements TransportServer {
connectionCount.incrementAndGet();
VMTransport client = new VMTransport(location) {
public void stop() throws Exception {
if (stopping.compareAndSet(false, true) && !disposed) {
if (stopping.compareAndSet(false, true) && !disposed.get()) {
super.stop();
if (connectionCount.decrementAndGet() == 0
&& disposeOnDisconnect) {