mirror of https://github.com/apache/activemq.git
[AMQ-9259] Remove activemq-partition and zookeeper test dependency
This commit is contained in:
parent
c7c71ef692
commit
67f5afa0f4
|
@ -68,10 +68,6 @@
|
||||||
<groupId>${project.groupId}</groupId>
|
<groupId>${project.groupId}</groupId>
|
||||||
<artifactId>activemq-http</artifactId>
|
<artifactId>activemq-http</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>${project.groupId}</groupId>
|
|
||||||
<artifactId>activemq-partition</artifactId>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<!-- Additional Dependencies. -->
|
<!-- Additional Dependencies. -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
@ -187,7 +183,6 @@
|
||||||
org.codehaus.jettison*;resolution:=optional,
|
org.codehaus.jettison*;resolution:=optional,
|
||||||
org.jasypt*;resolution:=optional,
|
org.jasypt*;resolution:=optional,
|
||||||
org.eclipse.jetty*;resolution:=optional;version="[9.0,10)",
|
org.eclipse.jetty*;resolution:=optional;version="[9.0,10)",
|
||||||
org.apache.zookeeper*;resolution:=optional,
|
|
||||||
org.fusesource.hawtjni*;resolution:=optional,
|
org.fusesource.hawtjni*;resolution:=optional,
|
||||||
org.springframework.jms*;version="[4,6)";resolution:=optional,
|
org.springframework.jms*;version="[4,6)";resolution:=optional,
|
||||||
org.springframework.transaction*;version="[4,6)";resolution:=optional,
|
org.springframework.transaction*;version="[4,6)";resolution:=optional,
|
||||||
|
|
|
@ -1,149 +0,0 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<!--
|
|
||||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
contributor license agreements. See the NOTICE file distributed with
|
|
||||||
this work for additional information regarding copyright ownership.
|
|
||||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
(the "License"); you may not use this file except in compliance with
|
|
||||||
the License. You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
-->
|
|
||||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
|
||||||
|
|
||||||
<modelVersion>4.0.0</modelVersion>
|
|
||||||
|
|
||||||
<parent>
|
|
||||||
<groupId>org.apache.activemq</groupId>
|
|
||||||
<artifactId>activemq-parent</artifactId>
|
|
||||||
<version>5.19.0-SNAPSHOT</version>
|
|
||||||
</parent>
|
|
||||||
|
|
||||||
<artifactId>activemq-partition</artifactId>
|
|
||||||
<packaging>jar</packaging>
|
|
||||||
|
|
||||||
<name>ActiveMQ :: Partition Management</name>
|
|
||||||
<description>Used to partition clients over a cluster of brokers</description>
|
|
||||||
|
|
||||||
<dependencies>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.activemq</groupId>
|
|
||||||
<artifactId>activemq-broker</artifactId>
|
|
||||||
<scope>provided</scope>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.slf4j</groupId>
|
|
||||||
<artifactId>slf4j-api</artifactId>
|
|
||||||
<scope>compile</scope>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.linkedin</groupId>
|
|
||||||
<artifactId>org.linkedin.zookeeper-impl</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.linkedin</groupId>
|
|
||||||
<artifactId>org.linkedin.util-core</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.zookeeper</groupId>
|
|
||||||
<artifactId>zookeeper</artifactId>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<!-- For Optional Snappy Compression -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
|
||||||
<artifactId>jackson-core</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
|
||||||
<artifactId>jackson-annotations</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
|
||||||
<artifactId>jackson-databind</artifactId>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<!-- Testing Dependencies -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.logging.log4j</groupId>
|
|
||||||
<artifactId>log4j-core</artifactId>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.logging.log4j</groupId>
|
|
||||||
<artifactId>log4j-slf4j2-impl</artifactId>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.activemq</groupId>
|
|
||||||
<artifactId>activemq-broker</artifactId>
|
|
||||||
<type>test-jar</type>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>junit</groupId>
|
|
||||||
<artifactId>junit</artifactId>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
|
||||||
|
|
||||||
<build>
|
|
||||||
<plugins>
|
|
||||||
</plugins>
|
|
||||||
</build>
|
|
||||||
<profiles>
|
|
||||||
<profile>
|
|
||||||
<id>activemq.tests-sanity</id>
|
|
||||||
<activation>
|
|
||||||
<property>
|
|
||||||
<name>activemq.tests</name>
|
|
||||||
<value>smoke</value>
|
|
||||||
</property>
|
|
||||||
</activation>
|
|
||||||
<build>
|
|
||||||
<plugins>
|
|
||||||
<plugin>
|
|
||||||
<artifactId>maven-surefire-plugin</artifactId>
|
|
||||||
<configuration>
|
|
||||||
<includes>
|
|
||||||
<include>**/PartitionBrokerTest.*</include>
|
|
||||||
</includes>
|
|
||||||
</configuration>
|
|
||||||
</plugin>
|
|
||||||
</plugins>
|
|
||||||
</build>
|
|
||||||
</profile>
|
|
||||||
<profile>
|
|
||||||
<id>activemq.tests-autoTransport</id>
|
|
||||||
<activation>
|
|
||||||
<property>
|
|
||||||
<name>activemq.tests</name>
|
|
||||||
<value>autoTransport</value>
|
|
||||||
</property>
|
|
||||||
</activation>
|
|
||||||
<build>
|
|
||||||
<plugins>
|
|
||||||
<plugin>
|
|
||||||
<artifactId>maven-surefire-plugin</artifactId>
|
|
||||||
<configuration>
|
|
||||||
<excludes>
|
|
||||||
<exclude>**</exclude>
|
|
||||||
</excludes>
|
|
||||||
</configuration>
|
|
||||||
</plugin>
|
|
||||||
</plugins>
|
|
||||||
</build>
|
|
||||||
</profile>
|
|
||||||
|
|
||||||
</profiles>
|
|
||||||
</project>
|
|
|
@ -1,367 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.partition;
|
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.Socket;
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.Broker;
|
|
||||||
import org.apache.activemq.broker.BrokerFilter;
|
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
|
||||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
|
||||||
import org.apache.activemq.broker.TransportConnection;
|
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
|
||||||
import org.apache.activemq.command.ConnectionControl;
|
|
||||||
import org.apache.activemq.command.ConnectionId;
|
|
||||||
import org.apache.activemq.command.ConnectionInfo;
|
|
||||||
import org.apache.activemq.command.Message;
|
|
||||||
import org.apache.activemq.partition.dto.Partitioning;
|
|
||||||
import org.apache.activemq.partition.dto.Target;
|
|
||||||
import org.apache.activemq.state.ConsumerState;
|
|
||||||
import org.apache.activemq.state.SessionState;
|
|
||||||
import org.apache.activemq.transport.Transport;
|
|
||||||
import org.apache.activemq.util.LRUCache;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A BrokerFilter which partitions client connections over a cluster of brokers.
|
|
||||||
*
|
|
||||||
* It can use a client identifier like client id, authenticated user name, source ip
|
|
||||||
* address or even destination being used by the connection to figure out which
|
|
||||||
* is the best broker in the cluster that the connection should be using and then
|
|
||||||
* redirects failover clients to that broker.
|
|
||||||
*/
|
|
||||||
public class PartitionBroker extends BrokerFilter {
|
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class);
|
|
||||||
protected final PartitionBrokerPlugin plugin;
|
|
||||||
protected boolean reloadConfigOnPoll = true;
|
|
||||||
|
|
||||||
public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) {
|
|
||||||
super(broker);
|
|
||||||
this.plugin = plugin;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() throws Exception {
|
|
||||||
super.start();
|
|
||||||
getExecutor().execute(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
Thread.currentThread().setName("Partition Monitor");
|
|
||||||
onMonitorStart();
|
|
||||||
try {
|
|
||||||
runPartitionMonitor();
|
|
||||||
} catch (Exception e) {
|
|
||||||
onMonitorStop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void onMonitorStart() {
|
|
||||||
}
|
|
||||||
protected void onMonitorStop() {
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void runPartitionMonitor() {
|
|
||||||
while( !isStopped() ) {
|
|
||||||
try {
|
|
||||||
monitorWait();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(reloadConfigOnPoll) {
|
|
||||||
try {
|
|
||||||
reloadConfiguration();
|
|
||||||
} catch (Exception e) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for( ConnectionMonitor monitor: monitors.values()) {
|
|
||||||
checkTarget(monitor);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void monitorWait() throws InterruptedException {
|
|
||||||
synchronized (this) {
|
|
||||||
this.wait(1000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void monitorWakeup() {
|
|
||||||
synchronized (this) {
|
|
||||||
this.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void reloadConfiguration() throws Exception {
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void checkTarget(ConnectionMonitor monitor) {
|
|
||||||
|
|
||||||
// can we find a preferred target for the connection?
|
|
||||||
Target targetDTO = pickBestBroker(monitor);
|
|
||||||
if( targetDTO == null || targetDTO.ids==null) {
|
|
||||||
LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Are we one the the targets?
|
|
||||||
if( targetDTO.ids.contains(getBrokerName()) ) {
|
|
||||||
LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Then we need to move the connection over.
|
|
||||||
String connectionString = getConnectionString(targetDTO.ids);
|
|
||||||
if( connectionString==null ) {
|
|
||||||
LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG.info("Redirecting connection to: " + connectionString);
|
|
||||||
TransportConnection connection = (TransportConnection)monitor.context.getConnection();
|
|
||||||
ConnectionControl cc = new ConnectionControl();
|
|
||||||
cc.setConnectedBrokers(connectionString);
|
|
||||||
cc.setRebalanceConnection(true);
|
|
||||||
connection.dispatchAsync(cc);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String getConnectionString(HashSet<String> ids) {
|
|
||||||
StringBuilder rc = new StringBuilder();
|
|
||||||
for (String id : ids) {
|
|
||||||
String url = plugin.getBrokerURL(this, id);
|
|
||||||
if( url!=null ) {
|
|
||||||
if( rc.length()!=0 ) {
|
|
||||||
rc.append(',');
|
|
||||||
}
|
|
||||||
rc.append(url);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if( rc.length()==0 )
|
|
||||||
return null;
|
|
||||||
return rc.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
static private class Score {
|
|
||||||
int value;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Target pickBestBroker(ConnectionMonitor monitor) {
|
|
||||||
|
|
||||||
if( getConfig() ==null )
|
|
||||||
return null;
|
|
||||||
|
|
||||||
if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) {
|
|
||||||
TransportConnection connection = (TransportConnection)monitor.context.getConnection();
|
|
||||||
Transport transport = connection.getTransport();
|
|
||||||
Socket socket = transport.narrow(Socket.class);
|
|
||||||
if( socket !=null ) {
|
|
||||||
SocketAddress address = socket.getRemoteSocketAddress();
|
|
||||||
if( address instanceof InetSocketAddress) {
|
|
||||||
String ip = ((InetSocketAddress) address).getAddress().getHostAddress();
|
|
||||||
Target targetDTO = getConfig().bySourceIp.get(ip);
|
|
||||||
if( targetDTO!=null ) {
|
|
||||||
return targetDTO;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) {
|
|
||||||
String userName = monitor.context.getUserName();
|
|
||||||
if( userName !=null ) {
|
|
||||||
Target targetDTO = getConfig().byUserName.get(userName);
|
|
||||||
if( targetDTO!=null ) {
|
|
||||||
return targetDTO;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) {
|
|
||||||
String clientId = monitor.context.getClientId();
|
|
||||||
if( clientId!=null ) {
|
|
||||||
Target targetDTO = getConfig().byClientId.get(clientId);
|
|
||||||
if( targetDTO!=null ) {
|
|
||||||
return targetDTO;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if(
|
|
||||||
(getConfig().byQueue !=null && !getConfig().byQueue.isEmpty())
|
|
||||||
|| (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty())
|
|
||||||
) {
|
|
||||||
|
|
||||||
// Collect the destinations the connection is consuming from...
|
|
||||||
HashSet<ActiveMQDestination> dests = new HashSet<ActiveMQDestination>();
|
|
||||||
for (SessionState session : monitor.context.getConnectionState().getSessionStates()) {
|
|
||||||
for (ConsumerState consumer : session.getConsumerStates()) {
|
|
||||||
ActiveMQDestination destination = consumer.getInfo().getDestination();
|
|
||||||
if( destination.isComposite() ) {
|
|
||||||
dests.addAll(Arrays.asList(destination.getCompositeDestinations()));
|
|
||||||
} else {
|
|
||||||
dests.addAll(Collections.singletonList(destination));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Group them by the partitioning target for the destinations and score them..
|
|
||||||
HashMap<Target, Score> targetScores = new HashMap<Target, Score>();
|
|
||||||
for (ActiveMQDestination dest : dests) {
|
|
||||||
Target target = getTarget(dest);
|
|
||||||
if( target!=null ) {
|
|
||||||
Score score = targetScores.get(target);
|
|
||||||
if( score == null ) {
|
|
||||||
score = new Score();
|
|
||||||
targetScores.put(target, score);
|
|
||||||
}
|
|
||||||
score.value++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// The target with largest score wins..
|
|
||||||
if (!targetScores.isEmpty()) {
|
|
||||||
Target bestTarget = null;
|
|
||||||
int bestScore = 0;
|
|
||||||
for (Map.Entry<Target, Score> entry : targetScores.entrySet()) {
|
|
||||||
if (entry.getValue().value > bestScore) {
|
|
||||||
bestTarget = entry.getKey();
|
|
||||||
bestScore = entry.getValue().value;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return bestTarget;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we get here is because there were no consumers, or the destinations for those
|
|
||||||
// consumers did not have an assigned destination.. So partition based on producer
|
|
||||||
// usage.
|
|
||||||
Target best = monitor.findBestProducerTarget(this);
|
|
||||||
if( best!=null ) {
|
|
||||||
return best;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Target getTarget(ActiveMQDestination dest) {
|
|
||||||
Partitioning config = getConfig();
|
|
||||||
if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty() ) {
|
|
||||||
return config.byQueue.get(dest.getPhysicalName());
|
|
||||||
} else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty() ) {
|
|
||||||
return config.byTopic.get(dest.getPhysicalName());
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected final ConcurrentMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
|
|
||||||
if( info.isFaultTolerant() ) {
|
|
||||||
ConnectionMonitor monitor = new ConnectionMonitor(context);
|
|
||||||
monitors.put(info.getConnectionId(), monitor);
|
|
||||||
super.addConnection(context, info);
|
|
||||||
checkTarget(monitor);
|
|
||||||
} else {
|
|
||||||
super.addConnection(context, info);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
|
|
||||||
super.removeConnection(context, info, error);
|
|
||||||
if( info.isFaultTolerant() ) {
|
|
||||||
monitors.remove(info.getConnectionId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
|
|
||||||
ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId());
|
|
||||||
if( monitor!=null ) {
|
|
||||||
monitor.onSend(producerExchange, messageSend);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Partitioning getConfig() {
|
|
||||||
return plugin.getConfig();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static class Traffic {
|
|
||||||
long messages;
|
|
||||||
long bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
static class ConnectionMonitor {
|
|
||||||
|
|
||||||
final ConnectionContext context;
|
|
||||||
LRUCache<ActiveMQDestination, Traffic> trafficPerDestination = new LRUCache<ActiveMQDestination, Traffic>();
|
|
||||||
|
|
||||||
public ConnectionMonitor(ConnectionContext context) {
|
|
||||||
this.context = context;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized public Target findBestProducerTarget(PartitionBroker broker) {
|
|
||||||
Target best = null;
|
|
||||||
long bestSize = 0 ;
|
|
||||||
for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) {
|
|
||||||
Traffic t = entry.getValue();
|
|
||||||
// Once we get enough messages...
|
|
||||||
if( t.messages < broker.plugin.getMinTransferCount()) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if( t.bytes > bestSize) {
|
|
||||||
bestSize = t.bytes;
|
|
||||||
Target target = broker.getTarget(entry.getKey());
|
|
||||||
if( target!=null ) {
|
|
||||||
best = target;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return best;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) {
|
|
||||||
ActiveMQDestination dest = message.getDestination();
|
|
||||||
Traffic traffic = trafficPerDestination.get(dest);
|
|
||||||
if( traffic == null ) {
|
|
||||||
traffic = new Traffic();
|
|
||||||
trafficPerDestination.put(dest, traffic);
|
|
||||||
}
|
|
||||||
traffic.messages += 1;
|
|
||||||
traffic.bytes += message.getSize();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,66 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.partition;
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.Broker;
|
|
||||||
import org.apache.activemq.broker.BrokerPlugin;
|
|
||||||
import org.apache.activemq.partition.dto.Partitioning;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A BrokerPlugin which partitions client connections over a cluster of brokers.
|
|
||||||
*
|
|
||||||
* @org.apache.xbean.XBean element="partitionBrokerPlugin"
|
|
||||||
*/
|
|
||||||
public class PartitionBrokerPlugin implements BrokerPlugin {
|
|
||||||
|
|
||||||
protected int minTransferCount;
|
|
||||||
protected Partitioning config;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Broker installPlugin(Broker broker) throws Exception {
|
|
||||||
return new PartitionBroker(broker, this);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getMinTransferCount() {
|
|
||||||
return minTransferCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setMinTransferCount(int minTransferCount) {
|
|
||||||
this.minTransferCount = minTransferCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Partitioning getConfig() {
|
|
||||||
return config;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setConfig(Partitioning config) {
|
|
||||||
this.config = config;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setConfigAsJson(String config) throws IOException {
|
|
||||||
this.config = Partitioning.MAPPER.readValue(config, Partitioning.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getBrokerURL(PartitionBroker partitionBroker, String id) {
|
|
||||||
if( config!=null && config.brokers!=null ) {
|
|
||||||
return config.brokers.get(id);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,596 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.partition;
|
|
||||||
|
|
||||||
import org.apache.zookeeper.*;
|
|
||||||
import org.apache.zookeeper.data.ACL;
|
|
||||||
import org.apache.zookeeper.data.Id;
|
|
||||||
import org.apache.zookeeper.data.Stat;
|
|
||||||
import org.linkedin.util.clock.Clock;
|
|
||||||
import org.linkedin.util.clock.SystemClock;
|
|
||||||
import org.linkedin.util.clock.Timespan;
|
|
||||||
import org.linkedin.util.concurrent.ConcurrentUtils;
|
|
||||||
import org.linkedin.util.io.PathUtils;
|
|
||||||
import org.linkedin.zookeeper.client.*;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
|
|
||||||
import java.io.UnsupportedEncodingException;
|
|
||||||
import java.lang.reflect.Field;
|
|
||||||
import java.lang.reflect.Method;
|
|
||||||
import java.util.*;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
public class ZKClient extends org.linkedin.zookeeper.client.AbstractZKClient implements Watcher {
|
|
||||||
|
|
||||||
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ZKClient.class);
|
|
||||||
|
|
||||||
private Map<String, String> acls;
|
|
||||||
private String password;
|
|
||||||
|
|
||||||
public void start() throws Exception {
|
|
||||||
// Grab the lock to make sure that the registration of the ManagedService
|
|
||||||
// won't be updated immediately but that the initial update will happen first
|
|
||||||
synchronized (_lock) {
|
|
||||||
_stateChangeDispatcher.setDaemon(true);
|
|
||||||
_stateChangeDispatcher.start();
|
|
||||||
doStart();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setACLs(Map<String, String> acls) {
|
|
||||||
this.acls = acls;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setPassword(String password) {
|
|
||||||
this.password = password;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void doStart() throws UnsupportedEncodingException {
|
|
||||||
connect();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
if (_stateChangeDispatcher != null) {
|
|
||||||
_stateChangeDispatcher.end();
|
|
||||||
try {
|
|
||||||
_stateChangeDispatcher.join(1000);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.debug("ignored exception", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
synchronized(_lock) {
|
|
||||||
if (_zk != null) {
|
|
||||||
try {
|
|
||||||
changeState(State.NONE);
|
|
||||||
_zk.close();
|
|
||||||
Thread th = getSendThread();
|
|
||||||
if (th != null) {
|
|
||||||
th.join(1000);
|
|
||||||
}
|
|
||||||
_zk = null;
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.debug("ignored exception", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Thread getSendThread() {
|
|
||||||
try {
|
|
||||||
return (Thread) getField(_zk, "_zk", "cnxn", "sendThread");
|
|
||||||
} catch (Throwable e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Object getField(Object obj, String... names) throws Exception {
|
|
||||||
for (String name : names) {
|
|
||||||
obj = getField(obj, name);
|
|
||||||
}
|
|
||||||
return obj;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Object getField(Object obj, String name) throws Exception {
|
|
||||||
Class clazz = obj.getClass();
|
|
||||||
while (clazz != null) {
|
|
||||||
for (Field f : clazz.getDeclaredFields()) {
|
|
||||||
if (f.getName().equals(name)) {
|
|
||||||
f.setAccessible(true);
|
|
||||||
return f.get(obj);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw new NoSuchFieldError(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void changeState(State newState) {
|
|
||||||
synchronized (_lock) {
|
|
||||||
State oldState = _state;
|
|
||||||
if (oldState != newState) {
|
|
||||||
_stateChangeDispatcher.addEvent(oldState, newState);
|
|
||||||
_state = newState;
|
|
||||||
_lock.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testGenerateConnectionLoss() throws Exception {
|
|
||||||
waitForConnected();
|
|
||||||
Object clientCnxnSocket = getField(_zk, "_zk", "cnxn", "sendThread", "clientCnxnSocket");
|
|
||||||
callMethod(clientCnxnSocket, "testableCloseSocket");
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Object callMethod(Object obj, String name, Object... args) throws Exception {
|
|
||||||
Class clazz = obj.getClass();
|
|
||||||
while (clazz != null) {
|
|
||||||
for (Method m : clazz.getDeclaredMethods()) {
|
|
||||||
if (m.getName().equals(name)) {
|
|
||||||
m.setAccessible(true);
|
|
||||||
return m.invoke(obj, args);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
throw new NoSuchMethodError(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void tryConnect() {
|
|
||||||
synchronized (_lock) {
|
|
||||||
try {
|
|
||||||
connect();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
LOG.warn("Error while restarting:", e);
|
|
||||||
if (_expiredSessionRecovery == null) {
|
|
||||||
_expiredSessionRecovery = new ExpiredSessionRecovery();
|
|
||||||
_expiredSessionRecovery.setDaemon(true);
|
|
||||||
_expiredSessionRecovery.start();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void connect() throws UnsupportedEncodingException {
|
|
||||||
synchronized (_lock) {
|
|
||||||
changeState(State.CONNECTING);
|
|
||||||
_zk = _factory.createZooKeeper(this);
|
|
||||||
if (password != null) {
|
|
||||||
_zk.addAuthInfo("digest", ("fabric:" + password).getBytes("UTF-8"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void process(WatchedEvent event) {
|
|
||||||
if (event.getState() != null) {
|
|
||||||
LOG.debug("event: {}", event.getState());
|
|
||||||
synchronized (_lock) {
|
|
||||||
switch(event.getState()) {
|
|
||||||
case SyncConnected:
|
|
||||||
changeState(State.CONNECTED);
|
|
||||||
break;
|
|
||||||
case Disconnected:
|
|
||||||
if (_state != State.NONE) {
|
|
||||||
changeState(State.RECONNECTING);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case Expired:
|
|
||||||
// when expired, the zookeeper object is invalid and we need to recreate a new one
|
|
||||||
_zk = null;
|
|
||||||
LOG.warn("Expiration detected: trying to restart...");
|
|
||||||
tryConnect();
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
LOG.warn("Unsupported event state: {}", event.getState());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected IZooKeeper getZk() {
|
|
||||||
State state = _state;
|
|
||||||
if (state == State.NONE) {
|
|
||||||
throw new IllegalStateException("ZooKeeper client has not been configured yet. You need to either create an ensemble or join one.");
|
|
||||||
} else if (state != State.CONNECTING) {
|
|
||||||
try {
|
|
||||||
waitForConnected();
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IllegalStateException("Error waiting for ZooKeeper connection", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
IZooKeeper zk = _zk;
|
|
||||||
if (zk == null) {
|
|
||||||
throw new IllegalStateException("No ZooKeeper connection available");
|
|
||||||
}
|
|
||||||
return zk;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void waitForConnected(Timespan timeout) throws InterruptedException, TimeoutException {
|
|
||||||
waitForState(State.CONNECTED, timeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void waitForConnected() throws InterruptedException, TimeoutException {
|
|
||||||
waitForConnected(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void waitForState(State state, Timespan timeout) throws TimeoutException, InterruptedException {
|
|
||||||
long endTime = (timeout == null ? sessionTimeout : timeout).futureTimeMillis(_clock);
|
|
||||||
if (_state != state) {
|
|
||||||
synchronized (_lock) {
|
|
||||||
while (_state != state) {
|
|
||||||
ConcurrentUtils.awaitUntil(_clock, _lock, endTime);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void registerListener(LifecycleListener listener) {
|
|
||||||
if (listener == null) {
|
|
||||||
throw new IllegalStateException("listener is null");
|
|
||||||
}
|
|
||||||
if (!_listeners.contains(listener)) {
|
|
||||||
_listeners.add(listener);
|
|
||||||
}
|
|
||||||
if (_state == State.CONNECTED) {
|
|
||||||
listener.onConnected();
|
|
||||||
//_stateChangeDispatcher.addEvent(null, State.CONNECTED);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void removeListener(LifecycleListener listener) {
|
|
||||||
if (listener == null) {
|
|
||||||
throw new IllegalStateException("listener is null");
|
|
||||||
}
|
|
||||||
_listeners.remove(listener);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public org.linkedin.zookeeper.client.IZKClient chroot(String path) {
|
|
||||||
return new ChrootedZKClient(this, adjustPath(path));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isConnected() {
|
|
||||||
return _state == State.CONNECTED;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isConfigured() {
|
|
||||||
return _state != State.NONE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getConnectString() {
|
|
||||||
return _factory.getConnectString();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static enum State {
|
|
||||||
NONE,
|
|
||||||
CONNECTING,
|
|
||||||
CONNECTED,
|
|
||||||
RECONNECTING
|
|
||||||
}
|
|
||||||
|
|
||||||
private final static String CHARSET = "UTF-8";
|
|
||||||
|
|
||||||
private final Clock _clock = SystemClock.instance();
|
|
||||||
private final List<LifecycleListener> _listeners = new CopyOnWriteArrayList<>();
|
|
||||||
|
|
||||||
protected final Object _lock = new Object();
|
|
||||||
protected volatile State _state = State.NONE;
|
|
||||||
|
|
||||||
private final StateChangeDispatcher _stateChangeDispatcher = new StateChangeDispatcher();
|
|
||||||
|
|
||||||
protected IZooKeeperFactory _factory;
|
|
||||||
protected IZooKeeper _zk;
|
|
||||||
protected Timespan _reconnectTimeout = Timespan.parse("20s");
|
|
||||||
protected Timespan sessionTimeout = new Timespan(30, Timespan.TimeUnit.SECOND);
|
|
||||||
|
|
||||||
private ExpiredSessionRecovery _expiredSessionRecovery = null;
|
|
||||||
|
|
||||||
private class StateChangeDispatcher extends Thread {
|
|
||||||
private final AtomicBoolean _running = new AtomicBoolean(true);
|
|
||||||
private final BlockingQueue<Boolean> _events = new LinkedBlockingQueue<>();
|
|
||||||
|
|
||||||
private StateChangeDispatcher() {
|
|
||||||
super("ZooKeeper state change dispatcher thread");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
Map<Object, Boolean> history = new IdentityHashMap<>();
|
|
||||||
LOG.info("Starting StateChangeDispatcher");
|
|
||||||
while (_running.get()) {
|
|
||||||
Boolean isConnectedEvent;
|
|
||||||
try {
|
|
||||||
isConnectedEvent = _events.take();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (!_running.get() || isConnectedEvent == null) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Map<Object, Boolean> newHistory = callListeners(history, isConnectedEvent);
|
|
||||||
// we save which event each listener has seen last
|
|
||||||
// we don't update the map in place because we need to get rid of unregistered listeners
|
|
||||||
history = newHistory;
|
|
||||||
}
|
|
||||||
LOG.info("StateChangeDispatcher terminated.");
|
|
||||||
}
|
|
||||||
|
|
||||||
public void end() {
|
|
||||||
_running.set(false);
|
|
||||||
_events.add(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void addEvent(ZKClient.State oldState, ZKClient.State newState) {
|
|
||||||
LOG.debug("addEvent: {} => {}", oldState, newState);
|
|
||||||
if (newState == ZKClient.State.CONNECTED) {
|
|
||||||
_events.add(true);
|
|
||||||
} else if (oldState == ZKClient.State.CONNECTED) {
|
|
||||||
_events.add(false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Map<Object, Boolean> callListeners(Map<Object, Boolean> history, Boolean connectedEvent) {
|
|
||||||
Map<Object, Boolean> newHistory = new IdentityHashMap<>();
|
|
||||||
for (LifecycleListener listener : _listeners) {
|
|
||||||
Boolean previousEvent = history.get(listener);
|
|
||||||
// we propagate the event only if it was not already sent
|
|
||||||
if (previousEvent == null || previousEvent != connectedEvent) {
|
|
||||||
try {
|
|
||||||
if (connectedEvent) {
|
|
||||||
listener.onConnected();
|
|
||||||
} else {
|
|
||||||
listener.onDisconnected();
|
|
||||||
}
|
|
||||||
} catch (Throwable e) {
|
|
||||||
LOG.warn("Exception while executing listener (ignored)", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
newHistory.put(listener, connectedEvent);
|
|
||||||
}
|
|
||||||
return newHistory;
|
|
||||||
}
|
|
||||||
|
|
||||||
private class ExpiredSessionRecovery extends Thread {
|
|
||||||
|
|
||||||
private ExpiredSessionRecovery() {
|
|
||||||
super("ZooKeeper expired session recovery thread");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
LOG.info("Entering recovery mode");
|
|
||||||
synchronized (_lock) {
|
|
||||||
try {
|
|
||||||
int count = 0;
|
|
||||||
while (_state == ZKClient.State.NONE) {
|
|
||||||
try {
|
|
||||||
count++;
|
|
||||||
LOG.warn("Recovery mode: trying to reconnect to zookeeper [{}]", count);
|
|
||||||
ZKClient.this.connect();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
LOG.warn("Recovery mode: reconnect attempt failed [{}]... waiting for {}", count, _reconnectTimeout, e);
|
|
||||||
try {
|
|
||||||
_lock.wait(_reconnectTimeout.getDurationInMilliseconds());
|
|
||||||
} catch (InterruptedException e1) {
|
|
||||||
throw new RuntimeException("Recovery mode: wait interrupted... bailing out", e1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
_expiredSessionRecovery = null;
|
|
||||||
LOG.info("Exiting recovery mode.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public ZKClient(String connectString, Timespan sessionTimeout, Watcher watcher) {
|
|
||||||
this(new ZooKeeperFactory(connectString, sessionTimeout, watcher));
|
|
||||||
}
|
|
||||||
|
|
||||||
public ZKClient(IZooKeeperFactory factory) {
|
|
||||||
this(factory, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ZKClient(IZooKeeperFactory factory, String chroot) {
|
|
||||||
super(chroot);
|
|
||||||
_factory = factory;
|
|
||||||
Map<String, String> acls = new HashMap<>();
|
|
||||||
acls.put("/", "world:anyone:acdrw");
|
|
||||||
setACLs(acls);
|
|
||||||
}
|
|
||||||
|
|
||||||
static private int getPermFromString(String permString) {
|
|
||||||
int perm = 0;
|
|
||||||
for (int i = 0; i < permString.length(); i++) {
|
|
||||||
switch (permString.charAt(i)) {
|
|
||||||
case 'r':
|
|
||||||
perm |= ZooDefs.Perms.READ;
|
|
||||||
break;
|
|
||||||
case 'w':
|
|
||||||
perm |= ZooDefs.Perms.WRITE;
|
|
||||||
break;
|
|
||||||
case 'c':
|
|
||||||
perm |= ZooDefs.Perms.CREATE;
|
|
||||||
break;
|
|
||||||
case 'd':
|
|
||||||
perm |= ZooDefs.Perms.DELETE;
|
|
||||||
break;
|
|
||||||
case 'a':
|
|
||||||
perm |= ZooDefs.Perms.ADMIN;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
System.err.println("Unknown perm type:" + permString.charAt(i));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return perm;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static List<ACL> parseACLs(String aclString) {
|
|
||||||
List<ACL> acl;
|
|
||||||
String acls[] = aclString.split(",");
|
|
||||||
acl = new ArrayList<>();
|
|
||||||
for (String a : acls) {
|
|
||||||
int firstColon = a.indexOf(':');
|
|
||||||
int lastColon = a.lastIndexOf(':');
|
|
||||||
if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
|
|
||||||
System.err.println(a + " does not have the form scheme:id:perm");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
ACL newAcl = new ACL();
|
|
||||||
newAcl.setId(new Id(a.substring(0, firstColon), a.substring(firstColon + 1, lastColon)));
|
|
||||||
newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
|
|
||||||
acl.add(newAcl);
|
|
||||||
}
|
|
||||||
return acl;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Stat createOrSetByteWithParents(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
|
|
||||||
if (exists(path) != null) {
|
|
||||||
return setByteData(path, data);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
createBytesNodeWithParents(path, data, acl, createMode);
|
|
||||||
return null;
|
|
||||||
} catch (KeeperException.NodeExistsException e) {
|
|
||||||
// this should not happen very often (race condition)
|
|
||||||
return setByteData(path, data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public String create(String path, CreateMode createMode) throws InterruptedException, KeeperException {
|
|
||||||
return create(path, (byte[]) null, createMode);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String create(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
|
|
||||||
return create(path, toByteData(data), createMode);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String create(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
|
|
||||||
return getZk().create(adjustPath(path), data, getNodeACLs(path), createMode);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String createWithParents(String path, CreateMode createMode) throws InterruptedException, KeeperException {
|
|
||||||
return createWithParents(path, (byte[]) null, createMode);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String createWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
|
|
||||||
return createWithParents(path, toByteData(data), createMode);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String createWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
|
|
||||||
createParents(path);
|
|
||||||
return create(path, data, createMode);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Stat createOrSetWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
|
|
||||||
return createOrSetWithParents(path, toByteData(data), createMode);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Stat createOrSetWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
|
|
||||||
if (exists(path) != null) {
|
|
||||||
return setByteData(path, data);
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
createWithParents(path, data, createMode);
|
|
||||||
return null;
|
|
||||||
} catch (KeeperException.NodeExistsException e) {
|
|
||||||
// this should not happen very often (race condition)
|
|
||||||
return setByteData(path, data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void fixACLs(String path, boolean recursive) throws InterruptedException, KeeperException {
|
|
||||||
if (exists(path) != null) {
|
|
||||||
doFixACLs(path, recursive);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void doFixACLs(String path, boolean recursive) throws KeeperException, InterruptedException {
|
|
||||||
setACL(path, getNodeACLs(path), -1);
|
|
||||||
if (recursive) {
|
|
||||||
for (String child : getChildren(path)) {
|
|
||||||
doFixACLs(path.equals("/") ? "/" + child : path + "/" + child, recursive);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<ACL> getNodeACLs(String path) {
|
|
||||||
String acl = doGetNodeACLs(adjustPath(path));
|
|
||||||
if (acl == null) {
|
|
||||||
throw new IllegalStateException("Could not find matching ACLs for " + path);
|
|
||||||
}
|
|
||||||
return parseACLs(acl);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String doGetNodeACLs(String path) {
|
|
||||||
String longestPath = "";
|
|
||||||
for (String acl : acls.keySet()) {
|
|
||||||
if (acl.length() > longestPath.length() && path.startsWith(acl)) {
|
|
||||||
longestPath = acl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return acls.get(longestPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createParents(String path) throws InterruptedException, KeeperException {
|
|
||||||
path = PathUtils.getParentPath(adjustPath(path));
|
|
||||||
path = PathUtils.removeTrailingSlash(path);
|
|
||||||
List<String> paths = new ArrayList<>();
|
|
||||||
while (!path.equals("") && getZk().exists(path, false) == null) {
|
|
||||||
paths.add(path);
|
|
||||||
path = PathUtils.getParentPath(path);
|
|
||||||
path = PathUtils.removeTrailingSlash(path);
|
|
||||||
}
|
|
||||||
Collections.reverse(paths);
|
|
||||||
for (String p : paths) {
|
|
||||||
try {
|
|
||||||
getZk().create(p,
|
|
||||||
null,
|
|
||||||
getNodeACLs(p),
|
|
||||||
CreateMode.PERSISTENT);
|
|
||||||
} catch (KeeperException.NodeExistsException e) {
|
|
||||||
// ok we continue...
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("parent already exists " + p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private byte[] toByteData(String data) {
|
|
||||||
if (data == null) {
|
|
||||||
return null;
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
return data.getBytes(CHARSET);
|
|
||||||
} catch (UnsupportedEncodingException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,124 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.partition;
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.Broker;
|
|
||||||
import org.apache.activemq.partition.dto.Partitioning;
|
|
||||||
import org.apache.zookeeper.WatchedEvent;
|
|
||||||
import org.apache.zookeeper.Watcher;
|
|
||||||
import org.apache.zookeeper.data.Stat;
|
|
||||||
import org.linkedin.util.clock.Timespan;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
|
||||||
*/
|
|
||||||
public class ZooKeeperPartitionBroker extends PartitionBroker {
|
|
||||||
|
|
||||||
protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperPartitionBroker.class);
|
|
||||||
|
|
||||||
protected volatile ZKClient zk_client = null;
|
|
||||||
protected volatile Partitioning config;
|
|
||||||
protected final CountDownLatch configAcquired = new CountDownLatch(1);
|
|
||||||
|
|
||||||
public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) {
|
|
||||||
super(broker, plugin);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() throws Exception {
|
|
||||||
super.start();
|
|
||||||
// Lets block a bit until we get our config.. Otherwise just keep
|
|
||||||
// on going.. not a big deal if we get our config later. Perhaps
|
|
||||||
// ZK service is not having a good day.
|
|
||||||
configAcquired.await(5, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void onMonitorStop() {
|
|
||||||
zkDisconnect();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Partitioning getConfig() {
|
|
||||||
return config;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected ZooKeeperPartitionBrokerPlugin plugin() {
|
|
||||||
return (ZooKeeperPartitionBrokerPlugin)plugin;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void zkConnect() throws Exception {
|
|
||||||
zk_client = new ZKClient(plugin().getZkAddress(), Timespan.parse(plugin().getZkSessionTmeout()), null);
|
|
||||||
if( plugin().getZkPassword()!=null ) {
|
|
||||||
zk_client.setPassword(plugin().getZkPassword());
|
|
||||||
}
|
|
||||||
zk_client.start();
|
|
||||||
zk_client.waitForConnected(Timespan.parse("30s"));
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void zkDisconnect() {
|
|
||||||
if( zk_client!=null ) {
|
|
||||||
zk_client.close();
|
|
||||||
zk_client = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void reloadConfiguration() throws Exception {
|
|
||||||
if( zk_client==null ) {
|
|
||||||
LOG.debug("Connecting to ZooKeeper");
|
|
||||||
try {
|
|
||||||
zkConnect();
|
|
||||||
LOG.debug("Connected to ZooKeeper");
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.debug("Connection to ZooKeeper failed: "+e);
|
|
||||||
zkDisconnect();
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
byte[] data = null;
|
|
||||||
try {
|
|
||||||
Stat stat = new Stat();
|
|
||||||
data = zk_client.getData(plugin().getZkPath(), new Watcher() {
|
|
||||||
@Override
|
|
||||||
public void process(WatchedEvent watchedEvent) {
|
|
||||||
try {
|
|
||||||
reloadConfiguration();
|
|
||||||
} catch (Exception e) {
|
|
||||||
}
|
|
||||||
monitorWakeup();
|
|
||||||
}
|
|
||||||
}, stat);
|
|
||||||
configAcquired.countDown();
|
|
||||||
reloadConfigOnPoll = false;
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.warn("Could load partitioning configuration: " + e, e);
|
|
||||||
reloadConfigOnPoll = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
config = Partitioning.MAPPER.readValue(data, Partitioning.class);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.warn("Invalid partitioning configuration: " + e, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,68 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.partition;
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.Broker;
|
|
||||||
import org.apache.activemq.broker.BrokerPlugin;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A PartitionBrokerPlugin which gets it's configuration from ZooKeeper.
|
|
||||||
*/
|
|
||||||
public class ZooKeeperPartitionBrokerPlugin extends PartitionBrokerPlugin {
|
|
||||||
|
|
||||||
String zkAddress = "127.0.0.1:2181";
|
|
||||||
String zkPassword;
|
|
||||||
String zkPath = "/broker-assignments";
|
|
||||||
String zkSessionTmeout = "10s";
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Broker installPlugin(Broker broker) throws Exception {
|
|
||||||
return new ZooKeeperPartitionBroker(broker, this);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getZkAddress() {
|
|
||||||
return zkAddress;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setZkAddress(String zkAddress) {
|
|
||||||
this.zkAddress = zkAddress;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getZkPassword() {
|
|
||||||
return zkPassword;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setZkPassword(String zkPassword) {
|
|
||||||
this.zkPassword = zkPassword;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getZkPath() {
|
|
||||||
return zkPath;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setZkPath(String zkPath) {
|
|
||||||
this.zkPath = zkPath;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getZkSessionTmeout() {
|
|
||||||
return zkSessionTmeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setZkSessionTmeout(String zkSessionTmeout) {
|
|
||||||
this.zkSessionTmeout = zkSessionTmeout;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,161 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.partition.dto;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
|
||||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import com.fasterxml.jackson.databind.DeserializationConfig;
|
|
||||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
|
||||||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
|
||||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.HashMap;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The main Configuration class for the PartitionBroker plugin
|
|
||||||
*/
|
|
||||||
public class Partitioning {
|
|
||||||
|
|
||||||
static final public ObjectMapper MAPPER = new ObjectMapper();
|
|
||||||
static {
|
|
||||||
MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
|
|
||||||
MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
|
|
||||||
}
|
|
||||||
|
|
||||||
static final public ObjectMapper TO_STRING_MAPPER = new ObjectMapper();
|
|
||||||
static {
|
|
||||||
TO_STRING_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
|
|
||||||
TO_STRING_MAPPER.enable(SerializationFeature.INDENT_OUTPUT);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If a client connects with a clientId which is listed in the
|
|
||||||
* map, then he will be immediately reconnected
|
|
||||||
* to the partition target immediately.
|
|
||||||
*/
|
|
||||||
@JsonProperty("by_client_id")
|
|
||||||
@JsonDeserialize(contentAs = Target.class)
|
|
||||||
public HashMap<String, Target> byClientId;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If a client connects with a user priciple which is listed in the
|
|
||||||
* map, then he will be immediately reconnected
|
|
||||||
* to the partition target immediately.
|
|
||||||
*/
|
|
||||||
@JsonProperty("by_user_name")
|
|
||||||
@JsonDeserialize(contentAs = Target.class)
|
|
||||||
public HashMap<String, Target> byUserName;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If a client connects with source ip which is listed in the
|
|
||||||
* map, then he will be immediately reconnected
|
|
||||||
* to the partition target immediately.
|
|
||||||
*/
|
|
||||||
@JsonProperty("by_source_ip")
|
|
||||||
@JsonDeserialize(contentAs = Target.class)
|
|
||||||
public HashMap<String, Target> bySourceIp;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Used to map the preferred partitioning of queues across
|
|
||||||
* a set of brokers. Once a it is deemed that a connection mostly
|
|
||||||
* works with a set of targets configured in this map, the client
|
|
||||||
* will be reconnected to the appropriate target.
|
|
||||||
*/
|
|
||||||
@JsonProperty("by_queue")
|
|
||||||
@JsonDeserialize(contentAs = Target.class)
|
|
||||||
public HashMap<String, Target> byQueue;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Used to map the preferred partitioning of topics across
|
|
||||||
* a set of brokers. Once a it is deemed that a connection mostly
|
|
||||||
* works with a set of targets configured in this map, the client
|
|
||||||
* will be reconnected to the appropriate target.
|
|
||||||
*/
|
|
||||||
@JsonProperty("by_topic")
|
|
||||||
@JsonDeserialize(contentAs = Target.class)
|
|
||||||
public HashMap<String, Target> byTopic;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Maps broker names to broker URLs.
|
|
||||||
*/
|
|
||||||
@JsonProperty("brokers")
|
|
||||||
@JsonDeserialize(contentAs = String.class)
|
|
||||||
public HashMap<String, String> brokers;
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
try {
|
|
||||||
return TO_STRING_MAPPER.writeValueAsString(this);
|
|
||||||
} catch (IOException e) {
|
|
||||||
return super.toString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public HashMap<String, String> getBrokers() {
|
|
||||||
return brokers;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setBrokers(HashMap<String, String> brokers) {
|
|
||||||
this.brokers = brokers;
|
|
||||||
}
|
|
||||||
|
|
||||||
public HashMap<String, Target> getByClientId() {
|
|
||||||
return byClientId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setByClientId(HashMap<String, Target> byClientId) {
|
|
||||||
this.byClientId = byClientId;
|
|
||||||
}
|
|
||||||
|
|
||||||
public HashMap<String, Target> getByQueue() {
|
|
||||||
return byQueue;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setByQueue(HashMap<String, Target> byQueue) {
|
|
||||||
this.byQueue = byQueue;
|
|
||||||
}
|
|
||||||
|
|
||||||
public HashMap<String, Target> getBySourceIp() {
|
|
||||||
return bySourceIp;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setBySourceIp(HashMap<String, Target> bySourceIp) {
|
|
||||||
this.bySourceIp = bySourceIp;
|
|
||||||
}
|
|
||||||
|
|
||||||
public HashMap<String, Target> getByTopic() {
|
|
||||||
return byTopic;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setByTopic(HashMap<String, Target> byTopic) {
|
|
||||||
this.byTopic = byTopic;
|
|
||||||
}
|
|
||||||
|
|
||||||
public HashMap<String, Target> getByUserName() {
|
|
||||||
return byUserName;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setByUserName(HashMap<String, Target> byUserName) {
|
|
||||||
this.byUserName = byUserName;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,59 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.partition.dto;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.HashSet;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Represents a partition target. This identifies the brokers that
|
|
||||||
* a partition lives on.
|
|
||||||
*/
|
|
||||||
public class Target {
|
|
||||||
|
|
||||||
@JsonProperty("ids")
|
|
||||||
public HashSet<String> ids = new HashSet<String>();
|
|
||||||
|
|
||||||
public Target() {
|
|
||||||
ids = new HashSet<String>();
|
|
||||||
}
|
|
||||||
|
|
||||||
public Target(String ...ids) {
|
|
||||||
this.ids.addAll(java.util.Arrays.asList(ids));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
try {
|
|
||||||
return Partitioning.TO_STRING_MAPPER.writeValueAsString(this);
|
|
||||||
} catch (IOException e) {
|
|
||||||
return super.toString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public HashSet<String> getIds() {
|
|
||||||
return ids;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setIds(Collection<String> ids) {
|
|
||||||
this.ids = new HashSet<String>(ids);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,251 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.partition;
|
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
|
||||||
import org.apache.activemq.broker.BrokerPlugin;
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
|
||||||
import org.apache.activemq.partition.dto.Partitioning;
|
|
||||||
import org.apache.activemq.partition.dto.Target;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import javax.jms.*;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Unit tests for the PartitionBroker plugin.
|
|
||||||
*/
|
|
||||||
public class PartitionBrokerTest {
|
|
||||||
|
|
||||||
protected HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>();
|
|
||||||
protected ArrayList<Connection> connections = new ArrayList<Connection>();
|
|
||||||
Partitioning partitioning;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
partitioning = new Partitioning();
|
|
||||||
partitioning.brokers = new HashMap<String, String>();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Partitioning can only re-direct failover clients since those
|
|
||||||
* can re-connect and re-establish their state with another broker.
|
|
||||||
*/
|
|
||||||
@Test(timeout = 1000*60*60)
|
|
||||||
public void testNonFailoverClientHasNoPartitionEffect() throws Exception {
|
|
||||||
|
|
||||||
partitioning.byClientId = new HashMap<String, Target>();
|
|
||||||
partitioning.byClientId.put("client1", new Target("broker1"));
|
|
||||||
createBrokerCluster(2);
|
|
||||||
|
|
||||||
Connection connection = createConnectionToUrl(getConnectURL("broker2"));
|
|
||||||
within(5, TimeUnit.SECONDS, new Task() {
|
|
||||||
public void run() throws Exception {
|
|
||||||
assertEquals(0, getTransportConnector("broker1").getConnections().size());
|
|
||||||
assertEquals(1, getTransportConnector("broker2").getConnections().size());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
connection.setClientID("client1");
|
|
||||||
connection.start();
|
|
||||||
|
|
||||||
Thread.sleep(1000);
|
|
||||||
assertEquals(0, getTransportConnector("broker1").getConnections().size());
|
|
||||||
assertEquals(1, getTransportConnector("broker2").getConnections().size());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout = 1000*60*60)
|
|
||||||
public void testPartitionByClientId() throws Exception {
|
|
||||||
partitioning.byClientId = new HashMap<String, Target>();
|
|
||||||
partitioning.byClientId.put("client1", new Target("broker1"));
|
|
||||||
partitioning.byClientId.put("client2", new Target("broker2"));
|
|
||||||
createBrokerCluster(2);
|
|
||||||
|
|
||||||
Connection connection = createConnectionTo("broker2");
|
|
||||||
|
|
||||||
within(5, TimeUnit.SECONDS, new Task() {
|
|
||||||
public void run() throws Exception {
|
|
||||||
assertEquals(0, getTransportConnector("broker1").getConnections().size());
|
|
||||||
assertEquals(1, getTransportConnector("broker2").getConnections().size());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
connection.setClientID("client1");
|
|
||||||
connection.start();
|
|
||||||
within(5, TimeUnit.SECONDS, new Task() {
|
|
||||||
public void run() throws Exception {
|
|
||||||
assertEquals(1, getTransportConnector("broker1").getConnections().size());
|
|
||||||
assertEquals(0, getTransportConnector("broker2").getConnections().size());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout = 1000*60*60)
|
|
||||||
public void testPartitionByQueue() throws Exception {
|
|
||||||
partitioning.byQueue = new HashMap<String, Target>();
|
|
||||||
partitioning.byQueue.put("foo", new Target("broker1"));
|
|
||||||
createBrokerCluster(2);
|
|
||||||
|
|
||||||
Connection connection2 = createConnectionTo("broker2");
|
|
||||||
|
|
||||||
within(5, TimeUnit.SECONDS, new Task() {
|
|
||||||
public void run() throws Exception {
|
|
||||||
assertEquals(0, getTransportConnector("broker1").getConnections().size());
|
|
||||||
assertEquals(1, getTransportConnector("broker2").getConnections().size());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
||||||
MessageConsumer consumer = session2.createConsumer(session2.createQueue("foo"));
|
|
||||||
|
|
||||||
within(5, TimeUnit.SECONDS, new Task() {
|
|
||||||
public void run() throws Exception {
|
|
||||||
assertEquals(1, getTransportConnector("broker1").getConnections().size());
|
|
||||||
assertEquals(0, getTransportConnector("broker2").getConnections().size());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Connection connection1 = createConnectionTo("broker2");
|
|
||||||
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
||||||
MessageProducer producer = session1.createProducer(session1.createQueue("foo"));
|
|
||||||
|
|
||||||
within(5, TimeUnit.SECONDS, new Task() {
|
|
||||||
public void run() throws Exception {
|
|
||||||
assertEquals(1, getTransportConnector("broker1").getConnections().size());
|
|
||||||
assertEquals(1, getTransportConnector("broker2").getConnections().size());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
for (int i = 0; i < 100; i++) {
|
|
||||||
producer.send(session1.createTextMessage("#" + i));
|
|
||||||
}
|
|
||||||
|
|
||||||
within(5, TimeUnit.SECONDS, new Task() {
|
|
||||||
public void run() throws Exception {
|
|
||||||
assertEquals(2, getTransportConnector("broker1").getConnections().size());
|
|
||||||
assertEquals(0, getTransportConnector("broker2").getConnections().size());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static interface Task {
|
|
||||||
public void run() throws Exception;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void within(int time, TimeUnit unit, Task task) throws InterruptedException {
|
|
||||||
long timeMS = unit.toMillis(time);
|
|
||||||
long deadline = System.currentTimeMillis() + timeMS;
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
task.run();
|
|
||||||
return;
|
|
||||||
} catch (Throwable e) {
|
|
||||||
long remaining = deadline - System.currentTimeMillis();
|
|
||||||
if( remaining <=0 ) {
|
|
||||||
if( e instanceof RuntimeException ) {
|
|
||||||
throw (RuntimeException)e;
|
|
||||||
}
|
|
||||||
if( e instanceof Error ) {
|
|
||||||
throw (Error)e;
|
|
||||||
}
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
Thread.sleep(Math.min(timeMS/10, remaining));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Connection createConnectionTo(String brokerId) throws IOException, URISyntaxException, JMSException {
|
|
||||||
return createConnectionToUrl("failover://(" + getConnectURL(brokerId) + ")?randomize=false");
|
|
||||||
}
|
|
||||||
|
|
||||||
private Connection createConnectionToUrl(String url) throws JMSException {
|
|
||||||
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
|
|
||||||
Connection connection = factory.createConnection();
|
|
||||||
connections.add(connection);
|
|
||||||
return connection;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String getConnectURL(String broker) throws IOException, URISyntaxException {
|
|
||||||
TransportConnector tcp = getTransportConnector(broker);
|
|
||||||
return tcp.getConnectUri().toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
private TransportConnector getTransportConnector(String broker) {
|
|
||||||
BrokerService brokerService = brokers.get(broker);
|
|
||||||
if( brokerService==null ) {
|
|
||||||
throw new IllegalArgumentException("Invalid broker id");
|
|
||||||
}
|
|
||||||
return brokerService.getTransportConnectorByName("tcp");
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void createBrokerCluster(int brokerCount) throws Exception {
|
|
||||||
for (int i = 1; i <= brokerCount; i++) {
|
|
||||||
String brokerId = "broker" + i;
|
|
||||||
BrokerService broker = createBroker(brokerId);
|
|
||||||
broker.setPersistent(false);
|
|
||||||
broker.addConnector("tcp://localhost:0").setName("tcp");
|
|
||||||
addPartitionBrokerPlugin(broker);
|
|
||||||
broker.start();
|
|
||||||
broker.waitUntilStarted();
|
|
||||||
partitioning.brokers.put(brokerId, getConnectURL(brokerId));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void addPartitionBrokerPlugin(BrokerService broker) {
|
|
||||||
PartitionBrokerPlugin plugin = new PartitionBrokerPlugin();
|
|
||||||
plugin.setConfig(partitioning);
|
|
||||||
broker.setPlugins(new BrokerPlugin[]{plugin});
|
|
||||||
}
|
|
||||||
|
|
||||||
protected BrokerService createBroker(String name) {
|
|
||||||
BrokerService broker = new BrokerService();
|
|
||||||
broker.setBrokerName(name);
|
|
||||||
brokers.put(name, broker);
|
|
||||||
return broker;
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() throws Exception {
|
|
||||||
for (Connection connection : connections) {
|
|
||||||
try {
|
|
||||||
connection.close();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
connections.clear();
|
|
||||||
for (BrokerService broker : brokers.values()) {
|
|
||||||
try {
|
|
||||||
broker.stop();
|
|
||||||
broker.waitUntilStopped();
|
|
||||||
} catch (Throwable e) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
brokers.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,97 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.partition;
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.BrokerPlugin;
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
|
||||||
import org.apache.zookeeper.CreateMode;
|
|
||||||
import org.apache.zookeeper.server.NIOServerCnxnFactory;
|
|
||||||
import org.apache.zookeeper.server.ZooKeeperServer;
|
|
||||||
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.linkedin.util.clock.Timespan;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
|
|
||||||
/**
|
|
||||||
*/
|
|
||||||
public class ZooKeeperPartitionBrokerTest extends PartitionBrokerTest {
|
|
||||||
|
|
||||||
NIOServerCnxnFactory connector;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
System.out.println("Starting ZooKeeper");
|
|
||||||
ZooKeeperServer zk_server = new ZooKeeperServer();
|
|
||||||
zk_server.setTickTime(500);
|
|
||||||
zk_server.setTxnLogFactory(new FileTxnSnapLog(new File("target/test-data/zk-log"), new File("target/test-data/zk-data")));
|
|
||||||
connector = new NIOServerCnxnFactory();
|
|
||||||
connector.configure(new InetSocketAddress(0), 100);
|
|
||||||
connector.startup(zk_server);
|
|
||||||
System.out.println("ZooKeeper started");
|
|
||||||
super.setUp();
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() throws Exception {
|
|
||||||
super.tearDown();
|
|
||||||
if( connector!=null ) {
|
|
||||||
connector.shutdown();
|
|
||||||
connector = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
String zkPath = "/partition-config";
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void createBrokerCluster(int brokerCount) throws Exception {
|
|
||||||
// Store the partitioning in ZK.
|
|
||||||
ZKClient zk_client = new ZKClient("localhost:" + connector.getLocalPort(), Timespan.parse("10s"), null);
|
|
||||||
try {
|
|
||||||
zk_client.start();
|
|
||||||
zk_client.waitForConnected(Timespan.parse("30s"));
|
|
||||||
try {
|
|
||||||
zk_client.delete(zkPath);
|
|
||||||
} catch (Throwable e) {
|
|
||||||
}
|
|
||||||
zk_client.create(zkPath, partitioning.toString(), CreateMode.PERSISTENT);
|
|
||||||
} finally {
|
|
||||||
zk_client.close();
|
|
||||||
}
|
|
||||||
super.createBrokerCluster(brokerCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void addPartitionBrokerPlugin(BrokerService broker) {
|
|
||||||
// Have the borker plugin get the partition config via ZK.
|
|
||||||
ZooKeeperPartitionBrokerPlugin plugin = new ZooKeeperPartitionBrokerPlugin(){
|
|
||||||
@Override
|
|
||||||
public String getBrokerURL(PartitionBroker partitionBroker, String id) {
|
|
||||||
try {
|
|
||||||
return getConnectURL(id);
|
|
||||||
} catch (Exception e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
plugin.setZkAddress("localhost:" + connector.getLocalPort());
|
|
||||||
plugin.setZkPath(zkPath);
|
|
||||||
broker.setPlugins(new BrokerPlugin[]{plugin});
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -77,21 +77,6 @@
|
||||||
<version>${hawtdispatch-version}</version>
|
<version>${hawtdispatch-version}</version>
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.linkedin</groupId>
|
|
||||||
<artifactId>org.linkedin.zookeeper-impl</artifactId>
|
|
||||||
<scope>provided</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.linkedin</groupId>
|
|
||||||
<artifactId>org.linkedin.util-core</artifactId>
|
|
||||||
<scope>provided</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.zookeeper</groupId>
|
|
||||||
<artifactId>zookeeper</artifactId>
|
|
||||||
<scope>provided</scope>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.osgi</groupId>
|
<groupId>org.osgi</groupId>
|
||||||
<artifactId>osgi.core</artifactId>
|
<artifactId>osgi.core</artifactId>
|
||||||
|
@ -202,7 +187,6 @@
|
||||||
<include>${basedir}/../activemq-kahadb-store/src/main/java</include>
|
<include>${basedir}/../activemq-kahadb-store/src/main/java</include>
|
||||||
<include>${basedir}/../activemq-mqtt/src/main/java</include>
|
<include>${basedir}/../activemq-mqtt/src/main/java</include>
|
||||||
<include>${basedir}/../activemq-stomp/src/main/java</include>
|
<include>${basedir}/../activemq-stomp/src/main/java</include>
|
||||||
<include>${basedir}/../activemq-partition/src/main/java</include>
|
|
||||||
<include>${basedir}/../activemq-runtime-config/src/main/java</include>
|
<include>${basedir}/../activemq-runtime-config/src/main/java</include>
|
||||||
</includes>
|
</includes>
|
||||||
<strictXsdOrder>false</strictXsdOrder>
|
<strictXsdOrder>false</strictXsdOrder>
|
||||||
|
|
|
@ -58,10 +58,6 @@
|
||||||
<groupId>org.apache.activemq</groupId>
|
<groupId>org.apache.activemq</groupId>
|
||||||
<artifactId>activemq-stomp</artifactId>
|
<artifactId>activemq-stomp</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.activemq</groupId>
|
|
||||||
<artifactId>activemq-partition</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.activemq</groupId>
|
<groupId>org.apache.activemq</groupId>
|
||||||
<artifactId>activemq-runtime-config</artifactId>
|
<artifactId>activemq-runtime-config</artifactId>
|
||||||
|
|
|
@ -1,53 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.activemq.broker.partition;
|
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
|
||||||
import org.apache.activemq.broker.BrokerFactory;
|
|
||||||
import org.apache.activemq.broker.BrokerService;
|
|
||||||
import org.apache.activemq.partition.PartitionBrokerPlugin;
|
|
||||||
import org.apache.activemq.partition.dto.Partitioning;
|
|
||||||
|
|
||||||
/**
|
|
||||||
*/
|
|
||||||
public class SpringPartitionBrokerTest extends TestCase {
|
|
||||||
|
|
||||||
public void testCreatePartitionBroker() throws Exception {
|
|
||||||
|
|
||||||
BrokerService broker = BrokerFactory.createBroker("xbean:activemq-partition.xml");
|
|
||||||
assertEquals(1, broker.getPlugins().length);
|
|
||||||
PartitionBrokerPlugin plugin = (PartitionBrokerPlugin)broker.getPlugins()[0];
|
|
||||||
Partitioning config = plugin.getConfig();
|
|
||||||
assertEquals(2, config.getBrokers().size());
|
|
||||||
|
|
||||||
Object o;
|
|
||||||
String json = "{\n" +
|
|
||||||
" \"by_client_id\":{\n" +
|
|
||||||
" \"client1\":{\"ids\":[\"broker1\"]},\n" +
|
|
||||||
" \"client2\":{\"ids\":[\"broker1\",\"broker2\"]}\n" +
|
|
||||||
" },\n" +
|
|
||||||
" \"brokers\":{\n" +
|
|
||||||
" \"broker1\":\"tcp://localhost:61616\",\n" +
|
|
||||||
" \"broker2\":\"tcp://localhost:61616\"\n" +
|
|
||||||
" }\n" +
|
|
||||||
"}";
|
|
||||||
Partitioning expected = Partitioning.MAPPER.readValue(json, Partitioning.class);
|
|
||||||
assertEquals(expected.toString(), config.toString());
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,58 +0,0 @@
|
||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<!--
|
|
||||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
contributor license agreements. See the NOTICE file distributed with
|
|
||||||
this work for additional information regarding copyright ownership.
|
|
||||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
(the "License"); you may not use this file except in compliance with
|
|
||||||
the License. You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
-->
|
|
||||||
<!-- START SNIPPET: xbean -->
|
|
||||||
<beans
|
|
||||||
xmlns="http://www.springframework.org/schema/beans"
|
|
||||||
xmlns:amq="http://activemq.apache.org/schema/core"
|
|
||||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
|
||||||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
|
|
||||||
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
|
|
||||||
|
|
||||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
|
||||||
|
|
||||||
<bean id="config" class="java.lang.String">
|
|
||||||
<constructor-arg><value>
|
|
||||||
<![CDATA[
|
|
||||||
{
|
|
||||||
"by_client_id":{
|
|
||||||
"client1":{"ids":["broker1"]},
|
|
||||||
"client2":{"ids":["broker1","broker2"]}
|
|
||||||
},
|
|
||||||
"brokers":{
|
|
||||||
"broker1":"tcp://localhost:61616",
|
|
||||||
"broker2":"tcp://localhost:61616"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
]]>
|
|
||||||
</value></constructor-arg>
|
|
||||||
</bean>
|
|
||||||
|
|
||||||
<broker useJmx="false" xmlns="http://activemq.apache.org/schema/core" persistent="false">
|
|
||||||
|
|
||||||
<plugins>
|
|
||||||
<partitionBrokerPlugin minTransferCount="5" configAsJson="#config"/>
|
|
||||||
</plugins>
|
|
||||||
|
|
||||||
<transportConnectors>
|
|
||||||
<transportConnector uri="tcp://localhost:61616"/>
|
|
||||||
</transportConnectors>
|
|
||||||
|
|
||||||
</broker>
|
|
||||||
|
|
||||||
</beans>
|
|
||||||
<!-- END SNIPPET: xbean -->
|
|
|
@ -54,10 +54,6 @@
|
||||||
<artifactId>activemq-unit-tests</artifactId>
|
<artifactId>activemq-unit-tests</artifactId>
|
||||||
<type>test-jar</type>
|
<type>test-jar</type>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>${project.groupId}</groupId>
|
|
||||||
<artifactId>activemq-partition</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.activemq.tooling</groupId>
|
<groupId>org.apache.activemq.tooling</groupId>
|
||||||
<artifactId>activemq-junit</artifactId>
|
<artifactId>activemq-junit</artifactId>
|
||||||
|
@ -68,19 +64,6 @@
|
||||||
<artifactId>hawtdispatch-transport</artifactId>
|
<artifactId>hawtdispatch-transport</artifactId>
|
||||||
<version>${hawtdispatch-version}</version>
|
<version>${hawtdispatch-version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.linkedin</groupId>
|
|
||||||
<artifactId>org.linkedin.zookeeper-impl</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.linkedin</groupId>
|
|
||||||
<artifactId>org.linkedin.util-core</artifactId>
|
|
||||||
<version>${linkedin-zookeeper-version}</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.zookeeper</groupId>
|
|
||||||
<artifactId>zookeeper</artifactId>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.osgi</groupId>
|
<groupId>org.osgi</groupId>
|
||||||
|
|
|
@ -182,7 +182,6 @@
|
||||||
<include>${pom.groupId}:activemq-log4j-appender</include>
|
<include>${pom.groupId}:activemq-log4j-appender</include>
|
||||||
<include>${pom.groupId}:activemq-jms-pool</include>
|
<include>${pom.groupId}:activemq-jms-pool</include>
|
||||||
<include>${pom.groupId}:activemq-pool</include>
|
<include>${pom.groupId}:activemq-pool</include>
|
||||||
<include>${pom.groupId}:activemq-partition</include>
|
|
||||||
<include>${pom.groupId}:activemq-shiro</include>
|
<include>${pom.groupId}:activemq-shiro</include>
|
||||||
<include>commons-beanutils:commons-beanutils</include>
|
<include>commons-beanutils:commons-beanutils</include>
|
||||||
<include>commons-collections:commons-collections</include>
|
<include>commons-collections:commons-collections</include>
|
||||||
|
|
67
pom.xml
67
pom.xml
|
@ -90,8 +90,6 @@
|
||||||
<mqtt-client-version>1.16</mqtt-client-version>
|
<mqtt-client-version>1.16</mqtt-client-version>
|
||||||
<org-apache-derby-version>10.15.2.0</org-apache-derby-version>
|
<org-apache-derby-version>10.15.2.0</org-apache-derby-version>
|
||||||
<osgi-version>6.0.0</osgi-version>
|
<osgi-version>6.0.0</osgi-version>
|
||||||
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
|
|
||||||
<zookeeper-version>3.4.14</zookeeper-version>
|
|
||||||
<qpid-proton-version>0.33.10</qpid-proton-version>
|
<qpid-proton-version>0.33.10</qpid-proton-version>
|
||||||
<qpid-jms-version>1.6.0</qpid-jms-version>
|
<qpid-jms-version>1.6.0</qpid-jms-version>
|
||||||
<netty-version>4.1.75.Final</netty-version>
|
<netty-version>4.1.75.Final</netty-version>
|
||||||
|
@ -229,7 +227,6 @@
|
||||||
<module>activemq-runtime-config</module>
|
<module>activemq-runtime-config</module>
|
||||||
<module>activemq-tooling</module>
|
<module>activemq-tooling</module>
|
||||||
<module>activemq-web</module>
|
<module>activemq-web</module>
|
||||||
<module>activemq-partition</module>
|
|
||||||
<module>activemq-osgi</module>
|
<module>activemq-osgi</module>
|
||||||
<module>activemq-blueprint</module>
|
<module>activemq-blueprint</module>
|
||||||
<module>activemq-web-demo</module>
|
<module>activemq-web-demo</module>
|
||||||
|
@ -315,11 +312,6 @@
|
||||||
<artifactId>activemq-all</artifactId>
|
<artifactId>activemq-all</artifactId>
|
||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.activemq</groupId>
|
|
||||||
<artifactId>activemq-partition</artifactId>
|
|
||||||
<version>${project.version}</version>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.activemq.tooling</groupId>
|
<groupId>org.apache.activemq.tooling</groupId>
|
||||||
<artifactId>activemq-junit</artifactId>
|
<artifactId>activemq-junit</artifactId>
|
||||||
|
@ -585,65 +577,6 @@
|
||||||
<version>${pax-logging-version}</version>
|
<version>${pax-logging-version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.hadoop.zookeeper</groupId>
|
|
||||||
<artifactId>zookeeper</artifactId>
|
|
||||||
<version>${zookeeper-version}</version>
|
|
||||||
<exclusions>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>io.netty</groupId>
|
|
||||||
<artifactId>netty</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>org.slf4j</groupId>
|
|
||||||
<artifactId>slf4j-log4j12</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>log4j</groupId>
|
|
||||||
<artifactId>log4j</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
</exclusions>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.apache.zookeeper</groupId>
|
|
||||||
<artifactId>zookeeper</artifactId>
|
|
||||||
<version>${zookeeper-version}</version>
|
|
||||||
<exclusions>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>io.netty</groupId>
|
|
||||||
<artifactId>netty</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>org.slf4j</groupId>
|
|
||||||
<artifactId>slf4j-log4j12</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>log4j</groupId>
|
|
||||||
<artifactId>log4j</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
</exclusions>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.linkedin</groupId>
|
|
||||||
<artifactId>org.linkedin.zookeeper-impl</artifactId>
|
|
||||||
<version>${linkedin-zookeeper-version}</version>
|
|
||||||
<exclusions>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>org.json</groupId>
|
|
||||||
<artifactId>json</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
<exclusion>
|
|
||||||
<groupId>log4j</groupId>
|
|
||||||
<artifactId>log4j</artifactId>
|
|
||||||
</exclusion>
|
|
||||||
</exclusions>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.linkedin</groupId>
|
|
||||||
<artifactId>org.linkedin.util-core</artifactId>
|
|
||||||
<version>${linkedin-zookeeper-version}</version>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
<!-- zeroconf transport -->
|
<!-- zeroconf transport -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.jmdns</groupId>
|
<groupId>org.jmdns</groupId>
|
||||||
|
|
Loading…
Reference in New Issue