mirror of https://github.com/apache/activemq.git
rolling back change committed in rev 777209
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@777666 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9b801d19d0
commit
2a1ec7c99e
|
@ -29,9 +29,7 @@ import java.net.URI;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
@ -121,9 +119,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
private Boolean tcpNoDelay;
|
private Boolean tcpNoDelay;
|
||||||
private Thread runnerThread;
|
private Thread runnerThread;
|
||||||
|
|
||||||
private final ArrayBlockingQueue<Object> outbound = new ArrayBlockingQueue<Object>(100);
|
|
||||||
private Thread onewayThread;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connect to a remote Node - e.g. a Broker
|
* Connect to a remote Node - e.g. a Broker
|
||||||
*
|
*
|
||||||
|
@ -168,31 +163,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
*/
|
*/
|
||||||
public void oneway(Object command) throws IOException {
|
public void oneway(Object command) throws IOException {
|
||||||
checkStarted();
|
checkStarted();
|
||||||
try {
|
|
||||||
outbound.put(command);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
throw new InterruptedIOException();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void sendOneways() {
|
|
||||||
try {
|
|
||||||
while(!isStopped()) {
|
|
||||||
Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
|
|
||||||
if( command!=null ) {
|
|
||||||
try {
|
|
||||||
while( command!=null ) {
|
|
||||||
wireFormat.marshal(command, dataOut);
|
wireFormat.marshal(command, dataOut);
|
||||||
command = outbound.poll();
|
|
||||||
}
|
|
||||||
dataOut.flush();
|
dataOut.flush();
|
||||||
} catch (IOException e) {
|
|
||||||
getTransportListener().onException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -427,11 +399,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
|
|
||||||
protected void doStart() throws Exception {
|
protected void doStart() throws Exception {
|
||||||
connect();
|
connect();
|
||||||
onewayThread = new Thread(null, new Runnable(){
|
|
||||||
public void run() {
|
|
||||||
sendOneways();
|
|
||||||
}}, "ActiveMQ Transport Sender: " + toString(), getStackSize());
|
|
||||||
onewayThread.start();
|
|
||||||
stoppedLatch.set(new CountDownLatch(1));
|
stoppedLatch.set(new CountDownLatch(1));
|
||||||
super.doStart();
|
super.doStart();
|
||||||
}
|
}
|
||||||
|
@ -520,11 +487,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
||||||
LOG.debug("Caught exception closing socket",e);
|
LOG.debug("Caught exception closing socket",e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
if( onewayThread!=null ) {
|
|
||||||
onewayThread.join();
|
|
||||||
onewayThread = null;
|
|
||||||
outbound.clear();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue