From bc005fd1e63f71ba2081f6f99a14bb616d8c0a3b Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 18 Aug 2006 18:02:49 +0000 Subject: [PATCH] tidy code for easier maintenance git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@432664 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/transport/vm/VMTransport.java | 66 ++++++++++++------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java index 24582a8210..ce941fe21a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java @@ -77,26 +77,37 @@ public class VMTransport implements Transport,Task{ if(peer==null) throw new IOException("Peer not connected."); if(!peer.disposed){ - final TransportListener tl=peer.transportListener; - messageQueue=getMessageQueue(); - prePeerSetQueue=peer.prePeerSetQueue; - if(tl==null){ - prePeerSetQueue.add(command); - }else if(!async){ - tl.onCommand(command); + + if(async){ + asyncOneWay(command); }else{ - try{ - messageQueue.put(command); - wakeup(); - }catch(final InterruptedException e){ - log.error("messageQueue interuppted",e); - throw new IOException(e.getMessage()); - } + syncOneWay(command); } }else{ throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed."); } } + + protected void syncOneWay(Command command){ + final TransportListener tl=peer.transportListener; + prePeerSetQueue=peer.prePeerSetQueue; + if(tl==null){ + prePeerSetQueue.add(command); + }else{ + tl.onCommand(command); + } + } + + protected void asyncOneWay(Command command) throws IOException{ + messageQueue=getMessageQueue(); + try{ + messageQueue.put(command); + wakeup(); + }catch(final InterruptedException e){ + log.error("messageQueue interupted",e); + throw new IOException(e.getMessage()); + } + } public FutureResponse asyncRequest(Command command,ResponseCallback responseCallback) throws IOException{ throw new AssertionError("Unsupported Method"); @@ -117,18 +128,23 @@ public class VMTransport implements Transport,Task{ synchronized public void setTransportListener(TransportListener commandListener){ this.transportListener=commandListener; wakeup(); + peer.wakeup(); } public synchronized void start() throws Exception{ started=true; if(transportListener==null) throw new IOException("TransportListener not set."); - for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){ - Command command=(Command) iter.next(); - transportListener.onCommand(command); - iter.remove(); + if(!async){ + for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){ + Command command=(Command) iter.next(); + transportListener.onCommand(command); + iter.remove(); + } + }else{ + wakeup(); + peer.wakeup(); } - wakeup(); } public void stop() throws Exception{ @@ -176,14 +192,14 @@ public class VMTransport implements Transport,Task{ return null; } - // task implementation + /** + * @see org.apache.activemq.thread.Task#iterate() + */ public boolean iterate(){ - TransportListener tl=peer.transportListener; + final TransportListener tl=peer.transportListener; if(!messageQueue.isEmpty()&&!peer.disposed&&tl!=null){ - Command command=(Command) messageQueue.poll(); - if(tl!=null){ - tl.onCommand(command); - } + final Command command=(Command) messageQueue.poll(); + tl.onCommand(command); } return !messageQueue.isEmpty()&&!peer.disposed&&!(peer.transportListener==null); }