HBASE-7382 Port ZK.multi support from HBASE-6775 to 0.96 (Gregory, Himanshu and Ted)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1438317 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dab6872457
commit
474163397a
|
@ -176,6 +176,9 @@ public final class HConstants {
|
||||||
/** Default value for ZooKeeper session timeout */
|
/** Default value for ZooKeeper session timeout */
|
||||||
public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000;
|
public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000;
|
||||||
|
|
||||||
|
/** Configuration key for whether to use ZK.multi */
|
||||||
|
public static final String ZOOKEEPER_USEMULTI = "hbase.zookeeper.useMulti";
|
||||||
|
|
||||||
/** Parameter name for port region server listens on. */
|
/** Parameter name for port region server listens on. */
|
||||||
public static final String REGIONSERVER_PORT = "hbase.regionserver.port";
|
public static final String REGIONSERVER_PORT = "hbase.regionserver.port";
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.security.SecureRandom;
|
import java.security.SecureRandom;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
|
@ -35,11 +36,16 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
||||||
import org.apache.zookeeper.AsyncCallback;
|
import org.apache.zookeeper.AsyncCallback;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.apache.zookeeper.Op;
|
||||||
|
import org.apache.zookeeper.OpResult;
|
||||||
import org.apache.zookeeper.Watcher;
|
import org.apache.zookeeper.Watcher;
|
||||||
|
import org.apache.zookeeper.ZooDefs;
|
||||||
import org.apache.zookeeper.ZooKeeper;
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
import org.apache.zookeeper.ZooKeeper.States;
|
import org.apache.zookeeper.ZooKeeper.States;
|
||||||
import org.apache.zookeeper.data.ACL;
|
import org.apache.zookeeper.data.ACL;
|
||||||
import org.apache.zookeeper.data.Stat;
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
import org.apache.zookeeper.proto.CreateRequest;
|
||||||
|
import org.apache.zookeeper.proto.SetDataRequest;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A zookeeper that can handle 'recoverable' errors.
|
* A zookeeper that can handle 'recoverable' errors.
|
||||||
|
@ -494,6 +500,60 @@ public class RecoverableZooKeeper {
|
||||||
retryCounter.useRetry();
|
retryCounter.useRetry();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* Convert Iterable of {@link ZKOp} we got into the ZooKeeper.Op
|
||||||
|
* instances to actually pass to multi (need to do this in order to appendMetaData).
|
||||||
|
*/
|
||||||
|
private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
|
||||||
|
throws UnsupportedOperationException {
|
||||||
|
if(ops == null) return null;
|
||||||
|
|
||||||
|
List<Op> preparedOps = new LinkedList<Op>();
|
||||||
|
for (Op op : ops) {
|
||||||
|
if (op.getType() == ZooDefs.OpCode.create) {
|
||||||
|
CreateRequest create = (CreateRequest)op.toRequestRecord();
|
||||||
|
preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
|
||||||
|
create.getAcl(), create.getFlags()));
|
||||||
|
} else if (op.getType() == ZooDefs.OpCode.delete) {
|
||||||
|
// no need to appendMetaData for delete
|
||||||
|
preparedOps.add(op);
|
||||||
|
} else if (op.getType() == ZooDefs.OpCode.setData) {
|
||||||
|
SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
|
||||||
|
preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
|
||||||
|
setData.getVersion()));
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return preparedOps;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Run multiple operations in a transactional manner. Retry before throwing exception
|
||||||
|
*/
|
||||||
|
public List<OpResult> multi(Iterable<Op> ops)
|
||||||
|
throws KeeperException, InterruptedException {
|
||||||
|
RetryCounter retryCounter = retryCounterFactory.create();
|
||||||
|
Iterable<Op> multiOps = prepareZKMulti(ops);
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return zk.multi(multiOps);
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
switch (e.code()) {
|
||||||
|
case CONNECTIONLOSS:
|
||||||
|
case SESSIONEXPIRED:
|
||||||
|
case OPERATIONTIMEOUT:
|
||||||
|
retryOrThrow(retryCounter, e, "multi");
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
retryCounter.sleepUntilNextRetry();
|
||||||
|
retryCounter.useRetry();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private String findPreviousSequentialNode(String path)
|
private String findPreviousSequentialNode(String path)
|
||||||
throws KeeperException, InterruptedException {
|
throws KeeperException, InterruptedException {
|
||||||
|
|
|
@ -23,16 +23,15 @@ import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import javax.security.auth.login.LoginException;
|
|
||||||
import javax.security.auth.login.AppConfigurationEntry;
|
import javax.security.auth.login.AppConfigurationEntry;
|
||||||
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
|
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
|
||||||
|
|
||||||
|
@ -50,9 +49,13 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.CreateAndFailSilent;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.DeleteNodeFailSilent;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp.SetData;
|
||||||
import org.apache.zookeeper.AsyncCallback;
|
import org.apache.zookeeper.AsyncCallback;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.apache.zookeeper.Op;
|
||||||
import org.apache.zookeeper.KeeperException.NoNodeException;
|
import org.apache.zookeeper.KeeperException.NoNodeException;
|
||||||
import org.apache.zookeeper.Watcher;
|
import org.apache.zookeeper.Watcher;
|
||||||
import org.apache.zookeeper.ZooDefs.Ids;
|
import org.apache.zookeeper.ZooDefs.Ids;
|
||||||
|
@ -60,6 +63,9 @@ import org.apache.zookeeper.ZooKeeper;
|
||||||
import org.apache.zookeeper.data.ACL;
|
import org.apache.zookeeper.data.ACL;
|
||||||
import org.apache.zookeeper.data.Stat;
|
import org.apache.zookeeper.data.Stat;
|
||||||
import org.apache.zookeeper.client.ZooKeeperSaslClient;
|
import org.apache.zookeeper.client.ZooKeeperSaslClient;
|
||||||
|
import org.apache.zookeeper.proto.CreateRequest;
|
||||||
|
import org.apache.zookeeper.proto.DeleteRequest;
|
||||||
|
import org.apache.zookeeper.proto.SetDataRequest;
|
||||||
import org.apache.zookeeper.server.ZooKeeperSaslServer;
|
import org.apache.zookeeper.server.ZooKeeperSaslServer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -882,7 +888,13 @@ public class ZKUtil {
|
||||||
*/
|
*/
|
||||||
public static void setData(ZooKeeperWatcher zkw, String znode, byte [] data)
|
public static void setData(ZooKeeperWatcher zkw, String znode, byte [] data)
|
||||||
throws KeeperException, KeeperException.NoNodeException {
|
throws KeeperException, KeeperException.NoNodeException {
|
||||||
setData(zkw, znode, data, -1);
|
setData(zkw, (SetData)ZKUtilOp.setData(znode, data));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setData(ZooKeeperWatcher zkw, SetData setData)
|
||||||
|
throws KeeperException, KeeperException.NoNodeException {
|
||||||
|
SetDataRequest sd = (SetDataRequest)toZooKeeperOp(zkw, setData).toRequestRecord();
|
||||||
|
setData(zkw, sd.getPath(), sd.getData(), sd.getVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1088,14 +1100,20 @@ public class ZKUtil {
|
||||||
* @throws KeeperException if unexpected zookeeper exception
|
* @throws KeeperException if unexpected zookeeper exception
|
||||||
*/
|
*/
|
||||||
public static void createAndFailSilent(ZooKeeperWatcher zkw,
|
public static void createAndFailSilent(ZooKeeperWatcher zkw,
|
||||||
String znode)
|
String znode) throws KeeperException {
|
||||||
|
createAndFailSilent(zkw,
|
||||||
|
(CreateAndFailSilent)ZKUtilOp.createAndFailSilent(znode, new byte[0]));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createAndFailSilent(ZooKeeperWatcher zkw, CreateAndFailSilent cafs)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
|
CreateRequest create = (CreateRequest)toZooKeeperOp(zkw, cafs).toRequestRecord();
|
||||||
|
String znode = create.getPath();
|
||||||
try {
|
try {
|
||||||
RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
|
RecoverableZooKeeper zk = zkw.getRecoverableZooKeeper();
|
||||||
waitForZKConnectionIfAuthenticating(zkw);
|
waitForZKConnectionIfAuthenticating(zkw);
|
||||||
if (zk.exists(znode, false) == null) {
|
if (zk.exists(znode, false) == null) {
|
||||||
zk.create(znode, new byte[0], createACL(zkw,znode),
|
zk.create(znode, create.getData(), create.getAcl(), CreateMode.fromFlag(create.getFlags()));
|
||||||
CreateMode.PERSISTENT);
|
|
||||||
}
|
}
|
||||||
} catch(KeeperException.NodeExistsException nee) {
|
} catch(KeeperException.NodeExistsException nee) {
|
||||||
} catch(KeeperException.NoAuthException nee){
|
} catch(KeeperException.NoAuthException nee){
|
||||||
|
@ -1181,14 +1199,22 @@ public class ZKUtil {
|
||||||
*/
|
*/
|
||||||
public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node)
|
public static void deleteNodeFailSilent(ZooKeeperWatcher zkw, String node)
|
||||||
throws KeeperException {
|
throws KeeperException {
|
||||||
|
deleteNodeFailSilent(zkw,
|
||||||
|
(DeleteNodeFailSilent)ZKUtilOp.deleteNodeFailSilent(node));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void deleteNodeFailSilent(ZooKeeperWatcher zkw,
|
||||||
|
DeleteNodeFailSilent dnfs) throws KeeperException {
|
||||||
|
DeleteRequest delete = (DeleteRequest)toZooKeeperOp(zkw, dnfs).toRequestRecord();
|
||||||
try {
|
try {
|
||||||
zkw.getRecoverableZooKeeper().delete(node, -1);
|
zkw.getRecoverableZooKeeper().delete(delete.getPath(), delete.getVersion());
|
||||||
} catch(KeeperException.NoNodeException nne) {
|
} catch(KeeperException.NoNodeException nne) {
|
||||||
} catch(InterruptedException ie) {
|
} catch(InterruptedException ie) {
|
||||||
zkw.interruptedException(ie);
|
zkw.interruptedException(ie);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete the specified node and all of it's children.
|
* Delete the specified node and all of it's children.
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -1230,6 +1256,232 @@ public class ZKUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents an action taken by ZKUtil, e.g. createAndFailSilent.
|
||||||
|
* These actions are higher-level than ZKOp actions, which represent
|
||||||
|
* individual actions in the ZooKeeper API, like create.
|
||||||
|
*/
|
||||||
|
public abstract static class ZKUtilOp {
|
||||||
|
private String path;
|
||||||
|
|
||||||
|
private ZKUtilOp(String path) {
|
||||||
|
this.path = path;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a createAndFailSilent ZKUtilOp
|
||||||
|
*/
|
||||||
|
public static ZKUtilOp createAndFailSilent(String path, byte[] data) {
|
||||||
|
return new CreateAndFailSilent(path, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a deleteNodeFailSilent ZKUtilOP
|
||||||
|
*/
|
||||||
|
public static ZKUtilOp deleteNodeFailSilent(String path) {
|
||||||
|
return new DeleteNodeFailSilent(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a setData ZKUtilOp
|
||||||
|
*/
|
||||||
|
public static ZKUtilOp setData(String path, byte [] data) {
|
||||||
|
return new SetData(path, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return path to znode where the ZKOp will occur
|
||||||
|
*/
|
||||||
|
public String getPath() {
|
||||||
|
return path;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ZKUtilOp representing createAndFailSilent in ZooKeeper
|
||||||
|
* (attempt to create node, ignore error if already exists)
|
||||||
|
*/
|
||||||
|
public static class CreateAndFailSilent extends ZKUtilOp {
|
||||||
|
private byte [] data;
|
||||||
|
|
||||||
|
private CreateAndFailSilent(String path, byte [] data) {
|
||||||
|
super(path);
|
||||||
|
this.data = data;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getData() {
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (!(o instanceof CreateAndFailSilent)) return false;
|
||||||
|
|
||||||
|
CreateAndFailSilent op = (CreateAndFailSilent) o;
|
||||||
|
return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
int ret = 17 + getPath().hashCode() * 31;
|
||||||
|
return ret * 31 + Bytes.hashCode(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ZKUtilOp representing deleteNodeFailSilent in ZooKeeper
|
||||||
|
* (attempt to delete node, ignore error if node doesn't exist)
|
||||||
|
*/
|
||||||
|
public static class DeleteNodeFailSilent extends ZKUtilOp {
|
||||||
|
private DeleteNodeFailSilent(String path) {
|
||||||
|
super(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (!(o instanceof DeleteNodeFailSilent)) return false;
|
||||||
|
|
||||||
|
return super.equals(o);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return getPath().hashCode();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* ZKUtilOp representing setData in ZooKeeper
|
||||||
|
*/
|
||||||
|
public static class SetData extends ZKUtilOp {
|
||||||
|
private byte [] data;
|
||||||
|
|
||||||
|
private SetData(String path, byte [] data) {
|
||||||
|
super(path);
|
||||||
|
this.data = data;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getData() {
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (!(o instanceof SetData)) return false;
|
||||||
|
|
||||||
|
SetData op = (SetData) o;
|
||||||
|
return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
int ret = getPath().hashCode();
|
||||||
|
return ret * 31 + Bytes.hashCode(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert from ZKUtilOp to ZKOp
|
||||||
|
*/
|
||||||
|
private static Op toZooKeeperOp(ZooKeeperWatcher zkw, ZKUtilOp op)
|
||||||
|
throws UnsupportedOperationException {
|
||||||
|
if(op == null) return null;
|
||||||
|
|
||||||
|
if (op instanceof CreateAndFailSilent) {
|
||||||
|
CreateAndFailSilent cafs = (CreateAndFailSilent)op;
|
||||||
|
return Op.create(cafs.getPath(), cafs.getData(), createACL(zkw, cafs.getPath()),
|
||||||
|
CreateMode.PERSISTENT);
|
||||||
|
} else if (op instanceof DeleteNodeFailSilent) {
|
||||||
|
DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent)op;
|
||||||
|
return Op.delete(dnfs.getPath(), -1);
|
||||||
|
} else if (op instanceof SetData) {
|
||||||
|
SetData sd = (SetData)op;
|
||||||
|
return Op.setData(sd.getPath(), sd.getData(), -1);
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
|
||||||
|
+ op.getClass().getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If hbase.zookeeper.useMulti is true, use ZooKeeper's multi-update functionality.
|
||||||
|
* Otherwise, run the list of operations sequentially.
|
||||||
|
*
|
||||||
|
* If all of the following are true:
|
||||||
|
* - runSequentialOnMultiFailure is true
|
||||||
|
* - hbase.zookeeper.useMulti is true
|
||||||
|
* - on calling multi, we get a ZooKeeper exception that can be handled by a sequential call(*)
|
||||||
|
* Then:
|
||||||
|
* - we retry the operations one-by-one (sequentially)
|
||||||
|
*
|
||||||
|
* Note *: an example is receiving a NodeExistsException from a "create" call. Without multi,
|
||||||
|
* a user could call "createAndFailSilent" to ensure that a node exists if they don't care who
|
||||||
|
* actually created the node (i.e. the NodeExistsException from ZooKeeper is caught).
|
||||||
|
* This will cause all operations in the multi to fail, however, because
|
||||||
|
* the NodeExistsException that zk.create throws will fail the multi transaction.
|
||||||
|
* In this case, if the previous conditions hold, the commands are run sequentially, which should
|
||||||
|
* result in the correct final state, but means that the operations will not run atomically.
|
||||||
|
*
|
||||||
|
* @throws KeeperException
|
||||||
|
*/
|
||||||
|
public static void multiOrSequential(ZooKeeperWatcher zkw, List<ZKUtilOp> ops,
|
||||||
|
boolean runSequentialOnMultiFailure) throws KeeperException {
|
||||||
|
if (ops == null) return;
|
||||||
|
boolean useMulti = zkw.getConfiguration().getBoolean(HConstants.ZOOKEEPER_USEMULTI, false);
|
||||||
|
|
||||||
|
if (useMulti) {
|
||||||
|
List<Op> zkOps = new LinkedList<Op>();
|
||||||
|
for (ZKUtilOp op : ops) {
|
||||||
|
zkOps.add(toZooKeeperOp(zkw, op));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
zkw.getRecoverableZooKeeper().multi(zkOps);
|
||||||
|
} catch (KeeperException ke) {
|
||||||
|
switch (ke.code()) {
|
||||||
|
case NODEEXISTS:
|
||||||
|
case NONODE:
|
||||||
|
case BADVERSION:
|
||||||
|
case NOAUTH:
|
||||||
|
// if we get an exception that could be solved by running sequentially
|
||||||
|
// (and the client asked us to), then break out and run sequentially
|
||||||
|
if (runSequentialOnMultiFailure) {
|
||||||
|
LOG.info("On call to ZK.multi, received exception: " + ke.toString() + "."
|
||||||
|
+ " Attempting to run operations sequentially because"
|
||||||
|
+ " runSequentialOnMultiFailure is: " + runSequentialOnMultiFailure + ".");
|
||||||
|
processSequentially(zkw, ops);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
throw ke;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
zkw.interruptedException(ie);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// run sequentially
|
||||||
|
processSequentially(zkw, ops);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void processSequentially(ZooKeeperWatcher zkw, List<ZKUtilOp> ops)
|
||||||
|
throws KeeperException, NoNodeException {
|
||||||
|
for (ZKUtilOp op : ops) {
|
||||||
|
if (op instanceof CreateAndFailSilent) {
|
||||||
|
createAndFailSilent(zkw, (CreateAndFailSilent) op);
|
||||||
|
} else if (op instanceof DeleteNodeFailSilent) {
|
||||||
|
deleteNodeFailSilent(zkw, (DeleteNodeFailSilent) op);
|
||||||
|
} else if (op instanceof SetData) {
|
||||||
|
setData(zkw, (SetData) op);
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
|
||||||
|
+ op.getClass().getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// ZooKeeper cluster information
|
// ZooKeeper cluster information
|
||||||
//
|
//
|
||||||
|
|
|
@ -705,6 +705,18 @@
|
||||||
for more information.
|
for more information.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hbase.zookeeper.useMulti</name>
|
||||||
|
<value>false</value>
|
||||||
|
<description>Instructs HBase to make use of ZooKeeper's multi-update functionality.
|
||||||
|
This allows certain ZooKeeper operations to complete more quickly and prevents some issues
|
||||||
|
with rare Replication failure scenarios (see the release note of HBASE-2611 for an example).
|
||||||
|
IMPORTANT: only set this to true if all ZooKeeper servers in the cluster are on version 3.4+
|
||||||
|
and will not be downgraded. ZooKeeper versions before 3.4 do not support multi-update and will
|
||||||
|
not fail gracefully if multi-update is invoked (see ZOOKEEPER-1495).
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<!-- End of properties used to generate ZooKeeper host:port quorum list. -->
|
<!-- End of properties used to generate ZooKeeper host:port quorum list. -->
|
||||||
|
|
||||||
<!--
|
<!--
|
||||||
|
|
|
@ -0,0 +1,291 @@
|
||||||
|
/**
|
||||||
|
* Copyright The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.zookeeper;
|
||||||
|
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test ZooKeeper multi-update functionality
|
||||||
|
*/
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestZKMulti {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestZKMulti.class);
|
||||||
|
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
private static ZooKeeperWatcher zkw = null;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TEST_UTIL.startMiniZKCluster();
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
conf.setBoolean("hbase.zookeeper.useMulti", true);
|
||||||
|
Abortable abortable = new Abortable() {
|
||||||
|
@Override
|
||||||
|
public void abort(String why, Throwable e) {
|
||||||
|
LOG.info(why, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAborted() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
zkw = new ZooKeeperWatcher(conf,
|
||||||
|
"TestZKMulti", abortable, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniZKCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleMulti() throws Exception {
|
||||||
|
// null multi
|
||||||
|
ZKUtil.multiOrSequential(zkw, null, false);
|
||||||
|
|
||||||
|
// empty multi
|
||||||
|
ZKUtil.multiOrSequential(zkw, new LinkedList<ZKUtilOp>(), false);
|
||||||
|
|
||||||
|
// single create
|
||||||
|
String path = ZKUtil.joinZNode(zkw.baseZNode, "testSimpleMulti");
|
||||||
|
LinkedList<ZKUtilOp> singleCreate = new LinkedList<ZKUtilOp>();
|
||||||
|
singleCreate.add(ZKUtilOp.createAndFailSilent(path, new byte[0]));
|
||||||
|
ZKUtil.multiOrSequential(zkw, singleCreate, false);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, path) != -1);
|
||||||
|
|
||||||
|
// single setdata
|
||||||
|
LinkedList<ZKUtilOp> singleSetData = new LinkedList<ZKUtilOp>();
|
||||||
|
byte [] data = Bytes.toBytes("foobar");
|
||||||
|
singleSetData.add(ZKUtilOp.setData(path, data));
|
||||||
|
ZKUtil.multiOrSequential(zkw, singleSetData, false);
|
||||||
|
assertTrue(Bytes.equals(ZKUtil.getData(zkw, path), data));
|
||||||
|
|
||||||
|
// single delete
|
||||||
|
LinkedList<ZKUtilOp> singleDelete = new LinkedList<ZKUtilOp>();
|
||||||
|
singleDelete.add(ZKUtilOp.deleteNodeFailSilent(path));
|
||||||
|
ZKUtil.multiOrSequential(zkw, singleDelete, false);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, path) == -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testComplexMulti() throws Exception {
|
||||||
|
String path1 = ZKUtil.joinZNode(zkw.baseZNode, "testComplexMulti1");
|
||||||
|
String path2 = ZKUtil.joinZNode(zkw.baseZNode, "testComplexMulti2");
|
||||||
|
String path3 = ZKUtil.joinZNode(zkw.baseZNode, "testComplexMulti3");
|
||||||
|
String path4 = ZKUtil.joinZNode(zkw.baseZNode, "testComplexMulti4");
|
||||||
|
String path5 = ZKUtil.joinZNode(zkw.baseZNode, "testComplexMulti5");
|
||||||
|
String path6 = ZKUtil.joinZNode(zkw.baseZNode, "testComplexMulti6");
|
||||||
|
// create 4 nodes that we'll setData on or delete later
|
||||||
|
LinkedList<ZKUtilOp> create4Nodes = new LinkedList<ZKUtilOp>();
|
||||||
|
create4Nodes.add(ZKUtilOp.createAndFailSilent(path1, Bytes.toBytes(path1)));
|
||||||
|
create4Nodes.add(ZKUtilOp.createAndFailSilent(path2, Bytes.toBytes(path2)));
|
||||||
|
create4Nodes.add(ZKUtilOp.createAndFailSilent(path3, Bytes.toBytes(path3)));
|
||||||
|
create4Nodes.add(ZKUtilOp.createAndFailSilent(path4, Bytes.toBytes(path4)));
|
||||||
|
ZKUtil.multiOrSequential(zkw, create4Nodes, false);
|
||||||
|
assertTrue(Bytes.equals(ZKUtil.getData(zkw, path1), Bytes.toBytes(path1)));
|
||||||
|
assertTrue(Bytes.equals(ZKUtil.getData(zkw, path2), Bytes.toBytes(path2)));
|
||||||
|
assertTrue(Bytes.equals(ZKUtil.getData(zkw, path3), Bytes.toBytes(path3)));
|
||||||
|
assertTrue(Bytes.equals(ZKUtil.getData(zkw, path4), Bytes.toBytes(path4)));
|
||||||
|
|
||||||
|
// do multiple of each operation (setData, delete, create)
|
||||||
|
LinkedList<ZKUtilOp> ops = new LinkedList<ZKUtilOp>();
|
||||||
|
// setData
|
||||||
|
ops.add(ZKUtilOp.setData(path1, Bytes.add(Bytes.toBytes(path1), Bytes.toBytes(path1))));
|
||||||
|
ops.add(ZKUtilOp.setData(path2, Bytes.add(Bytes.toBytes(path2), Bytes.toBytes(path2))));
|
||||||
|
// delete
|
||||||
|
ops.add(ZKUtilOp.deleteNodeFailSilent(path3));
|
||||||
|
ops.add(ZKUtilOp.deleteNodeFailSilent(path4));
|
||||||
|
// create
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(path5, Bytes.toBytes(path5)));
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(path6, Bytes.toBytes(path6)));
|
||||||
|
ZKUtil.multiOrSequential(zkw, ops, false);
|
||||||
|
assertTrue(Bytes.equals(ZKUtil.getData(zkw, path1),
|
||||||
|
Bytes.add(Bytes.toBytes(path1), Bytes.toBytes(path1))));
|
||||||
|
assertTrue(Bytes.equals(ZKUtil.getData(zkw, path2),
|
||||||
|
Bytes.add(Bytes.toBytes(path2), Bytes.toBytes(path2))));
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, path3) == -1);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, path4) == -1);
|
||||||
|
assertTrue(Bytes.equals(ZKUtil.getData(zkw, path5), Bytes.toBytes(path5)));
|
||||||
|
assertTrue(Bytes.equals(ZKUtil.getData(zkw, path6), Bytes.toBytes(path6)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingleFailure() throws Exception {
|
||||||
|
// try to delete a node that doesn't exist
|
||||||
|
boolean caughtNoNode = false;
|
||||||
|
String path = ZKUtil.joinZNode(zkw.baseZNode, "testSingleFailureZ");
|
||||||
|
LinkedList<ZKUtilOp> ops = new LinkedList<ZKUtilOp>();
|
||||||
|
ops.add(ZKUtilOp.deleteNodeFailSilent(path));
|
||||||
|
try {
|
||||||
|
ZKUtil.multiOrSequential(zkw, ops, false);
|
||||||
|
} catch (KeeperException.NoNodeException nne) {
|
||||||
|
caughtNoNode = true;
|
||||||
|
}
|
||||||
|
assertTrue(caughtNoNode);
|
||||||
|
|
||||||
|
// try to setData on a node that doesn't exist
|
||||||
|
caughtNoNode = false;
|
||||||
|
ops = new LinkedList<ZKUtilOp>();
|
||||||
|
ops.add(ZKUtilOp.setData(path, Bytes.toBytes(path)));
|
||||||
|
try {
|
||||||
|
ZKUtil.multiOrSequential(zkw, ops, false);
|
||||||
|
} catch (KeeperException.NoNodeException nne) {
|
||||||
|
caughtNoNode = true;
|
||||||
|
}
|
||||||
|
assertTrue(caughtNoNode);
|
||||||
|
|
||||||
|
// try to create on a node that already exists
|
||||||
|
boolean caughtNodeExists = false;
|
||||||
|
ops = new LinkedList<ZKUtilOp>();
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(path, Bytes.toBytes(path)));
|
||||||
|
ZKUtil.multiOrSequential(zkw, ops, false);
|
||||||
|
try {
|
||||||
|
ZKUtil.multiOrSequential(zkw, ops, false);
|
||||||
|
} catch (KeeperException.NodeExistsException nee) {
|
||||||
|
caughtNodeExists = true;
|
||||||
|
}
|
||||||
|
assertTrue(caughtNodeExists);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingleFailureInMulti() throws Exception {
|
||||||
|
// try a multi where all but one operation succeeds
|
||||||
|
String pathA = ZKUtil.joinZNode(zkw.baseZNode, "testSingleFailureInMultiA");
|
||||||
|
String pathB = ZKUtil.joinZNode(zkw.baseZNode, "testSingleFailureInMultiB");
|
||||||
|
String pathC = ZKUtil.joinZNode(zkw.baseZNode, "testSingleFailureInMultiC");
|
||||||
|
LinkedList<ZKUtilOp> ops = new LinkedList<ZKUtilOp>();
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(pathA, Bytes.toBytes(pathA)));
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(pathB, Bytes.toBytes(pathB)));
|
||||||
|
ops.add(ZKUtilOp.deleteNodeFailSilent(pathC));
|
||||||
|
boolean caughtNoNode = false;
|
||||||
|
try {
|
||||||
|
ZKUtil.multiOrSequential(zkw, ops, false);
|
||||||
|
} catch (KeeperException.NoNodeException nne) {
|
||||||
|
caughtNoNode = true;
|
||||||
|
}
|
||||||
|
assertTrue(caughtNoNode);
|
||||||
|
// assert that none of the operations succeeded
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, pathA) == -1);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, pathB) == -1);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, pathC) == -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiFailure() throws Exception {
|
||||||
|
String pathX = ZKUtil.joinZNode(zkw.baseZNode, "testMultiFailureX");
|
||||||
|
String pathY = ZKUtil.joinZNode(zkw.baseZNode, "testMultiFailureY");
|
||||||
|
String pathZ = ZKUtil.joinZNode(zkw.baseZNode, "testMultiFailureZ");
|
||||||
|
// create X that we will use to fail create later
|
||||||
|
LinkedList<ZKUtilOp> ops = new LinkedList<ZKUtilOp>();
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(pathX, Bytes.toBytes(pathX)));
|
||||||
|
ZKUtil.multiOrSequential(zkw, ops, false);
|
||||||
|
|
||||||
|
// fail one of each create ,setData, delete
|
||||||
|
String pathV = ZKUtil.joinZNode(zkw.baseZNode, "testMultiFailureV");
|
||||||
|
String pathW = ZKUtil.joinZNode(zkw.baseZNode, "testMultiFailureW");
|
||||||
|
ops = new LinkedList<ZKUtilOp>();
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(pathX, Bytes.toBytes(pathX))); // fail -- already exists
|
||||||
|
ops.add(ZKUtilOp.setData(pathY, Bytes.toBytes(pathY))); // fail -- doesn't exist
|
||||||
|
ops.add(ZKUtilOp.deleteNodeFailSilent(pathZ)); // fail -- doesn't exist
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(pathX, Bytes.toBytes(pathV))); // pass
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(pathX, Bytes.toBytes(pathW))); // pass
|
||||||
|
boolean caughtNodeExists = false;
|
||||||
|
try {
|
||||||
|
ZKUtil.multiOrSequential(zkw, ops, false);
|
||||||
|
} catch (KeeperException.NodeExistsException nee) {
|
||||||
|
// check first operation that fails throws exception
|
||||||
|
caughtNodeExists = true;
|
||||||
|
}
|
||||||
|
assertTrue(caughtNodeExists);
|
||||||
|
// check that no modifications were made
|
||||||
|
assertFalse(ZKUtil.checkExists(zkw, pathX) == -1);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, pathY) == -1);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, pathZ) == -1);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, pathW) == -1);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, pathV) == -1);
|
||||||
|
|
||||||
|
// test that with multiple failures, throws an exception corresponding to first failure in list
|
||||||
|
ops = new LinkedList<ZKUtilOp>();
|
||||||
|
ops.add(ZKUtilOp.setData(pathY, Bytes.toBytes(pathY))); // fail -- doesn't exist
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(pathX, Bytes.toBytes(pathX))); // fail -- exists
|
||||||
|
boolean caughtNoNode = false;
|
||||||
|
try {
|
||||||
|
ZKUtil.multiOrSequential(zkw, ops, false);
|
||||||
|
} catch (KeeperException.NoNodeException nne) {
|
||||||
|
// check first operation that fails throws exception
|
||||||
|
caughtNoNode = true;
|
||||||
|
}
|
||||||
|
assertTrue(caughtNoNode);
|
||||||
|
// check that no modifications were made
|
||||||
|
assertFalse(ZKUtil.checkExists(zkw, pathX) == -1);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, pathY) == -1);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, pathZ) == -1);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, pathW) == -1);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, pathV) == -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRunSequentialOnMultiFailure() throws Exception {
|
||||||
|
String path1 = ZKUtil.joinZNode(zkw.baseZNode, "runSequential1");
|
||||||
|
String path2 = ZKUtil.joinZNode(zkw.baseZNode, "runSequential2");
|
||||||
|
String path3 = ZKUtil.joinZNode(zkw.baseZNode, "runSequential3");
|
||||||
|
String path4 = ZKUtil.joinZNode(zkw.baseZNode, "runSequential4");
|
||||||
|
|
||||||
|
// create some nodes that we will use later
|
||||||
|
LinkedList<ZKUtilOp> ops = new LinkedList<ZKUtilOp>();
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(path1, Bytes.toBytes(path1)));
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(path2, Bytes.toBytes(path2)));
|
||||||
|
ZKUtil.multiOrSequential(zkw, ops, false);
|
||||||
|
|
||||||
|
// test that, even with operations that fail, the ones that would pass will pass
|
||||||
|
// with runSequentialOnMultiFailure
|
||||||
|
ops = new LinkedList<ZKUtilOp>();
|
||||||
|
ops.add(ZKUtilOp.setData(path1, Bytes.add(Bytes.toBytes(path1), Bytes.toBytes(path1)))); // pass
|
||||||
|
ops.add(ZKUtilOp.deleteNodeFailSilent(path2)); // pass
|
||||||
|
ops.add(ZKUtilOp.deleteNodeFailSilent(path3)); // fail -- node doesn't exist
|
||||||
|
ops.add(ZKUtilOp.createAndFailSilent(path4,
|
||||||
|
Bytes.add(Bytes.toBytes(path4), Bytes.toBytes(path4)))); // pass
|
||||||
|
ZKUtil.multiOrSequential(zkw, ops, true);
|
||||||
|
assertTrue(Bytes.equals(ZKUtil.getData(zkw, path1),
|
||||||
|
Bytes.add(Bytes.toBytes(path1), Bytes.toBytes(path1))));
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, path2) == -1);
|
||||||
|
assertTrue(ZKUtil.checkExists(zkw, path3) == -1);
|
||||||
|
assertFalse(ZKUtil.checkExists(zkw, path4) == -1);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue