diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index fa4672b405..06a60a594a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -66,7 +66,6 @@ import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionIdTransformer; import org.apache.activemq.store.TransactionStore; -import org.apache.activemq.store.kahadb.MessageDatabase.Metadata; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; @@ -385,6 +384,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public ListenableFuture asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException { if (isConcurrentStoreAndDispatchQueues()) { + message.beforeMarshall(wireFormat); StoreQueueTask result = new StoreQueueTask(this, context, message); ListenableFuture future = result.getFuture(); message.getMessageId().setFutureOrSequenceLong(future); @@ -753,6 +753,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public ListenableFuture asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException { if (isConcurrentStoreAndDispatchTopics()) { + message.beforeMarshall(wireFormat); StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); result.aquireLocks(); addTopicTask(this, result); diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index a4cdcac4ac..745f36dab2 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -754,6 +754,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = { check_running + message.beforeMarshall(wireFormat); message.incrementReferenceCount() uow.addCompleteListener({ message.decrementReferenceCount() diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java index 65e0783487..d9156de817 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java @@ -145,6 +145,7 @@ public class AMQ3436Test { boolean firstMessage = true; + @Override public void onMessage(Message msg) { try { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java similarity index 86% rename from activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java rename to activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java index 512ef9dd5d..aaaaf6916f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java @@ -14,15 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.bugs; +package org.apache.activemq.store; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.File; +import java.io.IOException; import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -48,13 +47,11 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,40 +61,23 @@ import org.slf4j.LoggerFactory; * This test shows that messages are received with non-null data while * several consumers are used. */ -@RunWith(Parameterized.class) -public class AMQ6222Test { +public abstract class AbstractVmConcurrentDispatchTest { - private static final Logger LOG = LoggerFactory.getLogger(AMQ6222Test.class); + private static final Logger LOG = LoggerFactory.getLogger(AbstractVmConcurrentDispatchTest.class); private final MessageType messageType; private final boolean reduceMemoryFootPrint; - private final boolean concurrentDispatch; - private static enum MessageType {TEXT, MAP, OBJECT} - private final static boolean[] booleanVals = {true, false}; - private static boolean[] reduceMemoryFootPrintVals = booleanVals; - private static boolean[] concurrentDispatchVals = booleanVals; + protected static enum MessageType {TEXT, MAP, OBJECT} + protected final static boolean[] booleanVals = {true, false}; + protected static boolean[] reduceMemoryFootPrintVals = booleanVals; - @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}") - public static Collection data() { - List values = new ArrayList<>(); + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); - for (MessageType mt : MessageType.values()) { - for (boolean rmfVal : reduceMemoryFootPrintVals) { - for (boolean cdVal : concurrentDispatchVals) { - values.add(new Object[] {mt, rmfVal, cdVal}); - } - } - } - - return values; - } - - public AMQ6222Test(MessageType messageType, boolean reduceMemoryFootPrint, - boolean concurrentDispatch) { + public AbstractVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) { this.messageType = messageType; this.reduceMemoryFootPrint = reduceMemoryFootPrint; - this.concurrentDispatch = concurrentDispatch; } private BrokerService broker; @@ -126,8 +106,8 @@ public class AMQ6222Test { defaultPolicy.setReduceMemoryFootprint(reduceMemoryFootPrint); policyMap.setDefaultEntry(defaultPolicy); broker.setDestinationPolicy(policyMap); - KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); - ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch); + broker.setDataDirectoryFile(dataFileDir.getRoot()); + configurePersistenceAdapter(broker); broker.start(); broker.waitUntilStarted(); @@ -143,6 +123,8 @@ public class AMQ6222Test { } } + protected abstract void configurePersistenceAdapter(final BrokerService broker) throws IOException; + @Test(timeout=180000) public void testMessagesAreValid() throws Exception { @@ -161,7 +143,7 @@ public class AMQ6222Test { try { tasks.shutdown(); - tasks.awaitTermination(10, TimeUnit.SECONDS); + tasks.awaitTermination(20, TimeUnit.SECONDS); } catch (Exception e) { //should get exception with no errors } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java new file mode 100644 index 0000000000..217a7c7429 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.AbstractVmConcurrentDispatchTest; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class KahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest { + + private final boolean concurrentDispatch; + private static boolean[] concurrentDispatchVals = booleanVals; + + @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}") + public static Collection data() { + List values = new ArrayList<>(); + + for (MessageType mt : MessageType.values()) { + for (boolean rmfVal : reduceMemoryFootPrintVals) { + for (boolean cdVal : concurrentDispatchVals) { + values.add(new Object[] {mt, rmfVal, cdVal}); + } + } + } + + return values; + } + + /** + * @param messageType + * @param reduceMemoryFootPrint + * @param concurrentDispatch + */ + public KahaDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint, + boolean concurrentDispatch) { + super(messageType, reduceMemoryFootPrint); + this.concurrentDispatch = concurrentDispatch; + } + + @Override + protected void configurePersistenceAdapter(BrokerService broker) throws IOException { + KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch); + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java new file mode 100644 index 0000000000..3d16ce7d56 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.AbstractVmConcurrentDispatchTest; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest { + + private final boolean concurrentDispatch; + private static boolean[] concurrentDispatchVals = booleanVals; + + @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}") + public static Collection data() { + List values = new ArrayList<>(); + + for (MessageType mt : MessageType.values()) { + for (boolean rmfVal : reduceMemoryFootPrintVals) { + for (boolean cdVal : concurrentDispatchVals) { + values.add(new Object[] {mt, rmfVal, cdVal}); + } + } + } + + return values; + } + + /** + * @param messageType + * @param reduceMemoryFootPrint + * @param concurrentDispatch + */ + public MultiKahaDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint, + boolean concurrentDispatch) { + super(messageType, reduceMemoryFootPrint); + this.concurrentDispatch = concurrentDispatch; + } + + @Override + protected void configurePersistenceAdapter(BrokerService broker) throws IOException { + //setup multi-kaha adapter + MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(dataFileDir.getRoot()); + + KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); + kahaStore.setConcurrentStoreAndDispatchQueues(concurrentDispatch); + + FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); + filtered.setPersistenceAdapter(kahaStore); + filtered.setPerDestination(false); + List stores = new ArrayList<>(); + stores.add(filtered); + + persistenceAdapter.setFilteredPersistenceAdapters(stores); + broker.setPersistenceAdapter(persistenceAdapter); + } + +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java new file mode 100644 index 0000000000..d1b7e43d81 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.leveldb; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.leveldb.LevelDBStoreFactory; +import org.apache.activemq.store.AbstractVmConcurrentDispatchTest; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class LevelDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest { + + @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}") + public static Collection data() { + List values = new ArrayList<>(); + + for (MessageType mt : MessageType.values()) { + for (boolean rmfVal : reduceMemoryFootPrintVals) { + values.add(new Object[] {mt, rmfVal}); + } + } + + return values; + } + + /** + * @param messageType + * @param reduceMemoryFootPrint + * @param concurrentDispatch + */ + public LevelDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) { + super(messageType, reduceMemoryFootPrint); + } + + @Override + protected void configurePersistenceAdapter(BrokerService broker) throws IOException { + broker.setPersistenceFactory(new LevelDBStoreFactory()); + } + +}