mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4843 - implement and test for composite virtual destinations - forwardTo
This commit is contained in:
parent
4ddbb8546d
commit
4109b79059
|
@ -576,10 +576,8 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
|
||||||
// deal with nested elements
|
// deal with nested elements
|
||||||
for (Object nested : filter(dto, Object.class)) {
|
for (Object nested : filter(dto, Object.class)) {
|
||||||
String elementName = nested.getClass().getSimpleName();
|
String elementName = nested.getClass().getSimpleName();
|
||||||
if (elementName.endsWith("s")) {
|
|
||||||
Method setter = findSetter(instance, elementName);
|
Method setter = findSetter(instance, elementName);
|
||||||
if (setter != null) {
|
if (setter != null) {
|
||||||
|
|
||||||
List<Object> argument = new LinkedList<Object>();
|
List<Object> argument = new LinkedList<Object>();
|
||||||
for (Object elementContent : filter(nested, Object.class)) {
|
for (Object elementContent : filter(nested, Object.class)) {
|
||||||
argument.add(fromDto(elementContent, inferTargetObject(elementContent)));
|
argument.add(fromDto(elementContent, inferTargetObject(elementContent)));
|
||||||
|
@ -592,9 +590,6 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
|
||||||
} else {
|
} else {
|
||||||
info("failed to find setter for " + elementName + " on :" + instance);
|
info("failed to find setter for " + elementName + " on :" + instance);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
info("unsupported mapping of element for non plural:" + elementName);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return instance;
|
return instance;
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,6 +96,22 @@
|
||||||
<jxb:property name="Contents" />
|
<jxb:property name="Contents" />
|
||||||
</jxb:bindings>
|
</jxb:bindings>
|
||||||
|
|
||||||
|
<jxb:bindings node="xs:element[@name='compositeQueue']/xs:complexType/xs:choice">
|
||||||
|
<jxb:property name="Contents" />
|
||||||
|
</jxb:bindings>
|
||||||
|
|
||||||
|
<jxb:bindings node="xs:element[@name='compositeQueue']/xs:complexType/xs:choice/xs:choice/xs:element[@name='forwardTo']/xs:complexType/xs:sequence">
|
||||||
|
<jxb:property name="Contents" />
|
||||||
|
</jxb:bindings>
|
||||||
|
|
||||||
|
<jxb:bindings node="xs:element[@name='compositeTopic']/xs:complexType/xs:choice">
|
||||||
|
<jxb:property name="Contents" />
|
||||||
|
</jxb:bindings>
|
||||||
|
|
||||||
|
<jxb:bindings node="xs:element[@name='compositeTopic']/xs:complexType/xs:choice/xs:choice/xs:element[@name='forwardTo']/xs:complexType/xs:sequence">
|
||||||
|
<jxb:property name="Contents" />
|
||||||
|
</jxb:bindings>
|
||||||
|
|
||||||
<jxb:bindings node="xs:element[@name='authorizationPlugin']/xs:complexType/xs:choice">
|
<jxb:bindings node="xs:element[@name='authorizationPlugin']/xs:complexType/xs:choice">
|
||||||
<jxb:property name="Contents" />
|
<jxb:property name="Contents" />
|
||||||
</jxb:bindings>
|
</jxb:bindings>
|
||||||
|
|
|
@ -68,6 +68,17 @@ public class VirtualDestTest extends RuntimeConfigTestSupport {
|
||||||
assertSame("same instance", newValue, brokerService.getDestinationInterceptors()[0]);
|
assertSame("same instance", newValue, brokerService.getDestinationInterceptors()[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNewComposite() throws Exception {
|
||||||
|
final String brokerConfig = configurationSeed + "-new-composite-vd-broker";
|
||||||
|
applyNewConfig(brokerConfig, RuntimeConfigTestSupport.EMPTY_UPDATABLE_CONFIG);
|
||||||
|
startBroker(brokerConfig);
|
||||||
|
assertTrue("broker alive", brokerService.isStarted());
|
||||||
|
|
||||||
|
applyNewConfig(brokerConfig, configurationSeed + "-add-composite-vd", SLEEP);
|
||||||
|
|
||||||
|
exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNewNoDefaultVirtualTopicSupport() throws Exception {
|
public void testNewNoDefaultVirtualTopicSupport() throws Exception {
|
||||||
|
@ -218,11 +229,15 @@ public class VirtualDestTest extends RuntimeConfigTestSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void exerciseVirtualTopic(String topic) throws Exception {
|
private void exerciseVirtualTopic(String topic) throws Exception {
|
||||||
|
exerciseVirtualTopic("Consumer.A.", topic);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void exerciseVirtualTopic(String prefix, String topic) throws Exception {
|
||||||
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
|
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
|
||||||
connection.start();
|
connection.start();
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue("Consumer.A." + topic));
|
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(prefix + topic));
|
||||||
LOG.info("new consumer for: " + consumer.getDestination());
|
LOG.info("new consumer for: " + consumer.getDestination());
|
||||||
MessageProducer producer = session.createProducer(session.createTopic(topic));
|
MessageProducer producer = session.createProducer(session.createTopic(topic));
|
||||||
final String body = "To vt:" + topic;
|
final String body = "To vt:" + topic;
|
||||||
|
@ -238,4 +253,25 @@ public class VirtualDestTest extends RuntimeConfigTestSupport {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void exerciseCompositeQueue(String dest, String consumerQ) throws Exception {
|
||||||
|
ActiveMQConnection connection = new ActiveMQConnectionFactory("vm://localhost").createActiveMQConnection();
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
|
||||||
|
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(session.createQueue(consumerQ));
|
||||||
|
LOG.info("new consumer for: " + consumer.getDestination());
|
||||||
|
MessageProducer producer = session.createProducer(session.createQueue(dest));
|
||||||
|
final String body = "To cq:" + dest;
|
||||||
|
producer.send(session.createTextMessage(body));
|
||||||
|
LOG.info("sent to: " + producer.getDestination());
|
||||||
|
|
||||||
|
Message message = null;
|
||||||
|
for (int i=0; i<10 && message == null; i++) {
|
||||||
|
message = consumer.receive(1000);
|
||||||
|
}
|
||||||
|
assertNotNull("got message", message);
|
||||||
|
assertEquals("got expected message", body, ((TextMessage) message).getText());
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<beans
|
||||||
|
xmlns="http://www.springframework.org/schema/beans"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
|
||||||
|
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
|
||||||
|
|
||||||
|
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false" >
|
||||||
|
<plugins>
|
||||||
|
<runtimeConfigurationPlugin checkPeriod="1000"/>
|
||||||
|
</plugins>
|
||||||
|
<destinationInterceptors><virtualDestinationInterceptor><virtualDestinations>
|
||||||
|
<compositeQueue name="VirtualDestination.CompositeQueue">
|
||||||
|
<forwardTo>
|
||||||
|
<queue physicalName="VirtualDestination.QueueConsumer"/>
|
||||||
|
<topic physicalName="VirtualDestination.TopicConsumer"/>
|
||||||
|
</forwardTo>
|
||||||
|
</compositeQueue></virtualDestinations></virtualDestinationInterceptor></destinationInterceptors>
|
||||||
|
</broker>
|
||||||
|
</beans>
|
Loading…
Reference in New Issue