Calling beforeMarshall on messages when they async stored before the
store task is run and before consumer dispatch to prevent two threads
from trying to mutate the message state at the same time.

(cherry picked from commit b9f9f03829)
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-04-15 13:01:21 +00:00
parent 4a3fa320e7
commit dc3c5a719b
7 changed files with 232 additions and 37 deletions

View File

@ -66,7 +66,6 @@ import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionIdTransformer; import org.apache.activemq.store.TransactionIdTransformer;
import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.MessageDatabase.Metadata;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
@ -385,6 +384,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message) public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
throws IOException { throws IOException {
if (isConcurrentStoreAndDispatchQueues()) { if (isConcurrentStoreAndDispatchQueues()) {
message.beforeMarshall(wireFormat);
StoreQueueTask result = new StoreQueueTask(this, context, message); StoreQueueTask result = new StoreQueueTask(this, context, message);
ListenableFuture<Object> future = result.getFuture(); ListenableFuture<Object> future = result.getFuture();
message.getMessageId().setFutureOrSequenceLong(future); message.getMessageId().setFutureOrSequenceLong(future);
@ -753,6 +753,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message) public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
throws IOException { throws IOException {
if (isConcurrentStoreAndDispatchTopics()) { if (isConcurrentStoreAndDispatchTopics()) {
message.beforeMarshall(wireFormat);
StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
result.aquireLocks(); result.aquireLocks();
addTopicTask(this, result); addTopicTask(this, result);

View File

@ -754,6 +754,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = { def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
check_running check_running
message.beforeMarshall(wireFormat);
message.incrementReferenceCount() message.incrementReferenceCount()
uow.addCompleteListener({ uow.addCompleteListener({
message.decrementReferenceCount() message.decrementReferenceCount()

View File

@ -145,6 +145,7 @@ public class AMQ3436Test {
boolean firstMessage = true; boolean firstMessage = true;
@Override
public void onMessage(Message msg) { public void onMessage(Message msg) {
try { try {

View File

@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.bugs; package org.apache.activemq.store;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -48,13 +47,11 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.rules.TemporaryFolder;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -64,40 +61,23 @@ import org.slf4j.LoggerFactory;
* This test shows that messages are received with non-null data while * This test shows that messages are received with non-null data while
* several consumers are used. * several consumers are used.
*/ */
@RunWith(Parameterized.class) public abstract class AbstractVmConcurrentDispatchTest {
public class AMQ6222Test {
private static final Logger LOG = LoggerFactory.getLogger(AMQ6222Test.class); private static final Logger LOG = LoggerFactory.getLogger(AbstractVmConcurrentDispatchTest.class);
private final MessageType messageType; private final MessageType messageType;
private final boolean reduceMemoryFootPrint; private final boolean reduceMemoryFootPrint;
private final boolean concurrentDispatch;
private static enum MessageType {TEXT, MAP, OBJECT} protected static enum MessageType {TEXT, MAP, OBJECT}
private final static boolean[] booleanVals = {true, false}; protected final static boolean[] booleanVals = {true, false};
private static boolean[] reduceMemoryFootPrintVals = booleanVals; protected static boolean[] reduceMemoryFootPrintVals = booleanVals;
private static boolean[] concurrentDispatchVals = booleanVals;
@Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}") @Rule
public static Collection<Object[]> data() { public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
List<Object[]> values = new ArrayList<>();
for (MessageType mt : MessageType.values()) { public AbstractVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) {
for (boolean rmfVal : reduceMemoryFootPrintVals) {
for (boolean cdVal : concurrentDispatchVals) {
values.add(new Object[] {mt, rmfVal, cdVal});
}
}
}
return values;
}
public AMQ6222Test(MessageType messageType, boolean reduceMemoryFootPrint,
boolean concurrentDispatch) {
this.messageType = messageType; this.messageType = messageType;
this.reduceMemoryFootPrint = reduceMemoryFootPrint; this.reduceMemoryFootPrint = reduceMemoryFootPrint;
this.concurrentDispatch = concurrentDispatch;
} }
private BrokerService broker; private BrokerService broker;
@ -126,8 +106,8 @@ public class AMQ6222Test {
defaultPolicy.setReduceMemoryFootprint(reduceMemoryFootPrint); defaultPolicy.setReduceMemoryFootprint(reduceMemoryFootPrint);
policyMap.setDefaultEntry(defaultPolicy); policyMap.setDefaultEntry(defaultPolicy);
broker.setDestinationPolicy(policyMap); broker.setDestinationPolicy(policyMap);
KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); broker.setDataDirectoryFile(dataFileDir.getRoot());
ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch); configurePersistenceAdapter(broker);
broker.start(); broker.start();
broker.waitUntilStarted(); broker.waitUntilStarted();
@ -143,6 +123,8 @@ public class AMQ6222Test {
} }
} }
protected abstract void configurePersistenceAdapter(final BrokerService broker) throws IOException;
@Test(timeout=180000) @Test(timeout=180000)
public void testMessagesAreValid() throws Exception { public void testMessagesAreValid() throws Exception {
@ -161,7 +143,7 @@ public class AMQ6222Test {
try { try {
tasks.shutdown(); tasks.shutdown();
tasks.awaitTermination(10, TimeUnit.SECONDS); tasks.awaitTermination(20, TimeUnit.SECONDS);
} catch (Exception e) { } catch (Exception e) {
//should get exception with no errors //should get exception with no errors
} }

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.AbstractVmConcurrentDispatchTest;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class KahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest {
private final boolean concurrentDispatch;
private static boolean[] concurrentDispatchVals = booleanVals;
@Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (MessageType mt : MessageType.values()) {
for (boolean rmfVal : reduceMemoryFootPrintVals) {
for (boolean cdVal : concurrentDispatchVals) {
values.add(new Object[] {mt, rmfVal, cdVal});
}
}
}
return values;
}
/**
* @param messageType
* @param reduceMemoryFootPrint
* @param concurrentDispatch
*/
public KahaDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint,
boolean concurrentDispatch) {
super(messageType, reduceMemoryFootPrint);
this.concurrentDispatch = concurrentDispatch;
}
@Override
protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.AbstractVmConcurrentDispatchTest;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest {
private final boolean concurrentDispatch;
private static boolean[] concurrentDispatchVals = booleanVals;
@Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (MessageType mt : MessageType.values()) {
for (boolean rmfVal : reduceMemoryFootPrintVals) {
for (boolean cdVal : concurrentDispatchVals) {
values.add(new Object[] {mt, rmfVal, cdVal});
}
}
}
return values;
}
/**
* @param messageType
* @param reduceMemoryFootPrint
* @param concurrentDispatch
*/
public MultiKahaDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint,
boolean concurrentDispatch) {
super(messageType, reduceMemoryFootPrint);
this.concurrentDispatch = concurrentDispatch;
}
@Override
protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
//setup multi-kaha adapter
MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(dataFileDir.getRoot());
KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
kahaStore.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
filtered.setPersistenceAdapter(kahaStore);
filtered.setPerDestination(false);
List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
stores.add(filtered);
persistenceAdapter.setFilteredPersistenceAdapters(stores);
broker.setPersistenceAdapter(persistenceAdapter);
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.leveldb;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.leveldb.LevelDBStoreFactory;
import org.apache.activemq.store.AbstractVmConcurrentDispatchTest;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class LevelDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatchTest {
@Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (MessageType mt : MessageType.values()) {
for (boolean rmfVal : reduceMemoryFootPrintVals) {
values.add(new Object[] {mt, rmfVal});
}
}
return values;
}
/**
* @param messageType
* @param reduceMemoryFootPrint
* @param concurrentDispatch
*/
public LevelDbVmConcurrentDispatchTest(MessageType messageType, boolean reduceMemoryFootPrint) {
super(messageType, reduceMemoryFootPrint);
}
@Override
protected void configurePersistenceAdapter(BrokerService broker) throws IOException {
broker.setPersistenceFactory(new LevelDBStoreFactory());
}
}