Merge pull request #1009 from mattrpav/AMQ-9259

[AMQ-9259] Remove activemq-partition and zookeeper test dependency
This commit is contained in:
Christopher L. Shannon 2023-05-22 19:04:02 -04:00 committed by GitHub
commit 53f9390c41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 0 additions and 2159 deletions

View File

@ -68,10 +68,6 @@
<groupId>${project.groupId}</groupId>
<artifactId>activemq-http</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-partition</artifactId>
</dependency>
<!-- Additional Dependencies. -->
<dependency>
@ -187,7 +183,6 @@
org.codehaus.jettison*;resolution:=optional,
org.jasypt*;resolution:=optional,
org.eclipse.jetty*;resolution:=optional;version="[9.0,10)",
org.apache.zookeeper*;resolution:=optional,
org.fusesource.hawtjni*;resolution:=optional,
org.springframework.jms*;version="[4,6)";resolution:=optional,
org.springframework.transaction*;version="[4,6)";resolution:=optional,

View File

@ -1,149 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.19.0-SNAPSHOT</version>
</parent>
<artifactId>activemq-partition</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ :: Partition Management</name>
<description>Used to partition clients over a cluster of brokers</description>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.linkedin</groupId>
<artifactId>org.linkedin.zookeeper-impl</artifactId>
</dependency>
<dependency>
<groupId>org.linkedin</groupId>
<artifactId>org.linkedin.util-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<!-- For Optional Snappy Compression -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Testing Dependencies -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
</plugins>
</build>
<profiles>
<profile>
<id>activemq.tests-sanity</id>
<activation>
<property>
<name>activemq.tests</name>
<value>smoke</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>**/PartitionBrokerTest.*</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
<name>activemq.tests</name>
<value>autoTransport</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -1,367 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.partition.dto.Partitioning;
import org.apache.activemq.partition.dto.Target;
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A BrokerFilter which partitions client connections over a cluster of brokers.
*
* It can use a client identifier like client id, authenticated user name, source ip
* address or even destination being used by the connection to figure out which
* is the best broker in the cluster that the connection should be using and then
* redirects failover clients to that broker.
*/
public class PartitionBroker extends BrokerFilter {
protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class);
protected final PartitionBrokerPlugin plugin;
protected boolean reloadConfigOnPoll = true;
public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) {
super(broker);
this.plugin = plugin;
}
@Override
public void start() throws Exception {
super.start();
getExecutor().execute(new Runnable() {
@Override
public void run() {
Thread.currentThread().setName("Partition Monitor");
onMonitorStart();
try {
runPartitionMonitor();
} catch (Exception e) {
onMonitorStop();
}
}
});
}
protected void onMonitorStart() {
}
protected void onMonitorStop() {
}
protected void runPartitionMonitor() {
while( !isStopped() ) {
try {
monitorWait();
} catch (InterruptedException e) {
break;
}
if(reloadConfigOnPoll) {
try {
reloadConfiguration();
} catch (Exception e) {
continue;
}
}
for( ConnectionMonitor monitor: monitors.values()) {
checkTarget(monitor);
}
}
}
protected void monitorWait() throws InterruptedException {
synchronized (this) {
this.wait(1000);
}
}
protected void monitorWakeup() {
synchronized (this) {
this.notifyAll();
}
}
protected void reloadConfiguration() throws Exception {
}
protected void checkTarget(ConnectionMonitor monitor) {
// can we find a preferred target for the connection?
Target targetDTO = pickBestBroker(monitor);
if( targetDTO == null || targetDTO.ids==null) {
LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId());
return;
}
// Are we one the the targets?
if( targetDTO.ids.contains(getBrokerName()) ) {
LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId());
return;
}
// Then we need to move the connection over.
String connectionString = getConnectionString(targetDTO.ids);
if( connectionString==null ) {
LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids);
return;
}
LOG.info("Redirecting connection to: " + connectionString);
TransportConnection connection = (TransportConnection)monitor.context.getConnection();
ConnectionControl cc = new ConnectionControl();
cc.setConnectedBrokers(connectionString);
cc.setRebalanceConnection(true);
connection.dispatchAsync(cc);
}
protected String getConnectionString(HashSet<String> ids) {
StringBuilder rc = new StringBuilder();
for (String id : ids) {
String url = plugin.getBrokerURL(this, id);
if( url!=null ) {
if( rc.length()!=0 ) {
rc.append(',');
}
rc.append(url);
}
}
if( rc.length()==0 )
return null;
return rc.toString();
}
static private class Score {
int value;
}
protected Target pickBestBroker(ConnectionMonitor monitor) {
if( getConfig() ==null )
return null;
if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) {
TransportConnection connection = (TransportConnection)monitor.context.getConnection();
Transport transport = connection.getTransport();
Socket socket = transport.narrow(Socket.class);
if( socket !=null ) {
SocketAddress address = socket.getRemoteSocketAddress();
if( address instanceof InetSocketAddress) {
String ip = ((InetSocketAddress) address).getAddress().getHostAddress();
Target targetDTO = getConfig().bySourceIp.get(ip);
if( targetDTO!=null ) {
return targetDTO;
}
}
}
}
if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) {
String userName = monitor.context.getUserName();
if( userName !=null ) {
Target targetDTO = getConfig().byUserName.get(userName);
if( targetDTO!=null ) {
return targetDTO;
}
}
}
if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) {
String clientId = monitor.context.getClientId();
if( clientId!=null ) {
Target targetDTO = getConfig().byClientId.get(clientId);
if( targetDTO!=null ) {
return targetDTO;
}
}
}
if(
(getConfig().byQueue !=null && !getConfig().byQueue.isEmpty())
|| (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty())
) {
// Collect the destinations the connection is consuming from...
HashSet<ActiveMQDestination> dests = new HashSet<ActiveMQDestination>();
for (SessionState session : monitor.context.getConnectionState().getSessionStates()) {
for (ConsumerState consumer : session.getConsumerStates()) {
ActiveMQDestination destination = consumer.getInfo().getDestination();
if( destination.isComposite() ) {
dests.addAll(Arrays.asList(destination.getCompositeDestinations()));
} else {
dests.addAll(Collections.singletonList(destination));
}
}
}
// Group them by the partitioning target for the destinations and score them..
HashMap<Target, Score> targetScores = new HashMap<Target, Score>();
for (ActiveMQDestination dest : dests) {
Target target = getTarget(dest);
if( target!=null ) {
Score score = targetScores.get(target);
if( score == null ) {
score = new Score();
targetScores.put(target, score);
}
score.value++;
}
}
// The target with largest score wins..
if (!targetScores.isEmpty()) {
Target bestTarget = null;
int bestScore = 0;
for (Map.Entry<Target, Score> entry : targetScores.entrySet()) {
if (entry.getValue().value > bestScore) {
bestTarget = entry.getKey();
bestScore = entry.getValue().value;
}
}
return bestTarget;
}
// If we get here is because there were no consumers, or the destinations for those
// consumers did not have an assigned destination.. So partition based on producer
// usage.
Target best = monitor.findBestProducerTarget(this);
if( best!=null ) {
return best;
}
}
return null;
}
protected Target getTarget(ActiveMQDestination dest) {
Partitioning config = getConfig();
if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty() ) {
return config.byQueue.get(dest.getPhysicalName());
} else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty() ) {
return config.byTopic.get(dest.getPhysicalName());
}
return null;
}
protected final ConcurrentMap<ConnectionId, ConnectionMonitor> monitors = new ConcurrentHashMap<ConnectionId, ConnectionMonitor>();
@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
if( info.isFaultTolerant() ) {
ConnectionMonitor monitor = new ConnectionMonitor(context);
monitors.put(info.getConnectionId(), monitor);
super.addConnection(context, info);
checkTarget(monitor);
} else {
super.addConnection(context, info);
}
}
@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
super.removeConnection(context, info, error);
if( info.isFaultTolerant() ) {
monitors.remove(info.getConnectionId());
}
}
@Override
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId());
if( monitor!=null ) {
monitor.onSend(producerExchange, messageSend);
}
}
protected Partitioning getConfig() {
return plugin.getConfig();
}
static class Traffic {
long messages;
long bytes;
}
static class ConnectionMonitor {
final ConnectionContext context;
LRUCache<ActiveMQDestination, Traffic> trafficPerDestination = new LRUCache<ActiveMQDestination, Traffic>();
public ConnectionMonitor(ConnectionContext context) {
this.context = context;
}
synchronized public Target findBestProducerTarget(PartitionBroker broker) {
Target best = null;
long bestSize = 0 ;
for (Map.Entry<ActiveMQDestination, Traffic> entry : trafficPerDestination.entrySet()) {
Traffic t = entry.getValue();
// Once we get enough messages...
if( t.messages < broker.plugin.getMinTransferCount()) {
continue;
}
if( t.bytes > bestSize) {
bestSize = t.bytes;
Target target = broker.getTarget(entry.getKey());
if( target!=null ) {
best = target;
}
}
}
return best;
}
synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) {
ActiveMQDestination dest = message.getDestination();
Traffic traffic = trafficPerDestination.get(dest);
if( traffic == null ) {
traffic = new Traffic();
trafficPerDestination.put(dest, traffic);
}
traffic.messages += 1;
traffic.bytes += message.getSize();
}
}
}

View File

@ -1,66 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.partition.dto.Partitioning;
import java.io.IOException;
/**
* A BrokerPlugin which partitions client connections over a cluster of brokers.
*
* @org.apache.xbean.XBean element="partitionBrokerPlugin"
*/
public class PartitionBrokerPlugin implements BrokerPlugin {
protected int minTransferCount;
protected Partitioning config;
@Override
public Broker installPlugin(Broker broker) throws Exception {
return new PartitionBroker(broker, this);
}
public int getMinTransferCount() {
return minTransferCount;
}
public void setMinTransferCount(int minTransferCount) {
this.minTransferCount = minTransferCount;
}
public Partitioning getConfig() {
return config;
}
public void setConfig(Partitioning config) {
this.config = config;
}
public void setConfigAsJson(String config) throws IOException {
this.config = Partitioning.MAPPER.readValue(config, Partitioning.class);
}
public String getBrokerURL(PartitionBroker partitionBroker, String id) {
if( config!=null && config.brokers!=null ) {
return config.brokers.get(id);
}
return null;
}
}

View File

@ -1,596 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.linkedin.util.clock.Clock;
import org.linkedin.util.clock.SystemClock;
import org.linkedin.util.clock.Timespan;
import org.linkedin.util.concurrent.ConcurrentUtils;
import org.linkedin.util.io.PathUtils;
import org.linkedin.zookeeper.client.*;
import org.slf4j.Logger;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
public class ZKClient extends org.linkedin.zookeeper.client.AbstractZKClient implements Watcher {
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ZKClient.class);
private Map<String, String> acls;
private String password;
public void start() throws Exception {
// Grab the lock to make sure that the registration of the ManagedService
// won't be updated immediately but that the initial update will happen first
synchronized (_lock) {
_stateChangeDispatcher.setDaemon(true);
_stateChangeDispatcher.start();
doStart();
}
}
public void setACLs(Map<String, String> acls) {
this.acls = acls;
}
public void setPassword(String password) {
this.password = password;
}
protected void doStart() throws UnsupportedEncodingException {
connect();
}
@Override
public void close() {
if (_stateChangeDispatcher != null) {
_stateChangeDispatcher.end();
try {
_stateChangeDispatcher.join(1000);
} catch (Exception e) {
LOG.debug("ignored exception", e);
}
}
synchronized(_lock) {
if (_zk != null) {
try {
changeState(State.NONE);
_zk.close();
Thread th = getSendThread();
if (th != null) {
th.join(1000);
}
_zk = null;
} catch (Exception e) {
LOG.debug("ignored exception", e);
}
}
}
}
protected Thread getSendThread() {
try {
return (Thread) getField(_zk, "_zk", "cnxn", "sendThread");
} catch (Throwable e) {
return null;
}
}
protected Object getField(Object obj, String... names) throws Exception {
for (String name : names) {
obj = getField(obj, name);
}
return obj;
}
protected Object getField(Object obj, String name) throws Exception {
Class clazz = obj.getClass();
while (clazz != null) {
for (Field f : clazz.getDeclaredFields()) {
if (f.getName().equals(name)) {
f.setAccessible(true);
return f.get(obj);
}
}
}
throw new NoSuchFieldError(name);
}
protected void changeState(State newState) {
synchronized (_lock) {
State oldState = _state;
if (oldState != newState) {
_stateChangeDispatcher.addEvent(oldState, newState);
_state = newState;
_lock.notifyAll();
}
}
}
public void testGenerateConnectionLoss() throws Exception {
waitForConnected();
Object clientCnxnSocket = getField(_zk, "_zk", "cnxn", "sendThread", "clientCnxnSocket");
callMethod(clientCnxnSocket, "testableCloseSocket");
}
protected Object callMethod(Object obj, String name, Object... args) throws Exception {
Class clazz = obj.getClass();
while (clazz != null) {
for (Method m : clazz.getDeclaredMethods()) {
if (m.getName().equals(name)) {
m.setAccessible(true);
return m.invoke(obj, args);
}
}
}
throw new NoSuchMethodError(name);
}
protected void tryConnect() {
synchronized (_lock) {
try {
connect();
} catch (Throwable e) {
LOG.warn("Error while restarting:", e);
if (_expiredSessionRecovery == null) {
_expiredSessionRecovery = new ExpiredSessionRecovery();
_expiredSessionRecovery.setDaemon(true);
_expiredSessionRecovery.start();
}
}
}
}
public void connect() throws UnsupportedEncodingException {
synchronized (_lock) {
changeState(State.CONNECTING);
_zk = _factory.createZooKeeper(this);
if (password != null) {
_zk.addAuthInfo("digest", ("fabric:" + password).getBytes("UTF-8"));
}
}
}
public void process(WatchedEvent event) {
if (event.getState() != null) {
LOG.debug("event: {}", event.getState());
synchronized (_lock) {
switch(event.getState()) {
case SyncConnected:
changeState(State.CONNECTED);
break;
case Disconnected:
if (_state != State.NONE) {
changeState(State.RECONNECTING);
}
break;
case Expired:
// when expired, the zookeeper object is invalid and we need to recreate a new one
_zk = null;
LOG.warn("Expiration detected: trying to restart...");
tryConnect();
break;
default:
LOG.warn("Unsupported event state: {}", event.getState());
}
}
}
}
@Override
protected IZooKeeper getZk() {
State state = _state;
if (state == State.NONE) {
throw new IllegalStateException("ZooKeeper client has not been configured yet. You need to either create an ensemble or join one.");
} else if (state != State.CONNECTING) {
try {
waitForConnected();
} catch (Exception e) {
throw new IllegalStateException("Error waiting for ZooKeeper connection", e);
}
}
IZooKeeper zk = _zk;
if (zk == null) {
throw new IllegalStateException("No ZooKeeper connection available");
}
return zk;
}
public void waitForConnected(Timespan timeout) throws InterruptedException, TimeoutException {
waitForState(State.CONNECTED, timeout);
}
public void waitForConnected() throws InterruptedException, TimeoutException {
waitForConnected(null);
}
public void waitForState(State state, Timespan timeout) throws TimeoutException, InterruptedException {
long endTime = (timeout == null ? sessionTimeout : timeout).futureTimeMillis(_clock);
if (_state != state) {
synchronized (_lock) {
while (_state != state) {
ConcurrentUtils.awaitUntil(_clock, _lock, endTime);
}
}
}
}
@Override
public void registerListener(LifecycleListener listener) {
if (listener == null) {
throw new IllegalStateException("listener is null");
}
if (!_listeners.contains(listener)) {
_listeners.add(listener);
}
if (_state == State.CONNECTED) {
listener.onConnected();
//_stateChangeDispatcher.addEvent(null, State.CONNECTED);
}
}
@Override
public void removeListener(LifecycleListener listener) {
if (listener == null) {
throw new IllegalStateException("listener is null");
}
_listeners.remove(listener);
}
@Override
public org.linkedin.zookeeper.client.IZKClient chroot(String path) {
return new ChrootedZKClient(this, adjustPath(path));
}
@Override
public boolean isConnected() {
return _state == State.CONNECTED;
}
public boolean isConfigured() {
return _state != State.NONE;
}
@Override
public String getConnectString() {
return _factory.getConnectString();
}
public static enum State {
NONE,
CONNECTING,
CONNECTED,
RECONNECTING
}
private final static String CHARSET = "UTF-8";
private final Clock _clock = SystemClock.instance();
private final List<LifecycleListener> _listeners = new CopyOnWriteArrayList<>();
protected final Object _lock = new Object();
protected volatile State _state = State.NONE;
private final StateChangeDispatcher _stateChangeDispatcher = new StateChangeDispatcher();
protected IZooKeeperFactory _factory;
protected IZooKeeper _zk;
protected Timespan _reconnectTimeout = Timespan.parse("20s");
protected Timespan sessionTimeout = new Timespan(30, Timespan.TimeUnit.SECOND);
private ExpiredSessionRecovery _expiredSessionRecovery = null;
private class StateChangeDispatcher extends Thread {
private final AtomicBoolean _running = new AtomicBoolean(true);
private final BlockingQueue<Boolean> _events = new LinkedBlockingQueue<>();
private StateChangeDispatcher() {
super("ZooKeeper state change dispatcher thread");
}
@Override
public void run() {
Map<Object, Boolean> history = new IdentityHashMap<>();
LOG.info("Starting StateChangeDispatcher");
while (_running.get()) {
Boolean isConnectedEvent;
try {
isConnectedEvent = _events.take();
} catch (InterruptedException e) {
continue;
}
if (!_running.get() || isConnectedEvent == null) {
continue;
}
Map<Object, Boolean> newHistory = callListeners(history, isConnectedEvent);
// we save which event each listener has seen last
// we don't update the map in place because we need to get rid of unregistered listeners
history = newHistory;
}
LOG.info("StateChangeDispatcher terminated.");
}
public void end() {
_running.set(false);
_events.add(false);
}
public void addEvent(ZKClient.State oldState, ZKClient.State newState) {
LOG.debug("addEvent: {} => {}", oldState, newState);
if (newState == ZKClient.State.CONNECTED) {
_events.add(true);
} else if (oldState == ZKClient.State.CONNECTED) {
_events.add(false);
}
}
}
protected Map<Object, Boolean> callListeners(Map<Object, Boolean> history, Boolean connectedEvent) {
Map<Object, Boolean> newHistory = new IdentityHashMap<>();
for (LifecycleListener listener : _listeners) {
Boolean previousEvent = history.get(listener);
// we propagate the event only if it was not already sent
if (previousEvent == null || previousEvent != connectedEvent) {
try {
if (connectedEvent) {
listener.onConnected();
} else {
listener.onDisconnected();
}
} catch (Throwable e) {
LOG.warn("Exception while executing listener (ignored)", e);
}
}
newHistory.put(listener, connectedEvent);
}
return newHistory;
}
private class ExpiredSessionRecovery extends Thread {
private ExpiredSessionRecovery() {
super("ZooKeeper expired session recovery thread");
}
@Override
public void run() {
LOG.info("Entering recovery mode");
synchronized (_lock) {
try {
int count = 0;
while (_state == ZKClient.State.NONE) {
try {
count++;
LOG.warn("Recovery mode: trying to reconnect to zookeeper [{}]", count);
ZKClient.this.connect();
} catch (Throwable e) {
LOG.warn("Recovery mode: reconnect attempt failed [{}]... waiting for {}", count, _reconnectTimeout, e);
try {
_lock.wait(_reconnectTimeout.getDurationInMilliseconds());
} catch (InterruptedException e1) {
throw new RuntimeException("Recovery mode: wait interrupted... bailing out", e1);
}
}
}
} finally {
_expiredSessionRecovery = null;
LOG.info("Exiting recovery mode.");
}
}
}
}
public ZKClient(String connectString, Timespan sessionTimeout, Watcher watcher) {
this(new ZooKeeperFactory(connectString, sessionTimeout, watcher));
}
public ZKClient(IZooKeeperFactory factory) {
this(factory, null);
}
public ZKClient(IZooKeeperFactory factory, String chroot) {
super(chroot);
_factory = factory;
Map<String, String> acls = new HashMap<>();
acls.put("/", "world:anyone:acdrw");
setACLs(acls);
}
static private int getPermFromString(String permString) {
int perm = 0;
for (int i = 0; i < permString.length(); i++) {
switch (permString.charAt(i)) {
case 'r':
perm |= ZooDefs.Perms.READ;
break;
case 'w':
perm |= ZooDefs.Perms.WRITE;
break;
case 'c':
perm |= ZooDefs.Perms.CREATE;
break;
case 'd':
perm |= ZooDefs.Perms.DELETE;
break;
case 'a':
perm |= ZooDefs.Perms.ADMIN;
break;
default:
System.err.println("Unknown perm type:" + permString.charAt(i));
}
}
return perm;
}
private static List<ACL> parseACLs(String aclString) {
List<ACL> acl;
String acls[] = aclString.split(",");
acl = new ArrayList<>();
for (String a : acls) {
int firstColon = a.indexOf(':');
int lastColon = a.lastIndexOf(':');
if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
System.err.println(a + " does not have the form scheme:id:perm");
continue;
}
ACL newAcl = new ACL();
newAcl.setId(new Id(a.substring(0, firstColon), a.substring(firstColon + 1, lastColon)));
newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
acl.add(newAcl);
}
return acl;
}
public Stat createOrSetByteWithParents(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
if (exists(path) != null) {
return setByteData(path, data);
}
try {
createBytesNodeWithParents(path, data, acl, createMode);
return null;
} catch (KeeperException.NodeExistsException e) {
// this should not happen very often (race condition)
return setByteData(path, data);
}
}
public String create(String path, CreateMode createMode) throws InterruptedException, KeeperException {
return create(path, (byte[]) null, createMode);
}
public String create(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
return create(path, toByteData(data), createMode);
}
public String create(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
return getZk().create(adjustPath(path), data, getNodeACLs(path), createMode);
}
public String createWithParents(String path, CreateMode createMode) throws InterruptedException, KeeperException {
return createWithParents(path, (byte[]) null, createMode);
}
public String createWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
return createWithParents(path, toByteData(data), createMode);
}
public String createWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
createParents(path);
return create(path, data, createMode);
}
public Stat createOrSetWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
return createOrSetWithParents(path, toByteData(data), createMode);
}
public Stat createOrSetWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
if (exists(path) != null) {
return setByteData(path, data);
}
try {
createWithParents(path, data, createMode);
return null;
} catch (KeeperException.NodeExistsException e) {
// this should not happen very often (race condition)
return setByteData(path, data);
}
}
public void fixACLs(String path, boolean recursive) throws InterruptedException, KeeperException {
if (exists(path) != null) {
doFixACLs(path, recursive);
}
}
private void doFixACLs(String path, boolean recursive) throws KeeperException, InterruptedException {
setACL(path, getNodeACLs(path), -1);
if (recursive) {
for (String child : getChildren(path)) {
doFixACLs(path.equals("/") ? "/" + child : path + "/" + child, recursive);
}
}
}
private List<ACL> getNodeACLs(String path) {
String acl = doGetNodeACLs(adjustPath(path));
if (acl == null) {
throw new IllegalStateException("Could not find matching ACLs for " + path);
}
return parseACLs(acl);
}
protected String doGetNodeACLs(String path) {
String longestPath = "";
for (String acl : acls.keySet()) {
if (acl.length() > longestPath.length() && path.startsWith(acl)) {
longestPath = acl;
}
}
return acls.get(longestPath);
}
private void createParents(String path) throws InterruptedException, KeeperException {
path = PathUtils.getParentPath(adjustPath(path));
path = PathUtils.removeTrailingSlash(path);
List<String> paths = new ArrayList<>();
while (!path.equals("") && getZk().exists(path, false) == null) {
paths.add(path);
path = PathUtils.getParentPath(path);
path = PathUtils.removeTrailingSlash(path);
}
Collections.reverse(paths);
for (String p : paths) {
try {
getZk().create(p,
null,
getNodeACLs(p),
CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException e) {
// ok we continue...
if (LOG.isDebugEnabled()) {
LOG.debug("parent already exists " + p);
}
}
}
}
private byte[] toByteData(String data) {
if (data == null) {
return null;
} else {
try {
return data.getBytes(CHARSET);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -1,124 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.partition.dto.Partitioning;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.linkedin.util.clock.Timespan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
*/
public class ZooKeeperPartitionBroker extends PartitionBroker {
protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperPartitionBroker.class);
protected volatile ZKClient zk_client = null;
protected volatile Partitioning config;
protected final CountDownLatch configAcquired = new CountDownLatch(1);
public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) {
super(broker, plugin);
}
@Override
public void start() throws Exception {
super.start();
// Lets block a bit until we get our config.. Otherwise just keep
// on going.. not a big deal if we get our config later. Perhaps
// ZK service is not having a good day.
configAcquired.await(5, TimeUnit.SECONDS);
}
@Override
protected void onMonitorStop() {
zkDisconnect();
}
@Override
protected Partitioning getConfig() {
return config;
}
protected ZooKeeperPartitionBrokerPlugin plugin() {
return (ZooKeeperPartitionBrokerPlugin)plugin;
}
protected void zkConnect() throws Exception {
zk_client = new ZKClient(plugin().getZkAddress(), Timespan.parse(plugin().getZkSessionTmeout()), null);
if( plugin().getZkPassword()!=null ) {
zk_client.setPassword(plugin().getZkPassword());
}
zk_client.start();
zk_client.waitForConnected(Timespan.parse("30s"));
}
protected void zkDisconnect() {
if( zk_client!=null ) {
zk_client.close();
zk_client = null;
}
}
protected void reloadConfiguration() throws Exception {
if( zk_client==null ) {
LOG.debug("Connecting to ZooKeeper");
try {
zkConnect();
LOG.debug("Connected to ZooKeeper");
} catch (Exception e) {
LOG.debug("Connection to ZooKeeper failed: "+e);
zkDisconnect();
throw e;
}
}
byte[] data = null;
try {
Stat stat = new Stat();
data = zk_client.getData(plugin().getZkPath(), new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
reloadConfiguration();
} catch (Exception e) {
}
monitorWakeup();
}
}, stat);
configAcquired.countDown();
reloadConfigOnPoll = false;
} catch (Exception e) {
LOG.warn("Could load partitioning configuration: " + e, e);
reloadConfigOnPoll = true;
}
try {
config = Partitioning.MAPPER.readValue(data, Partitioning.class);
} catch (Exception e) {
LOG.warn("Invalid partitioning configuration: " + e, e);
}
}
}

View File

@ -1,68 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
/**
* A PartitionBrokerPlugin which gets it's configuration from ZooKeeper.
*/
public class ZooKeeperPartitionBrokerPlugin extends PartitionBrokerPlugin {
String zkAddress = "127.0.0.1:2181";
String zkPassword;
String zkPath = "/broker-assignments";
String zkSessionTmeout = "10s";
@Override
public Broker installPlugin(Broker broker) throws Exception {
return new ZooKeeperPartitionBroker(broker, this);
}
public String getZkAddress() {
return zkAddress;
}
public void setZkAddress(String zkAddress) {
this.zkAddress = zkAddress;
}
public String getZkPassword() {
return zkPassword;
}
public void setZkPassword(String zkPassword) {
this.zkPassword = zkPassword;
}
public String getZkPath() {
return zkPath;
}
public void setZkPath(String zkPath) {
this.zkPath = zkPath;
}
public String getZkSessionTmeout() {
return zkSessionTmeout;
}
public void setZkSessionTmeout(String zkSessionTmeout) {
this.zkSessionTmeout = zkSessionTmeout;
}
}

View File

@ -1,161 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition.dto;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.util.HashMap;
/**
* The main Configuration class for the PartitionBroker plugin
*/
public class Partitioning {
static final public ObjectMapper MAPPER = new ObjectMapper();
static {
MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
}
static final public ObjectMapper TO_STRING_MAPPER = new ObjectMapper();
static {
TO_STRING_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
TO_STRING_MAPPER.enable(SerializationFeature.INDENT_OUTPUT);
}
/**
* If a client connects with a clientId which is listed in the
* map, then he will be immediately reconnected
* to the partition target immediately.
*/
@JsonProperty("by_client_id")
@JsonDeserialize(contentAs = Target.class)
public HashMap<String, Target> byClientId;
/**
* If a client connects with a user priciple which is listed in the
* map, then he will be immediately reconnected
* to the partition target immediately.
*/
@JsonProperty("by_user_name")
@JsonDeserialize(contentAs = Target.class)
public HashMap<String, Target> byUserName;
/**
* If a client connects with source ip which is listed in the
* map, then he will be immediately reconnected
* to the partition target immediately.
*/
@JsonProperty("by_source_ip")
@JsonDeserialize(contentAs = Target.class)
public HashMap<String, Target> bySourceIp;
/**
* Used to map the preferred partitioning of queues across
* a set of brokers. Once a it is deemed that a connection mostly
* works with a set of targets configured in this map, the client
* will be reconnected to the appropriate target.
*/
@JsonProperty("by_queue")
@JsonDeserialize(contentAs = Target.class)
public HashMap<String, Target> byQueue;
/**
* Used to map the preferred partitioning of topics across
* a set of brokers. Once a it is deemed that a connection mostly
* works with a set of targets configured in this map, the client
* will be reconnected to the appropriate target.
*/
@JsonProperty("by_topic")
@JsonDeserialize(contentAs = Target.class)
public HashMap<String, Target> byTopic;
/**
* Maps broker names to broker URLs.
*/
@JsonProperty("brokers")
@JsonDeserialize(contentAs = String.class)
public HashMap<String, String> brokers;
@Override
public String toString() {
try {
return TO_STRING_MAPPER.writeValueAsString(this);
} catch (IOException e) {
return super.toString();
}
}
public HashMap<String, String> getBrokers() {
return brokers;
}
public void setBrokers(HashMap<String, String> brokers) {
this.brokers = brokers;
}
public HashMap<String, Target> getByClientId() {
return byClientId;
}
public void setByClientId(HashMap<String, Target> byClientId) {
this.byClientId = byClientId;
}
public HashMap<String, Target> getByQueue() {
return byQueue;
}
public void setByQueue(HashMap<String, Target> byQueue) {
this.byQueue = byQueue;
}
public HashMap<String, Target> getBySourceIp() {
return bySourceIp;
}
public void setBySourceIp(HashMap<String, Target> bySourceIp) {
this.bySourceIp = bySourceIp;
}
public HashMap<String, Target> getByTopic() {
return byTopic;
}
public void setByTopic(HashMap<String, Target> byTopic) {
this.byTopic = byTopic;
}
public HashMap<String, Target> getByUserName() {
return byUserName;
}
public void setByUserName(HashMap<String, Target> byUserName) {
this.byUserName = byUserName;
}
}

View File

@ -1,59 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
/**
* Represents a partition target. This identifies the brokers that
* a partition lives on.
*/
public class Target {
@JsonProperty("ids")
public HashSet<String> ids = new HashSet<String>();
public Target() {
ids = new HashSet<String>();
}
public Target(String ...ids) {
this.ids.addAll(java.util.Arrays.asList(ids));
}
@Override
public String toString() {
try {
return Partitioning.TO_STRING_MAPPER.writeValueAsString(this);
} catch (IOException e) {
return super.toString();
}
}
public HashSet<String> getIds() {
return ids;
}
public void setIds(Collection<String> ids) {
this.ids = new HashSet<String>(ids);
}
}

View File

@ -1,251 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.partition.dto.Partitioning;
import org.apache.activemq.partition.dto.Target;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.*;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
/**
* Unit tests for the PartitionBroker plugin.
*/
public class PartitionBrokerTest {
protected HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>();
protected ArrayList<Connection> connections = new ArrayList<Connection>();
Partitioning partitioning;
@Before
public void setUp() throws Exception {
partitioning = new Partitioning();
partitioning.brokers = new HashMap<String, String>();
}
/**
* Partitioning can only re-direct failover clients since those
* can re-connect and re-establish their state with another broker.
*/
@Test(timeout = 1000*60*60)
public void testNonFailoverClientHasNoPartitionEffect() throws Exception {
partitioning.byClientId = new HashMap<String, Target>();
partitioning.byClientId.put("client1", new Target("broker1"));
createBrokerCluster(2);
Connection connection = createConnectionToUrl(getConnectURL("broker2"));
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(0, getTransportConnector("broker1").getConnections().size());
assertEquals(1, getTransportConnector("broker2").getConnections().size());
}
});
connection.setClientID("client1");
connection.start();
Thread.sleep(1000);
assertEquals(0, getTransportConnector("broker1").getConnections().size());
assertEquals(1, getTransportConnector("broker2").getConnections().size());
}
@Test(timeout = 1000*60*60)
public void testPartitionByClientId() throws Exception {
partitioning.byClientId = new HashMap<String, Target>();
partitioning.byClientId.put("client1", new Target("broker1"));
partitioning.byClientId.put("client2", new Target("broker2"));
createBrokerCluster(2);
Connection connection = createConnectionTo("broker2");
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(0, getTransportConnector("broker1").getConnections().size());
assertEquals(1, getTransportConnector("broker2").getConnections().size());
}
});
connection.setClientID("client1");
connection.start();
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(1, getTransportConnector("broker1").getConnections().size());
assertEquals(0, getTransportConnector("broker2").getConnections().size());
}
});
}
@Test(timeout = 1000*60*60)
public void testPartitionByQueue() throws Exception {
partitioning.byQueue = new HashMap<String, Target>();
partitioning.byQueue.put("foo", new Target("broker1"));
createBrokerCluster(2);
Connection connection2 = createConnectionTo("broker2");
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(0, getTransportConnector("broker1").getConnections().size());
assertEquals(1, getTransportConnector("broker2").getConnections().size());
}
});
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session2.createConsumer(session2.createQueue("foo"));
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(1, getTransportConnector("broker1").getConnections().size());
assertEquals(0, getTransportConnector("broker2").getConnections().size());
}
});
Connection connection1 = createConnectionTo("broker2");
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session1.createProducer(session1.createQueue("foo"));
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(1, getTransportConnector("broker1").getConnections().size());
assertEquals(1, getTransportConnector("broker2").getConnections().size());
}
});
for (int i = 0; i < 100; i++) {
producer.send(session1.createTextMessage("#" + i));
}
within(5, TimeUnit.SECONDS, new Task() {
public void run() throws Exception {
assertEquals(2, getTransportConnector("broker1").getConnections().size());
assertEquals(0, getTransportConnector("broker2").getConnections().size());
}
});
}
static interface Task {
public void run() throws Exception;
}
private void within(int time, TimeUnit unit, Task task) throws InterruptedException {
long timeMS = unit.toMillis(time);
long deadline = System.currentTimeMillis() + timeMS;
while (true) {
try {
task.run();
return;
} catch (Throwable e) {
long remaining = deadline - System.currentTimeMillis();
if( remaining <=0 ) {
if( e instanceof RuntimeException ) {
throw (RuntimeException)e;
}
if( e instanceof Error ) {
throw (Error)e;
}
throw new RuntimeException(e);
}
Thread.sleep(Math.min(timeMS/10, remaining));
}
}
}
protected Connection createConnectionTo(String brokerId) throws IOException, URISyntaxException, JMSException {
return createConnectionToUrl("failover://(" + getConnectURL(brokerId) + ")?randomize=false");
}
private Connection createConnectionToUrl(String url) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
Connection connection = factory.createConnection();
connections.add(connection);
return connection;
}
protected String getConnectURL(String broker) throws IOException, URISyntaxException {
TransportConnector tcp = getTransportConnector(broker);
return tcp.getConnectUri().toString();
}
private TransportConnector getTransportConnector(String broker) {
BrokerService brokerService = brokers.get(broker);
if( brokerService==null ) {
throw new IllegalArgumentException("Invalid broker id");
}
return brokerService.getTransportConnectorByName("tcp");
}
protected void createBrokerCluster(int brokerCount) throws Exception {
for (int i = 1; i <= brokerCount; i++) {
String brokerId = "broker" + i;
BrokerService broker = createBroker(brokerId);
broker.setPersistent(false);
broker.addConnector("tcp://localhost:0").setName("tcp");
addPartitionBrokerPlugin(broker);
broker.start();
broker.waitUntilStarted();
partitioning.brokers.put(brokerId, getConnectURL(brokerId));
}
}
protected void addPartitionBrokerPlugin(BrokerService broker) {
PartitionBrokerPlugin plugin = new PartitionBrokerPlugin();
plugin.setConfig(partitioning);
broker.setPlugins(new BrokerPlugin[]{plugin});
}
protected BrokerService createBroker(String name) {
BrokerService broker = new BrokerService();
broker.setBrokerName(name);
brokers.put(name, broker);
return broker;
}
@After
public void tearDown() throws Exception {
for (Connection connection : connections) {
try {
connection.close();
} catch (Throwable e) {
}
}
connections.clear();
for (BrokerService broker : brokers.values()) {
try {
broker.stop();
broker.waitUntilStopped();
} catch (Throwable e) {
}
}
brokers.clear();
}
}

View File

@ -1,97 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.partition;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.junit.After;
import org.junit.Before;
import org.linkedin.util.clock.Timespan;
import java.io.File;
import java.net.InetSocketAddress;
/**
*/
public class ZooKeeperPartitionBrokerTest extends PartitionBrokerTest {
NIOServerCnxnFactory connector;
@Before
public void setUp() throws Exception {
System.out.println("Starting ZooKeeper");
ZooKeeperServer zk_server = new ZooKeeperServer();
zk_server.setTickTime(500);
zk_server.setTxnLogFactory(new FileTxnSnapLog(new File("target/test-data/zk-log"), new File("target/test-data/zk-data")));
connector = new NIOServerCnxnFactory();
connector.configure(new InetSocketAddress(0), 100);
connector.startup(zk_server);
System.out.println("ZooKeeper started");
super.setUp();
}
@After
public void tearDown() throws Exception {
super.tearDown();
if( connector!=null ) {
connector.shutdown();
connector = null;
}
}
String zkPath = "/partition-config";
@Override
protected void createBrokerCluster(int brokerCount) throws Exception {
// Store the partitioning in ZK.
ZKClient zk_client = new ZKClient("localhost:" + connector.getLocalPort(), Timespan.parse("10s"), null);
try {
zk_client.start();
zk_client.waitForConnected(Timespan.parse("30s"));
try {
zk_client.delete(zkPath);
} catch (Throwable e) {
}
zk_client.create(zkPath, partitioning.toString(), CreateMode.PERSISTENT);
} finally {
zk_client.close();
}
super.createBrokerCluster(brokerCount);
}
@Override
protected void addPartitionBrokerPlugin(BrokerService broker) {
// Have the borker plugin get the partition config via ZK.
ZooKeeperPartitionBrokerPlugin plugin = new ZooKeeperPartitionBrokerPlugin(){
@Override
public String getBrokerURL(PartitionBroker partitionBroker, String id) {
try {
return getConnectURL(id);
} catch (Exception e) {
return null;
}
}
};
plugin.setZkAddress("localhost:" + connector.getLocalPort());
plugin.setZkPath(zkPath);
broker.setPlugins(new BrokerPlugin[]{plugin});
}
}

View File

@ -77,21 +77,6 @@
<version>${hawtdispatch-version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.linkedin</groupId>
<artifactId>org.linkedin.zookeeper-impl</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.linkedin</groupId>
<artifactId>org.linkedin.util-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>osgi.core</artifactId>
@ -202,7 +187,6 @@
<include>${basedir}/../activemq-kahadb-store/src/main/java</include>
<include>${basedir}/../activemq-mqtt/src/main/java</include>
<include>${basedir}/../activemq-stomp/src/main/java</include>
<include>${basedir}/../activemq-partition/src/main/java</include>
<include>${basedir}/../activemq-runtime-config/src/main/java</include>
</includes>
<strictXsdOrder>false</strictXsdOrder>

View File

@ -58,10 +58,6 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-stomp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-partition</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-runtime-config</artifactId>

View File

@ -1,53 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.partition;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.partition.PartitionBrokerPlugin;
import org.apache.activemq.partition.dto.Partitioning;
/**
*/
public class SpringPartitionBrokerTest extends TestCase {
public void testCreatePartitionBroker() throws Exception {
BrokerService broker = BrokerFactory.createBroker("xbean:activemq-partition.xml");
assertEquals(1, broker.getPlugins().length);
PartitionBrokerPlugin plugin = (PartitionBrokerPlugin)broker.getPlugins()[0];
Partitioning config = plugin.getConfig();
assertEquals(2, config.getBrokers().size());
Object o;
String json = "{\n" +
" \"by_client_id\":{\n" +
" \"client1\":{\"ids\":[\"broker1\"]},\n" +
" \"client2\":{\"ids\":[\"broker1\",\"broker2\"]}\n" +
" },\n" +
" \"brokers\":{\n" +
" \"broker1\":\"tcp://localhost:61616\",\n" +
" \"broker2\":\"tcp://localhost:61616\"\n" +
" }\n" +
"}";
Partitioning expected = Partitioning.MAPPER.readValue(json, Partitioning.class);
assertEquals(expected.toString(), config.toString());
}
}

View File

@ -1,58 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- START SNIPPET: xbean -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<bean id="config" class="java.lang.String">
<constructor-arg><value>
<![CDATA[
{
"by_client_id":{
"client1":{"ids":["broker1"]},
"client2":{"ids":["broker1","broker2"]}
},
"brokers":{
"broker1":"tcp://localhost:61616",
"broker2":"tcp://localhost:61616"
}
}
]]>
</value></constructor-arg>
</bean>
<broker useJmx="false" xmlns="http://activemq.apache.org/schema/core" persistent="false">
<plugins>
<partitionBrokerPlugin minTransferCount="5" configAsJson="#config"/>
</plugins>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
</broker>
</beans>
<!-- END SNIPPET: xbean -->

View File

@ -54,10 +54,6 @@
<artifactId>activemq-unit-tests</artifactId>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-partition</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
@ -68,19 +64,6 @@
<artifactId>hawtdispatch-transport</artifactId>
<version>${hawtdispatch-version}</version>
</dependency>
<dependency>
<groupId>org.linkedin</groupId>
<artifactId>org.linkedin.zookeeper-impl</artifactId>
</dependency>
<dependency>
<groupId>org.linkedin</groupId>
<artifactId>org.linkedin.util-core</artifactId>
<version>${linkedin-zookeeper-version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.osgi</groupId>

View File

@ -182,7 +182,6 @@
<include>${pom.groupId}:activemq-log4j-appender</include>
<include>${pom.groupId}:activemq-jms-pool</include>
<include>${pom.groupId}:activemq-pool</include>
<include>${pom.groupId}:activemq-partition</include>
<include>${pom.groupId}:activemq-shiro</include>
<include>commons-beanutils:commons-beanutils</include>
<include>commons-collections:commons-collections</include>

67
pom.xml
View File

@ -90,8 +90,6 @@
<mqtt-client-version>1.16</mqtt-client-version>
<org-apache-derby-version>10.15.2.0</org-apache-derby-version>
<osgi-version>6.0.0</osgi-version>
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
<zookeeper-version>3.4.14</zookeeper-version>
<qpid-proton-version>0.33.10</qpid-proton-version>
<qpid-jms-version>1.6.0</qpid-jms-version>
<netty-version>4.1.75.Final</netty-version>
@ -229,7 +227,6 @@
<module>activemq-runtime-config</module>
<module>activemq-tooling</module>
<module>activemq-web</module>
<module>activemq-partition</module>
<module>activemq-osgi</module>
<module>activemq-blueprint</module>
<module>activemq-web-demo</module>
@ -315,11 +312,6 @@
<artifactId>activemq-all</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-partition</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
@ -585,65 +577,6 @@
<version>${pax-logging-version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper-version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper-version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.linkedin</groupId>
<artifactId>org.linkedin.zookeeper-impl</artifactId>
<version>${linkedin-zookeeper-version}</version>
<exclusions>
<exclusion>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.linkedin</groupId>
<artifactId>org.linkedin.util-core</artifactId>
<version>${linkedin-zookeeper-version}</version>
</dependency>
<!-- zeroconf transport -->
<dependency>
<groupId>org.jmdns</groupId>