diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 05773ff7dc..1fe00acf67 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -32,6 +32,8 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTempQueue; @@ -283,6 +285,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } + @Override + void rollbackStatsOnDuplicate(KahaDestination commandDestination) { + RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); + Set destinationSet = regionBroker.getDestinations(convert(commandDestination)); + for (Destination destination : destinationSet) { + destination.getDestinationStatistics().getMessages().decrement(); + destination.getDestinationStatistics().getEnqueues().decrement(); + } + } + private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { return pageFile.tx().execute(new Transaction.CallableClosure() { public Location execute(Transaction tx) throws IOException { @@ -1037,7 +1049,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } int type = Integer.parseInt(dest.substring(0, p)); String name = dest.substring(p + 1); + return convert(type, name); + } + private ActiveMQDestination convert(KahaDestination commandDestination) { + return convert(commandDestination.getType().getNumber(), commandDestination.getName()); + } + + private ActiveMQDestination convert(int type, String name) { switch (KahaDestination.DestinationType.valueOf(type)) { case QUEUE: return new ActiveMQQueue(name); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index bb51663cca..378f55026b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1207,6 +1207,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId()); sd.messageIdIndex.put(tx, command.getMessageId(), previous); sd.locationIndex.remove(tx, location); + rollbackStatsOnDuplicate(command.getDestination()); } } else { // restore the previous value.. Looks like this was a redo of a @@ -1222,6 +1223,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe metadata.lastUpdate = location; } + abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination); + void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); if (!command.hasSubscriptionKey()) { diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java index 47d6da499c..eecc9de617 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerQueueClientsReconnectTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.util.Wait; @@ -440,6 +441,84 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu })); } + public void testDuplicateSendWithNoAuditEnqueueCountStat() throws Exception { + broker1 = "BrokerA"; + broker2 = "BrokerB"; + + NetworkConnector networkConnector = bridgeBrokers(broker1, broker2); + + final AtomicBoolean first = new AtomicBoolean(); + final CountDownLatch gotMessageLatch = new CountDownLatch(1); + + BrokerService brokerService = brokers.get(broker2).broker; + brokerService.setPersistent(true); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + @Override + public void send(final ProducerBrokerExchange producerExchange, + org.apache.activemq.command.Message messageSend) + throws Exception { + super.send(producerExchange, messageSend); + if (first.compareAndSet(false, true)) { + producerExchange.getConnectionContext().setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + try { + LOG.info("Waiting for recepit"); + assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS)); + LOG.info("Stopping connection post send and receive and multiple producers"); + producerExchange.getConnectionContext().getConnection().stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + } + }); + + // Create queue + ActiveMQDestination dest = createDestination("TEST.FOO", false); + + // statically include our destination + networkConnector.addStaticallyIncludedDestination(dest); + + // Run brokers + startAllBrokers(); + + waitForBridgeFormation(); + + sendMessages("BrokerA", dest, 1); + + // wait for broker2 to get the initial forward + Wait.waitFor(new Wait.Condition(){ + @Override + public boolean isSatisified() throws Exception { + return brokers.get(broker2).broker.getAdminView().getTotalMessageCount() == 1; + } + }); + + // message still pending on broker1 + assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount()); + + // allow the bridge to be shutdown and restarted + gotMessageLatch.countDown(); + + + // verify message is forwarded after restart + assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount(); + } + })); + + assertEquals("one messages pending", 1, brokers.get(broker2).broker.getAdminView().getTotalMessageCount()); + assertEquals("one messages enqueued", 1, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount()); + } + protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception { Message msg; int i;