mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4483 - rework to use destination option to indicate dlq, dlq strategy is typically not in place for dlq dests, option is set when a dlq is first used via region broker sendTodlq, fix and tests
This commit is contained in:
parent
441973b483
commit
be919fbc94
|
@ -524,7 +524,7 @@ public class DestinationView implements DestinationViewMBean {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isDLQ() {
|
public boolean isDLQ() {
|
||||||
return destination.isDLQ();
|
return destination.getActiveMQDestination().isDLQ();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -843,9 +843,8 @@ public abstract class BaseDestination implements Destination {
|
||||||
return ack;
|
return ack;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
protected boolean isDLQ() {
|
||||||
public boolean isDLQ() {
|
return destination.isDLQ();
|
||||||
return getDeadLetterStrategy().isDLQ(this.getActiveMQDestination());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -241,7 +241,5 @@ public interface Destination extends Service, Task, Message.MessageDestination {
|
||||||
|
|
||||||
public void clearPendingMessages();
|
public void clearPendingMessages();
|
||||||
|
|
||||||
public boolean isDLQ();
|
|
||||||
|
|
||||||
void duplicateFromStore(Message message, Subscription subscription);
|
void duplicateFromStore(Message message, Subscription subscription);
|
||||||
}
|
}
|
||||||
|
|
|
@ -383,11 +383,6 @@ public class DestinationFilter implements Destination {
|
||||||
next.clearPendingMessages();
|
next.clearPendingMessages();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isDLQ() {
|
|
||||||
return next.isDLQ();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void duplicateFromStore(Message message, Subscription subscription) {
|
public void duplicateFromStore(Message message, Subscription subscription) {
|
||||||
next.duplicateFromStore(message, subscription);
|
next.duplicateFromStore(message, subscription);
|
||||||
|
|
|
@ -796,6 +796,7 @@ public class RegionBroker extends EmptyBroker {
|
||||||
if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) {
|
if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) {
|
||||||
adminContext = BrokerSupport.getConnectionContext(this);
|
adminContext = BrokerSupport.getConnectionContext(this);
|
||||||
}
|
}
|
||||||
|
addDestination(adminContext, deadLetterDestination, false).getActiveMQDestination().setDLQ();
|
||||||
BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination);
|
BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,8 +60,6 @@ public interface DeadLetterStrategy {
|
||||||
*/
|
*/
|
||||||
public void setProcessNonPersistent(boolean processNonPersistent);
|
public void setProcessNonPersistent(boolean processNonPersistent);
|
||||||
|
|
||||||
public boolean isDLQ(ActiveMQDestination destination);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allows for a Message that was already processed by a DLQ to be rolled back in case
|
* Allows for a Message that was already processed by a DLQ to be rolled back in case
|
||||||
* of a move or a retry of that message, otherwise the Message would be considered a
|
* of a move or a retry of that message, otherwise the Message would be considered a
|
||||||
|
|
|
@ -168,18 +168,4 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isDLQ(ActiveMQDestination destination) {
|
|
||||||
String name = destination.getPhysicalName();
|
|
||||||
if (destination.isQueue()) {
|
|
||||||
if ((queuePrefix != null && name.startsWith(queuePrefix)) || (queueSuffix != null && name.endsWith(queueSuffix))) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if ((topicPrefix != null && name.startsWith(topicPrefix)) || (topicSuffix != null && name.endsWith(topicSuffix))) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -48,12 +48,4 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
|
||||||
this.deadLetterQueue = deadLetterQueue;
|
this.deadLetterQueue = deadLetterQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isDLQ(ActiveMQDestination destination) {
|
|
||||||
if (destination.equals(deadLetterQueue)) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -169,7 +169,7 @@ public class BrokerDestinationView {
|
||||||
* @return true if the destination is a Dead Letter Queue
|
* @return true if the destination is a Dead Letter Queue
|
||||||
*/
|
*/
|
||||||
public boolean isDLQ() {
|
public boolean isDLQ() {
|
||||||
return destination.isDLQ();
|
return destination.getActiveMQDestination().isDLQ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.ObjectInput;
|
||||||
import java.io.ObjectOutput;
|
import java.io.ObjectOutput;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -58,6 +59,7 @@ public abstract class ActiveMQDestination extends JNDIBaseStorable implements Da
|
||||||
public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
|
public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
|
||||||
public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
|
public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
|
||||||
public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
|
public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
|
||||||
|
public static final String IS_DLQ = "isDLQ";
|
||||||
|
|
||||||
public static final String TEMP_DESTINATION_NAME_PREFIX = "ID:";
|
public static final String TEMP_DESTINATION_NAME_PREFIX = "ID:";
|
||||||
|
|
||||||
|
@ -398,6 +400,17 @@ public abstract class ActiveMQDestination extends JNDIBaseStorable implements Da
|
||||||
return isPattern;
|
return isPattern;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isDLQ() {
|
||||||
|
return options != null && options.containsKey(IS_DLQ);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDLQ() {
|
||||||
|
if (options == null) {
|
||||||
|
options = new HashMap<String, String>();
|
||||||
|
}
|
||||||
|
options.put(IS_DLQ, String.valueOf(true));
|
||||||
|
}
|
||||||
|
|
||||||
public static UnresolvedDestinationTransformer getUnresolvableDestinationTransformer() {
|
public static UnresolvedDestinationTransformer getUnresolvableDestinationTransformer() {
|
||||||
return unresolvableDestinationTransformer;
|
return unresolvableDestinationTransformer;
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,11 @@ import javax.jms.JMSException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
|
||||||
|
import javax.management.MalformedObjectNameException;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
import org.apache.activemq.broker.BrokerRegistry;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
import org.apache.activemq.broker.region.DestinationStatistics;
|
import org.apache.activemq.broker.region.DestinationStatistics;
|
||||||
import org.apache.activemq.broker.region.RegionBroker;
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
@ -173,6 +177,14 @@ public abstract class TestSupport extends CombinationTestSupport {
|
||||||
regionBroker.getTopicRegion().getDestinationMap();
|
regionBroker.getTopicRegion().getDestinationMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
|
||||||
|
BrokerService brokerService = BrokerRegistry.getInstance().lookup("localhost");
|
||||||
|
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
|
||||||
|
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
|
||||||
|
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
|
||||||
|
return proxy;
|
||||||
|
}
|
||||||
|
|
||||||
public static enum PersistenceAdapterChoice {LevelDB, KahaDB, AMQ, JDBC, MEM };
|
public static enum PersistenceAdapterChoice {LevelDB, KahaDB, AMQ, JDBC, MEM };
|
||||||
|
|
||||||
public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
|
public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.broker.policy;
|
||||||
import javax.jms.Destination;
|
import javax.jms.Destination;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
|
|
||||||
|
import javax.jms.Queue;
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.RedeliveryPolicy;
|
import org.apache.activemq.RedeliveryPolicy;
|
||||||
|
@ -52,6 +53,7 @@ public class DeadLetterTest extends DeadLetterTestSupport {
|
||||||
consumeAndRollback(i);
|
consumeAndRollback(i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
verifyIsDlq((Queue) dlqDestination);
|
||||||
for (int i = 0; i < messageCount; i++) {
|
for (int i = 0; i < messageCount; i++) {
|
||||||
Message msg = dlqConsumer.receive(1000);
|
Message msg = dlqConsumer.receive(1000);
|
||||||
assertMessage(msg, i);
|
assertMessage(msg, i);
|
||||||
|
|
|
@ -31,6 +31,7 @@ import javax.jms.Topic;
|
||||||
|
|
||||||
import org.apache.activemq.TestSupport;
|
import org.apache.activemq.TestSupport;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
import org.apache.activemq.broker.region.DestinationStatistics;
|
import org.apache.activemq.broker.region.DestinationStatistics;
|
||||||
import org.apache.activemq.broker.region.RegionBroker;
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||||
|
@ -118,11 +119,17 @@ public abstract class DeadLetterTestSupport extends TestSupport {
|
||||||
dlqConsumer = session.createConsumer(dlqDestination);
|
dlqConsumer = session.createConsumer(dlqDestination);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void makeDlqBrowser() throws JMSException {
|
protected void makeDlqBrowser() throws Exception {
|
||||||
dlqDestination = createDlqDestination();
|
dlqDestination = createDlqDestination();
|
||||||
|
|
||||||
LOG.info("Browsing dead letter on: " + dlqDestination);
|
LOG.info("Browsing dead letter on: " + dlqDestination);
|
||||||
dlqBrowser = session.createBrowser((Queue)dlqDestination);
|
dlqBrowser = session.createBrowser((Queue)dlqDestination);
|
||||||
|
verifyIsDlq((Queue) dlqDestination);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void verifyIsDlq(Queue dlqQ) throws Exception {
|
||||||
|
final QueueViewMBean queueViewMBean = getProxyToQueue(dlqQ.getQueueName());
|
||||||
|
assertTrue("is dlq", queueViewMBean.isDLQ());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void sendMessages() throws JMSException {
|
protected void sendMessages() throws JMSException {
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -50,7 +51,8 @@ public class IndividualDeadLetterTest extends DeadLetterTest {
|
||||||
policy.setDeadLetterStrategy(strategy);
|
policy.setDeadLetterStrategy(strategy);
|
||||||
|
|
||||||
PolicyMap pMap = new PolicyMap();
|
PolicyMap pMap = new PolicyMap();
|
||||||
pMap.setDefaultEntry(policy);
|
pMap.put(new ActiveMQQueue(getDestinationString()), policy);
|
||||||
|
pMap.put(new ActiveMQTopic(getDestinationString()), policy);
|
||||||
|
|
||||||
broker.setDestinationPolicy(pMap);
|
broker.setDestinationPolicy(pMap);
|
||||||
|
|
||||||
|
|
|
@ -253,11 +253,4 @@ public class MemoryLimitTest extends TestSupport {
|
||||||
assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived());
|
assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
|
|
||||||
BrokerService brokerService = BrokerRegistry.getInstance().lookup("localhost");
|
|
||||||
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
|
|
||||||
QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext()
|
|
||||||
.newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
|
|
||||||
return proxy;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue