git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1326298 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2012-04-15 10:36:24 +00:00
parent 50d3e8e09a
commit 1b9f5f6727
7 changed files with 536 additions and 12 deletions

View File

@ -16,20 +16,29 @@
*/
package org.apache.activemq.broker.region.virtual;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class);
LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
private SubQueueSelectorCacheBroker selectorCachePlugin;
public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
super(next, prefix, postfix, local);
@ -45,13 +54,13 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
Set<Destination> destinations = broker.getDestinations(destination);
for (Destination dest : destinations) {
if (matchesSomeConsumer(message, dest)) {
if (matchesSomeConsumer(broker, message, dest)) {
dest.send(context, message.copy());
}
}
}
private boolean matchesSomeConsumer(Message message, Destination dest) throws IOException {
private boolean matchesSomeConsumer(final Broker broker, Message message, Destination dest) throws IOException {
boolean matches = false;
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(dest.getActiveMQDestination());
@ -61,8 +70,65 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
if (sub.matches(message, msgContext)) {
matches = true;
break;
}
}
if (matches == false && subs.size() == 0) {
matches = tryMatchingCachedSubs(broker, dest, msgContext);
}
return matches;
}
private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) {
boolean matches = false;
LOG.debug("No active consumer match found. Will try cache if configured...");
//retrieve the specific plugin class and lookup the selector for the destination.
final SubQueueSelectorCacheBroker cache = getSubQueueSelectorCacheBrokerPlugin(broker);
if (cache != null) {
final String selector = cache.getSelector(dest.getActiveMQDestination().getQualifiedName());
if (selector != null) {
try {
final BooleanExpression expression = getExpression(selector);
matches = expression.matches(msgContext);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
}
return matches;
}
private BooleanExpression getExpression(String selector) throws Exception{
BooleanExpression result;
synchronized(expressionCache){
result = expressionCache.get(selector);
if (result == null){
result = compileSelector(selector);
expressionCache.put(selector,result);
}
}
return result;
}
/**
* @return The SubQueueSelectorCacheBroker instance or null if no such broker is available.
*/
private SubQueueSelectorCacheBroker getSubQueueSelectorCacheBrokerPlugin(final Broker broker) {
if (selectorCachePlugin == null) {
selectorCachePlugin = (SubQueueSelectorCacheBroker) broker.getAdaptor(SubQueueSelectorCacheBroker.class);
} //if
return selectorCachePlugin;
}
/**
* Pre-compile the JMS selector.
*
* @param selectorExpression The non-null JMS selector expression.
*/
private BooleanExpression compileSelector(final String selectorExpression) throws Exception {
return SelectorParser.parse(selectorExpression);
}
}

View File

@ -22,6 +22,7 @@ import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.LRUCache;
/**
* A Destination which implements <a
@ -34,6 +35,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
private String prefix;
private String postfix;
private boolean local;
private LRUCache<ActiveMQDestination,ActiveMQQueue> cache = new LRUCache<ActiveMQDestination,ActiveMQQueue>();
public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
super(next);
@ -51,6 +53,14 @@ public class VirtualTopicInterceptor extends DestinationFilter {
}
protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
return new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
ActiveMQQueue queue;
synchronized(cache){
queue = cache.get(original);
if (queue==null){
queue = new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
cache.put(original,queue);
}
}
return queue;
}
}

View File

@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A plugin which allows the caching of the selector from a subscription queue.
* <p/>
* This stops the build-up of unwanted messages, especially when consumers may
* disconnect from time to time when using virtual destinations.
* <p/>
* This is influenced by code snippets developed by Maciej Rakowicz
*
* @author Roelof Naude roelof(dot)naude(at)gmail.com
* @see https://issues.apache.org/activemq/browse/AMQ-3004
* @see http://mail-archives.apache.org/mod_mbox/activemq-users/201011.mbox/%3C8A013711-2613-450A-A487-379E784AF1D6@homeaway.co.uk%3E
*/
public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(SubQueueSelectorCacheBroker.class);
/**
* The subscription's selector cache. We cache compiled expressions keyed
* by the target destination.
*/
private ConcurrentHashMap<String, String> subSelectorCache = new ConcurrentHashMap<String, String>();
private final File persistFile;
private boolean running = true;
private Thread persistThread;
private static final long MAX_PERSIST_INTERVAL = 600000;
private static final String SELECTOR_CACHE_PERSIST_THREAD_NAME = "SelectorCachePersistThread";
/**
* Constructor
*/
public SubQueueSelectorCacheBroker(Broker next, final File persistFile) {
super(next);
this.persistFile = persistFile;
LOG.info("Using persisted selector cache from[" + persistFile + "]");
readCache();
persistThread = new Thread(this, SELECTOR_CACHE_PERSIST_THREAD_NAME);
persistThread.start();
}
@Override
public void stop() throws Exception {
running = false;
if (persistThread != null) {
persistThread.interrupt();
persistThread.join();
} //if
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug("Caching consumer selector [" + info.getSelector() + "] on a " + info.getDestination().getQualifiedName());
if (info.getSelector() != null) {
subSelectorCache.put(info.getDestination().getQualifiedName(), info.getSelector());
} //if
return super.addConsumer(context, info);
}
private void readCache() {
if (persistFile != null && persistFile.exists()) {
try {
FileInputStream fis = new FileInputStream(persistFile);
try {
ObjectInputStream in = new ObjectInputStream(fis);
try {
subSelectorCache = (ConcurrentHashMap<String, String>) in.readObject();
} catch (ClassNotFoundException ex) {
LOG.error("Invalid selector cache data found. Please remove file.", ex);
} finally {
in.close();
} //try
} finally {
fis.close();
} //try
} catch (IOException ex) {
LOG.error("Unable to read persisted selector cache...it will be ignored!", ex);
} //try
} //if
}
/**
* Persist the selector cache.
*/
private void persistCache() {
LOG.debug("Persisting selector cache....");
try {
FileOutputStream fos = new FileOutputStream(persistFile);
try {
ObjectOutputStream out = new ObjectOutputStream(fos);
try {
out.writeObject(subSelectorCache);
} finally {
out.flush();
out.close();
} //try
} catch (IOException ex) {
LOG.error("Unable to persist selector cache", ex);
} finally {
fos.close();
} //try
} catch (IOException ex) {
LOG.error("Unable to access file[" + persistFile + "]", ex);
} //try
}
/**
* @return The JMS selector for the specified {@code destination}
*/
public String getSelector(final String destination) {
return subSelectorCache.get(destination);
}
/**
* Persist the selector cache every {@code MAX_PERSIST_INTERVAL}ms.
*
* @see java.lang.Runnable#run()
*/
public void run() {
while (running) {
try {
Thread.sleep(MAX_PERSIST_INTERVAL);
} catch (InterruptedException ex) {
} //try
persistCache();
}
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.plugin;
import java.io.File;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
/**
* A plugin which allows the caching of the selector from a subscription queue.
* <p/>
* This stops the build-up of unwanted messages, especially when consumers may
* disconnect from time to time when using virtual destinations.
* <p/>
* This is influenced by code snippets developed by Maciej Rakowicz
*
* @author Roelof Naude roelof(dot)naude(at)gmail.com
*@org.apache.xbean.XBean element="virtualSelectorCacheBrokerPlugin"
*/
public class SubQueueSelectorCacheBrokerPlugin implements BrokerPlugin {
private File persistFile;
@Override
public Broker installPlugin(Broker broker) throws Exception {
return new SubQueueSelectorCacheBroker(broker, persistFile);
}
/**
* Sets the location of the persistent cache
*/
public void setPersistFile(File persistFile) {
this.persistFile = persistFile;
}
public File getPersistFile() {
return persistFile;
}
}

View File

@ -0,0 +1,172 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.virtual;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.spring.ConsumerBean;
import org.apache.activemq.xbean.XBeanBrokerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test case for https://issues.apache.org/jira/browse/AMQ-3004
*/
public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDisconnectSelectorTest.class);
protected Connection connection;
protected int total = 3000;
protected String messageSelector;
public void testVirtualTopicDisconnect() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
final ConsumerBean messageList = new ConsumerBean();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination producerDestination = getProducerDestination();
Destination destination = getConsumerDsetination();
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination );
MessageConsumer consumer = session.createConsumer(destination, messageSelector);
MessageListener listener = new MessageListener(){
public void onMessage(Message message){
messageList.onMessage(message);
try {
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
};
consumer.setMessageListener(listener);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
int disconnectCount = total/3;
int reconnectCount = (total * 2)/3;
for (int i = 0; i < total; i++) {
producer.send(createMessage(session, i));
if (i==disconnectCount){
consumer.close();
}
if (i==reconnectCount){
consumer = session.createConsumer(destination, messageSelector);
consumer.setMessageListener(listener);
}
}
assertMessagesArrived(messageList,total/2,10000);
}
protected Destination getConsumerDsetination() {
return new ActiveMQQueue("Consumer.VirtualTopic.TEST");
}
protected Destination getProducerDestination() {
return new ActiveMQTopic("VirtualTopic.TEST");
}
protected void setUp() throws Exception {
super.setUp();
messageSelector = "odd = 'no'";
}
protected TextMessage createMessage(Session session, int i) throws JMSException {
TextMessage textMessage = session.createTextMessage("message: " + i);
if (i % 2 != 0) {
textMessage.setStringProperty("odd", "yes");
} else {
textMessage.setStringProperty("odd", "no");
}
textMessage.setIntProperty("i", i);
return textMessage;
}
protected void assertMessagesArrived(ConsumerBean messageList, int expected, long timeout) {
messageList.assertMessagesArrived(expected,timeout);
messageList.flushMessages();
LOG.info("validate no other messages on queues");
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination1 = getConsumerDsetination();
MessageConsumer c1 = session.createConsumer(destination1, null);
c1.setMessageListener(messageList);
LOG.info("send one simple message that should go to both consumers");
MessageProducer producer = session.createProducer(getProducerDestination());
assertNotNull(producer);
producer.send(session.createTextMessage("Last Message"));
messageList.assertMessagesArrived(1);
} catch (JMSException e) {
e.printStackTrace();
fail("unexpeced ex while waiting for last messages: " + e);
}
}
protected String getBrokerConfigUri() {
return "org/apache/activemq/broker/virtual/disconnected-selector.xml";
}
protected BrokerService createBroker() throws Exception {
XBeanBrokerFactory factory = new XBeanBrokerFactory();
BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
return answer;
}
}

View File

@ -92,17 +92,21 @@ public class ConsumerBean extends Assert implements MessageListener {
*
* @param messageCount
*/
public void waitForMessagesToArrive(int messageCount){
waitForMessagesToArrive(messageCount,120 * 1000);
}
public void waitForMessagesToArrive(int messageCount,long maxWaitTime) {
long maxRemainingMessageCount = Math.max(0, messageCount - messages.size());
LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive");
long start = System.currentTimeMillis();
long maxWaitTime = start + 120 * 1000;
long endTime = start + maxWaitTime;
while (maxRemainingMessageCount > 0) {
try {
synchronized (messages) {
messages.wait(1000);
}
if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > maxWaitTime) {
if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) {
break;
}
} catch (InterruptedException e) {
@ -123,6 +127,15 @@ public class ConsumerBean extends Assert implements MessageListener {
}
}
public void assertMessagesArrived(int total, long maxWaitTime) {
waitForMessagesToArrive(total,maxWaitTime);
synchronized (messages) {
int count = messages.size();
assertEquals("Messages received", total, count);
}
}
public boolean isVerbose() {
return verbose;
}

View File

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: xbean -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" />
<broker xmlns="http://activemq.apache.org/schema/core" persistent="false">
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="VirtualTopic.>" prefix="Consumer." selectorAware="true"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
<plugins>
<virtualSelectorCacheBrokerPlugin persistFile = "selectorcache.data"/>
</plugins>
</broker>
</beans>
<!-- END SNIPPET: xbean -->