mirror of https://github.com/apache/activemq.git
Fixing a regression that caused a network bridge to recreate durable demand improperly.
This commit is contained in:
parent
e0c2c177c2
commit
2117768e0a
|
@ -18,6 +18,7 @@ package org.apache.activemq.network;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.activemq.broker.region.Subscription;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ConsumerId;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
|
@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
|
|||
public class DurableConduitBridge extends ConduitBridge {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DurableConduitBridge:" + configuration.getBrokerName() + "->" + getRemoteBrokerName();
|
||||
}
|
||||
|
@ -52,6 +54,7 @@ public class DurableConduitBridge extends ConduitBridge {
|
|||
* Subscriptions for these destinations are always created
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected void setupStaticDestinations() {
|
||||
super.setupStaticDestinations();
|
||||
ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
|
||||
|
@ -60,11 +63,22 @@ public class DurableConduitBridge extends ConduitBridge {
|
|||
if (isPermissableDestination(dest) && !doesConsumerExist(dest)) {
|
||||
DemandSubscription sub = createDemandSubscription(dest);
|
||||
sub.setStaticallyIncluded(true);
|
||||
if (dest.isTopic()) {
|
||||
sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
|
||||
}
|
||||
try {
|
||||
addSubscription(sub);
|
||||
//Filtering by non-empty subscriptions, see AMQ-5875
|
||||
if (dest.isTopic()) {
|
||||
sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest));
|
||||
for (Subscription subscription : this.getRegionSubscriptions(dest)) {
|
||||
String clientId = subscription.getContext().getClientId();
|
||||
String subName = subscription.getConsumerInfo().getSubscriptionName();
|
||||
if (clientId != null && clientId.equals(sub.getLocalInfo().getClientId())
|
||||
&& subName != null && subName.equals(sub.getLocalInfo().getSubscriptionName())) {
|
||||
addSubscription(sub);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
addSubscription(sub);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to add static destination {}", dest, e);
|
||||
}
|
||||
|
@ -74,6 +88,7 @@ public class DurableConduitBridge extends ConduitBridge {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
|
||||
if (addToAlreadyInterestedConsumers(info)) {
|
||||
return null; // don't want this subscription added
|
||||
|
|
|
@ -0,0 +1,202 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.filefilter.WildcardFileFilter;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
public abstract class AbstractMultiKahaDBDeletionTest {
|
||||
protected static final Logger LOG = LoggerFactory
|
||||
.getLogger(MultiKahaDBTopicDeletionTest.class);
|
||||
|
||||
protected BrokerService brokerService;
|
||||
protected Broker broker;
|
||||
protected URI brokerConnectURI;
|
||||
protected File storeDir;
|
||||
protected ActiveMQDestination dest1;
|
||||
protected ActiveMQDestination dest2;
|
||||
|
||||
public AbstractMultiKahaDBDeletionTest(ActiveMQDestination dest1, ActiveMQDestination dest2) {
|
||||
this.dest1 = dest1;
|
||||
this.dest2 = dest2;
|
||||
}
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder tempTestDir = new TemporaryFolder();
|
||||
|
||||
@Before
|
||||
public void startBroker() throws Exception {
|
||||
setUpBroker(true);
|
||||
}
|
||||
|
||||
protected void setUpBroker(boolean clearDataDir) throws Exception {
|
||||
brokerService = new BrokerService();
|
||||
this.initPersistence(brokerService);
|
||||
// set up a transport
|
||||
TransportConnector connector = brokerService
|
||||
.addConnector(new TransportConnector());
|
||||
connector.setUri(new URI("tcp://0.0.0.0:0"));
|
||||
connector.setName("tcp");
|
||||
|
||||
brokerService.start();
|
||||
brokerService.waitUntilStarted();
|
||||
brokerConnectURI = brokerService.getConnectorByName("tcp")
|
||||
.getConnectUri();
|
||||
broker = brokerService.getBroker();
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopBroker() throws Exception {
|
||||
brokerService.stop();
|
||||
brokerService.waitUntilStopped();
|
||||
}
|
||||
|
||||
protected void initPersistence(BrokerService brokerService)
|
||||
throws IOException {
|
||||
storeDir = tempTestDir.getRoot();
|
||||
brokerService.setPersistent(true);
|
||||
|
||||
// setup multi-kaha adapter
|
||||
MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
|
||||
persistenceAdapter.setDirectory(storeDir);
|
||||
|
||||
KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
|
||||
kahaStore.setJournalMaxFileLength(1024 * 512);
|
||||
|
||||
// set up a store per destination
|
||||
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
|
||||
filtered.setPersistenceAdapter(kahaStore);
|
||||
filtered.setPerDestination(true);
|
||||
List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
|
||||
stores.add(filtered);
|
||||
|
||||
persistenceAdapter.setFilteredPersistenceAdapters(stores);
|
||||
brokerService.setPersistenceAdapter(persistenceAdapter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that a destination can be deleted and the other destination can still be subscribed to
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testDest1Deletion() throws Exception {
|
||||
LOG.info("Creating {} first, {} second", dest1, dest2);
|
||||
LOG.info("Removing {}, subscribing to {}", dest1, dest2);
|
||||
|
||||
// Create two destinations
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), dest1, false);
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), dest2, false);
|
||||
|
||||
// remove destination2
|
||||
broker.removeDestination(brokerService.getAdminConnectionContext(), dest1, 100);
|
||||
|
||||
// try and create a consumer on dest2, before AMQ-5875 this
|
||||
//would cause an IllegalStateException for Topics
|
||||
createConsumer(dest2);
|
||||
Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), getStoreFileFilter());
|
||||
assertTrue("Store index should still exist", storeFiles.size() >= 1);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testDest2Deletion() throws Exception {
|
||||
LOG.info("Creating {} first, {} second", dest1, dest2);
|
||||
LOG.info("Removing {}, subscribing to {}", dest2, dest1);
|
||||
|
||||
// Create two destinations
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), dest1, false);
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), dest2, false);
|
||||
|
||||
// remove destination2
|
||||
broker.removeDestination(brokerService.getAdminConnectionContext(), dest2, 100);
|
||||
|
||||
// try and create a consumer on dest1, before AMQ-5875 this
|
||||
//would cause an IllegalStateException for Topics
|
||||
createConsumer(dest1);
|
||||
Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), getStoreFileFilter());
|
||||
assertTrue("Store index should still exist", storeFiles.size() >= 1);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testStoreCleanupDeleteDest1First() throws Exception {
|
||||
LOG.info("Creating {} first, {} second", dest1, dest2);
|
||||
LOG.info("Deleting {} first, {} second", dest1, dest2);
|
||||
|
||||
// Create two destinations
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), dest1, false);
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), dest2, false);
|
||||
|
||||
// remove both destinations
|
||||
broker.removeDestination(brokerService.getAdminConnectionContext(), dest1, 100);
|
||||
broker.removeDestination(brokerService.getAdminConnectionContext(), dest2, 100);
|
||||
|
||||
//Assert that with no more destinations attached to a store that it has been cleaned up
|
||||
Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), getStoreFileFilter());
|
||||
assertEquals("Store files should be deleted", 0, storeFiles.size());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStoreCleanupDeleteDest2First() throws Exception {
|
||||
LOG.info("Creating {} first, {} second", dest1, dest2);
|
||||
LOG.info("Deleting {} first, {} second", dest2, dest1);
|
||||
|
||||
// Create two destinations
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), dest1, false);
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), dest2, false);
|
||||
|
||||
// remove both destinations
|
||||
broker.removeDestination(brokerService.getAdminConnectionContext(), dest2, 100);
|
||||
broker.removeDestination(brokerService.getAdminConnectionContext(), dest1, 100);
|
||||
|
||||
//Assert that with no more destinations attached to a store that it has been cleaned up
|
||||
Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), getStoreFileFilter());
|
||||
assertEquals("Store files should be deleted", 0, storeFiles.size());
|
||||
|
||||
}
|
||||
|
||||
|
||||
protected abstract void createConsumer(ActiveMQDestination dest) throws JMSException;
|
||||
|
||||
protected abstract WildcardFileFilter getStoreFileFilter();
|
||||
|
||||
}
|
|
@ -0,0 +1,91 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.commons.io.filefilter.WildcardFileFilter;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* AMQ-5875
|
||||
*
|
||||
* This test shows that when multiple destinations share a single KahaDB
|
||||
* instance when using mKahaDB, that the deletion of one Queue will not cause
|
||||
* the store to be deleted if another destination is still attached. This
|
||||
* issue was related to Topics but this test makes sure Queues work as well.
|
||||
*
|
||||
* */
|
||||
@RunWith(Parameterized.class)
|
||||
public class MultiKahaDBQueueDeletionTest extends AbstractMultiKahaDBDeletionTest {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory
|
||||
.getLogger(MultiKahaDBTopicDeletionTest.class);
|
||||
|
||||
protected static ActiveMQQueue QUEUE1 = new ActiveMQQueue("test.>");
|
||||
protected static ActiveMQQueue QUEUE2 = new ActiveMQQueue("test.t.queue");
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
|
||||
//Test with queues created in different orders
|
||||
return Arrays.asList(new Object[][] {
|
||||
{QUEUE1, QUEUE2},
|
||||
{QUEUE2, QUEUE1}
|
||||
});
|
||||
}
|
||||
|
||||
public MultiKahaDBQueueDeletionTest(ActiveMQQueue dest1, ActiveMQQueue dest2) {
|
||||
super(dest1, dest2);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.activemq.store.kahadb.AbstractMultiKahaDBDeletionTest#createConsumer(org.apache.activemq.command.ActiveMQDestination)
|
||||
*/
|
||||
@Override
|
||||
protected void createConsumer(ActiveMQDestination dest) throws JMSException {
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
|
||||
brokerConnectURI);
|
||||
Connection connection = factory.createConnection();
|
||||
connection.setClientID("client1");
|
||||
connection.start();
|
||||
Session session = connection.createSession(false,
|
||||
Session.AUTO_ACKNOWLEDGE);
|
||||
session.createConsumer(dest);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.activemq.store.kahadb.AbstractMultiKahaDBDeletionTest#getStoreFileFilter()
|
||||
*/
|
||||
@Override
|
||||
protected WildcardFileFilter getStoreFileFilter() {
|
||||
return new WildcardFileFilter("queue*");
|
||||
}
|
||||
|
||||
}
|
|
@ -16,32 +16,18 @@
|
|||
*/
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.TransportConnector;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.filefilter.WildcardFileFilter;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
@ -58,21 +44,13 @@ import org.slf4j.LoggerFactory;
|
|||
*
|
||||
* */
|
||||
@RunWith(Parameterized.class)
|
||||
public class MultiKahaDBTopicDeletionTest {
|
||||
public class MultiKahaDBTopicDeletionTest extends AbstractMultiKahaDBDeletionTest {
|
||||
protected static final Logger LOG = LoggerFactory
|
||||
.getLogger(MultiKahaDBTopicDeletionTest.class);
|
||||
|
||||
protected BrokerService brokerService;
|
||||
protected Broker broker;
|
||||
protected URI brokerConnectURI;
|
||||
protected File storeDir;
|
||||
protected ActiveMQTopic topic1;
|
||||
protected ActiveMQTopic topic2;
|
||||
|
||||
protected static ActiveMQTopic TOPIC1 = new ActiveMQTopic("test.>");
|
||||
protected static ActiveMQTopic TOPIC2 = new ActiveMQTopic("test.t.topic");
|
||||
|
||||
|
||||
@Parameters
|
||||
public static Collection<Object[]> data() {
|
||||
|
||||
|
@ -83,144 +61,13 @@ public class MultiKahaDBTopicDeletionTest {
|
|||
});
|
||||
}
|
||||
|
||||
public MultiKahaDBTopicDeletionTest(ActiveMQTopic topic1, ActiveMQTopic topic2) {
|
||||
this.topic1 = topic1;
|
||||
this.topic2 = topic2;
|
||||
public MultiKahaDBTopicDeletionTest(ActiveMQTopic dest1,
|
||||
ActiveMQTopic dest2) {
|
||||
super(dest1, dest2);
|
||||
}
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder tempTestDir = new TemporaryFolder();
|
||||
|
||||
@Before
|
||||
public void startBroker() throws Exception {
|
||||
setUpBroker(true);
|
||||
}
|
||||
|
||||
protected void setUpBroker(boolean clearDataDir) throws Exception {
|
||||
brokerService = new BrokerService();
|
||||
this.initPersistence(brokerService);
|
||||
// set up a transport
|
||||
TransportConnector connector = brokerService
|
||||
.addConnector(new TransportConnector());
|
||||
connector.setUri(new URI("tcp://0.0.0.0:0"));
|
||||
connector.setName("tcp");
|
||||
|
||||
brokerService.start();
|
||||
brokerService.waitUntilStarted();
|
||||
brokerConnectURI = brokerService.getConnectorByName("tcp")
|
||||
.getConnectUri();
|
||||
broker = brokerService.getBroker();
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopBroker() throws Exception {
|
||||
brokerService.stop();
|
||||
brokerService.waitUntilStopped();
|
||||
}
|
||||
|
||||
protected void initPersistence(BrokerService brokerService)
|
||||
throws IOException {
|
||||
storeDir = tempTestDir.getRoot();
|
||||
brokerService.setPersistent(true);
|
||||
|
||||
// setup multi-kaha adapter
|
||||
MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
|
||||
persistenceAdapter.setDirectory(storeDir);
|
||||
|
||||
KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
|
||||
kahaStore.setJournalMaxFileLength(1024 * 512);
|
||||
|
||||
// set up a store per destination
|
||||
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
|
||||
filtered.setPersistenceAdapter(kahaStore);
|
||||
filtered.setPerDestination(true);
|
||||
List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
|
||||
stores.add(filtered);
|
||||
|
||||
persistenceAdapter.setFilteredPersistenceAdapters(stores);
|
||||
brokerService.setPersistenceAdapter(persistenceAdapter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that a topic can be deleted and the other topic can still be subscribed to
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testTopic1Deletion() throws Exception {
|
||||
LOG.info("Creating {} first, {} second", topic1, topic2);
|
||||
LOG.info("Removing {}, subscribing to {}", topic1, topic2);
|
||||
|
||||
// Create two topics
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
|
||||
|
||||
// remove topic2
|
||||
broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100);
|
||||
|
||||
// try and create a subscription on topic2, before AMQ-5875 this
|
||||
//would cause an IllegalStateException
|
||||
createSubscriber(topic2);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testTopic2Deletion() throws Exception {
|
||||
LOG.info("Creating {} first, {} second", topic1, topic2);
|
||||
LOG.info("Removing {}, subscribing to {}", topic2, topic1);
|
||||
|
||||
// Create two topics
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
|
||||
|
||||
// remove topic2
|
||||
broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100);
|
||||
|
||||
// try and create a subscription on topic1, before AMQ-5875 this
|
||||
//would cause an IllegalStateException
|
||||
createSubscriber(topic1);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testStoreCleanupDeleteTopic1First() throws Exception {
|
||||
LOG.info("Creating {} first, {} second", topic1, topic2);
|
||||
LOG.info("Deleting {} first, {} second", topic1, topic2);
|
||||
|
||||
// Create two topics
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
|
||||
|
||||
// remove both topics
|
||||
broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100);
|
||||
broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100);
|
||||
|
||||
//Assert that with no more destinations attached to a store that it has been cleaned up
|
||||
Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), new WildcardFileFilter("topic*"));
|
||||
assertEquals("Store files should be deleted", 0, storeFiles.size());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStoreCleanupDeleteTopic2First() throws Exception {
|
||||
LOG.info("Creating {} first, {} second", topic1, topic2);
|
||||
LOG.info("Deleting {} first, {} second", topic2, topic1);
|
||||
|
||||
// Create two topics
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), topic1, false);
|
||||
broker.addDestination(brokerService.getAdminConnectionContext(), topic2, false);
|
||||
|
||||
// remove both topics
|
||||
broker.removeDestination(brokerService.getAdminConnectionContext(), topic2, 100);
|
||||
broker.removeDestination(brokerService.getAdminConnectionContext(), topic1, 100);
|
||||
|
||||
//Assert that with no more destinations attached to a store that it has been cleaned up
|
||||
Collection<File> storeFiles = FileUtils.listFiles(storeDir, new WildcardFileFilter("db*"), new WildcardFileFilter("topic*"));
|
||||
assertEquals("Store files should be deleted", 0, storeFiles.size());
|
||||
|
||||
}
|
||||
|
||||
|
||||
protected void createSubscriber(ActiveMQTopic topic) throws JMSException {
|
||||
@Override
|
||||
protected void createConsumer(ActiveMQDestination dest) throws JMSException {
|
||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
|
||||
brokerConnectURI);
|
||||
Connection connection = factory.createConnection();
|
||||
|
@ -228,7 +75,12 @@ public class MultiKahaDBTopicDeletionTest {
|
|||
connection.start();
|
||||
Session session = connection.createSession(false,
|
||||
Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "sub1");
|
||||
session.createDurableSubscriber((Topic) dest, "sub1");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WildcardFileFilter getStoreFileFilter() {
|
||||
return new WildcardFileFilter("topic*");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue