use async dispatch for vm:// transport by default

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@515999 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-03-08 10:22:22 +00:00
parent a08a4a84c7
commit da13d596a5
4 changed files with 79 additions and 49 deletions

View File

@ -58,9 +58,10 @@ class DedicatedTaskRunner implements TaskRunner {
/**
* shut down the task
* @param timeout
* @throws InterruptedException
*/
public void shutdown() throws InterruptedException{
public void shutdown(long timeout) throws InterruptedException{
synchronized(mutex){
shutdown=true;
pending=true;
@ -68,11 +69,19 @@ class DedicatedTaskRunner implements TaskRunner {
// Wait till the thread stops.
if(!threadTerminated){
mutex.wait();
mutex.wait(timeout);
}
}
}
/**
* shut down the task
* @throws InterruptedException
*/
public void shutdown() throws InterruptedException{
shutdown(0);
}
private void runTask() {
try {

View File

@ -77,7 +77,7 @@ class PooledTaskRunner implements TaskRunner {
* shut down the task
* @throws InterruptedException
*/
public void shutdown() throws InterruptedException{
public void shutdown(long timeout) throws InterruptedException{
synchronized(runable){
shutdown=true;
//the check on the thread is done
@ -85,13 +85,17 @@ class PooledTaskRunner implements TaskRunner {
//shutDown() being called, which would wait forever
//waiting for iterating to finish
if(runningThread!=Thread.currentThread()){
while(iterating==true){
runable.wait();
if(iterating==true){
runable.wait(timeout);
}
}
}
}
public void shutdown() throws InterruptedException {
shutdown(0);
}
private void runTask() {
synchronized (runable) {

View File

@ -25,4 +25,5 @@ package org.apache.activemq.thread;
public interface TaskRunner {
public abstract void wakeup() throws InterruptedException;
public abstract void shutdown() throws InterruptedException;
public abstract void shutdown(long timeout) throws InterruptedException;
}

View File

@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.Command;
import org.apache.activemq.thread.Task;
@ -50,27 +51,32 @@ public class VMTransport implements Transport,Task{
protected boolean disposed;
protected boolean marshal;
protected boolean network;
protected boolean async=false;
protected boolean started=false;
protected boolean async=true;
protected AtomicBoolean started=new AtomicBoolean();
protected int asyncQueueDepth=2000;
protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
protected LinkedBlockingQueue messageQueue=null;
protected final URI location;
protected final long id;
private TaskRunner taskRunner;
private final Object mutex=new Object();
public VMTransport(URI location){
this.location=location;
this.id=nextId.getAndIncrement();
}
synchronized public VMTransport getPeer(){
public VMTransport getPeer(){
synchronized(mutex){
return peer;
}
}
synchronized public void setPeer(VMTransport peer){
public void setPeer(VMTransport peer){
synchronized(mutex){
this.peer=peer;
}
}
public void oneway(Object command) throws IOException{
if(disposed){
@ -99,11 +105,13 @@ public class VMTransport implements Transport,Task{
}
}
protected synchronized void asyncOneWay(Object command) throws IOException{
protected void asyncOneWay(Object command) throws IOException{
try{
synchronized(mutex){
if(messageQueue==null){
messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
}
}
messageQueue.put(command);
wakeup();
}catch(final InterruptedException e){
@ -124,18 +132,22 @@ public class VMTransport implements Transport,Task{
throw new AssertionError("Unsupported Method");
}
public synchronized TransportListener getTransportListener(){
public TransportListener getTransportListener(){
synchronized(mutex){
return transportListener;
}
}
synchronized public void setTransportListener(TransportListener commandListener){
public void setTransportListener(TransportListener commandListener){
synchronized(mutex){
this.transportListener=commandListener;
}
wakeup();
peer.wakeup();
}
public synchronized void start() throws Exception{
started=true;
public void start() throws Exception{
if(started.compareAndSet(false,true)){
if(transportListener==null)
throw new IOException("TransportListener not set.");
if(!async){
@ -149,17 +161,19 @@ public class VMTransport implements Transport,Task{
wakeup();
}
}
}
public synchronized void stop() throws Exception{
started=false;
public void stop() throws Exception{
if(started.compareAndSet(true,false)){
if(!disposed){
disposed=true;
}
if(taskRunner!=null){
taskRunner.shutdown();
taskRunner.shutdown(1000);
taskRunner=null;
}
}
}
public Object narrow(Class target){
if(target.isAssignableFrom(getClass())){
@ -201,16 +215,16 @@ public class VMTransport implements Transport,Task{
public boolean iterate(){
final TransportListener tl=peer.transportListener;
Command command=null;
// if(!disposed && !messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
synchronized(this){
if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null &&!messageQueue.isEmpty()){
synchronized(mutex){
if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null&&!messageQueue.isEmpty()){
command=(Command)messageQueue.poll();
if (command != null) {
}
}
if(tl!=null&&command!=null){
tl.onCommand(command);
}
}
}
return messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
boolean result=messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
return result;
}
/**
@ -241,11 +255,13 @@ public class VMTransport implements Transport,Task{
this.asyncQueueDepth=asyncQueueDepth;
}
protected synchronized void wakeup(){
protected void wakeup(){
if(async){
synchronized(mutex){
if(taskRunner==null){
taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
}
}
try{
taskRunner.wakeup();
}catch(InterruptedException e){