Moving beforeMarshall back to the store implementations because we don't
want all store implementations to marshall (such as memory store)

This reverts commit 32913408a6.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-04-18 12:42:46 +00:00
parent 9d545cf11f
commit 840583df09
4 changed files with 3 additions and 2 deletions

View File

@ -840,7 +840,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
//condition if the original add is processed after the update, which can cause
//a duplicate message to be stored
if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) {
message.beforeMarshall(null);
result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
result.addListener(new PendingMarshalUsageTracker(message));
} else {

View File

@ -511,7 +511,6 @@ public class Topic extends BaseDestination implements Task {
waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
message.beforeMarshall(null);
result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
}

View File

@ -384,6 +384,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
throws IOException {
if (isConcurrentStoreAndDispatchQueues()) {
message.beforeMarshall(wireFormat);
StoreQueueTask result = new StoreQueueTask(this, context, message);
ListenableFuture<Object> future = result.getFuture();
message.getMessageId().setFutureOrSequenceLong(future);
@ -752,6 +753,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
throws IOException {
if (isConcurrentStoreAndDispatchTopics()) {
message.beforeMarshall(wireFormat);
StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
result.aquireLocks();
addTopicTask(this, result);

View File

@ -754,6 +754,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
check_running
message.beforeMarshall(wireFormat);
message.incrementReferenceCount()
uow.addCompleteListener({
message.decrementReferenceCount()