https://issues.apache.org/jira/browse/AMQ-3353 - Durable subscribers on durable topics don't receive messages after network disconnect. Have suppression for durable subs be conditional on sub active such that reconnection/recreation/failover of bridge is not a problem. org.apache.activemq.usecases.NoDuplicateOnTopicNetworkTest still needs some work to deal with already exists errors from dupliate restarts.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1143482 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-07-06 16:11:03 +00:00
parent 40f9146939
commit f232ceced0
6 changed files with 278 additions and 18 deletions

View File

@ -32,8 +32,10 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.TransportConnection; import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.AbstractRegion; import org.apache.activemq.broker.region.AbstractRegion;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
@ -1046,7 +1048,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
if (!networkConsumers.isEmpty()) { if (!networkConsumers.isEmpty()) {
if (matchFound(candidateConsumers, networkConsumers)) { if (matchFound(candidateConsumers, networkConsumers)) {
suppress = hasLowerPriority(sub, candidate.getLocalInfo()); suppress = isActiveDurableSub(sub) && hasLowerPriority(sub, candidate.getLocalInfo());
break; break;
} }
} }
@ -1054,6 +1056,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return suppress; return suppress;
} }
private boolean isActiveDurableSub(Subscription sub) {
return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && ((DurableTopicSubscription)sub).isActive());
}
private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
boolean suppress = false; boolean suppress = false;

View File

@ -51,14 +51,14 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest
private SocketProxy socketProxy; private SocketProxy socketProxy;
private long networkDownTimeStart; private long networkDownTimeStart;
public boolean useDuplexNetworkBridge = true; public boolean useDuplexNetworkBridge = true;
public boolean sumulateStalledNetwork; public boolean simulateStalledNetwork;
private long inactiveDuration = 1000; private long inactiveDuration = 1000;
private boolean useSocketProxy = true; private boolean useSocketProxy = true;
public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() { public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
addCombinationValues( "useDuplexNetworkBridge", new Object[]{ Boolean.TRUE, Boolean.FALSE} ); addCombinationValues( "useDuplexNetworkBridge", new Object[]{ Boolean.TRUE, Boolean.FALSE} );
addCombinationValues( "sumulateStalledNetwork", new Object[]{ Boolean.TRUE } ); addCombinationValues( "simulateStalledNetwork", new Object[]{ Boolean.TRUE } );
} }
public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception { public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
@ -197,7 +197,7 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest
protected void onSend(int i, TextMessage msg) { protected void onSend(int i, TextMessage msg) {
sleep(50); sleep(50);
if (i == 50 || i == 150) { if (i == 50 || i == 150) {
if (sumulateStalledNetwork) { if (simulateStalledNetwork) {
socketProxy.pause(); socketProxy.pause();
} else { } else {
socketProxy.close(); socketProxy.close();
@ -206,7 +206,7 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest
} else if (networkDownTimeStart > 0) { } else if (networkDownTimeStart > 0) {
// restart after NETWORK_DOWN_TIME seconds // restart after NETWORK_DOWN_TIME seconds
if (networkDownTimeStart + NETWORK_DOWN_TIME < System.currentTimeMillis()) { if (networkDownTimeStart + NETWORK_DOWN_TIME < System.currentTimeMillis()) {
if (sumulateStalledNetwork) { if (simulateStalledNetwork) {
socketProxy.goOn(); socketProxy.goOn();
} else { } else {
socketProxy.reopen(); socketProxy.reopen();

View File

@ -0,0 +1,234 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import java.util.List;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class DurableSubscriberWithNetworkDisconnectTest extends JmsMultipleBrokersTestSupport {
private static final Log LOG = LogFactory.getLog(DurableSubscriberWithNetworkDisconnectTest.class);
private static final int NETWORK_DOWN_TIME = 10000;
private static final String HUB = "HubBroker";
private static final String SPOKE = "SpokeBroker";
private SocketProxy socketProxy;
private long networkDownTimeStart;
private long inactiveDuration = 1000;
private long receivedMsgs = 0;
private boolean useSocketProxy = true;
protected static final int MESSAGE_COUNT = 200;
public boolean useDuplexNetworkBridge = true;
public boolean simulateStalledNetwork;
public boolean dynamicOnly = true;
public long networkTTL = 3;
public boolean exponentialBackOff;
public boolean failover = false;
public boolean inactivity = true;
public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
addCombinationValues("failover", new Object[]{Boolean.FALSE, Boolean.TRUE});
}
public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
bridgeBrokers(SPOKE, HUB);
startAllBrokers();
// Setup connection
URI hubURI = brokers.get(HUB).broker.getVmConnectorURI();
URI spokeURI = brokers.get(SPOKE).broker.getVmConnectorURI();
ActiveMQConnectionFactory facHub = new ActiveMQConnectionFactory(hubURI);
ActiveMQConnectionFactory facSpoke = new ActiveMQConnectionFactory(spokeURI);
Connection conHub = facHub.createConnection();
Connection conSpoke = facSpoke.createConnection();
conHub.setClientID("clientHUB");
conSpoke.setClientID("clientSPOKE");
conHub.start();
conSpoke.start();
Session sesHub = conHub.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sesSpoke = conSpoke.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
String consumerName = "consumerName";
// Setup consumers
MessageConsumer remoteConsumer = sesSpoke.createDurableSubscriber(topic, consumerName);
remoteConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
TextMessage textMsg = (TextMessage) msg;
receivedMsgs++;
LOG.info("Received messages (" + receivedMsgs + "): " + textMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// allow subscription information to flow back to Spoke
sleep(1000);
// Setup producer
MessageProducer localProducer = sesHub.createProducer(topic);
localProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// Send messages
for (int i = 0; i < MESSAGE_COUNT; i++) {
sleep(50);
if (i == 50 || i == 150) {
if (simulateStalledNetwork) {
socketProxy.pause();
} else {
socketProxy.close();
}
networkDownTimeStart = System.currentTimeMillis();
} else if (networkDownTimeStart > 0) {
// restart after NETWORK_DOWN_TIME seconds
sleep(NETWORK_DOWN_TIME);
networkDownTimeStart = 0;
if (simulateStalledNetwork) {
socketProxy.goOn();
} else {
socketProxy.reopen();
}
} else {
// slow message production to allow bridge to recover and limit message duplication
sleep(500);
}
Message test = sesHub.createTextMessage("test-" + i);
localProducer.send(test);
}
LOG.info("waiting for messages to flow");
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return receivedMsgs >= MESSAGE_COUNT;
}
});
assertTrue("At least message " + MESSAGE_COUNT +
" must be received, count=" + receivedMsgs,
MESSAGE_COUNT <= receivedMsgs);
brokers.get(HUB).broker.deleteAllMessages();
brokers.get(SPOKE).broker.deleteAllMessages();
conHub.close();
conSpoke.close();
}
@Override
protected void startAllBrokers() throws Exception {
// Ensure HUB is started first so bridge will be active from the get go
BrokerItem brokerItem = brokers.get(HUB);
brokerItem.broker.start();
brokerItem = brokers.get(SPOKE);
brokerItem.broker.start();
sleep(600);
}
public void setUp() throws Exception {
networkDownTimeStart = 0;
inactiveDuration = 1000;
useSocketProxy = true;
receivedMsgs = 0;
super.setAutoFail(true);
super.setUp();
final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true";
createBroker(new URI("broker:(tcp://localhost:61617)/" + HUB + options));
createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options));
}
public void tearDown() throws Exception {
super.tearDown();
if (socketProxy != null) {
socketProxy.close();
}
}
public static Test suite() {
return suite(DurableSubscriberWithNetworkDisconnectTest.class);
}
private void sleep(int milliSecondTime) {
try {
Thread.sleep(milliSecondTime);
} catch (InterruptedException igonred) {
}
}
@Override
protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean l_dynamicOnly, int networkTTL, boolean l_conduit, boolean l_failover) throws Exception {
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI;
if (!transportConnectors.isEmpty()) {
remoteURI = ((TransportConnector) transportConnectors.get(0)).getConnectUri();
if (useSocketProxy) {
socketProxy = new SocketProxy(remoteURI);
remoteURI = socketProxy.getUrl();
}
String options = "";
if (failover) {
options = "static:(failover:(" + remoteURI;
} else {
options = "static:(" + remoteURI;
}
if (inactivity) {
options += "?wireFormat.maxInactivityDuration=" + inactiveDuration + "&wireFormat.maxInactivityDurationInitalDelay=" + inactiveDuration + ")";
} else {
options += ")";
}
if (failover) {
options += "?maxReconnectAttempts=1)";
}
options += "?useExponentialBackOff=" + exponentialBackOff;
DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI(options));
connector.setDynamicOnly(dynamicOnly);
connector.setNetworkTTL(networkTTL);
localBroker.addNetworkConnector(connector);
maxSetupTime = 2000;
if (useDuplexNetworkBridge) {
connector.setDuplex(true);
}
return connector;
} else {
throw new Exception("Remote broker has no registered connectors.");
}
}
}

View File

@ -38,7 +38,6 @@ public class MulticastDiscoveryOnFaultyNetworkTest extends JmsMultipleBrokersTes
private static final String HUB = "HubBroker"; private static final String HUB = "HubBroker";
private static final String SPOKE = "SpokeBroker"; private static final String SPOKE = "SpokeBroker";
public boolean useDuplexNetworkBridge; public boolean useDuplexNetworkBridge;
public boolean sumulateStalledNetwork;
private TransportConnector mCastTrpConnector; private TransportConnector mCastTrpConnector;

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -66,10 +67,12 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
public boolean suppressDuplicateTopicSubs = false; public boolean suppressDuplicateTopicSubs = false;
public DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); public DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
public boolean durableSub = false;
AtomicInteger idCounter = new AtomicInteger(0);
private boolean dynamicOnly = false; private boolean dynamicOnly = false;
// no duplicates in cyclic network if networkTTL <=1 // no duplicates in cyclic network if networkTTL <=1
// when > 1, subscriptions perculate around resulting in duplicates as there is no // when > 1, subscriptions percolate around resulting in duplicates as there is no
// memory of the original subscription. // memory of the original subscription.
// solution for 6.0 using org.apache.activemq.command.ConsumerInfo.getNetworkConsumerIds() // solution for 6.0 using org.apache.activemq.command.ConsumerInfo.getNetworkConsumerIds()
private int ttl = 3; private int ttl = 3;
@ -114,6 +117,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
private BrokerService createAndStartBroker(String name, String addr) private BrokerService createAndStartBroker(String name, String addr)
throws Exception { throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
//broker.setDeleteAllMessagesOnStartup(true);
broker.setBrokerName(name); broker.setBrokerName(name);
broker.addConnector(addr).setDiscoveryUri(new URI(MULTICAST_DEFAULT)); broker.addConnector(addr).setDiscoveryUri(new URI(MULTICAST_DEFAULT));
broker.setUseJmx(false); broker.setUseJmx(false);
@ -148,8 +152,9 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
} }
public void initCombosForTestProducerConsumerTopic() { public void initCombosForTestProducerConsumerTopic() {
this.addCombinationValues("suppresDuplicateTopicSubs", new Object[]{Boolean.TRUE, Boolean.FALSE}); this.addCombinationValues("suppressDuplicateTopicSubs", new Object[]{Boolean.TRUE, Boolean.FALSE});
this.addCombinationValues("dispatchPolicy", new Object[]{new PriorityNetworkDispatchPolicy(), new SimpleDispatchPolicy()}); this.addCombinationValues("dispatchPolicy", new Object[]{new PriorityNetworkDispatchPolicy(), new SimpleDispatchPolicy()});
this.addCombinationValues("durableSub", new Object[]{Boolean.TRUE, Boolean.FALSE});
} }
public void testProducerConsumerTopic() throws Exception { public void testProducerConsumerTopic() throws Exception {
@ -206,6 +211,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
} }
map.put(msg, msg); map.put(msg, msg);
} }
consumer.unSubscribe();
if (suppressDuplicateTopicSubs || dispatchPolicy instanceof PriorityNetworkDispatchPolicy) { if (suppressDuplicateTopicSubs || dispatchPolicy instanceof PriorityNetworkDispatchPolicy) {
assertEquals("no duplicates", 0, duplicateCount); assertEquals("no duplicates", 0, duplicateCount);
assertEquals("got all required messages: " + map.size(), consumer assertEquals("got all required messages: " + map.size(), consumer
@ -227,6 +233,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
private Topic topic; private Topic topic;
private MessageProducer producer; private MessageProducer producer;
private MessageConsumer consumer; private MessageConsumer consumer;
private final String durableID = "DURABLE_ID";
private List<String> receivedStrings = Collections.synchronizedList(new ArrayList<String>()); private List<String> receivedStrings = Collections.synchronizedList(new ArrayList<String>());
private int numMessages = 10; private int numMessages = 10;
@ -262,6 +269,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
brokerURL); brokerURL);
connection = factory.createConnection(); connection = factory.createConnection();
connection.setClientID("ID" + idCounter.incrementAndGet());
} }
private void createTopic() throws JMSException { private void createTopic() throws JMSException {
@ -274,7 +282,11 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
} }
private void createConsumer() throws JMSException { private void createConsumer() throws JMSException {
if (durableSub) {
consumer = session.createDurableSubscriber(topic, durableID);
} else {
consumer = session.createConsumer(topic); consumer = session.createConsumer(topic);
}
consumer.setMessageListener(new MessageListener() { consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) { public void onMessage(Message arg0) {
@ -319,5 +331,14 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport {
public int getNumMessages() { public int getNumMessages() {
return numMessages; return numMessages;
} }
public void unSubscribe() throws Exception {
consumer.close();
if (durableSub) {
session.unsubscribe(durableID);
// ensure un-subscription has percolated though the network
Thread.sleep(2000);
}
}
} }
} }

View File

@ -134,7 +134,7 @@ public class SocketProxy {
synchronized(this.connections) { synchronized(this.connections) {
connections = new ArrayList<Bridge>(this.connections); connections = new ArrayList<Bridge>(this.connections);
} }
LOG.info("close, numConnectons=" + connections.size()); LOG.info("close, numConnections=" + connections.size());
for (Bridge con : connections) { for (Bridge con : connections) {
closeConnection(con); closeConnection(con);
} }
@ -151,7 +151,7 @@ public class SocketProxy {
synchronized(this.connections) { synchronized(this.connections) {
connections = new ArrayList<Bridge>(this.connections); connections = new ArrayList<Bridge>(this.connections);
} }
LOG.info("halfClose, numConnectons=" + connections.size()); LOG.info("halfClose, numConnections=" + connections.size());
for (Bridge con : connections) { for (Bridge con : connections) {
halfCloseConnection(con); halfCloseConnection(con);
} }
@ -174,12 +174,12 @@ public class SocketProxy {
} }
/* /*
* pause accepting new connecitons and data transfer through existing proxy * pause accepting new connections and data transfer through existing proxy
* connections. All sockets remain open * connections. All sockets remain open
*/ */
public void pause() { public void pause() {
synchronized(connections) { synchronized(connections) {
LOG.info("pause, numConnectons=" + connections.size()); LOG.info("pause, numConnections=" + connections.size());
acceptor.pause(); acceptor.pause();
for (Bridge con : connections) { for (Bridge con : connections) {
con.pause(); con.pause();
@ -192,7 +192,7 @@ public class SocketProxy {
*/ */
public void goOn() { public void goOn() {
synchronized(connections) { synchronized(connections) {
LOG.info("goOn, numConnectons=" + connections.size()); LOG.info("goOn, numConnections=" + connections.size());
for (Bridge con : connections) { for (Bridge con : connections) {
con.goOn(); con.goOn();
} }