diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index a2f718443d..097dfa5ff9 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -838,7 +838,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index //condition if the original add is processed after the update, which can cause //a duplicate message to be stored if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) { - message.beforeMarshall(null); result = store.asyncAddQueueMessage(context, message, isOptimizeStorage()); result.addListener(new PendingMarshalUsageTracker(message)); } else { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index a13bcd5d3c..1a9949e807 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -511,7 +511,6 @@ public class Topic extends BaseDestination implements Task { waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage); } - message.beforeMarshall(null); result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage()); } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 769e303e12..7f8283d61d 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -383,6 +383,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public ListenableFuture asyncAddQueueMessage(final ConnectionContext context, final Message message) throws IOException { if (isConcurrentStoreAndDispatchQueues()) { + message.beforeMarshall(wireFormat); StoreQueueTask result = new StoreQueueTask(this, context, message); ListenableFuture future = result.getFuture(); message.getMessageId().setFutureOrSequenceLong(future); @@ -753,6 +754,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public ListenableFuture asyncAddTopicMessage(final ConnectionContext context, final Message message) throws IOException { if (isConcurrentStoreAndDispatchTopics()) { + message.beforeMarshall(wireFormat); StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get()); result.aquireLocks(); addTopicTask(this, result); diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index f80e722ca4..5865f35e08 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -754,6 +754,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = { check_running + message.beforeMarshall(wireFormat); message.incrementReferenceCount() uow.addCompleteListener({ message.decrementReferenceCount()