The VM transport could deadlock between the iterate() method and the oneway() method when the async message buffer used by the transport fills up. Change the synchronization logic to make use the a Valve to avoid needing to lock mutexes for so long.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@586185 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2007-10-19 00:00:45 +00:00
parent 49325b60cd
commit fe8bc3317c
1 changed files with 121 additions and 101 deletions

View File

@ -17,14 +17,17 @@
package org.apache.activemq.transport.vm;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.Command;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
@ -54,23 +57,17 @@ public class VMTransport implements Transport, Task {
protected final URI location;
protected final long id;
private TaskRunner taskRunner;
private final Object mutex = new Object();
private final Object lazyInitMutext = new Object();
private final Valve enqueueValve = new Valve(true);
private final AtomicBoolean stopping = new AtomicBoolean();
public VMTransport(URI location) {
this.location = location;
this.id = NEXT_ID.getAndIncrement();
}
public VMTransport getPeer() {
synchronized (mutex) {
return peer;
}
}
public void setPeer(VMTransport peer) {
synchronized (mutex) {
this.peer = peer;
}
this.peer = peer;
}
public void oneway(Object command) throws IOException {
@ -81,34 +78,131 @@ public class VMTransport implements Transport, Task {
throw new IOException("Peer not connected.");
}
TransportListener tl = null;
synchronized (peer.mutex) {
if (peer.disposed) {
try {
// Disable the peer from changing his state while we try to enqueue onto him.
peer.enqueueValve.increment();
if (peer.disposed || peer.stopping.get()) {
throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
}
if (peer.started) {
if (peer.async) {
peer.enqueue(command);
peer.getMessageQueue().put(command);
peer.wakeup();
} else {
tl = peer.transportListener;
peer.transportListener.onCommand(command);
}
enqueueValve.decrement();
} else {
peer.enqueue(command);
peer.getMessageQueue().put(command);
}
}
if (tl != null) {
tl.onCommand(command);
} catch (InterruptedException e) {
throw IOExceptionSupport.create(e);
} finally {
// Allow the peer to change state again...
peer.enqueueValve.decrement();
}
}
private void enqueue(Object command) throws IOException {
public void start() throws Exception {
if (transportListener == null) {
throw new IOException("TransportListener not set.");
}
try {
getMessageQueue().put(command);
} catch (final InterruptedException e) {
throw IOExceptionSupport.create(e);
enqueueValve.turnOff();
if (messageQueue != null && !async) {
Object command;
while ((command = messageQueue.poll()) != null) {
transportListener.onCommand(command);
}
}
started = true;
wakeup();
} finally {
enqueueValve.turnOn();
}
}
public void stop() throws Exception {
TaskRunner tr = null;
try {
stopping.set(true);
enqueueValve.turnOff();
if (!disposed) {
started = false;
disposed = true;
if (taskRunner != null) {
tr = taskRunner;
taskRunner = null;
}
}
} finally {
stopping.set(false);
enqueueValve.turnOn();
}
if (tr != null) {
tr.shutdown(1000);
}
}
/**
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
final TransportListener tl;
try {
// Disable changing the state variables while we are running...
enqueueValve.increment();
tl = transportListener;
if (!started || disposed || tl == null || stopping.get()) {
if( stopping.get() ) {
// drain the queue it since folks could be blocked putting on to
// it and that would not allow the stop() method for finishing up.
getMessageQueue().clear();
}
return false;
}
} catch (InterruptedException e) {
return false;
} finally {
enqueueValve.decrement();
}
LinkedBlockingQueue<Object> mq = getMessageQueue();
Command command = (Command)mq.poll();
if (command != null) {
tl.onCommand(command);
return !mq.isEmpty();
} else {
return false;
}
}
public void setTransportListener(TransportListener commandListener) {
try {
try {
enqueueValve.turnOff();
this.transportListener = commandListener;
wakeup();
} finally {
enqueueValve.turnOn();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private LinkedBlockingQueue<Object> getMessageQueue() {
synchronized (lazyInitMutext) {
if (messageQueue == null) {
messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
}
return messageQueue;
}
}
@ -125,59 +219,7 @@ public class VMTransport implements Transport, Task {
}
public TransportListener getTransportListener() {
synchronized (mutex) {
return transportListener;
}
}
public void setTransportListener(TransportListener commandListener) {
synchronized (mutex) {
this.transportListener = commandListener;
wakeup();
}
}
private LinkedBlockingQueue<Object> getMessageQueue() {
synchronized (mutex) {
if (messageQueue == null) {
messageQueue = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
}
return messageQueue;
}
}
public void start() throws Exception {
if (transportListener == null) {
throw new IOException("TransportListener not set.");
}
synchronized (mutex) {
if (messageQueue != null) {
Object command;
while ((command = messageQueue.poll()) != null) {
transportListener.onCommand(command);
}
}
started = true;
wakeup();
}
}
public void stop() throws Exception {
TaskRunner tr = null;
synchronized (mutex) {
if (!disposed) {
started = false;
disposed = true;
if (taskRunner != null) {
tr = taskRunner;
taskRunner = null;
}
}
}
if (tr != null) {
tr.shutdown(1000);
}
return transportListener;
}
public <T> T narrow(Class<T> target) {
@ -214,28 +256,6 @@ public class VMTransport implements Transport, Task {
return null;
}
/**
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
final TransportListener tl;
synchronized (mutex) {
tl = transportListener;
if (!started || disposed || tl == null) {
return false;
}
}
LinkedBlockingQueue<Object> mq = getMessageQueue();
final Command command = (Command)mq.poll();
if (command != null) {
tl.onCommand(command);
return !mq.isEmpty();
} else {
return false;
}
}
/**
* @return the async
*/
@ -266,7 +286,7 @@ public class VMTransport implements Transport, Task {
protected void wakeup() {
if (async) {
synchronized (mutex) {
synchronized (lazyInitMutext) {
if (taskRunner == null) {
taskRunner = TASK_RUNNER_FACTORY.createTaskRunner(this, "VMTransport: " + toString());
}