https://issues.apache.org/jira/browse/AMQ-6164 - allow journal write batching on a single destination

This commit is contained in:
gtully 2016-02-09 12:49:53 +00:00
parent 90726a60af
commit 499e39e52c
4 changed files with 34 additions and 39 deletions

View File

@ -829,33 +829,28 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
producerExchange.incrementSend(); producerExchange.incrementSend();
do { do {
checkUsage(context, producerExchange, message); checkUsage(context, producerExchange, message);
sendLock.lockInterruptibly(); message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
try { if (store != null && message.isPersistent()) {
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); message.getMessageId().setFutureOrSequenceLong(null);
if (store != null && message.isPersistent()) { try {
message.getMessageId().setFutureOrSequenceLong(null); if (messages.isCacheEnabled()) {
try { result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
if (messages.isCacheEnabled()) { result.addListener(new PendingMarshalUsageTracker(message));
result = store.asyncAddQueueMessage(context, message, isOptimizeStorage()); } else {
result.addListener(new PendingMarshalUsageTracker(message)); store.addMessage(context, message);
} else {
store.addMessage(context, message);
}
if (isReduceMemoryFootprint()) {
message.clearMarshalledState();
}
} catch (Exception e) {
// we may have a store in inconsistent state, so reset the cursor
// before restarting normal broker operations
resetNeeded = true;
throw e;
} }
if (isReduceMemoryFootprint()) {
message.clearMarshalledState();
}
} catch (Exception e) {
// we may have a store in inconsistent state, so reset the cursor
// before restarting normal broker operations
resetNeeded = true;
throw e;
} }
if(tryOrderedCursorAdd(message, context)) { }
break; if(tryOrderedCursorAdd(message, context)) {
} break;
} finally {
sendLock.unlock();
} }
} while (started.get()); } while (started.get());

View File

@ -59,11 +59,11 @@ public class MemoryMessageStore extends AbstractMessageStore {
synchronized (messageTable) { synchronized (messageTable) {
messageTable.put(message.getMessageId(), message); messageTable.put(message.getMessageId(), message);
incMessageStoreStatistics(getMessageStoreStatistics(), message); incMessageStoreStatistics(getMessageStoreStatistics(), message);
} message.incrementReferenceCount();
message.incrementReferenceCount(); message.getMessageId().setFutureOrSequenceLong(sequenceId++);
message.getMessageId().setFutureOrSequenceLong(sequenceId++); if (indexListener != null) {
if (indexListener != null) { indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); }
} }
} }

View File

@ -326,10 +326,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return task; return task;
} }
// with asyncTaskMap locked
protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
synchronized (store.asyncTaskMap) { store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
}
this.queueExecutor.execute(task); this.queueExecutor.execute(task);
} }
@ -390,9 +389,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
message.getMessageId().setFutureOrSequenceLong(future); message.getMessageId().setFutureOrSequenceLong(future);
message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch
result.aquireLocks(); result.aquireLocks();
addQueueTask(this, result); synchronized (asyncTaskMap) {
if (indexListener != null) { addQueueTask(this, result);
indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); if (indexListener != null) {
indexListener.onAdd(new IndexListener.MessageContext(context, message, null));
}
} }
return future; return future;
} else { } else {

View File

@ -758,7 +758,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
uow.addCompleteListener({ uow.addCompleteListener({
message.decrementReferenceCount() message.decrementReferenceCount()
}) })
val sequence = lastSeq.synchronized { lastSeq.synchronized {
val seq = lastSeq.incrementAndGet() val seq = lastSeq.incrementAndGet()
message.getMessageId.setFutureOrSequenceLong(seq); message.getMessageId.setFutureOrSequenceLong(seq);
// null context on xa recovery, we want to bypass the cursor & pending adds as it will be reset // null context on xa recovery, we want to bypass the cursor & pending adds as it will be reset
@ -768,9 +768,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
def run(): Unit = pendingCursorAdds.synchronized { pendingCursorAdds.remove(seq) } def run(): Unit = pendingCursorAdds.synchronized { pendingCursorAdds.remove(seq) }
})) }))
} }
seq uow.enqueue(key, seq, message, delay)
} }
uow.enqueue(key, sequence, message, delay)
} }
override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false) override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)