mirror of https://github.com/apache/activemq.git
added support for http://issues.apache.org/activemq/browse/AMQ-1199 so that its easy to browse the available queues & topics in a broker
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@634277 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cd54a162f6
commit
f6efc5660d
|
@ -94,6 +94,7 @@ import org.apache.activemq.util.IntrospectionSupport;
|
||||||
import org.apache.activemq.util.JMSExceptionSupport;
|
import org.apache.activemq.util.JMSExceptionSupport;
|
||||||
import org.apache.activemq.util.LongSequenceGenerator;
|
import org.apache.activemq.util.LongSequenceGenerator;
|
||||||
import org.apache.activemq.util.ServiceSupport;
|
import org.apache.activemq.util.ServiceSupport;
|
||||||
|
import org.apache.activemq.advisory.DestinationSource;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
@ -178,6 +179,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
|
private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
|
||||||
private long timeCreated;
|
private long timeCreated;
|
||||||
private ConnectionAudit connectionAudit = new ConnectionAudit();
|
private ConnectionAudit connectionAudit = new ConnectionAudit();
|
||||||
|
private DestinationSource destinationSource;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct an <code>ActiveMQConnection</code>
|
* Construct an <code>ActiveMQConnection</code>
|
||||||
|
@ -554,6 +556,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
if (!closed.get()) {
|
if (!closed.get()) {
|
||||||
closing.set(true);
|
closing.set(true);
|
||||||
|
|
||||||
|
if (destinationSource != null) {
|
||||||
|
destinationSource.stop();
|
||||||
|
destinationSource = null;
|
||||||
|
}
|
||||||
if (advisoryConsumer != null) {
|
if (advisoryConsumer != null) {
|
||||||
advisoryConsumer.dispose();
|
advisoryConsumer.dispose();
|
||||||
advisoryConsumer = null;
|
advisoryConsumer = null;
|
||||||
|
@ -908,6 +914,21 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
this.stats.setEnabled(statsEnabled);
|
this.stats.setEnabled(statsEnabled);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the {@link DestinationSource} object which can be used to listen to destinations
|
||||||
|
* being created or destroyed or to enquire about the current destinations available on the broker
|
||||||
|
*
|
||||||
|
* @return a lazily created destination source
|
||||||
|
* @throws JMSException
|
||||||
|
*/
|
||||||
|
public DestinationSource getDestinationSource() throws JMSException {
|
||||||
|
if (destinationSource == null) {
|
||||||
|
destinationSource = new DestinationSource(this);
|
||||||
|
destinationSource.start();
|
||||||
|
}
|
||||||
|
return destinationSource;
|
||||||
|
}
|
||||||
|
|
||||||
// Implementation methods
|
// Implementation methods
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@ -2079,4 +2100,5 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
|
||||||
protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
|
protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
|
||||||
connectionAudit.rollbackDuplicate(dispatcher, message);
|
connectionAudit.rollbackDuplicate(dispatcher, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,60 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.advisory;
|
||||||
|
|
||||||
|
import java.util.EventObject;
|
||||||
|
|
||||||
|
import javax.jms.Destination;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.ConsumerId;
|
||||||
|
import org.apache.activemq.command.DestinationInfo;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An event caused when a destination is created or deleted
|
||||||
|
*
|
||||||
|
* @version $Revision$
|
||||||
|
*/
|
||||||
|
public class DestinationEvent extends EventObject {
|
||||||
|
private static final long serialVersionUID = 2442156576867593780L;
|
||||||
|
private DestinationInfo destinationInfo;
|
||||||
|
|
||||||
|
public DestinationEvent(DestinationSource source, DestinationInfo destinationInfo) {
|
||||||
|
super(source);
|
||||||
|
this.destinationInfo = destinationInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ActiveMQDestination getDestination() {
|
||||||
|
return getDestinationInfo().getDestination();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAddOperation() {
|
||||||
|
return getDestinationInfo().isAddOperation();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTimeout() {
|
||||||
|
return getDestinationInfo().getTimeout();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isRemoveOperation() {
|
||||||
|
return getDestinationInfo().isRemoveOperation();
|
||||||
|
}
|
||||||
|
|
||||||
|
public DestinationInfo getDestinationInfo() {
|
||||||
|
return destinationInfo;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.advisory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Listen to the changes in destinations being created or destroyed
|
||||||
|
*
|
||||||
|
* @version $Revision$
|
||||||
|
*/
|
||||||
|
public interface DestinationListener {
|
||||||
|
void onDestinationEvent(DestinationEvent event);
|
||||||
|
}
|
|
@ -0,0 +1,193 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.advisory;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CopyOnWriteArraySet;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageListener;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.Service;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTempQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTempTopic;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.command.DestinationInfo;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A helper class which keeps track of the Destinations available in a broker and allows you to listen to them
|
||||||
|
* being created or deleted.
|
||||||
|
*
|
||||||
|
* @version $Revision$
|
||||||
|
*/
|
||||||
|
public class DestinationSource implements MessageListener {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ConsumerEventSource.class);
|
||||||
|
private AtomicBoolean started = new AtomicBoolean(false);
|
||||||
|
private final Connection connection;
|
||||||
|
private Session session;
|
||||||
|
private MessageConsumer queueConsumer;
|
||||||
|
private MessageConsumer topicConsumer;
|
||||||
|
private MessageConsumer tempTopicConsumer;
|
||||||
|
private MessageConsumer tempQueueConsumer;
|
||||||
|
private Set<ActiveMQQueue> queues = new CopyOnWriteArraySet<ActiveMQQueue>();
|
||||||
|
private Set<ActiveMQTopic> topics = new CopyOnWriteArraySet<ActiveMQTopic>();
|
||||||
|
private Set<ActiveMQTempQueue> temporaryQueues = new CopyOnWriteArraySet<ActiveMQTempQueue>();
|
||||||
|
private Set<ActiveMQTempTopic> temporaryTopics = new CopyOnWriteArraySet<ActiveMQTempTopic>();
|
||||||
|
private DestinationListener listener;
|
||||||
|
|
||||||
|
public DestinationSource(Connection connection) throws JMSException {
|
||||||
|
this.connection = connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
public DestinationListener getListener() {
|
||||||
|
return listener;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConsumerListener(DestinationListener listener) {
|
||||||
|
this.listener = listener;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current queues available on the broker
|
||||||
|
*/
|
||||||
|
public Set<ActiveMQQueue> getQueues() {
|
||||||
|
return queues;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current topics on the broker
|
||||||
|
*/
|
||||||
|
public Set<ActiveMQTopic> getTopics() {
|
||||||
|
return topics;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current temporary topics available on the broker
|
||||||
|
*/
|
||||||
|
public Set<ActiveMQTempQueue> getTemporaryQueues() {
|
||||||
|
return temporaryQueues;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the current temporary queues available on the broker
|
||||||
|
*/
|
||||||
|
public Set<ActiveMQTempTopic> getTemporaryTopics() {
|
||||||
|
return temporaryTopics;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start() throws JMSException {
|
||||||
|
if (started.compareAndSet(false, true)) {
|
||||||
|
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
queueConsumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
|
||||||
|
queueConsumer.setMessageListener(this);
|
||||||
|
|
||||||
|
topicConsumer = session.createConsumer(AdvisorySupport.TOPIC_ADVISORY_TOPIC);
|
||||||
|
topicConsumer.setMessageListener(this);
|
||||||
|
|
||||||
|
tempQueueConsumer = session.createConsumer(AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC);
|
||||||
|
tempQueueConsumer.setMessageListener(this);
|
||||||
|
|
||||||
|
tempTopicConsumer = session.createConsumer(AdvisorySupport.TEMP_TOPIC_ADVISORY_TOPIC);
|
||||||
|
tempTopicConsumer.setMessageListener(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() throws JMSException {
|
||||||
|
if (started.compareAndSet(true, false)) {
|
||||||
|
if (session != null) {
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onMessage(Message message) {
|
||||||
|
if (message instanceof ActiveMQMessage) {
|
||||||
|
ActiveMQMessage activeMessage = (ActiveMQMessage) message;
|
||||||
|
Object command = activeMessage.getDataStructure();
|
||||||
|
if (command instanceof DestinationInfo) {
|
||||||
|
DestinationInfo destinationInfo = (DestinationInfo) command;
|
||||||
|
DestinationEvent event = new DestinationEvent(this, destinationInfo);
|
||||||
|
fireDestinationEvent(event);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
LOG.warn("Unknown dataStructure: " + command);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
LOG.warn("Unknown message type: " + message + ". Message ignored");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void fireDestinationEvent(DestinationEvent event) {
|
||||||
|
if (listener != null) {
|
||||||
|
listener.onDestinationEvent(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
// now lets update the data structures
|
||||||
|
ActiveMQDestination destination = event.getDestination();
|
||||||
|
boolean add = event.isAddOperation();
|
||||||
|
if (destination instanceof ActiveMQQueue) {
|
||||||
|
ActiveMQQueue queue = (ActiveMQQueue) destination;
|
||||||
|
if (add) {
|
||||||
|
queues.add(queue);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
queues.remove(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (destination instanceof ActiveMQTopic) {
|
||||||
|
ActiveMQTopic topic = (ActiveMQTopic) destination;
|
||||||
|
if (add) {
|
||||||
|
topics.add(topic);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
topics.remove(topic);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (destination instanceof ActiveMQTempQueue) {
|
||||||
|
ActiveMQTempQueue queue = (ActiveMQTempQueue) destination;
|
||||||
|
if (add) {
|
||||||
|
temporaryQueues.add(queue);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
temporaryQueues.remove(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (destination instanceof ActiveMQTempTopic) {
|
||||||
|
ActiveMQTempTopic topic = (ActiveMQTempTopic) destination;
|
||||||
|
if (add) {
|
||||||
|
temporaryTopics.add(topic);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
temporaryTopics.remove(topic);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
LOG.warn("Unknown destination type: " + destination);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,77 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.advisory;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @version $Revision$
|
||||||
|
*/
|
||||||
|
public class DestinationListenerTest extends EmbeddedBrokerTestSupport implements DestinationListener {
|
||||||
|
private static final Log LOG = LogFactory.getLog(DestinationListenerTest.class);
|
||||||
|
protected ActiveMQConnection connection;
|
||||||
|
protected DestinationSource destinationSource;
|
||||||
|
|
||||||
|
public void testDestiationSource() throws Exception {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
System.out.println("Queues: " + destinationSource.getQueues());
|
||||||
|
System.out.println("Topics: " + destinationSource.getTopics());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void onDestinationEvent(DestinationEvent event) {
|
||||||
|
ActiveMQDestination destination = event.getDestination();
|
||||||
|
if (event.isAddOperation()) {
|
||||||
|
System.out.println("Added: " + destination);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
System.out.println("Removed: " + destination);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
connection = (ActiveMQConnection) createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
destinationSource = connection.getDestinationSource();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BrokerService createBroker() throws Exception {
|
||||||
|
BrokerService broker = super.createBroker();
|
||||||
|
broker.setDestinations(new ActiveMQDestination[]{
|
||||||
|
new ActiveMQQueue("foo.bar"),
|
||||||
|
new ActiveMQTopic("cheese")
|
||||||
|
});
|
||||||
|
return broker;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,72 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.camel.component;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.util.ByteSequence;
|
||||||
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
import org.apache.activemq.command.DataStructure;
|
||||||
|
import org.apache.activemq.command.DestinationInfo;
|
||||||
|
import org.apache.camel.ContextTestSupport;
|
||||||
|
import org.apache.camel.Exchange;
|
||||||
|
import org.apache.camel.Processor;
|
||||||
|
import org.apache.camel.Message;
|
||||||
|
import org.apache.camel.builder.RouteBuilder;
|
||||||
|
import org.apache.camel.component.mock.AssertionClause;
|
||||||
|
import org.apache.camel.component.mock.MockEndpoint;
|
||||||
|
import org.apache.camel.component.jms.JmsMessage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @version $Revision$
|
||||||
|
*/
|
||||||
|
public class AdvisoryConsumerExample extends ContextTestSupport {
|
||||||
|
|
||||||
|
public void testWorks() throws Exception {
|
||||||
|
// lets create a new queue
|
||||||
|
template.sendBody("activemq:NewQueue." + System.currentTimeMillis(), "<hello>world!</hello>");
|
||||||
|
|
||||||
|
Thread.sleep(10000);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected RouteBuilder createRouteBuilder() throws Exception {
|
||||||
|
return new RouteBuilder() {
|
||||||
|
public void configure() throws Exception {
|
||||||
|
// lets force the creation of a queue up front
|
||||||
|
from("activemq:InitialQueue").to("log:Messages");
|
||||||
|
|
||||||
|
from("activemq:topic:ActiveMQ.Advisory.Queue?cacheLevelName=CACHE_CONSUMER").process(new Processor() {
|
||||||
|
public void process(Exchange exchange) throws Exception {
|
||||||
|
Message in = exchange.getIn();
|
||||||
|
if (in instanceof JmsMessage) {
|
||||||
|
JmsMessage jmsMessage = (JmsMessage) in;
|
||||||
|
javax.jms.Message value = jmsMessage.getJmsMessage();
|
||||||
|
if (value instanceof ActiveMQMessage) {
|
||||||
|
ActiveMQMessage activeMQMessage = (ActiveMQMessage) value;
|
||||||
|
DataStructure structure = activeMQMessage.getDataStructure();
|
||||||
|
if (structure instanceof DestinationInfo) {
|
||||||
|
DestinationInfo destinationInfo = (DestinationInfo) structure;
|
||||||
|
System.out.println("Received: " + destinationInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue