AMQ-5920 - use implicit broker transaction for virtual topic fanout and add concurrentSend=true option to use an executor to fanout. Combination gives a 3x reduction in fanout roundtrip for small persistent messages to 100 consumer queues

This commit is contained in:
gtully 2015-08-06 11:55:45 +01:00
parent d8c0ff1417
commit 340728f2d1
6 changed files with 224 additions and 22 deletions

View File

@ -40,8 +40,8 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();
private SubQueueSelectorCacheBroker selectorCachePlugin;
public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
super(next, prefix, postfix, local);
public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
super(next, virtualTopic);
}
/**
@ -49,18 +49,7 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
* the virtual queues, hence there is no build up of unmatched messages on these destinations
*/
@Override
protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
Broker broker = context.getConnectionContext().getBroker();
Set<Destination> destinations = broker.getDestinations(destination);
for (Destination dest : destinations) {
if (matchesSomeConsumer(broker, message, dest)) {
dest.send(context, message.copy());
}
}
}
private boolean matchesSomeConsumer(final Broker broker, Message message, Destination dest) throws IOException {
protected boolean shouldDispatch(final Broker broker, Message message, Destination dest) throws IOException {
boolean matches = false;
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(dest.getActiveMQDestination());

View File

@ -42,6 +42,7 @@ public class VirtualTopic implements VirtualDestination {
private String name = ">";
private boolean selectorAware = false;
private boolean local = false;
private boolean concurrentSend = false;
@Override
public ActiveMQDestination getVirtualDestination() {
@ -50,8 +51,8 @@ public class VirtualTopic implements VirtualDestination {
@Override
public Destination intercept(Destination destination) {
return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()) : new VirtualTopicInterceptor(
destination, getPrefix(), getPostfix(), isLocal());
return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, this) :
new VirtualTopicInterceptor(destination, this);
}
@Override
@ -168,4 +169,16 @@ public class VirtualTopic implements VirtualDestination {
append(postfix).append(',').append(selectorAware).
append(',').append(local).toString();
}
public boolean isConcurrentSend() {
return concurrentSend;
}
/**
* When true, dispatch to matching destinations in parallel (in multiple threads)
* @param concurrentSend
*/
public void setConcurrentSend(boolean concurrentSend) {
this.concurrentSend = concurrentSend;
}
}

View File

@ -16,12 +16,21 @@
*/
package org.apache.activemq.broker.region.virtual;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.LRUCache;
@ -33,13 +42,15 @@ public class VirtualTopicInterceptor extends DestinationFilter {
private final String prefix;
private final String postfix;
private final boolean local;
private final boolean concurrentSend;
private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
super(next);
this.prefix = prefix;
this.postfix = postfix;
this.local = local;
this.prefix = virtualTopic.getPrefix();
this.postfix = virtualTopic.getPostfix();
this.local = virtualTopic.isLocal();
this.concurrentSend = virtualTopic.isConcurrentSend();
}
public Topic getTopic() {
@ -55,6 +66,82 @@ public class VirtualTopicInterceptor extends DestinationFilter {
super.send(context, message);
}
@Override
protected void send(final ProducerBrokerExchange context, final Message message, ActiveMQDestination destination) throws Exception {
final Broker broker = context.getConnectionContext().getBroker();
final Set<Destination> destinations = broker.getDestinations(destination);
final int numDestinations = destinations.size();
final LocalTransactionId localBrokerTransactionToCoalesceJournalSync =
beginLocalTransaction(numDestinations, context.getConnectionContext(), message);
try {
if (concurrentSend && numDestinations > 1) {
final CountDownLatch concurrent = new CountDownLatch(destinations.size());
final AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<Exception>();
final BrokerService brokerService = broker.getBrokerService();
for (final Destination dest : destinations) {
if (shouldDispatch(broker, message, dest)) {
brokerService.getTaskRunnerFactory().execute(new Runnable() {
@Override
public void run() {
try {
if (exceptionAtomicReference.get() == null) {
dest.send(context, message.copy());
}
} catch (Exception e) {
exceptionAtomicReference.set(e);
} finally {
concurrent.countDown();
}
}
});
} else {
concurrent.countDown();
}
}
concurrent.await();
if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get();
}
} else {
for (final Destination dest : destinations) {
if (shouldDispatch(broker, message, dest)) {
dest.send(context, message.copy());
}
}
}
} finally {
commit(localBrokerTransactionToCoalesceJournalSync, context.getConnectionContext(), message);
}
}
private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception {
LocalTransactionId result = null;
if (numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) {
result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId());
connectionContext.getBroker().beginTransaction(connectionContext, result);
connectionContext.setTransaction(connectionContext.getTransactions().get(result));
message.setTransactionId(result);
}
return result;
}
private void commit(LocalTransactionId tx, ConnectionContext connectionContext, Message message) throws Exception {
if (tx != null) {
connectionContext.getBroker().commitTransaction(connectionContext, tx, true);
connectionContext.getTransactions().remove(tx);
connectionContext.setTransaction(null);
message.setTransactionId(null);
}
}
protected boolean shouldDispatch(Broker broker, Message message, Destination dest) throws IOException {
return true;
}
protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
ActiveMQQueue queue;
synchronized (cache) {

View File

@ -74,7 +74,9 @@ public abstract class Transaction {
}
public void addSynchronization(Synchronization r) {
synchronizations.add(r);
synchronized (synchronizations) {
synchronizations.add(r);
}
if (state == START_STATE) {
state = IN_USE_STATE;
}

View File

@ -31,6 +31,7 @@ import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
import org.apache.activemq.spring.ConsumerBean;
import org.apache.activemq.xbean.XBeanBrokerFactory;
import org.slf4j.Logger;
@ -107,7 +108,7 @@ public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSuppor
assertMessagesArrived(messageList, expected ,10000);
}
protected Destination getConsumerDsetination() {
protected ActiveMQQueue getConsumerDsetination() {
return new ActiveMQQueue("Consumer.VirtualTopic.TEST");
}
@ -182,4 +183,11 @@ public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSuppor
return answer;
}
protected void startBroker() throws Exception {
super.startBroker();
// start with a clean slate
SubQueueSelectorCacheBroker selectorCacheBroker = (SubQueueSelectorCacheBroker) broker.getBroker().getAdaptor(SubQueueSelectorCacheBroker.class);
selectorCacheBroker.deleteAllSelectorsForDestination(getConsumerDsetination().getQualifiedName());
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.virtual;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VirtualTopicFanoutPerfTest {
private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicFanoutPerfTest.class);
int numConsumers = 100;
int total = 500;
BrokerService brokerService;
ConnectionFactory connectionFactory;
@Before
public void createBroker() throws Exception {
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
brokerService.start();
for (DestinationInterceptor destinationInterceptor : brokerService.getDestinationInterceptors()) {
for (VirtualDestination virtualDestination : ((VirtualDestinationInterceptor) destinationInterceptor).getVirtualDestinations()) {
if (virtualDestination instanceof VirtualTopic) {
((VirtualTopic) virtualDestination).setConcurrentSend(true);
}
}
}
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("vm://localhost");
ActiveMQPrefetchPolicy zeroPrefetch = new ActiveMQPrefetchPolicy();
zeroPrefetch.setAll(0);
activeMQConnectionFactory.setPrefetchPolicy(zeroPrefetch);
connectionFactory = activeMQConnectionFactory;
}
@After
public void stopBroker() throws Exception {
brokerService.stop();
}
@Test
@Ignore("comparison test - concurrentSend=true virtual topic, use transaction")
public void testFanoutDuration() throws Exception {
Session session = createStartAndTrackConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
for (int i=0; i<numConsumers; i++) {
session.createConsumer(new ActiveMQQueue("Consumer." + i + ".VirtualTopic.TEST"));
}
// create topic producer
Session producerSession = createStartAndTrackConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(new ActiveMQTopic("VirtualTopic.TEST"));
long start = System.currentTimeMillis();
LOG.info("Starting producer: " + start);
for (int i = 0; i < total; i++) {
producer.send(producerSession.createTextMessage("message: " + i));
}
LOG.info("Done producer, duration: " + (System.currentTimeMillis() - start) );
}
private Connection createStartAndTrackConnection() throws Exception {
Connection connection = connectionFactory.createConnection();
connection.start();
return connection;
}
}