diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 725e83d3fe..1ce75a5ac9 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -1347,6 +1347,10 @@ public class TransportConnection implements Connection, Task, CommandVisitor { return transport.getRemoteAddress(); } + public Transport getTransport() { + return transport; + } + @Override public String getConnectionId() { List connectionStates = listConnectionStates(); diff --git a/activemq-partition/pom.xml b/activemq-partition/pom.xml new file mode 100644 index 0000000000..25f070ece7 --- /dev/null +++ b/activemq-partition/pom.xml @@ -0,0 +1,110 @@ + + + + + 4.0.0 + + + org.apache.activemq + activemq-parent + 5.9-SNAPSHOT + + + activemq-partition + jar + + ActiveMQ :: Partition Management + Used to partition clients over a cluster of brokers + + + + + org.apache.activemq + activemq-broker + provided + + + + org.slf4j + slf4j-api + compile + + + + org.apache.activemq + activemq-leveldb-store + + + org.linkedin + org.linkedin.zookeeper-impl + ${linkedin-zookeeper-version} + + + org.linkedin + org.linkedin.util-core + ${linkedin-zookeeper-version} + + + org.apache.zookeeper + zookeeper + ${zookeeper-version} + + + + + org.codehaus.jackson + jackson-core-asl + ${jackson-version} + + + org.codehaus.jackson + jackson-mapper-asl + ${jackson-version} + + + + + org.slf4j + slf4j-log4j12 + test + + + log4j + log4j + test + + + org.apache.activemq + activemq-broker + test-jar + test + + + + junit + junit + test + + + + + + + + + diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java new file mode 100644 index 0000000000..cab6eb654c --- /dev/null +++ b/activemq-partition/src/main/java/org/apache/activemq/partition/ConnectionProxy.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.partition; + +import org.apache.activemq.broker.Connection; +import org.apache.activemq.broker.Connector; +import org.apache.activemq.broker.region.ConnectionStatistics; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.ConnectionControl; +import org.apache.activemq.command.Response; + +import java.io.IOException; + +/** + * A Connection implementation that proxies all Connection invocation to + * a delegate connection. + */ +public class ConnectionProxy implements Connection { + final Connection next; + + public ConnectionProxy(Connection next) { + this.next = next; + } + + @Override + public void dispatchAsync(Command command) { + next.dispatchAsync(command); + } + + @Override + public void dispatchSync(Command message) { + next.dispatchSync(message); + } + + @Override + public String getConnectionId() { + return next.getConnectionId(); + } + + @Override + public Connector getConnector() { + return next.getConnector(); + } + + @Override + public int getDispatchQueueSize() { + return next.getDispatchQueueSize(); + } + + @Override + public String getRemoteAddress() { + return next.getRemoteAddress(); + } + + @Override + public ConnectionStatistics getStatistics() { + return next.getStatistics(); + } + + @Override + public boolean isActive() { + return next.isActive(); + } + + @Override + public boolean isBlocked() { + return next.isBlocked(); + } + + @Override + public boolean isConnected() { + return next.isConnected(); + } + + @Override + public boolean isFaultTolerantConnection() { + return next.isFaultTolerantConnection(); + } + + @Override + public boolean isManageable() { + return next.isManageable(); + } + + @Override + public boolean isNetworkConnection() { + return next.isNetworkConnection(); + } + + @Override + public boolean isSlow() { + return next.isSlow(); + } + + @Override + public Response service(Command command) { + return next.service(command); + } + + @Override + public void serviceException(Throwable error) { + next.serviceException(error); + } + + @Override + public void serviceExceptionAsync(IOException e) { + next.serviceExceptionAsync(e); + } + + @Override + public void start() throws Exception { + next.start(); + } + + @Override + public void stop() throws Exception { + next.stop(); + } + + @Override + public void updateClient(ConnectionControl control) { + next.updateClient(control); + } +} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java new file mode 100644 index 0000000000..1a8e78b3f6 --- /dev/null +++ b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.partition; + +import org.apache.activemq.broker.*; +import org.apache.activemq.command.*; +import org.apache.activemq.partition.dto.Partitioning; +import org.apache.activemq.partition.dto.Target; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.util.LRUCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * A BrokerFilter which partitions client connections over a cluster of brokers. + * + * It can use a client identifier like client id, authenticated user name, source ip + * address or even destination being used by the connection to figure out which + * is the best broker in the cluster that the connection should be using and then + * redirects failover clients to that broker. + */ +public class PartitionBroker extends BrokerFilter { + + protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class); + protected final PartitionBrokerPlugin plugin; + protected boolean reloadConfigOnPoll = true; + + public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) { + super(broker); + this.plugin = plugin; + } + + @Override + public void start() throws Exception { + super.start(); + getExecutor().execute(new Runnable() { + @Override + public void run() { + Thread.currentThread().setName("Partition Monitor"); + onMonitorStart(); + try { + runPartitionMonitor(); + } catch (Exception e) { + onMonitorStop(); + } + } + }); + } + + protected void onMonitorStart() { + } + protected void onMonitorStop() { + } + + protected void runPartitionMonitor() { + while( !isStopped() ) { + try { + monitorWait(); + } catch (InterruptedException e) { + break; + } + + if(reloadConfigOnPoll) { + try { + reloadConfiguration(); + } catch (Exception e) { + continue; + } + } + + for( ConnectionMonitor monitor: monitors.values()) { + checkTarget(monitor); + } + } + } + + protected void monitorWait() throws InterruptedException { + synchronized (this) { + this.wait(1000); + } + } + + protected void monitorWakeup() { + synchronized (this) { + this.notifyAll(); + } + } + + protected void reloadConfiguration() throws Exception { + } + + protected void checkTarget(ConnectionMonitor monitor) { + + // can we find a preferred target for the connection? + Target targetDTO = pickBestBroker(monitor); + if( targetDTO == null || targetDTO.ids==null) { + LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId()); + return; + } + + // Are we one the the targets? + if( targetDTO.ids.contains(getBrokerName()) ) { + LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId()); + return; + } + + // Then we need to move the connection over. + String connectionString = getConnectionString(targetDTO.ids); + if( connectionString==null ) { + LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids); + } + + LOG.info("Redirecting connection to: " + connectionString); + TransportConnection connection = (TransportConnection)monitor.next; + ConnectionControl cc = new ConnectionControl(); + cc.setConnectedBrokers(connectionString); + cc.setRebalanceConnection(true); + connection.dispatchAsync(cc); + } + + protected String getConnectionString(HashSet ids) { + if( getConfig().brokers==null || getConfig().brokers.isEmpty() ) + return null; + StringBuilder rc = new StringBuilder(); + for (String id : ids) { + String url = getConfig().brokers.get(id); + if( url!=null ) { + if( rc.length()!=0 ) { + rc.append(','); + } + rc.append(url); + } + } + return rc.toString(); + } + + protected Target pickBestBroker(ConnectionMonitor monitor) { + + if( getConfig() ==null ) + return null; + + if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) { + TransportConnection connection = (TransportConnection)monitor.context.getConnection(); + Transport transport = connection.getTransport(); + Socket socket = transport.narrow(Socket.class); + if( socket !=null ) { + SocketAddress address = socket.getRemoteSocketAddress(); + if( address instanceof InetSocketAddress) { + String ip = ((InetSocketAddress) address).getAddress().getHostAddress(); + Target targetDTO = getConfig().bySourceIp.get(ip); + if( targetDTO!=null ) { + return targetDTO; + } + } + } + } + + if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) { + String userName = monitor.context.getUserName(); + if( userName !=null ) { + Target targetDTO = getConfig().byUserName.get(userName); + if( targetDTO!=null ) { + return targetDTO; + } + } + } + + if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) { + String clientId = monitor.context.getClientId(); + if( clientId!=null ) { + Target targetDTO = getConfig().byClientId.get(clientId); + if( targetDTO!=null ) { + return targetDTO; + } + } + } + + if( + (getConfig().byQueue !=null && !getConfig().byQueue.isEmpty()) + || (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty()) + ) { + + ActiveMQDestination best = monitor.findMostActiveDestination(plugin); + if( best!=null ) { + if( getConfig().byQueue !=null && !getConfig().byQueue.isEmpty() && best.isQueue() ) { + Target targetDTO = getConfig().byQueue.get(best.getPhysicalName()); + if( targetDTO!=null ) { + return targetDTO; + } + } + + if( getConfig().byTopic !=null && !getConfig().byTopic.isEmpty() && best.isTopic() ) { + Target targetDTO = getConfig().byTopic.get(best.getPhysicalName()); + if( targetDTO!=null ) { + return targetDTO; + } + } + } + } + return null; + } + + protected final ConcurrentHashMap monitors = new ConcurrentHashMap(); + + @Override + public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { + ConnectionMonitor monitor = new ConnectionMonitor(context); + context.setConnection(monitor); + monitors.put(info.getConnectionId(), monitor); + super.addConnection(context, info); + checkTarget(monitor); + } + + @Override + public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { + super.removeConnection(context, info, error); + ConnectionMonitor removed = monitors.remove(info.getConnectionId()); + if( removed!=null ) { + context.setConnection(removed.next); + } + } + + @Override + public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { + ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId()); + if( monitor!=null ) { + monitor.onSend(producerExchange, messageSend); + } + } + + protected Partitioning getConfig() { + return plugin.getConfig(); + } + + + static class Traffic { + long messages; + long bytes; + } + + static class ConnectionMonitor extends ConnectionProxy { + final ConnectionContext context; + + LRUCache trafficPerDestination = new LRUCache(); + + ConnectionMonitor(ConnectionContext context) { + super(context.getConnection()); + this.context = context; + } + + synchronized public ActiveMQDestination findMostActiveDestination(PartitionBrokerPlugin plugin) { + ActiveMQDestination best = null; + long bestSize = 0 ; + for (Map.Entry entry : trafficPerDestination.entrySet()) { + Traffic t = entry.getValue(); + // Once we get enough messages... + if( t.messages < plugin.getMinTransferCount()) { + continue; + } + if( t.bytes > bestSize) { + bestSize = t.bytes; + best = entry.getKey(); + } + } + return best; + } + + synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) { + ActiveMQDestination dest = message.getDestination(); + Traffic traffic = trafficPerDestination.get(dest); + if( traffic == null ) { + traffic = new Traffic(); + trafficPerDestination.put(dest, traffic); + } + traffic.messages += 1; + traffic.bytes += message.getSize(); + } + + + @Override + public void dispatchAsync(Command command) { + if (command.getClass() == MessageDispatch.class) { + MessageDispatch md = (MessageDispatch) command; + Message message = md.getMessage(); + synchronized (this) { + ActiveMQDestination dest = md.getDestination(); + Traffic traffic = trafficPerDestination.get(dest); + if( traffic == null ) { + traffic = new Traffic(); + trafficPerDestination.put(dest, traffic); + } + traffic.messages += 1; + traffic.bytes += message.getSize(); + } + } + super.dispatchAsync(command); + } + + } + +} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java new file mode 100644 index 0000000000..936d2eaefd --- /dev/null +++ b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.partition; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.partition.dto.Partitioning; + +/** + * A BrokerPlugin which partitions client connections over a cluster of brokers. + * + * @org.apache.xbean.XBean element="partitionBrokerPlugin" + */ +public class PartitionBrokerPlugin implements BrokerPlugin { + + protected int minTransferCount; + protected Partitioning config; + + @Override + public Broker installPlugin(Broker broker) throws Exception { + return new PartitionBroker(broker, this); + } + + public int getMinTransferCount() { + return minTransferCount; + } + + public void setMinTransferCount(int minTransferCount) { + this.minTransferCount = minTransferCount; + } + + public Partitioning getConfig() { + return config; + } + + public void setConfig(Partitioning config) { + this.config = config; + } +} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java new file mode 100644 index 0000000000..2c18f2df42 --- /dev/null +++ b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.partition; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.leveldb.replicated.groups.ZKClient; +import org.apache.activemq.partition.dto.Partitioning; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; +import org.linkedin.util.clock.Timespan; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + */ +public class ZooKeeperPartitionBroker extends PartitionBroker { + + protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperPartitionBroker.class); + + protected volatile ZKClient zk_client = null; + protected volatile Partitioning config; + + public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) { + super(broker, plugin); + } + + + @Override + protected void onMonitorStop() { + zkDisconnect(); + } + + @Override + protected Partitioning getConfig() { + return config; + } + + protected ZooKeeperPartitionBrokerPlugin plugin() { + return (ZooKeeperPartitionBrokerPlugin)plugin; + } + + protected void zkConnect() throws Exception { + zk_client = new ZKClient(plugin().getZkAddress(), Timespan.parse(plugin().getZkSessionTmeout()), null); + if( plugin().getZkPassword()!=null ) { + zk_client.setPassword(plugin().getZkPassword()); + } + zk_client.start(); + zk_client.waitForConnected(Timespan.parse("30s")); + } + + protected void zkDisconnect() { + if( zk_client!=null ) { + zk_client.close(); + zk_client = null; + } + } + + protected void reloadConfiguration() throws Exception { + if( zk_client==null ) { + LOG.debug("Connecting to ZooKeeper"); + try { + zkConnect(); + LOG.debug("Connected to ZooKeeper"); + } catch (Exception e) { + LOG.debug("Connection to ZooKeeper failed: "+e); + zkDisconnect(); + throw e; + } + } + + byte[] data = null; + try { + Stat stat = new Stat(); + data = zk_client.getData(plugin().getZkPath(), new Watcher() { + @Override + public void process(WatchedEvent watchedEvent) { + try { + reloadConfiguration(); + } catch (Exception e) { + } + monitorWakeup(); + } + }, stat); + reloadConfigOnPoll = false; + } catch (Exception e) { + LOG.warn("Could load partitioning configuration: " + e, e); + reloadConfigOnPoll = true; + } + + try { + config = Partitioning.MAPPER.readValue(data, Partitioning.class); + } catch (Exception e) { + LOG.warn("Invalid partitioning configuration: " + e, e); + } + } + +} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java new file mode 100644 index 0000000000..34fa0fc1d1 --- /dev/null +++ b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.partition; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerPlugin; + +/** + * A PartitionBrokerPlugin which gets it's configuration from ZooKeeper. + */ +public class ZooKeeperPartitionBrokerPlugin extends PartitionBrokerPlugin { + + String zkAddress = "127.0.0.1:2181"; + String zkPassword; + String zkPath = "/broker-assignments"; + String zkSessionTmeout = "10s"; + + @Override + public Broker installPlugin(Broker broker) throws Exception { + return new ZooKeeperPartitionBroker(broker, this); + } + + public String getZkAddress() { + return zkAddress; + } + + public void setZkAddress(String zkAddress) { + this.zkAddress = zkAddress; + } + + public String getZkPassword() { + return zkPassword; + } + + public void setZkPassword(String zkPassword) { + this.zkPassword = zkPassword; + } + + public String getZkPath() { + return zkPath; + } + + public void setZkPath(String zkPath) { + this.zkPath = zkPath; + } + + public String getZkSessionTmeout() { + return zkSessionTmeout; + } + + public void setZkSessionTmeout(String zkSessionTmeout) { + this.zkSessionTmeout = zkSessionTmeout; + } +} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java new file mode 100644 index 0000000000..299d73bba4 --- /dev/null +++ b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.partition.dto; + +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import java.io.IOException; +import java.util.HashMap; + +/** + * The main Configuration class for the PartitionBroker plugin + * + * @org.apache.xbean.XBean element="partitioning" + */ +public class Partitioning { + + static final public ObjectMapper MAPPER = new ObjectMapper(); + static { + MAPPER.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); + MAPPER.disable(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES); + } + + static final public ObjectMapper TO_STRING_MAPPER = new ObjectMapper(); + static { + TO_STRING_MAPPER.disable(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES); + TO_STRING_MAPPER.enable(SerializationConfig.Feature.INDENT_OUTPUT); + } + + /** + * If a client connects with a clientId which is listed in the + * map, then he will be immediately reconnected + * to the partition target immediately. + */ + @JsonProperty("by_client_id") + public HashMap byClientId; + + /** + * If a client connects with a user priciple which is listed in the + * map, then he will be immediately reconnected + * to the partition target immediately. + */ + @JsonProperty("by_user_name") + public HashMap byUserName; + + /** + * If a client connects with source ip which is listed in the + * map, then he will be immediately reconnected + * to the partition target immediately. + */ + @JsonProperty("by_source_ip") + public HashMap bySourceIp; + + /** + * Used to map the preferred partitioning of queues across + * a set of brokers. Once a it is deemed that a connection mostly + * works with a set of targets configured in this map, the client + * will be reconnected to the appropriate target. + */ + @JsonProperty("by_queue") + public HashMap byQueue; + + /** + * Used to map the preferred partitioning of topics across + * a set of brokers. Once a it is deemed that a connection mostly + * works with a set of targets configured in this map, the client + * will be reconnected to the appropriate target. + */ + @JsonProperty("by_topic") + public HashMap byTopic; + + /** + * Maps broker names to broker URLs. + */ + @JsonProperty("brokers") + public HashMap brokers; + + + @Override + public String toString() { + try { + return TO_STRING_MAPPER.writeValueAsString(this); + } catch (IOException e) { + return super.toString(); + } + } + + public HashMap getBrokers() { + return brokers; + } + + public void setBrokers(HashMap brokers) { + this.brokers = brokers; + } + + public HashMap getByClientId() { + return byClientId; + } + + public void setByClientId(HashMap byClientId) { + this.byClientId = byClientId; + } + + public HashMap getByQueue() { + return byQueue; + } + + public void setByQueue(HashMap byQueue) { + this.byQueue = byQueue; + } + + public HashMap getBySourceIp() { + return bySourceIp; + } + + public void setBySourceIp(HashMap bySourceIp) { + this.bySourceIp = bySourceIp; + } + + public HashMap getByTopic() { + return byTopic; + } + + public void setByTopic(HashMap byTopic) { + this.byTopic = byTopic; + } + + public HashMap getByUserName() { + return byUserName; + } + + public void setByUserName(HashMap byUserName) { + this.byUserName = byUserName; + } +} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java new file mode 100644 index 0000000000..b6f0ee24f3 --- /dev/null +++ b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.partition.dto; + +import org.codehaus.jackson.annotate.JsonProperty; +import scala.actors.threadpool.Arrays; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; + +/** + * Represents a partition target. This identifies the brokers that + * a partition lives on. + * + * @org.apache.xbean.XBean element="target" + */ +public class Target { + + @JsonProperty("ids") + public HashSet ids = new HashSet(); + + public Target() { + ids = new HashSet(); + } + + public Target(String ...ids) { + this.ids.addAll(Arrays.asList(ids)); + } + + @Override + public String toString() { + try { + return Partitioning.TO_STRING_MAPPER.writeValueAsString(this); + } catch (IOException e) { + return super.toString(); + } + } + + public HashSet getIds() { + return ids; + } + + public void setIds(Collection ids) { + this.ids = new HashSet(ids); + } + +} diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java new file mode 100644 index 0000000000..9b7450c689 --- /dev/null +++ b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.partition; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.AutoFailTestSupport; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.partition.dto.Partitioning; +import org.apache.activemq.partition.dto.Target; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +/** + * Unit tests for the PartitionBroker plugin. + */ +public class PartitionBrokerTest extends AutoFailTestSupport { + + protected HashMap brokers = new HashMap(); + protected ArrayList connections = new ArrayList(); + Partitioning partitioning; + + @Override + protected void setUp() throws Exception { + super.setUp(); + partitioning = new Partitioning(); + partitioning.brokers = new HashMap(); + } + + + public void testPartitionByClientId() throws Exception { + partitioning.byClientId = new HashMap(); + partitioning.byClientId.put("client1", new Target("broker1")); + partitioning.byClientId.put("client2", new Target("broker2")); + createBrokerCluster(2); + + Connection connection = createConnectionTo("broker2"); + + within(5, TimeUnit.SECONDS, new Task() { + public void run() throws Exception { + assertEquals(0, getTransportConnector("broker1").getConnections().size()); + assertEquals(1, getTransportConnector("broker2").getConnections().size()); + } + }); + + connection.setClientID("client1"); + connection.start(); + within(5, TimeUnit.SECONDS, new Task() { + public void run() throws Exception { + assertEquals(1, getTransportConnector("broker1").getConnections().size()); + assertEquals(0, getTransportConnector("broker2").getConnections().size()); + } + }); + } + + public void testPartitionByQueue() throws Exception { + partitioning.byQueue = new HashMap(); + partitioning.byQueue.put("foo", new Target("broker1")); + createBrokerCluster(2); + + Connection connection = createConnectionTo("broker2"); + + within(5, TimeUnit.SECONDS, new Task() { + public void run() throws Exception { + assertEquals(0, getTransportConnector("broker1").getConnections().size()); + assertEquals(1, getTransportConnector("broker2").getConnections().size()); + } + }); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue("foo")); + for (int i = 0; i < 100; i++) { + producer.send(session.createTextMessage("#"+i)); + } + + within(5, TimeUnit.SECONDS, new Task() { + public void run() throws Exception { + assertEquals(1, getTransportConnector("broker1").getConnections().size()); + assertEquals(0, getTransportConnector("broker2").getConnections().size()); + } + }); + } + static interface Task { + public void run() throws Exception; + } + + private void within(int time, TimeUnit unit, Task task) throws InterruptedException { + long timeMS = unit.toMillis(time); + long deadline = System.currentTimeMillis() + timeMS; + while (true) { + try { + task.run(); + return; + } catch (Throwable e) { + long remaining = deadline - System.currentTimeMillis(); + if( remaining <=0 ) { + if( e instanceof RuntimeException ) { + throw (RuntimeException)e; + } + if( e instanceof Error ) { + throw (Error)e; + } + throw new RuntimeException(e); + } + Thread.sleep(Math.min(timeMS/10, remaining)); + } + } + } + + protected Connection createConnectionTo(String brokerId) throws IOException, URISyntaxException, JMSException { + String url = "failover://(" + getConnectURL(brokerId) + ")"; + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); + Connection connection = factory.createConnection(); + connections.add(connection); + return connection; + } + + protected String getConnectURL(String broker) throws IOException, URISyntaxException { + TransportConnector tcp = getTransportConnector(broker); + return tcp.getConnectUri().toString(); + } + + private TransportConnector getTransportConnector(String broker) { + BrokerService brokerService = brokers.get(broker); + if( brokerService==null ) { + throw new IllegalArgumentException("Invalid broker id"); + } + return brokerService.getTransportConnectorByName("tcp"); + } + + protected void createBrokerCluster(int brokerCount) throws Exception { + for (int i = 1; i <= brokerCount; i++) { + String brokerId = "broker" + i; + BrokerService broker = createBroker(brokerId); + broker.setPersistent(false); + PartitionBrokerPlugin plugin = new PartitionBrokerPlugin(); + plugin.setConfig(partitioning); + broker.setPlugins(new BrokerPlugin[]{plugin}); + broker.addConnector("tcp://localhost:0").setName("tcp"); + broker.start(); + broker.waitUntilStarted(); + partitioning.brokers.put(brokerId, getConnectURL(brokerId)); + } + } + + protected BrokerService createBroker(String name) { + BrokerService broker = new BrokerService(); + broker.setBrokerName(name); + brokers.put(name, broker); + return broker; + } + + @Override + protected void tearDown() throws Exception { + for (Connection connection : connections) { + try { + connection.close(); + } catch (Throwable e) { + } + } + connections.clear(); + for (BrokerService broker : brokers.values()) { + try { + broker.stop(); + broker.waitUntilStopped(); + } catch (Throwable e) { + } + } + brokers.clear(); + super.tearDown(); + } + +} diff --git a/activemq-spring/pom.xml b/activemq-spring/pom.xml index 9136c05b5a..36e2c554f7 100755 --- a/activemq-spring/pom.xml +++ b/activemq-spring/pom.xml @@ -237,6 +237,7 @@ ${basedir}/../activemq-kahadb-store/src/main/java ${basedir}/../activemq-mqtt/src/main/java ${basedir}/../activemq-stomp/src/main/java + ${basedir}/../activemq-partition/src/main/java ${basedir}/../activemq-runtime-config/src/main/java false diff --git a/pom.xml b/pom.xml index c293ffdef9..19b6c641e1 100755 --- a/pom.xml +++ b/pom.xml @@ -249,6 +249,7 @@ activemq-runtime-config activemq-tooling activemq-web + activemq-partition activemq-osgi activemq-blueprint activemq-web-demo