diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/JAXBUtils.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/JAXBUtils.java index 2d0d54dcae..9139b79c40 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/JAXBUtils.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/JAXBUtils.java @@ -17,14 +17,17 @@ package org.apache.activemq.plugin; import javax.xml.bind.JAXBElement; + import java.lang.reflect.Method; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; +import org.apache.activemq.broker.region.virtual.FilteredDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.schema.core.DtoFilteredDestination; import org.apache.activemq.schema.core.DtoTopic; import org.apache.activemq.schema.core.DtoQueue; import org.apache.activemq.schema.core.DtoAuthenticationUser; @@ -49,6 +52,8 @@ public class JAXBUtils { return new ActiveMQQueue(); } else if (DtoAuthenticationUser.class.isAssignableFrom(elementContent.getClass())) { return new AuthenticationUser(); + } else if (DtoFilteredDestination.class.isAssignableFrom(elementContent.getClass())) { + return new FilteredDestination(); } else { return new Object(); } diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java index 94a10463a3..0113e81805 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/VirtualDestTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.TimeUnit; import javax.jms.Message; import javax.jms.MessageProducer; @@ -233,7 +235,32 @@ public class VirtualDestTest extends RuntimeConfigTestSupport { assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length); } + + @Test + public void testNewFilteredComposite() throws Exception { + final String brokerConfig = configurationSeed + "-new-filtered-composite-vd-broker"; + applyNewConfig(brokerConfig, RuntimeConfigTestSupport.EMPTY_UPDATABLE_CONFIG); + startBroker(brokerConfig); + assertTrue("broker alive", brokerService.isStarted()); + applyNewConfig(brokerConfig, configurationSeed + "-add-filtered-composite-vd", SLEEP); + + exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "yes"); + } + + @Test + public void testModFilteredComposite() throws Exception { + final String brokerConfig = configurationSeed + "-mod-filtered-composite-vd-broker"; + applyNewConfig(brokerConfig, configurationSeed + "-add-filtered-composite-vd"); + startBroker(brokerConfig); + assertTrue("broker alive", brokerService.isStarted()); + exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "yes"); + + applyNewConfig(brokerConfig, configurationSeed + "-mod-filtered-composite-vd", SLEEP); + exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "no"); + exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "no"); + } + private void forceAddDestination(String dest) throws Exception { ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); connection.start(); @@ -255,13 +282,7 @@ public class VirtualDestTest extends RuntimeConfigTestSupport { LOG.info("new consumer for: " + consumer.getDestination()); MessageProducer producer = session.createProducer(session.createTopic(topic)); final String body = "To vt:" + topic; - producer.send(session.createTextMessage(body)); - LOG.info("sent to: " + producer.getDestination()); - - Message message = null; - for (int i=0; i<10 && message == null; i++) { - message = consumer.receive(1000); - } + Message message = sendAndReceiveMessage(session, consumer, producer, body); assertNotNull("got message", message); assertEquals("got expected message", body, ((TextMessage) message).getText()); connection.close(); @@ -276,16 +297,58 @@ public class VirtualDestTest extends RuntimeConfigTestSupport { LOG.info("new consumer for: " + consumer.getDestination()); MessageProducer producer = session.createProducer(session.createQueue(dest)); final String body = "To cq:" + dest; - producer.send(session.createTextMessage(body)); - LOG.info("sent to: " + producer.getDestination()); - - Message message = null; - for (int i=0; i<10 && message == null; i++) { - message = consumer.receive(1000); - } + Message message = sendAndReceiveMessage(session, consumer, producer, body); assertNotNull("got message", message); assertEquals("got expected message", body, ((TextMessage) message).getText()); connection.close(); } + + private void exerciseFilteredCompositeQueue(String dest, String consumerDestination, String acceptedHeaderValue) throws Exception { + ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(consumerDestination)); + LOG.info("new consumer for: " + consumer.getDestination()); + MessageProducer producer = session.createProducer(session.createQueue(dest)); + // positive test + String body = "To filtered cq:" + dest; + + Message message = sendAndReceiveMessage(session, consumer, producer, body, Collections.singletonMap("odd", acceptedHeaderValue)); + assertNotNull("The message did not reach the destination even though it should pass through the filter.", message); + assertEquals("Did not get expected message", body, ((TextMessage) message).getText()); + + // negative test + message = sendAndReceiveMessage(session, consumer, producer, "Not to filtered cq:" + dest, Collections.singletonMap("odd", "somethingElse")); + assertNull("The message reached the destination, but it should have been removed by the filter.", message); + + connection.close(); + } + + private Message sendAndReceiveMessage(Session session, + ActiveMQMessageConsumer consumer, MessageProducer producer, + final String messageBody) throws Exception { + return sendAndReceiveMessage(session, consumer, producer, messageBody, null); + } + + private Message sendAndReceiveMessage(Session session, + ActiveMQMessageConsumer consumer, MessageProducer producer, + final String messageBody, Map propertiesMap) + throws Exception { + TextMessage messageToSend = session.createTextMessage(messageBody); + if (propertiesMap != null) { + for (String headerKey : propertiesMap.keySet()) { + messageToSend.setStringProperty(headerKey, propertiesMap.get(headerKey)); + } + } + producer.send(messageToSend); + LOG.info("sent to: " + producer.getDestination()); + + Message message = null; + for (int i = 0; i < 10 && message == null; i++) { + message = consumer.receive(1000); + } + return message; + } } diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/virtualDestTest-add-filtered-composite-vd.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/virtualDestTest-add-filtered-composite-vd.xml new file mode 100644 index 0000000000..ed56245b83 --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/virtualDestTest-add-filtered-composite-vd.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/virtualDestTest-mod-filtered-composite-vd.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/virtualDestTest-mod-filtered-composite-vd.xml new file mode 100644 index 0000000000..8d9a2a5e12 --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/virtualDestTest-mod-filtered-composite-vd.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + \ No newline at end of file