git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@753222 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-03-13 12:33:18 +00:00
parent a91f59a201
commit 2a58f4c3a2
5 changed files with 61 additions and 6 deletions

View File

@ -2196,6 +2196,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
this.producerWindowSize = producerWindowSize;
}
public void setAuditDepth(int auditDepth) {
connectionAudit.setAuditDepth(auditDepth);
}
public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
}
protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
connectionAudit.removeDispatcher(dispatcher);
}

View File

@ -112,6 +112,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private boolean sendAcksAsync=true;
private TransportListener transportListener;
private ExceptionListener exceptionListener;
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
// /////////////////////////////////////////////
//
@ -310,6 +312,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
connection.setSendTimeout(getSendTimeout());
connection.setSendAcksAsync(isSendAcksAsync());
connection.setAuditDepth(getAuditDepth());
connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
if (transportListener != null) {
connection.addTransportListener(transportListener);
}
@ -669,6 +673,8 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
props.setProperty("auditMaximumProducerNumber", Integer.toString(getAuditMaximumProducerNumber()));
}
public boolean isUseCompression() {
@ -882,4 +888,20 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
public void setExceptionListener(ExceptionListener exceptionListener) {
this.exceptionListener = exceptionListener;
}
public int getAuditDepth() {
return auditDepth;
}
public void setAuditDepth(int auditDepth) {
this.auditDepth = auditDepth;
}
public int getAuditMaximumProducerNumber() {
return auditMaximumProducerNumber;
}
public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
this.auditMaximumProducerNumber = auditMaximumProducerNumber;
}
}

View File

@ -33,15 +33,15 @@ import org.apache.activemq.util.LRUCache;
*/
public class ActiveMQMessageAudit {
private static final int DEFAULT_WINDOW_SIZE = 2048;
private static final int MAXIMUM_PRODUCER_COUNT = 64;
public static final int DEFAULT_WINDOW_SIZE = 2048;
public static final int MAXIMUM_PRODUCER_COUNT = 64;
private int auditDepth;
private int maximumNumberOfProducersToTrack;
private LRUCache<Object, BitArrayBin> map;
/**
* Default Constructor windowSize = 1024, maximumNumberOfProducersToTrack =
* 128
* Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
* 64
*/
public ActiveMQMessageAudit() {
this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);

View File

@ -30,6 +30,11 @@ class ConnectionAudit {
private LinkedHashMap<ActiveMQDestination, ActiveMQMessageAudit> destinations = new LRUCache<ActiveMQDestination, ActiveMQMessageAudit>(1000);
private LinkedHashMap<ActiveMQDispatcher, ActiveMQMessageAudit> dispatchers = new LRUCache<ActiveMQDispatcher, ActiveMQMessageAudit>(1000);
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
synchronized void removeDispatcher(ActiveMQDispatcher dispatcher) {
dispatchers.remove(dispatcher);
}
@ -41,7 +46,7 @@ class ConnectionAudit {
if (destination.isQueue()) {
ActiveMQMessageAudit audit = destinations.get(destination);
if (audit == null) {
audit = new ActiveMQMessageAudit();
audit = new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber);
destinations.put(destination, audit);
}
boolean result = audit.isDuplicate(message);
@ -49,7 +54,7 @@ class ConnectionAudit {
}
ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
if (audit == null) {
audit = new ActiveMQMessageAudit();
audit = new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber);
dispatchers.put(dispatcher, audit);
}
boolean result = audit.isDuplicate(message);
@ -91,4 +96,21 @@ class ConnectionAudit {
void setCheckForDuplicates(boolean checkForDuplicates) {
this.checkForDuplicates = checkForDuplicates;
}
public int getAuditDepth() {
return auditDepth;
}
public void setAuditDepth(int auditDepth) {
this.auditDepth = auditDepth;
}
public int getAuditMaximumProducerNumber() {
return auditMaximumProducerNumber;
}
public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
this.auditMaximumProducerNumber = auditMaximumProducerNumber;
}
}

View File

@ -82,6 +82,9 @@ public class ActiveMQConnectionFactoryTest extends CombinationTestSupport {
assertTrue(cf.isUseAsyncSend());
// the broker url have been adjusted.
assertEquals("vm:(broker:()/localhost)", cf.getBrokerURL());
cf = new ActiveMQConnectionFactory("vm://localhost?jms.auditDepth=5000");
assertEquals(5000, cf.getAuditDepth());
}
public void testUseURIToConfigureRedeliveryPolicy() throws URISyntaxException, JMSException {