diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java b/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java index 0321e06471..c2acff39bd 100644 --- a/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/DedicatedTaskRunner.java @@ -58,9 +58,10 @@ class DedicatedTaskRunner implements TaskRunner { /** * shut down the task + * @param timeout * @throws InterruptedException */ - public void shutdown() throws InterruptedException{ + public void shutdown(long timeout) throws InterruptedException{ synchronized(mutex){ shutdown=true; pending=true; @@ -68,10 +69,18 @@ class DedicatedTaskRunner implements TaskRunner { // Wait till the thread stops. if(!threadTerminated){ - mutex.wait(); + mutex.wait(timeout); } } - } + } + + /** + * shut down the task + * @throws InterruptedException + */ + public void shutdown() throws InterruptedException{ + shutdown(0); + } private void runTask() { diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java b/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java index 2f4c7632de..531d4814d9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java @@ -77,7 +77,7 @@ class PooledTaskRunner implements TaskRunner { * shut down the task * @throws InterruptedException */ - public void shutdown() throws InterruptedException{ + public void shutdown(long timeout) throws InterruptedException{ synchronized(runable){ shutdown=true; //the check on the thread is done @@ -85,13 +85,17 @@ class PooledTaskRunner implements TaskRunner { //shutDown() being called, which would wait forever //waiting for iterating to finish if(runningThread!=Thread.currentThread()){ - while(iterating==true){ - runable.wait(); + if(iterating==true){ + runable.wait(timeout); } } } } + + public void shutdown() throws InterruptedException { + shutdown(0); + } private void runTask() { synchronized (runable) { diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java index 1f65c1ff1f..bba1c3ef43 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunner.java @@ -25,4 +25,5 @@ package org.apache.activemq.thread; public interface TaskRunner { public abstract void wakeup() throws InterruptedException; public abstract void shutdown() throws InterruptedException; + public abstract void shutdown(long timeout) throws InterruptedException; } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index f4bbf03ae8..1c875e9be5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.command.Command; import org.apache.activemq.thread.Task; @@ -50,26 +51,31 @@ public class VMTransport implements Transport,Task{ protected boolean disposed; protected boolean marshal; protected boolean network; - protected boolean async=false; - protected boolean started=false; + protected boolean async=true; + protected AtomicBoolean started=new AtomicBoolean(); protected int asyncQueueDepth=2000; protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList()); protected LinkedBlockingQueue messageQueue=null; protected final URI location; protected final long id; private TaskRunner taskRunner; + private final Object mutex=new Object(); public VMTransport(URI location){ this.location=location; this.id=nextId.getAndIncrement(); } - synchronized public VMTransport getPeer(){ - return peer; + public VMTransport getPeer(){ + synchronized(mutex){ + return peer; + } } - synchronized public void setPeer(VMTransport peer){ - this.peer=peer; + public void setPeer(VMTransport peer){ + synchronized(mutex){ + this.peer=peer; + } } public void oneway(Object command) throws IOException{ @@ -99,10 +105,12 @@ public class VMTransport implements Transport,Task{ } } - protected synchronized void asyncOneWay(Object command) throws IOException{ + protected void asyncOneWay(Object command) throws IOException{ try{ - if(messageQueue==null){ - messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth); + synchronized(mutex){ + if(messageQueue==null){ + messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth); + } } messageQueue.put(command); wakeup(); @@ -124,40 +132,46 @@ public class VMTransport implements Transport,Task{ throw new AssertionError("Unsupported Method"); } - public synchronized TransportListener getTransportListener(){ - return transportListener; + public TransportListener getTransportListener(){ + synchronized(mutex){ + return transportListener; + } } - synchronized public void setTransportListener(TransportListener commandListener){ - this.transportListener=commandListener; + public void setTransportListener(TransportListener commandListener){ + synchronized(mutex){ + this.transportListener=commandListener; + } wakeup(); peer.wakeup(); } - public synchronized void start() throws Exception{ - started=true; - if(transportListener==null) - throw new IOException("TransportListener not set."); - if(!async){ - for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){ - Command command=(Command)iter.next(); - transportListener.onCommand(command); - iter.remove(); + public void start() throws Exception{ + if(started.compareAndSet(false,true)){ + if(transportListener==null) + throw new IOException("TransportListener not set."); + if(!async){ + for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){ + Command command=(Command)iter.next(); + transportListener.onCommand(command); + iter.remove(); + } + }else{ + peer.wakeup(); + wakeup(); } - }else{ - peer.wakeup(); - wakeup(); } } - public synchronized void stop() throws Exception{ - started=false; - if(!disposed){ - disposed=true; - } - if(taskRunner!=null){ - taskRunner.shutdown(); - taskRunner=null; + public void stop() throws Exception{ + if(started.compareAndSet(true,false)){ + if(!disposed){ + disposed=true; + } + if(taskRunner!=null){ + taskRunner.shutdown(1000); + taskRunner=null; + } } } @@ -201,16 +215,16 @@ public class VMTransport implements Transport,Task{ public boolean iterate(){ final TransportListener tl=peer.transportListener; Command command=null; - // if(!disposed && !messageQueue.isEmpty()&&!peer.disposed&&tl!=null){ - synchronized(this){ - if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null &&!messageQueue.isEmpty()){ + synchronized(mutex){ + if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null&&!messageQueue.isEmpty()){ command=(Command)messageQueue.poll(); - if (command != null) { - tl.onCommand(command); - } } } - return messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed; + if(tl!=null&&command!=null){ + tl.onCommand(command); + } + boolean result=messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed; + return result; } /** @@ -241,10 +255,12 @@ public class VMTransport implements Transport,Task{ this.asyncQueueDepth=asyncQueueDepth; } - protected synchronized void wakeup(){ + protected void wakeup(){ if(async){ - if(taskRunner==null){ - taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString()); + synchronized(mutex){ + if(taskRunner==null){ + taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString()); + } } try{ taskRunner.wakeup();