mirror of https://github.com/apache/activemq.git
https://issues.apache.org/activemq/browse/AMQ-729 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@586580 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4f8a1e6d6b
commit
1ee00173d6
|
@ -16,8 +16,6 @@
|
|||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
|
||||
|
@ -37,8 +35,9 @@ public class ActiveMQMessageAudit {
|
|||
|
||||
private static final int DEFAULT_WINDOW_SIZE = 1024;
|
||||
private static final int MAXIMUM_PRODUCER_COUNT = 128;
|
||||
private int windowSize;
|
||||
private Map<Object, BitArrayBin> map;
|
||||
private int auditDepth;
|
||||
private int maximumNumberOfProducersToTrack;
|
||||
private LRUCache<Object, BitArrayBin> map;
|
||||
|
||||
/**
|
||||
* Default Constructor windowSize = 1024, maximumNumberOfProducersToTrack =
|
||||
|
@ -51,13 +50,44 @@ public class ActiveMQMessageAudit {
|
|||
/**
|
||||
* Construct a MessageAudit
|
||||
*
|
||||
* @param windowSize range of ids to track
|
||||
* @param auditDepth range of ids to track
|
||||
* @param maximumNumberOfProducersToTrack number of producers expected in
|
||||
* the system
|
||||
*/
|
||||
public ActiveMQMessageAudit(int windowSize, final int maximumNumberOfProducersToTrack) {
|
||||
this.windowSize = windowSize;
|
||||
map = new LRUCache<Object, BitArrayBin>(maximumNumberOfProducersToTrack, maximumNumberOfProducersToTrack, 0.75f, true);
|
||||
public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack) {
|
||||
this.auditDepth = auditDepth;
|
||||
this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
|
||||
this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the auditDepth
|
||||
*/
|
||||
public int getAuditDepth() {
|
||||
return auditDepth;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param auditDepth the auditDepth to set
|
||||
*/
|
||||
public void setAuditDepth(int auditDepth) {
|
||||
this.auditDepth = auditDepth;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximumNumberOfProducersToTrack
|
||||
*/
|
||||
public int getMaximumNumberOfProducersToTrack() {
|
||||
return maximumNumberOfProducersToTrack;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
|
||||
*/
|
||||
public void setMaximumNumberOfProducersToTrack(
|
||||
int maximumNumberOfProducersToTrack) {
|
||||
this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
|
||||
this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -67,7 +97,7 @@ public class ActiveMQMessageAudit {
|
|||
* @return true if the message is a duplicate
|
||||
* @throws JMSException
|
||||
*/
|
||||
public boolean isDuplicateMessage(Message message) throws JMSException {
|
||||
public boolean isDuplicate(Message message) throws JMSException {
|
||||
return isDuplicate(message.getJMSMessageID());
|
||||
}
|
||||
|
||||
|
@ -84,7 +114,7 @@ public class ActiveMQMessageAudit {
|
|||
if (seed != null) {
|
||||
BitArrayBin bab = map.get(seed);
|
||||
if (bab == null) {
|
||||
bab = new BitArrayBin(windowSize);
|
||||
bab = new BitArrayBin(auditDepth);
|
||||
map.put(seed, bab);
|
||||
}
|
||||
long index = IdGenerator.getSequenceFromId(id);
|
||||
|
@ -101,9 +131,9 @@ public class ActiveMQMessageAudit {
|
|||
* @param message
|
||||
* @return true if the message is a duplicate
|
||||
*/
|
||||
public boolean isDuplicateMessageReference(final MessageReference message) {
|
||||
public boolean isDuplicate(final MessageReference message) {
|
||||
MessageId id = message.getMessageId();
|
||||
return isDuplicateMessageId(id);
|
||||
return isDuplicate(id);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -112,7 +142,7 @@ public class ActiveMQMessageAudit {
|
|||
* @param id
|
||||
* @return true if the message is a duplicate
|
||||
*/
|
||||
public synchronized boolean isDuplicateMessageId(final MessageId id) {
|
||||
public synchronized boolean isDuplicate(final MessageId id) {
|
||||
boolean answer = false;
|
||||
|
||||
if (id != null) {
|
||||
|
@ -120,7 +150,7 @@ public class ActiveMQMessageAudit {
|
|||
if (pid != null) {
|
||||
BitArrayBin bab = map.get(pid);
|
||||
if (bab == null) {
|
||||
bab = new BitArrayBin(windowSize);
|
||||
bab = new BitArrayBin(auditDepth);
|
||||
map.put(pid, bab);
|
||||
}
|
||||
answer = bab.setBit(id.getProducerSequenceId(), true);
|
||||
|
@ -134,9 +164,9 @@ public class ActiveMQMessageAudit {
|
|||
*
|
||||
* @param message
|
||||
*/
|
||||
public void rollbackMessageReference(final MessageReference message) {
|
||||
public void rollback(final MessageReference message) {
|
||||
MessageId id = message.getMessageId();
|
||||
rollbackMessageId(id);
|
||||
rollback(id);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -144,7 +174,7 @@ public class ActiveMQMessageAudit {
|
|||
*
|
||||
* @param id
|
||||
*/
|
||||
public synchronized void rollbackMessageId(final MessageId id) {
|
||||
public synchronized void rollback(final MessageId id) {
|
||||
if (id != null) {
|
||||
ProducerId pid = id.getProducerId();
|
||||
if (pid != null) {
|
||||
|
@ -155,4 +185,58 @@ public class ActiveMQMessageAudit {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the message is in order
|
||||
* @param msg
|
||||
* @return
|
||||
* @throws JMSException
|
||||
*/
|
||||
public boolean isInOrder(Message msg) throws JMSException {
|
||||
return isInOrder(msg.getJMSMessageID());
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the message id is in order
|
||||
* @param id
|
||||
* @return
|
||||
*/
|
||||
public synchronized boolean isInOrder(final String id) {
|
||||
boolean answer = true;
|
||||
|
||||
if (id != null) {
|
||||
String seed = IdGenerator.getSeedFromId(id);
|
||||
if (seed != null) {
|
||||
BitArrayBin bab = map.get(seed);
|
||||
if (bab != null) {
|
||||
long index = IdGenerator.getSequenceFromId(id);
|
||||
answer = bab.isInOrder(index);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
return answer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check the MessageId is in order
|
||||
* @param id
|
||||
* @return
|
||||
*/
|
||||
public synchronized boolean isInOrder(final MessageId id) {
|
||||
boolean answer = true;
|
||||
|
||||
if (id != null) {
|
||||
ProducerId pid = id.getProducerId();
|
||||
if (pid != null) {
|
||||
BitArrayBin bab = map.get(pid);
|
||||
if (bab != null) {
|
||||
answer = bab.isInOrder(id.getProducerSequenceId());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
return answer;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ class ConnectionAudit {
|
|||
audit = new ActiveMQMessageAudit();
|
||||
destinations.put(destination, audit);
|
||||
}
|
||||
boolean result = audit.isDuplicateMessageReference(message);
|
||||
boolean result = audit.isDuplicate(message);
|
||||
return result;
|
||||
}
|
||||
ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
|
||||
|
@ -52,7 +52,7 @@ class ConnectionAudit {
|
|||
audit = new ActiveMQMessageAudit();
|
||||
dispatchers.put(dispatcher, audit);
|
||||
}
|
||||
boolean result = audit.isDuplicateMessageReference(message);
|
||||
boolean result = audit.isDuplicate(message);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -66,12 +66,12 @@ class ConnectionAudit {
|
|||
if (destination.isQueue()) {
|
||||
ActiveMQMessageAudit audit = destinations.get(destination);
|
||||
if (audit != null) {
|
||||
audit.rollbackMessageReference(message);
|
||||
audit.rollback(message);
|
||||
}
|
||||
} else {
|
||||
ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
|
||||
if (audit != null) {
|
||||
audit.rollbackMessageReference(message);
|
||||
audit.rollback(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -211,14 +211,14 @@ public class TransactionBroker extends BrokerFilter {
|
|||
|
||||
public void afterRollback() {
|
||||
if (audit != null) {
|
||||
audit.rollbackMessageReference(message);
|
||||
audit.rollback(message);
|
||||
}
|
||||
}
|
||||
};
|
||||
transaction.addSynchronization(sync);
|
||||
}
|
||||
}
|
||||
if (audit == null || !audit.isDuplicateMessageReference(message)) {
|
||||
if (audit == null || !audit.isDuplicate(message)) {
|
||||
context.setTransaction(transaction);
|
||||
try {
|
||||
next.send(producerExchange, message);
|
||||
|
|
|
@ -23,12 +23,57 @@ package org.apache.activemq.broker.region;
|
|||
public abstract class BaseDestination implements Destination {
|
||||
|
||||
private boolean producerFlowControl = true;
|
||||
|
||||
private int maxProducersToAudit=1024;
|
||||
private int maxAuditDepth=1;
|
||||
private boolean enableAudit=true;
|
||||
/**
|
||||
* @return the producerFlowControl
|
||||
*/
|
||||
public boolean isProducerFlowControl() {
|
||||
return this.producerFlowControl;
|
||||
return producerFlowControl;
|
||||
}
|
||||
/**
|
||||
* @param producerFlowControl the producerFlowControl to set
|
||||
*/
|
||||
public void setProducerFlowControl(boolean producerFlowControl) {
|
||||
this.producerFlowControl = producerFlowControl;
|
||||
}
|
||||
/**
|
||||
* @return the maxProducersToAudit
|
||||
*/
|
||||
public int getMaxProducersToAudit() {
|
||||
return maxProducersToAudit;
|
||||
}
|
||||
/**
|
||||
* @param maxProducersToAudit the maxProducersToAudit to set
|
||||
*/
|
||||
public void setMaxProducersToAudit(int maxProducersToAudit) {
|
||||
this.maxProducersToAudit = maxProducersToAudit;
|
||||
}
|
||||
/**
|
||||
* @return the maxAuditDepth
|
||||
*/
|
||||
public int getMaxAuditDepth() {
|
||||
return maxAuditDepth;
|
||||
}
|
||||
/**
|
||||
* @param maxAuditDepth the maxAuditDepth to set
|
||||
*/
|
||||
public void setMaxAuditDepth(int maxAuditDepth) {
|
||||
this.maxAuditDepth = maxAuditDepth;
|
||||
}
|
||||
/**
|
||||
* @return the enableAudit
|
||||
*/
|
||||
public boolean isEnableAudit() {
|
||||
return enableAudit;
|
||||
}
|
||||
/**
|
||||
* @param enableAudit the enableAudit to set
|
||||
*/
|
||||
public void setEnableAudit(boolean enableAudit) {
|
||||
this.enableAudit = enableAudit;
|
||||
}
|
||||
|
||||
public void setProducerFlowControl(boolean value) {
|
||||
this.producerFlowControl = value;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -142,6 +142,9 @@ public class Queue extends BaseDestination implements Task {
|
|||
if (store != null) {
|
||||
// Restore the persistent messages.
|
||||
messages.setSystemUsage(systemUsage);
|
||||
messages.setEnableAudit(isEnableAudit());
|
||||
messages.setMaxAuditDepth(getMaxAuditDepth());
|
||||
messages.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
if (messages.isRecoveryRequired()) {
|
||||
store.recover(new MessageRecoveryListener() {
|
||||
|
||||
|
@ -442,7 +445,7 @@ public class Queue extends BaseDestination implements Task {
|
|||
}
|
||||
}
|
||||
|
||||
void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
|
||||
synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
|
||||
final ConnectionContext context = producerExchange.getConnectionContext();
|
||||
message.setRegionDestination(this);
|
||||
if (store != null && message.isPersistent()) {
|
||||
|
@ -567,7 +570,7 @@ public class Queue extends BaseDestination implements Task {
|
|||
doPageIn(false);
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
public void stop() throws Exception{
|
||||
if (taskRunner != null) {
|
||||
taskRunner.shutdown();
|
||||
}
|
||||
|
|
|
@ -17,9 +17,12 @@
|
|||
package org.apache.activemq.broker.region.cursors;
|
||||
|
||||
import java.util.LinkedList;
|
||||
|
||||
import org.apache.activemq.ActiveMQMessageAudit;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.usage.SystemUsage;
|
||||
|
||||
/**
|
||||
|
@ -32,11 +35,21 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
|
|||
protected int memoryUsageHighWaterMark = 90;
|
||||
protected int maxBatchSize = 100;
|
||||
protected SystemUsage systemUsage;
|
||||
protected int maxProducersToAudit=1024;
|
||||
protected int maxAuditDepth=1;
|
||||
protected boolean enableAudit=true;
|
||||
protected ActiveMQMessageAudit audit;
|
||||
private boolean started=false;
|
||||
|
||||
public void start() throws Exception {
|
||||
public synchronized void start() throws Exception {
|
||||
if (!started && enableAudit && audit==null) {
|
||||
audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
|
||||
}
|
||||
started=true;
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
public synchronized void stop() throws Exception {
|
||||
started=false;
|
||||
gc();
|
||||
}
|
||||
|
||||
|
@ -168,4 +181,68 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
|
|||
public LinkedList pageInList(int maxItems) {
|
||||
throw new RuntimeException("Not supported");
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maxProducersToAudit
|
||||
*/
|
||||
public int getMaxProducersToAudit() {
|
||||
return maxProducersToAudit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxProducersToAudit the maxProducersToAudit to set
|
||||
*/
|
||||
public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
|
||||
this.maxProducersToAudit = maxProducersToAudit;
|
||||
if (audit != null) {
|
||||
this.audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maxAuditDepth
|
||||
*/
|
||||
public int getMaxAuditDepth() {
|
||||
return this.maxAuditDepth;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param maxAuditDepth the maxAuditDepth to set
|
||||
*/
|
||||
public synchronized void setMaxAuditDepth(int maxAuditDepth) {
|
||||
this.maxAuditDepth = maxAuditDepth;
|
||||
if (audit != null) {
|
||||
this.audit.setAuditDepth(maxAuditDepth);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the enableAudit
|
||||
*/
|
||||
public boolean isEnableAudit() {
|
||||
return this.enableAudit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param enableAudit the enableAudit to set
|
||||
*/
|
||||
public synchronized void setEnableAudit(boolean enableAudit) {
|
||||
this.enableAudit = enableAudit;
|
||||
if (this.enableAudit && started && audit==null) {
|
||||
audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected synchronized boolean isDuplicate(MessageId messageId) {
|
||||
if (!this.enableAudit || this.audit==null) {
|
||||
return false;
|
||||
}
|
||||
return this.audit.isDuplicate(messageId);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -63,17 +63,18 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
|
|||
this.store = store;
|
||||
}
|
||||
|
||||
public void start() {
|
||||
public void start() throws Exception {
|
||||
if (started.compareAndSet(false, true)) {
|
||||
super.start();
|
||||
if (systemUsage != null) {
|
||||
systemUsage.getMemoryUsage().addUsageListener(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
public void stop() throws Exception {
|
||||
if (started.compareAndSet(true, false)) {
|
||||
gc();
|
||||
super.stop();
|
||||
if (systemUsage != null) {
|
||||
systemUsage.getMemoryUsage().removeUsageListener(this);
|
||||
}
|
||||
|
@ -118,7 +119,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
|
|||
}
|
||||
}
|
||||
|
||||
public synchronized void destroy() {
|
||||
public synchronized void destroy() throws Exception {
|
||||
stop();
|
||||
for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext();) {
|
||||
Message node = (Message)i.next();
|
||||
|
|
|
@ -210,5 +210,37 @@ public interface PendingMessageCursor extends Service {
|
|||
* @return a list of paged in messages
|
||||
*/
|
||||
LinkedList pageInList(int maxItems);
|
||||
|
||||
/**
|
||||
* set the maximum number of producers to track at one time
|
||||
* @param value
|
||||
*/
|
||||
void setMaxProducersToAudit(int value);
|
||||
|
||||
/**
|
||||
* @return the maximum number of producers to audit
|
||||
*/
|
||||
int getMaxProducersToAudit();
|
||||
|
||||
/**
|
||||
* Set the maximum depth of message ids to track
|
||||
* @param depth
|
||||
*/
|
||||
void setMaxAuditDepth(int depth);
|
||||
|
||||
/**
|
||||
* @return the audit depth
|
||||
*/
|
||||
int getMaxAuditDepth();
|
||||
|
||||
/**
|
||||
* @return the enableAudit
|
||||
*/
|
||||
public boolean isEnableAudit();
|
||||
/**
|
||||
* @param enableAudit the enableAudit to set
|
||||
*/
|
||||
public void setEnableAudit(boolean enableAudit);
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* perist pending messages pending message (messages awaiting disptach to a
|
||||
* persist pending messages pending message (messages awaiting dispatch to a
|
||||
* consumer) cursor
|
||||
*
|
||||
* @version $Revision: 474985 $
|
||||
|
@ -42,6 +42,7 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
|
|||
private final LinkedList<Message> batchList = new LinkedList<Message>();
|
||||
private Destination regionDestination;
|
||||
private int size;
|
||||
private boolean fillBatchDuplicates;
|
||||
|
||||
/**
|
||||
* @param topic
|
||||
|
@ -55,13 +56,14 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
|
|||
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
public void start() throws Exception{
|
||||
super.start();
|
||||
store.resetBatching();
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
store.resetBatching();
|
||||
gc();
|
||||
super.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -127,10 +129,18 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
|
|||
public void finished() {
|
||||
}
|
||||
|
||||
public boolean recoverMessage(Message message) throws Exception {
|
||||
message.setRegionDestination(regionDestination);
|
||||
message.incrementReferenceCount();
|
||||
batchList.addLast(message);
|
||||
public synchronized boolean recoverMessage(Message message)
|
||||
throws Exception {
|
||||
if (!isDuplicate(message.getMessageId())) {
|
||||
message.setRegionDestination(regionDestination);
|
||||
message.incrementReferenceCount();
|
||||
batchList.addLast(message);
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Ignoring batched duplicated from store: " + message);
|
||||
}
|
||||
fillBatchDuplicates=true;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -153,8 +163,13 @@ class QueueStorePrefetch extends AbstractPendingMessageCursor implements Message
|
|||
}
|
||||
|
||||
// implementation
|
||||
protected void fillBatch() throws Exception {
|
||||
protected synchronized void fillBatch() throws Exception {
|
||||
store.recoverNextMessages(maxBatchSize, this);
|
||||
while (fillBatchDuplicates && batchList.isEmpty()) {
|
||||
fillBatchDuplicates=false;
|
||||
store.recoverNextMessages(maxBatchSize, this);
|
||||
}
|
||||
fillBatchDuplicates=false;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
|
|
|
@ -69,6 +69,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
|||
public synchronized void start() throws Exception {
|
||||
if (!started) {
|
||||
started = true;
|
||||
super.start();
|
||||
for (PendingMessageCursor tsp : storePrefetches) {
|
||||
tsp.start();
|
||||
}
|
||||
|
@ -78,6 +79,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
|||
public synchronized void stop() throws Exception {
|
||||
if (started) {
|
||||
started = false;
|
||||
super.stop();
|
||||
for (PendingMessageCursor tsp : storePrefetches) {
|
||||
tsp.stop();
|
||||
}
|
||||
|
@ -96,6 +98,9 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
|||
TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName, subscription);
|
||||
tsp.setMaxBatchSize(getMaxBatchSize());
|
||||
tsp.setSystemUsage(systemUsage);
|
||||
tsp.setEnableAudit(isEnableAudit());
|
||||
tsp.setMaxAuditDepth(getMaxAuditDepth());
|
||||
tsp.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
topics.put(destination, tsp);
|
||||
storePrefetches.add(tsp);
|
||||
if (started) {
|
||||
|
@ -253,6 +258,36 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
|
|||
tsp.setSystemUsage(usageManager);
|
||||
}
|
||||
}
|
||||
|
||||
public void setMaxProducersToAudit(int maxProducersToAudit) {
|
||||
super.setMaxProducersToAudit(maxProducersToAudit);
|
||||
for (PendingMessageCursor cursor : storePrefetches) {
|
||||
cursor.setMaxAuditDepth(maxAuditDepth);
|
||||
}
|
||||
if (nonPersistent != null) {
|
||||
nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
|
||||
}
|
||||
}
|
||||
|
||||
public void setMaxAuditDepth(int maxAuditDepth) {
|
||||
super.setMaxAuditDepth(maxAuditDepth);
|
||||
for (PendingMessageCursor cursor : storePrefetches) {
|
||||
cursor.setMaxAuditDepth(maxAuditDepth);
|
||||
}
|
||||
if (nonPersistent != null) {
|
||||
nonPersistent.setMaxAuditDepth(maxAuditDepth);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void setEnableAudit(boolean enableAudit) {
|
||||
super.setEnableAudit(enableAudit);
|
||||
for (PendingMessageCursor cursor : storePrefetches) {
|
||||
cursor.setEnableAudit(enableAudit);
|
||||
}
|
||||
if (nonPersistent != null) {
|
||||
nonPersistent.setEnableAudit(enableAudit);
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized PendingMessageCursor getNextCursor() throws Exception {
|
||||
if (currentCursor == null || currentCursor.isEmpty()) {
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.region.cursors;
|
||||
|
||||
import org.apache.activemq.ActiveMQMessageAudit;
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
import org.apache.activemq.broker.region.Queue;
|
||||
import org.apache.activemq.command.Message;
|
||||
|
@ -55,10 +56,14 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
|
|||
|
||||
public synchronized void start() throws Exception {
|
||||
started = true;
|
||||
super.start();
|
||||
if (nonPersistent == null) {
|
||||
nonPersistent = new FilePendingMessageCursor(queue.getDestination(), tmpStore);
|
||||
nonPersistent.setMaxBatchSize(getMaxBatchSize());
|
||||
nonPersistent.setSystemUsage(systemUsage);
|
||||
nonPersistent.setEnableAudit(isEnableAudit());
|
||||
nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
|
||||
nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
}
|
||||
nonPersistent.start();
|
||||
persistent.start();
|
||||
|
@ -67,6 +72,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
|
|||
|
||||
public synchronized void stop() throws Exception {
|
||||
started = false;
|
||||
super.stop();
|
||||
if (nonPersistent != null) {
|
||||
nonPersistent.stop();
|
||||
nonPersistent.gc();
|
||||
|
@ -191,6 +197,39 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
|
|||
}
|
||||
super.setMaxBatchSize(maxBatchSize);
|
||||
}
|
||||
|
||||
|
||||
public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
|
||||
super.setMaxProducersToAudit(maxProducersToAudit);
|
||||
if (persistent != null) {
|
||||
persistent.setMaxProducersToAudit(maxProducersToAudit);
|
||||
}
|
||||
if (nonPersistent != null) {
|
||||
nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void setMaxAuditDepth(int maxAuditDepth) {
|
||||
super.setMaxAuditDepth(maxAuditDepth);
|
||||
if (persistent != null) {
|
||||
persistent.setMaxAuditDepth(maxAuditDepth);
|
||||
}
|
||||
if (nonPersistent != null) {
|
||||
nonPersistent.setMaxAuditDepth(maxAuditDepth);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void setEnableAudit(boolean enableAudit) {
|
||||
super.setEnableAudit(enableAudit);
|
||||
if (persistent != null) {
|
||||
persistent.setEnableAudit(enableAudit);
|
||||
}
|
||||
if (nonPersistent != null) {
|
||||
nonPersistent.setEnableAudit(enableAudit);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public synchronized void gc() {
|
||||
if (persistent != null) {
|
||||
|
|
|
@ -65,16 +65,18 @@ class TopicStorePrefetch extends AbstractPendingMessageCursor implements Message
|
|||
this.subscriberName = subscriberName;
|
||||
}
|
||||
|
||||
public synchronized void start() {
|
||||
public synchronized void start() throws Exception {
|
||||
if (!started) {
|
||||
started = true;
|
||||
super.start();
|
||||
safeFillBatch();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void stop() {
|
||||
public synchronized void stop() throws Exception {
|
||||
if (started) {
|
||||
started = false;
|
||||
super.stop();
|
||||
store.resetBatching(clientId, subscriberName);
|
||||
gc();
|
||||
}
|
||||
|
|
|
@ -51,6 +51,9 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
private PendingQueueMessageStoragePolicy pendingQueuePolicy;
|
||||
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
|
||||
private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
|
||||
private int maxProducersToAudit=1024;
|
||||
private int maxAuditDepth=1;
|
||||
private boolean enableAudit=true;
|
||||
private boolean producerFlowControl = true;
|
||||
|
||||
public void configure(Queue queue, Store tmpStore) {
|
||||
|
@ -69,6 +72,9 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
queue.setMessages(messages);
|
||||
}
|
||||
queue.setProducerFlowControl(isProducerFlowControl());
|
||||
queue.setEnableAudit(isEnableAudit());
|
||||
queue.setMaxAuditDepth(getMaxAuditDepth());
|
||||
queue.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
}
|
||||
|
||||
public void configure(Topic topic) {
|
||||
|
@ -86,6 +92,9 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
topic.getBrokerMemoryUsage().setLimit(memoryLimit);
|
||||
}
|
||||
topic.setProducerFlowControl(isProducerFlowControl());
|
||||
topic.setEnableAudit(isEnableAudit());
|
||||
topic.setMaxAuditDepth(getMaxAuditDepth());
|
||||
topic.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
}
|
||||
|
||||
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
|
||||
|
@ -266,12 +275,60 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
this.pendingSubscriberPolicy = pendingSubscriberPolicy;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if producer flow control enabled
|
||||
*/
|
||||
public boolean isProducerFlowControl() {
|
||||
return producerFlowControl;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param producerFlowControl
|
||||
*/
|
||||
public void setProducerFlowControl(boolean producerFlowControl) {
|
||||
this.producerFlowControl = producerFlowControl;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maxProducersToAudit
|
||||
*/
|
||||
public int getMaxProducersToAudit() {
|
||||
return maxProducersToAudit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxProducersToAudit the maxProducersToAudit to set
|
||||
*/
|
||||
public void setMaxProducersToAudit(int maxProducersToAudit) {
|
||||
this.maxProducersToAudit = maxProducersToAudit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maxAuditDepth
|
||||
*/
|
||||
public int getMaxAuditDepth() {
|
||||
return maxAuditDepth;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxAuditDepth the maxAuditDepth to set
|
||||
*/
|
||||
public void setMaxAuditDepth(int maxAuditDepth) {
|
||||
this.maxAuditDepth = maxAuditDepth;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the enableAudit
|
||||
*/
|
||||
public boolean isEnableAudit() {
|
||||
return enableAudit;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param enableAudit the enableAudit to set
|
||||
*/
|
||||
public void setEnableAudit(boolean enableAudit) {
|
||||
this.enableAudit = enableAudit;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ public class BitArrayBin {
|
|||
private int maxNumberOfArrays;
|
||||
private int firstIndex = -1;
|
||||
private int firstBin = -1;
|
||||
private long lastBitSet=-1;
|
||||
|
||||
/**
|
||||
* Create a BitArrayBin to a certain window size (number of messages to
|
||||
|
@ -60,9 +61,26 @@ public class BitArrayBin {
|
|||
if (offset >= 0) {
|
||||
answer = ba.set(offset, value);
|
||||
}
|
||||
if (value) {
|
||||
lastBitSet=index;
|
||||
}else {
|
||||
lastBitSet=-1;
|
||||
}
|
||||
}
|
||||
return answer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test if in order
|
||||
* @param index
|
||||
* @return true if next message is in order
|
||||
*/
|
||||
public boolean isInOrder(long index) {
|
||||
if (lastBitSet== -1) {
|
||||
return true;
|
||||
}
|
||||
return lastBitSet+1==index;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the boolean value at the index
|
||||
|
|
|
@ -88,10 +88,32 @@ public class ActiveMQMessageAuditTest extends TestCase {
|
|||
ActiveMQMessage msg = new ActiveMQMessage();
|
||||
msg.setMessageId(id);
|
||||
list.add(msg);
|
||||
assertFalse(audit.isDuplicateMessageReference(msg));
|
||||
assertFalse(audit.isDuplicate(msg.getMessageId()));
|
||||
}
|
||||
for (MessageReference msg : list) {
|
||||
assertTrue(audit.isDuplicateMessageReference(msg));
|
||||
assertTrue(audit.isDuplicate(msg));
|
||||
}
|
||||
}
|
||||
|
||||
public void testIsInOrderString() {
|
||||
int count = 10000;
|
||||
ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
|
||||
IdGenerator idGen = new IdGenerator();
|
||||
// add to a list
|
||||
List<String> list = new ArrayList<String>();
|
||||
for (int i = 0; i < count; i++) {
|
||||
String id = idGen.generateId();
|
||||
if (i==0) {
|
||||
assertFalse(audit.isDuplicate(id));
|
||||
}
|
||||
if (i > 1 && i%2 != 0) {
|
||||
list.add(id);
|
||||
}
|
||||
|
||||
}
|
||||
for (String id : list) {
|
||||
assertFalse(audit.isInOrder(id));
|
||||
assertFalse(audit.isDuplicate(id));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue