mirror of https://github.com/apache/activemq.git
Made topics also auto-expose themselves in Camel's context
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@747001 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
45c246cd6d
commit
e01a937d29
|
@ -28,9 +28,11 @@ import org.apache.activemq.advisory.DestinationListener;
|
|||
import org.apache.activemq.advisory.DestinationSource;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.CamelContextAware;
|
||||
import org.apache.camel.Endpoint;
|
||||
import org.apache.camel.component.jms.JmsEndpoint;
|
||||
import org.apache.camel.component.jms.JmsQueueEndpoint;
|
||||
import org.apache.camel.util.ObjectHelper;
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -83,6 +85,15 @@ public class CamelEndpointLoader implements InitializingBean, DisposableBean, Ca
|
|||
removeQueue(queue);
|
||||
}
|
||||
}
|
||||
else if (destination instanceof ActiveMQTopic) {
|
||||
ActiveMQTopic topic = (ActiveMQTopic) destination;
|
||||
if (event.isAddOperation()) {
|
||||
addTopic(topic);
|
||||
}
|
||||
else {
|
||||
removeTopic(topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.warn("Caught: " + e, e);
|
||||
|
@ -94,6 +105,11 @@ public class CamelEndpointLoader implements InitializingBean, DisposableBean, Ca
|
|||
for (ActiveMQQueue queue : queues) {
|
||||
addQueue(queue);
|
||||
}
|
||||
|
||||
Set<ActiveMQTopic> topics = source.getTopics();
|
||||
for (ActiveMQTopic topic : topics) {
|
||||
addTopic(topic);
|
||||
}
|
||||
}
|
||||
|
||||
public void destroy() throws Exception {
|
||||
|
@ -159,4 +175,20 @@ public class CamelEndpointLoader implements InitializingBean, DisposableBean, Ca
|
|||
String queueUri = getQueueUri(queue);
|
||||
camelContext.removeEndpoints(queueUri);
|
||||
}
|
||||
|
||||
protected void addTopic(ActiveMQTopic topic) throws Exception {
|
||||
String topicUri = getTopicUri(topic);
|
||||
ActiveMQComponent jmsComponent = getComponent();
|
||||
Endpoint endpoint = new JmsEndpoint(topicUri, jmsComponent, topic.getPhysicalName(), true, jmsComponent.getConfiguration());
|
||||
camelContext.addEndpoint(topicUri, endpoint);
|
||||
}
|
||||
|
||||
protected String getTopicUri(ActiveMQTopic topic) {
|
||||
return "activemq:topic:" + topic.getPhysicalName();
|
||||
}
|
||||
|
||||
protected void removeTopic(ActiveMQTopic topic) throws Exception {
|
||||
String topicUri = getTopicUri(topic);
|
||||
camelContext.removeEndpoints(topicUri);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue