diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 28d136fe84..57afc85d11 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -136,7 +137,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br protected ActiveMQDestination[] durableDestinations; protected final ConcurrentMap subscriptionMapByLocalId = new ConcurrentHashMap<>(); protected final ConcurrentMap subscriptionMapByRemoteId = new ConcurrentHashMap<>(); - protected final Set forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap()); + protected final Set forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<>()); + protected final ConcurrentMap> compositeConsumerIds = new ConcurrentHashMap<>(); + protected final ConcurrentMap> compositeSubscriptions = new ConcurrentHashMap<>(); protected final BrokerId localBrokerPath[] = new BrokerId[]{null}; protected final CountDownLatch startedLatch = new CountDownLatch(2); protected final CountDownLatch localStartedLatch = new CountDownLatch(1); @@ -1015,6 +1018,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else if (data.getClass() == RemoveInfo.class) { ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId(); + + // If we have an entry in compositeConsumerIds then this consumer was a + // composite consumer and we need to remove the entries in the set and + // not the consumer id we received here + final Set compositeIds = compositeConsumerIds.remove(id); + if (compositeIds != null) { + for (ConsumerId compositeId : compositeIds) { + serviceRemoteConsumerAdvisory(new RemoveInfo(compositeId)); + } + return; + } + removeDemandSubscription(id); if (forcedDurableRemoteId.remove(id)) { @@ -1030,6 +1045,23 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } else if (data.getClass() == RemoveSubscriptionInfo.class) { final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); + + // If we have an entry in compositeSubscriptions then this consumer was a + // composite consumer and we need to remove the entries in the set and not + // the subscription that we received here + final Set compositeSubs = + this.compositeSubscriptions.remove(subscriptionInfo); + if (compositeSubs != null) { + for (SubscriptionInfo compositeSub : compositeSubs) { + RemoveSubscriptionInfo remove = new RemoveSubscriptionInfo(); + remove.setClientId(compositeSub.getClientId()); + remove.setSubscriptionName(compositeSub.getSubscriptionName()); + remove.setConnectionId(this.localConnectionInfo.getConnectionId()); + serviceRemoteConsumerAdvisory(remove); + } + return; + } + final boolean proxyBridgeSub = isProxyBridgeSubscription(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName()); for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { @@ -1415,6 +1447,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException { + // Check if this was processed and split into new consumers for composite dests + if (splitCompositeConsumer(consumerInfo)) { + // If true we don't want to continue processing the original consumer info + return; + } + ConsumerInfo info = consumerInfo.copy(); addRemoteBrokerToBrokerPath(info); DemandSubscription sub = createDemandSubscription(info); @@ -1443,6 +1481,65 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } + // Generate new consumers for each destination that part of a composite destination list for a consumer + private boolean splitCompositeConsumer(final ConsumerInfo consumerInfo) throws IOException { + // If not a composite destination or if an advisory topic then return false + // So we process normally and don't split + if (!consumerInfo.getDestination().isComposite() || + AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination())) { + return false; + } + + // At this point this is a composite destination and not an advisory topic. The destination + // will be split into individual destinations to create demand so that conduit subscriptions + // and durable subscriptions work correctly + + // Handle duplicates, don't need to create again if we already have an entry + // Just return true so we stop processing + if (!isDuplicateSuppressionOff(consumerInfo) && compositeConsumerIds.containsKey( + consumerInfo.getConsumerId())) { + return true; + } + + // Get a set to store mapped consumer Ids for each individual destination in the composite list + // and (if applicable) a set for subscriptions for durables + final Set consumerIds = compositeConsumerIds.computeIfAbsent( + consumerInfo.getConsumerId(), + k -> Collections.newSetFromMap(new ConcurrentHashMap<>())); + final Set subscriptions = Optional.ofNullable( + consumerInfo.getSubscriptionName()).map( + subName -> compositeSubscriptions.computeIfAbsent( + new SubscriptionInfo(consumerInfo.getClientId(), + consumerInfo.getSubscriptionName()), + k -> Collections.newSetFromMap(new ConcurrentHashMap<>()))).orElse(null); + + // Split and go through each destination that is part of the composite list and process + for (ActiveMQDestination individualDest : consumerInfo.getDestination() + .getCompositeDestinations()) { + // Create a new consumer info with the individual destinations and + // generate new consumer Ids for each and add to the consumerIds set + final ConsumerInfo info = consumerInfo.copy(); + info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), + consumerIdGenerator.getNextSequenceId())); + info.setDestination(individualDest); + consumerIds.add(info.getConsumerId()); + + // If there is a subscription name (durable) then generate a new one for the dest + // and add to the subscriptions set + Optional.ofNullable(subscriptions).ifPresent( + subs -> { + info.setSubscriptionName( + consumerInfo.getSubscriptionName() + individualDest.getPhysicalName()); + subs.add( + new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); + }); + + // Continue on and process the new consumer Info + addConsumerInfo(info); + } + return true; + } + private void undoMapRegistration(DemandSubscription sub) { subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java new file mode 100644 index 0000000000..cfdc2fd32d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java @@ -0,0 +1,435 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TopicSubscriber; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test class to verify composite consumers correctly create demand + * with a network of brokers, especially conduit subs + * See AMQ-9262 + */ +@RunWith(Parameterized.class) +public class CompositeConsumerNetworkBridgeTest extends DynamicNetworkTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(CompositeConsumerNetworkBridgeTest.class); + + private final static String testTopic1 = "test.composite.topic.1"; + private final static String testTopic2 = "test.composite.topic.2"; + private final static String testQueue1 = "test.composite.queue.1"; + private final static String testQueue2 = "test.composite.queue.2"; + private BrokerService broker1; + private BrokerService broker2; + private Session session1; + private Session session2; + private final FLOW flow; + private final static List topics = List.of( + new ActiveMQTopic(testTopic1), new ActiveMQTopic(testTopic2)); + private final static List queues = List.of( + new ActiveMQQueue(testQueue1), new ActiveMQQueue(testQueue2)); + + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + {FLOW.FORWARD}, + {FLOW.REVERSE} + }); + } + + public CompositeConsumerNetworkBridgeTest(final FLOW flow) { + this.flow = flow; + } + + @After + public void tearDown() throws Exception { + doTearDown(); + } + + /** + * Test a composite durable subscription + */ + @Test + public void testCompositeDurableSubscriber() throws Exception { + setUp(); + final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + "," + testTopic2); + + // Create durable sub on composite destination + // Will create a composite consumer on the local broker but + // should create 2 consumers on the remote + TopicSubscriber durSub = session1.createDurableSubscriber(compositeTopic, subName); + assertConsumersCount(broker1, compositeTopic, 1); + + // The remote broker should create two durable subs instead of 1 + // Should be 1 durable on each of the topics that are part of the composite + assertConsumersCount(broker2, compositeTopic, 0); + assertNCDurableSubsCount(broker2, compositeTopic, 0); + for (ActiveMQTopic topic : topics) { + assertConsumersCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); + } + assertCompositeMapCounts(1, 1); + + durSub.close(); + Thread.sleep(1000); + removeSubscription(broker1, subName); + + //Verify cleanup + for (ActiveMQTopic topic : topics) { + assertConsumersCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); + } + assertCompositeMapCounts(0, 0); + } + + /** + * Test a composite durable subscription and normal subscription + */ + @Test + public void testCompositeAndNormalDurableSub() throws Exception { + setUp(); + final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + "," + testTopic2); + + // create composite sub and a sub on one of the individual topics + TopicSubscriber durSub1 = session1.createDurableSubscriber(compositeTopic, subName + "1"); + TopicSubscriber durSub2 = session1.createDurableSubscriber(topics.get(0), subName + "2"); + + // Should split the composite and create network subs on individual topics + for (ActiveMQTopic topic : topics) { + assertNCDurableSubsCount(broker2, topic, 1); + } + assertNCDurableSubsCount(broker2, compositeTopic, 0); + // Only 1 sub is composite so should just have 1 map entry + assertCompositeMapCounts(1, 1); + + // Verify message received + MessageProducer producer = session2.createProducer(topics.get(0)); + producer.send(session2.createTextMessage("test")); + assertNotNull(durSub1.receive(1000)); + assertNotNull(durSub2.receive(1000)); + + durSub1.close(); + durSub2.close();; + + Thread.sleep(1000); + removeSubscription(broker1, subName + "1"); + removeSubscription(broker1, subName + "2"); + assertCompositeMapCounts(0, 0); + } + + + /** + * Test two topic subscriptions that match + */ + @Test + public void testTopicCompositeSubs() throws Exception { + setUp(); + final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + "," + testTopic2); + + // Create two identical subscriptions on a composite topic + MessageConsumer sub1 = session1.createConsumer(compositeTopic); + MessageConsumer sub2 = session1.createConsumer(compositeTopic); + for (ActiveMQTopic topic : topics) { + // Verify the local broker has two subs on each individual topic + assertConsumersCount(broker1, topic, 2); + // Verify that conduit subscription works correctly now + // and only 1 sub on each topic. This used to be broken before AMQ-9262 + // and would create two subscriptions even though conduit was true + assertConsumersCount(broker2, topic, 1); + } + assertCompositeMapCounts(2, 0); + + MessageProducer producer = session2.createProducer(topics.get(0)); + producer.send(session2.createTextMessage("test")); + + assertNotNull(sub1.receive(1000)); + assertNotNull(sub2.receive(1000)); + + sub1.close(); + sub2.close(); + + assertCompositeMapCounts(0, 0); + } + + /** + * Test two queue composite subscriptions that match + */ + @Test + public void testCompositeQueueSubs() throws Exception { + setUp(); + final ActiveMQQueue compositeQueue = new ActiveMQQueue(testQueue1 + "," + testQueue2); + + // Create two matching composite queue subs to test conduit subs + MessageConsumer sub1 = session1.createConsumer(compositeQueue); + MessageConsumer sub2 = session1.createConsumer(compositeQueue); + for (ActiveMQDestination queue : queues) { + assertConsumersCount(broker1, queue, 2); + // Verify conduit subs now work correctly, this used to be 2 + // which was wrong as conduit is true and is fixed as of AMQ-9262 + assertConsumersCount(broker2, queue, 1); + } + assertCompositeMapCounts(2, 0); + + MessageProducer producer = session2.createProducer(queues.get(0)); + producer.send(session2.createTextMessage("test")); + + // Make sure one of the queue receivers gets the message + assertTrue(sub1.receive(1000) != null + || sub2.receive(1000) != null); + + sub1.close(); + sub2.close(); + assertCompositeMapCounts(0, 0); + } + + /** + * Test a composite queue and normal queue sub + */ + @Test + public void testCompositeAndNormalQueueSubs() throws Exception { + setUp(); + final ActiveMQQueue compositeQueue = new ActiveMQQueue(testQueue1 + "," + testQueue2); + + // Create two matching composite queue subs to test conduit subs + MessageConsumer sub1 = session1.createConsumer(compositeQueue); + MessageConsumer sub2 = session1.createConsumer(new ActiveMQQueue(testQueue2)); + + assertConsumersCount(broker1, queues.get(0), 1); + assertConsumersCount(broker1, queues.get(1), 2); + for (ActiveMQDestination queue : queues) { + assertConsumersCount(broker2, queue, 1); + } + // Only 1 sub is a composite sub + assertCompositeMapCounts(1, 0); + + MessageProducer producer = session2.createProducer(queues.get(0)); + producer.send(session2.createTextMessage("test")); + + // Make sure message received by sub1 + assertNotNull(sub1.receive(1000)); + + sub1.close(); + sub2.close(); + assertCompositeMapCounts(0, 0); + } + + /** + * Test two matching durable composite subs + * + * This test used to fail with an exception as the bridge would + * try and create a duplicate network durable with the same client id + * and sub and would error + */ + @Test + public void testCompositeTwoDurableSubscribers() throws Exception { + setUp(); + final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + "," + testTopic2); + + TopicSubscriber durSub1 = session1.createDurableSubscriber(compositeTopic, subName + "1"); + TopicSubscriber durSub2 = session1.createDurableSubscriber(compositeTopic, subName + "2"); + assertConsumersCount(broker1, compositeTopic, 2); + + assertConsumersCount(broker2, compositeTopic, 0); + assertNCDurableSubsCount(broker2, compositeTopic, 0); + for (ActiveMQTopic topic : topics) { + assertConsumersCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); + } + assertCompositeMapCounts(2, 2); + + durSub1.close(); + Thread.sleep(1000); + removeSubscription(broker1, subName + "1"); + + for (ActiveMQTopic topic : topics) { + assertConsumersCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, topic, 1); + } + + durSub2.close(); + Thread.sleep(1000); + removeSubscription(broker1, subName + "2"); + + for (ActiveMQTopic topic : topics) { + assertConsumersCount(broker2, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); + } + + assertCompositeMapCounts(0, 0); + } + + + private void setUp() throws Exception { + doSetUp(tempFolder.newFolder(), tempFolder.newFolder()); + } + + protected void doSetUp(File localDataDir, File remoteDataDir) throws Exception { + doSetUpRemoteBroker(remoteDataDir); + doSetUpLocalBroker(localDataDir); + //Give time for advisories to propagate + Thread.sleep(1000); + } + + protected void doSetUpLocalBroker(File dataDir) throws Exception { + localBroker = createLocalBroker(dataDir); + localBroker.setDeleteAllMessagesOnStartup(true); + localBroker.start(); + localBroker.waitUntilStarted(); + URI localURI = localBroker.getVmConnectorURI(); + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); + fac.setAlwaysSyncSend(true); + fac.setDispatchAsync(false); + localConnection = fac.createConnection(); + localConnection.setClientID("clientId"); + localConnection.start(); + + Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, 10000, 500); + localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (flow.equals(FLOW.FORWARD)) { + broker1 = localBroker; + session1 = localSession; + } else { + broker2 = localBroker; + session2 = localSession; + } + } + + protected void doSetUpRemoteBroker(File dataDir) throws Exception { + remoteBroker = createRemoteBroker(dataDir); + remoteBroker.setDeleteAllMessagesOnStartup(true); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + URI remoteURI = remoteBroker.getVmConnectorURI(); + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(remoteURI); + remoteConnection = fac.createConnection(); + remoteConnection.setClientID("clientId"); + remoteConnection.start(); + remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (flow.equals(FLOW.FORWARD)) { + broker2 = remoteBroker; + session2 = remoteSession; + } else { + broker1 = remoteBroker; + session1 = remoteSession; + } + } + + protected BrokerService createLocalBroker(File dataDir) throws Exception { + + BrokerService brokerService = new BrokerService(); + brokerService.setMonitorConnectionSplits(true); + brokerService.setDataDirectoryFile(dataDir); + brokerService.setBrokerName("localBroker"); + brokerService.addNetworkConnector(configureLocalNetworkConnector()); + brokerService.addConnector("tcp://localhost:0"); + brokerService.setDestinations(new ActiveMQDestination[] { + new ActiveMQTopic(testTopic1), + new ActiveMQTopic(testTopic2), + new ActiveMQQueue(testQueue1), + new ActiveMQQueue(testQueue2)}); + + return brokerService; + } + + protected NetworkConnector configureLocalNetworkConnector() throws Exception { + + List transportConnectors = remoteBroker.getTransportConnectors(); + URI remoteURI = transportConnectors.get(0).getConnectUri(); + String uri = "static:(" + remoteURI + ")"; + NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri)); + connector.setName("networkConnector"); + connector.setDynamicOnly(false); + connector.setDecreaseNetworkConsumerPriority(false); + connector.setConduitSubscriptions(true); + connector.setDuplex(true); + connector.setStaticBridge(false); + ArrayList dynamicIncludedDestinations = new ArrayList<>(); + dynamicIncludedDestinations.addAll(List.of(new ActiveMQTopic("test.composite.topic.>"), + new ActiveMQQueue("test.composite.queue.>"))); + connector.setDynamicallyIncludedDestinations(dynamicIncludedDestinations); + return connector; + } + + + protected BrokerService createRemoteBroker(File dataDir) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("remoteBroker"); + brokerService.setUseJmx(false); + brokerService.setDataDirectoryFile(dataDir); + brokerService.addConnector("tcp://localhost:0"); + brokerService.setDestinations(new ActiveMQDestination[] { + new ActiveMQTopic(testTopic1), + new ActiveMQTopic(testTopic2), + new ActiveMQQueue(testQueue1), + new ActiveMQQueue(testQueue2)}); + + return brokerService; + } + + protected void assertCompositeMapCounts(int compositeConsumerIdsSize, int compositeSubSize) + throws Exception { + DurableConduitBridge bridge = findBridge(); + assertTrue( Wait.waitFor(() -> compositeConsumerIdsSize == bridge.compositeConsumerIds.size(), 5000, 500)); + assertTrue( Wait.waitFor(() -> compositeSubSize == bridge.compositeSubscriptions.size(), 5000, 500)); + } + + protected DurableConduitBridge findBridge() throws Exception { + if (flow.equals(FLOW.FORWARD)) { + return findBridge(remoteBroker); + } else { + return findBridge(localBroker); + } + } + + protected DurableConduitBridge findBridge(BrokerService broker) throws Exception { + final NetworkBridge bridge; + if (broker.getNetworkConnectors().size() > 0) { + assertTrue(Wait.waitFor(() -> broker.getNetworkConnectors().get(0).activeBridges().size() == 1, 5000, 500)); + bridge = broker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + } else { + bridge = findDuplexBridge(broker.getTransportConnectorByScheme("tcp")); + } + return (DurableConduitBridge)bridge; + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index 7e56bb2112..aa36e26fba 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -75,7 +75,6 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { private boolean forceDurable = false; private boolean useVirtualDestSubs = false; private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION; - public static enum FLOW {FORWARD, REVERSE} private BrokerService broker1; private BrokerService broker2; @@ -139,7 +138,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { assertSubscriptionsCount(broker1, topic, 1); assertNCDurableSubsCount(broker2, topic, 1); - removeSubscription(broker1, topic, subName); + removeSubscription(broker1, subName); assertSubscriptionsCount(broker1, topic, 0); assertNCDurableSubsCount(broker2, topic, 0); @@ -161,7 +160,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { assertSubscriptionsCount(broker1, topic, 1); assertNCDurableSubsCount(broker2, topic, 1); - removeSubscription(broker1, topic, subName); + removeSubscription(broker1, subName); assertSubscriptionsCount(broker1, topic, 0); assertNCDurableSubsCount(broker2, topic, 0); @@ -188,7 +187,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { } assertSubscriptionsCount(broker1, topic, 1); - removeSubscription(broker1, topic, subName); + removeSubscription(broker1, subName); assertSubscriptionsCount(broker1, topic, 0); doTearDown(); @@ -217,7 +216,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { includedTopics = "different.topic"; restartBroker(broker1, false); assertSubscriptionsCount(broker1, topic, 1); - removeSubscription(broker1, topic, subName); + removeSubscription(broker1, subName); assertSubscriptionsCount(broker1, topic, 0); //Test that on successful reconnection of the bridge that @@ -310,7 +309,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { assertSubscriptionsCount(broker1, topic, 1); session1.createDurableSubscriber(topic2, "sub2"); - removeSubscription(broker1, topic, subName); + removeSubscription(broker1, subName); assertSubscriptionsCount(broker1, topic, 0); assertSubscriptionsCount(broker1, topic2, 1); @@ -376,7 +375,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { //with bridge off, remove 100 subs for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { - removeSubscription(broker1, new ActiveMQTopic("include.test." + i), subName + i + j); + removeSubscription(broker1, subName + i + j); } } @@ -481,7 +480,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { restartBroker(broker1, false); assertSubscriptionsCount(broker1, topic, 1); - removeSubscription(broker1, topic, subName); + removeSubscription(broker1, subName); session1.createDurableSubscriber(topic, "sub2").close(); assertSubscriptionsCount(broker1, topic, 1); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java index aade6d36dd..2d83fb71b7 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java @@ -40,6 +40,7 @@ import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.util.SubscriptionKey; @@ -50,6 +51,7 @@ import org.junit.rules.TemporaryFolder; public abstract class DynamicNetworkTestSupport { + public enum FLOW {FORWARD, REVERSE}; protected Connection localConnection; protected Connection remoteConnection; @@ -92,14 +94,10 @@ public abstract class DynamicNetworkTestSupport { } } - protected void assertBridgeStarted() throws Exception { - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1; - } - }, 10000, 500)); + assertTrue(Wait.waitFor( + () -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, + 10000, 500)); } protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context, @@ -113,24 +111,16 @@ public abstract class DynamicNetworkTestSupport { } protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception { - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - //should only be 1 for the composite destination creation - return count == destinationStatistics.getConsumers().getCount(); - } + assertTrue(Wait.waitFor(() -> { + //should only be 1 for the composite destination creation + return count == destinationStatistics.getConsumers().getCount(); })); } protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception { - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return count == destinationStatistics.getDequeues().getCount() && - count == destinationStatistics.getDispatched().getCount() && - count == destinationStatistics.getForwards().getCount(); - } - })); + assertTrue(Wait.waitFor(() -> count == destinationStatistics.getDequeues().getCount() && + count == destinationStatistics.getDispatched().getCount() && + count == destinationStatistics.getForwards().getCount())); } protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) { @@ -145,27 +135,22 @@ public abstract class DynamicNetworkTestSupport { protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest, final int count) throws Exception { - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return count == getNCDurableSubs(brokerService, dest).size(); - } - }, 10000, 500)); + assertTrue(Wait.waitFor(() -> count == getNCDurableSubs(brokerService, dest).size(), + 10000, 500)); } protected void assertConsumersCount(final BrokerService brokerService, - final ActiveMQTopic dest, final int count) throws Exception { - assertTrue(Wait.waitFor(new Condition() { - @Override - public boolean isSatisified() throws Exception { - return count == getConsumers(brokerService, dest).size(); - } - }, 10000, 500)); + final ActiveMQDestination dest, final int count) throws Exception { + assertTrue(Wait.waitFor(() -> count == getConsumers(brokerService, dest).size(), + 10000, 500)); + Thread.sleep(1000); + // Check one more time after a short pause to make sure the count didn't increase past what we wanted + assertEquals(count, getConsumers(brokerService, dest).size()); } protected List getConsumers(final BrokerService brokerService, - final ActiveMQTopic dest) throws Exception { - Topic destination = (Topic) brokerService.getDestination(dest); + final ActiveMQDestination dest) throws Exception { + Destination destination = brokerService.getDestination(dest); return destination.getConsumers(); } @@ -208,8 +193,8 @@ public abstract class DynamicNetworkTestSupport { return subs; } - protected void removeSubscription(final BrokerService brokerService, final ActiveMQTopic topic, - final String subName) throws Exception { + protected void removeSubscription(final BrokerService brokerService, + final String subName) throws Exception { final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); info.setClientId(clientId); info.setSubscriptionName(subName); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java index 678935bc1a..a93420e32f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java @@ -51,7 +51,6 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport { protected String testTopicName2 = "include.nonforced.bar"; protected String staticTopic = "include.static.bar"; protected String staticTopic2 = "include.static.nonforced.bar"; - public static enum FLOW {FORWARD, REVERSE}; private BrokerService broker1; private BrokerService broker2; private Session session1; @@ -126,7 +125,7 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport { //Remove the sub durSub.close(); Thread.sleep(1000); - removeSubscription(broker1, topic, subName); + removeSubscription(broker1, subName); //The durable should be gone even though there is a consumer left //since we are not forcing durable subs @@ -186,7 +185,7 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport { Thread.sleep(1000); assertNCDurableSubsCount(broker2, topic, 1); - removeSubscription(broker1, topic, subName); + removeSubscription(broker1, subName); assertNCDurableSubsCount(broker2, topic, 0); } @@ -201,7 +200,7 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport { Thread.sleep(1000); assertNCDurableSubsCount(broker2, topic, 1); - removeSubscription(broker1, topic, subName); + removeSubscription(broker1, subName); Thread.sleep(1000); assertConsumersCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1); @@ -225,7 +224,7 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport { assertConsumersCount(broker2, topic, 1); assertNCDurableSubsCount(broker2, topic, 1); - removeSubscription(broker1, topic, subName); + removeSubscription(broker1, subName); assertConsumersCount(broker2, topic, 0); assertNCDurableSubsCount(broker2, topic, 0); }