This commit is contained in:
rajdavies 2013-09-06 13:45:37 +01:00
parent 0a5b14386f
commit 8d31e44e8d
3 changed files with 114 additions and 41 deletions

View File

@ -22,8 +22,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.inteceptor.MessageInterceptor;
import org.apache.activemq.broker.inteceptor.MessageInterceptorRegistry;
import org.apache.activemq.broker.view.MessageBrokerView;
import org.apache.activemq.broker.view.MessageBrokerViewRegistry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.camel.Consumer;
@ -46,7 +44,6 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers
@UriParam
private final BrokerConfiguration configuration;
private MessageBrokerView messageBrokerView;
private MessageInterceptorRegistry messageInterceptorRegistry;
@ -92,8 +89,7 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers
@Override
protected void doStart() throws Exception {
super.doStart();
messageBrokerView = MessageBrokerViewRegistry.getInstance().lookup(configuration.getBrokerName());
messageInterceptorRegistry = new MessageInterceptorRegistry(messageBrokerView.getBrokerService());
messageInterceptorRegistry = MessageInterceptorRegistry.getInstance().get(configuration.getBrokerName());
for (MessageInterceptor messageInterceptor : messageInterceptorList) {
addMessageInterceptor(messageInterceptor);
}
@ -119,11 +115,18 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers
}
protected void inject(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
ProducerBrokerExchange pbe = producerBrokerExchange;
if (message != null) {
if (message.getDestination() == null) {
message.setDestination(destination);
message.setDestination(destination);
if (producerBrokerExchange != null && producerBrokerExchange.getRegionDestination() != null){
if (!producerBrokerExchange.getRegionDestination().getActiveMQDestination().equals(destination)){
//The message broker will create a new ProducerBrokerExchange with the
//correct region broker set
pbe = null;
}
}
messageInterceptorRegistry.injectMessage(producerBrokerExchange, message);
messageInterceptorRegistry.injectMessage(pbe, message);
}
}
}

View File

@ -47,16 +47,18 @@ public class BrokerComponentXMLConfigTest {
private static final Logger LOG = LoggerFactory.getLogger(BrokerComponentXMLConfigTest.class);
protected static final String TOPIC_NAME = "test.broker.component.topic";
protected static final String QUEUE_NAME = "test.broker.component.queue";
protected static final String ROUTE_QUEUE_NAME = "test.broker.component.route";
protected static final String DIVERTED_QUEUE_NAME = "test.broker.component.ProcessLater";
protected static final int DIVERT_COUNT = 100;
protected BrokerService brokerService;
protected ActiveMQConnectionFactory factory;
protected Connection producerConnection;
protected Connection consumerConnection;
protected Session consumerSession;
protected Session producerSession;
protected MessageConsumer consumer;
protected MessageProducer producer;
protected Topic topic;
protected int messageCount = 5000;
protected int messageCount = 1000;
protected int timeOutInSeconds = 10;
@Before
@ -69,10 +71,9 @@ public class BrokerComponentXMLConfigTest {
producerConnection = factory.createConnection();
producerConnection.start();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = consumerSession.createTopic(TOPIC_NAME);
producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
consumer = consumerSession.createConsumer(topic);
producer = producerSession.createProducer(topic);
}
protected BrokerService createBroker(String resource) throws Exception {
@ -110,9 +111,10 @@ public class BrokerComponentXMLConfigTest {
public void testReRouteAll() throws Exception {
final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME);
Topic topic = consumerSession.createTopic(TOPIC_NAME);
final CountDownLatch latch = new CountDownLatch(messageCount);
consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
@ -124,6 +126,8 @@ public class BrokerComponentXMLConfigTest {
}
}
});
MessageProducer producer = producerSession.createProducer(topic);
for (int i = 0; i < messageCount; i++){
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
producer.send(message);
@ -134,7 +138,50 @@ public class BrokerComponentXMLConfigTest {
}
@Test
public void testRouteWithDestinationLimit() throws Exception {
final ActiveMQQueue routeQueue = new ActiveMQQueue(ROUTE_QUEUE_NAME);
final CountDownLatch routeLatch = new CountDownLatch(DIVERT_COUNT);
MessageConsumer messageConsumer = consumerSession.createConsumer(routeQueue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
try {
routeLatch.countDown();
} catch (Throwable e) {
e.printStackTrace();
}
}
});
final CountDownLatch divertLatch = new CountDownLatch(messageCount-DIVERT_COUNT);
MessageConsumer divertConsumer = consumerSession.createConsumer(new ActiveMQQueue(DIVERTED_QUEUE_NAME));
divertConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
try {
divertLatch.countDown();
} catch (Throwable e) {
e.printStackTrace();
}
}
});
MessageProducer producer = producerSession.createProducer(routeQueue);
for (int i = 0; i < messageCount; i++){
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
producer.send(message);
}
routeLatch.await(timeOutInSeconds, TimeUnit.SECONDS);
divertLatch.await(timeOutInSeconds,TimeUnit.SECONDS);
assertEquals(0,routeLatch.getCount());
assertEquals(0,divertLatch.getCount());
}
}

View File

@ -1,36 +1,59 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<camelContext id="camel" trace="false" xmlns="http://camel.apache.org/schema/spring">
<camelContext id="camel" trace="false" xmlns="http://camel.apache.org/schema/spring">
<!-- You can use Spring XML syntax to define the routes here using the <route> element -->
<route id ="brokerComponentTest">
<route id="brokerComponentTest">
<from uri="broker:topic:test.broker.>"/>
<setHeader headerName="JMSPriority">
<constant>9</constant>
</setHeader>
<setHeader headerName="JMSPriority">
<constant>9</constant>
</setHeader>
<to uri="broker:queue:test.broker.component.queue"/>
</route>
<route id="brokerComponentDLQAboveLimitTest">
<from uri="broker:queue:test.broker.component.route"/>
<choice>
<when>
<spel>#{@destinationView.enqueueCount >= 100}</spel>
<to uri="broker:queue:test.broker.component.ProcessLater"/>
</when>
<otherwise>
<to uri="broker:queue:test.broker.component.route"/>
</otherwise>
</choice>
</route>
</camelContext>
</beans>
<bean id="brokerView" class="org.apache.activemq.broker.view.MessageBrokerView">
<constructor-arg value="testBroker"/>
</bean>
<bean id="destinationView" factory-bean="brokerView" factory-method="getDestinationView">
<constructor-arg value="test.broker.component.route"/>
</bean>
</beans>