mirror of https://github.com/apache/activemq.git
[AMQ-9217] Fix per-destination audits on IndividualDeadLetterStrategy
This commit is contained in:
parent
97bfc67332
commit
28f7eb7ee8
|
@ -31,13 +31,12 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
|
||||||
private boolean processNonPersistent = false;
|
private boolean processNonPersistent = false;
|
||||||
private boolean processExpired = true;
|
private boolean processExpired = true;
|
||||||
private boolean enableAudit = true;
|
private boolean enableAudit = true;
|
||||||
private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit();
|
|
||||||
private long expiration;
|
private long expiration;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void rollback(Message message) {
|
public void rollback(Message message) {
|
||||||
if (message != null && this.enableAudit) {
|
if (message != null && this.enableAudit) {
|
||||||
messageAudit.rollback(message);
|
lookupActiveMQMessageAudit(message).rollback(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,7 +45,7 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
|
||||||
boolean result = false;
|
boolean result = false;
|
||||||
if (message != null) {
|
if (message != null) {
|
||||||
result = true;
|
result = true;
|
||||||
if (enableAudit && messageAudit.isDuplicate(message)) {
|
if (enableAudit && lookupActiveMQMessageAudit(message).isDuplicate(message)) {
|
||||||
result = false;
|
result = false;
|
||||||
LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
|
LOG.debug("Not adding duplicate to DLQ: {}, dest: {}", message.getMessageId(), message.getDestination());
|
||||||
}
|
}
|
||||||
|
@ -108,20 +107,13 @@ public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
|
||||||
this.expiration = expiration;
|
this.expiration = expiration;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getMaxProducersToAudit() {
|
public abstract int getMaxProducersToAudit();
|
||||||
return messageAudit.getMaximumNumberOfProducersToTrack();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setMaxProducersToAudit(int maxProducersToAudit) {
|
public abstract void setMaxProducersToAudit(int maxProducersToAudit);
|
||||||
messageAudit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setMaxAuditDepth(int maxAuditDepth) {
|
public abstract void setMaxAuditDepth(int maxAuditDepth);
|
||||||
messageAudit.setAuditDepth(maxAuditDepth);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getMaxAuditDepth() {
|
public abstract int getMaxAuditDepth();
|
||||||
return messageAudit.getAuditDepth();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
protected abstract ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.region.policy;
|
package org.apache.activemq.broker.region.policy;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQMessageAudit;
|
||||||
import org.apache.activemq.broker.region.Destination;
|
import org.apache.activemq.broker.region.Destination;
|
||||||
import org.apache.activemq.broker.region.DurableTopicSubscription;
|
import org.apache.activemq.broker.region.DurableTopicSubscription;
|
||||||
import org.apache.activemq.broker.region.Subscription;
|
import org.apache.activemq.broker.region.Subscription;
|
||||||
|
@ -23,6 +24,7 @@ import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.util.LRUCache;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link DeadLetterStrategy} where each destination has its own individual
|
* A {@link DeadLetterStrategy} where each destination has its own individual
|
||||||
|
@ -40,6 +42,10 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
|
||||||
private boolean useQueueForQueueMessages = true;
|
private boolean useQueueForQueueMessages = true;
|
||||||
private boolean useQueueForTopicMessages = true;
|
private boolean useQueueForTopicMessages = true;
|
||||||
private boolean destinationPerDurableSubscriber;
|
private boolean destinationPerDurableSubscriber;
|
||||||
|
private int maxAuditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
|
||||||
|
private int maxProducersToAudit = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
|
||||||
|
|
||||||
|
private final LRUCache<String,ActiveMQMessageAudit> dedicatedMessageAudits = new LRUCache<>(10_000);
|
||||||
|
|
||||||
public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
|
public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
|
||||||
if (message.getDestination().isQueue()) {
|
if (message.getDestination().isQueue()) {
|
||||||
|
@ -51,6 +57,13 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
|
||||||
|
|
||||||
// Properties
|
// Properties
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
public int getMaxDestinationsToAudit() {
|
||||||
|
return dedicatedMessageAudits.getMaxCacheSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void maxDestinationsToAudit(int maxDestinationsToAudit) {
|
||||||
|
this.dedicatedMessageAudits.setMaxCacheSize(maxDestinationsToAudit);
|
||||||
|
}
|
||||||
|
|
||||||
public String getQueuePrefix() {
|
public String getQueuePrefix() {
|
||||||
return queuePrefix;
|
return queuePrefix;
|
||||||
|
@ -134,6 +147,26 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
|
||||||
this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
|
this.destinationPerDurableSubscriber = destinationPerDurableSubscriber;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMaxProducersToAudit() {
|
||||||
|
return this.maxProducersToAudit;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxProducersToAudit(int maxProducersToAudit) {
|
||||||
|
this.maxProducersToAudit = maxProducersToAudit;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxAuditDepth(int maxAuditDepth) {
|
||||||
|
this.maxAuditDepth = maxAuditDepth;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMaxAuditDepth() {
|
||||||
|
return this.maxAuditDepth;
|
||||||
|
}
|
||||||
|
|
||||||
// Implementation methods
|
// Implementation methods
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
protected ActiveMQDestination createDestination(Message message,
|
protected ActiveMQDestination createDestination(Message message,
|
||||||
|
@ -168,4 +201,19 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) {
|
||||||
|
ActiveMQMessageAudit messageAudit;
|
||||||
|
|
||||||
|
synchronized(dedicatedMessageAudits) {
|
||||||
|
messageAudit = dedicatedMessageAudits.get(message.getDestination().getQualifiedName());
|
||||||
|
|
||||||
|
if(messageAudit == null) {
|
||||||
|
messageAudit = new ActiveMQMessageAudit(getMaxAuditDepth(), getMaxProducersToAudit());
|
||||||
|
dedicatedMessageAudits.put(message.getDestination().getQualifiedName(), messageAudit);
|
||||||
|
}
|
||||||
|
|
||||||
|
return messageAudit;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.region.policy;
|
package org.apache.activemq.broker.region.policy;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQMessageAudit;
|
||||||
import org.apache.activemq.broker.region.Subscription;
|
import org.apache.activemq.broker.region.Subscription;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
@ -35,6 +36,7 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
|
||||||
public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "ActiveMQ.DLQ";
|
public static final String DEFAULT_DEAD_LETTER_QUEUE_NAME = "ActiveMQ.DLQ";
|
||||||
|
|
||||||
private ActiveMQDestination deadLetterQueue = new ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME);
|
private ActiveMQDestination deadLetterQueue = new ActiveMQQueue(DEFAULT_DEAD_LETTER_QUEUE_NAME);
|
||||||
|
private final ActiveMQMessageAudit messageAudit = new ActiveMQMessageAudit();
|
||||||
|
|
||||||
public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
|
public ActiveMQDestination getDeadLetterQueueFor(Message message, Subscription subscription) {
|
||||||
return deadLetterQueue;
|
return deadLetterQueue;
|
||||||
|
@ -48,4 +50,29 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
|
||||||
this.deadLetterQueue = deadLetterQueue;
|
this.deadLetterQueue = deadLetterQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMaxProducersToAudit() {
|
||||||
|
return messageAudit.getMaximumNumberOfProducersToTrack();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxProducersToAudit(int maxProducersToAudit) {
|
||||||
|
messageAudit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxAuditDepth(int maxAuditDepth) {
|
||||||
|
messageAudit.setAuditDepth(maxAuditDepth);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMaxAuditDepth() {
|
||||||
|
return messageAudit.getAuditDepth();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ActiveMQMessageAudit lookupActiveMQMessageAudit(Message message) {
|
||||||
|
return messageAudit;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,22 +16,31 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker.policy;
|
package org.apache.activemq.broker.policy;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Enumeration;
|
import java.util.Enumeration;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.jms.DeliveryMode;
|
import javax.jms.DeliveryMode;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
import javax.jms.Queue;
|
import javax.jms.Queue;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||||
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
|
||||||
|
import org.apache.activemq.broker.region.virtual.CompositeQueue;
|
||||||
|
import org.apache.activemq.broker.region.virtual.VirtualDestination;
|
||||||
|
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.util.Wait;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -48,12 +57,37 @@ public class IndividualDeadLetterTest extends DeadLetterTest {
|
||||||
strategy.setProcessNonPersistent(true);
|
strategy.setProcessNonPersistent(true);
|
||||||
policy.setDeadLetterStrategy(strategy);
|
policy.setDeadLetterStrategy(strategy);
|
||||||
|
|
||||||
|
PolicyEntry indvAuditPolicy = new PolicyEntry();
|
||||||
|
IndividualDeadLetterStrategy indvAuditDlqStrategy = new IndividualDeadLetterStrategy();
|
||||||
|
indvAuditDlqStrategy.setEnableAudit(true);
|
||||||
|
indvAuditPolicy.setDeadLetterStrategy(indvAuditDlqStrategy);
|
||||||
|
|
||||||
|
PolicyEntry shrAuditPolicy = new PolicyEntry();
|
||||||
|
SharedDeadLetterStrategy shrAuditDlqStrategy = new SharedDeadLetterStrategy();
|
||||||
|
shrAuditDlqStrategy.setEnableAudit(true);
|
||||||
|
shrAuditPolicy.setDeadLetterStrategy(shrAuditDlqStrategy);
|
||||||
|
|
||||||
PolicyMap pMap = new PolicyMap();
|
PolicyMap pMap = new PolicyMap();
|
||||||
pMap.put(new ActiveMQQueue(getDestinationString()), policy);
|
pMap.put(new ActiveMQQueue(getDestinationString()), policy);
|
||||||
pMap.put(new ActiveMQTopic(getDestinationString()), policy);
|
pMap.put(new ActiveMQTopic(getDestinationString()), policy);
|
||||||
|
pMap.put(new ActiveMQQueue(getDestinationString() + ".INDV.>"), indvAuditPolicy);
|
||||||
|
pMap.put(new ActiveMQQueue(getDestinationString() + ".SHR.>"), shrAuditPolicy);
|
||||||
|
|
||||||
broker.setDestinationPolicy(pMap);
|
broker.setDestinationPolicy(pMap);
|
||||||
|
|
||||||
|
CompositeQueue indvAuditCompQueue = new CompositeQueue();
|
||||||
|
indvAuditCompQueue.setName(getDestinationString() + ".INDV.A");
|
||||||
|
indvAuditCompQueue.setForwardOnly(true);
|
||||||
|
indvAuditCompQueue.setForwardTo(Arrays.asList(new ActiveMQQueue(getDestinationString() + ".INDV.B"), new ActiveMQQueue(getDestinationString() + ".INDV.C")));
|
||||||
|
|
||||||
|
CompositeQueue sharedAuditCompQueue = new CompositeQueue();
|
||||||
|
sharedAuditCompQueue.setName(getDestinationString() + ".SHR.A");
|
||||||
|
sharedAuditCompQueue.setForwardOnly(true);
|
||||||
|
sharedAuditCompQueue.setForwardTo(Arrays.asList(new ActiveMQQueue(getDestinationString() + ".SHR.B"), new ActiveMQQueue(getDestinationString() + ".SHR.C")));
|
||||||
|
|
||||||
|
VirtualDestinationInterceptor vdi = new VirtualDestinationInterceptor();
|
||||||
|
vdi.setVirtualDestinations(new VirtualDestination[] { indvAuditCompQueue, sharedAuditCompQueue });
|
||||||
|
broker.setDestinationInterceptors(new VirtualDestinationInterceptor[] {vdi});
|
||||||
return broker;
|
return broker;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,6 +133,99 @@ public class IndividualDeadLetterTest extends DeadLetterTest {
|
||||||
assertNull("The message shouldn't be sent to another DLQ", testConsumer.receive(1000));
|
assertNull("The message shouldn't be sent to another DLQ", testConsumer.receive(1000));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AMQ-9217
|
||||||
|
public void testPerDestinationAuditDefault() throws Exception {
|
||||||
|
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
|
||||||
|
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
session = connection.createSession(transactedMode, acknowledgeMode);
|
||||||
|
MessageProducer messageProducerA = session.createProducer(session.createQueue(getDestinationString() + ".INDV.A"));
|
||||||
|
messageProducerA.send(session.createTextMessage("testPerDestinationAuditEnabled"));
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
for(String destName : Set.of(getDestinationString() + ".INDV.B", getDestinationString() + ".INDV.C")) {
|
||||||
|
for (int i = 0; i < rollbackCount; i++) {
|
||||||
|
MessageConsumer indvConsumer = session.createConsumer(session.createQueue(destName));
|
||||||
|
Message message = indvConsumer.receive(5000);
|
||||||
|
assertNotNull("No message received: ", message);
|
||||||
|
|
||||||
|
session.rollback();
|
||||||
|
LOG.info("Rolled back: " + rollbackCount + " times");
|
||||||
|
indvConsumer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
QueueViewMBean a = getProxyToQueue(getDestinationString() + ".INDV.A");
|
||||||
|
assertNotNull(a);
|
||||||
|
assertTrue(Wait.waitFor(() -> a.getEnqueueCount() == 0l, 3000, 250));
|
||||||
|
assertTrue(Wait.waitFor(() -> a.getQueueSize() == 0l, 3000, 250));
|
||||||
|
|
||||||
|
QueueViewMBean b = getProxyToQueue(getDestinationString() + ".INDV.B");
|
||||||
|
assertNotNull(b);
|
||||||
|
assertTrue(Wait.waitFor(() -> b.getEnqueueCount() == 1l, 3000, 250));
|
||||||
|
assertTrue(Wait.waitFor(() -> b.getQueueSize() == 0l, 3000, 250));
|
||||||
|
|
||||||
|
QueueViewMBean c = getProxyToQueue(getDestinationString() + ".INDV.C");
|
||||||
|
assertNotNull(c);
|
||||||
|
assertTrue(Wait.waitFor(() -> c.getEnqueueCount() == 1l, 3000, 250));
|
||||||
|
assertTrue(Wait.waitFor(() -> c.getQueueSize() == 0l, 3000, 250));
|
||||||
|
|
||||||
|
QueueViewMBean bDlq = getProxyToQueue("ActiveMQ.DLQ.Queue." + getDestinationString() + ".INDV.B");
|
||||||
|
assertNotNull(bDlq);
|
||||||
|
assertTrue(Wait.waitFor(() -> bDlq.getEnqueueCount() == 1l, 3000, 250));
|
||||||
|
assertTrue(Wait.waitFor(() -> bDlq.getQueueSize() == 1l, 3000, 250));
|
||||||
|
|
||||||
|
QueueViewMBean cDlq = getProxyToQueue("ActiveMQ.DLQ.Queue." + getDestinationString() + ".INDV.C");
|
||||||
|
assertNotNull(cDlq);
|
||||||
|
assertTrue(Wait.waitFor(() -> cDlq.getEnqueueCount() == 1, 3000, 250));
|
||||||
|
assertTrue(Wait.waitFor(() -> cDlq.getQueueSize() == 1, 3000, 250));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSharedDestinationAuditDropsMessages() throws Exception {
|
||||||
|
ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
|
||||||
|
rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
|
||||||
|
|
||||||
|
connection.start();
|
||||||
|
session = connection.createSession(transactedMode, acknowledgeMode);
|
||||||
|
MessageProducer messageProducerA = session.createProducer(session.createQueue(getDestinationString() + ".SHR.A"));
|
||||||
|
messageProducerA.send(session.createTextMessage("testSharedDestinationAuditDropsMessages"));
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
for(String destName : Set.of(getDestinationString() + ".SHR.B", getDestinationString() + ".SHR.C")) {
|
||||||
|
for (int i = 0; i < rollbackCount; i++) {
|
||||||
|
MessageConsumer shrConsumer = session.createConsumer(session.createQueue(destName));
|
||||||
|
Message message = shrConsumer.receive(5000);
|
||||||
|
assertNotNull("No message received: ", message);
|
||||||
|
|
||||||
|
session.rollback();
|
||||||
|
LOG.info("Rolled back: " + rollbackCount + " times");
|
||||||
|
shrConsumer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
QueueViewMBean a = getProxyToQueue(getDestinationString() + ".SHR.A");
|
||||||
|
assertNotNull(a);
|
||||||
|
assertTrue(Wait.waitFor(() -> a.getEnqueueCount() == 0l, 3000, 250));
|
||||||
|
assertTrue(Wait.waitFor(() -> a.getQueueSize() == 0l, 3000, 250));
|
||||||
|
|
||||||
|
QueueViewMBean b = getProxyToQueue(getDestinationString() + ".SHR.B");
|
||||||
|
assertNotNull(b);
|
||||||
|
assertTrue(Wait.waitFor(() -> b.getEnqueueCount() == 1l, 3000, 250));
|
||||||
|
assertTrue(Wait.waitFor(() -> b.getQueueSize() == 0l, 3000, 250));
|
||||||
|
|
||||||
|
QueueViewMBean c = getProxyToQueue(getDestinationString() + ".SHR.C");
|
||||||
|
assertNotNull(c);
|
||||||
|
assertTrue(Wait.waitFor(() -> c.getEnqueueCount() == 1l, 3000, 250));
|
||||||
|
assertTrue(Wait.waitFor(() -> c.getQueueSize() == 0l, 3000, 250));
|
||||||
|
|
||||||
|
// Only 1 message in 1 DLQ means the a message was dropped due to shared message audit
|
||||||
|
QueueViewMBean sharedDlq = getProxyToQueue("ActiveMQ.DLQ");
|
||||||
|
assertNotNull(sharedDlq);
|
||||||
|
assertTrue(Wait.waitFor(() -> sharedDlq.getEnqueueCount() == 1, 3000, 250));
|
||||||
|
assertTrue(Wait.waitFor(() -> sharedDlq.getQueueSize() == 1, 3000, 250));
|
||||||
|
}
|
||||||
|
|
||||||
protected void browseDlq() throws Exception {
|
protected void browseDlq() throws Exception {
|
||||||
Enumeration<?> messages = dlqBrowser.getEnumeration();
|
Enumeration<?> messages = dlqBrowser.getEnumeration();
|
||||||
while (messages.hasMoreElements()) {
|
while (messages.hasMoreElements()) {
|
||||||
|
|
Loading…
Reference in New Issue