diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 5dea07635d..a3924ceb8e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,6 +38,7 @@ import javax.transaction.xa.XAResource; import org.apache.activemq.broker.ft.MasterBroker; import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.Command; import org.apache.activemq.command.CommandTypes; @@ -80,7 +82,6 @@ import org.apache.activemq.state.ConsumerState; import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.SessionState; import org.apache.activemq.state.TransactionState; -import org.apache.activemq.thread.DefaultThreadPools; import org.apache.activemq.thread.Task; import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; @@ -88,6 +89,7 @@ import org.apache.activemq.transaction.Transaction; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.ResponseCorrelator; import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.MarshallingSupport; @@ -96,7 +98,7 @@ import org.apache.activemq.util.URISupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import static org.apache.activemq.thread.DefaultThreadPools.*; +import static org.apache.activemq.thread.DefaultThreadPools.getDefaultTaskRunnerFactory; /** * @version $Revision: 1.8 $ */ @@ -149,6 +151,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { private final TaskRunnerFactory taskRunnerFactory; private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); + private BrokerId duplexRemoteBrokerId; /** * @param connector @@ -1178,6 +1181,20 @@ public class TransportConnection implements Connection, Task, CommandVisitor { // so this TransportConnection is the rear end of a network bridge // We have been requested to create a two way pipe ... try { + // We first look if existing network connection already exists for the same broker Id + // It's possible in case of brief network fault to have this transport connector side of the connection always active + // and the duplex network connector side wanting to open a new one + // In this case, the old connection must be broken + BrokerId remoteBrokerId = info.getBrokerId(); + setDuplexRemoteBrokerId(remoteBrokerId); + CopyOnWriteArrayList connections = this.connector.getConnections(); + for (Iterator iter = connections.iterator(); iter.hasNext();) { + TransportConnection c = iter.next(); + if ((c != this) && (remoteBrokerId.equals(c.getDuplexRemoteBrokerId()))) { + LOG.warn("An existing duplex active connection already exists for this broker (" + remoteBrokerId + "). Stopping it."); + c.stop(); + } + } Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); Map props = createMap(properties); NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); @@ -1198,6 +1215,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor { duplexBridge.duplexStart(this, brokerInfo, info); LOG.info("Created Duplex Bridge back to " + info.getBrokerName()); return null; + } catch (TransportDisposedIOException e) { + LOG.warn("Duplex Bridge back to " + info.getBrokerName() + " was correctly stopped before it was correctly started."); + return null; } catch (Exception e) { LOG.error("Creating duplex network bridge", e); } @@ -1391,4 +1411,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor { protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { return connectionStateRegister.lookupConnectionState(connectionId); } + + protected synchronized void setDuplexRemoteBrokerId(BrokerId remoteBrokerId) { + this.duplexRemoteBrokerId = remoteBrokerId; + } + + protected synchronized BrokerId getDuplexRemoteBrokerId() { + return this.duplexRemoteBrokerId; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 3d7c123344..9f0fb302cf 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -233,14 +233,19 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // initiator side of duplex network remoteBrokerNameKnownLatch.await(); } - try { - triggerRemoteStartBridge(); - } catch (IOException e) { - LOG.warn("Caught exception from remote start", e); - } - NetworkBridgeListener l = this.networkBridgeListener; - if (l != null) { - l.onStart(this); + if (!disposed.get()) { + try { + triggerRemoteStartBridge(); + } catch (IOException e) { + LOG.warn("Caught exception from remote start", e); + } + NetworkBridgeListener l = this.networkBridgeListener; + if (l != null) { + l.onStart(this); + } + } else { + LOG.warn ("Bridge was disposed before the start() method was fully executed."); + throw new TransportDisposedIOException(); } } } @@ -285,30 +290,38 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } remoteBrokerNameKnownLatch.await(); - localConnectionInfo = new ConnectionInfo(); - localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); - localClientId = "NC_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); - localConnectionInfo.setClientId(localClientId); - localConnectionInfo.setUserName(configuration.getUserName()); - localConnectionInfo.setPassword(configuration.getPassword()); - Transport originalTransport = remoteBroker; - while (originalTransport instanceof TransportFilter) { - originalTransport = ((TransportFilter) originalTransport).getNext(); + if (!disposed.get()) { + localConnectionInfo = new ConnectionInfo(); + localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId())); + localClientId = "NC_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName(); + localConnectionInfo.setClientId(localClientId); + localConnectionInfo.setUserName(configuration.getUserName()); + localConnectionInfo.setPassword(configuration.getPassword()); + Transport originalTransport = remoteBroker; + while (originalTransport instanceof TransportFilter) { + originalTransport = ((TransportFilter) originalTransport).getNext(); + } + if (originalTransport instanceof SslTransport) { + X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); + localConnectionInfo.setTransportContext(peerCerts); + } + localBroker.oneway(localConnectionInfo); + + localSessionInfo = new SessionInfo(localConnectionInfo, 1); + localBroker.oneway(localSessionInfo); + + LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); + + } else { + LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed."); } - if (originalTransport instanceof SslTransport) { - X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates(); - localConnectionInfo.setTransportContext(peerCerts); - } - localBroker.oneway(localConnectionInfo); - - localSessionInfo = new SessionInfo(localConnectionInfo, 1); - localBroker.oneway(localSessionInfo); - - LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established."); - startedLatch.countDown(); localStartedLatch.countDown(); - setupStaticDestinations(); + if (!disposed.get()) { + setupStaticDestinations(); + } else { + LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment."); + } } } } @@ -408,6 +421,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped"); + remoteBrokerNameKnownLatch.countDown(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java index 3a5b125827..14e5d77fb8 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java @@ -21,13 +21,13 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.broker.SslContext; import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.discovery.DiscoveryAgent; +import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; import org.apache.activemq.transport.discovery.DiscoveryListener; import org.apache.activemq.util.IntrospectionSupport; @@ -128,6 +128,8 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco try { bridge.start(); bridges.put(uri, bridge); + } catch (TransportDisposedIOException e) { + LOG.warn("Network bridge between: " + localURI + " and: " + uri + " was correctly stopped before it was correctly started."); } catch (Exception e) { ServiceSupport.dispose(localTransport); ServiceSupport.dispose(remoteTransport); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java new file mode 100755 index 0000000000..a3afe748c5 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/ServerSocketTstFactory.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.util.Random; +import javax.net.ServerSocketFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + */ +public class ServerSocketTstFactory extends ServerSocketFactory { + private static final Log LOG = LogFactory.getLog(ServerSocketTstFactory.class); + + private class ServerSocketTst { + + private final ServerSocket socket; + + public ServerSocketTst(int port, Random rnd) throws IOException { + this.socket = ServerSocketFactory.getDefault().createServerSocket(port); + } + + public ServerSocketTst(int port, int backlog, Random rnd) throws IOException { + this.socket = ServerSocketFactory.getDefault().createServerSocket(port, backlog); + } + + public ServerSocketTst(int port, int backlog, InetAddress bindAddr, Random rnd) throws IOException { + this.socket = ServerSocketFactory.getDefault().createServerSocket(port, backlog, bindAddr); + } + + public ServerSocket getSocket() { + return this.socket; + } + }; + + private final Random rnd; + + public ServerSocketTstFactory() { + super(); + LOG.info("Creating a new ServerSocketTstFactory"); + this.rnd = new Random(); + } + + public ServerSocket createServerSocket(int port) throws IOException { + ServerSocketTst sSock = new ServerSocketTst(port, this.rnd); + return sSock.getSocket(); + } + + public ServerSocket createServerSocket(int port, int backlog) throws IOException { + ServerSocketTst sSock = new ServerSocketTst(port, backlog, this.rnd); + return sSock.getSocket(); + } + + public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException { + ServerSocketTst sSock = new ServerSocketTst(port, backlog, ifAddress, this.rnd); + return sSock.getSocket(); + } + + private final static ServerSocketTstFactory server = new ServerSocketTstFactory(); + + public static ServerSocketTstFactory getDefault() { + return server; + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java new file mode 100755 index 0000000000..f1215c767d --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/SocketTstFactory.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import javax.net.SocketFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @version $Revision$ + * + * Automatically generated socket.close() calls to simulate network faults + */ +public class SocketTstFactory extends SocketFactory { + private static final Log LOG = LogFactory.getLog(SocketTstFactory.class); + + private static final ConcurrentHashMap closeIter = new ConcurrentHashMap(); + + private class SocketTst { + + private class Bagot implements Runnable { + private Thread processus; + private Random rnd; + private Socket socket; + private final InetAddress address; + + public Bagot(Random rnd, Socket socket, InetAddress address) { + this.processus = new Thread(this, "Network Faults maker : undefined"); + this.rnd = rnd; + this.socket = socket; + this.address = address; + } + + public void start() { + this.processus.setName("Network Faults maker : " + this.socket.toString()); + this.processus.start(); + } + + public void run () { + int lastDelayVal; + Integer lastDelay; + while (!this.processus.isInterrupted()) { + if (!this.socket.isClosed()) { + try { + lastDelay = closeIter.get(this.address); + if (lastDelay == null) { + lastDelayVal = 0; + } + else { + lastDelayVal = lastDelay.intValue(); + if (lastDelayVal > 10) + lastDelayVal += 20; + else lastDelayVal += 1; + } + + lastDelay = new Integer(lastDelayVal); + + LOG.info("Trying to close client socket " + socket.toString() + " in " + lastDelayVal + " milliseconds"); + + try { + Thread.sleep(lastDelayVal); + } catch (InterruptedException e) { + this.processus.interrupt(); + Thread.currentThread().interrupt(); + } catch (IllegalArgumentException e) { + } + + this.socket.close(); + closeIter.put(this.address, lastDelay); + LOG.info("Client socket " + this.socket.toString() + " is closed."); + } catch (IOException e) { + } + } + + this.processus.interrupt(); + } + } + } + + private final Bagot bagot; + private final Socket socket; + + public SocketTst(InetAddress address, int port, Random rnd) throws IOException { + this.socket = new Socket(address, port); + bagot = new Bagot(rnd, this.socket, address); + } + + public SocketTst(InetAddress address, int port, InetAddress localAddr, int localPort, Random rnd) throws IOException { + this.socket = new Socket(address, port, localAddr, localPort); + bagot = new Bagot(rnd, this.socket, address); + } + + public SocketTst(String address, int port, Random rnd) throws UnknownHostException, IOException { + this.socket = new Socket(address, port); + bagot = new Bagot(rnd, this.socket, InetAddress.getByName(address)); + } + + public SocketTst(String address, int port, InetAddress localAddr, int localPort, Random rnd) throws IOException { + this.socket = new Socket(address, port, localAddr, localPort); + bagot = new Bagot(rnd, this.socket, InetAddress.getByName(address)); + } + + public Socket getSocket() { + return this.socket; + } + + public void startBagot() { + bagot.start(); + } + }; + + private final Random rnd; + + public SocketTstFactory() { + super(); + LOG.info("Creating a new SocketTstFactory"); + this.rnd = new Random(); + } + + public Socket createSocket(InetAddress host, int port) throws IOException { + SocketTst sockTst; + sockTst = new SocketTst(host, port, this.rnd); + sockTst.startBagot(); + return sockTst.getSocket(); + } + + public Socket createSocket(InetAddress host, int port, InetAddress localAddress, int localPort) throws IOException { + SocketTst sockTst; + sockTst = new SocketTst(host, port, localAddress, localPort, this.rnd); + sockTst.startBagot(); + return sockTst.getSocket(); + } + + public Socket createSocket(String host, int port) throws IOException { + SocketTst sockTst; + sockTst = new SocketTst(host, port, this.rnd); + sockTst.startBagot(); + return sockTst.getSocket(); + } + + public Socket createSocket(String host, int port, InetAddress localAddress, int localPort) throws IOException { + SocketTst sockTst; + sockTst = new SocketTst(host, port, localAddress, localPort, this.rnd); + sockTst.startBagot(); + return sockTst.getSocket(); + } + + private final static SocketTstFactory client = new SocketTstFactory(); + + public static SocketFactory getDefault() { + return client; + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java new file mode 100755 index 0000000000..282e1decc1 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransport.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +import org.apache.activemq.Service; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.tcp.TcpTransport; + +import org.apache.activemq.wireformat.WireFormat; + +import javax.net.SocketFactory; + +/** + * An implementation of the {@link Transport} interface using raw tcp/ip + * + * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) + * @version $Revision$ + */ +public class TcpFaultyTransport extends TcpTransport implements Transport, Service, Runnable { + + public TcpFaultyTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, + URI localLocation) throws UnknownHostException, IOException { + super(wireFormat, socketFactory, remoteLocation, localLocation); + } + + /** + * @return pretty print of 'this' + */ + public String toString() { + return "tcpfaulty://" + socket.getInetAddress() + ":" + socket.getPort(); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java new file mode 100755 index 0000000000..f4e52651d0 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportFactory.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; + +import javax.net.ServerSocketFactory; +import javax.net.SocketFactory; + +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.transport.InactivityMonitor; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportLoggerFactory; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.WireFormatNegotiator; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.wireformat.WireFormat; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.net.Socket; +import java.net.ServerSocket; +import java.net.InetAddress; + +import org.apache.activemq.transport.tcp.ServerSocketTstFactory; + +/** + * Automatically generated socket.close() calls to simulate network faults + */ +public class TcpFaultyTransportFactory extends TcpTransportFactory { + private static final Log LOG = LogFactory.getLog(TcpFaultyTransportFactory.class); + + protected TcpFaultyTransport createTcpFaultyTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException { + return new TcpFaultyTransport(wf, socketFactory, location, localLocation); + } + + protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { + URI localLocation = null; + String path = location.getPath(); + // see if the path is a local URI location + if (path != null && path.length() > 0) { + int localPortIndex = path.indexOf(':'); + try { + Integer.parseInt(path.substring(localPortIndex + 1, path.length())); + String localString = location.getScheme() + ":/" + path; + localLocation = new URI(localString); + } catch (Exception e) { + LOG.warn("path isn't a valid local location for TcpTransport to use", e); + } + } + SocketFactory socketFactory = createSocketFactory(); + return createTcpFaultyTransport(wf, socketFactory, location, localLocation); + } + + protected TcpFaultyTransportServer createTcpFaultyTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new TcpFaultyTransportServer(this, location, serverSocketFactory); + } + + public TransportServer doBind(final URI location) throws IOException { + try { + Map options = new HashMap(URISupport.parseParamters(location)); + + ServerSocketFactory serverSocketFactory = createServerSocketFactory(); + TcpFaultyTransportServer server = createTcpFaultyTransportServer(location, serverSocketFactory); + server.setWireFormatFactory(createWireFormatFactory(options)); + IntrospectionSupport.setProperties(server, options); + Map transportOptions = IntrospectionSupport.extractProperties(options, "transport."); + server.setTransportOption(transportOptions); + server.bind(); + + return server; + } catch (URISyntaxException e) { + throw IOExceptionSupport.create(e); + } + } + + + protected SocketFactory createSocketFactory() throws IOException { + return SocketTstFactory.getDefault(); + } + + + protected ServerSocketFactory createServerSocketFactory() throws IOException { + return ServerSocketTstFactory.getDefault(); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java new file mode 100755 index 0000000000..6255cc9c22 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/transport/tcp/TcpFaultyTransportServer.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.tcp; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; + +import org.apache.activemq.util.ServiceListener; +import org.apache.activemq.transport.tcp.TcpTransportServer; + +import javax.net.ServerSocketFactory; + +/** + * A TCP based implementation of {@link TransportServer} + * + * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) + * @version $Revision$ + */ + +public class TcpFaultyTransportServer extends TcpTransportServer implements ServiceListener{ + + public TcpFaultyTransportServer(TcpFaultyTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + super(transportFactory, location, serverSocketFactory); + } + + /** + * @return pretty print of this + */ + public String toString() { + return "" + getBindLocation(); + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java new file mode 100644 index 0000000000..ade8a1d1ea --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.net.URI; +import java.util.List; + +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; + +import junit.framework.Test; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.MessageIdList; +import org.apache.activemq.util.SocketProxy; + + +public class MulticastDiscoveryOnFaultyNetworkTest extends JmsMultipleBrokersTestSupport { + protected static final int MESSAGE_COUNT = 200; + private static final String HUB = "HubBroker"; + private static final String SPOKE = "SpokeBroker"; + public boolean useDuplexNetworkBridge; + public boolean sumulateStalledNetwork; + + private TransportConnector mCastTrpConnector; + + public void initCombosForTestSendOnAFaultyTransport() { + addCombinationValues( "useDuplexNetworkBridge", new Object[]{ Boolean.TRUE , Boolean.FALSE } ); + addCombinationValues( "sumulateStalledNetwork", new Object[]{ Boolean.TRUE } ); + } + + public void testSendOnAFaultyTransport() throws Exception { + bridgeBrokers(SPOKE, HUB); + + startAllBrokers(); + + // Setup destination + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + MessageConsumer client = createConsumer(HUB, dest); + + // allow subscription information to flow back to Spoke + sleep(600); + + // Send messages + sendMessages(SPOKE, dest, MESSAGE_COUNT); + + MessageIdList msgs = getConsumerMessages(HUB, client); + msgs.setMaximumDuration(200000L); + msgs.waitForMessagesToArrive(MESSAGE_COUNT); + + assertTrue("At least message " + MESSAGE_COUNT + + " must be recieved, duplicates are expected, count=" + msgs.getMessageCount(), + MESSAGE_COUNT <= msgs.getMessageCount()); + } + + + @Override + protected void startAllBrokers() throws Exception { + // Ensure HUB is started first so bridge will be active from the get go + BrokerItem brokerItem = brokers.get(HUB); + brokerItem.broker.start(); + brokerItem = brokers.get(SPOKE); + brokerItem.broker.start(); + } + + public void setUp() throws Exception { + super.setAutoFail(true); + super.setUp(); + final String options = "?persistent=false&useJmx=false&deleteAllMessagesOnStartup=true"; + createBroker(new URI("broker:(tcpfaulty://localhost:61617)/" + HUB + options)); + createBroker(new URI("broker:(tcpfaulty://localhost:61616)/" + SPOKE + options)); + } + + public static Test suite() { + return suite(MulticastDiscoveryOnFaultyNetworkTest.class); + } + + @Override + protected void onSend(int i, TextMessage msg) { + sleep(50); + } + + private void sleep(int milliSecondTime) { + try { + Thread.sleep(milliSecondTime); + } catch (InterruptedException igonred) { + } + } + + + @Override + protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception { + DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("multicast://default?group=TESTERIC")); + connector.setDynamicOnly(dynamicOnly); + connector.setNetworkTTL(networkTTL); + localBroker.addNetworkConnector(connector); + maxSetupTime = 2000; + if (useDuplexNetworkBridge) { + connector.setDuplex(true); + } + + List transportConnectors = remoteBroker.getTransportConnectors(); + if (!transportConnectors.isEmpty()) { + mCastTrpConnector = ((TransportConnector)transportConnectors.get(0)); + mCastTrpConnector.setDiscoveryUri(new URI("multicast://default?group=TESTERIC")); + } + return connector; + } +} diff --git a/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty b/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty new file mode 100644 index 0000000000..2c5278f2c5 --- /dev/null +++ b/activemq-core/src/test/resources/META-INF/services/org/apache/activemq/transport/tcpfaulty @@ -0,0 +1,17 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +class=org.apache.activemq.transport.tcp.TcpFaultyTransportFactory