From b92a315599d53b07e2ed4eff1c6f0345f048c0a8 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Tue, 27 Aug 2013 18:18:56 +0000 Subject: [PATCH] Fix for https://issues.apache.org/jira/browse/AMQ-4690 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1517905 13f79535-47bb-0310-9956-ffa450edef68 --- .../component/broker/BrokerComponent.java | 91 ++++++++++ .../component/broker/BrokerConfiguration.java | 36 ++++ .../component/broker/BrokerConsumer.java | 64 +++++++ .../component/broker/BrokerEndpoint.java | 129 ++++++++++++++ .../component/broker/BrokerJmsMessage.java | 53 ++++++ .../component/broker/BrokerProducer.java | 159 ++++++++++++++++++ .../org/apache/camel/component/broker | 18 ++ .../broker/BrokerComponentXMLConfigTest.java | 140 +++++++++++++++ 8 files changed, 690 insertions(+) create mode 100644 activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java create mode 100644 activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java create mode 100644 activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java create mode 100644 activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java create mode 100644 activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java create mode 100644 activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java create mode 100644 activemq-camel/src/main/resources/META-INF/services/org/apache/camel/component/broker create mode 100644 activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java new file mode 100644 index 0000000000..3fce9ae882 --- /dev/null +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerComponent.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component.broker; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.activemq.broker.view.MessageBrokerView; +import org.apache.activemq.broker.view.MessageBrokerViewRegistry; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.camel.ComponentConfiguration; +import org.apache.camel.Endpoint; +import org.apache.camel.component.jms.JmsConfiguration; +import org.apache.camel.impl.UriEndpointComponent; +import org.apache.camel.spi.EndpointCompleter; +import static org.apache.camel.util.ObjectHelper.removeStartingCharacters; + +public class BrokerComponent extends UriEndpointComponent implements EndpointCompleter { + + public BrokerComponent() { + super(BrokerEndpoint.class); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { + BrokerConfiguration brokerConfiguration = new BrokerConfiguration(); + setProperties(brokerConfiguration, parameters); + + byte destinationType = ActiveMQDestination.QUEUE_TYPE; + + if (remaining.startsWith(JmsConfiguration.QUEUE_PREFIX)) { + remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.QUEUE_PREFIX.length()), '/'); + } else if (remaining.startsWith(JmsConfiguration.TOPIC_PREFIX)) { + destinationType = ActiveMQDestination.TOPIC_TYPE; + remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TOPIC_PREFIX.length()), '/'); + } else if (remaining.startsWith(JmsConfiguration.TEMP_QUEUE_PREFIX)) { + destinationType = ActiveMQDestination.TEMP_QUEUE_TYPE; + remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_QUEUE_PREFIX.length()), '/'); + } else if (remaining.startsWith(JmsConfiguration.TEMP_TOPIC_PREFIX)) { + destinationType = ActiveMQDestination.TEMP_TOPIC_TYPE; + remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_TOPIC_PREFIX.length()), '/'); + } + + + ActiveMQDestination destination = ActiveMQDestination.createDestination(remaining, destinationType); + BrokerEndpoint brokerEndpoint = new BrokerEndpoint(uri, this, destination, brokerConfiguration); + return brokerEndpoint; + } + + + @Override + public List completeEndpointPath(ComponentConfiguration componentConfiguration, String completionText) { + String brokerName = String.valueOf(componentConfiguration.getParameter("brokerName")); + MessageBrokerView messageBrokerView = MessageBrokerViewRegistry.getInstance().lookup(brokerName); + if (messageBrokerView != null) { + String destinationName = completionText; + Set set = messageBrokerView.getQueues(); + if (completionText.startsWith("topic:")) { + set = messageBrokerView.getTopics(); + destinationName = completionText.substring(6); + } else if (completionText.startsWith("queue:")) { + destinationName = completionText.substring(6); + } + ArrayList answer = new ArrayList(); + for (ActiveMQDestination destination : set) { + if (destination.getPhysicalName().startsWith(destinationName)) { + answer.add(destination.getPhysicalName()); + } + } + return answer; + + } + return null; + } +} diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java new file mode 100644 index 0000000000..583720edef --- /dev/null +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConfiguration.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component.broker; + +import org.apache.camel.spi.UriParam; + +public class BrokerConfiguration { + + @UriParam + private String brokerName = ""; + + + public String getBrokerName() { + return brokerName; + } + + public void setBrokerName(String brokerName) { + this.brokerName = brokerName; + } + + +} diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java new file mode 100644 index 0000000000..39b25e2406 --- /dev/null +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerConsumer.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component.broker; + +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.inteceptor.MessageInterceptor; +import org.apache.activemq.command.Message; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.component.jms.JmsBinding; +import org.apache.camel.impl.DefaultConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BrokerConsumer extends DefaultConsumer implements MessageInterceptor { + protected final transient Logger logger = LoggerFactory.getLogger(BrokerConsumer.class); + private final JmsBinding jmsBinding = new JmsBinding(); + + public BrokerConsumer(Endpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + ((BrokerEndpoint) getEndpoint()).addMessageInterceptor(this); + } + + @Override + protected void doStop() throws Exception { + ((BrokerEndpoint) getEndpoint()).removeMessageInterceptor(this); + super.doStop(); + } + + @Override + public void intercept(ProducerBrokerExchange producerExchange, Message message) { + Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly); + + exchange.setIn(new BrokerJmsMessage((javax.jms.Message) message, jmsBinding)); + exchange.setProperty(Exchange.BINDING, jmsBinding); + exchange.setProperty(BrokerEndpoint.PRODUCER_BROKER_EXCHANGE, producerExchange); + try { + getProcessor().process(exchange); + } catch (Exception e) { + logger.error("Failed to process " + exchange, e); + } + } +} diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java new file mode 100644 index 0000000000..e0d896e1c4 --- /dev/null +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component.broker; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.inteceptor.MessageInterceptor; +import org.apache.activemq.broker.inteceptor.MessageInterceptorRegistry; +import org.apache.activemq.broker.view.MessageBrokerView; +import org.apache.activemq.broker.view.MessageBrokerViewRegistry; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.Message; +import org.apache.camel.Consumer; +import org.apache.camel.MultipleConsumersSupport; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.Service; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.camel.util.UnsafeUriCharactersEncoder; + +@ManagedResource(description = "Managed Camel Broker Endpoint") +@UriEndpoint(scheme = "broker", consumerClass = BrokerConsumer.class) + +public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, Service { + static final String PRODUCER_BROKER_EXCHANGE = "producerBrokerExchange"; + + @UriParam + private final BrokerConfiguration configuration; + private MessageBrokerView messageBrokerView; + private MessageInterceptorRegistry messageInterceptorRegistry; + + + @UriPath + private final ActiveMQDestination destination; + private List messageInterceptorList = new CopyOnWriteArrayList(); + + public BrokerEndpoint(String uri, BrokerComponent component, ActiveMQDestination destination, BrokerConfiguration configuration) { + super(UnsafeUriCharactersEncoder.encode(uri), component); + this.destination = destination; + this.configuration = configuration; + } + + @Override + public Producer createProducer() throws Exception { + BrokerProducer producer = new BrokerProducer(this); + return producer; + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + BrokerConsumer consumer = new BrokerConsumer(this, processor); + configureConsumer(consumer); + return consumer; + } + + + @Override + public boolean isSingleton() { + return false; + } + + @Override + public boolean isMultipleConsumersSupported() { + return true; + } + + public ActiveMQDestination getDestination() { + return destination; + } + + + @Override + protected void doStart() throws Exception { + super.doStart(); + messageBrokerView = MessageBrokerViewRegistry.getInstance().lookup(configuration.getBrokerName()); + messageInterceptorRegistry = new MessageInterceptorRegistry(messageBrokerView.getBrokerService()); + for (MessageInterceptor messageInterceptor : messageInterceptorList) { + addMessageInterceptor(messageInterceptor); + } + messageInterceptorList.clear(); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + } + + protected void addMessageInterceptor(MessageInterceptor messageInterceptor) { + if (isStarted()) { + messageInterceptorRegistry.addMessageInterceptor(destination, messageInterceptor); + } else { + messageInterceptorList.add(messageInterceptor); + } + } + + protected void removeMessageInterceptor(MessageInterceptor messageInterceptor) { + messageInterceptorRegistry.removeMessageInterceptor(destination, messageInterceptor); + + } + + protected void inject(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception { + if (message != null) { + if (message.getDestination() == null) { + message.setDestination(destination); + } + messageInterceptorRegistry.injectMessage(producerBrokerExchange, message); + } + } +} diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java new file mode 100644 index 0000000000..02dcabefde --- /dev/null +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerJmsMessage.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component.broker; + +import javax.jms.Message; +import org.apache.camel.component.jms.JmsBinding; +import org.apache.camel.component.jms.JmsMessage; +import org.apache.camel.util.ObjectHelper; + +public class BrokerJmsMessage extends JmsMessage { + public BrokerJmsMessage(Message jmsMessage, JmsBinding binding) { + super(jmsMessage, binding); + } + + @Override + public String toString() { + if (getJmsMessage() != null) { + try { + return "BrokerJmsMessage[JMSMessageID: " + getJmsMessage().getJMSMessageID(); + } catch (Exception e) { + } + } + return "BrokerJmsMessage@" + ObjectHelper.getIdentityHashCode(this); + } + + @Override + public void copyFrom(org.apache.camel.Message that) { + super.copyFrom(that); + if (that instanceof JmsMessage && getJmsMessage() == null) { + setJmsMessage(((JmsMessage) that).getJmsMessage()); + } + } + + + @Override + public BrokerJmsMessage newInstance() { + return new BrokerJmsMessage(null, getBinding()); + } +} diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java new file mode 100644 index 0000000000..c12fbee6fb --- /dev/null +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component.broker; + +import java.util.Map; +import java.util.concurrent.RejectedExecutionException; + +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.camel.converter.ActiveMQMessageConverter; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.component.jms.JmsMessage; +import org.apache.camel.converter.ObjectConverter; +import org.apache.camel.impl.DefaultAsyncProducer; + +public class BrokerProducer extends DefaultAsyncProducer { + private final ActiveMQMessageConverter activeMQConverter = new ActiveMQMessageConverter(); + private final BrokerEndpoint brokerEndpoint; + + public BrokerProducer(BrokerEndpoint endpoint) { + super(endpoint); + brokerEndpoint = endpoint; + } + + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + // deny processing if we are not started + if (!isRunAllowed()) { + if (exchange.getException() == null) { + exchange.setException(new RejectedExecutionException()); + } + // we cannot process so invoke callback + callback.done(true); + return true; + } + + try { + //In the middle of the broker - InOut doesn't make any sense + //so we do in only + return processInOnly(exchange, callback); + + } catch (Throwable e) { + // must catch exception to ensure callback is invoked as expected + // to let Camel error handling deal with this + exchange.setException(e); + callback.done(true); + return true; + } + } + + protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) { + try { + ActiveMQMessage message = getMessage(exchange); + if (message != null) { + message.setDestination(brokerEndpoint.getDestination()); + //if the ProducerBrokerExchange is null the broker will create it + ProducerBrokerExchange producerBrokerExchange = (ProducerBrokerExchange) exchange.getProperty(BrokerEndpoint.PRODUCER_BROKER_EXCHANGE); + + brokerEndpoint.inject(producerBrokerExchange, message); + + + } + } catch (Exception e) { + exchange.setException(e); + } + callback.done(true); + return true; + } + + private ActiveMQMessage getMessage(Exchange exchange) throws Exception { + ActiveMQMessage result = null; + Message camelMesssage = null; + if (exchange.hasOut()) { + camelMesssage = exchange.getOut(); + } else { + camelMesssage = exchange.getIn(); + } + + Map headers = camelMesssage.getHeaders(); + + /** + * We purposely don't want to support injecting messages half-way through + * broker processing - use the activemq camel component for that - but + * we will support changing message headers and destinations + */ + if (camelMesssage instanceof JmsMessage) { + JmsMessage jmsMessage = (JmsMessage) camelMesssage; + if (jmsMessage.getJmsMessage() instanceof ActiveMQMessage) { + result = (ActiveMQMessage) jmsMessage.getJmsMessage(); + //lets apply any new message headers + setJmsHeaders(result, headers); + } else { + + throw new IllegalStateException("not the original message from the broker " + jmsMessage.getJmsMessage()); + } + } else { + throw new IllegalStateException("not the original message from the broker " + camelMesssage); + } + return result; + } + + private void setJmsHeaders(ActiveMQMessage message, Map headers) { + message.setReadOnlyProperties(false); + for (Map.Entry entry : headers.entrySet()) { + if (entry.getKey().equalsIgnoreCase("JMSDeliveryMode")) { + Object value = entry.getValue(); + if (value instanceof Number) { + Number number = (Number) value; + message.setJMSDeliveryMode(number.intValue()); + } + } + if (entry.getKey().equalsIgnoreCase("JmsPriority")) { + Integer value = ObjectConverter.toInteger(entry.getValue()); + if (value != null) { + message.setJMSPriority(value.intValue()); + } + } + if (entry.getKey().equalsIgnoreCase("JMSTimestamp")) { + Long value = ObjectConverter.toLong(entry.getValue()); + if (value != null) { + message.setJMSTimestamp(value.longValue()); + } + } + if (entry.getKey().equalsIgnoreCase("JMSExpiration")) { + Long value = ObjectConverter.toLong(entry.getValue()); + if (value != null) { + message.setJMSExpiration(value.longValue()); + } + } + if (entry.getKey().equalsIgnoreCase("JMSRedelivered")) { + message.setJMSRedelivered(ObjectConverter.toBool(entry.getValue())); + } + if (entry.getKey().equalsIgnoreCase("JMSType")) { + Object value = entry.getValue(); + if (value != null) { + message.setJMSType(value.toString()); + } + } + } + + } +} diff --git a/activemq-camel/src/main/resources/META-INF/services/org/apache/camel/component/broker b/activemq-camel/src/main/resources/META-INF/services/org/apache/camel/component/broker new file mode 100644 index 0000000000..186f4ab63b --- /dev/null +++ b/activemq-camel/src/main/resources/META-INF/services/org/apache/camel/component/broker @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.activemq.camel.component.broker.BrokerComponent diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java new file mode 100644 index 0000000000..0696170ec6 --- /dev/null +++ b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component.broker; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.xbean.BrokerFactoryBean; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.FileSystemResource; +import org.springframework.core.io.Resource; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class BrokerComponentXMLConfigTest { + + protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/component/broker/"; + private static final Logger LOG = LoggerFactory.getLogger(BrokerComponentXMLConfigTest.class); + protected static final String TOPIC_NAME = "test.broker.component.topic"; + protected static final String QUEUE_NAME = "test.broker.component.queue"; + protected BrokerService brokerService; + protected ActiveMQConnectionFactory factory; + protected Connection producerConnection; + protected Connection consumerConnection; + protected Session consumerSession; + protected Session producerSession; + protected MessageConsumer consumer; + protected MessageProducer producer; + protected Topic topic; + protected int messageCount = 5000; + protected int timeOutInSeconds = 10; + + @Before + public void setUp() throws Exception { + brokerService = createBroker(new FileSystemResource(CONF_ROOT + "broker-camel.xml")); + + factory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI()); + consumerConnection = factory.createConnection(); + consumerConnection.start(); + producerConnection = factory.createConnection(); + producerConnection.start(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = consumerSession.createTopic(TOPIC_NAME); + producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); + consumer = consumerSession.createConsumer(topic); + producer = producerSession.createProducer(topic); + } + + protected BrokerService createBroker(String resource) throws Exception { + return createBroker(new ClassPathResource(resource)); + } + + protected BrokerService createBroker(Resource resource) throws Exception { + BrokerFactoryBean factory = new BrokerFactoryBean(resource); + factory.afterPropertiesSet(); + + BrokerService broker = factory.getBroker(); + + assertTrue("Should have a broker!", broker != null); + + // Broker is already started by default when using the XML file + // broker.start(); + + return broker; + } + + @After + public void tearDown() throws Exception { + if (producerConnection != null){ + producerConnection.close(); + } + if (consumerConnection != null){ + consumerConnection.close(); + } + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void testReRouteAll() throws Exception { + final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME); + + + final CountDownLatch latch = new CountDownLatch(messageCount); + consumer = consumerSession.createConsumer(queue); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(javax.jms.Message message) { + try { + assertEquals(9,message.getJMSPriority()); + latch.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + }); + for (int i = 0; i < messageCount; i++){ + javax.jms.Message message = producerSession.createTextMessage("test: " + i); + producer.send(message); + } + + latch.await(timeOutInSeconds, TimeUnit.SECONDS); + assertEquals(0,latch.getCount()); + + } + + + + +}