[AMQ-8028] Add sendWhenNotMatched option to composite destination filter

This commit is contained in:
murali.mogalayapalli 2020-10-03 18:00:28 -04:00 committed by jbonofre
parent f5dcd8e398
commit 55914d3ac1
3 changed files with 540 additions and 4 deletions

View File

@ -30,10 +30,11 @@ public abstract class CompositeDestination implements VirtualDestination {
private Collection forwardTo;
private boolean forwardOnly = true;
private boolean concurrentSend = false;
private boolean sendWhenNotMatched = false;
@Override
public Destination intercept(Destination destination) {
return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isConcurrentSend());
return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(),isSendWhenNotMatched(), isConcurrentSend());
}
@Override
@ -192,4 +193,12 @@ public abstract class CompositeDestination implements VirtualDestination {
return true;
}
public boolean isSendWhenNotMatched() {
return sendWhenNotMatched;
}
public void setSendWhenNotMatched(boolean sendWhenNotMatched) {
this.sendWhenNotMatched = sendWhenNotMatched;
}
}

View File

@ -41,12 +41,14 @@ public class CompositeDestinationFilter extends DestinationFilter {
private Collection forwardDestinations;
private boolean forwardOnly;
private boolean concurrentSend = false;
private boolean sendWhenNotMatched=false;
public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean concurrentSend) {
public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly,boolean sendWhenNotMatched, boolean concurrentSend) {
super(next);
this.forwardDestinations = forwardDestinations;
this.forwardOnly = forwardOnly;
this.concurrentSend = concurrentSend;
this.sendWhenNotMatched = sendWhenNotMatched;
}
@Override
@ -100,9 +102,17 @@ public class CompositeDestinationFilter extends DestinationFilter {
doForward(context, message, brokerService.getRegionBroker(), destination);
}
}
if(sendWhenNotMatched)
{
if(matchingDestinations.size() <=0) {
super.send(context, message);
}
}else {
if (!forwardOnly) {
super.send(context, message);
}
}
concurrent.await();
if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get();

View File

@ -0,0 +1,517 @@
package org.apache.activemq.broker.virtual;
import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.CompositeQueue;
import org.apache.activemq.broker.region.virtual.FilteredDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.spring.ConsumerBean;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CompositeDestinationSendWhenNotMatchedTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(CompositeQueueTest.class);
protected int total = 10;
protected Connection connection;
@Test
public void testSendWhenNotMatched() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
// messageList1.waitForMessagesToArrive(0);
// messageList2.waitForMessagesToArrive(1);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.B");
Destination destination1 =new ActiveMQQueue("A.B");
Destination destination2 = new ActiveMQQueue("A.C");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "tet13"));
messageList1.assertMessagesArrived(1);
messageList2.assertMessagesArrived(0);
}
@Test
public void testSendWhenMatched() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
// messageList1.waitForMessagesToArrive(0);
// messageList2.waitForMessagesToArrive(1);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.B");
Destination destination1 =new ActiveMQQueue("A.B");
Destination destination2 = new ActiveMQQueue("A.C");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "test13"));
messageList2.assertMessagesArrived(1);
messageList1.assertMessagesArrived(0);
}
@Test
public void testForwardOnlyFalseSendWhenMatchedTrue1() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.D");
Destination destination1 =new ActiveMQQueue("A.D");
Destination destination2 = new ActiveMQQueue("A.E");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "tes13"));
messageList1.assertMessagesArrived(1);
messageList2.assertMessagesArrived(0);
}
public void testForwardOnlyFalseSendWhenMatchedTrue2() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.D");
Destination destination1 =new ActiveMQQueue("A.D");
Destination destination2 = new ActiveMQQueue("A.E");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "test13"));
Thread.sleep(1*1000);
messageList2.assertMessagesArrived(1);
messageList1.assertMessagesArrived(0);
}
@Test
public void testForwardOnlyFalseBackwardCompatability1() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.X");
Destination destination1 =new ActiveMQQueue("A.X");
Destination destination2 = new ActiveMQQueue("A.Y");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "test13"));
messageList2.assertMessagesArrived(1);
messageList1.assertMessagesArrived(1);
}
@Test
public void testForwardOnlyFalseBackwardCompatability2() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.X");
Destination destination1 =new ActiveMQQueue("A.X");
Destination destination2 = new ActiveMQQueue("A.Y");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "tet13"));
messageList1.assertMessagesArrived(1);
messageList2.assertMessagesArrived(0);
}
@Test
public void testForwardOnlyTrueBackwardCompatability1() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.W");
Destination destination1 =new ActiveMQQueue("A.W");
Destination destination2 = new ActiveMQQueue("A.V");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "test13"));
messageList2.assertMessagesArrived(1);
messageList1.assertMessagesArrived(0);
}
@Test
public void testForwardOnlyTrueBackwardCompatability2() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.W");
Destination destination1 =new ActiveMQQueue("A.W");
Destination destination2 = new ActiveMQQueue("A.V");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "tet13"));
Thread.sleep(2*1000);
messageList1.assertMessagesArrived(0);
messageList2.assertMessagesArrived(0);
}
@Test
public void testForwardOnlySendWhenNotMatchedSetToFalse() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
// messageList1.waitForMessagesToArrive(0);
// messageList2.waitForMessagesToArrive(1);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("X.Y");
Destination destination1 = new ActiveMQQueue("X.Y");
Destination destination2 = new ActiveMQQueue("X.Z");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "tet13"));
messageList2.assertMessagesArrived(1);
messageList1.assertMessagesArrived(0);
}
@Test
public void testForwardOnlyFalseSendWhenNotMatchedSetToFalse() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
// messageList1.waitForMessagesToArrive(0);
// messageList2.waitForMessagesToArrive(1);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("R.S");
Destination destination1 = new ActiveMQQueue("R.S");
Destination destination2 = new ActiveMQQueue("R.T");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "tet13"));
messageList1.assertMessagesArrived(1);
messageList2.assertMessagesArrived(1);
}
protected Destination getConsumer1Dsetination() {
return new ActiveMQQueue("A.B");
}
protected Destination getConsumer2Dsetination() {
return new ActiveMQQueue("A.C");
}
protected Destination getProducerDestination() {
return new ActiveMQQueue("A.B");
}
protected TextMessage createMessage(Session session, String testid) throws JMSException {
TextMessage textMessage = session.createTextMessage("testMessage");
textMessage.setStringProperty("testid", testid);
return textMessage;
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(isPersistent());
answer.getManagementContext().setCreateConnector(false);
answer.addConnector(bindAddress);
/*
* <destinationInterceptors> <virtualDestinationInterceptor>
* <virtualDestinations> <compositeQueue name="A.B" forwardOnly="false">
* <forwardTo> <filteredDestination selector="testid LIKE 'test%'" queue="A.C"/>
* </forwardTo> </compositeQueue> </virtualDestinations>
* </virtualDestinationInterceptor> </destinationInterceptors>
*/
/*
* SendWhenNotMatched = true A message will be always forwarded to if not matched to filtered destination
* ForwardOnly setting has no impact
*/
CompositeQueue compositeQueue = new CompositeQueue();
compositeQueue.setName("A.B");
compositeQueue.setForwardOnly(true); // By default it is true
compositeQueue.setSendWhenNotMatched(true);// By default it is false
FilteredDestination filteredQueue = new FilteredDestination();
filteredQueue.setQueue("A.C");
filteredQueue.setSelector("testid LIKE 'test%'");
final ArrayList<Object> forwardDestinations = new ArrayList<Object>();
forwardDestinations.add(filteredQueue);
compositeQueue.setForwardTo(forwardDestinations);
CompositeQueue compositeQueue0 = new CompositeQueue();
compositeQueue0.setName("A.D");
compositeQueue0.setForwardOnly(false); // By default it is true
compositeQueue0.setSendWhenNotMatched(true);// By default it is false
FilteredDestination filteredQueue0 = new FilteredDestination();
filteredQueue0.setQueue("A.E");
filteredQueue0.setSelector("testid LIKE 'test%'");
final ArrayList<Object> forwardDestinations0 = new ArrayList<Object>();
forwardDestinations0.add(filteredQueue0);
compositeQueue0.setForwardTo(forwardDestinations0);
//Back compatibility test 1
CompositeQueue compositeQueue01 = new CompositeQueue();
compositeQueue01.setName("A.X");
compositeQueue01.setForwardOnly(false); // By default it is true
//compositeQueue01.setSendWhenNotMatched(false);// By default it is false
FilteredDestination filteredQueue01 = new FilteredDestination();
filteredQueue01.setQueue("A.Y");
filteredQueue01.setSelector("testid LIKE 'test%'");
final ArrayList<Object> forwardDestinations01 = new ArrayList<Object>();
forwardDestinations01.add(filteredQueue01);
compositeQueue01.setForwardTo(forwardDestinations01);
//Back compatibility test 2
CompositeQueue compositeQueue02 = new CompositeQueue();
compositeQueue02.setName("A.W");
//compositeQueue02.setForwardOnly(true); // By default it is true
//compositeQueue01.setSendWhenNotMatched(false);// By default it is false
FilteredDestination filteredQueue02 = new FilteredDestination();
filteredQueue02.setQueue("A.V");
filteredQueue02.setSelector("testid LIKE 'test%'");
final ArrayList<Object> forwardDestinations02 = new ArrayList<Object>();
forwardDestinations02.add(filteredQueue02);
compositeQueue02.setForwardTo(forwardDestinations02);
CompositeQueue compositeQueue1 = new CompositeQueue();
compositeQueue1.setName("X.Y");
compositeQueue1.setForwardOnly(true); // By default it is true
ActiveMQQueue forwardQueue1 =new ActiveMQQueue("X.Z");
final ArrayList<Object> forwardDestinations1 = new ArrayList<Object>();
forwardDestinations1.add(forwardQueue1);
compositeQueue1.setForwardTo(forwardDestinations1);
CompositeQueue compositeQueue2 = new CompositeQueue();
compositeQueue2.setName("R.S");
compositeQueue2.setForwardOnly(false);
ActiveMQQueue forwardQueue2 =new ActiveMQQueue("R.T");
final ArrayList<Object> forwardDestinations2 = new ArrayList<Object>();
forwardDestinations2.add(forwardQueue2);
compositeQueue2.setForwardTo(forwardDestinations2);
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
interceptor.setVirtualDestinations(new VirtualDestination[] { compositeQueue,compositeQueue0,compositeQueue01,compositeQueue02,compositeQueue1,compositeQueue2 });
answer.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });
return answer;
}
}