mirror of https://github.com/apache/activemq.git
Tidied up the async dispatch option
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@494950 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b21113962b
commit
f19a3a66d6
|
@ -11,6 +11,7 @@
|
|||
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.transport.vm;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -19,6 +20,7 @@ import java.util.Collections;
|
|||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import org.apache.activemq.broker.BrokerStoppedException;
|
||||
import org.apache.activemq.command.Command;
|
||||
import org.apache.activemq.thread.Task;
|
||||
import org.apache.activemq.thread.TaskRunner;
|
||||
|
@ -32,16 +34,18 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* A Transport implementation that uses direct method invocations.
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class VMTransport implements Transport,Task{
|
||||
|
||||
private static final Log log=LogFactory.getLog(VMTransport.class);
|
||||
private static final AtomicLong nextId=new AtomicLong(0);
|
||||
private static final TaskRunnerFactory taskRunnerFactory=new TaskRunnerFactory("VMTransport",Thread.NORM_PRIORITY,
|
||||
true,1000);
|
||||
true,1000);
|
||||
protected VMTransport peer;
|
||||
protected TransportListener transportListener;
|
||||
protected boolean disposed;
|
||||
|
@ -51,7 +55,7 @@ public class VMTransport implements Transport,Task{
|
|||
protected boolean started=false;
|
||||
protected int asyncQueueDepth=2000;
|
||||
protected List prePeerSetQueue=Collections.synchronizedList(new LinkedList());
|
||||
protected LinkedBlockingQueue messageQueue;
|
||||
protected LinkedBlockingQueue messageQueue=null;
|
||||
protected final URI location;
|
||||
protected final long id;
|
||||
private TaskRunner taskRunner;
|
||||
|
@ -76,9 +80,8 @@ public class VMTransport implements Transport,Task{
|
|||
if(peer==null)
|
||||
throw new IOException("Peer not connected.");
|
||||
if(!peer.disposed){
|
||||
|
||||
if(async){
|
||||
asyncOneWay(command);
|
||||
asyncOneWay(command);
|
||||
}else{
|
||||
syncOneWay(command);
|
||||
}
|
||||
|
@ -86,7 +89,7 @@ public class VMTransport implements Transport,Task{
|
|||
throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void syncOneWay(Object command){
|
||||
final TransportListener tl=peer.transportListener;
|
||||
prePeerSetQueue=peer.prePeerSetQueue;
|
||||
|
@ -96,10 +99,12 @@ public class VMTransport implements Transport,Task{
|
|||
tl.onCommand(command);
|
||||
}
|
||||
}
|
||||
|
||||
protected void asyncOneWay(Object command) throws IOException{
|
||||
messageQueue=getMessageQueue();
|
||||
|
||||
protected synchronized void asyncOneWay(Object command) throws IOException{
|
||||
try{
|
||||
if(messageQueue==null){
|
||||
messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
|
||||
}
|
||||
messageQueue.put(command);
|
||||
wakeup();
|
||||
}catch(final InterruptedException e){
|
||||
|
@ -136,17 +141,17 @@ public class VMTransport implements Transport,Task{
|
|||
throw new IOException("TransportListener not set.");
|
||||
if(!async){
|
||||
for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
|
||||
Command command=(Command) iter.next();
|
||||
Command command=(Command)iter.next();
|
||||
transportListener.onCommand(command);
|
||||
iter.remove();
|
||||
}
|
||||
}else{
|
||||
wakeup();
|
||||
peer.wakeup();
|
||||
wakeup();
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() throws Exception{
|
||||
public synchronized void stop() throws Exception{
|
||||
started=false;
|
||||
if(!disposed){
|
||||
disposed=true;
|
||||
|
@ -196,11 +201,17 @@ public class VMTransport implements Transport,Task{
|
|||
*/
|
||||
public boolean iterate(){
|
||||
final TransportListener tl=peer.transportListener;
|
||||
if(!messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
|
||||
final Command command=(Command) messageQueue.poll();
|
||||
tl.onCommand(command);
|
||||
Command command=null;
|
||||
// if(!disposed && !messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
|
||||
synchronized(this){
|
||||
if(messageQueue!=null&&!disposed&&!peer.disposed&&tl!=null &&!messageQueue.isEmpty()){
|
||||
command=(Command)messageQueue.poll();
|
||||
if (command != null) {
|
||||
tl.onCommand(command);
|
||||
}
|
||||
}
|
||||
}
|
||||
return !messageQueue.isEmpty()&&!peer.disposed&&!(peer.transportListener==null);
|
||||
return messageQueue!=null&&!messageQueue.isEmpty()&&!peer.disposed;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -231,8 +242,8 @@ public class VMTransport implements Transport,Task{
|
|||
this.asyncQueueDepth=asyncQueueDepth;
|
||||
}
|
||||
|
||||
protected void wakeup(){
|
||||
if(async&&messageQueue!=null&&!messageQueue.isEmpty()){
|
||||
protected synchronized void wakeup(){
|
||||
if(async){
|
||||
if(taskRunner==null){
|
||||
taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
|
||||
}
|
||||
|
@ -243,11 +254,4 @@ public class VMTransport implements Transport,Task{
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized LinkedBlockingQueue getMessageQueue(){
|
||||
if(messageQueue==null){
|
||||
messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
|
||||
}
|
||||
return messageQueue;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue