git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@448492 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonas B. Lim 2006-09-21 08:40:30 +00:00
parent 4194df7f78
commit 635bc2b11a
1 changed files with 63 additions and 26 deletions

View File

@ -24,11 +24,11 @@ import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import javax.net.SocketFactory;
@ -64,6 +64,9 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected boolean trace;
protected boolean useLocalHost = true;
protected int minmumWireFormatVersion;
protected SocketFactory socketFactory;
private Map socketOptions;
private Boolean keepAlive;
private Boolean tcpNoDelay;
@ -80,7 +83,12 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
*/
public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
this.wireFormat = wireFormat;
this.socketFactory = socketFactory;
try {
this.socket = socketFactory.createSocket();
} catch (SocketException e) {
this.socket = null;
}
this.remoteLocation = remoteLocation;
this.localLocation = localLocation;
setDaemon(false);
@ -251,6 +259,10 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* @throws SocketException
*/
protected void initialiseSocket(Socket sock) throws SocketException {
if( socketOptions != null ) {
IntrospectionSupport.setProperties(socket, socketOptions);
}
try {
sock.setReceiveBufferSize(socketBufferSize);
sock.setSendBufferSize(socketBufferSize);
@ -276,23 +288,48 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected void connect() throws SocketException, IOException {
initialiseSocket(socket);
if( socket == null && socketFactory == null ) {
throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
}
InetSocketAddress localAddress=null;
InetSocketAddress remoteAddress=null;
if( localLocation!=null ) {
SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
socket.bind(localAddress);
localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
}
if( remoteLocation!=null ) {
String host = resolveHostName(remoteLocation.getHost());
InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
}
if( socket!=null ) {
if( localAddress!=null )
socket.bind(localAddress);
// If it's a server accepted socket.. we don't need to connect it
// to a remote address.
if ( remoteAddress!=null ) {
if (connectionTimeout >= 0) {
socket.connect(remoteAddress, connectionTimeout);
}
else {
} else {
socket.connect(remoteAddress);
}
}
} else {
// For SSL sockets.. you can't create an unconnected socket :(
// This means the timout option are not supported either.
if( localAddress!=null ) {
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
} else {
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
}
}
initialiseSocket(socket);
initializeStreams();
}
@ -322,7 +359,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
}
public void setSocketOptions(Map socketOptions) {
IntrospectionSupport.setProperties(socket, socketOptions);
this.socketOptions = new HashMap(socketOptions);
}
public String getRemoteAddress() {