revert/rework AMQ-1521, fix for AMQ-1973; new test case and SocketProxy that can be used to simulate network outage

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@703447 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-10-10 13:11:49 +00:00
parent 01e520a827
commit 876118997b
6 changed files with 464 additions and 5 deletions

View File

@ -420,7 +420,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())) { if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())) {
serviceRemoteConsumerAdvisory(message.getDataStructure()); serviceRemoteConsumerAdvisory(message.getDataStructure());
} else { } else {
localBroker.oneway(message); if (message.isResponseRequired()) {
Response reply = new Response();
reply.setCorrelationId(message.getCommandId());
localBroker.oneway(message);
remoteBroker.oneway(reply);
} else {
localBroker.oneway(message);
}
} }
} else { } else {
switch (command.getDataStructureType()) { switch (command.getDataStructureType()) {
@ -436,6 +443,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring ConsumerInfo: "+ command); LOG.debug("Ignoring ConsumerInfo: "+ command);
} }
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Adding ConsumerInfo: "+ command);
}
} }
} else { } else {
// received a subscription whilst stopping // received a subscription whilst stopping
@ -606,10 +617,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
Message message = configureMessage(md); Message message = configureMessage(md);
if (trace) { if (trace) {
LOG.trace("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message); LOG.trace("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
LOG.trace("cameFromRemote = "+cameFromRemote); LOG.trace("cameFromRemote = "+cameFromRemote + ", repsonseRequired = " + message.isResponseRequired());
} }
if (!message.isResponseRequired() || isDuplex()) { if (!message.isResponseRequired()) {
// If the message was originally sent using async // If the message was originally sent using async
// send, we will preserve that QOS // send, we will preserve that QOS

View File

@ -177,7 +177,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* reads packets from a Socket * reads packets from a Socket
*/ */
public void run() { public void run() {
LOG.trace("TCP consumer thread starting"); LOG.trace("TCP consumer thread for " + this + " starting");
this.runnerThread=Thread.currentThread(); this.runnerThread=Thread.currentThread();
try { try {
while (!isStopped()) { while (!isStopped()) {

View File

@ -269,6 +269,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i); TextMessage msg = createTextMessage(sess, conn.getClientID() + ": Message-" + i);
producer.send(msg); producer.send(msg);
onSend(i, msg);
} }
producer.close(); producer.close();
@ -277,6 +278,9 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
brokerItem.connections.remove(conn); brokerItem.connections.remove(conn);
} }
protected void onSend(int i, TextMessage msg) {
}
protected TextMessage createTextMessage(Session session, String initText) throws Exception { protected TextMessage createTextMessage(Session session, String initText) throws Exception {
TextMessage msg = session.createTextMessage(); TextMessage msg = session.createTextMessage();

View File

@ -95,7 +95,7 @@ public class DuplexNetworkMBeanTest extends TestCase {
assertEquals(0, countMbeans(broker, "stopped")); assertEquals(0, countMbeans(broker, "stopped"));
} }
assertEquals(0, countMbeans(networkedBroker, "NetworkBridge")); //assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));
assertEquals(1, countMbeans(networkedBroker, "Connector")); assertEquals(1, countMbeans(networkedBroker, "Connector"));
assertEquals(0, countMbeans(networkedBroker, "Connection")); assertEquals(0, countMbeans(networkedBroker, "Connection"));
assertEquals(0, countMbeans(broker, "Connection")); assertEquals(0, countMbeans(broker, "Connection"));

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.List;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.SocketProxy;
public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTestSupport {
private static final int NETWORK_DOWN_TIME = 5000;
protected static final int MESSAGE_COUNT = 200;
private static final String HUB = "HubBroker";
private static final String SPOKE = "SpokeBroker";
private SocketProxy socketProxy;
private long networkDownTimeStart;
public boolean useDuplexNetworkBridge;
public boolean sumulateStalledNetwork;
public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
addCombinationValues( "useDuplexNetworkBridge", new Object[]{ Boolean.TRUE, Boolean.FALSE} );
addCombinationValues( "sumulateStalledNetwork", new Object[]{ Boolean.TRUE } );
}
public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
bridgeBrokers(SPOKE, HUB);
startAllBrokers();
// Setup destination
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
MessageConsumer client = createConsumer(HUB, dest);
// allow subscription information to flow back to Spoke
sleep(600);
// Send messages
sendMessages(SPOKE, dest, MESSAGE_COUNT);
MessageIdList msgs = getConsumerMessages(HUB, client);
msgs.waitForMessagesToArrive(MESSAGE_COUNT);
assertTrue("At least message " + MESSAGE_COUNT +
" must be recieved, duplicates are expected, count=" + msgs.getMessageCount(),
MESSAGE_COUNT <= msgs.getMessageCount());
}
@Override
protected void startAllBrokers() throws Exception {
// Ensure HUB is started first so bridge will be active from the get go
BrokerItem brokerItem = brokers.get(HUB);
brokerItem.broker.start();
brokerItem = brokers.get(SPOKE);
brokerItem.broker.start();
sleep(600);
}
public void setUp() throws Exception {
networkDownTimeStart = 0;
super.setAutoFail(true);
super.setUp();
final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true";
createBroker(new URI("broker:(tcp://localhost:61617)/" + HUB + options));
createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
}
public static Test suite() {
return suite(BrokerQueueNetworkWithDisconnectTest.class);
}
@Override
protected void onSend(int i, TextMessage msg) {
sleep(50);
if (i == 50 || i == 150) {
if (sumulateStalledNetwork) {
socketProxy.pause();
} else {
socketProxy.close();
}
networkDownTimeStart = System.currentTimeMillis();
} else if (networkDownTimeStart > 0) {
// restart after NETWORK_DOWN_TIME seconds
if (networkDownTimeStart + NETWORK_DOWN_TIME < System.currentTimeMillis()) {
if (sumulateStalledNetwork) {
socketProxy.goOn();
} else {
socketProxy.reopen();
}
networkDownTimeStart = 0;
} else {
// slow message production to allow bridge to recover and limit message duplication
sleep(500);
}
}
super.onSend(i, msg);
}
private void sleep(int milliSecondTime) {
try {
Thread.sleep(milliSecondTime);
} catch (InterruptedException igonred) {
}
}
@Override
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean dynamicOnly, int networkTTL) throws Exception {
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI;
if (!transportConnectors.isEmpty()) {
remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
socketProxy = new SocketProxy(remoteURI);
DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(" + socketProxy.getUrl()
+ "?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000)?useExponentialBackOff=false"));
connector.setDynamicOnly(dynamicOnly);
connector.setNetworkTTL(networkTTL);
localBroker.addNetworkConnector(connector);
maxSetupTime = 2000;
if (useDuplexNetworkBridge) {
connector.setDuplex(true);
}
return connector;
} else {
throw new Exception("Remote broker has no registered connectors.");
}
}
}

View File

@ -0,0 +1,283 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class SocketProxy {
private static final transient Log LOG = LogFactory.getLog(SocketProxy.class);
public static final int ACCEPT_TIMEOUT_MILLIS = 1000;
private URI proxyUrl;
private URI target;
private Acceptor acceptor;
private ServerSocket serverSocket;
public List<Connection> connections = new LinkedList<Connection>();
private int listenPort = 0;
public SocketProxy(URI uri) throws Exception {
this(0, uri);
}
public SocketProxy(int port, URI uri) throws Exception {
listenPort = port;
target = uri;
open();
}
protected void open() throws Exception {
if (proxyUrl == null) {
serverSocket = new ServerSocket(listenPort);
proxyUrl = urlFromSocket(target, serverSocket);
} else {
serverSocket = new ServerSocket(proxyUrl.getPort());
}
acceptor = new Acceptor(serverSocket, target);
new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
}
public URI getUrl() {
return proxyUrl;
}
/*
* close all proxy connections and acceptor
*/
public void close() {
List<Connection> connections;
synchronized(this.connections) {
connections = new ArrayList<Connection>(this.connections);
}
LOG.info("close, numConnectons=" + connections.size());
for (Connection con : connections) {
closeConnection(con);
}
acceptor.close();
}
/*
* called after a close to restart the acceptor on the same port
*/
public void reopen() {
LOG.info("reopen");
try {
open();
} catch (Exception e) {
LOG.debug("exception on reopen url:" + getUrl(), e);
}
}
/*
* pause accepting new connecitons and data transfer through existing proxy
* connections. All sockets remain open
*/
public void pause() {
synchronized(connections) {
LOG.info("pause, numConnectons=" + connections.size());
acceptor.pause();
for (Connection con : connections) {
con.pause();
}
}
}
/*
* continue after pause
*/
public void goOn() {
synchronized(connections) {
LOG.info("goOn, numConnectons=" + connections.size());
for (Connection con : connections) {
con.goOn();
}
}
acceptor.goOn();
}
private void closeConnection(Connection c) {
try {
c.close();
} catch (Exception e) {
LOG.debug("exception on close of: " + c, e);
}
}
private URI urlFromSocket(URI uri, ServerSocket serverSocket) throws Exception {
int listenPort = serverSocket.getLocalPort();
return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), listenPort, uri.getPath(), uri.getQuery(), uri.getFragment());
}
public class Connection {
private Socket receiveSocket;
private Socket sendSocket;
private Pump requestThread;
private Pump responseThread;
public Connection(Socket socket, URI target) throws Exception {
receiveSocket = socket;
sendSocket = new Socket(target.getHost(), target.getPort());
linkWithThreads(receiveSocket, sendSocket);
LOG.info("proxy connection " + sendSocket);
}
public void goOn() {
responseThread.goOn();
requestThread.goOn();
}
public void pause() {
requestThread.pause();
responseThread.pause();
}
public void close() throws Exception {
synchronized(connections) {
connections.remove(this);
}
receiveSocket.close();
sendSocket.close();
}
private void linkWithThreads(Socket source, Socket dest) {
requestThread = new Pump(source, dest);
responseThread = new Pump(dest, source);
requestThread.start();
responseThread.start();
}
public class Pump extends Thread {
protected Socket src;
private Socket destination;
private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
public Pump(Socket source, Socket dest) {
super("SocketProxy-DataTransfer-" + source.getPort() + ":" + dest.getPort());
src = source;
destination = dest;
pause.set(new CountDownLatch(0));
}
public void pause() {
pause.set(new CountDownLatch(1));
}
public void goOn() {
pause.get().countDown();
}
public void run() {
byte[] buf = new byte[1024];
try {
InputStream in = src.getInputStream();
OutputStream out = destination.getOutputStream();
while (true) {
int len = in.read(buf);
if (len == -1) {
break;
}
pause.get().await();
out.write(buf, 0, len);
}
} catch (Exception e) {
LOG.debug("read/write failed, reason: " + e.getLocalizedMessage());
try {
close();
} catch (Exception ignore) {
}
}
}
}
}
public class Acceptor implements Runnable {
private ServerSocket socket;
private URI target;
private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
public Acceptor(ServerSocket serverSocket, URI uri) {
socket = serverSocket;
target = uri;
pause.set(new CountDownLatch(0));
try {
socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
} catch (SocketException e) {
e.printStackTrace();
}
}
public void pause() {
pause.set(new CountDownLatch(1));
}
public void goOn() {
pause.get().countDown();
}
public void run() {
try {
while(!socket.isClosed()) {
pause.get().await();
try {
Socket source = socket.accept();
LOG.info("accepted " + source);
synchronized(connections) {
connections.add(new Connection(source, target));
}
} catch (SocketTimeoutException expected) {
}
}
} catch (Exception e) {
LOG.debug("acceptor: finished for reason: " + e.getLocalizedMessage());
}
}
public void close() {
try {
socket.close();
goOn();
} catch (IOException ignored) {
}
}
}
}