mirror of https://github.com/apache/activemq.git
Moving beforeMarshall call out of the store and into the actual destination
This commit is contained in:
parent
b9f9f03829
commit
b9b98a45ce
|
@ -838,6 +838,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
//condition if the original add is processed after the update, which can cause
|
//condition if the original add is processed after the update, which can cause
|
||||||
//a duplicate message to be stored
|
//a duplicate message to be stored
|
||||||
if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) {
|
if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) {
|
||||||
|
message.beforeMarshall(null);
|
||||||
result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
|
result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
|
||||||
final PendingMarshalUsageTracker tracker = new PendingMarshalUsageTracker(message);
|
final PendingMarshalUsageTracker tracker = new PendingMarshalUsageTracker(message);
|
||||||
result.addListener(new Runnable() {
|
result.addListener(new Runnable() {
|
||||||
|
|
|
@ -511,6 +511,7 @@ public class Topic extends BaseDestination implements Task {
|
||||||
|
|
||||||
waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
|
waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
|
||||||
}
|
}
|
||||||
|
message.beforeMarshall(null);
|
||||||
result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
|
result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,7 @@ import org.apache.activemq.store.PersistenceAdapter;
|
||||||
import org.apache.activemq.store.TopicMessageStore;
|
import org.apache.activemq.store.TopicMessageStore;
|
||||||
import org.apache.activemq.store.TransactionIdTransformer;
|
import org.apache.activemq.store.TransactionIdTransformer;
|
||||||
import org.apache.activemq.store.TransactionStore;
|
import org.apache.activemq.store.TransactionStore;
|
||||||
|
import org.apache.activemq.store.kahadb.MessageDatabase.Metadata;
|
||||||
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
|
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
|
||||||
import org.apache.activemq.store.kahadb.data.KahaDestination;
|
import org.apache.activemq.store.kahadb.data.KahaDestination;
|
||||||
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
|
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
|
||||||
|
@ -383,7 +384,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
||||||
public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
|
public ListenableFuture<Object> asyncAddQueueMessage(final ConnectionContext context, final Message message)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (isConcurrentStoreAndDispatchQueues()) {
|
if (isConcurrentStoreAndDispatchQueues()) {
|
||||||
message.beforeMarshall(wireFormat);
|
|
||||||
StoreQueueTask result = new StoreQueueTask(this, context, message);
|
StoreQueueTask result = new StoreQueueTask(this, context, message);
|
||||||
ListenableFuture<Object> future = result.getFuture();
|
ListenableFuture<Object> future = result.getFuture();
|
||||||
message.getMessageId().setFutureOrSequenceLong(future);
|
message.getMessageId().setFutureOrSequenceLong(future);
|
||||||
|
@ -754,7 +754,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
||||||
public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
|
public ListenableFuture<Object> asyncAddTopicMessage(final ConnectionContext context, final Message message)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (isConcurrentStoreAndDispatchTopics()) {
|
if (isConcurrentStoreAndDispatchTopics()) {
|
||||||
message.beforeMarshall(wireFormat);
|
|
||||||
StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
|
StoreTopicTask result = new StoreTopicTask(this, context, message, subscriptionCount.get());
|
||||||
result.aquireLocks();
|
result.aquireLocks();
|
||||||
addTopicTask(this, result);
|
addTopicTask(this, result);
|
||||||
|
|
|
@ -754,7 +754,6 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
||||||
|
|
||||||
def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
|
def doAdd(uow: DelayableUOW, context: ConnectionContext, message: Message, delay:Boolean): CountDownFuture[AnyRef] = {
|
||||||
check_running
|
check_running
|
||||||
message.beforeMarshall(wireFormat);
|
|
||||||
message.incrementReferenceCount()
|
message.incrementReferenceCount()
|
||||||
uow.addCompleteListener({
|
uow.addCompleteListener({
|
||||||
message.decrementReferenceCount()
|
message.decrementReferenceCount()
|
||||||
|
|
Loading…
Reference in New Issue