Fix for AMQ-4899

This commit is contained in:
Kevin Earls 2014-02-07 09:41:50 +01:00
parent f88043eaf7
commit afded924ff
3 changed files with 224 additions and 21 deletions

View File

@ -16,10 +16,6 @@
*/
package org.apache.activemq.broker.region.virtual;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
@ -35,6 +31,10 @@ import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class);
LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
@ -70,10 +70,9 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
if (sub.matches(message, msgContext)) {
matches = true;
break;
}
}
if (matches == false && subs.size() == 0) {
if (matches == false) {
matches = tryMatchingCachedSubs(broker, dest, msgContext);
}
return matches;
@ -87,11 +86,14 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
final SubQueueSelectorCacheBroker cache = getSubQueueSelectorCacheBrokerPlugin(broker);
if (cache != null) {
final String selector = cache.getSelector(dest.getActiveMQDestination().getQualifiedName());
if (selector != null) {
final Set<String> selectors = cache.getSelector(dest.getActiveMQDestination().getQualifiedName());
for (String selector : selectors) {
try {
final BooleanExpression expression = getExpression(selector);
matches = expression.matches(msgContext);
if (matches) {
return true;
}
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}

View File

@ -16,14 +16,6 @@
*/
package org.apache.activemq.plugin;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
@ -32,6 +24,17 @@ import org.apache.activemq.command.ConsumerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* A plugin which allows the caching of the selector from a subscription queue.
* <p/>
@ -51,7 +54,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
* The subscription's selector cache. We cache compiled expressions keyed
* by the target destination.
*/
private ConcurrentHashMap<String, String> subSelectorCache = new ConcurrentHashMap<String, String>();
private ConcurrentHashMap<String, Set<String>> subSelectorCache = new ConcurrentHashMap<String, Set<String>>();
private final File persistFile;
@ -85,7 +88,8 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug("Caching consumer selector [{}] on a {}", info.getSelector(), info.getDestination().getQualifiedName());
String destinationName = info.getDestination().getQualifiedName();
LOG.debug("Caching consumer selector [{}] on a {}", info.getSelector(), destinationName);
String selector = info.getSelector();
// As ConcurrentHashMap doesn't support null values, use always true expression
@ -93,7 +97,12 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
selector = "TRUE";
}
subSelectorCache.put(info.getDestination().getQualifiedName(), selector);
Set<String> selectors = subSelectorCache.get(destinationName);
if (selectors == null) {
selectors = Collections.synchronizedSet(new HashSet<String>());
}
selectors.add(selector);
subSelectorCache.put(destinationName, selectors);
return super.addConsumer(context, info);
}
@ -105,7 +114,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
try {
ObjectInputStream in = new ObjectInputStream(fis);
try {
subSelectorCache = (ConcurrentHashMap<String, String>) in.readObject();
subSelectorCache = (ConcurrentHashMap<String, Set<String>>) in.readObject();
} catch (ClassNotFoundException ex) {
LOG.error("Invalid selector cache data found. Please remove file.", ex);
} finally {
@ -148,7 +157,7 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
/**
* @return The JMS selector for the specified {@code destination}
*/
public String getSelector(final String destination) {
public Set<String> getSelector(final String destination) {
return subSelectorCache.get(destination);
}

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.plugin.SubQueueSelectorCacheBrokerPlugin;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class AMQ4899Test {
protected static final Logger LOG = LoggerFactory.getLogger(AMQ4899Test.class);
private static final String QUEUE_NAME="AMQ4899TestQueue";
private static final String CONSUMER_QUEUE="Consumer.Orders.VirtualOrders." + QUEUE_NAME;
private static final String PRODUCER_DESTINATION_NAME = "VirtualOrders." + QUEUE_NAME;
private static final Integer MESSAGE_LIMIT = 20;
public static final String CONSUMER_A_SELECTOR = "Order < " + 10;
public static String CONSUMER_B_SELECTOR = "Order >= " + 10;
private CountDownLatch consumersStarted = new CountDownLatch(2);
private CountDownLatch consumerAtoConsumeCount= new CountDownLatch(10);
private CountDownLatch consumerBtoConsumeCount = new CountDownLatch(10);
private BrokerService broker;
@Before
public void setUp() {
setupBroker("broker://()/localhost?");
}
@After
public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}
@Test(timeout = 60 * 1000)
public void testVirtualTopicMultipleSelectors() throws Exception{
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = session.createQueue(CONSUMER_QUEUE);
MessageListener listenerA = new AMQ4899Listener("A", consumersStarted, consumerAtoConsumeCount);
MessageConsumer consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR);
consumerA.setMessageListener(listenerA);
MessageListener listenerB = new AMQ4899Listener("B", consumersStarted, consumerBtoConsumeCount);
MessageConsumer consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR);
consumerB.setMessageListener(listenerB);
consumersStarted.await(10, TimeUnit.SECONDS);
assertEquals("Not all consumers started in time", 0, consumersStarted.getCount());
Destination producerDestination = session.createTopic(PRODUCER_DESTINATION_NAME);
MessageProducer producer = session.createProducer(producerDestination);
int messageIndex = 0;
for (int i=0; i < MESSAGE_LIMIT; i++) {
if (i==3) {
LOG.debug("Stopping consumerA");
consumerA.close();
}
if (i == 14) {
LOG.debug("Stopping consumer B");
consumerB.close();
}
String messageText = "hello " + messageIndex++ + " sent at " + new java.util.Date().toString();
TextMessage message = session.createTextMessage(messageText);
message.setIntProperty("Order", i);
LOG.debug("Sending message [{}]", messageText);
producer.send(message);
Thread.sleep(100);
}
Thread.sleep(1 * 1000);
// restart consumerA
LOG.debug("Restarting consumerA");
consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR);
consumerA.setMessageListener(listenerA);
// restart consumerB
LOG.debug("restarting consumerB");
consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR);
consumerB.setMessageListener(listenerB);
consumerAtoConsumeCount.await(5, TimeUnit.SECONDS);
consumerBtoConsumeCount.await(5, TimeUnit.SECONDS);
LOG.debug("Unconsumed messages for consumerA {} consumerB {}", consumerAtoConsumeCount.getCount(), consumerBtoConsumeCount.getCount());
assertEquals("Consumer A did not consume all messages", 0, consumerAtoConsumeCount.getCount());
assertEquals("Consumer B did not consume all messages", 0, consumerBtoConsumeCount.getCount());
connection.close();
}
/**
* Setup broker with VirtualTopic configured
*/
private void setupBroker(String uri) {
try {
broker = BrokerFactory.createBroker(uri);
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
VirtualTopic virtualTopic = new VirtualTopic();
virtualTopic.setName("VirtualOrders.>");
virtualTopic.setSelectorAware(true);
VirtualDestination[] virtualDestinations = { virtualTopic };
interceptor.setVirtualDestinations(virtualDestinations);
broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
SubQueueSelectorCacheBrokerPlugin subQueueSelectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin();
BrokerPlugin[] updatedPlugins = {subQueueSelectorCacheBrokerPlugin};
broker.setPlugins(updatedPlugins);
broker.start();
broker.waitUntilStarted();
} catch (Exception e) {
LOG.error("Failed creating broker", e);
}
}
}
class AMQ4899Listener implements MessageListener {
Logger LOG = LoggerFactory.getLogger(AMQ4899Listener.class);
CountDownLatch toConsume;
String id;
public AMQ4899Listener(String id, CountDownLatch started, CountDownLatch toConsume) {
this.id = id;
this.toConsume = toConsume;
started.countDown();
}
@Override
public void onMessage(Message message) {
toConsume.countDown();
try {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
LOG.debug("Listener {} received [{}]", id, textMessage.getText());
} else {
LOG.error("Listener {} Expected a TextMessage, got {}", id, message.getClass().getCanonicalName());
}
} catch (JMSException e) {
LOG.error("Unexpected JMSException in Listener " + id, e);
}
}
}