Polished camel broker component

This commit is contained in:
Claus Ibsen 2013-11-02 19:27:00 +01:00
parent 6e49ef3a60
commit 5469d806e8
6 changed files with 25 additions and 48 deletions

View File

@ -57,13 +57,12 @@ public class BrokerComponent extends UriEndpointComponent implements EndpointCom
remaining = removeStartingCharacters(remaining.substring(JmsConfiguration.TEMP_TOPIC_PREFIX.length()), '/');
}
ActiveMQDestination destination = ActiveMQDestination.createDestination(remaining, destinationType);
BrokerEndpoint brokerEndpoint = new BrokerEndpoint(uri, this, destination, brokerConfiguration);
setProperties(brokerEndpoint, parameters);
return brokerEndpoint;
}
@Override
public List<String> completeEndpointPath(ComponentConfiguration componentConfiguration, String completionText) {
String brokerName = String.valueOf(componentConfiguration.getParameter("brokerName"));

View File

@ -23,7 +23,6 @@ public class BrokerConfiguration {
@UriParam
private String brokerName = "";
public String getBrokerName() {
return brokerName;
}
@ -32,5 +31,4 @@ public class BrokerConfiguration {
this.brokerName = brokerName;
}
}

View File

@ -25,11 +25,8 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.component.jms.JmsBinding;
import org.apache.camel.impl.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BrokerConsumer extends DefaultConsumer implements MessageInterceptor {
protected final transient Logger logger = LoggerFactory.getLogger(BrokerConsumer.class);
private final JmsBinding jmsBinding = new JmsBinding();
public BrokerConsumer(Endpoint endpoint, Processor processor) {
@ -58,7 +55,12 @@ public class BrokerConsumer extends DefaultConsumer implements MessageIntercepto
try {
getProcessor().process(exchange);
} catch (Exception e) {
logger.error("Failed to process " + exchange, e);
exchange.setException(e);
}
if (exchange.getException() != null) {
getExceptionHandler().handleException("Error processing intercepted message: " + message, exchange, exchange.getException());
}
}
}

View File

@ -38,15 +38,13 @@ import org.apache.camel.util.UnsafeUriCharactersEncoder;
@ManagedResource(description = "Managed Camel Broker Endpoint")
@UriEndpoint(scheme = "broker", consumerClass = BrokerConsumer.class)
public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumersSupport, Service {
static final String PRODUCER_BROKER_EXCHANGE = "producerBrokerExchange";
@UriParam
private final BrokerConfiguration configuration;
private MessageInterceptorRegistry messageInterceptorRegistry;
@UriPath
private final ActiveMQDestination destination;
private List<MessageInterceptor> messageInterceptorList = new CopyOnWriteArrayList<MessageInterceptor>();
@ -70,7 +68,6 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers
return consumer;
}
@Override
public boolean isSingleton() {
return false;
@ -85,7 +82,6 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers
return destination;
}
@Override
protected void doStart() throws Exception {
super.doStart();
@ -111,7 +107,6 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers
protected void removeMessageInterceptor(MessageInterceptor messageInterceptor) {
messageInterceptorRegistry.removeMessageInterceptor(destination, messageInterceptor);
}
protected void inject(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception {

View File

@ -17,11 +17,14 @@
package org.apache.activemq.camel.component.broker;
import javax.jms.Message;
import org.apache.camel.component.jms.JmsBinding;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.component.jms.JmsMessageHelper;
import org.apache.camel.util.ObjectHelper;
public class BrokerJmsMessage extends JmsMessage {
public BrokerJmsMessage(Message jmsMessage, JmsBinding binding) {
super(jmsMessage, binding);
}
@ -29,12 +32,10 @@ public class BrokerJmsMessage extends JmsMessage {
@Override
public String toString() {
if (getJmsMessage() != null) {
try {
return "BrokerJmsMessage[JMSMessageID: " + getJmsMessage().getJMSMessageID();
} catch (Exception e) {
}
return "BrokerJmsMessage[JMSMessageID: " + JmsMessageHelper.getJMSMessageID(getJmsMessage());
} else {
return "BrokerJmsMessage@" + ObjectHelper.getIdentityHashCode(this);
}
return "BrokerJmsMessage@" + ObjectHelper.getIdentityHashCode(this);
}
@Override
@ -45,7 +46,6 @@ public class BrokerJmsMessage extends JmsMessage {
}
}
@Override
public BrokerJmsMessage newInstance() {
return new BrokerJmsMessage(null, getBinding());

View File

@ -17,10 +17,8 @@
package org.apache.activemq.camel.component.broker;
import java.util.Map;
import java.util.concurrent.RejectedExecutionException;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.camel.converter.ActiveMQMessageConverter;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
@ -30,7 +28,6 @@ import org.apache.camel.converter.ObjectConverter;
import org.apache.camel.impl.DefaultAsyncProducer;
public class BrokerProducer extends DefaultAsyncProducer {
private final ActiveMQMessageConverter activeMQConverter = new ActiveMQMessageConverter();
private final BrokerEndpoint brokerEndpoint;
public BrokerProducer(BrokerEndpoint endpoint) {
@ -38,24 +35,12 @@ public class BrokerProducer extends DefaultAsyncProducer {
brokerEndpoint = endpoint;
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
// deny processing if we are not started
if (!isRunAllowed()) {
if (exchange.getException() == null) {
exchange.setException(new RejectedExecutionException());
}
// we cannot process so invoke callback
callback.done(true);
return true;
}
try {
//In the middle of the broker - InOut doesn't make any sense
//so we do in only
return processInOnly(exchange, callback);
} catch (Throwable e) {
// must catch exception to ensure callback is invoked as expected
// to let Camel error handling deal with this
@ -74,8 +59,6 @@ public class BrokerProducer extends DefaultAsyncProducer {
ProducerBrokerExchange producerBrokerExchange = (ProducerBrokerExchange) exchange.getProperty(BrokerEndpoint.PRODUCER_BROKER_EXCHANGE);
brokerEndpoint.inject(producerBrokerExchange, message);
}
} catch (Exception e) {
exchange.setException(e);
@ -85,34 +68,34 @@ public class BrokerProducer extends DefaultAsyncProducer {
}
private ActiveMQMessage getMessage(Exchange exchange) throws Exception {
ActiveMQMessage result = null;
Message camelMesssage = null;
ActiveMQMessage result;
Message camelMessage;
if (exchange.hasOut()) {
camelMesssage = exchange.getOut();
camelMessage = exchange.getOut();
} else {
camelMesssage = exchange.getIn();
camelMessage = exchange.getIn();
}
Map<String, Object> headers = camelMesssage.getHeaders();
Map<String, Object> headers = camelMessage.getHeaders();
/**
* We purposely don't want to support injecting messages half-way through
* broker processing - use the activemq camel component for that - but
* we will support changing message headers and destinations
*/
if (camelMesssage instanceof JmsMessage) {
JmsMessage jmsMessage = (JmsMessage) camelMesssage;
if (camelMessage instanceof JmsMessage) {
JmsMessage jmsMessage = (JmsMessage) camelMessage;
if (jmsMessage.getJmsMessage() instanceof ActiveMQMessage) {
result = (ActiveMQMessage) jmsMessage.getJmsMessage();
//lets apply any new message headers
setJmsHeaders(result, headers);
} else {
throw new IllegalStateException("not the original message from the broker " + jmsMessage.getJmsMessage());
throw new IllegalStateException("Not the original message from the broker " + jmsMessage.getJmsMessage());
}
} else {
throw new IllegalStateException("not the original message from the broker " + camelMesssage);
throw new IllegalStateException("Not the original message from the broker " + camelMessage);
}
return result;
}
@ -154,6 +137,6 @@ public class BrokerProducer extends DefaultAsyncProducer {
}
}
}
}
}