mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1188925 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ef850895a2
commit
28e9cd7eee
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
gxfdgvdfg * Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
|
@ -50,9 +50,9 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
/**
|
||||
* An implementation of the {@link Transport} interface using raw tcp/ip
|
||||
*
|
||||
*
|
||||
* @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
|
||||
|
@ -138,7 +138,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
|
||||
/**
|
||||
* Connect to a remote Node - e.g. a Broker
|
||||
*
|
||||
*
|
||||
* @param wireFormat
|
||||
* @param socketFactory
|
||||
* @param remoteLocation
|
||||
|
@ -162,7 +162,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
|
||||
/**
|
||||
* Initialize from a server Socket
|
||||
*
|
||||
*
|
||||
* @param wireFormat
|
||||
* @param socket
|
||||
* @throws IOException
|
||||
|
@ -249,7 +249,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
// reflect the value returned by Socket.getTrafficClass().
|
||||
return this.trafficClass;
|
||||
}
|
||||
|
||||
|
||||
public void setTypeOfService(int typeOfService) {
|
||||
this.trafficClass = QualityOfServiceUtils.getToS(typeOfService);
|
||||
this.typeOfServiceChosen = true;
|
||||
|
@ -262,7 +262,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
public void setTrace(boolean trace) {
|
||||
this.trace = trace;
|
||||
}
|
||||
|
||||
|
||||
public String getLogWriterName() {
|
||||
return logWriterName;
|
||||
}
|
||||
|
@ -294,7 +294,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
public void setJmxPort(int jmxPort) {
|
||||
this.jmxPort = jmxPort;
|
||||
}
|
||||
|
||||
|
||||
public int getMinmumWireFormatVersion() {
|
||||
return minmumWireFormatVersion;
|
||||
}
|
||||
|
@ -396,7 +396,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
public void setIoBufferSize(int ioBufferSize) {
|
||||
this.ioBufferSize = ioBufferSize;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the closeAsync
|
||||
*/
|
||||
|
@ -425,7 +425,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
|
||||
/**
|
||||
* Configures the socket for use
|
||||
*
|
||||
*
|
||||
* @param sock
|
||||
* @throws SocketException, IllegalArgumentException if setting the options
|
||||
* on the socket failed.
|
||||
|
@ -533,31 +533,37 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
// closeStreams();
|
||||
if (socket != null) {
|
||||
if (closeAsync) {
|
||||
//closing the socket can hang also
|
||||
//closing the socket can hang also
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
|
||||
DefaultThreadPools.getDefaultTaskRunnerFactory().execute(new Runnable() {
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
socket.close();
|
||||
} catch (IOException e) {
|
||||
LOG.debug("Caught exception closing socket",e);
|
||||
}finally {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Caught exception closing socket", e);
|
||||
}
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
latch.await(1,TimeUnit.SECONDS);
|
||||
}else {
|
||||
|
||||
try {
|
||||
latch.await(1,TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
try {
|
||||
socket.close();
|
||||
} catch (IOException e) {
|
||||
LOG.debug("Caught exception closing socket",e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -626,7 +632,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public <T> T narrow(Class<T> target) {
|
||||
if (target == Socket.class) {
|
||||
|
@ -636,11 +642,11 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
|
|||
}
|
||||
return super.narrow(target);
|
||||
}
|
||||
|
||||
|
||||
public int getReceiveCounter() {
|
||||
return receiveCounter;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param sock The socket on which to set the Traffic Class.
|
||||
* @return Whether or not the Traffic Class was set on the given socket.
|
||||
|
|
Loading…
Reference in New Issue