diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 960ac9c234..34817a0b80 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -829,33 +829,28 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index producerExchange.incrementSend(); do { checkUsage(context, producerExchange, message); - sendLock.lockInterruptibly(); - try { - message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); - if (store != null && message.isPersistent()) { - message.getMessageId().setFutureOrSequenceLong(null); - try { - if (messages.isCacheEnabled()) { - result = store.asyncAddQueueMessage(context, message, isOptimizeStorage()); - result.addListener(new PendingMarshalUsageTracker(message)); - } else { - store.addMessage(context, message); - } - if (isReduceMemoryFootprint()) { - message.clearMarshalledState(); - } - } catch (Exception e) { - // we may have a store in inconsistent state, so reset the cursor - // before restarting normal broker operations - resetNeeded = true; - throw e; + message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); + if (store != null && message.isPersistent()) { + message.getMessageId().setFutureOrSequenceLong(null); + try { + if (messages.isCacheEnabled()) { + result = store.asyncAddQueueMessage(context, message, isOptimizeStorage()); + result.addListener(new PendingMarshalUsageTracker(message)); + } else { + store.addMessage(context, message); } + if (isReduceMemoryFootprint()) { + message.clearMarshalledState(); + } + } catch (Exception e) { + // we may have a store in inconsistent state, so reset the cursor + // before restarting normal broker operations + resetNeeded = true; + throw e; } - if(tryOrderedCursorAdd(message, context)) { - break; - } - } finally { - sendLock.unlock(); + } + if(tryOrderedCursorAdd(message, context)) { + break; } } while (started.get()); diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index b32a811ec9..736d912271 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -59,11 +59,11 @@ public class MemoryMessageStore extends AbstractMessageStore { synchronized (messageTable) { messageTable.put(message.getMessageId(), message); incMessageStoreStatistics(getMessageStoreStatistics(), message); - } - message.incrementReferenceCount(); - message.getMessageId().setFutureOrSequenceLong(sequenceId++); - if (indexListener != null) { - indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); + message.incrementReferenceCount(); + message.getMessageId().setFutureOrSequenceLong(sequenceId++); + if (indexListener != null) { + indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); + } } } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index fa4672b405..e1c1df4b8b 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -326,10 +326,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return task; } + // with asyncTaskMap locked protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { - synchronized (store.asyncTaskMap) { - store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); - } + store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); this.queueExecutor.execute(task); } @@ -390,9 +389,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { message.getMessageId().setFutureOrSequenceLong(future); message.setRecievedByDFBridge(true); // flag message as concurrentStoreAndDispatch result.aquireLocks(); - addQueueTask(this, result); - if (indexListener != null) { - indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); + synchronized (asyncTaskMap) { + addQueueTask(this, result); + if (indexListener != null) { + indexListener.onAdd(new IndexListener.MessageContext(context, message, null)); + } } return future; } else { diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index a4cdcac4ac..f80e722ca4 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -758,7 +758,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P uow.addCompleteListener({ message.decrementReferenceCount() }) - val sequence = lastSeq.synchronized { + lastSeq.synchronized { val seq = lastSeq.incrementAndGet() message.getMessageId.setFutureOrSequenceLong(seq); // null context on xa recovery, we want to bypass the cursor & pending adds as it will be reset @@ -768,9 +768,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def run(): Unit = pendingCursorAdds.synchronized { pendingCursorAdds.remove(seq) } })) } - seq + uow.enqueue(key, seq, message, delay) } - uow.enqueue(key, sequence, message, delay) } override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context, message, false)