git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1517905 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2013-08-27 18:18:56 +00:00
parent 4d3923a242
commit b92a315599
8 changed files with 690 additions and 0 deletions

View File

@ -0,0 +1,91 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component.broker;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.broker.view.MessageBrokerView;
import org.apache.activemq.broker.view.MessageBrokerViewRegistry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.camel.ComponentConfiguration;
import org.apache.camel.Endpoint;
import org.apache.camel.component.jms.JmsConfiguration;
import org.apache.camel.impl.UriEndpointComponent;
import org.apache.camel.spi.EndpointCompleter;
import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
public class BrokerComponent extends UriEndpointComponent implements EndpointCompleter {
public BrokerComponent() {
super(BrokerEndpoint.class);
}
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
BrokerConfiguration brokerConfiguration = new BrokerConfiguration();
setProperties(brokerConfiguration, parameters);
byte destinationType = ActiveMQDestination.QUEUE_TYPE;
if (remaining.startsWith(JmsConfiguration.QUEUE_PREFIX)) {
remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.QUEUE_PREFIX.length()), '/');
} else if (remaining.startsWith(JmsConfiguration.TOPIC_PREFIX)) {
destinationType = ActiveMQDestination.TOPIC_TYPE;
remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TOPIC_PREFIX.length()), '/');
} else if (remaining.startsWith(JmsConfiguration.TEMP_QUEUE_PREFIX)) {
destinationType = ActiveMQDestination.TEMP_QUEUE_TYPE;
remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_QUEUE_PREFIX.length()), '/');
} else if (remaining.startsWith(JmsConfiguration.TEMP_TOPIC_PREFIX)) {
destinationType = ActiveMQDestination.TEMP_TOPIC_TYPE;
remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_TOPIC_PREFIX.length()), '/');
}
ActiveMQDestination destination = ActiveMQDestination.createDestination(remaining, destinationType);
BrokerEndpoint brokerEndpoint = new BrokerEndpoint(uri, this, destination, brokerConfiguration);
return brokerEndpoint;
}
@Override
public List<String> completeEndpointPath(ComponentConfiguration componentConfiguration, String completionText) {
String brokerName = String.valueOf(componentConfiguration.getParameter("brokerName"));
MessageBrokerView messageBrokerView = MessageBrokerViewRegistry.getInstance().lookup(brokerName);
if (messageBrokerView != null) {
String destinationName = completionText;
Set<? extends ActiveMQDestination> set = messageBrokerView.getQueues();
if (completionText.startsWith("topic:")) {
set = messageBrokerView.getTopics();
destinationName = completionText.substring(6);
} else if (completionText.startsWith("queue:")) {
destinationName = completionText.substring(6);
}
ArrayList<String> answer = new ArrayList<String>();
for (ActiveMQDestination destination : set) {
if (destination.getPhysicalName().startsWith(destinationName)) {
answer.add(destination.getPhysicalName());
}
}
return answer;
}
return null;
}
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component.broker;
import org.apache.camel.spi.UriParam;
public class BrokerConfiguration {
@UriParam
private String brokerName = "";
public String getBrokerName() {
return brokerName;
}
public void setBrokerName(String brokerName) {
this.brokerName = brokerName;
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component.broker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.inteceptor.MessageInterceptor;
import org.apache.activemq.command.Message;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.component.jms.JmsBinding;
import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BrokerConsumer extends DefaultConsumer implements MessageInterceptor {
protected final transient Logger logger = LoggerFactory.getLogger(BrokerConsumer.class);
private final JmsBinding jmsBinding = new JmsBinding();
public BrokerConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
}
@Override
protected void doStart() throws Exception {
super.doStart();
((BrokerEndpoint) getEndpoint()).addMessageInterceptor(this);
}
@Override
protected void doStop() throws Exception {
((BrokerEndpoint) getEndpoint()).removeMessageInterceptor(this);
super.doStop();
}
@Override
public void intercept(ProducerBrokerExchange producerExchange, Message message) {
Exchange exchange = getEndpoint().createExchange(ExchangePattern.InOnly);
exchange.setIn(new BrokerJmsMessage((javax.jms.Message) message, jmsBinding));
exchange.setProperty(Exchange.BINDING, jmsBinding);
exchange.setProperty(BrokerEndpoint.PRODUCER_BROKER_EXCHANGE, producerExchange);
try {
getProcessor().process(exchange);
} catch (Exception e) {
logger.error("Failed to process " + exchange, e);
}
}
}

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component.broker;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.inteceptor.MessageInterceptor;
import org.apache.activemq.broker.inteceptor.MessageInterceptorRegistry;
import org.apache.activemq.broker.view.MessageBrokerView;
import org.apache.activemq.broker.view.MessageBrokerViewRegistry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.camel.Consumer;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Service;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
@ManagedResource(description = "Managed Camel Broker Endpoint")
@UriEndpoint(scheme = "broker", consumerClass = BrokerConsumer.class)
public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, Service {
static final String PRODUCER_BROKER_EXCHANGE = "producerBrokerExchange";
@UriParam
private final BrokerConfiguration configuration;
private MessageBrokerView messageBrokerView;
private MessageInterceptorRegistry messageInterceptorRegistry;
@UriPath
private final ActiveMQDestination destination;
private List<MessageInterceptor> messageInterceptorList = new CopyOnWriteArrayList<MessageInterceptor>();
public BrokerEndpoint(String uri, BrokerComponent component, ActiveMQDestination destination, BrokerConfiguration configuration) {
super(UnsafeUriCharactersEncoder.encode(uri), component);
this.destination = destination;
this.configuration = configuration;
}
@Override
public Producer createProducer() throws Exception {
BrokerProducer producer = new BrokerProducer(this);
return producer;
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
BrokerConsumer consumer = new BrokerConsumer(this, processor);
configureConsumer(consumer);
return consumer;
}
@Override
public boolean isSingleton() {
return false;
}
@Override
public boolean isMultipleConsumersSupported() {
return true;
}
public ActiveMQDestination getDestination() {
return destination;
}
@Override
protected void doStart() throws Exception {
super.doStart();
messageBrokerView = MessageBrokerViewRegistry.getInstance().lookup(configuration.getBrokerName());
messageInterceptorRegistry = new MessageInterceptorRegistry(messageBrokerView.getBrokerService());
for (MessageInterceptor messageInterceptor : messageInterceptorList) {
addMessageInterceptor(messageInterceptor);
}
messageInterceptorList.clear();
}
@Override
protected void doStop() throws Exception {
super.doStop();
}
protected void addMessageInterceptor(MessageInterceptor messageInterceptor) {
if (isStarted()) {
messageInterceptorRegistry.addMessageInterceptor(destination, messageInterceptor);
} else {
messageInterceptorList.add(messageInterceptor);
}
}
protected void removeMessageInterceptor(MessageInterceptor messageInterceptor) {
messageInterceptorRegistry.removeMessageInterceptor(destination, messageInterceptor);
}
protected void inject(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {
if (message != null) {
if (message.getDestination() == null) {
message.setDestination(destination);
}
messageInterceptorRegistry.injectMessage(producerBrokerExchange, message);
}
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component.broker;
import javax.jms.Message;
import org.apache.camel.component.jms.JmsBinding;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.util.ObjectHelper;
public class BrokerJmsMessage extends JmsMessage {
public BrokerJmsMessage(Message jmsMessage, JmsBinding binding) {
super(jmsMessage, binding);
}
@Override
public String toString() {
if (getJmsMessage() != null) {
try {
return "BrokerJmsMessage[JMSMessageID: " + getJmsMessage().getJMSMessageID();
} catch (Exception e) {
}
}
return "BrokerJmsMessage@" + ObjectHelper.getIdentityHashCode(this);
}
@Override
public void copyFrom(org.apache.camel.Message that) {
super.copyFrom(that);
if (that instanceof JmsMessage && getJmsMessage() == null) {
setJmsMessage(((JmsMessage) that).getJmsMessage());
}
}
@Override
public BrokerJmsMessage newInstance() {
return new BrokerJmsMessage(null, getBinding());
}
}

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component.broker;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.camel.converter.ActiveMQMessageConverter;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.converter.ObjectConverter;
import org.apache.camel.impl.DefaultAsyncProducer;
public class BrokerProducer extends DefaultAsyncProducer {
private final ActiveMQMessageConverter activeMQConverter = new ActiveMQMessageConverter();
private final BrokerEndpoint brokerEndpoint;
public BrokerProducer(BrokerEndpoint endpoint) {
super(endpoint);
brokerEndpoint = endpoint;
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
// deny processing if we are not started
if (!isRunAllowed()) {
if (exchange.getException() == null) {
exchange.setException(new RejectedExecutionException());
}
// we cannot process so invoke callback
callback.done(true);
return true;
}
try {
//In the middle of the broker - InOut doesn't make any sense
//so we do in only
return processInOnly(exchange, callback);
} catch (Throwable e) {
// must catch exception to ensure callback is invoked as expected
// to let Camel error handling deal with this
exchange.setException(e);
callback.done(true);
return true;
}
}
protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) {
try {
ActiveMQMessage message = getMessage(exchange);
if (message != null) {
message.setDestination(brokerEndpoint.getDestination());
//if the ProducerBrokerExchange is null the broker will create it
ProducerBrokerExchange producerBrokerExchange = (ProducerBrokerExchange) exchange.getProperty(BrokerEndpoint.PRODUCER_BROKER_EXCHANGE);
brokerEndpoint.inject(producerBrokerExchange, message);
}
} catch (Exception e) {
exchange.setException(e);
}
callback.done(true);
return true;
}
private ActiveMQMessage getMessage(Exchange exchange) throws Exception {
ActiveMQMessage result = null;
Message camelMesssage = null;
if (exchange.hasOut()) {
camelMesssage = exchange.getOut();
} else {
camelMesssage = exchange.getIn();
}
Map<String, Object> headers = camelMesssage.getHeaders();
/**
* We purposely don't want to support injecting messages half-way through
* broker processing - use the activemq camel component for that - but
* we will support changing message headers and destinations
*/
if (camelMesssage instanceof JmsMessage) {
JmsMessage jmsMessage = (JmsMessage) camelMesssage;
if (jmsMessage.getJmsMessage() instanceof ActiveMQMessage) {
result = (ActiveMQMessage) jmsMessage.getJmsMessage();
//lets apply any new message headers
setJmsHeaders(result, headers);
} else {
throw new IllegalStateException("not the original message from the broker " + jmsMessage.getJmsMessage());
}
} else {
throw new IllegalStateException("not the original message from the broker " + camelMesssage);
}
return result;
}
private void setJmsHeaders(ActiveMQMessage message, Map<String, Object> headers) {
message.setReadOnlyProperties(false);
for (Map.Entry<String, Object> entry : headers.entrySet()) {
if (entry.getKey().equalsIgnoreCase("JMSDeliveryMode")) {
Object value = entry.getValue();
if (value instanceof Number) {
Number number = (Number) value;
message.setJMSDeliveryMode(number.intValue());
}
}
if (entry.getKey().equalsIgnoreCase("JmsPriority")) {
Integer value = ObjectConverter.toInteger(entry.getValue());
if (value != null) {
message.setJMSPriority(value.intValue());
}
}
if (entry.getKey().equalsIgnoreCase("JMSTimestamp")) {
Long value = ObjectConverter.toLong(entry.getValue());
if (value != null) {
message.setJMSTimestamp(value.longValue());
}
}
if (entry.getKey().equalsIgnoreCase("JMSExpiration")) {
Long value = ObjectConverter.toLong(entry.getValue());
if (value != null) {
message.setJMSExpiration(value.longValue());
}
}
if (entry.getKey().equalsIgnoreCase("JMSRedelivered")) {
message.setJMSRedelivered(ObjectConverter.toBool(entry.getValue()));
}
if (entry.getKey().equalsIgnoreCase("JMSType")) {
Object value = entry.getValue();
if (value != null) {
message.setJMSType(value.toString());
}
}
}
}
}

View File

@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
class=org.apache.activemq.camel.component.broker.BrokerComponent

View File

@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component.broker;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class BrokerComponentXMLConfigTest {
protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/component/broker/";
private static final Logger LOG = LoggerFactory.getLogger(BrokerComponentXMLConfigTest.class);
protected static final String TOPIC_NAME = "test.broker.component.topic";
protected static final String QUEUE_NAME = "test.broker.component.queue";
protected BrokerService brokerService;
protected ActiveMQConnectionFactory factory;
protected Connection producerConnection;
protected Connection consumerConnection;
protected Session consumerSession;
protected Session producerSession;
protected MessageConsumer consumer;
protected MessageProducer producer;
protected Topic topic;
protected int messageCount = 5000;
protected int timeOutInSeconds = 10;
@Before
public void setUp() throws Exception {
brokerService = createBroker(new FileSystemResource(CONF_ROOT + "broker-camel.xml"));
factory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI());
consumerConnection = factory.createConnection();
consumerConnection.start();
producerConnection = factory.createConnection();
producerConnection.start();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
topic = consumerSession.createTopic(TOPIC_NAME);
producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
consumer = consumerSession.createConsumer(topic);
producer = producerSession.createProducer(topic);
}
protected BrokerService createBroker(String resource) throws Exception {
return createBroker(new ClassPathResource(resource));
}
protected BrokerService createBroker(Resource resource) throws Exception {
BrokerFactoryBean factory = new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
BrokerService broker = factory.getBroker();
assertTrue("Should have a broker!", broker != null);
// Broker is already started by default when using the XML file
// broker.start();
return broker;
}
@After
public void tearDown() throws Exception {
if (producerConnection != null){
producerConnection.close();
}
if (consumerConnection != null){
consumerConnection.close();
}
if (brokerService != null) {
brokerService.stop();
}
}
@Test
public void testReRouteAll() throws Exception {
final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME);
final CountDownLatch latch = new CountDownLatch(messageCount);
consumer = consumerSession.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
try {
assertEquals(9,message.getJMSPriority());
latch.countDown();
} catch (Throwable e) {
e.printStackTrace();
}
}
});
for (int i = 0; i < messageCount; i++){
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
producer.send(message);
}
latch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(0,latch.getCount());
}
}