allow a connectionTimeout to be specified on a socket for AMQ-607

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383831 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-03-07 09:47:25 +00:00
parent 3cd3fd364e
commit c2123e3b1b
1 changed files with 32 additions and 13 deletions

View File

@ -46,6 +46,7 @@ import java.net.UnknownHostException;
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
private static final Log log = LogFactory.getLog(TcpTransport.class); private static final Log log = LogFactory.getLog(TcpTransport.class);
private int connectionTimeout = -1;
private int soTimeout = 10000; private int soTimeout = 10000;
private int socketBufferSize = 64 * 1024; private int socketBufferSize = 64 * 1024;
private Socket socket; private Socket socket;
@ -58,6 +59,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
private long maxInactivityDuration = 0; //30000; private long maxInactivityDuration = 0; //30000;
private InetSocketAddress socketAddress; private InetSocketAddress socketAddress;
/** /**
* Construct basic helpers * Construct basic helpers
* *
@ -215,17 +217,20 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
this.maxInactivityDuration = maxInactivityDuration; this.maxInactivityDuration = maxInactivityDuration;
} }
public int getConnectionTimeout() {
return connectionTimeout;
}
/**
* Sets the timeout used to connect to the socket
*/
public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
protected void doStart() throws Exception {
initialiseSocket(socket);
if (socketAddress != null) {
socket.connect(socketAddress);
}
initializeStreams();
super.doStart();
}
/** /**
* Factory method to create a new socket * Factory method to create a new socket
@ -291,11 +296,18 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
sock.setSoTimeout(soTimeout); sock.setSoTimeout(soTimeout);
} }
protected void initializeStreams() throws IOException { protected void doStart() throws Exception {
TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), 4096); initialiseSocket(socket);
this.dataIn = new DataInputStream(buffIn); if (socketAddress != null) {
TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), 8192); if (connectionTimeout >= 0) {
this.dataOut = new DataOutputStream(buffOut); socket.connect(socketAddress, connectionTimeout);
}
else {
socket.connect(socketAddress);
}
}
initializeStreams();
super.doStart();
} }
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
@ -305,6 +317,13 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
} }
} }
protected void initializeStreams() throws IOException {
TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), 4096);
this.dataIn = new DataInputStream(buffIn);
TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), 8192);
this.dataOut = new DataOutputStream(buffOut);
}
protected void closeStreams() throws IOException { protected void closeStreams() throws IOException {
if (dataOut != null) { if (dataOut != null) {
dataOut.close(); dataOut.close();