From 1ee00173d6a395f9b49a3ab07530c9b51fe99f3f Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 19 Oct 2007 19:01:10 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-1452 and https://issues.apache.org/activemq/browse/AMQ-729 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@586580 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/ActiveMQMessageAudit.java | 118 +++++++++++++++--- .../org/apache/activemq/ConnectionAudit.java | 8 +- .../activemq/broker/TransactionBroker.java | 4 +- .../broker/region/BaseDestination.java | 55 +++++++- .../apache/activemq/broker/region/Queue.java | 7 +- .../cursors/AbstractPendingMessageCursor.java | 81 +++++++++++- .../cursors/FilePendingMessageCursor.java | 9 +- .../region/cursors/PendingMessageCursor.java | 32 +++++ .../region/cursors/QueueStorePrefetch.java | 31 +++-- .../cursors/StoreDurableSubscriberCursor.java | 35 ++++++ .../region/cursors/StoreQueueCursor.java | 39 ++++++ .../region/cursors/TopicStorePrefetch.java | 6 +- .../broker/region/policy/PolicyEntry.java | 57 +++++++++ .../org/apache/activemq/util/BitArrayBin.java | 18 +++ .../activemq/ActiveMQMessageAuditTest.java | 26 +++- 15 files changed, 478 insertions(+), 48 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java index 7fd52c8daa..faf39bc26c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java @@ -16,8 +16,6 @@ */ package org.apache.activemq; -import java.util.Map; - import javax.jms.JMSException; import javax.jms.Message; @@ -37,8 +35,9 @@ public class ActiveMQMessageAudit { private static final int DEFAULT_WINDOW_SIZE = 1024; private static final int MAXIMUM_PRODUCER_COUNT = 128; - private int windowSize; - private Map map; + private int auditDepth; + private int maximumNumberOfProducersToTrack; + private LRUCache map; /** * Default Constructor windowSize = 1024, maximumNumberOfProducersToTrack = @@ -51,13 +50,44 @@ public class ActiveMQMessageAudit { /** * Construct a MessageAudit * - * @param windowSize range of ids to track + * @param auditDepth range of ids to track * @param maximumNumberOfProducersToTrack number of producers expected in * the system */ - public ActiveMQMessageAudit(int windowSize, final int maximumNumberOfProducersToTrack) { - this.windowSize = windowSize; - map = new LRUCache(maximumNumberOfProducersToTrack, maximumNumberOfProducersToTrack, 0.75f, true); + public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack) { + this.auditDepth = auditDepth; + this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack; + this.map = new LRUCache(0, maximumNumberOfProducersToTrack, 0.75f, true); + } + + /** + * @return the auditDepth + */ + public int getAuditDepth() { + return auditDepth; + } + + /** + * @param auditDepth the auditDepth to set + */ + public void setAuditDepth(int auditDepth) { + this.auditDepth = auditDepth; + } + + /** + * @return the maximumNumberOfProducersToTrack + */ + public int getMaximumNumberOfProducersToTrack() { + return maximumNumberOfProducersToTrack; + } + + /** + * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set + */ + public void setMaximumNumberOfProducersToTrack( + int maximumNumberOfProducersToTrack) { + this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack; + this.map.setMaxCacheSize(maximumNumberOfProducersToTrack); } /** @@ -67,7 +97,7 @@ public class ActiveMQMessageAudit { * @return true if the message is a duplicate * @throws JMSException */ - public boolean isDuplicateMessage(Message message) throws JMSException { + public boolean isDuplicate(Message message) throws JMSException { return isDuplicate(message.getJMSMessageID()); } @@ -84,7 +114,7 @@ public class ActiveMQMessageAudit { if (seed != null) { BitArrayBin bab = map.get(seed); if (bab == null) { - bab = new BitArrayBin(windowSize); + bab = new BitArrayBin(auditDepth); map.put(seed, bab); } long index = IdGenerator.getSequenceFromId(id); @@ -101,9 +131,9 @@ public class ActiveMQMessageAudit { * @param message * @return true if the message is a duplicate */ - public boolean isDuplicateMessageReference(final MessageReference message) { + public boolean isDuplicate(final MessageReference message) { MessageId id = message.getMessageId(); - return isDuplicateMessageId(id); + return isDuplicate(id); } /** @@ -112,7 +142,7 @@ public class ActiveMQMessageAudit { * @param id * @return true if the message is a duplicate */ - public synchronized boolean isDuplicateMessageId(final MessageId id) { + public synchronized boolean isDuplicate(final MessageId id) { boolean answer = false; if (id != null) { @@ -120,7 +150,7 @@ public class ActiveMQMessageAudit { if (pid != null) { BitArrayBin bab = map.get(pid); if (bab == null) { - bab = new BitArrayBin(windowSize); + bab = new BitArrayBin(auditDepth); map.put(pid, bab); } answer = bab.setBit(id.getProducerSequenceId(), true); @@ -134,9 +164,9 @@ public class ActiveMQMessageAudit { * * @param message */ - public void rollbackMessageReference(final MessageReference message) { + public void rollback(final MessageReference message) { MessageId id = message.getMessageId(); - rollbackMessageId(id); + rollback(id); } /** @@ -144,7 +174,7 @@ public class ActiveMQMessageAudit { * * @param id */ - public synchronized void rollbackMessageId(final MessageId id) { + public synchronized void rollback(final MessageId id) { if (id != null) { ProducerId pid = id.getProducerId(); if (pid != null) { @@ -155,4 +185,58 @@ public class ActiveMQMessageAudit { } } } + + /** + * Check the message is in order + * @param msg + * @return + * @throws JMSException + */ + public boolean isInOrder(Message msg) throws JMSException { + return isInOrder(msg.getJMSMessageID()); + } + + /** + * Check the message id is in order + * @param id + * @return + */ + public synchronized boolean isInOrder(final String id) { + boolean answer = true; + + if (id != null) { + String seed = IdGenerator.getSeedFromId(id); + if (seed != null) { + BitArrayBin bab = map.get(seed); + if (bab != null) { + long index = IdGenerator.getSequenceFromId(id); + answer = bab.isInOrder(index); + } + + } + } + return answer; + } + + /** + * Check the MessageId is in order + * @param id + * @return + */ + public synchronized boolean isInOrder(final MessageId id) { + boolean answer = true; + + if (id != null) { + ProducerId pid = id.getProducerId(); + if (pid != null) { + BitArrayBin bab = map.get(pid); + if (bab != null) { + answer = bab.isInOrder(id.getProducerSequenceId()); + } + + } + } + return answer; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java b/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java index 3c63df3f55..51f4723e5f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java +++ b/activemq-core/src/main/java/org/apache/activemq/ConnectionAudit.java @@ -44,7 +44,7 @@ class ConnectionAudit { audit = new ActiveMQMessageAudit(); destinations.put(destination, audit); } - boolean result = audit.isDuplicateMessageReference(message); + boolean result = audit.isDuplicate(message); return result; } ActiveMQMessageAudit audit = dispatchers.get(dispatcher); @@ -52,7 +52,7 @@ class ConnectionAudit { audit = new ActiveMQMessageAudit(); dispatchers.put(dispatcher, audit); } - boolean result = audit.isDuplicateMessageReference(message); + boolean result = audit.isDuplicate(message); return result; } } @@ -66,12 +66,12 @@ class ConnectionAudit { if (destination.isQueue()) { ActiveMQMessageAudit audit = destinations.get(destination); if (audit != null) { - audit.rollbackMessageReference(message); + audit.rollback(message); } } else { ActiveMQMessageAudit audit = dispatchers.get(dispatcher); if (audit != null) { - audit.rollbackMessageReference(message); + audit.rollback(message); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java index 07d5c2983d..9c79179d81 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java @@ -211,14 +211,14 @@ public class TransactionBroker extends BrokerFilter { public void afterRollback() { if (audit != null) { - audit.rollbackMessageReference(message); + audit.rollback(message); } } }; transaction.addSynchronization(sync); } } - if (audit == null || !audit.isDuplicateMessageReference(message)) { + if (audit == null || !audit.isDuplicate(message)) { context.setTransaction(transaction); try { next.send(producerExchange, message); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 8620885cbe..c60d4a3230 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -23,12 +23,57 @@ package org.apache.activemq.broker.region; public abstract class BaseDestination implements Destination { private boolean producerFlowControl = true; - + private int maxProducersToAudit=1024; + private int maxAuditDepth=1; + private boolean enableAudit=true; + /** + * @return the producerFlowControl + */ public boolean isProducerFlowControl() { - return this.producerFlowControl; + return producerFlowControl; + } + /** + * @param producerFlowControl the producerFlowControl to set + */ + public void setProducerFlowControl(boolean producerFlowControl) { + this.producerFlowControl = producerFlowControl; + } + /** + * @return the maxProducersToAudit + */ + public int getMaxProducersToAudit() { + return maxProducersToAudit; + } + /** + * @param maxProducersToAudit the maxProducersToAudit to set + */ + public void setMaxProducersToAudit(int maxProducersToAudit) { + this.maxProducersToAudit = maxProducersToAudit; + } + /** + * @return the maxAuditDepth + */ + public int getMaxAuditDepth() { + return maxAuditDepth; + } + /** + * @param maxAuditDepth the maxAuditDepth to set + */ + public void setMaxAuditDepth(int maxAuditDepth) { + this.maxAuditDepth = maxAuditDepth; + } + /** + * @return the enableAudit + */ + public boolean isEnableAudit() { + return enableAudit; + } + /** + * @param enableAudit the enableAudit to set + */ + public void setEnableAudit(boolean enableAudit) { + this.enableAudit = enableAudit; } - public void setProducerFlowControl(boolean value) { - this.producerFlowControl = value; - } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 573620188b..2d4e2c70fa 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -142,6 +142,9 @@ public class Queue extends BaseDestination implements Task { if (store != null) { // Restore the persistent messages. messages.setSystemUsage(systemUsage); + messages.setEnableAudit(isEnableAudit()); + messages.setMaxAuditDepth(getMaxAuditDepth()); + messages.setMaxProducersToAudit(getMaxProducersToAudit()); if (messages.isRecoveryRequired()) { store.recover(new MessageRecoveryListener() { @@ -442,7 +445,7 @@ public class Queue extends BaseDestination implements Task { } } - void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { + synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception { final ConnectionContext context = producerExchange.getConnectionContext(); message.setRegionDestination(this); if (store != null && message.isPersistent()) { @@ -567,7 +570,7 @@ public class Queue extends BaseDestination implements Task { doPageIn(false); } - public void stop() throws Exception { + public void stop() throws Exception{ if (taskRunner != null) { taskRunner.shutdown(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index 4a5a2003b8..efee241ab3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -17,9 +17,12 @@ package org.apache.activemq.broker.region.cursors; import java.util.LinkedList; + +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.command.MessageId; import org.apache.activemq.usage.SystemUsage; /** @@ -32,11 +35,21 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor { protected int memoryUsageHighWaterMark = 90; protected int maxBatchSize = 100; protected SystemUsage systemUsage; + protected int maxProducersToAudit=1024; + protected int maxAuditDepth=1; + protected boolean enableAudit=true; + protected ActiveMQMessageAudit audit; + private boolean started=false; - public void start() throws Exception { + public synchronized void start() throws Exception { + if (!started && enableAudit && audit==null) { + audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); + } + started=true; } - public void stop() throws Exception { + public synchronized void stop() throws Exception { + started=false; gc(); } @@ -168,4 +181,68 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor { public LinkedList pageInList(int maxItems) { throw new RuntimeException("Not supported"); } + + /** + * @return the maxProducersToAudit + */ + public int getMaxProducersToAudit() { + return maxProducersToAudit; + } + + /** + * @param maxProducersToAudit the maxProducersToAudit to set + */ + public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { + this.maxProducersToAudit = maxProducersToAudit; + if (audit != null) { + this.audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); + } + } + + /** + * @return the maxAuditDepth + */ + public int getMaxAuditDepth() { + return this.maxAuditDepth; + } + + + /** + * @param maxAuditDepth the maxAuditDepth to set + */ + public synchronized void setMaxAuditDepth(int maxAuditDepth) { + this.maxAuditDepth = maxAuditDepth; + if (audit != null) { + this.audit.setAuditDepth(maxAuditDepth); + } + } + + + /** + * @return the enableAudit + */ + public boolean isEnableAudit() { + return this.enableAudit; + } + + /** + * @param enableAudit the enableAudit to set + */ + public synchronized void setEnableAudit(boolean enableAudit) { + this.enableAudit = enableAudit; + if (this.enableAudit && started && audit==null) { + audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); + } + } + + + protected synchronized boolean isDuplicate(MessageId messageId) { + if (!this.enableAudit || this.audit==null) { + return false; + } + return this.audit.isDuplicate(messageId); + } + + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index f87ce56873..f6d0d259b5 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -63,17 +63,18 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple this.store = store; } - public void start() { + public void start() throws Exception { if (started.compareAndSet(false, true)) { + super.start(); if (systemUsage != null) { systemUsage.getMemoryUsage().addUsageListener(this); } } } - public void stop() { + public void stop() throws Exception { if (started.compareAndSet(true, false)) { - gc(); + super.stop(); if (systemUsage != null) { systemUsage.getMemoryUsage().removeUsageListener(this); } @@ -118,7 +119,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple } } - public synchronized void destroy() { + public synchronized void destroy() throws Exception { stop(); for (Iterator i = memoryList.iterator(); i.hasNext();) { Message node = (Message)i.next(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index eea234672f..78d5bb1b0d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -210,5 +210,37 @@ public interface PendingMessageCursor extends Service { * @return a list of paged in messages */ LinkedList pageInList(int maxItems); + + /** + * set the maximum number of producers to track at one time + * @param value + */ + void setMaxProducersToAudit(int value); + + /** + * @return the maximum number of producers to audit + */ + int getMaxProducersToAudit(); + + /** + * Set the maximum depth of message ids to track + * @param depth + */ + void setMaxAuditDepth(int depth); + + /** + * @return the audit depth + */ + int getMaxAuditDepth(); + + /** + * @return the enableAudit + */ + public boolean isEnableAudit(); + /** + * @param enableAudit the enableAudit to set + */ + public void setEnableAudit(boolean enableAudit); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java index dd81e75950..4f66292003 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java @@ -29,7 +29,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** - * perist pending messages pending message (messages awaiting disptach to a + * persist pending messages pending message (messages awaiting dispatch to a * consumer) cursor * * @version $Revision: 474985 $ @@ -42,6 +42,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message private final LinkedList batchList = new LinkedList(); private Destination regionDestination; private int size; + private boolean fillBatchDuplicates; /** * @param topic @@ -55,13 +56,14 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message } - public void start() throws Exception { + public void start() throws Exception{ + super.start(); store.resetBatching(); } public void stop() throws Exception { store.resetBatching(); - gc(); + super.stop(); } /** @@ -127,10 +129,18 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message public void finished() { } - public boolean recoverMessage(Message message) throws Exception { - message.setRegionDestination(regionDestination); - message.incrementReferenceCount(); - batchList.addLast(message); + public synchronized boolean recoverMessage(Message message) + throws Exception { + if (!isDuplicate(message.getMessageId())) { + message.setRegionDestination(regionDestination); + message.incrementReferenceCount(); + batchList.addLast(message); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring batched duplicated from store: " + message); + } + fillBatchDuplicates=true; + } return true; } @@ -153,8 +163,13 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message } // implementation - protected void fillBatch() throws Exception { + protected synchronized void fillBatch() throws Exception { store.recoverNextMessages(maxBatchSize, this); + while (fillBatchDuplicates && batchList.isEmpty()) { + fillBatchDuplicates=false; + store.recoverNextMessages(maxBatchSize, this); + } + fillBatchDuplicates=false; } public String toString() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 7f47c9e7eb..13396ba558 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -69,6 +69,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { public synchronized void start() throws Exception { if (!started) { started = true; + super.start(); for (PendingMessageCursor tsp : storePrefetches) { tsp.start(); } @@ -78,6 +79,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { public synchronized void stop() throws Exception { if (started) { started = false; + super.stop(); for (PendingMessageCursor tsp : storePrefetches) { tsp.stop(); } @@ -96,6 +98,9 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName, subscription); tsp.setMaxBatchSize(getMaxBatchSize()); tsp.setSystemUsage(systemUsage); + tsp.setEnableAudit(isEnableAudit()); + tsp.setMaxAuditDepth(getMaxAuditDepth()); + tsp.setMaxProducersToAudit(getMaxProducersToAudit()); topics.put(destination, tsp); storePrefetches.add(tsp); if (started) { @@ -253,6 +258,36 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor { tsp.setSystemUsage(usageManager); } } + + public void setMaxProducersToAudit(int maxProducersToAudit) { + super.setMaxProducersToAudit(maxProducersToAudit); + for (PendingMessageCursor cursor : storePrefetches) { + cursor.setMaxAuditDepth(maxAuditDepth); + } + if (nonPersistent != null) { + nonPersistent.setMaxProducersToAudit(maxProducersToAudit); + } + } + + public void setMaxAuditDepth(int maxAuditDepth) { + super.setMaxAuditDepth(maxAuditDepth); + for (PendingMessageCursor cursor : storePrefetches) { + cursor.setMaxAuditDepth(maxAuditDepth); + } + if (nonPersistent != null) { + nonPersistent.setMaxAuditDepth(maxAuditDepth); + } + } + + public synchronized void setEnableAudit(boolean enableAudit) { + super.setEnableAudit(enableAudit); + for (PendingMessageCursor cursor : storePrefetches) { + cursor.setEnableAudit(enableAudit); + } + if (nonPersistent != null) { + nonPersistent.setEnableAudit(enableAudit); + } + } protected synchronized PendingMessageCursor getNextCursor() throws Exception { if (currentCursor == null || currentCursor.isEmpty()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java index eeb333b5ba..cb1234550c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.cursors; +import org.apache.activemq.ActiveMQMessageAudit; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.command.Message; @@ -55,10 +56,14 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { public synchronized void start() throws Exception { started = true; + super.start(); if (nonPersistent == null) { nonPersistent = new FilePendingMessageCursor(queue.getDestination(), tmpStore); nonPersistent.setMaxBatchSize(getMaxBatchSize()); nonPersistent.setSystemUsage(systemUsage); + nonPersistent.setEnableAudit(isEnableAudit()); + nonPersistent.setMaxAuditDepth(getMaxAuditDepth()); + nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit()); } nonPersistent.start(); persistent.start(); @@ -67,6 +72,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { public synchronized void stop() throws Exception { started = false; + super.stop(); if (nonPersistent != null) { nonPersistent.stop(); nonPersistent.gc(); @@ -191,6 +197,39 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor { } super.setMaxBatchSize(maxBatchSize); } + + + public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { + super.setMaxProducersToAudit(maxProducersToAudit); + if (persistent != null) { + persistent.setMaxProducersToAudit(maxProducersToAudit); + } + if (nonPersistent != null) { + nonPersistent.setMaxProducersToAudit(maxProducersToAudit); + } + } + + public synchronized void setMaxAuditDepth(int maxAuditDepth) { + super.setMaxAuditDepth(maxAuditDepth); + if (persistent != null) { + persistent.setMaxAuditDepth(maxAuditDepth); + } + if (nonPersistent != null) { + nonPersistent.setMaxAuditDepth(maxAuditDepth); + } + } + + public synchronized void setEnableAudit(boolean enableAudit) { + super.setEnableAudit(enableAudit); + if (persistent != null) { + persistent.setEnableAudit(enableAudit); + } + if (nonPersistent != null) { + nonPersistent.setEnableAudit(enableAudit); + } + } + + public synchronized void gc() { if (persistent != null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java index 81eb21b0b5..a281c1251f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java @@ -65,16 +65,18 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message this.subscriberName = subscriberName; } - public synchronized void start() { + public synchronized void start() throws Exception { if (!started) { started = true; + super.start(); safeFillBatch(); } } - public synchronized void stop() { + public synchronized void stop() throws Exception { if (started) { started = false; + super.stop(); store.resetBatching(clientId, subscriberName); gc(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index a5e51bedbe..0697c3aabf 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -51,6 +51,9 @@ public class PolicyEntry extends DestinationMapEntry { private PendingQueueMessageStoragePolicy pendingQueuePolicy; private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy; private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy; + private int maxProducersToAudit=1024; + private int maxAuditDepth=1; + private boolean enableAudit=true; private boolean producerFlowControl = true; public void configure(Queue queue, Store tmpStore) { @@ -69,6 +72,9 @@ public class PolicyEntry extends DestinationMapEntry { queue.setMessages(messages); } queue.setProducerFlowControl(isProducerFlowControl()); + queue.setEnableAudit(isEnableAudit()); + queue.setMaxAuditDepth(getMaxAuditDepth()); + queue.setMaxProducersToAudit(getMaxProducersToAudit()); } public void configure(Topic topic) { @@ -86,6 +92,9 @@ public class PolicyEntry extends DestinationMapEntry { topic.getBrokerMemoryUsage().setLimit(memoryLimit); } topic.setProducerFlowControl(isProducerFlowControl()); + topic.setEnableAudit(isEnableAudit()); + topic.setMaxAuditDepth(getMaxAuditDepth()); + topic.setMaxProducersToAudit(getMaxProducersToAudit()); } public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) { @@ -266,12 +275,60 @@ public class PolicyEntry extends DestinationMapEntry { this.pendingSubscriberPolicy = pendingSubscriberPolicy; } + /** + * @return true if producer flow control enabled + */ public boolean isProducerFlowControl() { return producerFlowControl; } + /** + * @param producerFlowControl + */ public void setProducerFlowControl(boolean producerFlowControl) { this.producerFlowControl = producerFlowControl; } + /** + * @return the maxProducersToAudit + */ + public int getMaxProducersToAudit() { + return maxProducersToAudit; + } + + /** + * @param maxProducersToAudit the maxProducersToAudit to set + */ + public void setMaxProducersToAudit(int maxProducersToAudit) { + this.maxProducersToAudit = maxProducersToAudit; + } + + /** + * @return the maxAuditDepth + */ + public int getMaxAuditDepth() { + return maxAuditDepth; + } + + /** + * @param maxAuditDepth the maxAuditDepth to set + */ + public void setMaxAuditDepth(int maxAuditDepth) { + this.maxAuditDepth = maxAuditDepth; + } + + /** + * @return the enableAudit + */ + public boolean isEnableAudit() { + return enableAudit; + } + + /** + * @param enableAudit the enableAudit to set + */ + public void setEnableAudit(boolean enableAudit) { + this.enableAudit = enableAudit; + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java b/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java index 00f2e1abd7..30e245f0db 100755 --- a/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java @@ -29,6 +29,7 @@ public class BitArrayBin { private int maxNumberOfArrays; private int firstIndex = -1; private int firstBin = -1; + private long lastBitSet=-1; /** * Create a BitArrayBin to a certain window size (number of messages to @@ -60,9 +61,26 @@ public class BitArrayBin { if (offset >= 0) { answer = ba.set(offset, value); } + if (value) { + lastBitSet=index; + }else { + lastBitSet=-1; + } } return answer; } + + /** + * Test if in order + * @param index + * @return true if next message is in order + */ + public boolean isInOrder(long index) { + if (lastBitSet== -1) { + return true; + } + return lastBitSet+1==index; + } /** * Get the boolean value at the index diff --git a/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java b/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java index 8dca9b0251..1157d31fe4 100755 --- a/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java @@ -88,10 +88,32 @@ public class ActiveMQMessageAuditTest extends TestCase { ActiveMQMessage msg = new ActiveMQMessage(); msg.setMessageId(id); list.add(msg); - assertFalse(audit.isDuplicateMessageReference(msg)); + assertFalse(audit.isDuplicate(msg.getMessageId())); } for (MessageReference msg : list) { - assertTrue(audit.isDuplicateMessageReference(msg)); + assertTrue(audit.isDuplicate(msg)); + } + } + + public void testIsInOrderString() { + int count = 10000; + ActiveMQMessageAudit audit = new ActiveMQMessageAudit(); + IdGenerator idGen = new IdGenerator(); + // add to a list + List list = new ArrayList(); + for (int i = 0; i < count; i++) { + String id = idGen.generateId(); + if (i==0) { + assertFalse(audit.isDuplicate(id)); + } + if (i > 1 && i%2 != 0) { + list.add(id); + } + + } + for (String id : list) { + assertFalse(audit.isInOrder(id)); + assertFalse(audit.isDuplicate(id)); } } }