AMQ-9262 - Fix network subscriptions for composite consumers (#1014)

This fixes network subscriptions that are generated on demand when a
consumer uses composite destinations. Before this fix conduit
subscriptions didn't work correctly. This fix now splits up the
composite dest and generates correct demand for each of the individual
destinations.
This commit is contained in:
Christopher L. Shannon 2023-06-07 07:18:18 -04:00 committed by GitHub
parent 686c8ecbce
commit 901956d4dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 567 additions and 52 deletions

View File

@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -136,7 +137,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected ActiveMQDestination[] durableDestinations;
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<>();
protected final ConcurrentMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<>();
protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
protected final Set<ConsumerId> forcedDurableRemoteId = Collections.newSetFromMap(new ConcurrentHashMap<>());
protected final ConcurrentMap<ConsumerId, Set<ConsumerId>> compositeConsumerIds = new ConcurrentHashMap<>();
protected final ConcurrentMap<SubscriptionInfo, Set<SubscriptionInfo>> compositeSubscriptions = new ConcurrentHashMap<>();
protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
protected final CountDownLatch startedLatch = new CountDownLatch(2);
protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
@ -1015,6 +1018,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
// If we have an entry in compositeConsumerIds then this consumer was a
// composite consumer and we need to remove the entries in the set and
// not the consumer id we received here
final Set<ConsumerId> compositeIds = compositeConsumerIds.remove(id);
if (compositeIds != null) {
for (ConsumerId compositeId : compositeIds) {
serviceRemoteConsumerAdvisory(new RemoveInfo(compositeId));
}
return;
}
removeDemandSubscription(id);
if (forcedDurableRemoteId.remove(id)) {
@ -1030,6 +1045,23 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else if (data.getClass() == RemoveSubscriptionInfo.class) {
final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
// If we have an entry in compositeSubscriptions then this consumer was a
// composite consumer and we need to remove the entries in the set and not
// the subscription that we received here
final Set<SubscriptionInfo> compositeSubs =
this.compositeSubscriptions.remove(subscriptionInfo);
if (compositeSubs != null) {
for (SubscriptionInfo compositeSub : compositeSubs) {
RemoveSubscriptionInfo remove = new RemoveSubscriptionInfo();
remove.setClientId(compositeSub.getClientId());
remove.setSubscriptionName(compositeSub.getSubscriptionName());
remove.setConnectionId(this.localConnectionInfo.getConnectionId());
serviceRemoteConsumerAdvisory(remove);
}
return;
}
final boolean proxyBridgeSub = isProxyBridgeSubscription(subscriptionInfo.getClientId(),
subscriptionInfo.getSubscriptionName());
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
@ -1415,6 +1447,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
// Check if this was processed and split into new consumers for composite dests
if (splitCompositeConsumer(consumerInfo)) {
// If true we don't want to continue processing the original consumer info
return;
}
ConsumerInfo info = consumerInfo.copy();
addRemoteBrokerToBrokerPath(info);
DemandSubscription sub = createDemandSubscription(info);
@ -1443,6 +1481,65 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
// Generate new consumers for each destination that part of a composite destination list for a consumer
private boolean splitCompositeConsumer(final ConsumerInfo consumerInfo) throws IOException {
// If not a composite destination or if an advisory topic then return false
// So we process normally and don't split
if (!consumerInfo.getDestination().isComposite() ||
AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination())) {
return false;
}
// At this point this is a composite destination and not an advisory topic. The destination
// will be split into individual destinations to create demand so that conduit subscriptions
// and durable subscriptions work correctly
// Handle duplicates, don't need to create again if we already have an entry
// Just return true so we stop processing
if (!isDuplicateSuppressionOff(consumerInfo) && compositeConsumerIds.containsKey(
consumerInfo.getConsumerId())) {
return true;
}
// Get a set to store mapped consumer Ids for each individual destination in the composite list
// and (if applicable) a set for subscriptions for durables
final Set<ConsumerId> consumerIds = compositeConsumerIds.computeIfAbsent(
consumerInfo.getConsumerId(),
k -> Collections.newSetFromMap(new ConcurrentHashMap<>()));
final Set<SubscriptionInfo> subscriptions = Optional.ofNullable(
consumerInfo.getSubscriptionName()).map(
subName -> compositeSubscriptions.computeIfAbsent(
new SubscriptionInfo(consumerInfo.getClientId(),
consumerInfo.getSubscriptionName()),
k -> Collections.newSetFromMap(new ConcurrentHashMap<>()))).orElse(null);
// Split and go through each destination that is part of the composite list and process
for (ActiveMQDestination individualDest : consumerInfo.getDestination()
.getCompositeDestinations()) {
// Create a new consumer info with the individual destinations and
// generate new consumer Ids for each and add to the consumerIds set
final ConsumerInfo info = consumerInfo.copy();
info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
consumerIdGenerator.getNextSequenceId()));
info.setDestination(individualDest);
consumerIds.add(info.getConsumerId());
// If there is a subscription name (durable) then generate a new one for the dest
// and add to the subscriptions set
Optional.ofNullable(subscriptions).ifPresent(
subs -> {
info.setSubscriptionName(
consumerInfo.getSubscriptionName() + individualDest.getPhysicalName());
subs.add(
new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
});
// Continue on and process the new consumer Info
addConsumerInfo(info);
}
return true;
}
private void undoMapRegistration(DemandSubscription sub) {
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());

View File

@ -0,0 +1,435 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test class to verify composite consumers correctly create demand
* with a network of brokers, especially conduit subs
* See AMQ-9262
*/
@RunWith(Parameterized.class)
public class CompositeConsumerNetworkBridgeTest extends DynamicNetworkTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(CompositeConsumerNetworkBridgeTest.class);
private final static String testTopic1 = "test.composite.topic.1";
private final static String testTopic2 = "test.composite.topic.2";
private final static String testQueue1 = "test.composite.queue.1";
private final static String testQueue2 = "test.composite.queue.2";
private BrokerService broker1;
private BrokerService broker2;
private Session session1;
private Session session2;
private final FLOW flow;
private final static List<ActiveMQTopic> topics = List.of(
new ActiveMQTopic(testTopic1), new ActiveMQTopic(testTopic2));
private final static List<ActiveMQQueue> queues = List.of(
new ActiveMQQueue(testQueue1), new ActiveMQQueue(testQueue2));
@Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{FLOW.FORWARD},
{FLOW.REVERSE}
});
}
public CompositeConsumerNetworkBridgeTest(final FLOW flow) {
this.flow = flow;
}
@After
public void tearDown() throws Exception {
doTearDown();
}
/**
* Test a composite durable subscription
*/
@Test
public void testCompositeDurableSubscriber() throws Exception {
setUp();
final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + "," + testTopic2);
// Create durable sub on composite destination
// Will create a composite consumer on the local broker but
// should create 2 consumers on the remote
TopicSubscriber durSub = session1.createDurableSubscriber(compositeTopic, subName);
assertConsumersCount(broker1, compositeTopic, 1);
// The remote broker should create two durable subs instead of 1
// Should be 1 durable on each of the topics that are part of the composite
assertConsumersCount(broker2, compositeTopic, 0);
assertNCDurableSubsCount(broker2, compositeTopic, 0);
for (ActiveMQTopic topic : topics) {
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
}
assertCompositeMapCounts(1, 1);
durSub.close();
Thread.sleep(1000);
removeSubscription(broker1, subName);
//Verify cleanup
for (ActiveMQTopic topic : topics) {
assertConsumersCount(broker2, topic, 0);
assertNCDurableSubsCount(broker2, topic, 0);
}
assertCompositeMapCounts(0, 0);
}
/**
* Test a composite durable subscription and normal subscription
*/
@Test
public void testCompositeAndNormalDurableSub() throws Exception {
setUp();
final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + "," + testTopic2);
// create composite sub and a sub on one of the individual topics
TopicSubscriber durSub1 = session1.createDurableSubscriber(compositeTopic, subName + "1");
TopicSubscriber durSub2 = session1.createDurableSubscriber(topics.get(0), subName + "2");
// Should split the composite and create network subs on individual topics
for (ActiveMQTopic topic : topics) {
assertNCDurableSubsCount(broker2, topic, 1);
}
assertNCDurableSubsCount(broker2, compositeTopic, 0);
// Only 1 sub is composite so should just have 1 map entry
assertCompositeMapCounts(1, 1);
// Verify message received
MessageProducer producer = session2.createProducer(topics.get(0));
producer.send(session2.createTextMessage("test"));
assertNotNull(durSub1.receive(1000));
assertNotNull(durSub2.receive(1000));
durSub1.close();
durSub2.close();;
Thread.sleep(1000);
removeSubscription(broker1, subName + "1");
removeSubscription(broker1, subName + "2");
assertCompositeMapCounts(0, 0);
}
/**
* Test two topic subscriptions that match
*/
@Test
public void testTopicCompositeSubs() throws Exception {
setUp();
final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + "," + testTopic2);
// Create two identical subscriptions on a composite topic
MessageConsumer sub1 = session1.createConsumer(compositeTopic);
MessageConsumer sub2 = session1.createConsumer(compositeTopic);
for (ActiveMQTopic topic : topics) {
// Verify the local broker has two subs on each individual topic
assertConsumersCount(broker1, topic, 2);
// Verify that conduit subscription works correctly now
// and only 1 sub on each topic. This used to be broken before AMQ-9262
// and would create two subscriptions even though conduit was true
assertConsumersCount(broker2, topic, 1);
}
assertCompositeMapCounts(2, 0);
MessageProducer producer = session2.createProducer(topics.get(0));
producer.send(session2.createTextMessage("test"));
assertNotNull(sub1.receive(1000));
assertNotNull(sub2.receive(1000));
sub1.close();
sub2.close();
assertCompositeMapCounts(0, 0);
}
/**
* Test two queue composite subscriptions that match
*/
@Test
public void testCompositeQueueSubs() throws Exception {
setUp();
final ActiveMQQueue compositeQueue = new ActiveMQQueue(testQueue1 + "," + testQueue2);
// Create two matching composite queue subs to test conduit subs
MessageConsumer sub1 = session1.createConsumer(compositeQueue);
MessageConsumer sub2 = session1.createConsumer(compositeQueue);
for (ActiveMQDestination queue : queues) {
assertConsumersCount(broker1, queue, 2);
// Verify conduit subs now work correctly, this used to be 2
// which was wrong as conduit is true and is fixed as of AMQ-9262
assertConsumersCount(broker2, queue, 1);
}
assertCompositeMapCounts(2, 0);
MessageProducer producer = session2.createProducer(queues.get(0));
producer.send(session2.createTextMessage("test"));
// Make sure one of the queue receivers gets the message
assertTrue(sub1.receive(1000) != null
|| sub2.receive(1000) != null);
sub1.close();
sub2.close();
assertCompositeMapCounts(0, 0);
}
/**
* Test a composite queue and normal queue sub
*/
@Test
public void testCompositeAndNormalQueueSubs() throws Exception {
setUp();
final ActiveMQQueue compositeQueue = new ActiveMQQueue(testQueue1 + "," + testQueue2);
// Create two matching composite queue subs to test conduit subs
MessageConsumer sub1 = session1.createConsumer(compositeQueue);
MessageConsumer sub2 = session1.createConsumer(new ActiveMQQueue(testQueue2));
assertConsumersCount(broker1, queues.get(0), 1);
assertConsumersCount(broker1, queues.get(1), 2);
for (ActiveMQDestination queue : queues) {
assertConsumersCount(broker2, queue, 1);
}
// Only 1 sub is a composite sub
assertCompositeMapCounts(1, 0);
MessageProducer producer = session2.createProducer(queues.get(0));
producer.send(session2.createTextMessage("test"));
// Make sure message received by sub1
assertNotNull(sub1.receive(1000));
sub1.close();
sub2.close();
assertCompositeMapCounts(0, 0);
}
/**
* Test two matching durable composite subs
*
* This test used to fail with an exception as the bridge would
* try and create a duplicate network durable with the same client id
* and sub and would error
*/
@Test
public void testCompositeTwoDurableSubscribers() throws Exception {
setUp();
final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + "," + testTopic2);
TopicSubscriber durSub1 = session1.createDurableSubscriber(compositeTopic, subName + "1");
TopicSubscriber durSub2 = session1.createDurableSubscriber(compositeTopic, subName + "2");
assertConsumersCount(broker1, compositeTopic, 2);
assertConsumersCount(broker2, compositeTopic, 0);
assertNCDurableSubsCount(broker2, compositeTopic, 0);
for (ActiveMQTopic topic : topics) {
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
}
assertCompositeMapCounts(2, 2);
durSub1.close();
Thread.sleep(1000);
removeSubscription(broker1, subName + "1");
for (ActiveMQTopic topic : topics) {
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
}
durSub2.close();
Thread.sleep(1000);
removeSubscription(broker1, subName + "2");
for (ActiveMQTopic topic : topics) {
assertConsumersCount(broker2, topic, 0);
assertNCDurableSubsCount(broker2, topic, 0);
}
assertCompositeMapCounts(0, 0);
}
private void setUp() throws Exception {
doSetUp(tempFolder.newFolder(), tempFolder.newFolder());
}
protected void doSetUp(File localDataDir, File remoteDataDir) throws Exception {
doSetUpRemoteBroker(remoteDataDir);
doSetUpLocalBroker(localDataDir);
//Give time for advisories to propagate
Thread.sleep(1000);
}
protected void doSetUpLocalBroker(File dataDir) throws Exception {
localBroker = createLocalBroker(dataDir);
localBroker.setDeleteAllMessagesOnStartup(true);
localBroker.start();
localBroker.waitUntilStarted();
URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
fac.setAlwaysSyncSend(true);
fac.setDispatchAsync(false);
localConnection = fac.createConnection();
localConnection.setClientID("clientId");
localConnection.start();
Wait.waitFor(() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, 10000, 500);
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (flow.equals(FLOW.FORWARD)) {
broker1 = localBroker;
session1 = localSession;
} else {
broker2 = localBroker;
session2 = localSession;
}
}
protected void doSetUpRemoteBroker(File dataDir) throws Exception {
remoteBroker = createRemoteBroker(dataDir);
remoteBroker.setDeleteAllMessagesOnStartup(true);
remoteBroker.start();
remoteBroker.waitUntilStarted();
URI remoteURI = remoteBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(remoteURI);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("clientId");
remoteConnection.start();
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (flow.equals(FLOW.FORWARD)) {
broker2 = remoteBroker;
session2 = remoteSession;
} else {
broker1 = remoteBroker;
session1 = remoteSession;
}
}
protected BrokerService createLocalBroker(File dataDir) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setMonitorConnectionSplits(true);
brokerService.setDataDirectoryFile(dataDir);
brokerService.setBrokerName("localBroker");
brokerService.addNetworkConnector(configureLocalNetworkConnector());
brokerService.addConnector("tcp://localhost:0");
brokerService.setDestinations(new ActiveMQDestination[] {
new ActiveMQTopic(testTopic1),
new ActiveMQTopic(testTopic2),
new ActiveMQQueue(testQueue1),
new ActiveMQQueue(testQueue2)});
return brokerService;
}
protected NetworkConnector configureLocalNetworkConnector() throws Exception {
List<TransportConnector> transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI = transportConnectors.get(0).getConnectUri();
String uri = "static:(" + remoteURI + ")";
NetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
connector.setName("networkConnector");
connector.setDynamicOnly(false);
connector.setDecreaseNetworkConsumerPriority(false);
connector.setConduitSubscriptions(true);
connector.setDuplex(true);
connector.setStaticBridge(false);
ArrayList<ActiveMQDestination> dynamicIncludedDestinations = new ArrayList<>();
dynamicIncludedDestinations.addAll(List.of(new ActiveMQTopic("test.composite.topic.>"),
new ActiveMQQueue("test.composite.queue.>")));
connector.setDynamicallyIncludedDestinations(dynamicIncludedDestinations);
return connector;
}
protected BrokerService createRemoteBroker(File dataDir) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setBrokerName("remoteBroker");
brokerService.setUseJmx(false);
brokerService.setDataDirectoryFile(dataDir);
brokerService.addConnector("tcp://localhost:0");
brokerService.setDestinations(new ActiveMQDestination[] {
new ActiveMQTopic(testTopic1),
new ActiveMQTopic(testTopic2),
new ActiveMQQueue(testQueue1),
new ActiveMQQueue(testQueue2)});
return brokerService;
}
protected void assertCompositeMapCounts(int compositeConsumerIdsSize, int compositeSubSize)
throws Exception {
DurableConduitBridge bridge = findBridge();
assertTrue( Wait.waitFor(() -> compositeConsumerIdsSize == bridge.compositeConsumerIds.size(), 5000, 500));
assertTrue( Wait.waitFor(() -> compositeSubSize == bridge.compositeSubscriptions.size(), 5000, 500));
}
protected DurableConduitBridge findBridge() throws Exception {
if (flow.equals(FLOW.FORWARD)) {
return findBridge(remoteBroker);
} else {
return findBridge(localBroker);
}
}
protected DurableConduitBridge findBridge(BrokerService broker) throws Exception {
final NetworkBridge bridge;
if (broker.getNetworkConnectors().size() > 0) {
assertTrue(Wait.waitFor(() -> broker.getNetworkConnectors().get(0).activeBridges().size() == 1, 5000, 500));
bridge = broker.getNetworkConnectors().get(0).activeBridges().iterator().next();
} else {
bridge = findDuplexBridge(broker.getTransportConnectorByScheme("tcp"));
}
return (DurableConduitBridge)bridge;
}
}

View File

@ -75,7 +75,6 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
private boolean forceDurable = false;
private boolean useVirtualDestSubs = false;
private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
public static enum FLOW {FORWARD, REVERSE}
private BrokerService broker1;
private BrokerService broker2;
@ -139,7 +138,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
removeSubscription(broker1, subName);
assertSubscriptionsCount(broker1, topic, 0);
assertNCDurableSubsCount(broker2, topic, 0);
@ -161,7 +160,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
removeSubscription(broker1, subName);
assertSubscriptionsCount(broker1, topic, 0);
assertNCDurableSubsCount(broker2, topic, 0);
@ -188,7 +187,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
}
assertSubscriptionsCount(broker1, topic, 1);
removeSubscription(broker1, topic, subName);
removeSubscription(broker1, subName);
assertSubscriptionsCount(broker1, topic, 0);
doTearDown();
@ -217,7 +216,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
includedTopics = "different.topic";
restartBroker(broker1, false);
assertSubscriptionsCount(broker1, topic, 1);
removeSubscription(broker1, topic, subName);
removeSubscription(broker1, subName);
assertSubscriptionsCount(broker1, topic, 0);
//Test that on successful reconnection of the bridge that
@ -310,7 +309,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
assertSubscriptionsCount(broker1, topic, 1);
session1.createDurableSubscriber(topic2, "sub2");
removeSubscription(broker1, topic, subName);
removeSubscription(broker1, subName);
assertSubscriptionsCount(broker1, topic, 0);
assertSubscriptionsCount(broker1, topic2, 1);
@ -376,7 +375,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
//with bridge off, remove 100 subs
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 10; j++) {
removeSubscription(broker1, new ActiveMQTopic("include.test." + i), subName + i + j);
removeSubscription(broker1, subName + i + j);
}
}
@ -481,7 +480,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport {
restartBroker(broker1, false);
assertSubscriptionsCount(broker1, topic, 1);
removeSubscription(broker1, topic, subName);
removeSubscription(broker1, subName);
session1.createDurableSubscriber(topic, "sub2").close();
assertSubscriptionsCount(broker1, topic, 1);

View File

@ -40,6 +40,7 @@ import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.util.SubscriptionKey;
@ -50,6 +51,7 @@ import org.junit.rules.TemporaryFolder;
public abstract class DynamicNetworkTestSupport {
public enum FLOW {FORWARD, REVERSE};
protected Connection localConnection;
protected Connection remoteConnection;
@ -92,14 +94,10 @@ public abstract class DynamicNetworkTestSupport {
}
}
protected void assertBridgeStarted() throws Exception {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
}
}, 10000, 500));
assertTrue(Wait.waitFor(
() -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
10000, 500));
}
protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context,
@ -113,24 +111,16 @@ public abstract class DynamicNetworkTestSupport {
}
protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception {
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
assertTrue(Wait.waitFor(() -> {
//should only be 1 for the composite destination creation
return count == destinationStatistics.getConsumers().getCount();
}
}));
}
protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception {
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return count == destinationStatistics.getDequeues().getCount() &&
assertTrue(Wait.waitFor(() -> count == destinationStatistics.getDequeues().getCount() &&
count == destinationStatistics.getDispatched().getCount() &&
count == destinationStatistics.getForwards().getCount();
}
}));
count == destinationStatistics.getForwards().getCount()));
}
protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) {
@ -145,27 +135,22 @@ public abstract class DynamicNetworkTestSupport {
protected void assertNCDurableSubsCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return count == getNCDurableSubs(brokerService, dest).size();
}
}, 10000, 500));
assertTrue(Wait.waitFor(() -> count == getNCDurableSubs(brokerService, dest).size(),
10000, 500));
}
protected void assertConsumersCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return count == getConsumers(brokerService, dest).size();
}
}, 10000, 500));
final ActiveMQDestination dest, final int count) throws Exception {
assertTrue(Wait.waitFor(() -> count == getConsumers(brokerService, dest).size(),
10000, 500));
Thread.sleep(1000);
// Check one more time after a short pause to make sure the count didn't increase past what we wanted
assertEquals(count, getConsumers(brokerService, dest).size());
}
protected List<Subscription> getConsumers(final BrokerService brokerService,
final ActiveMQTopic dest) throws Exception {
Topic destination = (Topic) brokerService.getDestination(dest);
final ActiveMQDestination dest) throws Exception {
Destination destination = brokerService.getDestination(dest);
return destination.getConsumers();
}
@ -208,7 +193,7 @@ public abstract class DynamicNetworkTestSupport {
return subs;
}
protected void removeSubscription(final BrokerService brokerService, final ActiveMQTopic topic,
protected void removeSubscription(final BrokerService brokerService,
final String subName) throws Exception {
final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(clientId);

View File

@ -51,7 +51,6 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
protected String testTopicName2 = "include.nonforced.bar";
protected String staticTopic = "include.static.bar";
protected String staticTopic2 = "include.static.nonforced.bar";
public static enum FLOW {FORWARD, REVERSE};
private BrokerService broker1;
private BrokerService broker2;
private Session session1;
@ -126,7 +125,7 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
//Remove the sub
durSub.close();
Thread.sleep(1000);
removeSubscription(broker1, topic, subName);
removeSubscription(broker1, subName);
//The durable should be gone even though there is a consumer left
//since we are not forcing durable subs
@ -186,7 +185,7 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
Thread.sleep(1000);
assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
removeSubscription(broker1, subName);
assertNCDurableSubsCount(broker2, topic, 0);
}
@ -201,7 +200,7 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
Thread.sleep(1000);
assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
removeSubscription(broker1, subName);
Thread.sleep(1000);
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
@ -225,7 +224,7 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
removeSubscription(broker1, topic, subName);
removeSubscription(broker1, subName);
assertConsumersCount(broker2, topic, 0);
assertNCDurableSubsCount(broker2, topic, 0);
}