diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java index c152445bdb..8d58a43172 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java @@ -16,20 +16,29 @@ */ package org.apache.activemq.broker.region.virtual; +import java.io.IOException; +import java.util.List; +import java.util.Set; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; +import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; - -import java.io.IOException; -import java.util.List; -import java.util.Set; +import org.apache.activemq.plugin.SubQueueSelectorCacheBroker; +import org.apache.activemq.selector.SelectorParser; +import org.apache.activemq.util.LRUCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor { + private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class); + LRUCache expressionCache = new LRUCache(); + private SubQueueSelectorCacheBroker selectorCachePlugin; public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) { super(next, prefix, postfix, local); @@ -45,24 +54,81 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto Set destinations = broker.getDestinations(destination); for (Destination dest : destinations) { - if (matchesSomeConsumer(message, dest)) { + if (matchesSomeConsumer(broker, message, dest)) { dest.send(context, message.copy()); } } } - - private boolean matchesSomeConsumer(Message message, Destination dest) throws IOException { + + private boolean matchesSomeConsumer(final Broker broker, Message message, Destination dest) throws IOException { boolean matches = false; MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext(); msgContext.setDestination(dest.getActiveMQDestination()); msgContext.setMessageReference(message); List subs = dest.getConsumers(); - for (Subscription sub: subs) { + for (Subscription sub : subs) { if (sub.matches(message, msgContext)) { matches = true; break; + + } + } + if (matches == false && subs.size() == 0) { + matches = tryMatchingCachedSubs(broker, dest, msgContext); + } + return matches; + } + + private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) { + boolean matches = false; + LOG.debug("No active consumer match found. Will try cache if configured..."); + + //retrieve the specific plugin class and lookup the selector for the destination. + final SubQueueSelectorCacheBroker cache = getSubQueueSelectorCacheBrokerPlugin(broker); + + if (cache != null) { + final String selector = cache.getSelector(dest.getActiveMQDestination().getQualifiedName()); + if (selector != null) { + try { + final BooleanExpression expression = getExpression(selector); + matches = expression.matches(msgContext); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } } } return matches; } + + private BooleanExpression getExpression(String selector) throws Exception{ + BooleanExpression result; + synchronized(expressionCache){ + result = expressionCache.get(selector); + if (result == null){ + result = compileSelector(selector); + expressionCache.put(selector,result); + } + } + return result; + } + + /** + * @return The SubQueueSelectorCacheBroker instance or null if no such broker is available. + */ + private SubQueueSelectorCacheBroker getSubQueueSelectorCacheBrokerPlugin(final Broker broker) { + if (selectorCachePlugin == null) { + selectorCachePlugin = (SubQueueSelectorCacheBroker) broker.getAdaptor(SubQueueSelectorCacheBroker.class); + } //if + + return selectorCachePlugin; + } + + /** + * Pre-compile the JMS selector. + * + * @param selectorExpression The non-null JMS selector expression. + */ + private BooleanExpression compileSelector(final String selectorExpression) throws Exception { + return SelectorParser.parse(selectorExpression); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java index 991a625426..9493ad6c6a 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java @@ -22,6 +22,7 @@ import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.Message; +import org.apache.activemq.util.LRUCache; /** * A Destination which implements cache = new LRUCache(); public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) { super(next); @@ -51,6 +53,14 @@ public class VirtualTopicInterceptor extends DestinationFilter { } protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) { - return new ActiveMQQueue(prefix + original.getPhysicalName() + postfix); + ActiveMQQueue queue; + synchronized(cache){ + queue = cache.get(original); + if (queue==null){ + queue = new ActiveMQQueue(prefix + original.getPhysicalName() + postfix); + cache.put(original,queue); + } + } + return queue; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java b/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java new file mode 100644 index 0000000000..d6fa270c7d --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.plugin; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ConsumerInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A plugin which allows the caching of the selector from a subscription queue. + *

+ * This stops the build-up of unwanted messages, especially when consumers may + * disconnect from time to time when using virtual destinations. + *

+ * This is influenced by code snippets developed by Maciej Rakowicz + * + * @author Roelof Naude roelof(dot)naude(at)gmail.com + * @see https://issues.apache.org/activemq/browse/AMQ-3004 + * @see http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E + */ +public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class); + + /** + * The subscription's selector cache. We cache compiled expressions keyed + * by the target destination. + */ + private ConcurrentHashMap subSelectorCache = new ConcurrentHashMap(); + + private final File persistFile; + + private boolean running = true; + private Thread persistThread; + private static final long MAX_PERSIST_INTERVAL = 600000; + private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread"; + + /** + * Constructor + */ + public SubQueueSelectorCacheBroker(Broker next, final File persistFile) { + super(next); + this.persistFile = persistFile; + LOG.info("Using persisted selector cache from[" + persistFile + "]"); + + readCache(); + + persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME); + persistThread.start(); + } + + @Override + public void stop() throws Exception { + running = false; + if (persistThread != null) { + persistThread.interrupt(); + persistThread.join(); + } //if + } + + @Override + public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { + LOG.debug("Caching consumer selector [" + info.getSelector() + "] on a " + info.getDestination().getQualifiedName()); + if (info.getSelector() != null) { + subSelectorCache.put(info.getDestination().getQualifiedName(), info.getSelector()); + } //if + return super.addConsumer(context, info); + } + + private void readCache() { + if (persistFile != null && persistFile.exists()) { + try { + FileInputStream fis = new FileInputStream(persistFile); + try { + ObjectInputStream in = new ObjectInputStream(fis); + try { + subSelectorCache = (ConcurrentHashMap) in.readObject(); + } catch (ClassNotFoundException ex) { + LOG.error("Invalid selector cache data found. Please remove file.", ex); + } finally { + in.close(); + } //try + } finally { + fis.close(); + } //try + } catch (IOException ex) { + LOG.error("Unable to read persisted selector cache...it will be ignored!", ex); + } //try + } //if + } + + /** + * Persist the selector cache. + */ + private void persistCache() { + LOG.debug("Persisting selector cache...."); + try { + FileOutputStream fos = new FileOutputStream(persistFile); + try { + ObjectOutputStream out = new ObjectOutputStream(fos); + try { + out.writeObject(subSelectorCache); + } finally { + out.flush(); + out.close(); + } //try + } catch (IOException ex) { + LOG.error("Unable to persist selector cache", ex); + } finally { + fos.close(); + } //try + } catch (IOException ex) { + LOG.error("Unable to access file[" + persistFile + "]", ex); + } //try + } + + /** + * @return The JMS selector for the specified {@code destination} + */ + public String getSelector(final String destination) { + return subSelectorCache.get(destination); + } + + /** + * Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms. + * + * @see java.lang.Runnable#run() + */ + public void run() { + while (running) { + try { + Thread.sleep(MAX_PERSIST_INTERVAL); + } catch (InterruptedException ex) { + } //try + + persistCache(); + } + } +} + diff --git a/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java new file mode 100644 index 0000000000..01f5e90191 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBrokerPlugin.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.plugin; + +import java.io.File; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerPlugin; + +/** + * A plugin which allows the caching of the selector from a subscription queue. + *

+ * This stops the build-up of unwanted messages, especially when consumers may + * disconnect from time to time when using virtual destinations. + *

+ * This is influenced by code snippets developed by Maciej Rakowicz + * + * @author Roelof Naude roelof(dot)naude(at)gmail.com + *@org.apache.xbean.XBean element="virtualSelectorCacheBrokerPlugin" + */ +public class SubQueueSelectorCacheBrokerPlugin implements BrokerPlugin { + + + private File persistFile; + + @Override + public Broker installPlugin(Broker broker) throws Exception { + return new SubQueueSelectorCacheBroker(broker, persistFile); + } + + /** + * Sets the location of the persistent cache + */ + public void setPersistFile(File persistFile) { + this.persistFile = persistFile; + } + + public File getPersistFile() { + return persistFile; + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java new file mode 100644 index 0000000000..77c14b7296 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import java.net.URI; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.spring.ConsumerBean; +import org.apache.activemq.xbean.XBeanBrokerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test case for https://issues.apache.org/jira/browse/AMQ-3004 + */ + +public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDisconnectSelectorTest.class); + protected Connection connection; + protected int total = 3000; + protected String messageSelector; + + public void testVirtualTopicDisconnect() throws Exception { + if (connection == null) { + connection = createConnection(); + } + connection.start(); + + final ConsumerBean messageList = new ConsumerBean(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Destination producerDestination = getProducerDestination(); + Destination destination = getConsumerDsetination(); + + LOG.info("Sending to: " + producerDestination); + LOG.info("Consuming from: " + destination ); + + MessageConsumer consumer = session.createConsumer(destination, messageSelector); + + MessageListener listener = new MessageListener(){ + public void onMessage(Message message){ + messageList.onMessage(message); + try { + message.acknowledge(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }; + + consumer.setMessageListener(listener); + + + // create topic producer + MessageProducer producer = session.createProducer(producerDestination); + assertNotNull(producer); + + int disconnectCount = total/3; + int reconnectCount = (total * 2)/3; + + for (int i = 0; i < total; i++) { + producer.send(createMessage(session, i)); + + if (i==disconnectCount){ + consumer.close(); + } + if (i==reconnectCount){ + consumer = session.createConsumer(destination, messageSelector); + consumer.setMessageListener(listener); + } + } + + assertMessagesArrived(messageList,total/2,10000); + } + + protected Destination getConsumerDsetination() { + return new ActiveMQQueue("Consumer.VirtualTopic.TEST"); + } + + + protected Destination getProducerDestination() { + return new ActiveMQTopic("VirtualTopic.TEST"); + } + + protected void setUp() throws Exception { + super.setUp(); + messageSelector = "odd = 'no'"; + } + + protected TextMessage createMessage(Session session, int i) throws JMSException { + TextMessage textMessage = session.createTextMessage("message: " + i); + if (i % 2 != 0) { + textMessage.setStringProperty("odd", "yes"); + } else { + textMessage.setStringProperty("odd", "no"); + } + textMessage.setIntProperty("i", i); + return textMessage; + } + + + + protected void assertMessagesArrived(ConsumerBean messageList, int expected, long timeout) { + messageList.assertMessagesArrived(expected,timeout); + + messageList.flushMessages(); + + + LOG.info("validate no other messages on queues"); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination destination1 = getConsumerDsetination(); + + MessageConsumer c1 = session.createConsumer(destination1, null); + c1.setMessageListener(messageList); + + + LOG.info("send one simple message that should go to both consumers"); + MessageProducer producer = session.createProducer(getProducerDestination()); + assertNotNull(producer); + + producer.send(session.createTextMessage("Last Message")); + + messageList.assertMessagesArrived(1); + + } catch (JMSException e) { + e.printStackTrace(); + fail("unexpeced ex while waiting for last messages: " + e); + } + } + + + protected String getBrokerConfigUri() { + return "org/apache/activemq/broker/virtual/disconnected-selector.xml"; + } + + protected BrokerService createBroker() throws Exception { + XBeanBrokerFactory factory = new XBeanBrokerFactory(); + BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri())); + return answer; + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java b/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java index 2b0de0a2b5..74f8d78a83 100755 --- a/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java +++ b/activemq-core/src/test/java/org/apache/activemq/spring/ConsumerBean.java @@ -92,17 +92,21 @@ public class ConsumerBean extends Assert implements MessageListener { * * @param messageCount */ - public void waitForMessagesToArrive(int messageCount) { + + public void waitForMessagesToArrive(int messageCount){ + waitForMessagesToArrive(messageCount,120 * 1000); + } + public void waitForMessagesToArrive(int messageCount,long maxWaitTime) { long maxRemainingMessageCount = Math.max(0, messageCount - messages.size()); LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive"); long start = System.currentTimeMillis(); - long maxWaitTime = start + 120 * 1000; + long endTime = start + maxWaitTime; while (maxRemainingMessageCount > 0) { try { synchronized (messages) { messages.wait(1000); } - if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > maxWaitTime) { + if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) { break; } } catch (InterruptedException e) { @@ -123,6 +127,15 @@ public class ConsumerBean extends Assert implements MessageListener { } } + public void assertMessagesArrived(int total, long maxWaitTime) { + waitForMessagesToArrive(total,maxWaitTime); + synchronized (messages) { + int count = messages.size(); + + assertEquals("Messages received", total, count); + } + } + public boolean isVerbose() { return verbose; } diff --git a/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml b/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml new file mode 100644 index 0000000000..3dfb4a59ed --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/broker/virtual/disconnected-selector.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + + + + + + + + + +