From 6013441a9a7c4a13f7412d6d72638de0f420e6a3 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Wed, 15 Nov 2017 08:22:47 -0500 Subject: [PATCH] AMQ-6858 - handle resync of network proxy durables after restart We need to properly handle the re-addition of network proxy durables after the brokers are restarted so removal is done properly --- .../activemq/network/ConduitBridge.java | 14 ++- .../DemandForwardingBridgeSupport.java | 97 ++++++++++----- .../activemq/network/DemandSubscription.java | 6 - .../DurableFiveBrokerNetworkBridgeTest.java | 117 +++++++++++++++++- 4 files changed, 189 insertions(+), 45 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java index bc9d004132..70f45f751a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java @@ -81,10 +81,16 @@ public class ConduitBridge extends DemandForwardingBridge { ds.addForcedDurableConsumer(info.getConsumerId()); } } else { - if (isProxyNSConsumer(info)) { - final BrokerId[] path = info.getBrokerPath(); - addProxyNetworkSubscription(ds, path, info.getSubscriptionName()); - } else { + //Handle the demand generated by proxy network subscriptions + //The broker path is case is normal + if (isProxyNSConsumerBrokerPath(info)) { + final BrokerId[] path = info.getBrokerPath(); + addProxyNetworkSubscriptionBrokerPath(ds, path, info.getSubscriptionName()); + //This is the durable sync case on broker restart + } else if (isProxyNSConsumerClientId(info.getClientId()) && + isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) { + addProxyNetworkSubscriptionClientId(ds, info.getClientId(), info.getSubscriptionName()); + } else { ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 75084d10db..df493c3c2b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -36,7 +36,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import javax.management.ObjectName; @@ -672,31 +671,53 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br (info.getClientId() == null || info.getClientId().startsWith(configuration.getName())); } - private boolean isProxyBridgeSubscription(SubscriptionInfo info) { - if (info.getSubcriptionName() != null && info.getClientId() != null) { - if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) - && !info.getClientId().startsWith(configuration.getName())) { + protected boolean isProxyBridgeSubscription(String clientId, String subName) { + if (subName != null && clientId != null) { + if (subName.startsWith(DURABLE_SUB_PREFIX) && !clientId.startsWith(configuration.getName())) { return true; } } return false; } - protected void addProxyNetworkSubscription(final DemandSubscription sub, final BrokerId[] path, String subName) { - if (sub != null && path.length > 1 && subName != null) { - String b1 = path[path.length-1].toString(); - String b2 = path[path.length-2].toString(); - final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName); - sub.getDurableRemoteSubs().add(newSubInfo); - sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> new AtomicInteger()).incrementAndGet(); - LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo); + /** + * This scenaior is primarily used for durable sync on broker restarts + * + * @param sub + * @param clientId + * @param subName + */ + protected void addProxyNetworkSubscriptionClientId(final DemandSubscription sub, final String clientId, String subName) { + if (clientId != null && sub != null && subName != null) { + String newClientId = getProxyBridgeClientId(clientId); + final SubscriptionInfo newSubInfo = new SubscriptionInfo(newClientId, subName); + sub.getDurableRemoteSubs().add(newSubInfo); + LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo); + } else { LOG.debug("Skipping addProxyNetworkSubscription"); } } - private String getProxyBridgeClientId(SubscriptionInfo info) { - String newClientId = info.getClientId(); + /** + * Add a durable remote proxy subscription when we can generate via the BrokerId path + * This is the most common scenario + * + * @param sub + * @param path + * @param subName + */ + protected void addProxyNetworkSubscriptionBrokerPath(final DemandSubscription sub, final BrokerId[] path, String subName) { + if (sub != null && path.length > 1 && subName != null) { + String b1 = path[path.length-1].toString(); + String b2 = path[path.length-2].toString(); + final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName); + sub.getDurableRemoteSubs().add(newSubInfo); + } + } + + private String getProxyBridgeClientId(String clientId) { + String newClientId = clientId; String[] clientIdTokens = newClientId != null ? newClientId.split(Pattern.quote(configuration.getClientIdToken())) : null; if (clientIdTokens != null && clientIdTokens.length > 2) { newClientId = clientIdTokens[clientIdTokens.length - 3] + configuration.getClientIdToken() + "inbound" @@ -705,10 +726,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br return newClientId; } - protected boolean isProxyNSConsumer(ConsumerInfo info) { + protected boolean isProxyNSConsumerBrokerPath(ConsumerInfo info) { return info.getBrokerPath() != null && info.getBrokerPath().length > 1; } + protected boolean isProxyNSConsumerClientId(String clientId) { + return clientId != null && clientId.split(Pattern.quote(configuration.getClientIdToken())).length > 3; + } + protected void serviceRemoteCommand(Command command) { if (!disposed.get()) { try { @@ -1008,27 +1033,25 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } else if (data.getClass() == RemoveSubscriptionInfo.class) { - RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); - SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); + final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); + final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); + final boolean proxyBridgeSub = isProxyBridgeSubscription(subscriptionInfo.getClientId(), + subscriptionInfo.getSubscriptionName()); for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { DemandSubscription ds = i.next(); boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo); + //If this is a proxy bridge subscription we need to try changing the clientId + if (!removed && proxyBridgeSub){ + subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo.getClientId())); + if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) { + ds.getDurableRemoteSubs().remove(subscriptionInfo); + removed = true; + } + } + if (removed) { cleanupDurableSub(ds, i); - //If this is a proxy bridge subscription we need to try changing the clientId - } else if (!removed && isProxyBridgeSubscription(subscriptionInfo)){ - subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo)); - if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) { - AtomicInteger count = ds.getNetworkDemandConsumerMap().computeIfAbsent(subscriptionInfo, v -> new AtomicInteger()); - count.decrementAndGet(); - //Only remove the durable remote sub if the count <= 0 - if (count.get() <= 0) { - ds.getDurableRemoteSubs().remove(subscriptionInfo); - ds.getNetworkDemandConsumerMap().remove(subscriptionInfo); - cleanupDurableSub(ds, i); - } - } } } } @@ -1407,9 +1430,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br undoMapRegistration(sub); } else { if (consumerInfo.isDurable()) { - if (isProxyNSConsumer(sub.getRemoteInfo())) { - BrokerId[] path = sub.getRemoteInfo().getBrokerPath(); - addProxyNetworkSubscription(sub, path, consumerInfo.getSubscriptionName()); + //Handle the demand generated by proxy network subscriptions + //The broker path is case is normal + if (isProxyNSConsumerBrokerPath(sub.getRemoteInfo())) { + final BrokerId[] path = info.getBrokerPath(); + addProxyNetworkSubscriptionBrokerPath(sub, path, consumerInfo.getSubscriptionName()); + //This is the durable sync case on broker restart + } else if (isProxyNSConsumerClientId(sub.getRemoteInfo().getClientId()) && + isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) { + addProxyNetworkSubscriptionClientId(sub, sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()); } else { sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName())); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java index 96a9baf963..843a6d1759 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java @@ -44,8 +44,6 @@ public class DemandSubscription { private final AtomicBoolean activeWaiter = new AtomicBoolean(); private final Set durableRemoteSubs = new CopyOnWriteArraySet(); private final Set forcedDurableConsumers = new CopyOnWriteArraySet(); - //Used for proxy network consumers - private final Map networkDemandConsumerMap = new ConcurrentHashMap<>(); private SubscriptionInfo localDurableSubscriber; private NetworkBridgeFilter networkBridgeFilter; @@ -87,10 +85,6 @@ public class DemandSubscription { return durableRemoteSubs; } - public Map getNetworkDemandConsumerMap() { - return networkDemandConsumerMap; - } - /** * @return true if there are no interested consumers */ diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java index 94d73936f4..2d8dc03fb3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.network; +import java.io.File; import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -46,6 +47,7 @@ import junit.framework.Test; public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport { private boolean duplex = true; + private boolean deletePersistentMessagesOnStartup = true; @Override protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception { @@ -61,6 +63,117 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu return connector; } + public void testDurablePropagationBrokerRestartDuplex() throws Exception { + duplex = true; + testDurablePropagationBrokerRestart(); + } + + public void testDurablePropagationBrokerRestartOneWay() throws Exception { + duplex = false; + testDurablePropagationBrokerRestart(); + } + + protected void testDurablePropagationBrokerRestart() throws Exception { + deletePersistentMessagesOnStartup = true; + + // Setup broker networks + bridgeBrokers("Broker_A_A", "Broker_B_B"); + bridgeBrokers("Broker_B_B", "Broker_C_C"); + bridgeBrokers("Broker_C_C", "Broker_D_D"); + bridgeBrokers("Broker_D_D", "Broker_E_E"); + + if (!duplex) { + bridgeBrokers("Broker_B_B", "Broker_A_A"); + bridgeBrokers("Broker_C_C", "Broker_B_B"); + bridgeBrokers("Broker_D_D", "Broker_C_C"); + bridgeBrokers("Broker_E_E", "Broker_D_D"); + } + + startAllBrokers(); + + // Setup destination + ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); + + // Setup consumers + Connection conn = brokers.get("Broker_A_A").factory.createConnection(); + conn.setClientID("clientId1"); + conn.start(); + Session ses = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); + MessageConsumer clientA2 = ses.createDurableSubscriber(dest, "subA2"); + + // let consumers propagate around the network + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); + + //bring online a consumer on the other side + Connection conn2 = brokers.get("Broker_E_E").factory.createConnection(); + conn2.setClientID("clientId2"); + conn2.start(); + Session ses2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE"); + MessageConsumer clientE2 = ses2.createDurableSubscriber(dest, "subE2"); + + // let consumers propagate around the network + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1); + + clientA.close(); + clientA2.close(); + clientE.close(); + clientE2.close(); + + this.destroyAllBrokers(); + deletePersistentMessagesOnStartup = false; + String options = new String("?persistent=true&useJmx=false"); + createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options)); + createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options)); + createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options)); + createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + options)); + createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + options)); + bridgeBrokers("Broker_A_A", "Broker_B_B"); + bridgeBrokers("Broker_B_B", "Broker_C_C"); + bridgeBrokers("Broker_C_C", "Broker_D_D"); + bridgeBrokers("Broker_D_D", "Broker_E_E"); + if (!duplex) { + bridgeBrokers("Broker_B_B", "Broker_A_A"); + bridgeBrokers("Broker_C_C", "Broker_B_B"); + bridgeBrokers("Broker_D_D", "Broker_C_C"); + bridgeBrokers("Broker_E_E", "Broker_D_D"); + } + + startAllBrokers(); + + conn = brokers.get("Broker_A_A").factory.createConnection(); + conn.setClientID("clientId1"); + conn.start(); + ses = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + conn2 = brokers.get("Broker_E_E").factory.createConnection(); + conn2.setClientID("clientId2"); + conn2.start(); + ses2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE); + //bring one online and leave others offline to test mixed + clientE = ses2.createDurableSubscriber(dest, "subE"); + clientE.close(); + + ses.unsubscribe("subA"); + ses.unsubscribe("subA2"); + ses2.unsubscribe("subE"); + ses2.unsubscribe("subE2"); + + assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0); + } + public void testDurablePropagationDuplex() throws Exception { duplex = true; testDurablePropagation(); @@ -552,7 +665,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu public void setUp() throws Exception { super.setAutoFail(true); super.setUp(); - String options = new String("?persistent=false&useJmx=false"); + String options = new String("?persistent=true&useJmx=false"); createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options)); createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options)); createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options)); @@ -563,6 +676,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu @Override protected void configureBroker(BrokerService broker) { broker.setBrokerId(broker.getBrokerName()); + broker.setDeleteAllMessagesOnStartup(deletePersistentMessagesOnStartup); + broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "DurableFiveBrokerNetworkBridgeTest"); } protected Session createSession(String broker) throws Exception {