[AMQ-6691] allow dlq flag to be set via jmx to allow retry op after a restart - use destinations element for long term persistence

This commit is contained in:
gtully 2017-05-31 12:39:48 +01:00
parent 8023b9ee44
commit d2c0eddaad
5 changed files with 26 additions and 3 deletions

View File

@ -535,6 +535,11 @@ public class DestinationView implements DestinationViewMBean {
return destination.getActiveMQDestination().isDLQ();
}
@Override
public void setDLQ(boolean val) {
destination.getActiveMQDestination().setDLQ(val);
}
@Override
public long getBlockedSends() {
return destination.getDestinationStatistics().getBlockedSends().getCount();

View File

@ -409,6 +409,12 @@ public interface DestinationViewMBean {
@MBeanInfo("Dead Letter Queue")
boolean isDLQ();
/**
* @param value
* enable/disable the DLQ flag
*/
void setDLQ(boolean value);
@MBeanInfo("Number of messages blocked for flow control")
long getBlockedSends();

View File

@ -786,7 +786,7 @@ public class RegionBroker extends EmptyBroker {
if (context.getSecurityContext() == null || !context.getSecurityContext().isBrokerContext()) {
adminContext = BrokerSupport.getConnectionContext(this);
}
addDestination(adminContext, deadLetterDestination, false).getActiveMQDestination().setDLQ();
addDestination(adminContext, deadLetterDestination, false).getActiveMQDestination().setDLQ(true);
BrokerSupport.resendNoCopy(adminContext, message, deadLetterDestination);
return true;
}

View File

@ -425,11 +425,11 @@ public abstract class ActiveMQDestination extends JNDIBaseStorable implements Da
return options != null && options.containsKey(IS_DLQ);
}
public void setDLQ() {
public void setDLQ(boolean val) {
if (options == null) {
options = new HashMap<String, String>();
}
options.put(IS_DLQ, String.valueOf(true));
options.put(IS_DLQ, String.valueOf(val));
}
public static UnresolvedDestinationTransformer getUnresolvableDestinationTransformer() {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -107,6 +108,17 @@ public class AMQ6059Test {
verifyMessageIsRecovered(dlqQueue);
}
@Test
public void testSetDlqFlag() throws Exception {
final ActiveMQQueue toFlp = new ActiveMQQueue("QNameToFlip");
sendMessage(toFlp);
final QueueViewMBean queueViewMBean = getProxyToQueue(toFlp.getQueueName());
assertFalse(queueViewMBean.isDLQ());
queueViewMBean.setDLQ(true);
assertTrue(queueViewMBean.isDLQ());
}
protected BrokerService createBroker() throws Exception {
return createBrokerWithDLQ(true);
}