Implementing AMQ-4788 - Lets give preference to consumers vs producers.

This commit is contained in:
Hiram Chirino 2013-10-07 12:40:28 -04:00
parent 1981adf429
commit f88f2803ac
3 changed files with 99 additions and 195 deletions

View File

@ -1,138 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.Connector;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.Response;
import java.io.IOException;
/**
* A Connection implementation that proxies all Connection invocation to
* a delegate connection.
*/
public class ConnectionProxy implements Connection {
final Connection next;
public ConnectionProxy(Connection next) {
this.next = next;
}
@Override
public void dispatchAsync(Command command) {
next.dispatchAsync(command);
}
@Override
public void dispatchSync(Command message) {
next.dispatchSync(message);
}
@Override
public String getConnectionId() {
return next.getConnectionId();
}
@Override
public Connector getConnector() {
return next.getConnector();
}
@Override
public int getDispatchQueueSize() {
return next.getDispatchQueueSize();
}
@Override
public String getRemoteAddress() {
return next.getRemoteAddress();
}
@Override
public ConnectionStatistics getStatistics() {
return next.getStatistics();
}
@Override
public boolean isActive() {
return next.isActive();
}
@Override
public boolean isBlocked() {
return next.isBlocked();
}
@Override
public boolean isConnected() {
return next.isConnected();
}
@Override
public boolean isFaultTolerantConnection() {
return next.isFaultTolerantConnection();
}
@Override
public boolean isManageable() {
return next.isManageable();
}
@Override
public boolean isNetworkConnection() {
return next.isNetworkConnection();
}
@Override
public boolean isSlow() {
return next.isSlow();
}
@Override
public Response service(Command command) {
return next.service(command);
}
@Override
public void serviceException(Throwable error) {
next.serviceException(error);
}
@Override
public void serviceExceptionAsync(IOException e) {
next.serviceExceptionAsync(e);
}
@Override
public void start() throws Exception {
next.start();
}
@Override
public void stop() throws Exception {
next.stop();
}
@Override
public void updateClient(ConnectionControl control) {
next.updateClient(control);
}
}

View File

@ -20,6 +20,8 @@ import org.apache.activemq.broker.*;
import org.apache.activemq.command.*; import org.apache.activemq.command.*;
import org.apache.activemq.partition.dto.Partitioning; import org.apache.activemq.partition.dto.Partitioning;
import org.apache.activemq.partition.dto.Target; import org.apache.activemq.partition.dto.Target;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.LRUCache; import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -28,8 +30,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.HashSet; import java.util.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -132,7 +133,7 @@ public class PartitionBroker extends BrokerFilter {
} }
LOG.info("Redirecting connection to: " + connectionString); LOG.info("Redirecting connection to: " + connectionString);
TransportConnection connection = (TransportConnection)monitor.next; TransportConnection connection = (TransportConnection)monitor.context.getConnection();
ConnectionControl cc = new ConnectionControl(); ConnectionControl cc = new ConnectionControl();
cc.setConnectedBrokers(connectionString); cc.setConnectedBrokers(connectionString);
cc.setRebalanceConnection(true); cc.setRebalanceConnection(true);
@ -155,6 +156,10 @@ public class PartitionBroker extends BrokerFilter {
return rc.toString(); return rc.toString();
} }
static private class Score {
int value;
}
protected Target pickBestBroker(ConnectionMonitor monitor) { protected Target pickBestBroker(ConnectionMonitor monitor) {
if( getConfig() ==null ) if( getConfig() ==null )
@ -201,23 +206,63 @@ public class PartitionBroker extends BrokerFilter {
|| (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty()) || (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty())
) { ) {
ActiveMQDestination best = monitor.findMostActiveDestination(plugin); // Collect the destinations the connection is consuming from...
if( best!=null ) { HashSet<ActiveMQDestination> dests = new HashSet<ActiveMQDestination>();
if( getConfig().byQueue !=null && !getConfig().byQueue.isEmpty() && best.isQueue() ) { for (SessionState session : monitor.context.getConnectionState().getSessionStates()) {
Target targetDTO = getConfig().byQueue.get(best.getPhysicalName()); for (ConsumerState consumer : session.getConsumerStates()) {
if( targetDTO!=null ) { ActiveMQDestination destination = consumer.getInfo().getDestination();
return targetDTO; if( destination.isComposite() ) {
dests.addAll(Arrays.asList(destination.getCompositeDestinations()));
} else {
dests.addAll(Collections.singletonList(destination));
}
} }
} }
if( getConfig().byTopic !=null && !getConfig().byTopic.isEmpty() && best.isTopic() ) { // Group them by the partitioning target for the destinations and score them..
Target targetDTO = getConfig().byTopic.get(best.getPhysicalName()); HashMap<Target, Score> targetScores = new HashMap<Target, Score>();
if( targetDTO!=null ) { for (ActiveMQDestination dest : dests) {
return targetDTO; Target target = getTarget(dest);
if( target!=null ) {
Score score = targetScores.get(target);
if( score == null ) {
score = new Score();
targetScores.put(target, score);
}
score.value++;
} }
} }
// The target with largest score wins..
if( !targetScores.isEmpty() ) {
Target bestTarget = null;
int bestScore=0;
for (Map.Entry<Target, Score> entry : targetScores.entrySet()) {
if( entry.getValue().value > bestScore ) {
bestTarget = entry.getKey();
} }
} }
return bestTarget;
}
// If we get here is because there were no consumers, or the destinations for those
// consumers did not have an assigned destination.. So partition based on producer
// usage.
Target best = monitor.findBestProducerTarget(this);
if( best!=null ) {
return best;
}
}
return null;
}
protected Target getTarget(ActiveMQDestination dest) {
Partitioning config = getConfig();
if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty() ) {
return config.byQueue.get(dest.getPhysicalName());
} else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty() ) {
return config.byTopic.get(dest.getPhysicalName());
}
return null; return null;
} }
@ -226,7 +271,6 @@ public class PartitionBroker extends BrokerFilter {
@Override @Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
ConnectionMonitor monitor = new ConnectionMonitor(context); ConnectionMonitor monitor = new ConnectionMonitor(context);
context.setConnection(monitor);
monitors.put(info.getConnectionId(), monitor); monitors.put(info.getConnectionId(), monitor);
super.addConnection(context, info); super.addConnection(context, info);
checkTarget(monitor); checkTarget(monitor);
@ -236,9 +280,6 @@ public class PartitionBroker extends BrokerFilter {
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
super.removeConnection(context, info, error); super.removeConnection(context, info, error);
ConnectionMonitor removed = monitors.remove(info.getConnectionId()); ConnectionMonitor removed = monitors.remove(info.getConnectionId());
if( removed!=null ) {
context.setConnection(removed.next);
}
} }
@Override @Override
@ -259,28 +300,30 @@ public class PartitionBroker extends BrokerFilter {
long bytes; long bytes;
} }
static class ConnectionMonitor extends ConnectionProxy { static class ConnectionMonitor {
final ConnectionContext context;
final ConnectionContext context;
LRUCache<ActiveMQDestination, Traffic> trafficPerDestination = new LRUCache<ActiveMQDestination, Traffic>(); LRUCache<ActiveMQDestination, Traffic> trafficPerDestination = new LRUCache<ActiveMQDestination, Traffic>();
ConnectionMonitor(ConnectionContext context) { public ConnectionMonitor(ConnectionContext context) {
super(context.getConnection());
this.context = context; this.context = context;
} }
synchronized public ActiveMQDestination findMostActiveDestination(PartitionBrokerPlugin plugin) { synchronized public Target findBestProducerTarget(PartitionBroker broker) {
ActiveMQDestination best = null; Target best = null;
long bestSize = 0 ; long bestSize = 0 ;
for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) { for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) {
Traffic t = entry.getValue(); Traffic t = entry.getValue();
// Once we get enough messages... // Once we get enough messages...
if( t.messages < plugin.getMinTransferCount()) { if( t.messages < broker.plugin.getMinTransferCount()) {
continue; continue;
} }
if( t.bytes > bestSize) { if( t.bytes > bestSize) {
bestSize = t.bytes; bestSize = t.bytes;
best = entry.getKey(); Target target = broker.getTarget(entry.getKey());
if( target!=null ) {
best = target;
}
} }
} }
return best; return best;
@ -298,25 +341,6 @@ public class PartitionBroker extends BrokerFilter {
} }
@Override
public void dispatchAsync(Command command) {
if (command.getClass() == MessageDispatch.class) {
MessageDispatch md = (MessageDispatch) command;
Message message = md.getMessage();
synchronized (this) {
ActiveMQDestination dest = md.getDestination();
Traffic traffic = trafficPerDestination.get(dest);
if( traffic == null ) {
traffic = new Traffic();
trafficPerDestination.put(dest, traffic);
}
traffic.messages += 1;
traffic.bytes += message.getSize();
}
}
super.dispatchAsync(command);
}
} }
} }

View File

@ -24,10 +24,7 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.partition.dto.Partitioning; import org.apache.activemq.partition.dto.Partitioning;
import org.apache.activemq.partition.dto.Target; import org.apache.activemq.partition.dto.Target;
import javax.jms.Connection; import javax.jms.*;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
@ -81,7 +78,7 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
partitioning.byQueue.put("foo", new Target("broker1")); partitioning.byQueue.put("foo", new Target("broker1"));
createBrokerCluster(2); createBrokerCluster(2);
Connection connection = createConnectionTo("broker2"); Connection connection2 = createConnectionTo("broker2");
within(5, TimeUnit.SECONDS, new Task() { within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception { public void run() throws Exception {
@ -90,11 +87,8 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
} }
}); });
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue("foo")); MessageConsumer consumer = session2.createConsumer(session2.createQueue("foo"));
for (int i = 0; i < 100; i++) {
producer.send(session.createTextMessage("#"+i));
}
within(5, TimeUnit.SECONDS, new Task() { within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception { public void run() throws Exception {
@ -102,7 +96,31 @@ public class PartitionBrokerTest extends AutoFailTestSupport {
assertEquals(0, getTransportConnector("broker2").getConnections().size()); assertEquals(0, getTransportConnector("broker2").getConnections().size());
} }
}); });
Connection connection1 = createConnectionTo("broker2");
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session1.createProducer(session1.createQueue("foo"));
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(1, getTransportConnector("broker1").getConnections().size());
assertEquals(1, getTransportConnector("broker2").getConnections().size());
} }
});
for (int i = 0; i < 100; i++) {
producer.send(session1.createTextMessage("#" + i));
}
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(2, getTransportConnector("broker1").getConnections().size());
assertEquals(0, getTransportConnector("broker2").getConnections().size());
}
});
}
static interface Task { static interface Task {
public void run() throws Exception; public void run() throws Exception;
} }