From c2123e3b1be86a90e64b5c02d8c15331a855d4a4 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Tue, 7 Mar 2006 09:47:25 +0000 Subject: [PATCH] allow a connectionTimeout to be specified on a socket for AMQ-607 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383831 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/transport/tcp/TcpTransport.java | 45 +++++++++++++------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index 6735418144..81adefa657 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -46,6 +46,7 @@ import java.net.UnknownHostException; public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { private static final Log log = LogFactory.getLog(TcpTransport.class); + private int connectionTimeout = -1; private int soTimeout = 10000; private int socketBufferSize = 64 * 1024; private Socket socket; @@ -57,6 +58,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S private int minmumWireFormatVersion; private long maxInactivityDuration = 0; //30000; private InetSocketAddress socketAddress; + /** * Construct basic helpers @@ -215,17 +217,20 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S this.maxInactivityDuration = maxInactivityDuration; } + public int getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Sets the timeout used to connect to the socket + */ + public void setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + // Implementation methods // ------------------------------------------------------------------------- - protected void doStart() throws Exception { - initialiseSocket(socket); - if (socketAddress != null) { - socket.connect(socketAddress); - } - initializeStreams(); - super.doStart(); - } /** * Factory method to create a new socket @@ -291,11 +296,18 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S sock.setSoTimeout(soTimeout); } - protected void initializeStreams() throws IOException { - TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), 4096); - this.dataIn = new DataInputStream(buffIn); - TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), 8192); - this.dataOut = new DataOutputStream(buffOut); + protected void doStart() throws Exception { + initialiseSocket(socket); + if (socketAddress != null) { + if (connectionTimeout >= 0) { + socket.connect(socketAddress, connectionTimeout); + } + else { + socket.connect(socketAddress); + } + } + initializeStreams(); + super.doStart(); } protected void doStop(ServiceStopper stopper) throws Exception { @@ -305,6 +317,13 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S } } + protected void initializeStreams() throws IOException { + TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), 4096); + this.dataIn = new DataInputStream(buffIn); + TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), 8192); + this.dataOut = new DataOutputStream(buffOut); + } + protected void closeStreams() throws IOException { if (dataOut != null) { dataOut.close();