diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index 6735418144..81adefa657 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -46,6 +46,7 @@ import java.net.UnknownHostException; public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { private static final Log log = LogFactory.getLog(TcpTransport.class); + private int connectionTimeout = -1; private int soTimeout = 10000; private int socketBufferSize = 64 * 1024; private Socket socket; @@ -57,6 +58,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S private int minmumWireFormatVersion; private long maxInactivityDuration = 0; //30000; private InetSocketAddress socketAddress; + /** * Construct basic helpers @@ -215,17 +217,20 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S this.maxInactivityDuration = maxInactivityDuration; } + public int getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Sets the timeout used to connect to the socket + */ + public void setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + // Implementation methods // ------------------------------------------------------------------------- - protected void doStart() throws Exception { - initialiseSocket(socket); - if (socketAddress != null) { - socket.connect(socketAddress); - } - initializeStreams(); - super.doStart(); - } /** * Factory method to create a new socket @@ -291,11 +296,18 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S sock.setSoTimeout(soTimeout); } - protected void initializeStreams() throws IOException { - TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), 4096); - this.dataIn = new DataInputStream(buffIn); - TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), 8192); - this.dataOut = new DataOutputStream(buffOut); + protected void doStart() throws Exception { + initialiseSocket(socket); + if (socketAddress != null) { + if (connectionTimeout >= 0) { + socket.connect(socketAddress, connectionTimeout); + } + else { + socket.connect(socketAddress); + } + } + initializeStreams(); + super.doStart(); } protected void doStop(ServiceStopper stopper) throws Exception { @@ -305,6 +317,13 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S } } + protected void initializeStreams() throws IOException { + TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), 4096); + this.dataIn = new DataInputStream(buffIn); + TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), 8192); + this.dataOut = new DataOutputStream(buffOut); + } + protected void closeStreams() throws IOException { if (dataOut != null) { dataOut.close();