diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml index 03d29cd7a0..7b99b34119 100644 --- a/activemq-osgi/pom.xml +++ b/activemq-osgi/pom.xml @@ -68,10 +68,6 @@ ${project.groupId} activemq-http - - ${project.groupId} - activemq-partition - @@ -187,7 +183,6 @@ org.codehaus.jettison*;resolution:=optional, org.jasypt*;resolution:=optional, org.eclipse.jetty*;resolution:=optional;version="[9.0,10)", - org.apache.zookeeper*;resolution:=optional, org.fusesource.hawtjni*;resolution:=optional, org.springframework.jms*;version="[4,6)";resolution:=optional, org.springframework.transaction*;version="[4,6)";resolution:=optional, diff --git a/activemq-partition/pom.xml b/activemq-partition/pom.xml deleted file mode 100644 index 901c869d3a..0000000000 --- a/activemq-partition/pom.xml +++ /dev/null @@ -1,149 +0,0 @@ - - - - - 4.0.0 - - - org.apache.activemq - activemq-parent - 5.19.0-SNAPSHOT - - - activemq-partition - jar - - ActiveMQ :: Partition Management - Used to partition clients over a cluster of brokers - - - - org.apache.activemq - activemq-broker - provided - - - - org.slf4j - slf4j-api - compile - - - - org.linkedin - org.linkedin.zookeeper-impl - - - org.linkedin - org.linkedin.util-core - - - org.apache.zookeeper - zookeeper - - - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-annotations - - - com.fasterxml.jackson.core - jackson-databind - - - - - org.apache.logging.log4j - log4j-core - test - - - org.apache.logging.log4j - log4j-slf4j2-impl - test - - - - org.apache.activemq - activemq-broker - test-jar - test - - - - junit - junit - test - - - - - - - - - - activemq.tests-sanity - - - activemq.tests - smoke - - - - - - maven-surefire-plugin - - - **/PartitionBrokerTest.* - - - - - - - - activemq.tests-autoTransport - - - activemq.tests - autoTransport - - - - - - maven-surefire-plugin - - - ** - - - - - - - - - diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java deleted file mode 100644 index 9362e64b26..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBroker.java +++ /dev/null @@ -1,367 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerFilter; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.TransportConnection; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ConnectionControl; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.Message; -import org.apache.activemq.partition.dto.Partitioning; -import org.apache.activemq.partition.dto.Target; -import org.apache.activemq.state.ConsumerState; -import org.apache.activemq.state.SessionState; -import org.apache.activemq.transport.Transport; -import org.apache.activemq.util.LRUCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A BrokerFilter which partitions client connections over a cluster of brokers. - * - * It can use a client identifier like client id, authenticated user name, source ip - * address or even destination being used by the connection to figure out which - * is the best broker in the cluster that the connection should be using and then - * redirects failover clients to that broker. - */ -public class PartitionBroker extends BrokerFilter { - - protected static final Logger LOG = LoggerFactory.getLogger(PartitionBroker.class); - protected final PartitionBrokerPlugin plugin; - protected boolean reloadConfigOnPoll = true; - - public PartitionBroker(Broker broker, PartitionBrokerPlugin plugin) { - super(broker); - this.plugin = plugin; - } - - @Override - public void start() throws Exception { - super.start(); - getExecutor().execute(new Runnable() { - @Override - public void run() { - Thread.currentThread().setName("Partition Monitor"); - onMonitorStart(); - try { - runPartitionMonitor(); - } catch (Exception e) { - onMonitorStop(); - } - } - }); - } - - protected void onMonitorStart() { - } - protected void onMonitorStop() { - } - - protected void runPartitionMonitor() { - while( !isStopped() ) { - try { - monitorWait(); - } catch (InterruptedException e) { - break; - } - - if(reloadConfigOnPoll) { - try { - reloadConfiguration(); - } catch (Exception e) { - continue; - } - } - - for( ConnectionMonitor monitor: monitors.values()) { - checkTarget(monitor); - } - } - } - - protected void monitorWait() throws InterruptedException { - synchronized (this) { - this.wait(1000); - } - } - - protected void monitorWakeup() { - synchronized (this) { - this.notifyAll(); - } - } - - protected void reloadConfiguration() throws Exception { - } - - protected void checkTarget(ConnectionMonitor monitor) { - - // can we find a preferred target for the connection? - Target targetDTO = pickBestBroker(monitor); - if( targetDTO == null || targetDTO.ids==null) { - LOG.debug("No partition target found for connection: "+monitor.context.getConnectionId()); - return; - } - - // Are we one the the targets? - if( targetDTO.ids.contains(getBrokerName()) ) { - LOG.debug("We are a partition target for connection: "+monitor.context.getConnectionId()); - return; - } - - // Then we need to move the connection over. - String connectionString = getConnectionString(targetDTO.ids); - if( connectionString==null ) { - LOG.debug("Could not convert to partition targets to connection string: " + targetDTO.ids); - return; - } - - LOG.info("Redirecting connection to: " + connectionString); - TransportConnection connection = (TransportConnection)monitor.context.getConnection(); - ConnectionControl cc = new ConnectionControl(); - cc.setConnectedBrokers(connectionString); - cc.setRebalanceConnection(true); - connection.dispatchAsync(cc); - } - - protected String getConnectionString(HashSet ids) { - StringBuilder rc = new StringBuilder(); - for (String id : ids) { - String url = plugin.getBrokerURL(this, id); - if( url!=null ) { - if( rc.length()!=0 ) { - rc.append(','); - } - rc.append(url); - } - } - if( rc.length()==0 ) - return null; - return rc.toString(); - } - - static private class Score { - int value; - } - - protected Target pickBestBroker(ConnectionMonitor monitor) { - - if( getConfig() ==null ) - return null; - - if( getConfig().bySourceIp !=null && !getConfig().bySourceIp.isEmpty() ) { - TransportConnection connection = (TransportConnection)monitor.context.getConnection(); - Transport transport = connection.getTransport(); - Socket socket = transport.narrow(Socket.class); - if( socket !=null ) { - SocketAddress address = socket.getRemoteSocketAddress(); - if( address instanceof InetSocketAddress) { - String ip = ((InetSocketAddress) address).getAddress().getHostAddress(); - Target targetDTO = getConfig().bySourceIp.get(ip); - if( targetDTO!=null ) { - return targetDTO; - } - } - } - } - - if( getConfig().byUserName !=null && !getConfig().byUserName.isEmpty() ) { - String userName = monitor.context.getUserName(); - if( userName !=null ) { - Target targetDTO = getConfig().byUserName.get(userName); - if( targetDTO!=null ) { - return targetDTO; - } - } - } - - if( getConfig().byClientId !=null && !getConfig().byClientId.isEmpty() ) { - String clientId = monitor.context.getClientId(); - if( clientId!=null ) { - Target targetDTO = getConfig().byClientId.get(clientId); - if( targetDTO!=null ) { - return targetDTO; - } - } - } - - if( - (getConfig().byQueue !=null && !getConfig().byQueue.isEmpty()) - || (getConfig().byTopic !=null && !getConfig().byTopic.isEmpty()) - ) { - - // Collect the destinations the connection is consuming from... - HashSet dests = new HashSet(); - for (SessionState session : monitor.context.getConnectionState().getSessionStates()) { - for (ConsumerState consumer : session.getConsumerStates()) { - ActiveMQDestination destination = consumer.getInfo().getDestination(); - if( destination.isComposite() ) { - dests.addAll(Arrays.asList(destination.getCompositeDestinations())); - } else { - dests.addAll(Collections.singletonList(destination)); - } - } - } - - // Group them by the partitioning target for the destinations and score them.. - HashMap targetScores = new HashMap(); - for (ActiveMQDestination dest : dests) { - Target target = getTarget(dest); - if( target!=null ) { - Score score = targetScores.get(target); - if( score == null ) { - score = new Score(); - targetScores.put(target, score); - } - score.value++; - } - } - - // The target with largest score wins.. - if (!targetScores.isEmpty()) { - Target bestTarget = null; - int bestScore = 0; - for (Map.Entry entry : targetScores.entrySet()) { - if (entry.getValue().value > bestScore) { - bestTarget = entry.getKey(); - bestScore = entry.getValue().value; - } - } - return bestTarget; - } - - // If we get here is because there were no consumers, or the destinations for those - // consumers did not have an assigned destination.. So partition based on producer - // usage. - Target best = monitor.findBestProducerTarget(this); - if( best!=null ) { - return best; - } - } - return null; - } - - protected Target getTarget(ActiveMQDestination dest) { - Partitioning config = getConfig(); - if( dest.isQueue() && config.byQueue !=null && !config.byQueue.isEmpty() ) { - return config.byQueue.get(dest.getPhysicalName()); - } else if( dest.isTopic() && config.byTopic !=null && !config.byTopic.isEmpty() ) { - return config.byTopic.get(dest.getPhysicalName()); - } - return null; - } - - protected final ConcurrentMap monitors = new ConcurrentHashMap(); - - @Override - public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { - if( info.isFaultTolerant() ) { - ConnectionMonitor monitor = new ConnectionMonitor(context); - monitors.put(info.getConnectionId(), monitor); - super.addConnection(context, info); - checkTarget(monitor); - } else { - super.addConnection(context, info); - } - } - - @Override - public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { - super.removeConnection(context, info, error); - if( info.isFaultTolerant() ) { - monitors.remove(info.getConnectionId()); - } - } - - @Override - public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { - ConnectionMonitor monitor = monitors.get(producerExchange.getConnectionContext().getConnectionId()); - if( monitor!=null ) { - monitor.onSend(producerExchange, messageSend); - } - } - - protected Partitioning getConfig() { - return plugin.getConfig(); - } - - - static class Traffic { - long messages; - long bytes; - } - - static class ConnectionMonitor { - - final ConnectionContext context; - LRUCache trafficPerDestination = new LRUCache(); - - public ConnectionMonitor(ConnectionContext context) { - this.context = context; - } - - synchronized public Target findBestProducerTarget(PartitionBroker broker) { - Target best = null; - long bestSize = 0 ; - for (Map.Entry entry : trafficPerDestination.entrySet()) { - Traffic t = entry.getValue(); - // Once we get enough messages... - if( t.messages < broker.plugin.getMinTransferCount()) { - continue; - } - if( t.bytes > bestSize) { - bestSize = t.bytes; - Target target = broker.getTarget(entry.getKey()); - if( target!=null ) { - best = target; - } - } - } - return best; - } - - synchronized public void onSend(ProducerBrokerExchange producerExchange, Message message) { - ActiveMQDestination dest = message.getDestination(); - Traffic traffic = trafficPerDestination.get(dest); - if( traffic == null ) { - traffic = new Traffic(); - trafficPerDestination.put(dest, traffic); - } - traffic.messages += 1; - traffic.bytes += message.getSize(); - } - - - } - -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java deleted file mode 100644 index 418f564cab..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/PartitionBrokerPlugin.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.partition.dto.Partitioning; - -import java.io.IOException; - -/** - * A BrokerPlugin which partitions client connections over a cluster of brokers. - * - * @org.apache.xbean.XBean element="partitionBrokerPlugin" - */ -public class PartitionBrokerPlugin implements BrokerPlugin { - - protected int minTransferCount; - protected Partitioning config; - - @Override - public Broker installPlugin(Broker broker) throws Exception { - return new PartitionBroker(broker, this); - } - - public int getMinTransferCount() { - return minTransferCount; - } - - public void setMinTransferCount(int minTransferCount) { - this.minTransferCount = minTransferCount; - } - - public Partitioning getConfig() { - return config; - } - - public void setConfig(Partitioning config) { - this.config = config; - } - - public void setConfigAsJson(String config) throws IOException { - this.config = Partitioning.MAPPER.readValue(config, Partitioning.class); - } - - public String getBrokerURL(PartitionBroker partitionBroker, String id) { - if( config!=null && config.brokers!=null ) { - return config.brokers.get(id); - } - return null; - } -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZKClient.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZKClient.java deleted file mode 100644 index 2baec62762..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZKClient.java +++ /dev/null @@ -1,596 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.zookeeper.*; -import org.apache.zookeeper.data.ACL; -import org.apache.zookeeper.data.Id; -import org.apache.zookeeper.data.Stat; -import org.linkedin.util.clock.Clock; -import org.linkedin.util.clock.SystemClock; -import org.linkedin.util.clock.Timespan; -import org.linkedin.util.concurrent.ConcurrentUtils; -import org.linkedin.util.io.PathUtils; -import org.linkedin.zookeeper.client.*; -import org.slf4j.Logger; - -import java.io.UnsupportedEncodingException; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; - -public class ZKClient extends org.linkedin.zookeeper.client.AbstractZKClient implements Watcher { - - private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ZKClient.class); - - private Map acls; - private String password; - - public void start() throws Exception { - // Grab the lock to make sure that the registration of the ManagedService - // won't be updated immediately but that the initial update will happen first - synchronized (_lock) { - _stateChangeDispatcher.setDaemon(true); - _stateChangeDispatcher.start(); - doStart(); - } - } - - public void setACLs(Map acls) { - this.acls = acls; - } - - public void setPassword(String password) { - this.password = password; - } - - protected void doStart() throws UnsupportedEncodingException { - connect(); - } - - @Override - public void close() { - if (_stateChangeDispatcher != null) { - _stateChangeDispatcher.end(); - try { - _stateChangeDispatcher.join(1000); - } catch (Exception e) { - LOG.debug("ignored exception", e); - } - } - synchronized(_lock) { - if (_zk != null) { - try { - changeState(State.NONE); - _zk.close(); - Thread th = getSendThread(); - if (th != null) { - th.join(1000); - } - _zk = null; - } catch (Exception e) { - LOG.debug("ignored exception", e); - } - } - } - } - - protected Thread getSendThread() { - try { - return (Thread) getField(_zk, "_zk", "cnxn", "sendThread"); - } catch (Throwable e) { - return null; - } - } - - protected Object getField(Object obj, String... names) throws Exception { - for (String name : names) { - obj = getField(obj, name); - } - return obj; - } - - protected Object getField(Object obj, String name) throws Exception { - Class clazz = obj.getClass(); - while (clazz != null) { - for (Field f : clazz.getDeclaredFields()) { - if (f.getName().equals(name)) { - f.setAccessible(true); - return f.get(obj); - } - } - } - throw new NoSuchFieldError(name); - } - - protected void changeState(State newState) { - synchronized (_lock) { - State oldState = _state; - if (oldState != newState) { - _stateChangeDispatcher.addEvent(oldState, newState); - _state = newState; - _lock.notifyAll(); - } - } - } - - public void testGenerateConnectionLoss() throws Exception { - waitForConnected(); - Object clientCnxnSocket = getField(_zk, "_zk", "cnxn", "sendThread", "clientCnxnSocket"); - callMethod(clientCnxnSocket, "testableCloseSocket"); - } - - protected Object callMethod(Object obj, String name, Object... args) throws Exception { - Class clazz = obj.getClass(); - while (clazz != null) { - for (Method m : clazz.getDeclaredMethods()) { - if (m.getName().equals(name)) { - m.setAccessible(true); - return m.invoke(obj, args); - } - } - } - throw new NoSuchMethodError(name); - } - - protected void tryConnect() { - synchronized (_lock) { - try { - connect(); - } catch (Throwable e) { - LOG.warn("Error while restarting:", e); - if (_expiredSessionRecovery == null) { - _expiredSessionRecovery = new ExpiredSessionRecovery(); - _expiredSessionRecovery.setDaemon(true); - _expiredSessionRecovery.start(); - } - } - } - } - - public void connect() throws UnsupportedEncodingException { - synchronized (_lock) { - changeState(State.CONNECTING); - _zk = _factory.createZooKeeper(this); - if (password != null) { - _zk.addAuthInfo("digest", ("fabric:" + password).getBytes("UTF-8")); - } - } - } - - public void process(WatchedEvent event) { - if (event.getState() != null) { - LOG.debug("event: {}", event.getState()); - synchronized (_lock) { - switch(event.getState()) { - case SyncConnected: - changeState(State.CONNECTED); - break; - case Disconnected: - if (_state != State.NONE) { - changeState(State.RECONNECTING); - } - break; - case Expired: - // when expired, the zookeeper object is invalid and we need to recreate a new one - _zk = null; - LOG.warn("Expiration detected: trying to restart..."); - tryConnect(); - break; - default: - LOG.warn("Unsupported event state: {}", event.getState()); - } - } - } - } - - @Override - protected IZooKeeper getZk() { - State state = _state; - if (state == State.NONE) { - throw new IllegalStateException("ZooKeeper client has not been configured yet. You need to either create an ensemble or join one."); - } else if (state != State.CONNECTING) { - try { - waitForConnected(); - } catch (Exception e) { - throw new IllegalStateException("Error waiting for ZooKeeper connection", e); - } - } - IZooKeeper zk = _zk; - if (zk == null) { - throw new IllegalStateException("No ZooKeeper connection available"); - } - return zk; - } - - public void waitForConnected(Timespan timeout) throws InterruptedException, TimeoutException { - waitForState(State.CONNECTED, timeout); - } - - public void waitForConnected() throws InterruptedException, TimeoutException { - waitForConnected(null); - } - - public void waitForState(State state, Timespan timeout) throws TimeoutException, InterruptedException { - long endTime = (timeout == null ? sessionTimeout : timeout).futureTimeMillis(_clock); - if (_state != state) { - synchronized (_lock) { - while (_state != state) { - ConcurrentUtils.awaitUntil(_clock, _lock, endTime); - } - } - } - } - - @Override - public void registerListener(LifecycleListener listener) { - if (listener == null) { - throw new IllegalStateException("listener is null"); - } - if (!_listeners.contains(listener)) { - _listeners.add(listener); - } - if (_state == State.CONNECTED) { - listener.onConnected(); - //_stateChangeDispatcher.addEvent(null, State.CONNECTED); - } - } - - @Override - public void removeListener(LifecycleListener listener) { - if (listener == null) { - throw new IllegalStateException("listener is null"); - } - _listeners.remove(listener); - } - - @Override - public org.linkedin.zookeeper.client.IZKClient chroot(String path) { - return new ChrootedZKClient(this, adjustPath(path)); - } - - @Override - public boolean isConnected() { - return _state == State.CONNECTED; - } - - public boolean isConfigured() { - return _state != State.NONE; - } - - @Override - public String getConnectString() { - return _factory.getConnectString(); - } - - public static enum State { - NONE, - CONNECTING, - CONNECTED, - RECONNECTING - } - - private final static String CHARSET = "UTF-8"; - - private final Clock _clock = SystemClock.instance(); - private final List _listeners = new CopyOnWriteArrayList<>(); - - protected final Object _lock = new Object(); - protected volatile State _state = State.NONE; - - private final StateChangeDispatcher _stateChangeDispatcher = new StateChangeDispatcher(); - - protected IZooKeeperFactory _factory; - protected IZooKeeper _zk; - protected Timespan _reconnectTimeout = Timespan.parse("20s"); - protected Timespan sessionTimeout = new Timespan(30, Timespan.TimeUnit.SECOND); - - private ExpiredSessionRecovery _expiredSessionRecovery = null; - - private class StateChangeDispatcher extends Thread { - private final AtomicBoolean _running = new AtomicBoolean(true); - private final BlockingQueue _events = new LinkedBlockingQueue<>(); - - private StateChangeDispatcher() { - super("ZooKeeper state change dispatcher thread"); - } - - @Override - public void run() { - Map history = new IdentityHashMap<>(); - LOG.info("Starting StateChangeDispatcher"); - while (_running.get()) { - Boolean isConnectedEvent; - try { - isConnectedEvent = _events.take(); - } catch (InterruptedException e) { - continue; - } - if (!_running.get() || isConnectedEvent == null) { - continue; - } - Map newHistory = callListeners(history, isConnectedEvent); - // we save which event each listener has seen last - // we don't update the map in place because we need to get rid of unregistered listeners - history = newHistory; - } - LOG.info("StateChangeDispatcher terminated."); - } - - public void end() { - _running.set(false); - _events.add(false); - } - - public void addEvent(ZKClient.State oldState, ZKClient.State newState) { - LOG.debug("addEvent: {} => {}", oldState, newState); - if (newState == ZKClient.State.CONNECTED) { - _events.add(true); - } else if (oldState == ZKClient.State.CONNECTED) { - _events.add(false); - } - } - } - - protected Map callListeners(Map history, Boolean connectedEvent) { - Map newHistory = new IdentityHashMap<>(); - for (LifecycleListener listener : _listeners) { - Boolean previousEvent = history.get(listener); - // we propagate the event only if it was not already sent - if (previousEvent == null || previousEvent != connectedEvent) { - try { - if (connectedEvent) { - listener.onConnected(); - } else { - listener.onDisconnected(); - } - } catch (Throwable e) { - LOG.warn("Exception while executing listener (ignored)", e); - } - } - newHistory.put(listener, connectedEvent); - } - return newHistory; - } - - private class ExpiredSessionRecovery extends Thread { - - private ExpiredSessionRecovery() { - super("ZooKeeper expired session recovery thread"); - } - - @Override - public void run() { - LOG.info("Entering recovery mode"); - synchronized (_lock) { - try { - int count = 0; - while (_state == ZKClient.State.NONE) { - try { - count++; - LOG.warn("Recovery mode: trying to reconnect to zookeeper [{}]", count); - ZKClient.this.connect(); - } catch (Throwable e) { - LOG.warn("Recovery mode: reconnect attempt failed [{}]... waiting for {}", count, _reconnectTimeout, e); - try { - _lock.wait(_reconnectTimeout.getDurationInMilliseconds()); - } catch (InterruptedException e1) { - throw new RuntimeException("Recovery mode: wait interrupted... bailing out", e1); - } - } - } - } finally { - _expiredSessionRecovery = null; - LOG.info("Exiting recovery mode."); - } - } - } - - } - - public ZKClient(String connectString, Timespan sessionTimeout, Watcher watcher) { - this(new ZooKeeperFactory(connectString, sessionTimeout, watcher)); - } - - public ZKClient(IZooKeeperFactory factory) { - this(factory, null); - } - - public ZKClient(IZooKeeperFactory factory, String chroot) { - super(chroot); - _factory = factory; - Map acls = new HashMap<>(); - acls.put("/", "world:anyone:acdrw"); - setACLs(acls); - } - - static private int getPermFromString(String permString) { - int perm = 0; - for (int i = 0; i < permString.length(); i++) { - switch (permString.charAt(i)) { - case 'r': - perm |= ZooDefs.Perms.READ; - break; - case 'w': - perm |= ZooDefs.Perms.WRITE; - break; - case 'c': - perm |= ZooDefs.Perms.CREATE; - break; - case 'd': - perm |= ZooDefs.Perms.DELETE; - break; - case 'a': - perm |= ZooDefs.Perms.ADMIN; - break; - default: - System.err.println("Unknown perm type:" + permString.charAt(i)); - } - } - return perm; - } - - private static List parseACLs(String aclString) { - List acl; - String acls[] = aclString.split(","); - acl = new ArrayList<>(); - for (String a : acls) { - int firstColon = a.indexOf(':'); - int lastColon = a.lastIndexOf(':'); - if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) { - System.err.println(a + " does not have the form scheme:id:perm"); - continue; - } - ACL newAcl = new ACL(); - newAcl.setId(new Id(a.substring(0, firstColon), a.substring(firstColon + 1, lastColon))); - newAcl.setPerms(getPermFromString(a.substring(lastColon + 1))); - acl.add(newAcl); - } - return acl; - } - - public Stat createOrSetByteWithParents(String path, byte[] data, List acl, CreateMode createMode) throws InterruptedException, KeeperException { - if (exists(path) != null) { - return setByteData(path, data); - } - try { - createBytesNodeWithParents(path, data, acl, createMode); - return null; - } catch (KeeperException.NodeExistsException e) { - // this should not happen very often (race condition) - return setByteData(path, data); - } - } - - public String create(String path, CreateMode createMode) throws InterruptedException, KeeperException { - return create(path, (byte[]) null, createMode); - } - - public String create(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException { - return create(path, toByteData(data), createMode); - } - - public String create(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException { - return getZk().create(adjustPath(path), data, getNodeACLs(path), createMode); - } - - public String createWithParents(String path, CreateMode createMode) throws InterruptedException, KeeperException { - return createWithParents(path, (byte[]) null, createMode); - } - - public String createWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException { - return createWithParents(path, toByteData(data), createMode); - } - - public String createWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException { - createParents(path); - return create(path, data, createMode); - } - - public Stat createOrSetWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException { - return createOrSetWithParents(path, toByteData(data), createMode); - } - - public Stat createOrSetWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException { - if (exists(path) != null) { - return setByteData(path, data); - } - try { - createWithParents(path, data, createMode); - return null; - } catch (KeeperException.NodeExistsException e) { - // this should not happen very often (race condition) - return setByteData(path, data); - } - } - - public void fixACLs(String path, boolean recursive) throws InterruptedException, KeeperException { - if (exists(path) != null) { - doFixACLs(path, recursive); - } - } - - private void doFixACLs(String path, boolean recursive) throws KeeperException, InterruptedException { - setACL(path, getNodeACLs(path), -1); - if (recursive) { - for (String child : getChildren(path)) { - doFixACLs(path.equals("/") ? "/" + child : path + "/" + child, recursive); - } - } - } - - private List getNodeACLs(String path) { - String acl = doGetNodeACLs(adjustPath(path)); - if (acl == null) { - throw new IllegalStateException("Could not find matching ACLs for " + path); - } - return parseACLs(acl); - } - - protected String doGetNodeACLs(String path) { - String longestPath = ""; - for (String acl : acls.keySet()) { - if (acl.length() > longestPath.length() && path.startsWith(acl)) { - longestPath = acl; - } - } - return acls.get(longestPath); - } - - private void createParents(String path) throws InterruptedException, KeeperException { - path = PathUtils.getParentPath(adjustPath(path)); - path = PathUtils.removeTrailingSlash(path); - List paths = new ArrayList<>(); - while (!path.equals("") && getZk().exists(path, false) == null) { - paths.add(path); - path = PathUtils.getParentPath(path); - path = PathUtils.removeTrailingSlash(path); - } - Collections.reverse(paths); - for (String p : paths) { - try { - getZk().create(p, - null, - getNodeACLs(p), - CreateMode.PERSISTENT); - } catch (KeeperException.NodeExistsException e) { - // ok we continue... - if (LOG.isDebugEnabled()) { - LOG.debug("parent already exists " + p); - } - } - } - } - - private byte[] toByteData(String data) { - if (data == null) { - return null; - } else { - try { - return data.getBytes(CHARSET); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); - } - } - } - -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java deleted file mode 100644 index 6d2474b495..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBroker.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.activemq.broker.Broker; -import org.apache.activemq.partition.dto.Partitioning; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.data.Stat; -import org.linkedin.util.clock.Timespan; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - */ -public class ZooKeeperPartitionBroker extends PartitionBroker { - - protected static final Logger LOG = LoggerFactory.getLogger(ZooKeeperPartitionBroker.class); - - protected volatile ZKClient zk_client = null; - protected volatile Partitioning config; - protected final CountDownLatch configAcquired = new CountDownLatch(1); - - public ZooKeeperPartitionBroker(Broker broker, ZooKeeperPartitionBrokerPlugin plugin) { - super(broker, plugin); - } - - @Override - public void start() throws Exception { - super.start(); - // Lets block a bit until we get our config.. Otherwise just keep - // on going.. not a big deal if we get our config later. Perhaps - // ZK service is not having a good day. - configAcquired.await(5, TimeUnit.SECONDS); - } - - @Override - protected void onMonitorStop() { - zkDisconnect(); - } - - @Override - protected Partitioning getConfig() { - return config; - } - - protected ZooKeeperPartitionBrokerPlugin plugin() { - return (ZooKeeperPartitionBrokerPlugin)plugin; - } - - protected void zkConnect() throws Exception { - zk_client = new ZKClient(plugin().getZkAddress(), Timespan.parse(plugin().getZkSessionTmeout()), null); - if( plugin().getZkPassword()!=null ) { - zk_client.setPassword(plugin().getZkPassword()); - } - zk_client.start(); - zk_client.waitForConnected(Timespan.parse("30s")); - } - - protected void zkDisconnect() { - if( zk_client!=null ) { - zk_client.close(); - zk_client = null; - } - } - - protected void reloadConfiguration() throws Exception { - if( zk_client==null ) { - LOG.debug("Connecting to ZooKeeper"); - try { - zkConnect(); - LOG.debug("Connected to ZooKeeper"); - } catch (Exception e) { - LOG.debug("Connection to ZooKeeper failed: "+e); - zkDisconnect(); - throw e; - } - } - - byte[] data = null; - try { - Stat stat = new Stat(); - data = zk_client.getData(plugin().getZkPath(), new Watcher() { - @Override - public void process(WatchedEvent watchedEvent) { - try { - reloadConfiguration(); - } catch (Exception e) { - } - monitorWakeup(); - } - }, stat); - configAcquired.countDown(); - reloadConfigOnPoll = false; - } catch (Exception e) { - LOG.warn("Could load partitioning configuration: " + e, e); - reloadConfigOnPoll = true; - } - - try { - config = Partitioning.MAPPER.readValue(data, Partitioning.class); - } catch (Exception e) { - LOG.warn("Invalid partitioning configuration: " + e, e); - } - } - -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java b/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java deleted file mode 100644 index 34fa0fc1d1..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerPlugin.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerPlugin; - -/** - * A PartitionBrokerPlugin which gets it's configuration from ZooKeeper. - */ -public class ZooKeeperPartitionBrokerPlugin extends PartitionBrokerPlugin { - - String zkAddress = "127.0.0.1:2181"; - String zkPassword; - String zkPath = "/broker-assignments"; - String zkSessionTmeout = "10s"; - - @Override - public Broker installPlugin(Broker broker) throws Exception { - return new ZooKeeperPartitionBroker(broker, this); - } - - public String getZkAddress() { - return zkAddress; - } - - public void setZkAddress(String zkAddress) { - this.zkAddress = zkAddress; - } - - public String getZkPassword() { - return zkPassword; - } - - public void setZkPassword(String zkPassword) { - this.zkPassword = zkPassword; - } - - public String getZkPath() { - return zkPath; - } - - public void setZkPath(String zkPath) { - this.zkPath = zkPath; - } - - public String getZkSessionTmeout() { - return zkSessionTmeout; - } - - public void setZkSessionTmeout(String zkSessionTmeout) { - this.zkSessionTmeout = zkSessionTmeout; - } -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java deleted file mode 100644 index 43f79242fe..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Partitioning.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition.dto; - - - -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.DeserializationConfig; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.util.HashMap; - -/** - * The main Configuration class for the PartitionBroker plugin - */ -public class Partitioning { - - static final public ObjectMapper MAPPER = new ObjectMapper(); - static { - MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL); - MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - } - - static final public ObjectMapper TO_STRING_MAPPER = new ObjectMapper(); - static { - TO_STRING_MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); - TO_STRING_MAPPER.enable(SerializationFeature.INDENT_OUTPUT); - } - - /** - * If a client connects with a clientId which is listed in the - * map, then he will be immediately reconnected - * to the partition target immediately. - */ - @JsonProperty("by_client_id") - @JsonDeserialize(contentAs = Target.class) - public HashMap byClientId; - - /** - * If a client connects with a user priciple which is listed in the - * map, then he will be immediately reconnected - * to the partition target immediately. - */ - @JsonProperty("by_user_name") - @JsonDeserialize(contentAs = Target.class) - public HashMap byUserName; - - /** - * If a client connects with source ip which is listed in the - * map, then he will be immediately reconnected - * to the partition target immediately. - */ - @JsonProperty("by_source_ip") - @JsonDeserialize(contentAs = Target.class) - public HashMap bySourceIp; - - /** - * Used to map the preferred partitioning of queues across - * a set of brokers. Once a it is deemed that a connection mostly - * works with a set of targets configured in this map, the client - * will be reconnected to the appropriate target. - */ - @JsonProperty("by_queue") - @JsonDeserialize(contentAs = Target.class) - public HashMap byQueue; - - /** - * Used to map the preferred partitioning of topics across - * a set of brokers. Once a it is deemed that a connection mostly - * works with a set of targets configured in this map, the client - * will be reconnected to the appropriate target. - */ - @JsonProperty("by_topic") - @JsonDeserialize(contentAs = Target.class) - public HashMap byTopic; - - /** - * Maps broker names to broker URLs. - */ - @JsonProperty("brokers") - @JsonDeserialize(contentAs = String.class) - public HashMap brokers; - - - @Override - public String toString() { - try { - return TO_STRING_MAPPER.writeValueAsString(this); - } catch (IOException e) { - return super.toString(); - } - } - - public HashMap getBrokers() { - return brokers; - } - - public void setBrokers(HashMap brokers) { - this.brokers = brokers; - } - - public HashMap getByClientId() { - return byClientId; - } - - public void setByClientId(HashMap byClientId) { - this.byClientId = byClientId; - } - - public HashMap getByQueue() { - return byQueue; - } - - public void setByQueue(HashMap byQueue) { - this.byQueue = byQueue; - } - - public HashMap getBySourceIp() { - return bySourceIp; - } - - public void setBySourceIp(HashMap bySourceIp) { - this.bySourceIp = bySourceIp; - } - - public HashMap getByTopic() { - return byTopic; - } - - public void setByTopic(HashMap byTopic) { - this.byTopic = byTopic; - } - - public HashMap getByUserName() { - return byUserName; - } - - public void setByUserName(HashMap byUserName) { - this.byUserName = byUserName; - } -} diff --git a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java b/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java deleted file mode 100644 index 79b53efa3a..0000000000 --- a/activemq-partition/src/main/java/org/apache/activemq/partition/dto/Target.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition.dto; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; - -/** - * Represents a partition target. This identifies the brokers that - * a partition lives on. - */ -public class Target { - - @JsonProperty("ids") - public HashSet ids = new HashSet(); - - public Target() { - ids = new HashSet(); - } - - public Target(String ...ids) { - this.ids.addAll(java.util.Arrays.asList(ids)); - } - - @Override - public String toString() { - try { - return Partitioning.TO_STRING_MAPPER.writeValueAsString(this); - } catch (IOException e) { - return super.toString(); - } - } - - public HashSet getIds() { - return ids; - } - - public void setIds(Collection ids) { - this.ids = new HashSet(ids); - } - -} diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java deleted file mode 100644 index 1b49f0b008..0000000000 --- a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java +++ /dev/null @@ -1,251 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.partition.dto.Partitioning; -import org.apache.activemq.partition.dto.Target; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import javax.jms.*; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.*; - -/** - * Unit tests for the PartitionBroker plugin. - */ -public class PartitionBrokerTest { - - protected HashMap brokers = new HashMap(); - protected ArrayList connections = new ArrayList(); - Partitioning partitioning; - - @Before - public void setUp() throws Exception { - partitioning = new Partitioning(); - partitioning.brokers = new HashMap(); - } - - /** - * Partitioning can only re-direct failover clients since those - * can re-connect and re-establish their state with another broker. - */ - @Test(timeout = 1000*60*60) - public void testNonFailoverClientHasNoPartitionEffect() throws Exception { - - partitioning.byClientId = new HashMap(); - partitioning.byClientId.put("client1", new Target("broker1")); - createBrokerCluster(2); - - Connection connection = createConnectionToUrl(getConnectURL("broker2")); - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(0, getTransportConnector("broker1").getConnections().size()); - assertEquals(1, getTransportConnector("broker2").getConnections().size()); - } - }); - - connection.setClientID("client1"); - connection.start(); - - Thread.sleep(1000); - assertEquals(0, getTransportConnector("broker1").getConnections().size()); - assertEquals(1, getTransportConnector("broker2").getConnections().size()); - } - - @Test(timeout = 1000*60*60) - public void testPartitionByClientId() throws Exception { - partitioning.byClientId = new HashMap(); - partitioning.byClientId.put("client1", new Target("broker1")); - partitioning.byClientId.put("client2", new Target("broker2")); - createBrokerCluster(2); - - Connection connection = createConnectionTo("broker2"); - - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(0, getTransportConnector("broker1").getConnections().size()); - assertEquals(1, getTransportConnector("broker2").getConnections().size()); - } - }); - - connection.setClientID("client1"); - connection.start(); - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(1, getTransportConnector("broker1").getConnections().size()); - assertEquals(0, getTransportConnector("broker2").getConnections().size()); - } - }); - } - - @Test(timeout = 1000*60*60) - public void testPartitionByQueue() throws Exception { - partitioning.byQueue = new HashMap(); - partitioning.byQueue.put("foo", new Target("broker1")); - createBrokerCluster(2); - - Connection connection2 = createConnectionTo("broker2"); - - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(0, getTransportConnector("broker1").getConnections().size()); - assertEquals(1, getTransportConnector("broker2").getConnections().size()); - } - }); - - Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session2.createConsumer(session2.createQueue("foo")); - - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(1, getTransportConnector("broker1").getConnections().size()); - assertEquals(0, getTransportConnector("broker2").getConnections().size()); - } - }); - - Connection connection1 = createConnectionTo("broker2"); - Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session1.createProducer(session1.createQueue("foo")); - - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(1, getTransportConnector("broker1").getConnections().size()); - assertEquals(1, getTransportConnector("broker2").getConnections().size()); - } - }); - - for (int i = 0; i < 100; i++) { - producer.send(session1.createTextMessage("#" + i)); - } - - within(5, TimeUnit.SECONDS, new Task() { - public void run() throws Exception { - assertEquals(2, getTransportConnector("broker1").getConnections().size()); - assertEquals(0, getTransportConnector("broker2").getConnections().size()); - } - }); - } - - - static interface Task { - public void run() throws Exception; - } - - private void within(int time, TimeUnit unit, Task task) throws InterruptedException { - long timeMS = unit.toMillis(time); - long deadline = System.currentTimeMillis() + timeMS; - while (true) { - try { - task.run(); - return; - } catch (Throwable e) { - long remaining = deadline - System.currentTimeMillis(); - if( remaining <=0 ) { - if( e instanceof RuntimeException ) { - throw (RuntimeException)e; - } - if( e instanceof Error ) { - throw (Error)e; - } - throw new RuntimeException(e); - } - Thread.sleep(Math.min(timeMS/10, remaining)); - } - } - } - - protected Connection createConnectionTo(String brokerId) throws IOException, URISyntaxException, JMSException { - return createConnectionToUrl("failover://(" + getConnectURL(brokerId) + ")?randomize=false"); - } - - private Connection createConnectionToUrl(String url) throws JMSException { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); - Connection connection = factory.createConnection(); - connections.add(connection); - return connection; - } - - protected String getConnectURL(String broker) throws IOException, URISyntaxException { - TransportConnector tcp = getTransportConnector(broker); - return tcp.getConnectUri().toString(); - } - - private TransportConnector getTransportConnector(String broker) { - BrokerService brokerService = brokers.get(broker); - if( brokerService==null ) { - throw new IllegalArgumentException("Invalid broker id"); - } - return brokerService.getTransportConnectorByName("tcp"); - } - - protected void createBrokerCluster(int brokerCount) throws Exception { - for (int i = 1; i <= brokerCount; i++) { - String brokerId = "broker" + i; - BrokerService broker = createBroker(brokerId); - broker.setPersistent(false); - broker.addConnector("tcp://localhost:0").setName("tcp"); - addPartitionBrokerPlugin(broker); - broker.start(); - broker.waitUntilStarted(); - partitioning.brokers.put(brokerId, getConnectURL(brokerId)); - } - } - - protected void addPartitionBrokerPlugin(BrokerService broker) { - PartitionBrokerPlugin plugin = new PartitionBrokerPlugin(); - plugin.setConfig(partitioning); - broker.setPlugins(new BrokerPlugin[]{plugin}); - } - - protected BrokerService createBroker(String name) { - BrokerService broker = new BrokerService(); - broker.setBrokerName(name); - brokers.put(name, broker); - return broker; - } - - @After - public void tearDown() throws Exception { - for (Connection connection : connections) { - try { - connection.close(); - } catch (Throwable e) { - } - } - connections.clear(); - for (BrokerService broker : brokers.values()) { - try { - broker.stop(); - broker.waitUntilStopped(); - } catch (Throwable e) { - } - } - brokers.clear(); - } - -} diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java deleted file mode 100644 index 0a6416b2d7..0000000000 --- a/activemq-partition/src/test/java/org/apache/activemq/partition/ZooKeeperPartitionBrokerTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.partition; - -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerService; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.server.NIOServerCnxnFactory; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.junit.After; -import org.junit.Before; -import org.linkedin.util.clock.Timespan; - -import java.io.File; -import java.net.InetSocketAddress; - -/** - */ -public class ZooKeeperPartitionBrokerTest extends PartitionBrokerTest { - - NIOServerCnxnFactory connector; - - @Before - public void setUp() throws Exception { - System.out.println("Starting ZooKeeper"); - ZooKeeperServer zk_server = new ZooKeeperServer(); - zk_server.setTickTime(500); - zk_server.setTxnLogFactory(new FileTxnSnapLog(new File("target/test-data/zk-log"), new File("target/test-data/zk-data"))); - connector = new NIOServerCnxnFactory(); - connector.configure(new InetSocketAddress(0), 100); - connector.startup(zk_server); - System.out.println("ZooKeeper started"); - super.setUp(); - } - - @After - public void tearDown() throws Exception { - super.tearDown(); - if( connector!=null ) { - connector.shutdown(); - connector = null; - } - } - - String zkPath = "/partition-config"; - - @Override - protected void createBrokerCluster(int brokerCount) throws Exception { - // Store the partitioning in ZK. - ZKClient zk_client = new ZKClient("localhost:" + connector.getLocalPort(), Timespan.parse("10s"), null); - try { - zk_client.start(); - zk_client.waitForConnected(Timespan.parse("30s")); - try { - zk_client.delete(zkPath); - } catch (Throwable e) { - } - zk_client.create(zkPath, partitioning.toString(), CreateMode.PERSISTENT); - } finally { - zk_client.close(); - } - super.createBrokerCluster(brokerCount); - } - - @Override - protected void addPartitionBrokerPlugin(BrokerService broker) { - // Have the borker plugin get the partition config via ZK. - ZooKeeperPartitionBrokerPlugin plugin = new ZooKeeperPartitionBrokerPlugin(){ - @Override - public String getBrokerURL(PartitionBroker partitionBroker, String id) { - try { - return getConnectURL(id); - } catch (Exception e) { - return null; - } - } - }; - plugin.setZkAddress("localhost:" + connector.getLocalPort()); - plugin.setZkPath(zkPath); - broker.setPlugins(new BrokerPlugin[]{plugin}); - } -} diff --git a/activemq-spring/pom.xml b/activemq-spring/pom.xml index 005f46a10d..9618371824 100644 --- a/activemq-spring/pom.xml +++ b/activemq-spring/pom.xml @@ -77,21 +77,6 @@ ${hawtdispatch-version} provided - - org.linkedin - org.linkedin.zookeeper-impl - provided - - - org.linkedin - org.linkedin.util-core - provided - - - org.apache.zookeeper - zookeeper - provided - org.osgi osgi.core @@ -202,7 +187,6 @@ ${basedir}/../activemq-kahadb-store/src/main/java ${basedir}/../activemq-mqtt/src/main/java ${basedir}/../activemq-stomp/src/main/java - ${basedir}/../activemq-partition/src/main/java ${basedir}/../activemq-runtime-config/src/main/java false diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index bbe633cab0..6184efbb3e 100644 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -58,10 +58,6 @@ org.apache.activemq activemq-stomp - - org.apache.activemq - activemq-partition - org.apache.activemq activemq-runtime-config diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java deleted file mode 100644 index dcf4e69e5f..0000000000 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/partition/SpringPartitionBrokerTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.broker.partition; - -import junit.framework.TestCase; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.partition.PartitionBrokerPlugin; -import org.apache.activemq.partition.dto.Partitioning; - -/** - */ -public class SpringPartitionBrokerTest extends TestCase { - - public void testCreatePartitionBroker() throws Exception { - - BrokerService broker = BrokerFactory.createBroker("xbean:activemq-partition.xml"); - assertEquals(1, broker.getPlugins().length); - PartitionBrokerPlugin plugin = (PartitionBrokerPlugin)broker.getPlugins()[0]; - Partitioning config = plugin.getConfig(); - assertEquals(2, config.getBrokers().size()); - - Object o; - String json = "{\n" + - " \"by_client_id\":{\n" + - " \"client1\":{\"ids\":[\"broker1\"]},\n" + - " \"client2\":{\"ids\":[\"broker1\",\"broker2\"]}\n" + - " },\n" + - " \"brokers\":{\n" + - " \"broker1\":\"tcp://localhost:61616\",\n" + - " \"broker2\":\"tcp://localhost:61616\"\n" + - " }\n" + - "}"; - Partitioning expected = Partitioning.MAPPER.readValue(json, Partitioning.class); - assertEquals(expected.toString(), config.toString()); - - } - -} diff --git a/activemq-unit-tests/src/test/resources/activemq-partition.xml b/activemq-unit-tests/src/test/resources/activemq-partition.xml deleted file mode 100644 index 4bb96f22a9..0000000000 --- a/activemq-unit-tests/src/test/resources/activemq-partition.xml +++ /dev/null @@ -1,58 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/assembly/pom.xml b/assembly/pom.xml index a5d2b0cdb8..d1d30d44f9 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -54,10 +54,6 @@ activemq-unit-tests test-jar - - ${project.groupId} - activemq-partition - org.apache.activemq.tooling activemq-junit @@ -68,19 +64,6 @@ hawtdispatch-transport ${hawtdispatch-version} - - org.linkedin - org.linkedin.zookeeper-impl - - - org.linkedin - org.linkedin.util-core - ${linkedin-zookeeper-version} - - - org.apache.zookeeper - zookeeper - org.osgi diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml index f77fa1c7d2..ff9c509459 100644 --- a/assembly/src/main/descriptors/common-bin.xml +++ b/assembly/src/main/descriptors/common-bin.xml @@ -182,7 +182,6 @@ ${pom.groupId}:activemq-log4j-appender ${pom.groupId}:activemq-jms-pool ${pom.groupId}:activemq-pool - ${pom.groupId}:activemq-partition ${pom.groupId}:activemq-shiro commons-beanutils:commons-beanutils commons-collections:commons-collections diff --git a/pom.xml b/pom.xml index 325985d291..4d02cf94c2 100644 --- a/pom.xml +++ b/pom.xml @@ -90,8 +90,6 @@ 1.16 10.15.2.0 6.0.0 - 1.4.0 - 3.4.14 0.33.10 1.6.0 4.1.75.Final @@ -229,7 +227,6 @@ activemq-runtime-config activemq-tooling activemq-web - activemq-partition activemq-osgi activemq-blueprint activemq-web-demo @@ -315,11 +312,6 @@ activemq-all ${project.version} - - org.apache.activemq - activemq-partition - ${project.version} - org.apache.activemq.tooling activemq-junit @@ -585,65 +577,6 @@ ${pax-logging-version} - - org.apache.hadoop.zookeeper - zookeeper - ${zookeeper-version} - - - io.netty - netty - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - - - org.apache.zookeeper - zookeeper - ${zookeeper-version} - - - io.netty - netty - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - - - org.linkedin - org.linkedin.zookeeper-impl - ${linkedin-zookeeper-version} - - - org.json - json - - - log4j - log4j - - - - - org.linkedin - org.linkedin.util-core - ${linkedin-zookeeper-version} - - org.jmdns