AMQ-7238 - Ensure remoteId subscription map is also cleared when local

map is cleared inside DemandForwardingBridgeSupport
This commit is contained in:
Christopher L. Shannon (cshannon) 2019-07-08 08:38:34 -04:00
parent eb885cb513
commit c7eff84058
3 changed files with 48 additions and 2 deletions

View File

@ -1074,8 +1074,11 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
sending.setConnectionId(this.localConnectionInfo.getConnectionId()); sending.setConnectionId(this.localConnectionInfo.getConnectionId());
localBroker.oneway(sending); localBroker.oneway(sending);
//remove subscriber from map //remove subscriber from local map
i.remove(); i.remove();
//need to remove the mapping from the remote map as well
subscriptionMapByRemoteId.remove(ds.getRemoteInfo().getConsumerId());
} }
} }

View File

@ -16,10 +16,12 @@
*/ */
package org.apache.activemq.network; package org.apache.activemq.network;
import static junit.framework.TestCase.assertNotNull;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -30,6 +32,8 @@ import javax.jms.Session;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.DestinationStatistics;
@ -227,5 +231,25 @@ public abstract class DynamicNetworkTestSupport {
}, 10000, 500)); }, 10000, 500));
} }
protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, final int count) {
assertNotNull(networkBridge);
DemandForwardingBridgeSupport bridge = (DemandForwardingBridgeSupport) networkBridge;
assertEquals(count, bridge.subscriptionMapByLocalId.size());
assertEquals(count, bridge.subscriptionMapByRemoteId.size());
}
protected DemandForwardingBridge findDuplexBridge(final TransportConnector connector) throws Exception {
assertNotNull(connector);
for (TransportConnection tc : connector.getConnections()) {
if (tc.getConnectionId().startsWith("networkConnector_")) {
final Field bridgeField = TransportConnection.class.getDeclaredField("duplexBridge");
bridgeField.setAccessible(true);
return (DemandForwardingBridge) bridgeField.get(tc);
}
}
return null;
}
} }

View File

@ -16,8 +16,10 @@
*/ */
package org.apache.activemq.network; package org.apache.activemq.network;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import java.lang.reflect.Field;
import java.net.URI; import java.net.URI;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -28,15 +30,18 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.util.Wait;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import scala.annotation.bridge;
/** /**
* This test is to show that if a durable subscription over a network bridge is deleted and * This test is to show that if a durable subscription over a network bridge is deleted and
@ -106,7 +111,7 @@ public class NetworkDurableRecreationTest extends DynamicNetworkTestSupport {
}); });
} }
public void testReceive(BrokerService receiveBroker, Session receiveSession, protected void testReceive(BrokerService receiveBroker, Session receiveSession,
BrokerService publishBroker, Session publishSession, ConsumerCreator secondConsumerCreator) throws Exception { BrokerService publishBroker, Session publishSession, ConsumerCreator secondConsumerCreator) throws Exception {
final DestinationStatistics destinationStatistics = final DestinationStatistics destinationStatistics =
@ -118,6 +123,17 @@ public class NetworkDurableRecreationTest extends DynamicNetworkTestSupport {
waitForConsumerCount(destinationStatistics, 1); waitForConsumerCount(destinationStatistics, 1);
final NetworkBridge bridge;
if (publishBroker.getNetworkConnectors().size() > 0) {
Wait.waitFor(() -> publishBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, 10000, 500);
bridge = publishBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
} else {
bridge = findDuplexBridge(publishBroker.getTransportConnectorByScheme("tcp"));
}
//Should be 2 - one for the durable destination and one for the advisory destinations
assertSubscriptionMapCounts(bridge, 2);
//remove the durable //remove the durable
final ConnectionContext context = new ConnectionContext(); final ConnectionContext context = new ConnectionContext();
RemoveSubscriptionInfo info = getRemoveSubscriptionInfo(context, receiveBroker); RemoveSubscriptionInfo info = getRemoveSubscriptionInfo(context, receiveBroker);
@ -126,6 +142,9 @@ public class NetworkDurableRecreationTest extends DynamicNetworkTestSupport {
receiveBroker.getBroker().removeSubscription(context, info); receiveBroker.getBroker().removeSubscription(context, info);
waitForConsumerCount(destinationStatistics, 0); waitForConsumerCount(destinationStatistics, 0);
//Should be 1 - 0 for the durable destination and one for the advisory destinations
assertSubscriptionMapCounts(bridge, 1);
//re-create consumer //re-create consumer
MessageConsumer bridgeConsumer2 = secondConsumerCreator.createConsumer(); MessageConsumer bridgeConsumer2 = secondConsumerCreator.createConsumer();
waitForConsumerCount(destinationStatistics, 1); waitForConsumerCount(destinationStatistics, 1);