mirror of https://github.com/apache/activemq.git
close socket in a separate thread and only await stopLatch
for 2 seconds - as it close could be called by InactivityMonitor git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@605671 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3318a9f0e3
commit
8228663f6d
|
@ -30,6 +30,10 @@ import java.net.UnknownHostException;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.net.SocketFactory;
|
||||
|
@ -50,7 +54,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
*/
|
||||
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
|
||||
private static final Log LOG = LogFactory.getLog(TcpTransport.class);
|
||||
|
||||
private static final ThreadPoolExecutor SOCKET_CLOSE;
|
||||
protected final URI remoteLocation;
|
||||
protected final URI localLocation;
|
||||
protected final WireFormat wireFormat;
|
||||
|
@ -427,7 +431,23 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
// is hung.. then this hangs the close.
|
||||
// closeStreams();
|
||||
if (socket != null) {
|
||||
//closing the socket can hang also
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
SOCKET_CLOSE.execute(new Runnable() {
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
socket.close();
|
||||
} catch (IOException e) {
|
||||
LOG.debug("Caught exception closing socket",e);
|
||||
}finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
latch.await(1,TimeUnit.SECONDS);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -439,7 +459,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
super.stop();
|
||||
CountDownLatch countDownLatch = stoppedLatch.get();
|
||||
if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
|
||||
countDownLatch.await();
|
||||
countDownLatch.await(1,TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -478,4 +498,13 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
return super.narrow(target);
|
||||
}
|
||||
|
||||
static {
|
||||
SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
|
||||
public Thread newThread(Runnable runnable) {
|
||||
Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable);
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue