From dc3c5a719b47fc40b5affce8412390ead67ce8ef Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Fri, 15 Apr 2016 13:01:21 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6256 Calling beforeMarshall on messages when they async stored before the store task is run and before consumer dispatch to prevent two threads from trying to mutate the message state at the same time. (cherry picked from commit b9f9f03829a65efa2956c347d2cafa41905313c6) --- .../activemq/store/kahadb/KahaDBStore.java | 3 +- .../activemq/leveldb/LevelDBStore.scala | 1 + .../org/apache/activemq/bugs/AMQ3436Test.java | 1 + .../AbstractVmConcurrentDispatchTest.java} | 54 +++++-------- .../KahaDbVmConcurrentDispatchTest.java | 68 ++++++++++++++++ .../MultiKahaDbVmConcurrentDispatchTest.java | 81 +++++++++++++++++++ .../LevelDbVmConcurrentDispatchTest.java | 61 ++++++++++++++ 7 files changed, 232 insertions(+), 37 deletions(-) rename activemq-unit-tests/src/test/java/org/apache/activemq/{bugs/AMQ6222Test.java => store/AbstractVmConcurrentDispatchTest.java} (86%) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index fa4672b405..06a60a594a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -66,7 +66,6 @@ import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionIdTransformer; import org.apache.activemq.store.TransactionStore; -import org.apache.activemq.store.kahadb.MessageDatabase.Metadata; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; @@ -385,6 +384,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public ListenableFuture asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException { if (isConcurrentStoreAndDispatchQueues()) { + message.beforeMarshall(wireFormat); StoreQueueTask result = new StoreQueueTask(this, context, message); ListenableFuture future = result.getFuture(); message.getMessageId().setFutureOrSequenceLong(future); @@ -753,6 +753,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public ListenableFuture asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException { if (isConcurrentStoreAndDispatchTopics()) { + message.beforeMarshall(wireFormat); StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); result.aquireLocks(); addTopicTask(this, result); diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index a4cdcac4ac..745f36dab2 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -754,6 +754,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = { check_running + message.beforeMarshall(wireFormat); message.incrementReferenceCount() uow.addCompleteListener({ message.decrementReferenceCount() diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java index 65e0783487..d9156de817 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java @@ -145,6 +145,7 @@ public class AMQ3436Test { boolean firstMessage = true; + @Override public void onMessage(Message msg) { try { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java similarity index 86% rename from activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java rename to activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java index 512ef9dd5d..aaaaf6916f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java @@ -14,15 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.bugs; +package org.apache.activemq.store; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.IOException; import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -48,13 +47,11 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,40 +61,23 @@ import org.slf4j.LoggerFactory; * This test shows that messages are received with non-null data while * several consumers are used. */ -@RunWith(Parameterized.class) -public class AMQ6222Test { +public abstract class AbstractVmConcurrentDispatchTest { - private static final Logger LOG = LoggerFactory.getLogger(AMQ6222Test.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractVmConcurrentDispatchTest.class); private final MessageType messageType; private final boolean reduceMemoryFootPrint; - private final boolean concurrentDispatch; - private static enum MessageType {TEXT, MAP, OBJECT} - private final static boolean[] booleanVals = {true, false}; - private static boolean[] reduceMemoryFootPrintVals = booleanVals; - private static boolean[] concurrentDispatchVals = booleanVals; + protected static enum MessageType {TEXT, MAP, OBJECT} + protected final static boolean[] booleanVals = {true, false}; + protected static boolean[] reduceMemoryFootPrintVals = booleanVals; - @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}") - public static Collection data() { - List values = new ArrayList<>(); + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); - for (MessageType mt : MessageType.values()) { - for (boolean rmfVal : reduceMemoryFootPrintVals) { - for (boolean cdVal : concurrentDispatchVals) { - values.add(new Object[] {mt, rmfVal, cdVal}); - } - } - } - - return values; - } - - public AMQ6222Test(MessageType messageType, boolean reduceMemoryFootPrint, - boolean concurrentDispatch) { + public AbstractVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) { this.messageType = messageType; this.reduceMemoryFootPrint = reduceMemoryFootPrint; - this.concurrentDispatch = concurrentDispatch; } private BrokerService broker; @@ -126,8 +106,8 @@ public class AMQ6222Test { defaultPolicy.setReduceMemoryFootprint(reduceMemoryFootPrint); policyMap.setDefaultEntry(defaultPolicy); broker.setDestinationPolicy(policyMap); - KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch); + broker.setDataDirectoryFile(dataFileDir.getRoot()); + configurePersistenceAdapter(broker); broker.start(); broker.waitUntilStarted(); @@ -143,6 +123,8 @@ public class AMQ6222Test { } } + protected abstract void configurePersistenceAdapter(final BrokerService broker) throws IOException; + @Test(timeout=180000) public void testMessagesAreValid() throws Exception { @@ -161,7 +143,7 @@ public class AMQ6222Test { try { tasks.shutdown(); - tasks.awaitTermination(10, TimeUnit.SECONDS); + tasks.awaitTermination(20, TimeUnit.SECONDS); } catch (Exception e) { //should get exception with no errors } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java new file mode 100644 index 0000000000..217a7c7429 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.AbstractVmConcurrentDispatchTest; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class KahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest { + + private final boolean concurrentDispatch; + private static boolean[] concurrentDispatchVals = booleanVals; + + @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}") + public static Collection data() { + List values = new ArrayList<>(); + + for (MessageType mt : MessageType.values()) { + for (boolean rmfVal : reduceMemoryFootPrintVals) { + for (boolean cdVal : concurrentDispatchVals) { + values.add(new Object[] {mt, rmfVal, cdVal}); + } + } + } + + return values; + } + + /** + * @param messageType + * @param reduceMemoryFootPrint + * @param concurrentDispatch + */ + public KahaDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint, + boolean concurrentDispatch) { + super(messageType, reduceMemoryFootPrint); + this.concurrentDispatch = concurrentDispatch; + } + + @Override + protected void configurePersistenceAdapter(BrokerService broker) throws IOException { + KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch); + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java new file mode 100644 index 0000000000..3d16ce7d56 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.AbstractVmConcurrentDispatchTest; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest { + + private final boolean concurrentDispatch; + private static boolean[] concurrentDispatchVals = booleanVals; + + @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}") + public static Collection data() { + List values = new ArrayList<>(); + + for (MessageType mt : MessageType.values()) { + for (boolean rmfVal : reduceMemoryFootPrintVals) { + for (boolean cdVal : concurrentDispatchVals) { + values.add(new Object[] {mt, rmfVal, cdVal}); + } + } + } + + return values; + } + + /** + * @param messageType + * @param reduceMemoryFootPrint + * @param concurrentDispatch + */ + public MultiKahaDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint, + boolean concurrentDispatch) { + super(messageType, reduceMemoryFootPrint); + this.concurrentDispatch = concurrentDispatch; + } + + @Override + protected void configurePersistenceAdapter(BrokerService broker) throws IOException { + //setup multi-kaha adapter + MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(dataFileDir.getRoot()); + + KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); + kahaStore.setConcurrentStoreAndDispatchQueues(concurrentDispatch); + + FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); + filtered.setPersistenceAdapter(kahaStore); + filtered.setPerDestination(false); + List stores = new ArrayList<>(); + stores.add(filtered); + + persistenceAdapter.setFilteredPersistenceAdapters(stores); + broker.setPersistenceAdapter(persistenceAdapter); + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java new file mode 100644 index 0000000000..d1b7e43d81 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.leveldb; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.leveldb.LevelDBStoreFactory; +import org.apache.activemq.store.AbstractVmConcurrentDispatchTest; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class LevelDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest { + + @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}") + public static Collection data() { + List values = new ArrayList<>(); + + for (MessageType mt : MessageType.values()) { + for (boolean rmfVal : reduceMemoryFootPrintVals) { + values.add(new Object[] {mt, rmfVal}); + } + } + + return values; + } + + /** + * @param messageType + * @param reduceMemoryFootPrint + * @param concurrentDispatch + */ + public LevelDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) { + super(messageType, reduceMemoryFootPrint); + } + + @Override + protected void configurePersistenceAdapter(BrokerService broker) throws IOException { + broker.setPersistenceFactory(new LevelDBStoreFactory()); + } + +}