resolve https://issues.apache.org/activemq/browse/AMQ-2720 - ensure kahaDB getDestinations returns only durable topic destinations and add test case for duplicate messages in a network after restart

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@941281 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-05-05 13:37:13 +00:00
parent a3344e61a9
commit 591e55ffce
4 changed files with 97 additions and 14 deletions

View File

@ -696,7 +696,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
Message message = configureMessage(md); Message message = configureMessage(md);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message); LOG.debug("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ", consumer: " + md.getConsumerId() + ", brokerPath: " + Arrays.toString(message.getBrokerPath()) + ", message: " + message);
} }
if (!message.isResponseRequired()) { if (!message.isResponseRequired()) {

View File

@ -46,6 +46,7 @@ import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener; import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand; import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination;
@ -496,9 +497,23 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void execute(Transaction tx) throws IOException { public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
Entry<String, StoredDestination> entry = iterator.next(); Entry<String, StoredDestination> entry = iterator.next();
rc.add(convert(entry.getKey())); if (!isEmptyTopic(entry, tx)) {
rc.add(convert(entry.getKey()));
}
} }
} }
private boolean isEmptyTopic(Entry<String, StoredDestination> entry, Transaction tx) throws IOException {
boolean isEmptyTopic = false;
ActiveMQDestination dest = convert(entry.getKey());
if (dest.isTopic()) {
StoredDestination loadedStore = getStoredDestination(convert(dest), tx);
if (loadedStore.ackPositions.isEmpty()) {
isEmptyTopic = true;
}
}
return isEmptyTopic;
}
}); });
} }
return rc; return rc;

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.File;
import java.io.IOException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
public class ThreeBrokerVirtualTopicNetworkAMQPATest extends ThreeBrokerVirtualTopicNetworkTest {
protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
File dataFileDir = new File("target/test-amq-data/amq/" + broker.getBrokerName());
AMQPersistenceAdapter adapter = new AMQPersistenceAdapter();
adapter.setDirectory(dataFileDir);
broker.setPersistenceAdapter(adapter);
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.usecases; package org.apache.activemq.usecases;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.net.URI; import java.net.URI;
import javax.jms.Destination; import javax.jms.Destination;
@ -98,23 +99,51 @@ public class ThreeBrokerVirtualTopicNetworkTest extends JmsMultipleBrokersTestSu
waitForBridgeFormation(); waitForBridgeFormation();
clientA = createConsumer("BrokerA", createDestination("Consumer.A.TEST.FOO", false)); clientA = createConsumer("BrokerA", createDestination("Consumer.A.TEST.FOO", false));
LOG.info("recreated clientA");
Thread.sleep(2000); Thread.sleep(2000);
sendMessages("BrokerA", dest, 2); sendMessages("BrokerA", dest, 10);
msgsA = getConsumerMessages("BrokerA", clientA); msgsA = getConsumerMessages("BrokerA", clientA);
msgsA.waitForMessagesToArrive(2); msgsA.waitForMessagesToArrive(10);
msgsB.waitForMessagesToArrive(3); msgsB.waitForMessagesToArrive(11);
msgsC.waitForMessagesToArrive(3); msgsC.waitForMessagesToArrive(11);
// ensure we don't get any more messages // ensure we don't get any more messages
Thread.sleep(2000); Thread.sleep(2000);
assertEquals(2, msgsA.getMessageCount()); assertEquals(10, msgsA.getMessageCount());
assertEquals(3, msgsB.getMessageCount()); assertEquals(11, msgsB.getMessageCount());
assertEquals(3, msgsC.getMessageCount()); assertEquals(11, msgsC.getMessageCount());
// restart to ensure no hanging messages
LOG.info("Restarting brokerA again");
brokerItem = brokers.remove("BrokerA");
if (brokerItem != null) {
brokerItem.destroy();
}
restartedBroker = createAndConfigureBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?useJmx=false"));
bridgeAndConfigureBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL, conduitSubs);
bridgeAndConfigureBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL, conduitSubs);
restartedBroker.start();
waitForBridgeFormation();
clientA = createConsumer("BrokerA", createDestination("Consumer.A.TEST.FOO", false));
LOG.info("recreated clientA again");
Thread.sleep(2000);
msgsA = getConsumerMessages("BrokerA", clientA);
// ensure we don't get any more messages
Thread.sleep(5000);
assertEquals(0, msgsA.getMessageCount());
assertEquals(11, msgsB.getMessageCount());
assertEquals(11, msgsC.getMessageCount());
} }
@ -135,10 +164,7 @@ public class ThreeBrokerVirtualTopicNetworkTest extends JmsMultipleBrokersTestSu
private BrokerService createAndConfigureBroker(URI uri) throws Exception { private BrokerService createAndConfigureBroker(URI uri) throws Exception {
BrokerService broker = createBroker(uri); BrokerService broker = createBroker(uri);
File dataFileDir = new File("target/test-amq-data/kahadb/" + broker.getBrokerName()); configurePersistenceAdapter(broker);
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(dataFileDir);
broker.setPersistenceAdapter(kaha);
// make all topics virtual and consumers use the default prefix // make all topics virtual and consumers use the default prefix
VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor(); VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
@ -147,4 +173,12 @@ public class ThreeBrokerVirtualTopicNetworkTest extends JmsMultipleBrokersTestSu
broker.setDestinationInterceptors(destinationInterceptors); broker.setDestinationInterceptors(destinationInterceptors);
return broker; return broker;
} }
protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
File dataFileDir = new File("target/test-amq-data/kahadb/" + broker.getBrokerName());
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(dataFileDir);
broker.setPersistenceAdapter(kaha);
}
} }