tidy code for easier maintenance

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@432664 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-08-18 18:02:49 +00:00
parent 36553b336d
commit bc005fd1e6
1 changed files with 41 additions and 25 deletions

View File

@ -77,26 +77,37 @@ public class VMTransport implements Transport,Task{
if(peer==null) if(peer==null)
throw new IOException("Peer not connected."); throw new IOException("Peer not connected.");
if(!peer.disposed){ if(!peer.disposed){
if(async){
asyncOneWay(command);
}else{
syncOneWay(command);
}
}else{
throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
}
}
protected void syncOneWay(Command command){
final TransportListener tl=peer.transportListener; final TransportListener tl=peer.transportListener;
messageQueue=getMessageQueue();
prePeerSetQueue=peer.prePeerSetQueue; prePeerSetQueue=peer.prePeerSetQueue;
if(tl==null){ if(tl==null){
prePeerSetQueue.add(command); prePeerSetQueue.add(command);
}else if(!async){
tl.onCommand(command);
}else{ }else{
tl.onCommand(command);
}
}
protected void asyncOneWay(Command command) throws IOException{
messageQueue=getMessageQueue();
try{ try{
messageQueue.put(command); messageQueue.put(command);
wakeup(); wakeup();
}catch(final InterruptedException e){ }catch(final InterruptedException e){
log.error("messageQueue interuppted",e); log.error("messageQueue interupted",e);
throw new IOException(e.getMessage()); throw new IOException(e.getMessage());
} }
} }
}else{
throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
}
}
public FutureResponse asyncRequest(Command command,ResponseCallback responseCallback) throws IOException{ public FutureResponse asyncRequest(Command command,ResponseCallback responseCallback) throws IOException{
throw new AssertionError("Unsupported Method"); throw new AssertionError("Unsupported Method");
@ -117,18 +128,23 @@ public class VMTransport implements Transport,Task{
synchronized public void setTransportListener(TransportListener commandListener){ synchronized public void setTransportListener(TransportListener commandListener){
this.transportListener=commandListener; this.transportListener=commandListener;
wakeup(); wakeup();
peer.wakeup();
} }
public synchronized void start() throws Exception{ public synchronized void start() throws Exception{
started=true; started=true;
if(transportListener==null) if(transportListener==null)
throw new IOException("TransportListener not set."); throw new IOException("TransportListener not set.");
if(!async){
for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){ for(Iterator iter=prePeerSetQueue.iterator();iter.hasNext();){
Command command=(Command) iter.next(); Command command=(Command) iter.next();
transportListener.onCommand(command); transportListener.onCommand(command);
iter.remove(); iter.remove();
} }
}else{
wakeup(); wakeup();
peer.wakeup();
}
} }
public void stop() throws Exception{ public void stop() throws Exception{
@ -176,15 +192,15 @@ public class VMTransport implements Transport,Task{
return null; return null;
} }
// task implementation /**
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate(){ public boolean iterate(){
TransportListener tl=peer.transportListener; final TransportListener tl=peer.transportListener;
if(!messageQueue.isEmpty()&&!peer.disposed&&tl!=null){ if(!messageQueue.isEmpty()&&!peer.disposed&&tl!=null){
Command command=(Command) messageQueue.poll(); final Command command=(Command) messageQueue.poll();
if(tl!=null){
tl.onCommand(command); tl.onCommand(command);
} }
}
return !messageQueue.isEmpty()&&!peer.disposed&&!(peer.transportListener==null); return !messageQueue.isEmpty()&&!peer.disposed&&!(peer.transportListener==null);
} }