mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5077 - provide concurrentSend option to composite destinations such that fanout can occur in parallel, resulting in write batching. little perf test that shows a large improvement in send rate w/o concurrentStoreAndDispatch
This commit is contained in:
parent
44bb9fbeae
commit
08bb172f3c
|
@ -33,9 +33,10 @@ public abstract class CompositeDestination implements VirtualDestination {
|
|||
private Collection forwardTo;
|
||||
private boolean forwardOnly = true;
|
||||
private boolean copyMessage = true;
|
||||
private boolean concurrentSend = false;
|
||||
|
||||
public Destination intercept(Destination destination) {
|
||||
return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage());
|
||||
return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage(), isConcurrentSend());
|
||||
}
|
||||
|
||||
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) {
|
||||
|
@ -92,4 +93,15 @@ public abstract class CompositeDestination implements VirtualDestination {
|
|||
this.copyMessage = copyMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* when true, sends are done in parallel with the broker executor
|
||||
*/
|
||||
public void setConcurrentSend(boolean concurrentSend) {
|
||||
this.concurrentSend = concurrentSend;
|
||||
}
|
||||
|
||||
public boolean isConcurrentSend() {
|
||||
return this.concurrentSend;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,8 +18,12 @@ package org.apache.activemq.broker.region.virtual;
|
|||
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.DestinationFilter;
|
||||
|
@ -39,17 +43,20 @@ public class CompositeDestinationFilter extends DestinationFilter {
|
|||
private Collection forwardDestinations;
|
||||
private boolean forwardOnly;
|
||||
private boolean copyMessage;
|
||||
private boolean concurrentSend = false;
|
||||
|
||||
public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean copyMessage) {
|
||||
public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean copyMessage, boolean concurrentSend) {
|
||||
super(next);
|
||||
this.forwardDestinations = forwardDestinations;
|
||||
this.forwardOnly = forwardOnly;
|
||||
this.copyMessage = copyMessage;
|
||||
this.concurrentSend = concurrentSend;
|
||||
}
|
||||
|
||||
public void send(ProducerBrokerExchange context, Message message) throws Exception {
|
||||
public void send(final ProducerBrokerExchange context, final Message message) throws Exception {
|
||||
MessageEvaluationContext messageContext = null;
|
||||
|
||||
Collection<ActiveMQDestination> matchingDestinations = new LinkedList<ActiveMQDestination>();
|
||||
for (Iterator iter = forwardDestinations.iterator(); iter.hasNext();) {
|
||||
ActiveMQDestination destination = null;
|
||||
Object value = iter.next();
|
||||
|
@ -70,23 +77,53 @@ public class CompositeDestinationFilter extends DestinationFilter {
|
|||
if (destination == null) {
|
||||
continue;
|
||||
}
|
||||
matchingDestinations.add(destination);
|
||||
}
|
||||
|
||||
Message forwarded_message;
|
||||
if (copyMessage) {
|
||||
forwarded_message = message.copy();
|
||||
forwarded_message.setDestination(destination);
|
||||
final CountDownLatch concurrent = new CountDownLatch(concurrentSend ? matchingDestinations.size() : 0);
|
||||
final AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<Exception>();
|
||||
final BrokerService brokerService = context.getConnectionContext().getBroker().getBrokerService();
|
||||
for (final ActiveMQDestination destination : matchingDestinations) {
|
||||
if (concurrent.getCount() > 0) {
|
||||
brokerService.getTaskRunnerFactory().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (exceptionAtomicReference.get() == null) {
|
||||
doForward(context.copy(), message, brokerService.getRegionBroker(), destination);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
exceptionAtomicReference.set(e);
|
||||
} finally {
|
||||
concurrent.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
doForward(context, message, brokerService.getRegionBroker(), destination);
|
||||
}
|
||||
else {
|
||||
forwarded_message = message;
|
||||
}
|
||||
|
||||
// Send it back through the region broker for routing.
|
||||
context.setMutable(true);
|
||||
Broker regionBroker = context.getConnectionContext().getBroker().getBrokerService().getRegionBroker();
|
||||
regionBroker.send(context, forwarded_message);
|
||||
}
|
||||
if (!forwardOnly) {
|
||||
super.send(context, message);
|
||||
}
|
||||
concurrent.await();
|
||||
if (exceptionAtomicReference.get() != null) {
|
||||
throw exceptionAtomicReference.get();
|
||||
}
|
||||
}
|
||||
|
||||
private void doForward(ProducerBrokerExchange context, Message message, Broker regionBroker, ActiveMQDestination destination) throws Exception {
|
||||
Message forwarded_message;
|
||||
if (copyMessage) {
|
||||
forwarded_message = message.copy();
|
||||
forwarded_message.setDestination(destination);
|
||||
}
|
||||
else {
|
||||
forwarded_message = message;
|
||||
}
|
||||
|
||||
// Send it back through the region broker for routing.
|
||||
context.setMutable(true);
|
||||
regionBroker.send(context, forwarded_message);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,141 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.virtual;
|
||||
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashMap;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.DestinationInterceptor;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.virtual.CompositeTopic;
|
||||
import org.apache.activemq.broker.region.virtual.VirtualDestination;
|
||||
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
|
||||
import org.apache.activemq.command.ActiveMQBytesMessage;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
||||
public class VirtualDestPerfTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(VirtualDestPerfTest.class);
|
||||
ActiveMQTopic target = new ActiveMQTopic("target");
|
||||
BrokerService brokerService;
|
||||
ActiveMQConnectionFactory connectionFactory;
|
||||
|
||||
@Test
|
||||
@Ignore("comparison test - takes too long and really needs a peek at the graph")
|
||||
public void testPerf() throws Exception {
|
||||
LinkedHashMap<Integer, Long> resultsT = new LinkedHashMap<Integer, Long>();
|
||||
LinkedHashMap<Integer, Long> resultsF = new LinkedHashMap<Integer, Long>();
|
||||
|
||||
for (int i=2;i<11;i++) {
|
||||
for (Boolean concurrent : new Boolean[]{true, false}) {
|
||||
startBroker(i, concurrent);
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
produceMessages();
|
||||
long endTime = System.currentTimeMillis();
|
||||
long seconds = (endTime - startTime) / 1000;
|
||||
LOG.info("For routes {} duration {}", i, seconds);
|
||||
if (concurrent) {
|
||||
resultsT.put(i, seconds);
|
||||
} else {
|
||||
resultsF.put(i, seconds);
|
||||
}
|
||||
brokerService.stop();
|
||||
brokerService.waitUntilStopped();
|
||||
}
|
||||
}
|
||||
LOG.info("results T{} F{}", resultsT, resultsF);
|
||||
LOG.info("http://www.chartgo.com/samples.do?chart=line&border=1&show3d=0&width=600&height=500&roundedge=1&transparency=1&legend=1&title=Send:10k::Concurrent-v-Serial&xtitle=routes&ytitle=Duration(seconds)&chrtbkgndcolor=white&threshold=0.0&lang=en"
|
||||
+ "&xaxis1=" + toStr(resultsT.keySet())
|
||||
+ "&yaxis1=" + toStr(resultsT.values())
|
||||
+ "&group1=concurrent"
|
||||
+ "&xaxis2=" + toStr(resultsF.keySet())
|
||||
+ "&yaxis2=" + toStr(resultsF.values())
|
||||
+ "&group2=serial"
|
||||
+ "&from=linejsp");
|
||||
}
|
||||
|
||||
private String toStr(Collection set) {
|
||||
return set.toString().replace(",","%0D%0A").replace("[","").replace("]","").replace(" ", "");
|
||||
}
|
||||
|
||||
|
||||
protected void produceMessages() throws Exception {
|
||||
Connection connection = connectionFactory.createConnection();
|
||||
MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(target);
|
||||
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
ActiveMQBytesMessage message = new ActiveMQBytesMessage();
|
||||
message.setContent(new ByteSequence(new byte[5*1024]));
|
||||
for (int i=0; i<10000; i++) {
|
||||
messageProducer.send(message);
|
||||
}
|
||||
connection.close();
|
||||
}
|
||||
|
||||
private void startBroker(int fanoutCount, boolean concurrentSend) throws Exception {
|
||||
brokerService = new BrokerService();
|
||||
brokerService.setDeleteAllMessagesOnStartup(true);
|
||||
brokerService.setUseVirtualTopics(true);
|
||||
brokerService.addConnector("tcp://0.0.0.0:0");
|
||||
brokerService.setAdvisorySupport(false);
|
||||
PolicyMap destPolicyMap = new PolicyMap();
|
||||
PolicyEntry defaultEntry = new PolicyEntry();
|
||||
defaultEntry.setExpireMessagesPeriod(0);
|
||||
destPolicyMap.setDefaultEntry(defaultEntry);
|
||||
brokerService.setDestinationPolicy(destPolicyMap);
|
||||
|
||||
CompositeTopic route = new CompositeTopic();
|
||||
route.setName("target");
|
||||
route.setForwardOnly(true);
|
||||
route.setConcurrentSend(concurrentSend);
|
||||
Collection<ActiveMQQueue> routes = new ArrayList<ActiveMQQueue>();
|
||||
for (int i=0; i<fanoutCount; i++) {
|
||||
routes.add(new ActiveMQQueue("route." + i));
|
||||
}
|
||||
route.setForwardTo(routes);
|
||||
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
|
||||
interceptor.setVirtualDestinations(new VirtualDestination[]{route});
|
||||
brokerService.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
|
||||
brokerService.start();
|
||||
|
||||
connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString());
|
||||
connectionFactory.setUseAsyncSend(false);
|
||||
if (brokerService.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
|
||||
|
||||
//with parallel sends and no consumers, concurrentStoreAnd dispatch, which uses a single thread by default
|
||||
// will stop/impeed write batching. The num threads will need tweaking when consumers are in the mix but may introduce
|
||||
// order issues
|
||||
((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue