HBASE-22057 Cap the size of the nodes we delete in one ZK.multi call
If we try to delete too many znodes at once, we'll smack into the jute.maxbuffer size. Try to prevent that from happening. The dominating factor of the ZK client request should be the znode size on a delete. Signed-off-by: Peter Somogyi <psomogyi@apache.org> Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java
This commit is contained in:
parent
01f8e1eaa3
commit
677ed9ef89
@ -88,6 +88,7 @@ public class RecoverableZooKeeper {
|
||||
private String quorumServers;
|
||||
private final Random salter;
|
||||
private final RetryCounter authFailedRetryCounter;
|
||||
private int maxMultiSize;
|
||||
|
||||
// The metadata attached to each piece of data has the
|
||||
// format:
|
||||
@ -106,7 +107,7 @@ public class RecoverableZooKeeper {
|
||||
justification="None. Its always been this way.")
|
||||
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
|
||||
Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier,
|
||||
int authFailedRetries, int authFailedPause) throws IOException {
|
||||
int authFailedRetries, int authFailedPause, int maxMultiSize) throws IOException {
|
||||
// TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
|
||||
this.retryCounterFactory =
|
||||
new RetryCounterFactory(maxRetries+1, retryIntervalMillis, maxSleepTime);
|
||||
@ -123,7 +124,14 @@ public class RecoverableZooKeeper {
|
||||
this.watcher = watcher;
|
||||
this.sessionTimeout = sessionTimeout;
|
||||
this.quorumServers = quorumServers;
|
||||
try {checkZk();} catch (Exception x) {/* ignore */}
|
||||
this.maxMultiSize = maxMultiSize;
|
||||
|
||||
try {
|
||||
checkZk();
|
||||
} catch (Exception x) {
|
||||
/* ignore */
|
||||
}
|
||||
|
||||
salter = new Random();
|
||||
|
||||
RetryConfig authFailedRetryConfig = new RetryConfig(
|
||||
@ -136,7 +144,18 @@ public class RecoverableZooKeeper {
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to create a Zookeeper connection. Turns any exception encountered into a
|
||||
* Returns the maximum size (in bytes) that should be included in any single multi() call.
|
||||
*
|
||||
* NB: This is an approximation, so there may be variance in the msg actually sent over the
|
||||
* wire. Please be sure to set this appropriately, with respect to your ZK server configuration
|
||||
* for jute.maxbuffer.
|
||||
*/
|
||||
public int getMaxMultiSizeLimit() {
|
||||
return maxMultiSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to create a ZooKeeper connection. Turns any exception encountered into a
|
||||
* KeeperException.OperationTimeoutException so it can be retried.
|
||||
* @return The created Zookeeper connection object
|
||||
* @throws KeeperException
|
||||
|
@ -31,6 +31,7 @@ import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -115,16 +116,19 @@ public class ZKUtil {
|
||||
*/
|
||||
RecoverableZooKeeper create(String quorumServers, int sessionTimeout,
|
||||
Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime,
|
||||
String identifier, int authFailedRetries, int authFailedPause) throws IOException;
|
||||
String identifier, int authFailedRetries, int authFailedPause, int multiMaxSize)
|
||||
throws IOException;
|
||||
}
|
||||
|
||||
public static class DefaultZooKeeperFactory implements ZooKeeperFactory {
|
||||
@Override
|
||||
public RecoverableZooKeeper create(String quorumServers, int sessionTimeout,
|
||||
Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime,
|
||||
String identifier, int authFailedRetries, int authFailedPause) throws IOException {
|
||||
Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime,
|
||||
String identifier, int authFailedRetries, int authFailedPause, int multiMaxSize)
|
||||
throws IOException {
|
||||
return new RecoverableZooKeeper(quorumServers, sessionTimeout, watcher, maxRetries,
|
||||
retryIntervalMillis, maxSleepTime, identifier, authFailedRetries, authFailedPause);
|
||||
retryIntervalMillis, maxSleepTime, identifier, authFailedRetries, authFailedPause,
|
||||
multiMaxSize);
|
||||
}
|
||||
}
|
||||
|
||||
@ -171,13 +175,14 @@ public class ZKUtil {
|
||||
|
||||
int authFailedRetries = conf.getInt(AUTH_FAILED_RETRIES_KEY, AUTH_FAILED_RETRIES_DEFAULT);
|
||||
int authFailedPause = conf.getInt(AUTH_FAILED_PAUSE_KEY, AUTH_FAILED_PAUSE_DEFAULT);
|
||||
int multiMaxSize = conf.getInt("zookeeper.multi.max.size", 1024*1024);
|
||||
|
||||
Class<? extends ZooKeeperFactory> factoryClz = conf.getClass("zookeeper.factory.class",
|
||||
DefaultZooKeeperFactory.class, ZooKeeperFactory.class);
|
||||
try {
|
||||
ZooKeeperFactory factory = factoryClz.newInstance();
|
||||
return factory.create(ensemble, timeout, watcher, retry, retryIntervalMillis,
|
||||
maxSleepTime, identifier, authFailedRetries, authFailedPause);
|
||||
maxSleepTime, identifier, authFailedRetries, authFailedPause, multiMaxSize);
|
||||
} catch (Exception e) {
|
||||
if (e instanceof RuntimeException) {
|
||||
throw (RuntimeException) e;
|
||||
@ -316,10 +321,6 @@ public class ZKUtil {
|
||||
private final String keytabFile;
|
||||
private final String principal;
|
||||
|
||||
public JaasConfiguration(String loginContextName, String principal) {
|
||||
this(loginContextName, principal, null, true);
|
||||
}
|
||||
|
||||
public JaasConfiguration(String loginContextName, String principal, String keytabFile) {
|
||||
this(loginContextName, principal, keytabFile, keytabFile == null || keytabFile.length() == 0);
|
||||
}
|
||||
@ -1401,10 +1402,7 @@ public class ZKUtil {
|
||||
ops.add(ZKUtilOp.deleteNodeFailSilent(children.get(i)));
|
||||
}
|
||||
}
|
||||
// atleast one element should exist
|
||||
if (ops.size() > 0) {
|
||||
multiOrSequential(zkw, ops, runSequentialOnMultiFailure);
|
||||
}
|
||||
submitBatchedMultiOrSequential(zkw, runSequentialOnMultiFailure, ops);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1464,10 +1462,61 @@ public class ZKUtil {
|
||||
zkw.interruptedException(e);
|
||||
}
|
||||
}
|
||||
// atleast one element should exist
|
||||
if (ops.size() > 0) {
|
||||
multiOrSequential(zkw, ops, runSequentialOnMultiFailure);
|
||||
submitBatchedMultiOrSequential(zkw, runSequentialOnMultiFailure, ops);
|
||||
}
|
||||
|
||||
/**
|
||||
* Chunks the provided {@code ops} when their approximate size exceeds the the configured limit.
|
||||
* Take caution that this can ONLY be used for operations where atomicity is not important,
|
||||
* e.g. deletions. It must not be used when atomicity of the operations is critical.
|
||||
*/
|
||||
static void submitBatchedMultiOrSequential(ZooKeeperWatcher zkw,
|
||||
boolean runSequentialOnMultiFailure, List<ZKUtilOp> ops) throws KeeperException {
|
||||
// at least one element should exist
|
||||
if (ops.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
final int multiMaxSize = zkw.getRecoverableZooKeeper().getMaxMultiSizeLimit();
|
||||
// Batch up the items to over smashing through jute.maxbuffer with too many Ops.
|
||||
final List<List<ZKUtilOp>> batchedOps = partitionOps(ops, multiMaxSize);
|
||||
// Would use forEach() but have to handle KeeperException
|
||||
for (List<ZKUtilOp> batch : batchedOps) {
|
||||
multiOrSequential(zkw, batch, runSequentialOnMultiFailure);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Partition the list of {@code ops} by size (using {@link #estimateSize(ZKUtilOp)}).
|
||||
*/
|
||||
static List<List<ZKUtilOp>> partitionOps(List<ZKUtilOp> ops, int maxPartitionSize) {
|
||||
List<List<ZKUtilOp>> partitionedOps = new ArrayList<>();
|
||||
List<ZKUtilOp> currentPartition = new ArrayList<>();
|
||||
int currentPartitionSize = 0;
|
||||
partitionedOps.add(currentPartition);
|
||||
Iterator<ZKUtilOp> iter = ops.iterator();
|
||||
while (iter.hasNext()) {
|
||||
ZKUtilOp currentOp = iter.next();
|
||||
int currentOpSize = estimateSize(currentOp);
|
||||
|
||||
// Roll a new partition if necessary
|
||||
// If the current partition is empty, put the element in there anyways.
|
||||
// We can roll a new partition if we get another element
|
||||
if (!currentPartition.isEmpty() && currentOpSize + currentPartitionSize > maxPartitionSize) {
|
||||
currentPartition = new ArrayList<>();
|
||||
partitionedOps.add(currentPartition);
|
||||
currentPartitionSize = 0;
|
||||
}
|
||||
|
||||
// Add the current op to the partition
|
||||
currentPartition.add(currentOp);
|
||||
// And record its size
|
||||
currentPartitionSize += currentOpSize;
|
||||
}
|
||||
return partitionedOps;
|
||||
}
|
||||
|
||||
static int estimateSize(ZKUtilOp op) {
|
||||
return Bytes.toBytes(op.getPath()).length;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -33,9 +33,9 @@ public class AuthFailingRecoverableZooKeeper extends RecoverableZooKeeper {
|
||||
|
||||
public AuthFailingRecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher,
|
||||
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier,
|
||||
int authFailedRetries, int authFailedPause) throws IOException {
|
||||
int authFailedRetries, int authFailedPause, int multiMaxSize) throws IOException {
|
||||
super(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime,
|
||||
identifier, authFailedRetries, authFailedPause);
|
||||
identifier, authFailedRetries, authFailedPause, multiMaxSize);
|
||||
this.quorumServers = quorumServers;
|
||||
this.sessionTimeout = sessionTimeout;
|
||||
this.watcher = watcher;
|
||||
|
@ -39,9 +39,10 @@ public class SelfHealingRecoverableZooKeeper extends RecoverableZooKeeper {
|
||||
|
||||
public SelfHealingRecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher,
|
||||
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier,
|
||||
int authFailedRetries, int authFailedPause, int numFailuresBeforeSuccess) throws IOException {
|
||||
int authFailedRetries, int authFailedPause, int numFailuresBeforeSuccess, int multiMaxSize)
|
||||
throws IOException {
|
||||
super(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime,
|
||||
identifier, authFailedRetries, authFailedPause);
|
||||
identifier, authFailedRetries, authFailedPause, multiMaxSize);
|
||||
this.quorumServers = quorumServers;
|
||||
this.sessionTimeout = sessionTimeout;
|
||||
this.watcher = watcher;
|
||||
|
@ -59,9 +59,10 @@ public class TestZKAuthFailedRecovery {
|
||||
@Override
|
||||
public RecoverableZooKeeper create(String quorumServers, int sessionTimeout, Watcher watcher,
|
||||
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier,
|
||||
int authFailedRetries, int authFailedPause) throws IOException {
|
||||
int authFailedRetries, int authFailedPause, int multiMaxSize) throws IOException {
|
||||
return new AuthFailingRecoverableZooKeeper(quorumServers, sessionTimeout, watcher, maxRetries,
|
||||
retryIntervalMillis, maxSleepTime, identifier, authFailedRetries, authFailedPause);
|
||||
retryIntervalMillis, maxSleepTime, identifier, authFailedRetries, authFailedPause,
|
||||
multiMaxSize);
|
||||
}
|
||||
}
|
||||
|
||||
@ -71,10 +72,10 @@ public class TestZKAuthFailedRecovery {
|
||||
@Override
|
||||
public RecoverableZooKeeper create(String quorumServers, int sessionTimeout, Watcher watcher,
|
||||
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier,
|
||||
int authFailedRetries, int authFailedPause) throws IOException {
|
||||
int authFailedRetries, int authFailedPause, int multiMaxSize) throws IOException {
|
||||
return new SelfHealingRecoverableZooKeeper(quorumServers, sessionTimeout, watcher, maxRetries,
|
||||
retryIntervalMillis, maxSleepTime, identifier, authFailedRetries, authFailedPause,
|
||||
FAILURES_BEFORE_SUCCESS);
|
||||
FAILURES_BEFORE_SUCCESS, multiMaxSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,11 +20,13 @@
|
||||
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
@ -54,6 +56,18 @@ public class TestZKMulti {
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static ZooKeeperWatcher zkw = null;
|
||||
|
||||
private static class ZKMultiAbortable implements Abortable {
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
LOG.info(why, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniZKCluster();
|
||||
@ -412,6 +426,73 @@ public class TestZKMulti {
|
||||
assertTrue("Failed to delete child znodes of parent znode 1!", 0 == children.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchedDeletesOfWideZNodes() throws Exception {
|
||||
// Batch every 50bytes
|
||||
final int batchSize = 50;
|
||||
Configuration localConf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
localConf.setInt("zookeeper.multi.max.size", batchSize);
|
||||
try (ZooKeeperWatcher customZkw = new ZooKeeperWatcher(localConf,
|
||||
"TestZKMulti_Custom", new ZKMultiAbortable(), true)) {
|
||||
|
||||
// With a parent znode like this, we'll get batches of 2-3 elements
|
||||
final String parent1 = "/batchedDeletes1";
|
||||
final String parent2 = "/batchedDeletes2";
|
||||
final byte[] EMPTY_BYTES = new byte[0];
|
||||
|
||||
// Write one node
|
||||
List<Op> ops = new ArrayList<>();
|
||||
ops.add(Op.create(parent1, EMPTY_BYTES, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
|
||||
for (int i = 0; i < batchSize * 2; i++) {
|
||||
ops.add(Op.create(
|
||||
parent1 + "/" + i, EMPTY_BYTES, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
|
||||
}
|
||||
customZkw.getRecoverableZooKeeper().multi(ops);
|
||||
|
||||
// Write into a second node
|
||||
ops.clear();
|
||||
ops.add(Op.create(parent2, EMPTY_BYTES, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
|
||||
for (int i = 0; i < batchSize * 4; i++) {
|
||||
ops.add(Op.create(
|
||||
parent2 + "/" + i, EMPTY_BYTES, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
|
||||
}
|
||||
customZkw.getRecoverableZooKeeper().multi(ops);
|
||||
|
||||
// These should return successfully
|
||||
ZKUtil.deleteChildrenRecursively(customZkw, parent1);
|
||||
ZKUtil.deleteChildrenRecursively(customZkw, parent2);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListPartitioning() {
|
||||
// 10 Bytes
|
||||
ZKUtilOp tenByteOp = ZKUtilOp.deleteNodeFailSilent("/123456789");
|
||||
|
||||
// Simple, single element case
|
||||
assertEquals(Collections.singletonList(Collections.singletonList(tenByteOp)),
|
||||
ZKUtil.partitionOps(Collections.singletonList(tenByteOp), 15));
|
||||
|
||||
// Simple case where we exceed the limit, but must make the list
|
||||
assertEquals(Collections.singletonList(Collections.singletonList(tenByteOp)),
|
||||
ZKUtil.partitionOps(Collections.singletonList(tenByteOp), 5));
|
||||
|
||||
// Each gets its own bucket
|
||||
assertEquals(
|
||||
Arrays.asList(Arrays.asList(tenByteOp), Arrays.asList(tenByteOp), Arrays.asList(tenByteOp)),
|
||||
ZKUtil.partitionOps(Arrays.asList(tenByteOp, tenByteOp, tenByteOp), 15));
|
||||
|
||||
// Test internal boundary
|
||||
assertEquals(
|
||||
Arrays.asList(Arrays.asList(tenByteOp,tenByteOp), Arrays.asList(tenByteOp)),
|
||||
ZKUtil.partitionOps(Arrays.asList(tenByteOp, tenByteOp, tenByteOp), 20));
|
||||
|
||||
// Plenty of space for one partition
|
||||
assertEquals(
|
||||
Arrays.asList(Arrays.asList(tenByteOp, tenByteOp, tenByteOp)),
|
||||
ZKUtil.partitionOps(Arrays.asList(tenByteOp, tenByteOp, tenByteOp), 50));
|
||||
}
|
||||
|
||||
private void createZNodeTree(String rootZNode) throws KeeperException,
|
||||
InterruptedException {
|
||||
List<Op> opList = new ArrayList<Op>();
|
||||
|
Loading…
x
Reference in New Issue
Block a user