From f232ceced0f8b697dad7526fed011122179f2ad5 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 6 Jul 2011 16:11:03 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3353 - Durable subscribers on durable topics don't receive messages after network disconnect. Have suppression for durable subs be conditional on sub active such that reconnection/recreation/failover of bridge is not a problem. org.apache.activemq.usecases.NoDuplicateOnTopicNetworkTest still needs some work to deal with already exists errors from dupliate restarts. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1143482 13f79535-47bb-0310-9956-ffa450edef68 --- .../DemandForwardingBridgeSupport.java | 8 +- .../BrokerQueueNetworkWithDisconnectTest.java | 8 +- ...leSubscriberWithNetworkDisconnectTest.java | 234 ++++++++++++++++++ ...MulticastDiscoveryOnFaultyNetworkTest.java | 9 +- .../NoDuplicateOnTopicNetworkTest.java | 27 +- .../org/apache/activemq/util/SocketProxy.java | 10 +- 6 files changed, 278 insertions(+), 18 deletions(-) create mode 100644 activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 149981ea11..36c1e09aa4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -32,8 +32,10 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.TransportConnection; import org.apache.activemq.broker.region.AbstractRegion; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -1046,7 +1048,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br List networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds(); if (!networkConsumers.isEmpty()) { if (matchFound(candidateConsumers, networkConsumers)) { - suppress = hasLowerPriority(sub, candidate.getLocalInfo()); + suppress = isActiveDurableSub(sub) && hasLowerPriority(sub, candidate.getLocalInfo()); break; } } @@ -1054,6 +1056,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return suppress; } + private boolean isActiveDurableSub(Subscription sub) { + return (sub.getConsumerInfo().isDurable() && sub instanceof DurableTopicSubscription && ((DurableTopicSubscription)sub).isActive()); + } + private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) { boolean suppress = false; diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java index 5b4980b13a..c0b71c304f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java @@ -51,14 +51,14 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest private SocketProxy socketProxy; private long networkDownTimeStart; public boolean useDuplexNetworkBridge = true; - public boolean sumulateStalledNetwork; + public boolean simulateStalledNetwork; private long inactiveDuration = 1000; private boolean useSocketProxy = true; public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() { addCombinationValues( "useDuplexNetworkBridge", new Object[]{ Boolean.TRUE, Boolean.FALSE} ); - addCombinationValues( "sumulateStalledNetwork", new Object[]{ Boolean.TRUE } ); + addCombinationValues( "simulateStalledNetwork", new Object[]{ Boolean.TRUE } ); } public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception { @@ -197,7 +197,7 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest protected void onSend(int i, TextMessage msg) { sleep(50); if (i == 50 || i == 150) { - if (sumulateStalledNetwork) { + if (simulateStalledNetwork) { socketProxy.pause(); } else { socketProxy.close(); @@ -206,7 +206,7 @@ public class BrokerQueueNetworkWithDisconnectTest extends JmsMultipleBrokersTest } else if (networkDownTimeStart > 0) { // restart after NETWORK_DOWN_TIME seconds if (networkDownTimeStart + NETWORK_DOWN_TIME < System.currentTimeMillis()) { - if (sumulateStalledNetwork) { + if (simulateStalledNetwork) { socketProxy.goOn(); } else { socketProxy.reopen(); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java new file mode 100644 index 0000000000..e042b30ef4 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriberWithNetworkDisconnectTest.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.net.URI; +import java.util.List; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.SocketProxy; +import org.apache.activemq.util.Wait; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + + +public class DurableSubscriberWithNetworkDisconnectTest extends JmsMultipleBrokersTestSupport { + private static final Log LOG = LogFactory.getLog(DurableSubscriberWithNetworkDisconnectTest.class); + private static final int NETWORK_DOWN_TIME = 10000; + private static final String HUB = "HubBroker"; + private static final String SPOKE = "SpokeBroker"; + private SocketProxy socketProxy; + private long networkDownTimeStart; + private long inactiveDuration = 1000; + private long receivedMsgs = 0; + private boolean useSocketProxy = true; + protected static final int MESSAGE_COUNT = 200; + public boolean useDuplexNetworkBridge = true; + public boolean simulateStalledNetwork; + public boolean dynamicOnly = true; + public long networkTTL = 3; + public boolean exponentialBackOff; + public boolean failover = false; + public boolean inactivity = true; + + public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() { + addCombinationValues("failover", new Object[]{Boolean.FALSE, Boolean.TRUE}); + } + + public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception { + bridgeBrokers(SPOKE, HUB); + + startAllBrokers(); + + // Setup connection + URI hubURI = brokers.get(HUB).broker.getVmConnectorURI(); + URI spokeURI = brokers.get(SPOKE).broker.getVmConnectorURI(); + ActiveMQConnectionFactory facHub = new ActiveMQConnectionFactory(hubURI); + ActiveMQConnectionFactory facSpoke = new ActiveMQConnectionFactory(spokeURI); + Connection conHub = facHub.createConnection(); + Connection conSpoke = facSpoke.createConnection(); + conHub.setClientID("clientHUB"); + conSpoke.setClientID("clientSPOKE"); + conHub.start(); + conSpoke.start(); + Session sesHub = conHub.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session sesSpoke = conSpoke.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO"); + String consumerName = "consumerName"; + + // Setup consumers + MessageConsumer remoteConsumer = sesSpoke.createDurableSubscriber(topic, consumerName); + remoteConsumer.setMessageListener(new MessageListener() { + public void onMessage(Message msg) { + try { + TextMessage textMsg = (TextMessage) msg; + receivedMsgs++; + LOG.info("Received messages (" + receivedMsgs + "): " + textMsg.getText()); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + + // allow subscription information to flow back to Spoke + sleep(1000); + + // Setup producer + MessageProducer localProducer = sesHub.createProducer(topic); + localProducer.setDeliveryMode(DeliveryMode.PERSISTENT); + + // Send messages + for (int i = 0; i < MESSAGE_COUNT; i++) { + sleep(50); + if (i == 50 || i == 150) { + if (simulateStalledNetwork) { + socketProxy.pause(); + } else { + socketProxy.close(); + } + networkDownTimeStart = System.currentTimeMillis(); + } else if (networkDownTimeStart > 0) { + // restart after NETWORK_DOWN_TIME seconds + sleep(NETWORK_DOWN_TIME); + networkDownTimeStart = 0; + if (simulateStalledNetwork) { + socketProxy.goOn(); + } else { + socketProxy.reopen(); + } + } else { + // slow message production to allow bridge to recover and limit message duplication + sleep(500); + } + Message test = sesHub.createTextMessage("test-" + i); + localProducer.send(test); + } + + LOG.info("waiting for messages to flow"); + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return receivedMsgs >= MESSAGE_COUNT; + } + }); + + assertTrue("At least message " + MESSAGE_COUNT + + " must be received, count=" + receivedMsgs, + MESSAGE_COUNT <= receivedMsgs); + brokers.get(HUB).broker.deleteAllMessages(); + brokers.get(SPOKE).broker.deleteAllMessages(); + conHub.close(); + conSpoke.close(); + } + + @Override + protected void startAllBrokers() throws Exception { + // Ensure HUB is started first so bridge will be active from the get go + BrokerItem brokerItem = brokers.get(HUB); + brokerItem.broker.start(); + brokerItem = brokers.get(SPOKE); + brokerItem.broker.start(); + sleep(600); + } + + public void setUp() throws Exception { + networkDownTimeStart = 0; + inactiveDuration = 1000; + useSocketProxy = true; + receivedMsgs = 0; + super.setAutoFail(true); + super.setUp(); + final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"; + createBroker(new URI("broker:(tcp://localhost:61617)/" + HUB + options)); + createBroker(new URI("broker:(tcp://localhost:61616)/" + SPOKE + options)); + } + + public void tearDown() throws Exception { + super.tearDown(); + if (socketProxy != null) { + socketProxy.close(); + } + } + + public static Test suite() { + return suite(DurableSubscriberWithNetworkDisconnectTest.class); + } + + private void sleep(int milliSecondTime) { + try { + Thread.sleep(milliSecondTime); + } catch (InterruptedException igonred) { + } + } + + @Override + protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean l_dynamicOnly, int networkTTL, boolean l_conduit, boolean l_failover) throws Exception { + List transportConnectors = remoteBroker.getTransportConnectors(); + URI remoteURI; + if (!transportConnectors.isEmpty()) { + remoteURI = ((TransportConnector) transportConnectors.get(0)).getConnectUri(); + if (useSocketProxy) { + socketProxy = new SocketProxy(remoteURI); + remoteURI = socketProxy.getUrl(); + } + String options = ""; + if (failover) { + options = "static:(failover:(" + remoteURI; + } else { + options = "static:(" + remoteURI; + } + if (inactivity) { + options += "?wireFormat.maxInactivityDuration=" + inactiveDuration + "&wireFormat.maxInactivityDurationInitalDelay=" + inactiveDuration + ")"; + } else { + options += ")"; + } + + if (failover) { + options += "?maxReconnectAttempts=1)"; + } + + options += "?useExponentialBackOff=" + exponentialBackOff; + DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI(options)); + connector.setDynamicOnly(dynamicOnly); + connector.setNetworkTTL(networkTTL); + localBroker.addNetworkConnector(connector); + maxSetupTime = 2000; + if (useDuplexNetworkBridge) { + connector.setDuplex(true); + } + return connector; + } else { + throw new Exception("Remote broker has no registered connectors."); + } + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java index 882792e3d5..dce25b00fe 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/MulticastDiscoveryOnFaultyNetworkTest.java @@ -38,7 +38,6 @@ public class MulticastDiscoveryOnFaultyNetworkTest extends JmsMultipleBrokersTes private static final String HUB = "HubBroker"; private static final String SPOKE = "SpokeBroker"; public boolean useDuplexNetworkBridge; - public boolean sumulateStalledNetwork; private TransportConnector mCastTrpConnector; @@ -121,9 +120,9 @@ public class MulticastDiscoveryOnFaultyNetworkTest extends JmsMultipleBrokersTes List transportConnectors = remoteBroker.getTransportConnectors(); if (!transportConnectors.isEmpty()) { - mCastTrpConnector = ((TransportConnector)transportConnectors.get(0)); - mCastTrpConnector.setDiscoveryUri(new URI("multicast://default?group=TESTERIC")); - } - return connector; + mCastTrpConnector = ((TransportConnector)transportConnectors.get(0)); + mCastTrpConnector.setDiscoveryUri(new URI("multicast://default?group=TESTERIC")); + } + return connector; } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java index 277bcc8284..d52ac8f2e8 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.JMSException; @@ -66,10 +67,12 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { public boolean suppressDuplicateTopicSubs = false; public DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); + public boolean durableSub = false; + AtomicInteger idCounter = new AtomicInteger(0); private boolean dynamicOnly = false; // no duplicates in cyclic network if networkTTL <=1 - // when > 1, subscriptions perculate around resulting in duplicates as there is no + // when > 1, subscriptions percolate around resulting in duplicates as there is no // memory of the original subscription. // solution for 6.0 using org.apache.activemq.command.ConsumerInfo.getNetworkConsumerIds() private int ttl = 3; @@ -114,6 +117,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { private BrokerService createAndStartBroker(String name, String addr) throws Exception { BrokerService broker = new BrokerService(); + //broker.setDeleteAllMessagesOnStartup(true); broker.setBrokerName(name); broker.addConnector(addr).setDiscoveryUri(new URI(MULTICAST_DEFAULT)); broker.setUseJmx(false); @@ -148,8 +152,9 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { } public void initCombosForTestProducerConsumerTopic() { - this.addCombinationValues("suppresDuplicateTopicSubs", new Object[]{Boolean.TRUE, Boolean.FALSE}); + this.addCombinationValues("suppressDuplicateTopicSubs", new Object[]{Boolean.TRUE, Boolean.FALSE}); this.addCombinationValues("dispatchPolicy", new Object[]{new PriorityNetworkDispatchPolicy(), new SimpleDispatchPolicy()}); + this.addCombinationValues("durableSub", new Object[]{Boolean.TRUE, Boolean.FALSE}); } public void testProducerConsumerTopic() throws Exception { @@ -206,6 +211,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { } map.put(msg, msg); } + consumer.unSubscribe(); if (suppressDuplicateTopicSubs || dispatchPolicy instanceof PriorityNetworkDispatchPolicy) { assertEquals("no duplicates", 0, duplicateCount); assertEquals("got all required messages: " + map.size(), consumer @@ -227,6 +233,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { private Topic topic; private MessageProducer producer; private MessageConsumer consumer; + private final String durableID = "DURABLE_ID"; private List receivedStrings = Collections.synchronizedList(new ArrayList()); private int numMessages = 10; @@ -262,6 +269,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( brokerURL); connection = factory.createConnection(); + connection.setClientID("ID" + idCounter.incrementAndGet()); } private void createTopic() throws JMSException { @@ -274,7 +282,11 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { } private void createConsumer() throws JMSException { - consumer = session.createConsumer(topic); + if (durableSub) { + consumer = session.createDurableSubscriber(topic, durableID); + } else { + consumer = session.createConsumer(topic); + } consumer.setMessageListener(new MessageListener() { public void onMessage(Message arg0) { @@ -319,5 +331,14 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { public int getNumMessages() { return numMessages; } + + public void unSubscribe() throws Exception { + consumer.close(); + if (durableSub) { + session.unsubscribe(durableID); + // ensure un-subscription has percolated though the network + Thread.sleep(2000); + } + } } } diff --git a/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java b/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java index 822096efe0..72699b23eb 100644 --- a/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java +++ b/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java @@ -134,7 +134,7 @@ public class SocketProxy { synchronized(this.connections) { connections = new ArrayList(this.connections); } - LOG.info("close, numConnectons=" + connections.size()); + LOG.info("close, numConnections=" + connections.size()); for (Bridge con : connections) { closeConnection(con); } @@ -151,7 +151,7 @@ public class SocketProxy { synchronized(this.connections) { connections = new ArrayList(this.connections); } - LOG.info("halfClose, numConnectons=" + connections.size()); + LOG.info("halfClose, numConnections=" + connections.size()); for (Bridge con : connections) { halfCloseConnection(con); } @@ -174,12 +174,12 @@ public class SocketProxy { } /* - * pause accepting new connecitons and data transfer through existing proxy + * pause accepting new connections and data transfer through existing proxy * connections. All sockets remain open */ public void pause() { synchronized(connections) { - LOG.info("pause, numConnectons=" + connections.size()); + LOG.info("pause, numConnections=" + connections.size()); acceptor.pause(); for (Bridge con : connections) { con.pause(); @@ -192,7 +192,7 @@ public class SocketProxy { */ public void goOn() { synchronized(connections) { - LOG.info("goOn, numConnectons=" + connections.size()); + LOG.info("goOn, numConnections=" + connections.size()); for (Bridge con : connections) { con.goOn(); }