AMQ-5895 - apply patch with thanks to Anders Aaberg

This commit is contained in:
gtully 2015-07-29 12:49:29 +01:00
parent 5684d093c0
commit 4a603a9936
4 changed files with 152 additions and 14 deletions

View File

@ -17,14 +17,17 @@
package org.apache.activemq.plugin;
import javax.xml.bind.JAXBElement;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.activemq.broker.region.virtual.FilteredDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.schema.core.DtoFilteredDestination;
import org.apache.activemq.schema.core.DtoTopic;
import org.apache.activemq.schema.core.DtoQueue;
import org.apache.activemq.schema.core.DtoAuthenticationUser;
@ -49,6 +52,8 @@ public class JAXBUtils {
return new ActiveMQQueue();
} else if (DtoAuthenticationUser.class.isAssignableFrom(elementContent.getClass())) {
return new AuthenticationUser();
} else if (DtoFilteredDestination.class.isAssignableFrom(elementContent.getClass())) {
return new FilteredDestination();
} else {
return new Object();
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.jms.Message;
import javax.jms.MessageProducer;
@ -233,7 +235,32 @@ public class VirtualDestTest extends RuntimeConfigTestSupport {
assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length);
}
@Test
public void testNewFilteredComposite() throws Exception {
final String brokerConfig = configurationSeed + "-new-filtered-composite-vd-broker";
applyNewConfig(brokerConfig, RuntimeConfigTestSupport.EMPTY_UPDATABLE_CONFIG);
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
applyNewConfig(brokerConfig, configurationSeed + "-add-filtered-composite-vd", SLEEP);
exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "yes");
}
@Test
public void testModFilteredComposite() throws Exception {
final String brokerConfig = configurationSeed + "-mod-filtered-composite-vd-broker";
applyNewConfig(brokerConfig, configurationSeed + "-add-filtered-composite-vd");
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "yes");
applyNewConfig(brokerConfig, configurationSeed + "-mod-filtered-composite-vd", SLEEP);
exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "no");
exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "no");
}
private void forceAddDestination(String dest) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
connection.start();
@ -255,13 +282,7 @@ public class VirtualDestTest extends RuntimeConfigTestSupport {
LOG.info("new consumer for: " + consumer.getDestination());
MessageProducer producer = session.createProducer(session.createTopic(topic));
final String body = "To vt:" + topic;
producer.send(session.createTextMessage(body));
LOG.info("sent to: " + producer.getDestination());
Message message = null;
for (int i=0; i<10 && message == null; i++) {
message = consumer.receive(1000);
}
Message message = sendAndReceiveMessage(session, consumer, producer, body);
assertNotNull("got message", message);
assertEquals("got expected message", body, ((TextMessage) message).getText());
connection.close();
@ -276,16 +297,58 @@ public class VirtualDestTest extends RuntimeConfigTestSupport {
LOG.info("new consumer for: " + consumer.getDestination());
MessageProducer producer = session.createProducer(session.createQueue(dest));
final String body = "To cq:" + dest;
producer.send(session.createTextMessage(body));
LOG.info("sent to: " + producer.getDestination());
Message message = null;
for (int i=0; i<10 && message == null; i++) {
message = consumer.receive(1000);
}
Message message = sendAndReceiveMessage(session, consumer, producer, body);
assertNotNull("got message", message);
assertEquals("got expected message", body, ((TextMessage) message).getText());
connection.close();
}
private void exerciseFilteredCompositeQueue(String dest, String consumerDestination, String acceptedHeaderValue) throws Exception {
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(consumerDestination));
LOG.info("new consumer for: " + consumer.getDestination());
MessageProducer producer = session.createProducer(session.createQueue(dest));
// positive test
String body = "To filtered cq:" + dest;
Message message = sendAndReceiveMessage(session, consumer, producer, body, Collections.singletonMap("odd", acceptedHeaderValue));
assertNotNull("The message did not reach the destination even though it should pass through the filter.", message);
assertEquals("Did not get expected message", body, ((TextMessage) message).getText());
// negative test
message = sendAndReceiveMessage(session, consumer, producer, "Not to filtered cq:" + dest, Collections.singletonMap("odd", "somethingElse"));
assertNull("The message reached the destination, but it should have been removed by the filter.", message);
connection.close();
}
private Message sendAndReceiveMessage(Session session,
ActiveMQMessageConsumer consumer, MessageProducer producer,
final String messageBody) throws Exception {
return sendAndReceiveMessage(session, consumer, producer, messageBody, null);
}
private Message sendAndReceiveMessage(Session session,
ActiveMQMessageConsumer consumer, MessageProducer producer,
final String messageBody, Map<String, String> propertiesMap)
throws Exception {
TextMessage messageToSend = session.createTextMessage(messageBody);
if (propertiesMap != null) {
for (String headerKey : propertiesMap.keySet()) {
messageToSend.setStringProperty(headerKey, propertiesMap.get(headerKey));
}
}
producer.send(messageToSend);
LOG.info("sent to: " + producer.getDestination());
Message message = null;
for (int i = 0; i < 10 && message == null; i++) {
message = consumer.receive(1000);
}
return message;
}
}

View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false" >
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000"/>
</plugins>
<destinationInterceptors><virtualDestinationInterceptor><virtualDestinations>
<compositeQueue name="VirtualDestination.FilteredCompositeQueue">
<forwardTo>
<filteredDestination selector="odd = 'yes'" queue="VirtualDestination.QueueConsumer"/>
</forwardTo>
</compositeQueue></virtualDestinations></virtualDestinationInterceptor></destinationInterceptors>
</broker>
</beans>

View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false" >
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000"/>
</plugins>
<destinationInterceptors><virtualDestinationInterceptor><virtualDestinations>
<compositeQueue name="VirtualDestination.FilteredCompositeQueue">
<forwardTo>
<filteredDestination selector="odd = 'no'" queue="VirtualDestination.QueueConsumer"/>
</forwardTo>
</compositeQueue></virtualDestinations></virtualDestinationInterceptor></destinationInterceptors>
</broker>
</beans>