mirror of https://github.com/apache/activemq.git
Improve the replicated leveldb bits: Avoid dependencies on fabric-group stuff. Makes it easier to embed in different versions of a fabric osgi env.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1501877 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1a4f47bf74
commit
86e2426d1b
|
@ -114,24 +114,26 @@
|
||||||
<version>${hawtdispatch-version}</version>
|
<version>${hawtdispatch-version}</version>
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.fusesource.fabric</groupId>
|
<groupId>org.linkedin</groupId>
|
||||||
<artifactId>fabric-groups</artifactId>
|
<artifactId>org.linkedin.zookeeper-impl</artifactId>
|
||||||
<version>${fabric-version}</version>
|
<version>${linkedin-zookeeper-version}</version>
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.fusesource.fabric</groupId>
|
<groupId>org.linkedin</groupId>
|
||||||
<artifactId>fabric-linkedin-zookeeper</artifactId>
|
<artifactId>org.linkedin.util-core</artifactId>
|
||||||
<version>${fabric-version}</version>
|
<version>${linkedin-zookeeper-version}</version>
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.fusesource.fabric</groupId>
|
<groupId>org.apache.zookeeper</groupId>
|
||||||
<artifactId>fabric-zookeeper</artifactId>
|
<artifactId>zookeeper</artifactId>
|
||||||
<version>${fabric-version}</version>
|
<version>${zookeeper-version}</version>
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.osgi</groupId>
|
<groupId>org.osgi</groupId>
|
||||||
<artifactId>org.osgi.core</artifactId>
|
<artifactId>org.osgi.core</artifactId>
|
||||||
|
|
|
@ -0,0 +1,631 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.leveldb.replicated.groups;
|
||||||
|
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.zookeeper.CreateMode;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.apache.zookeeper.WatchedEvent;
|
||||||
|
import org.apache.zookeeper.Watcher;
|
||||||
|
import org.apache.zookeeper.ZooDefs;
|
||||||
|
import org.apache.zookeeper.data.ACL;
|
||||||
|
import org.apache.zookeeper.data.Id;
|
||||||
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
import org.linkedin.util.clock.Clock;
|
||||||
|
import org.linkedin.util.clock.SystemClock;
|
||||||
|
import org.linkedin.util.clock.Timespan;
|
||||||
|
import org.linkedin.util.concurrent.ConcurrentUtils;
|
||||||
|
import org.linkedin.util.io.PathUtils;
|
||||||
|
import org.linkedin.zookeeper.client.ChrootedZKClient;
|
||||||
|
import org.linkedin.zookeeper.client.IZooKeeper;
|
||||||
|
import org.linkedin.zookeeper.client.IZooKeeperFactory;
|
||||||
|
import org.linkedin.zookeeper.client.LifecycleListener;
|
||||||
|
import org.linkedin.zookeeper.client.ZooKeeperFactory;
|
||||||
|
import org.osgi.framework.InvalidSyntaxException;
|
||||||
|
import org.osgi.service.cm.ConfigurationException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
public class ZKClient extends org.linkedin.zookeeper.client.AbstractZKClient implements Watcher {
|
||||||
|
|
||||||
|
private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ZKClient.class.getName());
|
||||||
|
|
||||||
|
private Map<String, String> acls;
|
||||||
|
private String password;
|
||||||
|
|
||||||
|
|
||||||
|
public void start() throws Exception {
|
||||||
|
// Grab the lock to make sure that the registration of the ManagedService
|
||||||
|
// won't be updated immediately but that the initial update will happen first
|
||||||
|
synchronized (_lock) {
|
||||||
|
_stateChangeDispatcher.setDaemon(true);
|
||||||
|
_stateChangeDispatcher.start();
|
||||||
|
|
||||||
|
doStart();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setACLs(Map<String, String> acls) {
|
||||||
|
this.acls = acls;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPassword(String password) {
|
||||||
|
this.password = password;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void doStart() throws InvalidSyntaxException, ConfigurationException, UnsupportedEncodingException {
|
||||||
|
connect();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
if (_stateChangeDispatcher != null) {
|
||||||
|
_stateChangeDispatcher.end();
|
||||||
|
try {
|
||||||
|
_stateChangeDispatcher.join(1000);
|
||||||
|
} catch(Exception e) {
|
||||||
|
LOG.debug("ignored exception", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
synchronized(_lock) {
|
||||||
|
if (_zk != null) {
|
||||||
|
try {
|
||||||
|
changeState(State.NONE);
|
||||||
|
_zk.close();
|
||||||
|
// We try to avoid a NPE when shutting down fabric:
|
||||||
|
// java.lang.NullPointerException
|
||||||
|
// at org.apache.felix.framework.BundleWiringImpl.findClassOrResourceByDelegation(BundleWiringImpl.java:1433)
|
||||||
|
// at org.apache.felix.framework.BundleWiringImpl.access$400(BundleWiringImpl.java:73)
|
||||||
|
// at org.apache.felix.framework.BundleWiringImpl$BundleClassLoader.loadClass(BundleWiringImpl.java:1844)
|
||||||
|
// at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
|
||||||
|
// at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1089)
|
||||||
|
Thread th = getSendThread();
|
||||||
|
if (th != null) {
|
||||||
|
th.join(1000);
|
||||||
|
}
|
||||||
|
_zk = null;
|
||||||
|
} catch(Exception e) {
|
||||||
|
LOG.debug("ignored exception", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Thread getSendThread() {
|
||||||
|
try {
|
||||||
|
return (Thread) getField(_zk, "_zk", "cnxn", "sendThread");
|
||||||
|
} catch (Throwable e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Object getField(Object obj, String... names) throws Exception {
|
||||||
|
for (String name : names) {
|
||||||
|
obj = getField(obj, name);
|
||||||
|
}
|
||||||
|
return obj;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Object getField(Object obj, String name) throws Exception {
|
||||||
|
Class clazz = obj.getClass();
|
||||||
|
while (clazz != null) {
|
||||||
|
for (Field f : clazz.getDeclaredFields()) {
|
||||||
|
if (f.getName().equals(name)) {
|
||||||
|
f.setAccessible(true);
|
||||||
|
return f.get(obj);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new NoSuchFieldError(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void changeState(State newState) {
|
||||||
|
synchronized (_lock) {
|
||||||
|
State oldState = _state;
|
||||||
|
if (oldState != newState) {
|
||||||
|
_stateChangeDispatcher.addEvent(oldState, newState);
|
||||||
|
_state = newState;
|
||||||
|
_lock.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testGenerateConnectionLoss() throws Exception {
|
||||||
|
waitForConnected();
|
||||||
|
Object clientCnxnSocket = getField(_zk, "_zk", "cnxn", "sendThread", "clientCnxnSocket");
|
||||||
|
callMethod(clientCnxnSocket, "testableCloseSocket");
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Object callMethod(Object obj, String name, Object... args) throws Exception {
|
||||||
|
Class clazz = obj.getClass();
|
||||||
|
while (clazz != null) {
|
||||||
|
for (Method m : clazz.getDeclaredMethods()) {
|
||||||
|
if (m.getName().equals(name)) {
|
||||||
|
m.setAccessible(true);
|
||||||
|
return m.invoke(obj, args);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new NoSuchMethodError(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void tryConnect() {
|
||||||
|
synchronized (_lock) {
|
||||||
|
try {
|
||||||
|
connect();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.warn("Error while restarting:", e);
|
||||||
|
if (_expiredSessionRecovery == null) {
|
||||||
|
_expiredSessionRecovery = new ExpiredSessionRecovery();
|
||||||
|
_expiredSessionRecovery.setDaemon(true);
|
||||||
|
_expiredSessionRecovery.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void connect() throws UnsupportedEncodingException {
|
||||||
|
synchronized (_lock) {
|
||||||
|
changeState(State.CONNECTING);
|
||||||
|
_zk = _factory.createZooKeeper(this);
|
||||||
|
if (password != null) {
|
||||||
|
_zk.addAuthInfo("digest", ("fabric:" + password).getBytes("UTF-8"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void process(WatchedEvent event) {
|
||||||
|
if (event.getState() != null) {
|
||||||
|
LOG.debug("event: {}", event.getState());
|
||||||
|
synchronized (_lock) {
|
||||||
|
switch(event.getState())
|
||||||
|
{
|
||||||
|
case SyncConnected:
|
||||||
|
changeState(State.CONNECTED);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case Disconnected:
|
||||||
|
if(_state != State.NONE) {
|
||||||
|
changeState(State.RECONNECTING);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
|
case Expired:
|
||||||
|
// when expired, the zookeeper object is invalid and we need to recreate a new one
|
||||||
|
_zk = null;
|
||||||
|
LOG.warn("Expiration detected: trying to restart...");
|
||||||
|
tryConnect();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOG.warn("unprocessed event state: {}", event.getState());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected IZooKeeper getZk() {
|
||||||
|
State state = _state;
|
||||||
|
if (state == State.NONE) {
|
||||||
|
throw new IllegalStateException("ZooKeeper client has not been configured yet. You need to either create an ensemble or join one.");
|
||||||
|
} else if (state != State.CONNECTED) {
|
||||||
|
try {
|
||||||
|
waitForConnected();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IllegalStateException("Error waiting for ZooKeeper connection", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
IZooKeeper zk = _zk;
|
||||||
|
if (zk == null) {
|
||||||
|
throw new IllegalStateException("No ZooKeeper connection available");
|
||||||
|
}
|
||||||
|
return zk;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void waitForConnected(Timespan timeout) throws InterruptedException, TimeoutException {
|
||||||
|
waitForState(State.CONNECTED, timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void waitForConnected() throws InterruptedException, TimeoutException {
|
||||||
|
waitForConnected(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void waitForState(State state, Timespan timeout) throws TimeoutException, InterruptedException {
|
||||||
|
long endTime = (timeout == null ? sessionTimeout : timeout).futureTimeMillis(_clock);
|
||||||
|
if (_state != state) {
|
||||||
|
synchronized (_lock) {
|
||||||
|
while (_state != state) {
|
||||||
|
ConcurrentUtils.awaitUntil(_clock, _lock, endTime);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerListener(LifecycleListener listener) {
|
||||||
|
if (listener == null) {
|
||||||
|
throw new IllegalStateException("listener is null");
|
||||||
|
}
|
||||||
|
if (!_listeners.contains(listener)) {
|
||||||
|
_listeners.add(listener);
|
||||||
|
|
||||||
|
}
|
||||||
|
if (_state == State.CONNECTED) {
|
||||||
|
listener.onConnected();
|
||||||
|
//_stateChangeDispatcher.addEvent(null, State.CONNECTED);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeListener(LifecycleListener listener) {
|
||||||
|
if (listener == null) {
|
||||||
|
throw new IllegalStateException("listener is null");
|
||||||
|
}
|
||||||
|
_listeners.remove(listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public org.linkedin.zookeeper.client.IZKClient chroot(String path) {
|
||||||
|
return new ChrootedZKClient(this, adjustPath(path));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isConnected() {
|
||||||
|
return _state == State.CONNECTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isConfigured() {
|
||||||
|
return _state != State.NONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getConnectString() {
|
||||||
|
return _factory.getConnectString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static enum State {
|
||||||
|
NONE,
|
||||||
|
CONNECTING,
|
||||||
|
CONNECTED,
|
||||||
|
RECONNECTING
|
||||||
|
}
|
||||||
|
|
||||||
|
private final static String CHARSET = "UTF-8";
|
||||||
|
|
||||||
|
private final Clock _clock = SystemClock.instance();
|
||||||
|
private final List<LifecycleListener> _listeners = new CopyOnWriteArrayList<LifecycleListener>();
|
||||||
|
|
||||||
|
protected final Object _lock = new Object();
|
||||||
|
protected volatile State _state = State.NONE;
|
||||||
|
|
||||||
|
private final StateChangeDispatcher _stateChangeDispatcher = new StateChangeDispatcher();
|
||||||
|
|
||||||
|
protected IZooKeeperFactory _factory;
|
||||||
|
protected IZooKeeper _zk;
|
||||||
|
protected Timespan _reconnectTimeout = Timespan.parse("20s");
|
||||||
|
protected Timespan sessionTimeout = new Timespan(30, Timespan.TimeUnit.SECOND);
|
||||||
|
|
||||||
|
private ExpiredSessionRecovery _expiredSessionRecovery = null;
|
||||||
|
|
||||||
|
private class StateChangeDispatcher extends Thread {
|
||||||
|
private final AtomicBoolean _running = new AtomicBoolean(true);
|
||||||
|
private final BlockingQueue<Boolean> _events = new LinkedBlockingQueue<Boolean>();
|
||||||
|
|
||||||
|
private StateChangeDispatcher() {
|
||||||
|
super("ZooKeeper state change dispatcher thread");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
Map<Object, Boolean> history = new IdentityHashMap<Object, Boolean>();
|
||||||
|
LOG.info("Starting StateChangeDispatcher");
|
||||||
|
while (_running.get()) {
|
||||||
|
Boolean isConnectedEvent;
|
||||||
|
try {
|
||||||
|
isConnectedEvent = _events.take();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!_running.get() || isConnectedEvent == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Map<Object, Boolean> newHistory = callListeners(history, isConnectedEvent);
|
||||||
|
// we save which event each listener has seen last
|
||||||
|
// we don't update the map in place because we need to get rid of unregistered listeners
|
||||||
|
history = newHistory;
|
||||||
|
}
|
||||||
|
LOG.info("StateChangeDispatcher terminated.");
|
||||||
|
}
|
||||||
|
|
||||||
|
public void end() {
|
||||||
|
_running.set(false);
|
||||||
|
_events.add(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addEvent(ZKClient.State oldState, ZKClient.State newState) {
|
||||||
|
LOG.debug("addEvent: {} => {}", oldState, newState);
|
||||||
|
if (newState == ZKClient.State.CONNECTED) {
|
||||||
|
_events.add(true);
|
||||||
|
} else if (oldState == ZKClient.State.CONNECTED) {
|
||||||
|
_events.add(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Map<Object, Boolean> callListeners(Map<Object, Boolean> history, Boolean connectedEvent) {
|
||||||
|
Map<Object, Boolean> newHistory = new IdentityHashMap<Object, Boolean>();
|
||||||
|
for (LifecycleListener listener : _listeners) {
|
||||||
|
Boolean previousEvent = history.get(listener);
|
||||||
|
// we propagate the event only if it was not already sent
|
||||||
|
if (previousEvent == null || previousEvent != connectedEvent) {
|
||||||
|
try {
|
||||||
|
if (connectedEvent) {
|
||||||
|
listener.onConnected();
|
||||||
|
} else {
|
||||||
|
listener.onDisconnected();
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.warn("Exception while executing listener (ignored)", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
newHistory.put(listener, connectedEvent);
|
||||||
|
}
|
||||||
|
return newHistory;
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ExpiredSessionRecovery extends Thread {
|
||||||
|
private ExpiredSessionRecovery() {
|
||||||
|
super("ZooKeeper expired session recovery thread");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
LOG.info("Entering recovery mode");
|
||||||
|
synchronized(_lock) {
|
||||||
|
try {
|
||||||
|
int count = 0;
|
||||||
|
while (_state == ZKClient.State.NONE) {
|
||||||
|
try {
|
||||||
|
count++;
|
||||||
|
LOG.warn("Recovery mode: trying to reconnect to zookeeper [" + count + "]");
|
||||||
|
ZKClient.this.connect();
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.warn("Recovery mode: reconnect attempt failed [" + count + "]... waiting for " + _reconnectTimeout, e);
|
||||||
|
try {
|
||||||
|
_lock.wait(_reconnectTimeout.getDurationInMilliseconds());
|
||||||
|
} catch(InterruptedException e1) {
|
||||||
|
throw new RuntimeException("Recovery mode: wait interrupted... bailing out", e1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
_expiredSessionRecovery = null;
|
||||||
|
LOG.info("Exiting recovery mode.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
public ZKClient(String connectString, Timespan sessionTimeout, Watcher watcher)
|
||||||
|
{
|
||||||
|
this(new ZooKeeperFactory(connectString, sessionTimeout, watcher));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
public ZKClient(IZooKeeperFactory factory)
|
||||||
|
{
|
||||||
|
this(factory, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor
|
||||||
|
*/
|
||||||
|
public ZKClient(IZooKeeperFactory factory, String chroot)
|
||||||
|
{
|
||||||
|
super(chroot);
|
||||||
|
_factory = factory;
|
||||||
|
Map<String, String> acls = new HashMap<String, String>();
|
||||||
|
acls.put("/", "world:anyone:acdrw");
|
||||||
|
setACLs(acls);
|
||||||
|
}
|
||||||
|
|
||||||
|
static private int getPermFromString(String permString) {
|
||||||
|
int perm = 0;
|
||||||
|
for (int i = 0; i < permString.length(); i++) {
|
||||||
|
switch (permString.charAt(i)) {
|
||||||
|
case 'r':
|
||||||
|
perm |= ZooDefs.Perms.READ;
|
||||||
|
break;
|
||||||
|
case 'w':
|
||||||
|
perm |= ZooDefs.Perms.WRITE;
|
||||||
|
break;
|
||||||
|
case 'c':
|
||||||
|
perm |= ZooDefs.Perms.CREATE;
|
||||||
|
break;
|
||||||
|
case 'd':
|
||||||
|
perm |= ZooDefs.Perms.DELETE;
|
||||||
|
break;
|
||||||
|
case 'a':
|
||||||
|
perm |= ZooDefs.Perms.ADMIN;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
System.err
|
||||||
|
.println("Unknown perm type: " + permString.charAt(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return perm;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<ACL> parseACLs(String aclString) {
|
||||||
|
List<ACL> acl;
|
||||||
|
String acls[] = aclString.split(",");
|
||||||
|
acl = new ArrayList<ACL>();
|
||||||
|
for (String a : acls) {
|
||||||
|
int firstColon = a.indexOf(':');
|
||||||
|
int lastColon = a.lastIndexOf(':');
|
||||||
|
if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
|
||||||
|
System.err
|
||||||
|
.println(a + " does not have the form scheme:id:perm");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
ACL newAcl = new ACL();
|
||||||
|
newAcl.setId(new Id(a.substring(0, firstColon), a.substring(
|
||||||
|
firstColon + 1, lastColon)));
|
||||||
|
newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
|
||||||
|
acl.add(newAcl);
|
||||||
|
}
|
||||||
|
return acl;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Stat createOrSetByteWithParents(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
|
||||||
|
if (exists(path) != null) {
|
||||||
|
return setByteData(path, data);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
createBytesNodeWithParents(path, data, acl, createMode);
|
||||||
|
return null;
|
||||||
|
} catch(KeeperException.NodeExistsException e) {
|
||||||
|
// this should not happen very often (race condition)
|
||||||
|
return setByteData(path, data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String create(String path, CreateMode createMode) throws InterruptedException, KeeperException {
|
||||||
|
return create(path, (byte[]) null, createMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String create(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
|
||||||
|
return create(path, toByteData(data), createMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String create(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
|
||||||
|
return getZk().create(adjustPath(path), data, getNodeACLs(path), createMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String createWithParents(String path, CreateMode createMode) throws InterruptedException, KeeperException {
|
||||||
|
return createWithParents(path, (byte[]) null, createMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String createWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
|
||||||
|
return createWithParents(path, toByteData(data), createMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String createWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
|
||||||
|
createParents(path);
|
||||||
|
return create(path, data, createMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Stat createOrSetWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
|
||||||
|
return createOrSetWithParents(path, toByteData(data), createMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Stat createOrSetWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
|
||||||
|
if (exists(path) != null) {
|
||||||
|
return setByteData(path, data);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
createWithParents(path, data, createMode);
|
||||||
|
return null;
|
||||||
|
} catch (KeeperException.NodeExistsException e) {
|
||||||
|
// this should not happen very often (race condition)
|
||||||
|
return setByteData(path, data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void fixACLs(String path, boolean recursive) throws InterruptedException, KeeperException {
|
||||||
|
if (exists(path) != null) {
|
||||||
|
doFixACLs(path, recursive);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doFixACLs(String path, boolean recursive) throws KeeperException, InterruptedException {
|
||||||
|
setACL(path, getNodeACLs(path), -1);
|
||||||
|
if (recursive) {
|
||||||
|
for (String child : getChildren(path)) {
|
||||||
|
doFixACLs(path.equals("/") ? "/" + child : path + "/" + child, recursive);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<ACL> getNodeACLs(String path) {
|
||||||
|
String acl = doGetNodeACLs(adjustPath(path));
|
||||||
|
if (acl == null) {
|
||||||
|
throw new IllegalStateException("Could not find matching ACLs for " + path);
|
||||||
|
}
|
||||||
|
return parseACLs(acl);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String doGetNodeACLs(String path) {
|
||||||
|
String longestPath = "";
|
||||||
|
for (String acl : acls.keySet()) {
|
||||||
|
if (acl.length() > longestPath.length() && path.startsWith(acl)) {
|
||||||
|
longestPath = acl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return acls.get(longestPath);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createParents(String path) throws InterruptedException, KeeperException {
|
||||||
|
path = PathUtils.getParentPath(adjustPath(path));
|
||||||
|
path = PathUtils.removeTrailingSlash(path);
|
||||||
|
List<String> paths = new ArrayList<String>();
|
||||||
|
while(!path.equals("") && getZk().exists(path, false) == null) {
|
||||||
|
paths.add(path);
|
||||||
|
path = PathUtils.getParentPath(path);
|
||||||
|
path = PathUtils.removeTrailingSlash(path);
|
||||||
|
}
|
||||||
|
Collections.reverse(paths);
|
||||||
|
for(String p : paths) {
|
||||||
|
try {
|
||||||
|
getZk().create(p,
|
||||||
|
null,
|
||||||
|
getNodeACLs(p),
|
||||||
|
CreateMode.PERSISTENT);
|
||||||
|
} catch(KeeperException.NodeExistsException e) {
|
||||||
|
// ok we continue...
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("parent already exists " + p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private byte[] toByteData(String data) {
|
||||||
|
if (data == null) {
|
||||||
|
return null;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
return data.getBytes(CHARSET);
|
||||||
|
} catch(UnsupportedEncodingException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,8 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.leveldb.replicated
|
package org.apache.activemq.leveldb.replicated
|
||||||
|
|
||||||
import org.fusesource.fabric.groups._
|
|
||||||
import org.fusesource.fabric.zookeeper.internal.ZKClient
|
|
||||||
import org.linkedin.util.clock.Timespan
|
import org.linkedin.util.clock.Timespan
|
||||||
import scala.reflect.BeanProperty
|
import scala.reflect.BeanProperty
|
||||||
import org.apache.activemq.util.{JMXSupport, ServiceStopper, ServiceSupport}
|
import org.apache.activemq.util.{JMXSupport, ServiceStopper, ServiceSupport}
|
||||||
|
@ -38,6 +36,7 @@ import org.apache.activemq.leveldb.LevelDBStore._
|
||||||
import javax.management.ObjectName
|
import javax.management.ObjectName
|
||||||
import javax.management.openmbean.{CompositeDataSupport, SimpleType, CompositeType, CompositeData}
|
import javax.management.openmbean.{CompositeDataSupport, SimpleType, CompositeType, CompositeData}
|
||||||
import java.util
|
import java.util
|
||||||
|
import org.apache.activemq.leveldb.replicated.groups._
|
||||||
|
|
||||||
object ElectingLevelDBStore extends Log {
|
object ElectingLevelDBStore extends Log {
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
package org.apache.activemq.leveldb.replicated
|
package org.apache.activemq.leveldb.replicated
|
||||||
|
|
||||||
import org.fusesource.fabric.groups._
|
import org.apache.activemq.leveldb.replicated.groups._
|
||||||
import org.codehaus.jackson.annotate.JsonProperty
|
import org.codehaus.jackson.annotate.JsonProperty
|
||||||
import org.apache.activemq.leveldb.util.{Log, JsonCodec}
|
import org.apache.activemq.leveldb.util.{Log, JsonCodec}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,255 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.leveldb.replicated.groups
|
||||||
|
|
||||||
|
|
||||||
|
import collection.mutable.{ListBuffer, HashMap}
|
||||||
|
import internal.ChangeListenerSupport
|
||||||
|
|
||||||
|
import java.io._
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper
|
||||||
|
import collection.JavaConversions._
|
||||||
|
import java.util.LinkedHashMap
|
||||||
|
import java.lang.{IllegalStateException, String}
|
||||||
|
import reflect.BeanProperty
|
||||||
|
import org.codehaus.jackson.annotate.JsonProperty
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
*/
|
||||||
|
trait NodeState {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The id of the cluster node. There can be multiple node with this ID,
|
||||||
|
* but only the first node in the cluster will be the master for for it.
|
||||||
|
*/
|
||||||
|
def id: String
|
||||||
|
|
||||||
|
override
|
||||||
|
def toString = new String(ClusteredSupport.encode(this), "UTF-8")
|
||||||
|
}
|
||||||
|
|
||||||
|
class TextNodeState extends NodeState {
|
||||||
|
@BeanProperty
|
||||||
|
@JsonProperty
|
||||||
|
var id:String = _
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*
|
||||||
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
*/
|
||||||
|
object ClusteredSupport {
|
||||||
|
|
||||||
|
val DEFAULT_MAPPER = new ObjectMapper
|
||||||
|
|
||||||
|
def decode[T](t : Class[T], buffer: Array[Byte], mapper: ObjectMapper=DEFAULT_MAPPER): T = decode(t, new ByteArrayInputStream(buffer), mapper)
|
||||||
|
def decode[T](t : Class[T], in: InputStream, mapper: ObjectMapper): T = mapper.readValue(in, t)
|
||||||
|
|
||||||
|
def encode(value: AnyRef, mapper: ObjectMapper=DEFAULT_MAPPER): Array[Byte] = {
|
||||||
|
var baos: ByteArrayOutputStream = new ByteArrayOutputStream
|
||||||
|
encode(value, baos, mapper)
|
||||||
|
return baos.toByteArray
|
||||||
|
}
|
||||||
|
|
||||||
|
def encode(value: AnyRef, out: OutputStream, mapper: ObjectMapper): Unit = {
|
||||||
|
mapper.writeValue(out, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
*/
|
||||||
|
class ClusteredSingletonWatcher[T <: NodeState](val stateClass:Class[T]) extends ChangeListenerSupport {
|
||||||
|
import ClusteredSupport._
|
||||||
|
|
||||||
|
protected var _group:Group = _
|
||||||
|
def group = _group
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Override to use a custom configured mapper.
|
||||||
|
*/
|
||||||
|
def mapper = ClusteredSupport.DEFAULT_MAPPER
|
||||||
|
|
||||||
|
private val listener = new ChangeListener() {
|
||||||
|
def changed() {
|
||||||
|
val members = _group.members
|
||||||
|
val t = new LinkedHashMap[String, T]()
|
||||||
|
members.foreach {
|
||||||
|
case (path, data) =>
|
||||||
|
try {
|
||||||
|
val value = decode(stateClass, data, mapper)
|
||||||
|
t.put(path, value)
|
||||||
|
} catch {
|
||||||
|
case e: Throwable =>
|
||||||
|
e.printStackTrace()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
changed_decoded(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
def connected = {
|
||||||
|
changed
|
||||||
|
ClusteredSingletonWatcher.this.fireConnected
|
||||||
|
}
|
||||||
|
|
||||||
|
def disconnected = {
|
||||||
|
changed
|
||||||
|
ClusteredSingletonWatcher.this.fireDisconnected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def start(group:Group) = this.synchronized {
|
||||||
|
if(_group !=null )
|
||||||
|
throw new IllegalStateException("Already started.")
|
||||||
|
_group = group
|
||||||
|
_group.add(listener)
|
||||||
|
}
|
||||||
|
|
||||||
|
def stop = this.synchronized {
|
||||||
|
if(_group==null)
|
||||||
|
throw new IllegalStateException("Not started.")
|
||||||
|
_group.remove(listener)
|
||||||
|
_members = HashMap[String, ListBuffer[(String, T)]]()
|
||||||
|
_group = null
|
||||||
|
}
|
||||||
|
|
||||||
|
def connected = this.synchronized {
|
||||||
|
if(_group==null) {
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
_group.connected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected var _members = HashMap[String, ListBuffer[(String, T)]]()
|
||||||
|
def members = this.synchronized { _members }
|
||||||
|
|
||||||
|
def changed_decoded(m: LinkedHashMap[String, T]) = {
|
||||||
|
this.synchronized {
|
||||||
|
if( _group!=null ) {
|
||||||
|
_members = HashMap[String, ListBuffer[(String, T)]]()
|
||||||
|
m.foreach { case node =>
|
||||||
|
_members.getOrElseUpdate(node._2.id, ListBuffer[(String, T)]()).append(node)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fireChanged
|
||||||
|
}
|
||||||
|
|
||||||
|
def masters = this.synchronized {
|
||||||
|
_members.mapValues(_.head._2).toArray.map(_._2).toArray(new ClassManifest[T] {
|
||||||
|
def erasure = stateClass
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
*/
|
||||||
|
class ClusteredSingleton[T <: NodeState ](stateClass:Class[T]) extends ClusteredSingletonWatcher[T](stateClass) {
|
||||||
|
import ClusteredSupport._
|
||||||
|
|
||||||
|
private var _eid:String = _
|
||||||
|
/** the ephemeral id of the node is unique within in the group */
|
||||||
|
def eid = _eid
|
||||||
|
|
||||||
|
private var _state:T = _
|
||||||
|
|
||||||
|
override def stop = {
|
||||||
|
this.synchronized {
|
||||||
|
if(_eid != null) {
|
||||||
|
leave
|
||||||
|
}
|
||||||
|
super.stop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def join(state:T):Unit = this.synchronized {
|
||||||
|
if(state==null)
|
||||||
|
throw new IllegalArgumentException("State cannot be null")
|
||||||
|
if(state.id==null)
|
||||||
|
throw new IllegalArgumentException("The state id cannot be null")
|
||||||
|
if(_group==null)
|
||||||
|
throw new IllegalStateException("Not started.")
|
||||||
|
if(this._state!=null)
|
||||||
|
throw new IllegalStateException("Already joined")
|
||||||
|
this._state = state
|
||||||
|
_eid = group.join(encode(state, mapper))
|
||||||
|
}
|
||||||
|
|
||||||
|
def leave:Unit = this.synchronized {
|
||||||
|
if(this._state==null)
|
||||||
|
throw new IllegalStateException("Not joined")
|
||||||
|
if(_group==null)
|
||||||
|
throw new IllegalStateException("Not started.")
|
||||||
|
_group.leave(_eid)
|
||||||
|
_eid = null
|
||||||
|
this._state = null.asInstanceOf[T]
|
||||||
|
}
|
||||||
|
|
||||||
|
def update(state:T) = this.synchronized {
|
||||||
|
if(this._state==null)
|
||||||
|
throw new IllegalStateException("Not joined")
|
||||||
|
if(state==null)
|
||||||
|
throw new IllegalArgumentException("State cannot be null")
|
||||||
|
if(state.id==null)
|
||||||
|
throw new IllegalArgumentException("The state id cannot be null")
|
||||||
|
if(state.id!=this._state.id)
|
||||||
|
throw new IllegalArgumentException("The state id cannot change")
|
||||||
|
|
||||||
|
if(_group==null)
|
||||||
|
throw new IllegalStateException("Not started.")
|
||||||
|
this._state = state
|
||||||
|
_group.update(_eid, encode(state, mapper))
|
||||||
|
}
|
||||||
|
|
||||||
|
def isMaster:Boolean = this.synchronized {
|
||||||
|
if(this._state==null)
|
||||||
|
return false;
|
||||||
|
_members.get(this._state.id) match {
|
||||||
|
case Some(nodes) =>
|
||||||
|
nodes.headOption.map { x=>
|
||||||
|
x._1 == _eid
|
||||||
|
}.getOrElse(false)
|
||||||
|
case None => false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def master = this.synchronized {
|
||||||
|
if(this._state==null)
|
||||||
|
throw new IllegalStateException("Not joined")
|
||||||
|
_members.get(this._state.id).map(_.head._2)
|
||||||
|
}
|
||||||
|
|
||||||
|
def slaves = this.synchronized {
|
||||||
|
if(this._state==null)
|
||||||
|
throw new IllegalStateException("Not joined")
|
||||||
|
val rc = _members.get(this._state.id).map(_.toList).getOrElse(List())
|
||||||
|
rc.drop(1).map(_._2)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,109 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.leveldb.replicated.groups
|
||||||
|
|
||||||
|
import internal.ZooKeeperGroup
|
||||||
|
import org.apache.zookeeper.data.ACL
|
||||||
|
import org.apache.zookeeper.ZooDefs.Ids
|
||||||
|
import java.util.LinkedHashMap
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
*/
|
||||||
|
object ZooKeeperGroupFactory {
|
||||||
|
|
||||||
|
def create(zk: ZKClient, path: String):Group = new ZooKeeperGroup(zk, path)
|
||||||
|
def members(zk: ZKClient, path: String):LinkedHashMap[String, Array[Byte]] = ZooKeeperGroup.members(zk, path)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Used the join a cluster group and to monitor the memberships
|
||||||
|
* of that group.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* This object is not thread safe. You should are responsible for
|
||||||
|
* synchronizing access to it across threads.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
*/
|
||||||
|
trait Group {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a member to the group with some associated data.
|
||||||
|
*/
|
||||||
|
def join(data:Array[Byte]):String
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the data associated with joined member.
|
||||||
|
*/
|
||||||
|
def update(id:String, data:Array[Byte]):Unit
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a previously added member.
|
||||||
|
*/
|
||||||
|
def leave(id:String):Unit
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lists all the members currently in the group.
|
||||||
|
*/
|
||||||
|
def members:java.util.LinkedHashMap[String, Array[Byte]]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers a change listener which will be called
|
||||||
|
* when the cluster membership changes.
|
||||||
|
*/
|
||||||
|
def add(listener:ChangeListener)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a previously added change listener.
|
||||||
|
*/
|
||||||
|
def remove(listener:ChangeListener)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A group should be closed to release aquired resources used
|
||||||
|
* to monitor the group membership.
|
||||||
|
*
|
||||||
|
* Whe the Group is closed, any memberships registered via this
|
||||||
|
* Group will be removed from the group.
|
||||||
|
*/
|
||||||
|
def close:Unit
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Are we connected with the cluster?
|
||||||
|
*/
|
||||||
|
def connected:Boolean
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Callback interface used to get notifications of changes
|
||||||
|
* to a cluster group.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
*/
|
||||||
|
trait ChangeListener {
|
||||||
|
def changed:Unit
|
||||||
|
def connected:Unit
|
||||||
|
def disconnected:Unit
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.leveldb.replicated.groups.internal
|
||||||
|
|
||||||
|
import org.apache.activemq.leveldb.replicated.groups.ChangeListener
|
||||||
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
|
||||||
|
object ChangeListenerSupport {
|
||||||
|
val LOG: Logger = LoggerFactory.getLogger(classOf[ChangeListenerSupport])
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
*/
|
||||||
|
trait ChangeListenerSupport {
|
||||||
|
|
||||||
|
var listeners = List[ChangeListener]()
|
||||||
|
|
||||||
|
def connected:Boolean
|
||||||
|
|
||||||
|
def add(listener: ChangeListener): Unit = {
|
||||||
|
val connected = this.synchronized {
|
||||||
|
listeners ::= listener
|
||||||
|
this.connected
|
||||||
|
}
|
||||||
|
if (connected) {
|
||||||
|
listener.connected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def remove(listener: ChangeListener): Unit = this.synchronized {
|
||||||
|
listeners = listeners.filterNot(_ == listener)
|
||||||
|
}
|
||||||
|
|
||||||
|
def fireConnected() = {
|
||||||
|
val listener = this.synchronized { this.listeners }
|
||||||
|
check_elapsed_time {
|
||||||
|
for (listener <- listeners) {
|
||||||
|
listener.connected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def fireDisconnected() = {
|
||||||
|
val listener = this.synchronized { this.listeners }
|
||||||
|
check_elapsed_time {
|
||||||
|
for (listener <- listeners) {
|
||||||
|
listener.disconnected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def fireChanged() = {
|
||||||
|
val listener = this.synchronized { this.listeners }
|
||||||
|
val start = System.nanoTime()
|
||||||
|
check_elapsed_time {
|
||||||
|
for (listener <- listeners) {
|
||||||
|
listener.changed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def check_elapsed_time[T](func: => T):T = {
|
||||||
|
val start = System.nanoTime()
|
||||||
|
try {
|
||||||
|
func
|
||||||
|
} finally {
|
||||||
|
val end = System.nanoTime()
|
||||||
|
val elapsed = TimeUnit.NANOSECONDS.toMillis(end-start)
|
||||||
|
if( elapsed > 100 ) {
|
||||||
|
ChangeListenerSupport.LOG.warn("listeners are taking too long to process the events")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,161 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.leveldb.replicated.groups.internal
|
||||||
|
|
||||||
|
import org.apache.zookeeper._
|
||||||
|
import java.lang.String
|
||||||
|
import org.linkedin.zookeeper.tracker._
|
||||||
|
import org.apache.activemq.leveldb.replicated.groups.{ZKClient, ChangeListener, Group}
|
||||||
|
import scala.collection.mutable.HashMap
|
||||||
|
import org.linkedin.zookeeper.client.LifecycleListener
|
||||||
|
import collection.JavaConversions._
|
||||||
|
import java.util.{LinkedHashMap, Collection}
|
||||||
|
import org.apache.zookeeper.KeeperException.{ConnectionLossException, NoNodeException, Code}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
*/
|
||||||
|
object ZooKeeperGroup {
|
||||||
|
def members(zk: ZKClient, path: String):LinkedHashMap[String, Array[Byte]] = {
|
||||||
|
var rc = new LinkedHashMap[String, Array[Byte]]
|
||||||
|
zk.getAllChildren(path).sortWith((a,b)=> a < b).foreach { node =>
|
||||||
|
try {
|
||||||
|
if( node.matches("""0\d+""") ) {
|
||||||
|
rc.put(node, zk.getData(path+"/"+node))
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
case e:Throwable =>
|
||||||
|
e.printStackTrace
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rc
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
*/
|
||||||
|
class ZooKeeperGroup(val zk: ZKClient, val root: String) extends Group with LifecycleListener with ChangeListenerSupport {
|
||||||
|
|
||||||
|
val tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1)
|
||||||
|
val joins = HashMap[String, Int]()
|
||||||
|
|
||||||
|
var members = new LinkedHashMap[String, Array[Byte]]
|
||||||
|
|
||||||
|
private def member_path_prefix = root + "/0"
|
||||||
|
|
||||||
|
zk.registerListener(this)
|
||||||
|
|
||||||
|
create(root)
|
||||||
|
tree.track(new NodeEventsListener[Array[Byte]]() {
|
||||||
|
def onEvents(events: Collection[NodeEvent[Array[Byte]]]): Unit = {
|
||||||
|
fire_cluster_change
|
||||||
|
}
|
||||||
|
})
|
||||||
|
fire_cluster_change
|
||||||
|
|
||||||
|
|
||||||
|
def close = this.synchronized {
|
||||||
|
joins.foreach { case (path, version) =>
|
||||||
|
try {
|
||||||
|
zk.delete(member_path_prefix + path, version)
|
||||||
|
} catch {
|
||||||
|
case x:NoNodeException => // Already deleted.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
joins.clear
|
||||||
|
tree.destroy
|
||||||
|
zk.removeListener(this)
|
||||||
|
}
|
||||||
|
|
||||||
|
def connected = zk.isConnected
|
||||||
|
def onConnected() = fireConnected()
|
||||||
|
def onDisconnected() = fireDisconnected()
|
||||||
|
|
||||||
|
def join(data:Array[Byte]=null): String = this.synchronized {
|
||||||
|
val id = zk.createWithParents(member_path_prefix, data, CreateMode.EPHEMERAL_SEQUENTIAL).stripPrefix(member_path_prefix)
|
||||||
|
joins.put(id, 0)
|
||||||
|
id
|
||||||
|
}
|
||||||
|
|
||||||
|
def update(path:String, data:Array[Byte]=null): Unit = this.synchronized {
|
||||||
|
joins.get(path) match {
|
||||||
|
case Some(ver) =>
|
||||||
|
val stat = zk.setData(member_path_prefix+path, data, ver)
|
||||||
|
joins.put(path, stat.getVersion)
|
||||||
|
case None => throw new IllegalArgumentException("Has not joined locally: "+path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def leave(path:String): Unit = this.synchronized {
|
||||||
|
joins.remove(path).foreach {
|
||||||
|
case version =>
|
||||||
|
try {
|
||||||
|
zk.delete(member_path_prefix + path, version)
|
||||||
|
} catch {
|
||||||
|
case x: NoNodeException => // Already deleted.
|
||||||
|
case x: ConnectionLossException => // disconnected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def fire_cluster_change: Unit = {
|
||||||
|
this.synchronized {
|
||||||
|
val t = tree.getTree.toList.filterNot { x =>
|
||||||
|
// don't include the root node, or nodes that don't match our naming convention.
|
||||||
|
(x._1 == root) || !x._1.stripPrefix(root).matches("""/0\d+""")
|
||||||
|
}
|
||||||
|
|
||||||
|
this.members = new LinkedHashMap()
|
||||||
|
t.sortWith((a,b)=> a._1 < b._1 ).foreach { x=>
|
||||||
|
this.members.put(x._1.stripPrefix(member_path_prefix), x._2.getData)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fireChanged()
|
||||||
|
}
|
||||||
|
|
||||||
|
private def create(path: String, count : java.lang.Integer = 0): Unit = {
|
||||||
|
try {
|
||||||
|
if (zk.exists(path, false) != null) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
// try create given path in persistent mode
|
||||||
|
zk.createOrSetWithParents(path, "", CreateMode.PERSISTENT)
|
||||||
|
} catch {
|
||||||
|
case ignore: KeeperException.NodeExistsException =>
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
case ignore : KeeperException.SessionExpiredException => {
|
||||||
|
if (count > 20) {
|
||||||
|
// we tried enought number of times
|
||||||
|
throw new IllegalStateException("Cannot create path " + path, ignore)
|
||||||
|
}
|
||||||
|
// try to create path with increased counter value
|
||||||
|
create(path, count + 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -52,8 +52,6 @@
|
||||||
org.codehaus.jettison*;resolution:=optional,
|
org.codehaus.jettison*;resolution:=optional,
|
||||||
org.jasypt*;resolution:=optional,
|
org.jasypt*;resolution:=optional,
|
||||||
org.eclipse.jetty*;resolution:=optional,
|
org.eclipse.jetty*;resolution:=optional,
|
||||||
org.fusesource.fabric*;version="[7,8]";resolution:=optional,
|
|
||||||
org.fusesource.fabric.groups*;version="[7,8]";resolution:=optional,
|
|
||||||
org.apache.zookeeper;resolution:=optional,
|
org.apache.zookeeper;resolution:=optional,
|
||||||
org.linkedin*;resolution:=optional,
|
org.linkedin*;resolution:=optional,
|
||||||
org.springframework.jms*;version="[3,4]";resolution:=optional,
|
org.springframework.jms*;version="[3,4]";resolution:=optional,
|
||||||
|
|
|
@ -82,20 +82,21 @@
|
||||||
<version>${hawtdispatch-version}</version>
|
<version>${hawtdispatch-version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.fusesource.fabric</groupId>
|
<groupId>org.linkedin</groupId>
|
||||||
<artifactId>fabric-groups</artifactId>
|
<artifactId>org.linkedin.zookeeper-impl</artifactId>
|
||||||
<version>${fabric-version}</version>
|
<version>${linkedin-zookeeper-version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.fusesource.fabric</groupId>
|
<groupId>org.linkedin</groupId>
|
||||||
<artifactId>fabric-linkedin-zookeeper</artifactId>
|
<artifactId>org.linkedin.util-core</artifactId>
|
||||||
<version>${fabric-version}</version>
|
<version>${linkedin-zookeeper-version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.fusesource.fabric</groupId>
|
<groupId>org.apache.zookeeper</groupId>
|
||||||
<artifactId>fabric-zookeeper</artifactId>
|
<artifactId>zookeeper</artifactId>
|
||||||
<version>${fabric-version}</version>
|
<version>${zookeeper-version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.osgi</groupId>
|
<groupId>org.osgi</groupId>
|
||||||
<artifactId>org.osgi.core</artifactId>
|
<artifactId>org.osgi.core</artifactId>
|
||||||
|
|
|
@ -223,7 +223,9 @@
|
||||||
<include>org.xerial.snappy:*</include>
|
<include>org.xerial.snappy:*</include>
|
||||||
<include>org.iq80.snappy:*</include>
|
<include>org.iq80.snappy:*</include>
|
||||||
<include>org.codehaus.jackson:*</include>
|
<include>org.codehaus.jackson:*</include>
|
||||||
<include>org.fusesource.fabric:*</include>
|
<include>org.linkedin:org.linkedin.zookeeper-impl</include>
|
||||||
|
<include>org.linkedin:org.linkedin.util-core</include>
|
||||||
|
<include>org.apache.zookeeper:zookeeper</include>
|
||||||
|
|
||||||
</includes>
|
</includes>
|
||||||
</dependencySet>
|
</dependencySet>
|
||||||
|
|
3
pom.xml
3
pom.xml
|
@ -94,8 +94,9 @@
|
||||||
<opensymphony-version>2.4.2</opensymphony-version>
|
<opensymphony-version>2.4.2</opensymphony-version>
|
||||||
<org-apache-derby-version>10.9.1.0</org-apache-derby-version>
|
<org-apache-derby-version>10.9.1.0</org-apache-derby-version>
|
||||||
<org.osgi.core-version>4.3.1</org.osgi.core-version>
|
<org.osgi.core-version>4.3.1</org.osgi.core-version>
|
||||||
<fabric-version>7.2.0.redhat-024</fabric-version>
|
|
||||||
<p2psockets-version>1.1.2</p2psockets-version>
|
<p2psockets-version>1.1.2</p2psockets-version>
|
||||||
|
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
|
||||||
|
<zookeeper-version>3.4.3</zookeeper-version>
|
||||||
<qpid-proton-version>0.3.0-fuse-4</qpid-proton-version>
|
<qpid-proton-version>0.3.0-fuse-4</qpid-proton-version>
|
||||||
<qpid-jms-version>0.22</qpid-jms-version>
|
<qpid-jms-version>0.22</qpid-jms-version>
|
||||||
<regexp-version>1.3</regexp-version>
|
<regexp-version>1.3</regexp-version>
|
||||||
|
|
Loading…
Reference in New Issue