AMQ-6858 - handle resync of network proxy durables after restart

We need to properly handle the re-addition of network proxy durables
after the brokers are restarted so removal is done properly
This commit is contained in:
Christopher L. Shannon (cshannon) 2017-11-15 08:22:47 -05:00
parent 50243106c2
commit 6013441a9a
4 changed files with 189 additions and 45 deletions

View File

@ -81,9 +81,15 @@ public class ConduitBridge extends DemandForwardingBridge {
ds.addForcedDurableConsumer(info.getConsumerId()); ds.addForcedDurableConsumer(info.getConsumerId());
} }
} else { } else {
if (isProxyNSConsumer(info)) { //Handle the demand generated by proxy network subscriptions
//The broker path is case is normal
if (isProxyNSConsumerBrokerPath(info)) {
final BrokerId[] path = info.getBrokerPath(); final BrokerId[] path = info.getBrokerPath();
addProxyNetworkSubscription(ds, path, info.getSubscriptionName()); addProxyNetworkSubscriptionBrokerPath(ds, path, info.getSubscriptionName());
//This is the durable sync case on broker restart
} else if (isProxyNSConsumerClientId(info.getClientId()) &&
isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) {
addProxyNetworkSubscriptionClientId(ds, info.getClientId(), info.getSubscriptionName());
} else { } else {
ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
} }

View File

@ -36,7 +36,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -672,31 +671,53 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
(info.getClientId() == null || info.getClientId().startsWith(configuration.getName())); (info.getClientId() == null || info.getClientId().startsWith(configuration.getName()));
} }
private boolean isProxyBridgeSubscription(SubscriptionInfo info) { protected boolean isProxyBridgeSubscription(String clientId, String subName) {
if (info.getSubcriptionName() != null && info.getClientId() != null) { if (subName != null && clientId != null) {
if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) if (subName.startsWith(DURABLE_SUB_PREFIX) && !clientId.startsWith(configuration.getName())) {
&& !info.getClientId().startsWith(configuration.getName())) {
return true; return true;
} }
} }
return false; return false;
} }
protected void addProxyNetworkSubscription(final DemandSubscription sub, final BrokerId[] path, String subName) { /**
if (sub != null && path.length > 1 && subName != null) { * This scenaior is primarily used for durable sync on broker restarts
String b1 = path[path.length-1].toString(); *
String b2 = path[path.length-2].toString(); * @param sub
final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName); * @param clientId
* @param subName
*/
protected void addProxyNetworkSubscriptionClientId(final DemandSubscription sub, final String clientId, String subName) {
if (clientId != null && sub != null && subName != null) {
String newClientId = getProxyBridgeClientId(clientId);
final SubscriptionInfo newSubInfo = new SubscriptionInfo(newClientId, subName);
sub.getDurableRemoteSubs().add(newSubInfo); sub.getDurableRemoteSubs().add(newSubInfo);
sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> new AtomicInteger()).incrementAndGet();
LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo); LOG.debug("Adding proxy network subscription {} to demand subscription", newSubInfo);
} else { } else {
LOG.debug("Skipping addProxyNetworkSubscription"); LOG.debug("Skipping addProxyNetworkSubscription");
} }
} }
private String getProxyBridgeClientId(SubscriptionInfo info) { /**
String newClientId = info.getClientId(); * Add a durable remote proxy subscription when we can generate via the BrokerId path
* This is the most common scenario
*
* @param sub
* @param path
* @param subName
*/
protected void addProxyNetworkSubscriptionBrokerPath(final DemandSubscription sub, final BrokerId[] path, String subName) {
if (sub != null && path.length > 1 && subName != null) {
String b1 = path[path.length-1].toString();
String b2 = path[path.length-2].toString();
final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + configuration.getClientIdToken() + "inbound" + configuration.getClientIdToken() + b1, subName);
sub.getDurableRemoteSubs().add(newSubInfo);
}
}
private String getProxyBridgeClientId(String clientId) {
String newClientId = clientId;
String[] clientIdTokens = newClientId != null ? newClientId.split(Pattern.quote(configuration.getClientIdToken())) : null; String[] clientIdTokens = newClientId != null ? newClientId.split(Pattern.quote(configuration.getClientIdToken())) : null;
if (clientIdTokens != null && clientIdTokens.length > 2) { if (clientIdTokens != null && clientIdTokens.length > 2) {
newClientId = clientIdTokens[clientIdTokens.length - 3] + configuration.getClientIdToken() + "inbound" newClientId = clientIdTokens[clientIdTokens.length - 3] + configuration.getClientIdToken() + "inbound"
@ -705,10 +726,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return newClientId; return newClientId;
} }
protected boolean isProxyNSConsumer(ConsumerInfo info) { protected boolean isProxyNSConsumerBrokerPath(ConsumerInfo info) {
return info.getBrokerPath() != null && info.getBrokerPath().length > 1; return info.getBrokerPath() != null && info.getBrokerPath().length > 1;
} }
protected boolean isProxyNSConsumerClientId(String clientId) {
return clientId != null && clientId.split(Pattern.quote(configuration.getClientIdToken())).length > 3;
}
protected void serviceRemoteCommand(Command command) { protected void serviceRemoteCommand(Command command) {
if (!disposed.get()) { if (!disposed.get()) {
try { try {
@ -1008,27 +1033,25 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
} else if (data.getClass() == RemoveSubscriptionInfo.class) { } else if (data.getClass() == RemoveSubscriptionInfo.class) {
RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data); final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()); final SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
final boolean proxyBridgeSub = isProxyBridgeSubscription(subscriptionInfo.getClientId(),
subscriptionInfo.getSubscriptionName());
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) { for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next(); DemandSubscription ds = i.next();
boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo); boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
//If this is a proxy bridge subscription we need to try changing the clientId
if (!removed && proxyBridgeSub){
subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo.getClientId()));
if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) {
ds.getDurableRemoteSubs().remove(subscriptionInfo);
removed = true;
}
}
if (removed) { if (removed) {
cleanupDurableSub(ds, i); cleanupDurableSub(ds, i);
//If this is a proxy bridge subscription we need to try changing the clientId
} else if (!removed && isProxyBridgeSubscription(subscriptionInfo)){
subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo));
if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) {
AtomicInteger count = ds.getNetworkDemandConsumerMap().computeIfAbsent(subscriptionInfo, v -> new AtomicInteger());
count.decrementAndGet();
//Only remove the durable remote sub if the count <= 0
if (count.get() <= 0) {
ds.getDurableRemoteSubs().remove(subscriptionInfo);
ds.getNetworkDemandConsumerMap().remove(subscriptionInfo);
cleanupDurableSub(ds, i);
}
}
} }
} }
} }
@ -1407,9 +1430,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
undoMapRegistration(sub); undoMapRegistration(sub);
} else { } else {
if (consumerInfo.isDurable()) { if (consumerInfo.isDurable()) {
if (isProxyNSConsumer(sub.getRemoteInfo())) { //Handle the demand generated by proxy network subscriptions
BrokerId[] path = sub.getRemoteInfo().getBrokerPath(); //The broker path is case is normal
addProxyNetworkSubscription(sub, path, consumerInfo.getSubscriptionName()); if (isProxyNSConsumerBrokerPath(sub.getRemoteInfo())) {
final BrokerId[] path = info.getBrokerPath();
addProxyNetworkSubscriptionBrokerPath(sub, path, consumerInfo.getSubscriptionName());
//This is the durable sync case on broker restart
} else if (isProxyNSConsumerClientId(sub.getRemoteInfo().getClientId()) &&
isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) {
addProxyNetworkSubscriptionClientId(sub, sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName());
} else { } else {
sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName())); sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
} }

View File

@ -44,8 +44,6 @@ public class DemandSubscription {
private final AtomicBoolean activeWaiter = new AtomicBoolean(); private final AtomicBoolean activeWaiter = new AtomicBoolean();
private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>(); private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>(); private final Set<ConsumerId> forcedDurableConsumers = new CopyOnWriteArraySet<ConsumerId>();
//Used for proxy network consumers
private final Map<SubscriptionInfo, AtomicInteger> networkDemandConsumerMap = new ConcurrentHashMap<>();
private SubscriptionInfo localDurableSubscriber; private SubscriptionInfo localDurableSubscriber;
private NetworkBridgeFilter networkBridgeFilter; private NetworkBridgeFilter networkBridgeFilter;
@ -87,10 +85,6 @@ public class DemandSubscription {
return durableRemoteSubs; return durableRemoteSubs;
} }
public Map<SubscriptionInfo, AtomicInteger> getNetworkDemandConsumerMap() {
return networkDemandConsumerMap;
}
/** /**
* @return true if there are no interested consumers * @return true if there are no interested consumers
*/ */

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.network; package org.apache.activemq.network;
import java.io.File;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -46,6 +47,7 @@ import junit.framework.Test;
public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport { public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport {
private boolean duplex = true; private boolean duplex = true;
private boolean deletePersistentMessagesOnStartup = true;
@Override @Override
protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception { protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception {
@ -61,6 +63,117 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
return connector; return connector;
} }
public void testDurablePropagationBrokerRestartDuplex() throws Exception {
duplex = true;
testDurablePropagationBrokerRestart();
}
public void testDurablePropagationBrokerRestartOneWay() throws Exception {
duplex = false;
testDurablePropagationBrokerRestart();
}
protected void testDurablePropagationBrokerRestart() throws Exception {
deletePersistentMessagesOnStartup = true;
// Setup broker networks
bridgeBrokers("Broker_A_A", "Broker_B_B");
bridgeBrokers("Broker_B_B", "Broker_C_C");
bridgeBrokers("Broker_C_C", "Broker_D_D");
bridgeBrokers("Broker_D_D", "Broker_E_E");
if (!duplex) {
bridgeBrokers("Broker_B_B", "Broker_A_A");
bridgeBrokers("Broker_C_C", "Broker_B_B");
bridgeBrokers("Broker_D_D", "Broker_C_C");
bridgeBrokers("Broker_E_E", "Broker_D_D");
}
startAllBrokers();
// Setup destination
ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true);
// Setup consumers
Connection conn = brokers.get("Broker_A_A").factory.createConnection();
conn.setClientID("clientId1");
conn.start();
Session ses = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
MessageConsumer clientA2 = ses.createDurableSubscriber(dest, "subA2");
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
//bring online a consumer on the other side
Connection conn2 = brokers.get("Broker_E_E").factory.createConnection();
conn2.setClientID("clientId2");
conn2.start();
Session ses2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
MessageConsumer clientE2 = ses2.createDurableSubscriber(dest, "subE2");
// let consumers propagate around the network
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 2);
assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 1);
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 1);
clientA.close();
clientA2.close();
clientE.close();
clientE2.close();
this.destroyAllBrokers();
deletePersistentMessagesOnStartup = false;
String options = new String("?persistent=true&useJmx=false");
createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
createBroker(new URI("broker:(tcp://localhost:61619)/Broker_D_D" + options));
createBroker(new URI("broker:(tcp://localhost:61620)/Broker_E_E" + options));
bridgeBrokers("Broker_A_A", "Broker_B_B");
bridgeBrokers("Broker_B_B", "Broker_C_C");
bridgeBrokers("Broker_C_C", "Broker_D_D");
bridgeBrokers("Broker_D_D", "Broker_E_E");
if (!duplex) {
bridgeBrokers("Broker_B_B", "Broker_A_A");
bridgeBrokers("Broker_C_C", "Broker_B_B");
bridgeBrokers("Broker_D_D", "Broker_C_C");
bridgeBrokers("Broker_E_E", "Broker_D_D");
}
startAllBrokers();
conn = brokers.get("Broker_A_A").factory.createConnection();
conn.setClientID("clientId1");
conn.start();
ses = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn2 = brokers.get("Broker_E_E").factory.createConnection();
conn2.setClientID("clientId2");
conn2.start();
ses2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
//bring one online and leave others offline to test mixed
clientE = ses2.createDurableSubscriber(dest, "subE");
clientE.close();
ses.unsubscribe("subA");
ses.unsubscribe("subA2");
ses2.unsubscribe("subE");
ses2.unsubscribe("subE2");
assertNCDurableSubsCount(brokers.get("Broker_A_A").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_B_B").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_C_C").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_D_D").broker, dest, 0);
assertNCDurableSubsCount(brokers.get("Broker_E_E").broker, dest, 0);
}
public void testDurablePropagationDuplex() throws Exception { public void testDurablePropagationDuplex() throws Exception {
duplex = true; duplex = true;
testDurablePropagation(); testDurablePropagation();
@ -552,7 +665,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
public void setUp() throws Exception { public void setUp() throws Exception {
super.setAutoFail(true); super.setAutoFail(true);
super.setUp(); super.setUp();
String options = new String("?persistent=false&useJmx=false"); String options = new String("?persistent=true&useJmx=false");
createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options)); createBroker(new URI("broker:(tcp://localhost:61616)/Broker_A_A" + options));
createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options)); createBroker(new URI("broker:(tcp://localhost:61617)/Broker_B_B" + options));
createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options)); createBroker(new URI("broker:(tcp://localhost:61618)/Broker_C_C" + options));
@ -563,6 +676,8 @@ public class DurableFiveBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSu
@Override @Override
protected void configureBroker(BrokerService broker) { protected void configureBroker(BrokerService broker) {
broker.setBrokerId(broker.getBrokerName()); broker.setBrokerId(broker.getBrokerName());
broker.setDeleteAllMessagesOnStartup(deletePersistentMessagesOnStartup);
broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "DurableFiveBrokerNetworkBridgeTest");
} }
protected Session createSession(String broker) throws Exception { protected Session createSession(String broker) throws Exception {