Cleaned up the TCP transport a little.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@421811 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-07-14 05:54:13 +00:00
parent 3a59e68649
commit 68c8c643f2
5 changed files with 97 additions and 112 deletions

View File

@ -27,18 +27,16 @@ import java.net.URI;
*/ */
public abstract class TransportServerSupport extends ServiceSupport implements TransportServer { public abstract class TransportServerSupport extends ServiceSupport implements TransportServer {
private URI location; private URI connectURI;
private URI bindLocation;
private TransportAcceptListener acceptListener; private TransportAcceptListener acceptListener;
public TransportServerSupport() { public TransportServerSupport() {
} }
public TransportServerSupport(URI location) { public TransportServerSupport(URI location) {
this.location = location; this.connectURI = location;
} this.bindLocation = location;
public URI getConnectURI() {
return location;
} }
/** /**
@ -60,16 +58,16 @@ public abstract class TransportServerSupport extends ServiceSupport implements T
/** /**
* @return Returns the location. * @return Returns the location.
*/ */
public URI getLocation() { public URI getConnectURI() {
return location; return connectURI;
} }
/** /**
* @param location * @param location
* The location to set. * The location to set.
*/ */
public void setLocation(URI location) { public void setConnectURI(URI location) {
this.location = location; this.connectURI = location;
} }
protected void onAcceptError(Exception e) { protected void onAcceptError(Exception e) {
@ -77,4 +75,12 @@ public abstract class TransportServerSupport extends ServiceSupport implements T
acceptListener.onAcceptError(e); acceptListener.onAcceptError(e);
} }
} }
public URI getBindLocation() {
return bindLocation;
}
public void setBindLocation(URI bindLocation) {
this.bindLocation = bindLocation;
}
} }

View File

@ -68,7 +68,7 @@ public abstract class TransportServerThreadSupport extends TransportServerSuppor
} }
protected void doStart() throws Exception { protected void doStart() throws Exception {
log.info("Listening for connections at: " + getLocation()); log.info("Listening for connections at: " + getConnectURI());
runner = new Thread(this, "ActiveMQ Transport Server: "+toString()); runner = new Thread(this, "ActiveMQ Transport Server: "+toString());
runner.setDaemon(daemon); runner.setDaemon(daemon);
runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT); runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT);

View File

@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.URI; import java.net.URI;
@ -49,27 +50,19 @@ import org.apache.commons.logging.LogFactory;
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
private static final Log log = LogFactory.getLog(TcpTransport.class); private static final Log log = LogFactory.getLog(TcpTransport.class);
private int connectionTimeout = 30000; protected final URI remoteLocation;
private int soTimeout = 0; protected final URI localLocation;
private int socketBufferSize = 128 * 1024; protected final WireFormat wireFormat;
private Socket socket;
private DataOutputStream dataOut; protected int connectionTimeout = 30000;
private DataInputStream dataIn; protected int soTimeout = 0;
private WireFormat wireFormat; protected int socketBufferSize = 128 * 1024;
private boolean trace; protected Socket socket;
private boolean useLocalHost = true; protected DataOutputStream dataOut;
private int minmumWireFormatVersion; protected DataInputStream dataIn;
private InetSocketAddress remoteAddress; protected boolean trace;
private InetSocketAddress localAddress; protected boolean useLocalHost = true;
protected int minmumWireFormatVersion;
/**
* Construct basic helpers
*
* @param wireFormat
*/
protected TcpTransport(WireFormat wireFormat) {
this.wireFormat = wireFormat;
}
/** /**
* Connect to a remote Node - e.g. a Broker * Connect to a remote Node - e.g. a Broker
@ -83,10 +76,14 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* @throws UnknownHostException * @throws UnknownHostException
*/ */
public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
this(wireFormat); this.wireFormat = wireFormat;
this.socket = createSocket(socketFactory, remoteLocation, localLocation); this.socket = socketFactory.createSocket();
this.remoteLocation = remoteLocation;
this.localLocation = localLocation;
setDaemon(false);
} }
/** /**
* Initialize from a server Socket * Initialize from a server Socket
* *
@ -95,8 +92,10 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* @throws IOException * @throws IOException
*/ */
public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException { public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
this(wireFormat); this.wireFormat = wireFormat;
this.socket = socket; this.socket = socket;
this.remoteLocation = null;
this.localLocation = null;
setDaemon(true); setDaemon(true);
} }
@ -211,29 +210,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/**
* Factory method to create a new socket
*
* @param remoteLocation
* @param localLocation ignored if null
* @return
* @throws IOException
* @throws IOException
* @throws UnknownHostException
*/
protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
String host = resolveHostName(remoteLocation.getHost());
remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
if( localLocation!=null ) {
localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
}
Socket sock = socketFactory.createSocket();
return sock;
}
protected String resolveHostName(String host) throws UnknownHostException { protected String resolveHostName(String host) throws UnknownHostException {
String localName = InetAddress.getLocalHost().getHostName(); String localName = InetAddress.getLocalHost().getHostName();
if (localName != null && isUseLocalHost()) { if (localName != null && isUseLocalHost()) {
@ -263,23 +239,33 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
} }
protected void doStart() throws Exception { protected void doStart() throws Exception {
initialiseSocket(socket); connect();
if( localAddress!=null ) { super.doStart();
}
protected void connect() throws SocketException, IOException {
initialiseSocket(socket);
if( localLocation!=null ) {
SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
socket.bind(localAddress); socket.bind(localAddress);
} }
if (remoteAddress != null) { if( remoteLocation!=null ) {
String host = resolveHostName(remoteLocation.getHost());
InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
if (connectionTimeout >= 0) { if (connectionTimeout >= 0) {
socket.connect(remoteAddress, connectionTimeout); socket.connect(remoteAddress, connectionTimeout);
} }
else { else {
socket.connect(remoteAddress); socket.connect(remoteAddress);
} }
} }
initializeStreams(); initializeStreams();
super.doStart(); }
}
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
closeStreams(); closeStreams();
if (socket != null) { if (socket != null) {
socket.close(); socket.close();
@ -303,7 +289,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
} }
public void setSocketOptions(Map socketOptions) { public void setSocketOptions(Map socketOptions) {
IntrospectionSupport.setProperties(socket, socketOptions); IntrospectionSupport.setProperties(socket, socketOptions);
} }
public String getRemoteAddress() { public String getRemoteAddress() {

View File

@ -52,6 +52,7 @@ public class TcpTransportFactory extends TransportFactory {
IntrospectionSupport.setProperties(server, options); IntrospectionSupport.setProperties(server, options);
Map transportOptions = IntrospectionSupport.extractProperties(options, "transport."); Map transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
server.setTransportOption(transportOptions); server.setTransportOption(transportOptions);
server.bind();
return server; return server;
} }
@ -125,7 +126,7 @@ public class TcpTransportFactory extends TransportFactory {
* @throws UnknownHostException * @throws UnknownHostException
* @throws IOException * @throws IOException
*/ */
private TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new TcpTransport(wf, socketFactory, location, localLocation); return new TcpTransport(wf, socketFactory, location, localLocation);
} }

View File

@ -35,6 +35,7 @@ import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport; import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -48,24 +49,47 @@ import javax.net.ServerSocketFactory;
*/ */
public class TcpTransportServer extends TransportServerThreadSupport { public class TcpTransportServer extends TransportServerThreadSupport {
private static final Log log = LogFactory.getLog(TcpTransportServer.class); private static final Log log = LogFactory.getLog(TcpTransportServer.class);
private ServerSocket serverSocket; protected ServerSocket serverSocket;
private int backlog = 5000; protected int backlog = 5000;
private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
private final TcpTransportFactory transportFactory; protected final TcpTransportFactory transportFactory;
private long maxInactivityDuration = 30000; protected long maxInactivityDuration = 30000;
private int minmumWireFormatVersion; protected int minmumWireFormatVersion;
private boolean trace; protected boolean trace;
private Map transportOptions; protected Map transportOptions;
protected final ServerSocketFactory serverSocketFactory;
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
super(location); super(location);
this.transportFactory=transportFactory; this.transportFactory=transportFactory;
this.serverSocket = createServerSocket(location, serverSocketFactory); this.serverSocketFactory = serverSocketFactory;
this.serverSocket.setSoTimeout(2000);
updatePhysicalUri(location);
} }
public void bind() throws IOException {
URI bind = getBindLocation();
String host = bind.getHost();
host = (host == null || host.length() == 0) ? "localhost" : host;
InetAddress addr = InetAddress.getByName(host);
if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) {
this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog);
}
else {
this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
}
this.serverSocket.setSoTimeout(2000);
try {
setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(bind.getHost()), serverSocket.getLocalPort(), bind.getPath(),
bind.getQuery(), bind.getFragment()));
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
/** /**
* @return Returns the wireFormatFactory. * @return Returns the wireFormatFactory.
*/ */
@ -168,19 +192,7 @@ public class TcpTransportServer extends TransportServerThreadSupport {
* @return pretty print of this * @return pretty print of this
*/ */
public String toString() { public String toString() {
return ""+getLocation(); return ""+getBindLocation();
}
/**
* In cases where we construct ourselves with a zero port we need to
* regenerate the URI with the real physical port so that people can connect
* to us via discovery
*
* @throws UnknownHostException
*/
protected void updatePhysicalUri(URI bindAddr) throws URISyntaxException, UnknownHostException {
setLocation(new URI(bindAddr.getScheme(), bindAddr.getUserInfo(), resolveHostName(bindAddr.getHost()), serverSocket.getLocalPort(), bindAddr.getPath(),
bindAddr.getQuery(), bindAddr.getFragment()));
} }
/** /**
@ -198,26 +210,6 @@ public class TcpTransportServer extends TransportServerThreadSupport {
return result; return result;
} }
/**
* Factory method to create a new ServerSocket
*
* @throws UnknownHostException
* @throws IOException
*/
protected ServerSocket createServerSocket(URI bind, ServerSocketFactory factory) throws UnknownHostException, IOException {
ServerSocket answer = null;
String host = bind.getHost();
host = (host == null || host.length() == 0) ? "localhost" : host;
InetAddress addr = InetAddress.getByName(host);
if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) {
answer = factory.createServerSocket(bind.getPort(), backlog);
}
else {
answer = factory.createServerSocket(bind.getPort(), backlog, addr);
}
return answer;
}
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
super.doStop(stopper); super.doStop(stopper);
if (serverSocket != null) { if (serverSocket != null) {