mirror of https://github.com/apache/activemq.git
Moving beforeMarshall back to the store implementations because we don't
want all store implementations to marshall (such as memory store)
This reverts commit b9b98a45ce
.
This commit is contained in:
parent
9c8bd3360f
commit
11622b3af3
|
@ -838,7 +838,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
//condition if the original add is processed after the update, which can cause
|
||||
//a duplicate message to be stored
|
||||
if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) {
|
||||
message.beforeMarshall(null);
|
||||
result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
|
||||
result.addListener(new PendingMarshalUsageTracker(message));
|
||||
} else {
|
||||
|
|
|
@ -511,7 +511,6 @@ public class Topic extends BaseDestination implements Task {
|
|||
|
||||
waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
|
||||
}
|
||||
message.beforeMarshall(null);
|
||||
result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
|
||||
}
|
||||
|
||||
|
|
|
@ -383,6 +383,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
|
||||
throws IOException {
|
||||
if (isConcurrentStoreAndDispatchQueues()) {
|
||||
message.beforeMarshall(wireFormat);
|
||||
StoreQueueTask result = new StoreQueueTask(this, context, message);
|
||||
ListenableFuture<Object> future = result.getFuture();
|
||||
message.getMessageId().setFutureOrSequenceLong(future);
|
||||
|
@ -753,6 +754,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
|
||||
throws IOException {
|
||||
if (isConcurrentStoreAndDispatchTopics()) {
|
||||
message.beforeMarshall(wireFormat);
|
||||
StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
|
||||
result.aquireLocks();
|
||||
addTopicTask(this, result);
|
||||
|
|
|
@ -754,6 +754,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
|||
|
||||
def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
|
||||
check_running
|
||||
message.beforeMarshall(wireFormat);
|
||||
message.incrementReferenceCount()
|
||||
uow.addCompleteListener({
|
||||
message.decrementReferenceCount()
|
||||
|
|
Loading…
Reference in New Issue