mirror of https://github.com/apache/activemq.git
Clearing the subscription from the local map in DemandForwardingBridgeSupport to make sure that demand can be properly recreated again.
This commit is contained in:
parent
78c959a5c4
commit
09054fc4a8
|
@ -833,6 +833,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
|
||||
sending.setConnectionId(this.localConnectionInfo.getConnectionId());
|
||||
localBroker.oneway(sending);
|
||||
|
||||
//remove subscriber from map
|
||||
i.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.network;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.DestinationStatistics;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||
import org.apache.activemq.util.Wait;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
|
||||
public abstract class DynamicNetworkTestSupport {
|
||||
|
||||
protected Connection localConnection;
|
||||
protected Connection remoteConnection;
|
||||
protected BrokerService localBroker;
|
||||
protected BrokerService remoteBroker;
|
||||
protected Session localSession;
|
||||
protected Session remoteSession;
|
||||
protected ActiveMQTopic included;
|
||||
protected ActiveMQTopic excluded;
|
||||
protected String testTopicName = "include.test.bar";
|
||||
protected String excludeTopicName = "exclude.test.bar";
|
||||
protected String clientId = "clientId";
|
||||
protected String subName = "subId";
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder tempFolder = new TemporaryFolder(new File("target"));
|
||||
|
||||
protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context,
|
||||
final BrokerService brokerService) throws Exception {
|
||||
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
|
||||
info.setClientId(clientId);
|
||||
info.setSubcriptionName(subName);
|
||||
context.setBroker(brokerService.getBroker());
|
||||
context.setClientId(clientId);
|
||||
return info;
|
||||
}
|
||||
|
||||
protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception {
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
//should only be 1 for the composite destination creation
|
||||
return count == destinationStatistics.getConsumers().getCount();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception {
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return count == destinationStatistics.getDequeues().getCount() &&
|
||||
count == destinationStatistics.getDispatched().getCount() &&
|
||||
count == destinationStatistics.getForwards().getCount();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) {
|
||||
assertEquals("local broker dest stat dispatched", count, localStatistics.getDispatched().getCount());
|
||||
assertEquals("local broker dest stat dequeues", count, localStatistics.getDequeues().getCount());
|
||||
assertEquals("local broker dest stat forwards", count, localStatistics.getForwards().getCount());
|
||||
}
|
||||
|
||||
protected interface ConsumerCreator {
|
||||
MessageConsumer createConsumer() throws JMSException;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,224 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.network;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.DestinationStatistics;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* This test is to show that if a durable subscription over a network bridge is deleted and
|
||||
* re-created, messages will flow properly again for dynamic subscriptions.
|
||||
*
|
||||
* AMQ-6050
|
||||
*/
|
||||
public class NetworkDurableRecreationTest extends DynamicNetworkTestSupport {
|
||||
|
||||
/**
|
||||
* Test publisher on localBroker and durable on remoteBroker
|
||||
* after durable deletion, recreate durable
|
||||
*/
|
||||
@Test(timeout = 30 * 1000)
|
||||
public void testDurableConsumer() throws Exception {
|
||||
testReceive(remoteBroker, remoteSession, localBroker, localSession, new ConsumerCreator() {
|
||||
|
||||
@Override
|
||||
public MessageConsumer createConsumer() throws JMSException {
|
||||
return remoteSession.createDurableSubscriber(included, subName);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Reverse and test publisher on remoteBroker and durable on localBroker
|
||||
* after durable deletion, recreate durable
|
||||
*/
|
||||
@Test(timeout = 30 * 1000)
|
||||
public void testDurableConsumerReverse() throws Exception {
|
||||
testReceive(localBroker, localSession, remoteBroker, remoteSession, new ConsumerCreator() {
|
||||
|
||||
@Override
|
||||
public MessageConsumer createConsumer() throws JMSException {
|
||||
return localSession.createDurableSubscriber(included, subName);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Test publisher on localBroker and durable on remoteBroker
|
||||
* after durable deletion, recreate with a non-durable consumer
|
||||
*/
|
||||
@Test(timeout = 30 * 1000)
|
||||
public void testDurableAndTopicConsumer() throws Exception {
|
||||
testReceive(remoteBroker, remoteSession, localBroker, localSession, new ConsumerCreator() {
|
||||
|
||||
@Override
|
||||
public MessageConsumer createConsumer() throws JMSException {
|
||||
return remoteSession.createConsumer(included);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Reverse and test publisher on remoteBroker and durable on localBroker
|
||||
* after durable deletion, recreate with a non-durable consumer
|
||||
*/
|
||||
@Test(timeout = 30 * 1000)
|
||||
public void testDurableAndTopicConsumerReverse() throws Exception {
|
||||
testReceive(localBroker, localSession, remoteBroker, remoteSession, new ConsumerCreator() {
|
||||
|
||||
@Override
|
||||
public MessageConsumer createConsumer() throws JMSException {
|
||||
return localSession.createConsumer(included);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void testReceive(BrokerService receiveBroker, Session receiveSession,
|
||||
BrokerService publishBroker, Session publishSession, ConsumerCreator secondConsumerCreator) throws Exception {
|
||||
|
||||
final DestinationStatistics destinationStatistics =
|
||||
publishBroker.getDestination(included).getDestinationStatistics();
|
||||
|
||||
MessageProducer includedProducer = publishSession.createProducer(included);
|
||||
MessageConsumer bridgeConsumer = receiveSession.createDurableSubscriber(
|
||||
included, subName);
|
||||
|
||||
waitForConsumerCount(destinationStatistics, 1);
|
||||
|
||||
//remove the durable
|
||||
final ConnectionContext context = new ConnectionContext();
|
||||
RemoveSubscriptionInfo info = getRemoveSubscriptionInfo(context, receiveBroker);
|
||||
bridgeConsumer.close();
|
||||
Thread.sleep(1000);
|
||||
receiveBroker.getBroker().removeSubscription(context, info);
|
||||
waitForConsumerCount(destinationStatistics, 0);
|
||||
|
||||
//re-create consumer
|
||||
MessageConsumer bridgeConsumer2 = secondConsumerCreator.createConsumer();
|
||||
waitForConsumerCount(destinationStatistics, 1);
|
||||
|
||||
//make sure message received
|
||||
includedProducer.send(publishSession.createTextMessage("test"));
|
||||
assertNotNull(bridgeConsumer2.receive(5000));
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
doSetUp(true);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
doTearDown();
|
||||
}
|
||||
|
||||
protected void doTearDown() throws Exception {
|
||||
if (localConnection != null) {
|
||||
localConnection.close();
|
||||
}
|
||||
if (remoteConnection != null) {
|
||||
remoteConnection.close();
|
||||
}
|
||||
if (localBroker != null) {
|
||||
localBroker.stop();
|
||||
}
|
||||
if (remoteBroker != null) {
|
||||
remoteBroker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void doSetUp(boolean deleteAllMessages) throws Exception {
|
||||
remoteBroker = createRemoteBroker();
|
||||
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||
remoteBroker.start();
|
||||
remoteBroker.waitUntilStarted();
|
||||
localBroker = createLocalBroker();
|
||||
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||
localBroker.start();
|
||||
localBroker.waitUntilStarted();
|
||||
URI localURI = localBroker.getVmConnectorURI();
|
||||
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
|
||||
fac.setAlwaysSyncSend(true);
|
||||
fac.setDispatchAsync(false);
|
||||
localConnection = fac.createConnection();
|
||||
localConnection.setClientID(clientId);
|
||||
localConnection.start();
|
||||
URI remoteURI = remoteBroker.getVmConnectorURI();
|
||||
fac = new ActiveMQConnectionFactory(remoteURI);
|
||||
remoteConnection = fac.createConnection();
|
||||
remoteConnection.setClientID(clientId);
|
||||
remoteConnection.start();
|
||||
included = new ActiveMQTopic(testTopicName);
|
||||
localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
|
||||
protected NetworkConnector connector;
|
||||
protected BrokerService createLocalBroker() throws Exception {
|
||||
BrokerService brokerService = new BrokerService();
|
||||
brokerService.setMonitorConnectionSplits(true);
|
||||
brokerService.setDataDirectoryFile(tempFolder.newFolder());
|
||||
brokerService.setBrokerName("localBroker");
|
||||
|
||||
connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61617)"));
|
||||
connector.setName("networkConnector");
|
||||
connector.setDecreaseNetworkConsumerPriority(false);
|
||||
connector.setConduitSubscriptions(true);
|
||||
connector.setDuplex(true);
|
||||
connector.setDynamicallyIncludedDestinations(Lists.<ActiveMQDestination>newArrayList(
|
||||
new ActiveMQTopic(testTopicName)));
|
||||
connector.setExcludedDestinations(Lists.<ActiveMQDestination>newArrayList(
|
||||
new ActiveMQTopic(excludeTopicName)));
|
||||
|
||||
brokerService.addNetworkConnector(connector);
|
||||
brokerService.addConnector("tcp://localhost:61616");
|
||||
|
||||
return brokerService;
|
||||
}
|
||||
|
||||
protected BrokerService createRemoteBroker() throws Exception {
|
||||
BrokerService brokerService = new BrokerService();
|
||||
brokerService.setBrokerName("remoteBroker");
|
||||
brokerService.setUseJmx(false);
|
||||
brokerService.setDataDirectoryFile(tempFolder.newFolder());
|
||||
brokerService.addConnector("tcp://localhost:61617");
|
||||
|
||||
return brokerService;
|
||||
}
|
||||
|
||||
}
|
|
@ -21,7 +21,6 @@ import static org.junit.Assert.assertNotNull;
|
|||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
|
@ -29,7 +28,6 @@ import java.util.Collection;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
|
@ -58,9 +56,7 @@ import org.apache.activemq.util.Wait;
|
|||
import org.junit.After;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
@ -75,7 +71,7 @@ import com.google.common.collect.Lists;
|
|||
* as demand.
|
||||
*/
|
||||
@RunWith(Parameterized.class)
|
||||
public class VirtualConsumerDemandTest {
|
||||
public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
|
||||
|
||||
protected static final int MESSAGE_COUNT = 10;
|
||||
private static final Logger LOG = LoggerFactory.getLogger(VirtualConsumerDemandTest.class);
|
||||
|
@ -96,25 +92,14 @@ public class VirtualConsumerDemandTest {
|
|||
});
|
||||
}
|
||||
|
||||
protected Connection localConnection;
|
||||
protected Connection remoteConnection;
|
||||
protected BrokerService localBroker;
|
||||
protected BrokerService remoteBroker;
|
||||
|
||||
protected JavaRuntimeConfigurationBroker runtimeBroker;
|
||||
protected Session localSession;
|
||||
protected Session remoteSession;
|
||||
protected ActiveMQTopic included;
|
||||
protected ActiveMQTopic excluded;
|
||||
protected String consumerName = "durableSubs";
|
||||
protected String testTopicName = "include.test.bar";
|
||||
protected String testQueueName = "include.test.foo";
|
||||
|
||||
private final boolean isDuplex;
|
||||
private final boolean isUseVirtualDestSubsOnCreation;
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder tempFolder = new TemporaryFolder(new File("target"));
|
||||
|
||||
|
||||
public VirtualConsumerDemandTest(boolean isDuplex, boolean isUseVirtualDestSubsOnCreation) {
|
||||
// Assume.assumeTrue(
|
||||
|
@ -790,8 +775,8 @@ public class VirtualConsumerDemandTest {
|
|||
MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName);
|
||||
|
||||
//configure a virtual destination that forwards messages to an excluded destination
|
||||
CompositeTopic compositeTopic = createCompositeTopic("excluded.test.bar",
|
||||
new ActiveMQQueue("excluded.test.bar.bridge"));
|
||||
CompositeTopic compositeTopic = createCompositeTopic("exclude.test.bar",
|
||||
new ActiveMQQueue("exclude.test.bar.bridge"));
|
||||
|
||||
runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true);
|
||||
|
||||
|
@ -799,7 +784,7 @@ public class VirtualConsumerDemandTest {
|
|||
Message test = localSession.createTextMessage("test");
|
||||
Thread.sleep(1000);
|
||||
|
||||
MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("excluded.test.bar.bridge"));
|
||||
MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("exclude.test.bar.bridge"));
|
||||
Thread.sleep(2000);
|
||||
includedProducer.send(test);
|
||||
assertNull(bridgeConsumer.receive(5000));
|
||||
|
@ -1302,13 +1287,8 @@ public class VirtualConsumerDemandTest {
|
|||
remoteAdvisoryBroker = (AdvisoryBroker)
|
||||
brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
|
||||
|
||||
NetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61616)"));
|
||||
brokerService.addNetworkConnector(connector);
|
||||
|
||||
brokerService.addConnector("tcp://localhost:61617");
|
||||
|
||||
|
||||
|
||||
return brokerService;
|
||||
}
|
||||
|
||||
|
@ -1330,27 +1310,6 @@ public class VirtualConsumerDemandTest {
|
|||
return compositeQueue;
|
||||
}
|
||||
|
||||
protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception {
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
//should only be 1 for the composite destination creation
|
||||
return count == destinationStatistics.getConsumers().getCount();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception {
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return count == destinationStatistics.getDequeues().getCount() &&
|
||||
count == destinationStatistics.getDispatched().getCount() &&
|
||||
count == destinationStatistics.getForwards().getCount();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected MessageConsumer getVirtualDestinationAdvisoryConsumer(String topic) throws JMSException {
|
||||
return remoteSession.createConsumer(AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(
|
||||
new ActiveMQTopic(topic)));
|
||||
|
@ -1361,12 +1320,6 @@ public class VirtualConsumerDemandTest {
|
|||
new ActiveMQQueue(queue)));
|
||||
}
|
||||
|
||||
protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) {
|
||||
assertEquals("local broker dest stat dispatched", count, localStatistics.getDispatched().getCount());
|
||||
assertEquals("local broker dest stat dequeues", count, localStatistics.getDequeues().getCount());
|
||||
assertEquals("local broker dest stat forwards", count, localStatistics.getForwards().getCount());
|
||||
}
|
||||
|
||||
protected void assertRemoteAdvisoryCount(final MessageConsumer advisoryConsumer, final int count) throws JMSException {
|
||||
int available = 0;
|
||||
ActiveMQMessage message = null;
|
||||
|
|
Loading…
Reference in New Issue