From f88f2803ac0a2a984137caa86ca9ef2b1199fbe9 Mon Sep 17 00:00:00 2001 From: Hiram Chirino Date: Mon, 7 Oct 2013 12:40:28 -0400 Subject: [PATCH] Implementing AMQ-4788 - Lets give preference to consumers vs producers. --- .../activemq/partition/ConnectionProxy.java | 138 ------------------ .../activemq/partition/PartitionBroker.java | 118 +++++++++------ .../partition/PartitionBrokerTest.java | 38 +++-- 3 files changed, 99 insertions(+), 195 deletions(-) delete mode 100644 activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java deleted file mode 100644 index cab6eb654c..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.activemq.broker.Connection; -import org.apache.activemq.broker.Connector; -import org.apache.activemq.broker.region.ConnectionStatistics; -import org.apache.activemq.command.Command; -import org.apache.activemq.command.ConnectionControl; -import org.apache.activemq.command.Response; - -import java.io.IOException; - -/** - * A Connection implementation that proxies all Connection invocation to - * a delegate connection. - */ -public class ConnectionProxy implements Connection { - final Connection next; - - public ConnectionProxy(Connection next) { - this.next = next; - } - - @Override - public void dispatchAsync(Command command) { - next.dispatchAsync(command); - } - - @Override - public void dispatchSync(Command message) { - next.dispatchSync(message); - } - - @Override - public String getConnectionId() { - return next.getConnectionId(); - } - - @Override - public Connector getConnector() { - return next.getConnector(); - } - - @Override - public int getDispatchQueueSize() { - return next.getDispatchQueueSize(); - } - - @Override - public String getRemoteAddress() { - return next.getRemoteAddress(); - } - - @Override - public ConnectionStatistics getStatistics() { - return next.getStatistics(); - } - - @Override - public boolean isActive() { - return next.isActive(); - } - - @Override - public boolean isBlocked() { - return next.isBlocked(); - } - - @Override - public boolean isConnected() { - return next.isConnected(); - } - - @Override - public boolean isFaultTolerantConnection() { - return next.isFaultTolerantConnection(); - } - - @Override - public boolean isManageable() { - return next.isManageable(); - } - - @Override - public boolean isNetworkConnection() { - return next.isNetworkConnection(); - } - - @Override - public boolean isSlow() { - return next.isSlow(); - } - - @Override - public Response service(Command command) { - return next.service(command); - } - - @Override - public void serviceException(Throwable error) { - next.serviceException(error); - } - - @Override - public void serviceExceptionAsync(IOException e) { - next.serviceExceptionAsync(e); - } - - @Override - public void start() throws Exception { - next.start(); - } - - @Override - public void stop() throws Exception { - next.stop(); - } - - @Override - public void updateClient(ConnectionControl control) { - next.updateClient(control); - } -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java index 1a8e78b3f6..51902072b0 100644 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java +++ b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java @@ -20,6 +20,8 @@ import org.apache.activemq.broker.*; import org.apache.activemq.command.*; import org.apache.activemq.partition.dto.Partitioning; import org.apache.activemq.partition.dto.Target; +import org.apache.activemq.state.ConsumerState; +import org.apache.activemq.state.SessionState; import org.apache.activemq.transport.Transport; import org.apache.activemq.util.LRUCache; import org.slf4j.Logger; @@ -28,8 +30,7 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; -import java.util.HashSet; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** @@ -132,7 +133,7 @@ public class PartitionBroker extends BrokerFilter { } LOG.info("Redirecting connection to: " + connectionString); - TransportConnection connection = (TransportConnection)monitor.next; + TransportConnection connection = (TransportConnection)monitor.context.getConnection(); ConnectionControl cc = new ConnectionControl(); cc.setConnectedBrokers(connectionString); cc.setRebalanceConnection(true); @@ -155,6 +156,10 @@ public class PartitionBroker extends BrokerFilter { return rc.toString(); } + static private class Score { + int value; + } + protected Target pickBestBroker(ConnectionMonitor monitor) { if( getConfig() ==null ) @@ -201,22 +206,62 @@ public class PartitionBroker extends BrokerFilter { || (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty()) ) { - ActiveMQDestination best = monitor.findMostActiveDestination(plugin); - if( best!=null ) { - if( getConfig().byQueue !=null && !getConfig().byQueue.isEmpty() && best.isQueue() ) { - Target targetDTO = getConfig().byQueue.get(best.getPhysicalName()); - if( targetDTO!=null ) { - return targetDTO; - } - } - - if( getConfig().byTopic !=null && !getConfig().byTopic.isEmpty() && best.isTopic() ) { - Target targetDTO = getConfig().byTopic.get(best.getPhysicalName()); - if( targetDTO!=null ) { - return targetDTO; + // Collect the destinations the connection is consuming from... + HashSet dests = new HashSet(); + for (SessionState session : monitor.context.getConnectionState().getSessionStates()) { + for (ConsumerState consumer : session.getConsumerStates()) { + ActiveMQDestination destination = consumer.getInfo().getDestination(); + if( destination.isComposite() ) { + dests.addAll(Arrays.asList(destination.getCompositeDestinations())); + } else { + dests.addAll(Collections.singletonList(destination)); } } } + + // Group them by the partitioning target for the destinations and score them.. + HashMap targetScores = new HashMap(); + for (ActiveMQDestination dest : dests) { + Target target = getTarget(dest); + if( target!=null ) { + Score score = targetScores.get(target); + if( score == null ) { + score = new Score(); + targetScores.put(target, score); + } + score.value++; + } + } + + // The target with largest score wins.. + if( !targetScores.isEmpty() ) { + Target bestTarget = null; + int bestScore=0; + for (Map.Entry entry : targetScores.entrySet()) { + if( entry.getValue().value > bestScore ) { + bestTarget = entry.getKey(); + } + } + return bestTarget; + } + + // If we get here is because there were no consumers, or the destinations for those + // consumers did not have an assigned destination.. So partition based on producer + // usage. + Target best = monitor.findBestProducerTarget(this); + if( best!=null ) { + return best; + } + } + return null; + } + + protected Target getTarget(ActiveMQDestination dest) { + Partitioning config = getConfig(); + if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty() ) { + return config.byQueue.get(dest.getPhysicalName()); + } else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty() ) { + return config.byTopic.get(dest.getPhysicalName()); } return null; } @@ -226,7 +271,6 @@ public class PartitionBroker extends BrokerFilter { @Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { ConnectionMonitor monitor = new ConnectionMonitor(context); - context.setConnection(monitor); monitors.put(info.getConnectionId(), monitor); super.addConnection(context, info); checkTarget(monitor); @@ -236,9 +280,6 @@ public class PartitionBroker extends BrokerFilter { public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { super.removeConnection(context, info, error); ConnectionMonitor removed = monitors.remove(info.getConnectionId()); - if( removed!=null ) { - context.setConnection(removed.next); - } } @Override @@ -259,28 +300,30 @@ public class PartitionBroker extends BrokerFilter { long bytes; } - static class ConnectionMonitor extends ConnectionProxy { - final ConnectionContext context; + static class ConnectionMonitor { + final ConnectionContext context; LRUCache trafficPerDestination = new LRUCache(); - ConnectionMonitor(ConnectionContext context) { - super(context.getConnection()); + public ConnectionMonitor(ConnectionContext context) { this.context = context; } - synchronized public ActiveMQDestination findMostActiveDestination(PartitionBrokerPlugin plugin) { - ActiveMQDestination best = null; + synchronized public Target findBestProducerTarget(PartitionBroker broker) { + Target best = null; long bestSize = 0 ; for (Map.Entry entry : trafficPerDestination.entrySet()) { Traffic t = entry.getValue(); // Once we get enough messages... - if( t.messages < plugin.getMinTransferCount()) { + if( t.messages < broker.plugin.getMinTransferCount()) { continue; } if( t.bytes > bestSize) { bestSize = t.bytes; - best = entry.getKey(); + Target target = broker.getTarget(entry.getKey()); + if( target!=null ) { + best = target; + } } } return best; @@ -298,25 +341,6 @@ public class PartitionBroker extends BrokerFilter { } - @Override - public void dispatchAsync(Command command) { - if (command.getClass() == MessageDispatch.class) { - MessageDispatch md = (MessageDispatch) command; - Message message = md.getMessage(); - synchronized (this) { - ActiveMQDestination dest = md.getDestination(); - Traffic traffic = trafficPerDestination.get(dest); - if( traffic == null ) { - traffic = new Traffic(); - trafficPerDestination.put(dest, traffic); - } - traffic.messages += 1; - traffic.bytes += message.getSize(); - } - } - super.dispatchAsync(command); - } - } } diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java index 9b7450c689..efb5e661a4 100644 --- a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java +++ b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java @@ -24,10 +24,7 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.partition.dto.Partitioning; import org.apache.activemq.partition.dto.Target; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; +import javax.jms.*; import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; @@ -81,7 +78,7 @@ public class PartitionBrokerTest extends AutoFailTestSupport { partitioning.byQueue.put("foo", new Target("broker1")); createBrokerCluster(2); - Connection connection = createConnectionTo("broker2"); + Connection connection2 = createConnectionTo("broker2"); within(5, TimeUnit.SECONDS, new Task() { public void run() throws Exception { @@ -90,11 +87,8 @@ public class PartitionBrokerTest extends AutoFailTestSupport { } }); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue("foo")); - for (int i = 0; i < 100; i++) { - producer.send(session.createTextMessage("#"+i)); - } + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session2.createConsumer(session2.createQueue("foo")); within(5, TimeUnit.SECONDS, new Task() { public void run() throws Exception { @@ -102,7 +96,31 @@ public class PartitionBrokerTest extends AutoFailTestSupport { assertEquals(0, getTransportConnector("broker2").getConnections().size()); } }); + + Connection connection1 = createConnectionTo("broker2"); + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session1.createProducer(session1.createQueue("foo")); + + within(5, TimeUnit.SECONDS, new Task() { + public void run() throws Exception { + assertEquals(1, getTransportConnector("broker1").getConnections().size()); + assertEquals(1, getTransportConnector("broker2").getConnections().size()); + } + }); + + for (int i = 0; i < 100; i++) { + producer.send(session1.createTextMessage("#" + i)); + } + + within(5, TimeUnit.SECONDS, new Task() { + public void run() throws Exception { + assertEquals(2, getTransportConnector("broker1").getConnections().size()); + assertEquals(0, getTransportConnector("broker2").getConnections().size()); + } + }); } + + static interface Task { public void run() throws Exception; }