diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java index 01bd9e6c90..166d599af4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerSupport.java @@ -27,18 +27,16 @@ import java.net.URI; */ public abstract class TransportServerSupport extends ServiceSupport implements TransportServer { - private URI location; + private URI connectURI; + private URI bindLocation; private TransportAcceptListener acceptListener; public TransportServerSupport() { } public TransportServerSupport(URI location) { - this.location = location; - } - - public URI getConnectURI() { - return location; + this.connectURI = location; + this.bindLocation = location; } /** @@ -60,16 +58,16 @@ public abstract class TransportServerSupport extends ServiceSupport implements T /** * @return Returns the location. */ - public URI getLocation() { - return location; + public URI getConnectURI() { + return connectURI; } /** * @param location * The location to set. */ - public void setLocation(URI location) { - this.location = location; + public void setConnectURI(URI location) { + this.connectURI = location; } protected void onAcceptError(Exception e) { @@ -77,4 +75,12 @@ public abstract class TransportServerSupport extends ServiceSupport implements T acceptListener.onAcceptError(e); } } + + public URI getBindLocation() { + return bindLocation; + } + + public void setBindLocation(URI bindLocation) { + this.bindLocation = bindLocation; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java b/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java index 3ef5639b39..31287c255c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/TransportServerThreadSupport.java @@ -68,7 +68,7 @@ public abstract class TransportServerThreadSupport extends TransportServerSuppor } protected void doStart() throws Exception { - log.info("Listening for connections at: " + getLocation()); + log.info("Listening for connections at: " + getConnectURI()); runner = new Thread(this, "ActiveMQ Transport Server: "+toString()); runner.setDaemon(daemon); runner.setPriority(ThreadPriorities.BROKER_MANAGEMENT); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index c58fd0b1ad..21d39ecb0a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -23,6 +23,7 @@ import java.io.InterruptedIOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.URI; @@ -49,27 +50,19 @@ import org.apache.commons.logging.LogFactory; public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { private static final Log log = LogFactory.getLog(TcpTransport.class); - private int connectionTimeout = 30000; - private int soTimeout = 0; - private int socketBufferSize = 128 * 1024; - private Socket socket; - private DataOutputStream dataOut; - private DataInputStream dataIn; - private WireFormat wireFormat; - private boolean trace; - private boolean useLocalHost = true; - private int minmumWireFormatVersion; - private InetSocketAddress remoteAddress; - private InetSocketAddress localAddress; - - /** - * Construct basic helpers - * - * @param wireFormat - */ - protected TcpTransport(WireFormat wireFormat) { - this.wireFormat = wireFormat; - } + protected final URI remoteLocation; + protected final URI localLocation; + protected final WireFormat wireFormat; + + protected int connectionTimeout = 30000; + protected int soTimeout = 0; + protected int socketBufferSize = 128 * 1024; + protected Socket socket; + protected DataOutputStream dataOut; + protected DataInputStream dataIn; + protected boolean trace; + protected boolean useLocalHost = true; + protected int minmumWireFormatVersion; /** * Connect to a remote Node - e.g. a Broker @@ -83,10 +76,14 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S * @throws UnknownHostException */ public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { - this(wireFormat); - this.socket = createSocket(socketFactory, remoteLocation, localLocation); + this.wireFormat = wireFormat; + this.socket = socketFactory.createSocket(); + this.remoteLocation = remoteLocation; + this.localLocation = localLocation; + setDaemon(false); } + /** * Initialize from a server Socket * @@ -95,8 +92,10 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S * @throws IOException */ public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException { - this(wireFormat); + this.wireFormat = wireFormat; this.socket = socket; + this.remoteLocation = null; + this.localLocation = null; setDaemon(true); } @@ -211,29 +210,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S // Implementation methods // ------------------------------------------------------------------------- - /** - * Factory method to create a new socket - * - * @param remoteLocation - * @param localLocation ignored if null - * @return - * @throws IOException - * @throws IOException - * @throws UnknownHostException - */ - protected Socket createSocket(SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws IOException, UnknownHostException { - - String host = resolveHostName(remoteLocation.getHost()); - remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); - - if( localLocation!=null ) { - localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort()); - } - - Socket sock = socketFactory.createSocket(); - return sock; - } - protected String resolveHostName(String host) throws UnknownHostException { String localName = InetAddress.getLocalHost().getHostName(); if (localName != null && isUseLocalHost()) { @@ -263,23 +239,33 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S } protected void doStart() throws Exception { - initialiseSocket(socket); - if( localAddress!=null ) { + connect(); + super.doStart(); + } + + protected void connect() throws SocketException, IOException { + + initialiseSocket(socket); + + if( localLocation!=null ) { + SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort()); socket.bind(localAddress); - } - if (remoteAddress != null) { + } + if( remoteLocation!=null ) { + String host = resolveHostName(remoteLocation.getHost()); + InetSocketAddress remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); if (connectionTimeout >= 0) { socket.connect(remoteAddress, connectionTimeout); } else { socket.connect(remoteAddress); } - } + } + initializeStreams(); - super.doStart(); - } + } - protected void doStop(ServiceStopper stopper) throws Exception { + protected void doStop(ServiceStopper stopper) throws Exception { closeStreams(); if (socket != null) { socket.close(); @@ -303,7 +289,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S } public void setSocketOptions(Map socketOptions) { - IntrospectionSupport.setProperties(socket, socketOptions); + IntrospectionSupport.setProperties(socket, socketOptions); } public String getRemoteAddress() { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index 42ca71447e..606568ee69 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -52,6 +52,7 @@ public class TcpTransportFactory extends TransportFactory { IntrospectionSupport.setProperties(server, options); Map transportOptions = IntrospectionSupport.extractProperties(options, "transport."); server.setTransportOption(transportOptions); + server.bind(); return server; } @@ -125,7 +126,7 @@ public class TcpTransportFactory extends TransportFactory { * @throws UnknownHostException * @throws IOException */ - private TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { + protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { return new TcpTransport(wf, socketFactory, location, localLocation); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index eb6e19ec15..63e84fbd6e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -35,6 +35,7 @@ import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServerThreadSupport; +import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceStopper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,24 +49,47 @@ import javax.net.ServerSocketFactory; */ public class TcpTransportServer extends TransportServerThreadSupport { + private static final Log log = LogFactory.getLog(TcpTransportServer.class); - private ServerSocket serverSocket; - private int backlog = 5000; - private WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); - private final TcpTransportFactory transportFactory; - private long maxInactivityDuration = 30000; - private int minmumWireFormatVersion; - private boolean trace; - private Map transportOptions; + protected ServerSocket serverSocket; + protected int backlog = 5000; + protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); + protected final TcpTransportFactory transportFactory; + protected long maxInactivityDuration = 30000; + protected int minmumWireFormatVersion; + protected boolean trace; + protected Map transportOptions; + protected final ServerSocketFactory serverSocketFactory; public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { super(location); this.transportFactory=transportFactory; - this.serverSocket = createServerSocket(location, serverSocketFactory); - this.serverSocket.setSoTimeout(2000); - updatePhysicalUri(location); + this.serverSocketFactory = serverSocketFactory; } + public void bind() throws IOException { + URI bind = getBindLocation(); + + String host = bind.getHost(); + host = (host == null || host.length() == 0) ? "localhost" : host; + InetAddress addr = InetAddress.getByName(host); + + if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) { + this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog); + } + else { + this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); + } + this.serverSocket.setSoTimeout(2000); + + try { + setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(bind.getHost()), serverSocket.getLocalPort(), bind.getPath(), + bind.getQuery(), bind.getFragment())); + } catch (URISyntaxException e) { + throw IOExceptionSupport.create(e); + } + } + /** * @return Returns the wireFormatFactory. */ @@ -168,19 +192,7 @@ public class TcpTransportServer extends TransportServerThreadSupport { * @return pretty print of this */ public String toString() { - return ""+getLocation(); - } - - /** - * In cases where we construct ourselves with a zero port we need to - * regenerate the URI with the real physical port so that people can connect - * to us via discovery - * - * @throws UnknownHostException - */ - protected void updatePhysicalUri(URI bindAddr) throws URISyntaxException, UnknownHostException { - setLocation(new URI(bindAddr.getScheme(), bindAddr.getUserInfo(), resolveHostName(bindAddr.getHost()), serverSocket.getLocalPort(), bindAddr.getPath(), - bindAddr.getQuery(), bindAddr.getFragment())); + return ""+getBindLocation(); } /** @@ -198,26 +210,6 @@ public class TcpTransportServer extends TransportServerThreadSupport { return result; } - /** - * Factory method to create a new ServerSocket - * - * @throws UnknownHostException - * @throws IOException - */ - protected ServerSocket createServerSocket(URI bind, ServerSocketFactory factory) throws UnknownHostException, IOException { - ServerSocket answer = null; - String host = bind.getHost(); - host = (host == null || host.length() == 0) ? "localhost" : host; - InetAddress addr = InetAddress.getByName(host); - if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) { - answer = factory.createServerSocket(bind.getPort(), backlog); - } - else { - answer = factory.createServerSocket(bind.getPort(), backlog, addr); - } - return answer; - } - protected void doStop(ServiceStopper stopper) throws Exception { super.doStop(stopper); if (serverSocket != null) {