Added Interceptor to support

https://issues.apache.org/jira/browse/AMQ-4690

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1517222 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2013-08-25 06:19:56 +00:00
parent 01704eece0
commit 8f4ebbb40e
10 changed files with 932 additions and 0 deletions

View File

@ -52,6 +52,9 @@ public class BrokerRegistry {
LOG.warn("Broker localhost not started so using " + result.getBrokerName() + " instead"); LOG.warn("Broker localhost not started so using " + result.getBrokerName() + " instead");
} }
} }
if (result == null && (brokerName==null || brokerName.isEmpty() || brokerName.equals("null"))){
result = findFirst();
}
} }
return result; return result;
} }

View File

@ -0,0 +1,25 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.inteceptor;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;
public interface MessageInterceptor {
void intercept(ProducerBrokerExchange producerExchange, Message message);
}

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.inteceptor;
import java.util.Set;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.state.ProducerState;
class MessageInterceptorFilter extends BrokerFilter {
private DestinationMap interceptorMap = new DestinationMap();
MessageInterceptorFilter(Broker next) {
super(next);
}
MessageInterceptor addMessageInterceptor(String destinationName, MessageInterceptor messageInterceptor) {
ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
interceptorMap.put(activeMQDestination, messageInterceptor);
return messageInterceptor;
}
void removeMessageInterceptor(String destinationName, MessageInterceptor interceptor) {
ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
interceptorMap.remove(activeMQDestination, interceptor);
}
MessageInterceptor addMessageInterceptorForQueue(String destinationName, MessageInterceptor messageInterceptor) {
ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
interceptorMap.put(activeMQDestination, messageInterceptor);
return messageInterceptor;
}
void removeMessageInterceptorForQueue(String destinationName, MessageInterceptor interceptor) {
ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE);
interceptorMap.remove(activeMQDestination, interceptor);
}
MessageInterceptor addMessageInterceptorForTopic(String destinationName, MessageInterceptor messageInterceptor) {
ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.TOPIC_TYPE);
interceptorMap.put(activeMQDestination, messageInterceptor);
return messageInterceptor;
}
void removeMessageInterceptorForTopic(String destinationName, MessageInterceptor interceptor) {
ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.TOPIC_TYPE);
interceptorMap.remove(activeMQDestination, interceptor);
}
MessageInterceptor addMessageInterceptor(ActiveMQDestination activeMQDestination, MessageInterceptor messageInterceptor) {
interceptorMap.put(activeMQDestination, messageInterceptor);
return messageInterceptor;
}
void removeMessageInterceptor(ActiveMQDestination activeMQDestination, MessageInterceptor interceptor) {
interceptorMap.remove(activeMQDestination, interceptor);
}
/**
* Re-inject into the Broker chain
*/
void injectMessage(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
ProducerBrokerExchange pe = producerExchange;
if (pe == null) {
pe = new ProducerBrokerExchange();
ConnectionContext cc = new ConnectionContext();
cc.setBroker(this.getRoot());
pe.setConnectionContext(cc);
pe.setMutable(true);
pe.setProducerState(new ProducerState(new ProducerInfo()));
}
super.send(pe, messageSend);
}
@Override
public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
ActiveMQDestination activeMQDestination = messageSend.getDestination();
if (!interceptorMap.isEmpty() && activeMQDestination != null) {
Set<MessageInterceptor> set = interceptorMap.get(activeMQDestination);
if (set != null && !set.isEmpty()) {
for (MessageInterceptor mi : set) {
mi.intercept(producerExchange, messageSend);
}
} else {
super.send(producerExchange, messageSend);
}
} else {
super.send(producerExchange, messageSend);
}
}
}

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.inteceptor;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageInterceptorRegistry {
private static final Logger LOG = LoggerFactory.getLogger(MessageInterceptorRegistry.class);
private final BrokerService brokerService;
private MessageInterceptorFilter filter;
public MessageInterceptorRegistry(BrokerService brokerService) {
this.brokerService = brokerService;
}
public MessageInterceptor addMessageInterceptor(String destinationName, MessageInterceptor messageInterceptor) {
return getFilter().addMessageInterceptor(destinationName, messageInterceptor);
}
public void removeMessageInterceptor(String destinationName, MessageInterceptor messageInterceptor) {
getFilter().removeMessageInterceptor(destinationName, messageInterceptor);
}
public MessageInterceptor addMessageInterceptorForQueue(String destinationName, MessageInterceptor messageInterceptor) {
return getFilter().addMessageInterceptorForQueue(destinationName, messageInterceptor);
}
public void removeMessageInterceptorForQueue(String destinationName, MessageInterceptor messageInterceptor) {
getFilter().addMessageInterceptorForQueue(destinationName, messageInterceptor);
}
public MessageInterceptor addMessageInterceptorForTopic(String destinationName, MessageInterceptor messageInterceptor) {
return getFilter().addMessageInterceptorForTopic(destinationName, messageInterceptor);
}
public void removeMessageInterceptorForTopic(String destinationName, MessageInterceptor messageInterceptor) {
getFilter().removeMessageInterceptorForTopic(destinationName, messageInterceptor);
}
public MessageInterceptor addMessageInterceptor(ActiveMQDestination activeMQDestination, MessageInterceptor messageInterceptor) {
return getFilter().addMessageInterceptor(activeMQDestination, messageInterceptor);
}
public void removeMessageInterceptor(ActiveMQDestination activeMQDestination, MessageInterceptor interceptor) {
getFilter().removeMessageInterceptor(activeMQDestination, interceptor);
}
/**
* Re-inject into the Broker chain
*/
public void injectMessage(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
getFilter().injectMessage(producerExchange, messageSend);
}
private synchronized MessageInterceptorFilter getFilter() {
if (filter == null) {
try {
MutableBrokerFilter mutableBrokerFilter = (MutableBrokerFilter) brokerService.getBroker().getAdaptor(MutableBrokerFilter.class);
Broker next = mutableBrokerFilter.getNext();
filter = new MessageInterceptorFilter(next);
mutableBrokerFilter.setNext(filter);
} catch (Exception e) {
LOG.error("Failed to create MessageInterceptorFilter", e);
}
}
return filter;
}
}

View File

@ -0,0 +1,25 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
MessageInteceptor malarky
</body>
</html>

View File

@ -0,0 +1,25 @@
package.html<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
The Message Scheduler for delayed (or scheduled) message delivery
</body>
</html>

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.view;
import org.apache.activemq.broker.region.Destination;
public class BrokerDestinationView {
private final Destination destination;
public BrokerDestinationView(Destination destination) {
this.destination = destination;
}
public String getName() {
return destination.getName();
}
public long getEnqueueCount() {
return destination.getDestinationStatistics().getEnqueues().getCount();
}
public long getDequeueCount() {
return destination.getDestinationStatistics().getDequeues().getCount();
}
public long getDispatchCount() {
return destination.getDestinationStatistics().getDispatched().getCount();
}
public long getInFlightCount() {
return destination.getDestinationStatistics().getInflight().getCount();
}
public long getExpiredCount() {
return destination.getDestinationStatistics().getExpired().getCount();
}
public long getConsumerCount() {
return destination.getDestinationStatistics().getConsumers().getCount();
}
public long getQueueSize() {
return destination.getDestinationStatistics().getMessages().getCount();
}
public long getMessagesCached() {
return destination.getDestinationStatistics().getMessagesCached().getCount();
}
public int getMemoryPercentUsage() {
return destination.getMemoryUsage().getPercentUsage();
}
public long getMemoryUsageByteCount() {
return destination.getMemoryUsage().getUsage();
}
public long getMemoryLimit() {
return destination.getMemoryUsage().getLimit();
}
public void setMemoryLimit(long limit) {
destination.getMemoryUsage().setLimit(limit);
}
public double getAverageEnqueueTime() {
return destination.getDestinationStatistics().getProcessTime().getAverageTime();
}
public long getMaxEnqueueTime() {
return destination.getDestinationStatistics().getProcessTime().getMaxTime();
}
public long getMinEnqueueTime() {
return destination.getDestinationStatistics().getProcessTime().getMinTime();
}
public float getMemoryUsagePortion() {
return destination.getMemoryUsage().getUsagePortion();
}
public long getProducerCount() {
return destination.getDestinationStatistics().getProducers().getCount();
}
public boolean isDLQ() {
return destination.isDLQ();
}
public long getBlockedSends() {
return destination.getDestinationStatistics().getBlockedSends().getCount();
}
public double getAverageBlockedTime() {
return destination.getDestinationStatistics().getBlockedTime().getAverageTime();
}
public long getTotalBlockedTime() {
return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
}
}

View File

@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.view;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A view into the running Broker
*/
public class MessageBrokerView {
private static final Logger LOG = LoggerFactory.getLogger(MessageBrokerView.class);
private final BrokerService brokerService;
private Map<ActiveMQDestination,BrokerDestinationView> destinationViewMap = new LRUCache<ActiveMQDestination, BrokerDestinationView>();
MessageBrokerView(BrokerService brokerService){
this.brokerService = brokerService;
}
public String getBrokerName(){
return brokerService.getBrokerName();
}
public int getMemoryPercentUsage() {
return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
}
public int getStorePercentUsage() {
return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
}
public int getTempPercentUsage() {
return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
}
public int getJobSchedulerStorePercentUsage() {
return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage();
}
public boolean isPersistent() {
return brokerService.isPersistent();
}
public BrokerService getBrokerService(){
return brokerService;
}
public Set<ActiveMQDestination> getDestinations(){
Set<ActiveMQDestination> result;
try {
ActiveMQDestination[] destinations = brokerService.getBroker().getDestinations();
result = new HashSet<ActiveMQDestination>();
Collections.addAll(result, destinations);
}catch (Exception e){
result = Collections.emptySet();
}
return result;
}
public Set<ActiveMQTopic> getTopics(){
Set<ActiveMQTopic> result = new HashSet<ActiveMQTopic>();
for (ActiveMQDestination destination:getDestinations()){
if (destination.isTopic() && !destination.isTemporary()){
result.add((ActiveMQTopic) destination);
}
}
return result;
}
public Set<ActiveMQQueue> getQueues(){
Set<ActiveMQQueue> result = new HashSet<ActiveMQQueue>();
for (ActiveMQDestination destination:getDestinations()){
if (destination.isQueue() && !destination.isTemporary()){
result.add((ActiveMQQueue) destination);
}
}
return result;
}
public Set<ActiveMQTempTopic> getTempTopics(){
Set<ActiveMQTempTopic> result = new HashSet<ActiveMQTempTopic>();
for (ActiveMQDestination destination:getDestinations()){
if (destination.isTopic() && destination.isTemporary()){
result.add((ActiveMQTempTopic) destination);
}
}
return result;
}
public Set<ActiveMQTempQueue> getTempQueues(){
Set<ActiveMQTempQueue> result = new HashSet<ActiveMQTempQueue>();
for (ActiveMQDestination destination:getDestinations()){
if (destination.isTopic() && destination.isTemporary()){
result.add((ActiveMQTempQueue) destination);
}
}
return result;
}
/**
* It will be assumed the destinationName is prepended with topic:// or queue:// - but
* will default to a Queue
* @param destinationName
* @return the BrokerDestinationView associated with the destinationName
*/
public BrokerDestinationView getDestinationView(String destinationName){
return getDestinationView(destinationName,ActiveMQDestination.QUEUE_TYPE);
}
/**
* Get the BrokerDestinationView associated with the topic
* @param destinationName
* @return BrokerDestinationView
*/
public BrokerDestinationView getTopicDestinationView(String destinationName){
return getDestinationView(destinationName,ActiveMQDestination.TOPIC_TYPE);
}
/**
* Get the BrokerDestinationView associated with the queue
* @param destinationName
* @return BrokerDestinationView
*/
public BrokerDestinationView getQueueDestinationView(String destinationName){
return getDestinationView(destinationName,ActiveMQDestination.QUEUE_TYPE);
}
public BrokerDestinationView getDestinationView (String destinationName, byte type) {
ActiveMQDestination activeMQDestination = ActiveMQDestination.createDestination(destinationName,type);
return getDestinationView(activeMQDestination);
}
public BrokerDestinationView getDestinationView (ActiveMQDestination activeMQDestination) {
BrokerDestinationView view = null;
synchronized(destinationViewMap){
view = destinationViewMap.get(activeMQDestination);
if (view==null){
try {
/**
* If auto destinatons are allowed (on by default) - this will create a Broker Destination
* if it doesn't exist. We could query the regionBroker first to check - but this affords more
* flexibility - e.g. you might want to set up a query on destination statistics before any
* messaging clients have started (and hence created the destination themselves
*/
Destination destination = brokerService.getDestination(activeMQDestination);
BrokerDestinationView brokerDestinationView = new BrokerDestinationView(destination);
destinationViewMap.put(activeMQDestination,brokerDestinationView);
} catch (Exception e) {
LOG.warn("Failed to get Destination for " + activeMQDestination,e);
}
destinationViewMap.put(activeMQDestination,view);
}
}
return view;
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.view;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageBrokerViewRegistry {
private static final Logger LOG = LoggerFactory.getLogger(BrokerRegistry.class);
private static final MessageBrokerViewRegistry INSTANCE = new MessageBrokerViewRegistry();
private final Object mutex = new Object();
private final Map<String, MessageBrokerView> brokerViews = new HashMap<String, MessageBrokerView>();
public static MessageBrokerViewRegistry getInstance() {
return INSTANCE;
}
/**
* @param brokerName
* @return the BrokerService
*/
public MessageBrokerView lookup(String brokerName) {
MessageBrokerView result = null;
synchronized (mutex) {
result = brokerViews.get(brokerName);
if (result==null){
BrokerService brokerService = BrokerRegistry.getInstance().lookup(brokerName);
if (brokerService != null){
result = new MessageBrokerView(brokerService);
brokerViews.put(brokerName,result);
}
}
}
return result;
}
}

View File

@ -0,0 +1,251 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.interceptor;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.inteceptor.MessageInterceptor;
import org.apache.activemq.broker.inteceptor.MessageInterceptorRegistry;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
public class MessageInterceptorTest extends TestCase {
protected BrokerService brokerService;
protected ActiveMQConnectionFactory factory;
protected Connection producerConnection;
protected Connection consumerConnection;
protected Session consumerSession;
protected Session producerSession;
protected MessageConsumer consumer;
protected MessageProducer producer;
protected Topic topic;
protected int messageCount = 10000;
protected int timeOutInSeconds = 10;
@Override
protected void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.start();
factory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI());
consumerConnection = factory.createConnection();
consumerConnection.start();
producerConnection = factory.createConnection();
producerConnection.start();
consumerSession = consumerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
topic = consumerSession.createTopic(getName());
producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
consumer = consumerSession.createConsumer(topic);
producer = producerSession.createProducer(topic);
}
@Override
protected void tearDown() throws Exception {
if (producerConnection != null){
producerConnection.close();
}
if (consumerConnection != null){
consumerConnection.close();
}
if (brokerService != null) {
brokerService.stop();
}
}
public void testNormalOperation() throws Exception {
final CountDownLatch latch = new CountDownLatch(messageCount);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
latch.countDown();
}
});
for (int i = 0; i < messageCount; i++){
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
producer.send(message);
}
latch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(0,latch.getCount());
}
public void testInterceptorAll() throws Exception {
MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() {
@Override
public void intercept(ProducerBrokerExchange producerExchange, Message message) {
//just ignore
}
});
final CountDownLatch latch = new CountDownLatch(messageCount);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
latch.countDown();
}
});
for (int i = 0; i < messageCount; i++){
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
producer.send(message);
}
latch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(messageCount,latch.getCount());
}
public void testReRouteAll() throws Exception {
final ActiveMQQueue queue = new ActiveMQQueue("Reroute.From."+topic.getTopicName());
final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() {
@Override
public void intercept(ProducerBrokerExchange producerExchange, Message message) {
message.setDestination(queue);
try {
registry.injectMessage(producerExchange, message);
} catch (Exception e) {
e.printStackTrace();
}
}
});
final CountDownLatch latch = new CountDownLatch(messageCount);
consumer = consumerSession.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
latch.countDown();
}
});
for (int i = 0; i < messageCount; i++){
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
producer.send(message);
}
latch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(0,latch.getCount());
}
public void testReRouteAllWithNullProducerExchange() throws Exception {
final ActiveMQQueue queue = new ActiveMQQueue("Reroute.From."+topic.getTopicName());
final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() {
@Override
public void intercept(ProducerBrokerExchange producerExchange, Message message) {
message.setDestination(queue);
try {
registry.injectMessage(producerExchange, message);
} catch (Exception e) {
e.printStackTrace();
}
}
});
final CountDownLatch latch = new CountDownLatch(messageCount);
consumer = consumerSession.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
latch.countDown();
}
});
for (int i = 0; i < messageCount; i++){
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
producer.send(message);
}
latch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(0,latch.getCount());
}
public void testReRouteAllowWildCards() throws Exception {
final ActiveMQQueue testQueue = new ActiveMQQueue("testQueueFor."+getName());
final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
registry.addMessageInterceptorForTopic(">", new MessageInterceptor() {
@Override
public void intercept(ProducerBrokerExchange producerExchange, Message message) {
try {
message.setDestination(testQueue);
registry.injectMessage(producerExchange,message);
}catch(Exception e){
e.printStackTrace();
}
}
});
final CountDownLatch latch = new CountDownLatch(messageCount);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
latch.countDown();
}
});
MessageConsumer consumer1 = consumerSession.createConsumer(testQueue);
consumer1.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
latch.countDown();
}
});
for (int i = 0; i < messageCount; i++){
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
producer.send(message);
}
latch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(0,latch.getCount());
}
}