This commit is contained in:
rajdavies 2013-09-06 13:28:53 +01:00
parent 2eb0203f02
commit 0a5b14386f
3 changed files with 85 additions and 6 deletions

View File

@ -16,7 +16,11 @@
*/
package org.apache.activemq.broker.inteceptor;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.MutableBrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -27,11 +31,35 @@ import org.slf4j.LoggerFactory;
public class MessageInterceptorRegistry {
private static final Logger LOG = LoggerFactory.getLogger(MessageInterceptorRegistry.class);
private static final MessageInterceptorRegistry INSTANCE = new MessageInterceptorRegistry();
private final BrokerService brokerService;
private MessageInterceptorFilter filter;
private final Map<BrokerService, MessageInterceptorRegistry> messageInterceptorRegistryMap = new HashMap<BrokerService, MessageInterceptorRegistry>();
public MessageInterceptorRegistry(BrokerService brokerService) {
public static MessageInterceptorRegistry getInstance() {
return INSTANCE;
}
public MessageInterceptorRegistry get(String brokerName){
BrokerService brokerService = BrokerRegistry.getInstance().lookup(brokerName);
return get(brokerService);
}
public synchronized MessageInterceptorRegistry get(BrokerService brokerService){
MessageInterceptorRegistry result = messageInterceptorRegistryMap.get(brokerService);
if (result == null){
result = new MessageInterceptorRegistry(brokerService);
messageInterceptorRegistryMap.put(brokerService,result);
}
return result;
}
private MessageInterceptorRegistry(){
this.brokerService=null;
}
private MessageInterceptorRegistry(BrokerService brokerService) {
this.brokerService = brokerService;
}

View File

@ -21,6 +21,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
@ -55,6 +56,21 @@ public class MessageBrokerView {
}
}
/**
* Create a view of a running Broker
* @param brokerName
*/
public MessageBrokerView(String brokerName){
this.brokerService = BrokerRegistry.getInstance().lookup(brokerName);
if (brokerService == null){
throw new NullPointerException("BrokerService is null");
}
if (!brokerService.isStarted()){
throw new IllegalStateException("BrokerService " + brokerService.getBrokerName() + " is not started");
}
}
/**
* @return the brokerName

View File

@ -81,7 +81,9 @@ public class MessageInterceptorTest extends TestCase {
}
}
public void testNormalOperation() throws Exception {
public void testNoIntercept() throws Exception {
final CountDownLatch latch = new CountDownLatch(messageCount);
consumer.setMessageListener(new MessageListener() {
@ -101,8 +103,41 @@ public class MessageInterceptorTest extends TestCase {
}
public void testNoStackOverFlow() throws Exception {
final MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() {
@Override
public void intercept(ProducerBrokerExchange producerExchange, Message message) {
try {
registry.injectMessage(producerExchange, message);
} catch (Exception e) {
e.printStackTrace();
}
}
});
final CountDownLatch latch = new CountDownLatch(messageCount);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
latch.countDown();
}
});
for (int i = 0; i < messageCount; i++){
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
producer.send(message);
}
latch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(0,latch.getCount());
}
public void testInterceptorAll() throws Exception {
MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() {
@Override
public void intercept(ProducerBrokerExchange producerExchange, Message message) {
@ -132,7 +167,7 @@ public class MessageInterceptorTest extends TestCase {
public void testReRouteAll() throws Exception {
final ActiveMQQueue queue = new ActiveMQQueue("Reroute.From."+topic.getTopicName());
final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
final MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() {
@Override
public void intercept(ProducerBrokerExchange producerExchange, Message message) {
@ -167,7 +202,7 @@ public class MessageInterceptorTest extends TestCase {
public void testReRouteAllWithNullProducerExchange() throws Exception {
final ActiveMQQueue queue = new ActiveMQQueue("Reroute.From."+topic.getTopicName());
final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
final MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() {
@Override
public void intercept(ProducerBrokerExchange producerExchange, Message message) {
@ -203,7 +238,7 @@ public class MessageInterceptorTest extends TestCase {
final ActiveMQQueue testQueue = new ActiveMQQueue("testQueueFor."+getName());
final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst());
final MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst());
registry.addMessageInterceptorForTopic(">", new MessageInterceptor() {
@Override
public void intercept(ProducerBrokerExchange producerExchange, Message message) {