From 289750d7c9849bd26f19e9116457eb72a3412d05 Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 20 Sep 2019 10:22:56 +0100 Subject: [PATCH] AMQ-7308 - ensure kahadb message add does not auto create the message store in error, expect an existing store. fix and test --- .../store/PersistenceAdapterTestSupport.java | 1 + .../store/kahadb/MessageDatabase.java | 25 ++- .../store/kahadb/MessageDatabaseSizeTest.java | 2 + .../VirtualTopicConcurrentSendDeleteTest.java | 177 ++++++++++++++++++ 4 files changed, 203 insertions(+), 2 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicConcurrentSendDeleteTest.java diff --git a/activemq-broker/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java b/activemq-broker/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java index 16d46e1c5a..39a63aee96 100644 --- a/activemq-broker/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java +++ b/activemq-broker/src/test/java/org/apache/activemq/store/PersistenceAdapterTestSupport.java @@ -57,6 +57,7 @@ abstract public class PersistenceAdapterTestSupport extends TestCase { MessageStore ms = pa.createQueueMessageStore(new ActiveMQQueue("TEST")); + ms.start(); ConnectionContext context = new ConnectionContext(); ActiveMQTextMessage message = new ActiveMQTextMessage(); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 1a120a20a5..ac8ea48005 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1205,6 +1205,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe */ void process(JournalCommand data, final Location location, final Location inDoubtlocation) throws IOException { if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { + initMessageStore(data); process(data, location, (IndexAware) null); } else { // just recover producer audit @@ -1217,6 +1218,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + private void initMessageStore(JournalCommand data) throws IOException { + data.visit(new Visitor() { + @Override + public void visit(KahaAddMessageCommand command) throws IOException { + final KahaDestination destination = command.getDestination(); + if (!storedDestinations.containsKey(key(destination))) { + pageFile.tx().execute(new Transaction.Closure() { + @Override + public void execute(Transaction tx) throws IOException { + getStoredDestination(destination, tx); + } + }); + } + } + }); + } + // ///////////////////////////////////////////////////////////////// // Journaled record processing methods. Once the record is journaled, // these methods handle applying the index updates. These may be called @@ -1486,8 +1504,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe private final HashSet journalFilesBeingReplicated = new HashSet<>(); long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { - StoredDestination sd = getStoredDestination(command.getDestination(), tx); - + StoredDestination sd = getExistingStoredDestination(command.getDestination(), tx); + if (sd == null) { + // if the store no longer exists, skip + return -1; + } // Skip adding the message to the index if this is a topic and there are // no subscriptions. if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) { diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java index 4deb1e07b9..82d401821c 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java @@ -114,6 +114,7 @@ public class MessageDatabaseSizeTest { //Add a single message and update once so we can compare the size consistently MessageStore messageStore = store.createQueueMessageStore(destination); + messageStore.start(); messageStore.addMessage(broker.getAdminConnectionContext(), textMessage); messageStore.updateMessage(textMessage); @@ -134,6 +135,7 @@ public class MessageDatabaseSizeTest { //Add a single message and update once so we can compare the size consistently MessageStore messageStore = store.createQueueMessageStore(destination); + messageStore.start(); messageStore.addMessage(broker.getAdminConnectionContext(), textMessage); textMessage.setText("new size of message"); messageStore.updateMessage(textMessage); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicConcurrentSendDeleteTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicConcurrentSendDeleteTest.java new file mode 100644 index 0000000000..d4be5ae6c0 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicConcurrentSendDeleteTest.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class VirtualTopicConcurrentSendDeleteTest { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicConcurrentSendDeleteTest.class); + + BrokerService brokerService; + ConnectionFactory connectionFactory; + + @Before + public void createBroker() throws Exception { + createBroker(true); + } + + public void createBroker(boolean delete) throws Exception { + brokerService = new BrokerService(); + //brokerService.setPersistent(false); + brokerService.setDeleteAllMessagesOnStartup(delete); + brokerService.setAdvisorySupport(false); + ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false); + brokerService.start(); + + ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + activeMQConnectionFactory.setWatchTopicAdvisories(false); + activeMQConnectionFactory.setAlwaysSyncSend(false); + ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy(); + zeroPrefetch.setAll(0); + activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch); + connectionFactory = activeMQConnectionFactory; + } + + @After + public void stopBroker() throws Exception { + brokerService.stop(); + } + + @Test + public void testConsumerQueueDeleteOk() throws Exception { + + final int numConnections = 1; + final int numDestinations = 10; + final int numMessages = 4000; + + ExecutorService executorService = Executors.newFixedThreadPool(numConnections * 2); + + brokerService.getRegionBroker().addDestination( + brokerService.getAdminConnectionContext(), new ActiveMQTopic("VirtualTopic.TEST"), false); + + // precreate dests to accentuate read access + for (int i=0; i=0; i--) { + final ActiveMQQueue toDelete = new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST"); + + ObjectName queueViewMBeanName = new ObjectName(prefix + toDelete.getQueueName()); + QueueViewMBean proxy = (QueueViewMBean) + brokerService.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + LOG.info("Q len: " + toDelete.getQueueName() + ", " + proxy.getQueueSize()); + brokerService.getAdminView().removeQueue(toDelete.getPhysicalName()); + + TimeUnit.MILLISECONDS.sleep(100); + } + + } catch (Exception e) { + e.printStackTrace(); + } + + } + }); + + + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.MINUTES); + + LOG.info("Enqueues: " + ((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); + final int numQueues = ((RegionBroker)brokerService.getRegionBroker()).getQueueRegion().getDestinationMap().size(); + LOG.info("Destinations: " + numQueues ); + + assertEquals("no queues left", 0, numQueues); + + // the bug + assertEquals("no queues, just one topic, in kahadb", 1, brokerService.getPersistenceAdapter().getDestinations().size()); + } +}