Implementing AMQ-4788 - Add support for allowing the broker to partition client client load across a broker cluster using a partitioning config

This commit is contained in:
Hiram Chirino 2013-10-02 09:16:37 -04:00
parent 0da02fca9d
commit 7c63788e1a
12 changed files with 1219 additions and 0 deletions

View File

@ -1347,6 +1347,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return transport.getRemoteAddress();
}
public Transport getTransport() {
return transport;
}
@Override
public String getConnectionId() {
List<TransportConnectionState> connectionStates = listConnectionStates();

110
activemq-partition/pom.xml Normal file
View File

@ -0,0 +1,110 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.9-SNAPSHOT</version>
</parent>
<artifactId>activemq-partition</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ :: Partition Management</name>
<description>Used to partition clients over a cluster of brokers</description>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-leveldb-store</artifactId>
</dependency>
<dependency>
<groupId>org.linkedin</groupId>
<artifactId>org.linkedin.zookeeper-impl</artifactId>
<version>${linkedin-zookeeper-version}</version>
</dependency>
<dependency>
<groupId>org.linkedin</groupId>
<artifactId>org.linkedin.util-core</artifactId>
<version>${linkedin-zookeeper-version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper-version}</version>
</dependency>
<!-- For Optional Snappy Compression -->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>${jackson-version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${jackson-version}</version>
</dependency>
<!-- Testing Dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
</plugins>
</build>
</project>

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.Connector;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.Response;
import java.io.IOException;
/**
* A Connection implementation that proxies all Connection invocation to
* a delegate connection.
*/
public class ConnectionProxy implements Connection {
final Connection next;
public ConnectionProxy(Connection next) {
this.next = next;
}
@Override
public void dispatchAsync(Command command) {
next.dispatchAsync(command);
}
@Override
public void dispatchSync(Command message) {
next.dispatchSync(message);
}
@Override
public String getConnectionId() {
return next.getConnectionId();
}
@Override
public Connector getConnector() {
return next.getConnector();
}
@Override
public int getDispatchQueueSize() {
return next.getDispatchQueueSize();
}
@Override
public String getRemoteAddress() {
return next.getRemoteAddress();
}
@Override
public ConnectionStatistics getStatistics() {
return next.getStatistics();
}
@Override
public boolean isActive() {
return next.isActive();
}
@Override
public boolean isBlocked() {
return next.isBlocked();
}
@Override
public boolean isConnected() {
return next.isConnected();
}
@Override
public boolean isFaultTolerantConnection() {
return next.isFaultTolerantConnection();
}
@Override
public boolean isManageable() {
return next.isManageable();
}
@Override
public boolean isNetworkConnection() {
return next.isNetworkConnection();
}
@Override
public boolean isSlow() {
return next.isSlow();
}
@Override
public Response service(Command command) {
return next.service(command);
}
@Override
public void serviceException(Throwable error) {
next.serviceException(error);
}
@Override
public void serviceExceptionAsync(IOException e) {
next.serviceExceptionAsync(e);
}
@Override
public void start() throws Exception {
next.start();
}
@Override
public void stop() throws Exception {
next.stop();
}
@Override
public void updateClient(ConnectionControl control) {
next.updateClient(control);
}
}

View File

@ -0,0 +1,322 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.broker.*;
import org.apache.activemq.command.*;
import org.apache.activemq.partition.dto.Partitioning;
import org.apache.activemq.partition.dto.Target;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* A BrokerFilter which partitions client connections over a cluster of brokers.
*
* It can use a client identifier like client id, authenticated user name, source ip
* address or even destination being used by the connection to figure out which
* is the best broker in the cluster that the connection should be using and then
* redirects failover clients to that broker.
*/
public class PartitionBroker extends BrokerFilter {
protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class);
protected final PartitionBrokerPlugin plugin;
protected boolean reloadConfigOnPoll = true;
public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) {
super(broker);
this.plugin = plugin;
}
@Override
public void start() throws Exception {
super.start();
getExecutor().execute(new Runnable() {
@Override
public void run() {
Thread.currentThread().setName("Partition Monitor");
onMonitorStart();
try {
runPartitionMonitor();
} catch (Exception e) {
onMonitorStop();
}
}
});
}
protected void onMonitorStart() {
}
protected void onMonitorStop() {
}
protected void runPartitionMonitor() {
while( !isStopped() ) {
try {
monitorWait();
} catch (InterruptedException e) {
break;
}
if(reloadConfigOnPoll) {
try {
reloadConfiguration();
} catch (Exception e) {
continue;
}
}
for( ConnectionMonitor monitor: monitors.values()) {
checkTarget(monitor);
}
}
}
protected void monitorWait() throws InterruptedException {
synchronized (this) {
this.wait(1000);
}
}
protected void monitorWakeup() {
synchronized (this) {
this.notifyAll();
}
}
protected void reloadConfiguration() throws Exception {
}
protected void checkTarget(ConnectionMonitor monitor) {
// can we find a preferred target for the connection?
Target targetDTO = pickBestBroker(monitor);
if( targetDTO == null || targetDTO.ids==null) {
LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId());
return;
}
// Are we one the the targets?
if( targetDTO.ids.contains(getBrokerName()) ) {
LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId());
return;
}
// Then we need to move the connection over.
String connectionString = getConnectionString(targetDTO.ids);
if( connectionString==null ) {
LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids);
}
LOG.info("Redirecting connection to: " + connectionString);
TransportConnection connection = (TransportConnection)monitor.next;
ConnectionControl cc = new ConnectionControl();
cc.setConnectedBrokers(connectionString);
cc.setRebalanceConnection(true);
connection.dispatchAsync(cc);
}
protected String getConnectionString(HashSet<String> ids) {
if( getConfig().brokers==null || getConfig().brokers.isEmpty() )
return null;
StringBuilder rc = new StringBuilder();
for (String id : ids) {
String url = getConfig().brokers.get(id);
if( url!=null ) {
if( rc.length()!=0 ) {
rc.append(',');
}
rc.append(url);
}
}
return rc.toString();
}
protected Target pickBestBroker(ConnectionMonitor monitor) {
if( getConfig() ==null )
return null;
if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) {
TransportConnection connection = (TransportConnection)monitor.context.getConnection();
Transport transport = connection.getTransport();
Socket socket = transport.narrow(Socket.class);
if( socket !=null ) {
SocketAddress address = socket.getRemoteSocketAddress();
if( address instanceof InetSocketAddress) {
String ip = ((InetSocketAddress) address).getAddress().getHostAddress();
Target targetDTO = getConfig().bySourceIp.get(ip);
if( targetDTO!=null ) {
return targetDTO;
}
}
}
}
if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) {
String userName = monitor.context.getUserName();
if( userName !=null ) {
Target targetDTO = getConfig().byUserName.get(userName);
if( targetDTO!=null ) {
return targetDTO;
}
}
}
if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) {
String clientId = monitor.context.getClientId();
if( clientId!=null ) {
Target targetDTO = getConfig().byClientId.get(clientId);
if( targetDTO!=null ) {
return targetDTO;
}
}
}
if(
(getConfig().byQueue !=null && !getConfig().byQueue.isEmpty())
|| (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty())
) {
ActiveMQDestination best = monitor.findMostActiveDestination(plugin);
if( best!=null ) {
if( getConfig().byQueue !=null && !getConfig().byQueue.isEmpty() && best.isQueue() ) {
Target targetDTO = getConfig().byQueue.get(best.getPhysicalName());
if( targetDTO!=null ) {
return targetDTO;
}
}
if( getConfig().byTopic !=null && !getConfig().byTopic.isEmpty() && best.isTopic() ) {
Target targetDTO = getConfig().byTopic.get(best.getPhysicalName());
if( targetDTO!=null ) {
return targetDTO;
}
}
}
}
return null;
}
protected final ConcurrentHashMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
ConnectionMonitor monitor = new ConnectionMonitor(context);
context.setConnection(monitor);
monitors.put(info.getConnectionId(), monitor);
super.addConnection(context, info);
checkTarget(monitor);
}
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
super.removeConnection(context, info, error);
ConnectionMonitor removed = monitors.remove(info.getConnectionId());
if( removed!=null ) {
context.setConnection(removed.next);
}
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId());
if( monitor!=null ) {
monitor.onSend(producerExchange, messageSend);
}
}
protected Partitioning getConfig() {
return plugin.getConfig();
}
static class Traffic {
long messages;
long bytes;
}
static class ConnectionMonitor extends ConnectionProxy {
final ConnectionContext context;
LRUCache<ActiveMQDestination, Traffic> trafficPerDestination = new LRUCache<ActiveMQDestination, Traffic>();
ConnectionMonitor(ConnectionContext context) {
super(context.getConnection());
this.context = context;
}
synchronized public ActiveMQDestination findMostActiveDestination(PartitionBrokerPlugin plugin) {
ActiveMQDestination best = null;
long bestSize = 0 ;
for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) {
Traffic t = entry.getValue();
// Once we get enough messages...
if( t.messages < plugin.getMinTransferCount()) {
continue;
}
if( t.bytes > bestSize) {
bestSize = t.bytes;
best = entry.getKey();
}
}
return best;
}
synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) {
ActiveMQDestination dest = message.getDestination();
Traffic traffic = trafficPerDestination.get(dest);
if( traffic == null ) {
traffic = new Traffic();
trafficPerDestination.put(dest, traffic);
}
traffic.messages += 1;
traffic.bytes += message.getSize();
}
@Override
public void dispatchAsync(Command command) {
if (command.getClass() == MessageDispatch.class) {
MessageDispatch md = (MessageDispatch) command;
Message message = md.getMessage();
synchronized (this) {
ActiveMQDestination dest = md.getDestination();
Traffic traffic = trafficPerDestination.get(dest);
if( traffic == null ) {
traffic = new Traffic();
trafficPerDestination.put(dest, traffic);
}
traffic.messages += 1;
traffic.bytes += message.getSize();
}
}
super.dispatchAsync(command);
}
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.partition.dto.Partitioning;
/**
* A BrokerPlugin which partitions client connections over a cluster of brokers.
*
* @org.apache.xbean.XBean element="partitionBrokerPlugin"
*/
public class PartitionBrokerPlugin implements BrokerPlugin {
protected int minTransferCount;
protected Partitioning config;
@Override
public Broker installPlugin(Broker broker) throws Exception {
return new PartitionBroker(broker, this);
}
public int getMinTransferCount() {
return minTransferCount;
}
public void setMinTransferCount(int minTransferCount) {
this.minTransferCount = minTransferCount;
}
public Partitioning getConfig() {
return config;
}
public void setConfig(Partitioning config) {
this.config = config;
}
}

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.leveldb.replicated.groups.ZKClient;
import org.apache.activemq.partition.dto.Partitioning;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.linkedin.util.clock.Timespan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*/
public class ZooKeeperPartitionBroker extends PartitionBroker {
protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperPartitionBroker.class);
protected volatile ZKClient zk_client = null;
protected volatile Partitioning config;
public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) {
super(broker, plugin);
}
@Override
protected void onMonitorStop() {
zkDisconnect();
}
@Override
protected Partitioning getConfig() {
return config;
}
protected ZooKeeperPartitionBrokerPlugin plugin() {
return (ZooKeeperPartitionBrokerPlugin)plugin;
}
protected void zkConnect() throws Exception {
zk_client = new ZKClient(plugin().getZkAddress(), Timespan.parse(plugin().getZkSessionTmeout()), null);
if( plugin().getZkPassword()!=null ) {
zk_client.setPassword(plugin().getZkPassword());
}
zk_client.start();
zk_client.waitForConnected(Timespan.parse("30s"));
}
protected void zkDisconnect() {
if( zk_client!=null ) {
zk_client.close();
zk_client = null;
}
}
protected void reloadConfiguration() throws Exception {
if( zk_client==null ) {
LOG.debug("Connecting to ZooKeeper");
try {
zkConnect();
LOG.debug("Connected to ZooKeeper");
} catch (Exception e) {
LOG.debug("Connection to ZooKeeper failed: "+e);
zkDisconnect();
throw e;
}
}
byte[] data = null;
try {
Stat stat = new Stat();
data = zk_client.getData(plugin().getZkPath(), new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
reloadConfiguration();
} catch (Exception e) {
}
monitorWakeup();
}
}, stat);
reloadConfigOnPoll = false;
} catch (Exception e) {
LOG.warn("Could load partitioning configuration: " + e, e);
reloadConfigOnPoll = true;
}
try {
config = Partitioning.MAPPER.readValue(data, Partitioning.class);
} catch (Exception e) {
LOG.warn("Invalid partitioning configuration: " + e, e);
}
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
/**
* A PartitionBrokerPlugin which gets it's configuration from ZooKeeper.
*/
public class ZooKeeperPartitionBrokerPlugin extends PartitionBrokerPlugin {
String zkAddress = "127.0.0.1:2181";
String zkPassword;
String zkPath = "/broker-assignments";
String zkSessionTmeout = "10s";
@Override
public Broker installPlugin(Broker broker) throws Exception {
return new ZooKeeperPartitionBroker(broker, this);
}
public String getZkAddress() {
return zkAddress;
}
public void setZkAddress(String zkAddress) {
this.zkAddress = zkAddress;
}
public String getZkPassword() {
return zkPassword;
}
public void setZkPassword(String zkPassword) {
this.zkPassword = zkPassword;
}
public String getZkPath() {
return zkPath;
}
public void setZkPath(String zkPath) {
this.zkPath = zkPath;
}
public String getZkSessionTmeout() {
return zkSessionTmeout;
}
public void setZkSessionTmeout(String zkSessionTmeout) {
this.zkSessionTmeout = zkSessionTmeout;
}
}

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition.dto;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import java.io.IOException;
import java.util.HashMap;
/**
* The main Configuration class for the PartitionBroker plugin
*
* @org.apache.xbean.XBean element="partitioning"
*/
public class Partitioning {
static final public ObjectMapper MAPPER = new ObjectMapper();
static {
MAPPER.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
MAPPER.disable(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES);
}
static final public ObjectMapper TO_STRING_MAPPER = new ObjectMapper();
static {
TO_STRING_MAPPER.disable(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES);
TO_STRING_MAPPER.enable(SerializationConfig.Feature.INDENT_OUTPUT);
}
/**
* If a client connects with a clientId which is listed in the
* map, then he will be immediately reconnected
* to the partition target immediately.
*/
@JsonProperty("by_client_id")
public HashMap<String, Target> byClientId;
/**
* If a client connects with a user priciple which is listed in the
* map, then he will be immediately reconnected
* to the partition target immediately.
*/
@JsonProperty("by_user_name")
public HashMap<String, Target> byUserName;
/**
* If a client connects with source ip which is listed in the
* map, then he will be immediately reconnected
* to the partition target immediately.
*/
@JsonProperty("by_source_ip")
public HashMap<String, Target> bySourceIp;
/**
* Used to map the preferred partitioning of queues across
* a set of brokers. Once a it is deemed that a connection mostly
* works with a set of targets configured in this map, the client
* will be reconnected to the appropriate target.
*/
@JsonProperty("by_queue")
public HashMap<String, Target> byQueue;
/**
* Used to map the preferred partitioning of topics across
* a set of brokers. Once a it is deemed that a connection mostly
* works with a set of targets configured in this map, the client
* will be reconnected to the appropriate target.
*/
@JsonProperty("by_topic")
public HashMap<String, Target> byTopic;
/**
* Maps broker names to broker URLs.
*/
@JsonProperty("brokers")
public HashMap<String, String> brokers;
@Override
public String toString() {
try {
return TO_STRING_MAPPER.writeValueAsString(this);
} catch (IOException e) {
return super.toString();
}
}
public HashMap<String, String> getBrokers() {
return brokers;
}
public void setBrokers(HashMap<String, String> brokers) {
this.brokers = brokers;
}
public HashMap<String, Target> getByClientId() {
return byClientId;
}
public void setByClientId(HashMap<String, Target> byClientId) {
this.byClientId = byClientId;
}
public HashMap<String, Target> getByQueue() {
return byQueue;
}
public void setByQueue(HashMap<String, Target> byQueue) {
this.byQueue = byQueue;
}
public HashMap<String, Target> getBySourceIp() {
return bySourceIp;
}
public void setBySourceIp(HashMap<String, Target> bySourceIp) {
this.bySourceIp = bySourceIp;
}
public HashMap<String, Target> getByTopic() {
return byTopic;
}
public void setByTopic(HashMap<String, Target> byTopic) {
this.byTopic = byTopic;
}
public HashMap<String, Target> getByUserName() {
return byUserName;
}
public void setByUserName(HashMap<String, Target> byUserName) {
this.byUserName = byUserName;
}
}

View File

@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition.dto;
import org.codehaus.jackson.annotate.JsonProperty;
import scala.actors.threadpool.Arrays;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
/**
* Represents a partition target. This identifies the brokers that
* a partition lives on.
*
* @org.apache.xbean.XBean element="target"
*/
public class Target {
@JsonProperty("ids")
public HashSet<String> ids = new HashSet<String>();
public Target() {
ids = new HashSet<String>();
}
public Target(String ...ids) {
this.ids.addAll(Arrays.asList(ids));
}
@Override
public String toString() {
try {
return Partitioning.TO_STRING_MAPPER.writeValueAsString(this);
} catch (IOException e) {
return super.toString();
}
}
public HashSet<String> getIds() {
return ids;
}
public void setIds(Collection<String> ids) {
this.ids = new HashSet<String>(ids);
}
}

View File

@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.partition.dto.Partitioning;
import org.apache.activemq.partition.dto.Target;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
/**
* Unit tests for the PartitionBroker plugin.
*/
public class PartitionBrokerTest extends AutoFailTestSupport {
protected HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>();
protected ArrayList<Connection> connections = new ArrayList<Connection>();
Partitioning partitioning;
@Override
protected void setUp() throws Exception {
super.setUp();
partitioning = new Partitioning();
partitioning.brokers = new HashMap<String, String>();
}
public void testPartitionByClientId() throws Exception {
partitioning.byClientId = new HashMap<String, Target>();
partitioning.byClientId.put("client1", new Target("broker1"));
partitioning.byClientId.put("client2", new Target("broker2"));
createBrokerCluster(2);
Connection connection = createConnectionTo("broker2");
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(0, getTransportConnector("broker1").getConnections().size());
assertEquals(1, getTransportConnector("broker2").getConnections().size());
}
});
connection.setClientID("client1");
connection.start();
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(1, getTransportConnector("broker1").getConnections().size());
assertEquals(0, getTransportConnector("broker2").getConnections().size());
}
});
}
public void testPartitionByQueue() throws Exception {
partitioning.byQueue = new HashMap<String, Target>();
partitioning.byQueue.put("foo", new Target("broker1"));
createBrokerCluster(2);
Connection connection = createConnectionTo("broker2");
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(0, getTransportConnector("broker1").getConnections().size());
assertEquals(1, getTransportConnector("broker2").getConnections().size());
}
});
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue("foo"));
for (int i = 0; i < 100; i++) {
producer.send(session.createTextMessage("#"+i));
}
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(1, getTransportConnector("broker1").getConnections().size());
assertEquals(0, getTransportConnector("broker2").getConnections().size());
}
});
}
static interface Task {
public void run() throws Exception;
}
private void within(int time, TimeUnit unit, Task task) throws InterruptedException {
long timeMS = unit.toMillis(time);
long deadline = System.currentTimeMillis() + timeMS;
while (true) {
try {
task.run();
return;
} catch (Throwable e) {
long remaining = deadline - System.currentTimeMillis();
if( remaining <=0 ) {
if( e instanceof RuntimeException ) {
throw (RuntimeException)e;
}
if( e instanceof Error ) {
throw (Error)e;
}
throw new RuntimeException(e);
}
Thread.sleep(Math.min(timeMS/10, remaining));
}
}
}
protected Connection createConnectionTo(String brokerId) throws IOException, URISyntaxException, JMSException {
String url = "failover://(" + getConnectURL(brokerId) + ")";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
Connection connection = factory.createConnection();
connections.add(connection);
return connection;
}
protected String getConnectURL(String broker) throws IOException, URISyntaxException {
TransportConnector tcp = getTransportConnector(broker);
return tcp.getConnectUri().toString();
}
private TransportConnector getTransportConnector(String broker) {
BrokerService brokerService = brokers.get(broker);
if( brokerService==null ) {
throw new IllegalArgumentException("Invalid broker id");
}
return brokerService.getTransportConnectorByName("tcp");
}
protected void createBrokerCluster(int brokerCount) throws Exception {
for (int i = 1; i <= brokerCount; i++) {
String brokerId = "broker" + i;
BrokerService broker = createBroker(brokerId);
broker.setPersistent(false);
PartitionBrokerPlugin plugin = new PartitionBrokerPlugin();
plugin.setConfig(partitioning);
broker.setPlugins(new BrokerPlugin[]{plugin});
broker.addConnector("tcp://localhost:0").setName("tcp");
broker.start();
broker.waitUntilStarted();
partitioning.brokers.put(brokerId, getConnectURL(brokerId));
}
}
protected BrokerService createBroker(String name) {
BrokerService broker = new BrokerService();
broker.setBrokerName(name);
brokers.put(name, broker);
return broker;
}
@Override
protected void tearDown() throws Exception {
for (Connection connection : connections) {
try {
connection.close();
} catch (Throwable e) {
}
}
connections.clear();
for (BrokerService broker : brokers.values()) {
try {
broker.stop();
broker.waitUntilStopped();
} catch (Throwable e) {
}
}
brokers.clear();
super.tearDown();
}
}

View File

@ -237,6 +237,7 @@
<include>${basedir}/../activemq-kahadb-store/src/main/java</include>
<include>${basedir}/../activemq-mqtt/src/main/java</include>
<include>${basedir}/../activemq-stomp/src/main/java</include>
<include>${basedir}/../activemq-partition/src/main/java</include>
<include>${basedir}/../activemq-runtime-config/src/main/java</include>
</includes>
<strictXsdOrder>false</strictXsdOrder>

View File

@ -249,6 +249,7 @@
<module>activemq-runtime-config</module>
<module>activemq-tooling</module>
<module>activemq-web</module>
<module>activemq-partition</module>
<module>activemq-osgi</module>
<module>activemq-blueprint</module>
<module>activemq-web-demo</module>