git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@980014 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-07-28 10:43:27 +00:00
parent 43c2315257
commit 6b4509cdea
10 changed files with 697 additions and 32 deletions

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -37,6 +38,7 @@ import javax.transaction.xa.XAResource;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
@ -80,7 +82,6 @@ import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -88,6 +89,7 @@ import org.apache.activemq.transaction.Transaction;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
@ -96,7 +98,7 @@ import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static org.apache.activemq.thread.DefaultThreadPools.*;
import static org.apache.activemq.thread.DefaultThreadPools.getDefaultTaskRunnerFactory;
/**
* @version $Revision: 1.8 $
*/
@ -149,6 +151,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private final TaskRunnerFactory taskRunnerFactory;
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock();
private BrokerId duplexRemoteBrokerId;
/**
* @param connector
@ -1178,6 +1181,20 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ...
try {
// We first look if existing network connection already exists for the same broker Id
// It's possible in case of brief network fault to have this transport connector side of the connection always active
// and the duplex network connector side wanting to open a new one
// In this case, the old connection must be broken
BrokerId remoteBrokerId = info.getBrokerId();
setDuplexRemoteBrokerId(remoteBrokerId);
CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
TransportConnection c = iter.next();
if ((c != this) && (remoteBrokerId.equals(c.getDuplexRemoteBrokerId()))) {
LOG.warn("An existing duplex active connection already exists for this broker (" + remoteBrokerId + "). Stopping it.");
c.stop();
}
}
Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
Map<String, String> props = createMap(properties);
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
@ -1198,6 +1215,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
duplexBridge.duplexStart(this, brokerInfo, info);
LOG.info("Created Duplex Bridge back to " + info.getBrokerName());
return null;
} catch (TransportDisposedIOException e) {
LOG.warn("Duplex Bridge back to " + info.getBrokerName() + " was correctly stopped before it was correctly started.");
return null;
} catch (Exception e) {
LOG.error("Creating duplex network bridge", e);
}
@ -1391,4 +1411,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
protected synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
return connectionStateRegister.lookupConnectionState(connectionId);
}
protected synchronized void setDuplexRemoteBrokerId(BrokerId remoteBrokerId) {
this.duplexRemoteBrokerId = remoteBrokerId;
}
protected synchronized BrokerId getDuplexRemoteBrokerId() {
return this.duplexRemoteBrokerId;
}
}

View File

@ -233,6 +233,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// initiator side of duplex network
remoteBrokerNameKnownLatch.await();
}
if (!disposed.get()) {
try {
triggerRemoteStartBridge();
} catch (IOException e) {
@ -242,6 +243,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (l != null) {
l.onStart(this);
}
} else {
LOG.warn ("Bridge was disposed before the start() method was fully executed.");
throw new TransportDisposedIOException();
}
}
}
@ -285,6 +290,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
remoteBrokerNameKnownLatch.await();
if (!disposed.get()) {
localConnectionInfo = new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId = "NC_" + remoteBrokerName + "_inbound_" + configuration.getBrokerName();
@ -306,9 +312,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
} else {
LOG.warn ("Bridge was disposed before the startLocalBridge() method was fully executed.");
}
startedLatch.countDown();
localStartedLatch.countDown();
if (!disposed.get()) {
setupStaticDestinations();
} else {
LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") was interrupted during establishment.");
}
}
}
}
@ -408,6 +421,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
remoteBrokerNameKnownLatch.countDown();
}
}

View File

@ -21,13 +21,13 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.activemq.util.IntrospectionSupport;
@ -128,6 +128,8 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
try {
bridge.start();
bridges.put(uri, bridge);
} catch (TransportDisposedIOException e) {
LOG.warn("Network bridge between: " + localURI + " and: " + uri + " was correctly stopped before it was correctly started.");
} catch (Exception e) {
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.tcp;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Random;
import javax.net.ServerSocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
*/
public class ServerSocketTstFactory extends ServerSocketFactory {
private static final Log LOG = LogFactory.getLog(ServerSocketTstFactory.class);
private class ServerSocketTst {
private final ServerSocket socket;
public ServerSocketTst(int port, Random rnd) throws IOException {
this.socket = ServerSocketFactory.getDefault().createServerSocket(port);
}
public ServerSocketTst(int port, int backlog, Random rnd) throws IOException {
this.socket = ServerSocketFactory.getDefault().createServerSocket(port, backlog);
}
public ServerSocketTst(int port, int backlog, InetAddress bindAddr, Random rnd) throws IOException {
this.socket = ServerSocketFactory.getDefault().createServerSocket(port, backlog, bindAddr);
}
public ServerSocket getSocket() {
return this.socket;
}
};
private final Random rnd;
public ServerSocketTstFactory() {
super();
LOG.info("Creating a new ServerSocketTstFactory");
this.rnd = new Random();
}
public ServerSocket createServerSocket(int port) throws IOException {
ServerSocketTst sSock = new ServerSocketTst(port, this.rnd);
return sSock.getSocket();
}
public ServerSocket createServerSocket(int port, int backlog) throws IOException {
ServerSocketTst sSock = new ServerSocketTst(port, backlog, this.rnd);
return sSock.getSocket();
}
public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
ServerSocketTst sSock = new ServerSocketTst(port, backlog, ifAddress, this.rnd);
return sSock.getSocket();
}
private final static ServerSocketTstFactory server = new ServerSocketTstFactory();
public static ServerSocketTstFactory getDefault() {
return server;
}
}

View File

@ -0,0 +1,173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.tcp;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision$
*
* Automatically generated socket.close() calls to simulate network faults
*/
public class SocketTstFactory extends SocketFactory {
private static final Log LOG = LogFactory.getLog(SocketTstFactory.class);
private static final ConcurrentHashMap<InetAddress, Integer> closeIter = new ConcurrentHashMap<InetAddress, Integer>();
private class SocketTst {
private class Bagot implements Runnable {
private Thread processus;
private Random rnd;
private Socket socket;
private final InetAddress address;
public Bagot(Random rnd, Socket socket, InetAddress address) {
this.processus = new Thread(this, "Network Faults maker : undefined");
this.rnd = rnd;
this.socket = socket;
this.address = address;
}
public void start() {
this.processus.setName("Network Faults maker : " + this.socket.toString());
this.processus.start();
}
public void run () {
int lastDelayVal;
Integer lastDelay;
while (!this.processus.isInterrupted()) {
if (!this.socket.isClosed()) {
try {
lastDelay = closeIter.get(this.address);
if (lastDelay == null) {
lastDelayVal = 0;
}
else {
lastDelayVal = lastDelay.intValue();
if (lastDelayVal > 10)
lastDelayVal += 20;
else lastDelayVal += 1;
}
lastDelay = new Integer(lastDelayVal);
LOG.info("Trying to close client socket " + socket.toString() + " in " + lastDelayVal + " milliseconds");
try {
Thread.sleep(lastDelayVal);
} catch (InterruptedException e) {
this.processus.interrupt();
Thread.currentThread().interrupt();
} catch (IllegalArgumentException e) {
}
this.socket.close();
closeIter.put(this.address, lastDelay);
LOG.info("Client socket " + this.socket.toString() + " is closed.");
} catch (IOException e) {
}
}
this.processus.interrupt();
}
}
}
private final Bagot bagot;
private final Socket socket;
public SocketTst(InetAddress address, int port, Random rnd) throws IOException {
this.socket = new Socket(address, port);
bagot = new Bagot(rnd, this.socket, address);
}
public SocketTst(InetAddress address, int port, InetAddress localAddr, int localPort, Random rnd) throws IOException {
this.socket = new Socket(address, port, localAddr, localPort);
bagot = new Bagot(rnd, this.socket, address);
}
public SocketTst(String address, int port, Random rnd) throws UnknownHostException, IOException {
this.socket = new Socket(address, port);
bagot = new Bagot(rnd, this.socket, InetAddress.getByName(address));
}
public SocketTst(String address, int port, InetAddress localAddr, int localPort, Random rnd) throws IOException {
this.socket = new Socket(address, port, localAddr, localPort);
bagot = new Bagot(rnd, this.socket, InetAddress.getByName(address));
}
public Socket getSocket() {
return this.socket;
}
public void startBagot() {
bagot.start();
}
};
private final Random rnd;
public SocketTstFactory() {
super();
LOG.info("Creating a new SocketTstFactory");
this.rnd = new Random();
}
public Socket createSocket(InetAddress host, int port) throws IOException {
SocketTst sockTst;
sockTst = new SocketTst(host, port, this.rnd);
sockTst.startBagot();
return sockTst.getSocket();
}
public Socket createSocket(InetAddress host, int port, InetAddress localAddress, int localPort) throws IOException {
SocketTst sockTst;
sockTst = new SocketTst(host, port, localAddress, localPort, this.rnd);
sockTst.startBagot();
return sockTst.getSocket();
}
public Socket createSocket(String host, int port) throws IOException {
SocketTst sockTst;
sockTst = new SocketTst(host, port, this.rnd);
sockTst.startBagot();
return sockTst.getSocket();
}
public Socket createSocket(String host, int port, InetAddress localAddress, int localPort) throws IOException {
SocketTst sockTst;
sockTst = new SocketTst(host, port, localAddress, localPort, this.rnd);
sockTst.startBagot();
return sockTst.getSocket();
}
private final static SocketTstFactory client = new SocketTstFactory();
public static SocketFactory getDefault() {
return client;
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.tcp;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import org.apache.activemq.Service;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.wireformat.WireFormat;
import javax.net.SocketFactory;
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
*
* @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
* @version $Revision$
*/
public class TcpFaultyTransport extends TcpTransport implements Transport, Service, Runnable {
public TcpFaultyTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
URI localLocation) throws UnknownHostException, IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
/**
* @return pretty print of 'this'
*/
public String toString() {
return "tcpfaulty://" + socket.getInetAddress() + ":" + socket.getPort();
}
}

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.tcp;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.net.Socket;
import java.net.ServerSocket;
import java.net.InetAddress;
import org.apache.activemq.transport.tcp.ServerSocketTstFactory;
/**
* Automatically generated socket.close() calls to simulate network faults
*/
public class TcpFaultyTransportFactory extends TcpTransportFactory {
private static final Log LOG = LogFactory.getLog(TcpFaultyTransportFactory.class);
protected TcpFaultyTransport createTcpFaultyTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new TcpFaultyTransport(wf, socketFactory, location, localLocation);
}
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
URI localLocation = null;
String path = location.getPath();
// see if the path is a local URI location
if (path != null && path.length() > 0) {
int localPortIndex = path.indexOf(':');
try {
Integer.parseInt(path.substring(localPortIndex + 1, path.length()));
String localString = location.getScheme() + ":/" + path;
localLocation = new URI(localString);
} catch (Exception e) {
LOG.warn("path isn't a valid local location for TcpTransport to use", e);
}
}
SocketFactory socketFactory = createSocketFactory();
return createTcpFaultyTransport(wf, socketFactory, location, localLocation);
}
protected TcpFaultyTransportServer createTcpFaultyTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpFaultyTransportServer(this, location, serverSocketFactory);
}
public TransportServer doBind(final URI location) throws IOException {
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
ServerSocketFactory serverSocketFactory = createServerSocketFactory();
TcpFaultyTransportServer server = createTcpFaultyTransportServer(location, serverSocketFactory);
server.setWireFormatFactory(createWireFormatFactory(options));
IntrospectionSupport.setProperties(server, options);
Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
server.setTransportOption(transportOptions);
server.bind();
return server;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
protected SocketFactory createSocketFactory() throws IOException {
return SocketTstFactory.getDefault();
}
protected ServerSocketFactory createServerSocketFactory() throws IOException {
return ServerSocketTstFactory.getDefault();
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.tcp;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import org.apache.activemq.util.ServiceListener;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import javax.net.ServerSocketFactory;
/**
* A TCP based implementation of {@link TransportServer}
*
* @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
* @version $Revision$
*/
public class TcpFaultyTransportServer extends TcpTransportServer implements ServiceListener{
public TcpFaultyTransportServer(TcpFaultyTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
super(transportFactory, location, serverSocketFactory);
}
/**
* @return pretty print of this
*/
public String toString() {
return "" + getBindLocation();
}
}

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.List;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.SocketProxy;
public class MulticastDiscoveryOnFaultyNetworkTest extends JmsMultipleBrokersTestSupport {
protected static final int MESSAGE_COUNT = 200;
private static final String HUB = "HubBroker";
private static final String SPOKE = "SpokeBroker";
public boolean useDuplexNetworkBridge;
public boolean sumulateStalledNetwork;
private TransportConnector mCastTrpConnector;
public void initCombosForTestSendOnAFaultyTransport() {
addCombinationValues( "useDuplexNetworkBridge", new Object[]{ Boolean.TRUE , Boolean.FALSE } );
addCombinationValues( "sumulateStalledNetwork", new Object[]{ Boolean.TRUE } );
}
public void testSendOnAFaultyTransport() throws Exception {
bridgeBrokers(SPOKE, HUB);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
MessageConsumer client = createConsumer(HUB, dest);
// allow subscription information to flow back to Spoke
sleep(600);
// Send messages
sendMessages(SPOKE, dest, MESSAGE_COUNT);
MessageIdList msgs = getConsumerMessages(HUB, client);
msgs.setMaximumDuration(200000L);
msgs.waitForMessagesToArrive(MESSAGE_COUNT);
assertTrue("At least message " + MESSAGE_COUNT +
" must be recieved, duplicates are expected, count=" + msgs.getMessageCount(),
MESSAGE_COUNT <= msgs.getMessageCount());
}
@Override
protected void startAllBrokers() throws Exception {
// Ensure HUB is started first so bridge will be active from the get go
BrokerItem brokerItem = brokers.get(HUB);
brokerItem.broker.start();
brokerItem = brokers.get(SPOKE);
brokerItem.broker.start();
}
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
final String options = "?persistent=false&useJmx=false&deleteAllMessagesOnStartup=true";
createBroker(new URI("broker:(tcpfaulty://localhost:61617)/" + HUB + options));
createBroker(new URI("broker:(tcpfaulty://localhost:61616)/" + SPOKE + options));
}
public static Test suite() {
return suite(MulticastDiscoveryOnFaultyNetworkTest.class);
}
@Override
protected void onSend(int i, TextMessage msg) {
sleep(50);
}
private void sleep(int milliSecondTime) {
try {
Thread.sleep(milliSecondTime);
} catch (InterruptedException igonred) {
}
}
@Override
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean conduit) throws Exception {
DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("multicast://default?group=TESTERIC"));
connector.setDynamicOnly(dynamicOnly);
connector.setNetworkTTL(networkTTL);
localBroker.addNetworkConnector(connector);
maxSetupTime = 2000;
if (useDuplexNetworkBridge) {
connector.setDuplex(true);
}
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
if (!transportConnectors.isEmpty()) {
mCastTrpConnector = ((TransportConnector)transportConnectors.get(0));
mCastTrpConnector.setDiscoveryUri(new URI("multicast://default?group=TESTERIC"));
}
return connector;
}
}

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.tcp.TcpFaultyTransportFactory