diff --git a/activemq-core/project.xml b/activemq-core/project.xml index 3d9a80e1cb..f7a13a7f62 100755 --- a/activemq-core/project.xml +++ b/activemq-core/project.xml @@ -347,6 +347,9 @@ **/DiscoveryTransportBrokerTest.* + + + **/FanoutTransportBrokerTest.* diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 7c9ac3ab02..e5bb154059 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -145,6 +145,7 @@ public class FailoverTransport implements CompositeTransport { log.debug("Attempting connect to: " + uri); Transport t = TransportFactory.compositeConnect(uri); t.setTransportListener(myTransportListener); + t.start(); if (started) { restoreTransport(t); } @@ -223,7 +224,6 @@ public class FailoverTransport implements CompositeTransport { return; started = true; if (connectedTransport != null) { - connectedTransport.start(); stateTracker.restore(connectedTransport); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index a1b57a67fb..7b27b97a18 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -56,6 +56,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S private boolean useLocalHost = true; private int minmumWireFormatVersion; private long maxInactivityDuration = 30000; + private InetSocketAddress socketAddress; /** * Construct basic helpers @@ -77,7 +78,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S public TcpTransport(WireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException { this(wireFormat); this.socket = createSocket(remoteLocation); - initializeStreams(); } /** @@ -93,7 +93,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S public TcpTransport(WireFormat wireFormat, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { this(wireFormat); this.socket = createSocket(remoteLocation, localLocation); - initializeStreams(); } /** @@ -106,8 +105,6 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException { this(wireFormat); this.socket = socket; - initialiseSocket(socket); - initializeStreams(); setDaemon(true); } @@ -183,9 +180,51 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S public void setUseLocalHost(boolean useLocalHost) { this.useLocalHost = useLocalHost; } + + public int getSocketBufferSize() { + return socketBufferSize; + } + + /** + * Sets the buffer size to use on the socket + */ + public void setSocketBufferSize(int socketBufferSize) { + this.socketBufferSize = socketBufferSize; + } + + public int getSoTimeout() { + return soTimeout; + } + + /** + * Sets the socket timeout + */ + public void setSoTimeout(int soTimeout) { + this.soTimeout = soTimeout; + } + + public long getMaxInactivityDuration() { + return maxInactivityDuration; + } + + /** + * Sets the maximum inactivity duration + */ + public void setMaxInactivityDuration(long maxInactivityDuration) { + this.maxInactivityDuration = maxInactivityDuration; + } + // Implementation methods // ------------------------------------------------------------------------- + protected void doStart() throws Exception { + initialiseSocket(socket); + if (socketAddress != null) { + socket.connect(socketAddress); + } + initializeStreams(); + super.doStart(); + } /** * Factory method to create a new socket @@ -198,10 +237,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S */ protected Socket createSocket(URI remoteLocation) throws UnknownHostException, IOException { String host = resolveHostName(remoteLocation.getHost()); - SocketAddress sockAddress = new InetSocketAddress(host, remoteLocation.getPort()); + socketAddress = new InetSocketAddress(host, remoteLocation.getPort()); Socket sock = new Socket(); - initialiseSocket(sock); - sock.connect(sockAddress); return sock; } @@ -275,13 +312,4 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S dataIn.close(); } } - - public long getMaxInactivityDuration() { - return maxInactivityDuration; - } - - public void setMaxInactivityDuration(long maxInactivityDuration) { - this.maxInactivityDuration = maxInactivityDuration; - } - }