HBASE-21796 Recover a ZK client from the AUTH_FAILED state
Introduces "hbase.zookeeper.authfailed.retries.number" and "hbase.zookeeper.authfailed.pause" to control number of retries from the AUTH_FAILED state (and the pause in millis between attempts) before giving up and throwing an uncaught exception. Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
701c29b30b
commit
25defc9293
|
@ -1468,17 +1468,33 @@ class ConnectionManager {
|
|||
@Override
|
||||
public void clearRegionCache() {
|
||||
metaCache.clearCache();
|
||||
clearMetaRegionLocation();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearRegionCache(final TableName tableName) {
|
||||
if (TableName.META_TABLE_NAME.equals(tableName)) {
|
||||
clearMetaRegionLocation();
|
||||
} else {
|
||||
metaCache.clearCache(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearRegionCache(final byte[] tableName) {
|
||||
if (Bytes.equals(TableName.META_TABLE_NAME.getName(), tableName)) {
|
||||
clearMetaRegionLocation();
|
||||
} else {
|
||||
clearRegionCache(TableName.valueOf(tableName));
|
||||
}
|
||||
}
|
||||
|
||||
private void clearMetaRegionLocation() {
|
||||
// Meta's location is cached separately from the MetaCache
|
||||
synchronized (metaRegionLock) {
|
||||
this.metaLocations = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Put a newly discovered HRegionLocation into the cache.
|
||||
|
|
|
@ -24,13 +24,18 @@ import java.util.ArrayList;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter.BackoffPolicy;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
|
||||
import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
||||
import org.apache.htrace.Trace;
|
||||
import org.apache.htrace.TraceScope;
|
||||
import org.apache.zookeeper.AsyncCallback;
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
@ -44,8 +49,6 @@ import org.apache.zookeeper.data.ACL;
|
|||
import org.apache.zookeeper.data.Stat;
|
||||
import org.apache.zookeeper.proto.CreateRequest;
|
||||
import org.apache.zookeeper.proto.SetDataRequest;
|
||||
import org.apache.htrace.Trace;
|
||||
import org.apache.htrace.TraceScope;
|
||||
|
||||
/**
|
||||
* A zookeeper that can handle 'recoverable' errors.
|
||||
|
@ -73,6 +76,7 @@ import org.apache.htrace.TraceScope;
|
|||
@InterfaceAudience.Private
|
||||
public class RecoverableZooKeeper {
|
||||
private static final Log LOG = LogFactory.getLog(RecoverableZooKeeper.class);
|
||||
|
||||
// the actual ZooKeeper client instance
|
||||
private ZooKeeper zk;
|
||||
private final RetryCounterFactory retryCounterFactory;
|
||||
|
@ -83,6 +87,7 @@ public class RecoverableZooKeeper {
|
|||
private int sessionTimeout;
|
||||
private String quorumServers;
|
||||
private final Random salter;
|
||||
private final RetryCounter authFailedRetryCounter;
|
||||
|
||||
// The metadata attached to each piece of data has the
|
||||
// format:
|
||||
|
@ -97,18 +102,11 @@ public class RecoverableZooKeeper {
|
|||
private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
|
||||
private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT;
|
||||
|
||||
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
|
||||
Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime)
|
||||
throws IOException {
|
||||
this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime,
|
||||
null);
|
||||
}
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
|
||||
justification="None. Its always been this way.")
|
||||
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
|
||||
Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier)
|
||||
throws IOException {
|
||||
Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier,
|
||||
int authFailedRetries, int authFailedPause) throws IOException {
|
||||
// TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
|
||||
this.retryCounterFactory =
|
||||
new RetryCounterFactory(maxRetries+1, retryIntervalMillis, maxSleepTime);
|
||||
|
@ -127,6 +125,14 @@ public class RecoverableZooKeeper {
|
|||
this.quorumServers = quorumServers;
|
||||
try {checkZk();} catch (Exception x) {/* ignore */}
|
||||
salter = new Random();
|
||||
|
||||
RetryConfig authFailedRetryConfig = new RetryConfig(
|
||||
authFailedRetries + 1,
|
||||
authFailedPause,
|
||||
authFailedPause,
|
||||
TimeUnit.MILLISECONDS,
|
||||
new BackoffPolicy());
|
||||
this.authFailedRetryCounter = new RetryCounter(authFailedRetryConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -137,14 +143,49 @@ public class RecoverableZooKeeper {
|
|||
*/
|
||||
protected synchronized ZooKeeper checkZk() throws KeeperException {
|
||||
if (this.zk == null) {
|
||||
this.zk = createNewZooKeeper();
|
||||
}
|
||||
return zk;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new ZooKeeper client. Implemented in its own method to
|
||||
* allow for mock'ed objects to be returned for testing.
|
||||
*/
|
||||
ZooKeeper createNewZooKeeper() throws KeeperException {
|
||||
try {
|
||||
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
|
||||
return new ZooKeeper(quorumServers, sessionTimeout, watcher);
|
||||
} catch (IOException ex) {
|
||||
LOG.warn("Unable to create ZooKeeper Connection", ex);
|
||||
throw new KeeperException.OperationTimeoutException();
|
||||
}
|
||||
}
|
||||
return zk;
|
||||
|
||||
public synchronized void reconnectAfterAuthFailure() throws InterruptedException,
|
||||
KeeperException {
|
||||
if (zk != null) {
|
||||
LOG.info("Closing ZooKeeper connection which saw AUTH_FAILED, session" +
|
||||
" was: 0x"+Long.toHexString(zk.getSessionId()));
|
||||
zk.close();
|
||||
// Null out the ZK object so checkZk() will create a new one
|
||||
zk = null;
|
||||
// Check our maximum number of retries before retrying
|
||||
if (!authFailedRetryCounter.shouldRetry()) {
|
||||
throw new RuntimeException("Exceeded the configured retries for handling ZooKeeper"
|
||||
+ " AUTH_FAILED exceptions (" + authFailedRetryCounter.getMaxAttempts() + ")");
|
||||
}
|
||||
// Avoid a fast retry loop.
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Sleeping " + authFailedRetryCounter.getBackoffTime()
|
||||
+ "ms before re-creating ZooKeeper object after AUTH_FAILED state ("
|
||||
+ authFailedRetryCounter.getAttemptTimes() + "/"
|
||||
+ authFailedRetryCounter.getMaxAttempts() + ")");
|
||||
}
|
||||
authFailedRetryCounter.sleepUntilNextRetry();
|
||||
}
|
||||
checkZk();
|
||||
LOG.info("Recreated a ZooKeeper, session" +
|
||||
" is: 0x"+Long.toHexString(zk.getSessionId()));
|
||||
}
|
||||
|
||||
public synchronized void reconnectAfterExpiration()
|
||||
|
@ -192,6 +233,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "delete");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "delete");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw e;
|
||||
|
@ -224,6 +269,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "exists");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "exists");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw e;
|
||||
|
@ -255,6 +304,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "exists");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "exists");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw e;
|
||||
|
@ -269,7 +322,7 @@ public class RecoverableZooKeeper {
|
|||
|
||||
private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
|
||||
String opName) throws KeeperException {
|
||||
LOG.debug("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e);
|
||||
LOG.debug("Possibly transient ZooKeeper, quorum=" + quorumServers + ", exception=" + e, e);
|
||||
if (!retryCounter.shouldRetry()) {
|
||||
LOG.error("ZooKeeper " + opName + " failed after "
|
||||
+ retryCounter.getMaxAttempts() + " attempts");
|
||||
|
@ -296,6 +349,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "getChildren");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "getChildren");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw e;
|
||||
|
@ -327,6 +384,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "getChildren");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "getChildren");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw e;
|
||||
|
@ -359,6 +420,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "getData");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "getData");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw e;
|
||||
|
@ -391,6 +456,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "getData");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "getData");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw e;
|
||||
|
@ -426,6 +495,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "setData");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "setData");
|
||||
break;
|
||||
case BADVERSION:
|
||||
if (isRetry) {
|
||||
// try to verify whether the previous setData success or not
|
||||
|
@ -473,6 +546,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "getAcl");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "getAcl");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw e;
|
||||
|
@ -504,6 +581,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "setAcl");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "setAcl");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw e;
|
||||
|
@ -588,6 +669,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "create");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "create");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw e;
|
||||
|
@ -621,6 +706,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "create");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "create");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw e;
|
||||
|
@ -676,6 +765,10 @@ public class RecoverableZooKeeper {
|
|||
case OPERATIONTIMEOUT:
|
||||
retryOrThrow(retryCounter, e, "multi");
|
||||
break;
|
||||
case AUTHFAILED:
|
||||
reconnectAfterAuthFailure();
|
||||
retryOrThrow(retryCounter, e, "multi");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw e;
|
||||
|
|
|
@ -96,10 +96,38 @@ import com.google.protobuf.InvalidProtocolBufferException;
|
|||
public class ZKUtil {
|
||||
private static final Log LOG = LogFactory.getLog(ZKUtil.class);
|
||||
|
||||
// Configuration keys/defaults for handling AUTH_FAILED
|
||||
public static final String AUTH_FAILED_RETRIES_KEY = "hbase.zookeeper.authfailed.retries.number";
|
||||
public static final int AUTH_FAILED_RETRIES_DEFAULT = 15;
|
||||
public static final String AUTH_FAILED_PAUSE_KEY = "hbase.zookeeper.authfailed.pause";
|
||||
public static final int AUTH_FAILED_PAUSE_DEFAULT = 100;
|
||||
|
||||
// TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved.
|
||||
public static final char ZNODE_PATH_SEPARATOR = '/';
|
||||
private static int zkDumpConnectionTimeOut;
|
||||
|
||||
/**
|
||||
* Interface to allow custom implementations of RecoverableZooKeeper to be created.
|
||||
*/
|
||||
public static interface ZooKeeperFactory {
|
||||
/**
|
||||
* Creates a new instance of a RecoverableZooKeeper.
|
||||
*/
|
||||
RecoverableZooKeeper create(String quorumServers, int sessionTimeout,
|
||||
Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime,
|
||||
String identifier, int authFailedRetries, int authFailedPause) throws IOException;
|
||||
}
|
||||
|
||||
public static class DefaultZooKeeperFactory implements ZooKeeperFactory {
|
||||
@Override
|
||||
public RecoverableZooKeeper create(String quorumServers, int sessionTimeout,
|
||||
Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime,
|
||||
String identifier, int authFailedRetries, int authFailedPause) throws IOException {
|
||||
return new RecoverableZooKeeper(quorumServers, sessionTimeout, watcher, maxRetries,
|
||||
retryIntervalMillis, maxSleepTime, identifier, authFailedRetries, authFailedPause);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new connection to ZooKeeper, pulling settings and ensemble config
|
||||
* from the specified configuration object using methods from {@link ZKConfig}.
|
||||
|
@ -140,8 +168,22 @@ public class ZKUtil {
|
|||
int maxSleepTime = conf.getInt("zookeeper.recovery.retry.maxsleeptime", 60000);
|
||||
zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout",
|
||||
1000);
|
||||
return new RecoverableZooKeeper(ensemble, timeout, watcher,
|
||||
retry, retryIntervalMillis, maxSleepTime, identifier);
|
||||
|
||||
int authFailedRetries = conf.getInt(AUTH_FAILED_RETRIES_KEY, AUTH_FAILED_RETRIES_DEFAULT);
|
||||
int authFailedPause = conf.getInt(AUTH_FAILED_PAUSE_KEY, AUTH_FAILED_PAUSE_DEFAULT);
|
||||
|
||||
Class<? extends ZooKeeperFactory> factoryClz = conf.getClass("zookeeper.factory.class",
|
||||
DefaultZooKeeperFactory.class, ZooKeeperFactory.class);
|
||||
try {
|
||||
ZooKeeperFactory factory = factoryClz.newInstance();
|
||||
return factory.create(ensemble, timeout, watcher, retry, retryIntervalMillis,
|
||||
maxSleepTime, identifier, authFailedRetries, authFailedPause);
|
||||
} catch (Exception e) {
|
||||
if (e instanceof RuntimeException) {
|
||||
throw (RuntimeException) e;
|
||||
}
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -17,6 +17,9 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
|
@ -36,6 +39,7 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
|||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -224,6 +228,21 @@ public class HConnectionTestingUtility {
|
|||
}
|
||||
}
|
||||
|
||||
public static HConnectionImplementation requireHConnImpl(Connection conn) {
|
||||
assertNotNull("Cannot operate on a null Connection", conn);
|
||||
assertEquals("This method requires an HConnectionImplementation",
|
||||
HConnectionImplementation.class, conn.getClass());
|
||||
return (HConnectionImplementation) conn;
|
||||
}
|
||||
|
||||
public static RecoverableZooKeeper unwrapZK(Connection conn) throws IOException {
|
||||
return requireHConnImpl(conn).getKeepAliveZooKeeperWatcher().getRecoverableZooKeeper();
|
||||
}
|
||||
|
||||
public static void clearRegionCache(Connection conn) throws IOException {
|
||||
requireHConnImpl(conn).clearRegionCache();
|
||||
}
|
||||
|
||||
/**
|
||||
* This coproceesor sleep 2s at first increment/append rpc call.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class AuthFailingRecoverableZooKeeper extends RecoverableZooKeeper {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AuthFailingRecoverableZooKeeper.class);
|
||||
private Watcher watcher;
|
||||
private int sessionTimeout;
|
||||
private String quorumServers;
|
||||
|
||||
public AuthFailingRecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher,
|
||||
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier,
|
||||
int authFailedRetries, int authFailedPause) throws IOException {
|
||||
super(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime,
|
||||
identifier, authFailedRetries, authFailedPause);
|
||||
this.quorumServers = quorumServers;
|
||||
this.sessionTimeout = sessionTimeout;
|
||||
this.watcher = watcher;
|
||||
}
|
||||
|
||||
@Override
|
||||
ZooKeeper createNewZooKeeper() throws KeeperException {
|
||||
try {
|
||||
// Construct our "special" ZooKeeper instance
|
||||
return new AuthFailingZooKeeper(quorumServers, sessionTimeout, watcher);
|
||||
} catch (IOException ex) {
|
||||
LOG.warn("Unable to create ZooKeeper Connection", ex);
|
||||
throw new KeeperException.OperationTimeoutException();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.KeeperException.AuthFailedException;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.data.ACL;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
/**
|
||||
* A wrapper around {@link ZooKeeper} which tries to mimic semantics around AUTH_FAILED. When
|
||||
* an AuthFailedException is thrown the first time, it is thrown every time after that.
|
||||
*/
|
||||
public class AuthFailingZooKeeper extends ZooKeeper {
|
||||
private static final AuthFailedException AUTH_FAILED_EXCEPTION = new AuthFailedException();
|
||||
|
||||
// Latch for the "first" AUTH_FAILED occurrence
|
||||
private final AtomicBoolean FAILURE_LATCH = new AtomicBoolean(false);
|
||||
// Latch for when we start always throwing AUTH_FAILED
|
||||
private final AtomicBoolean IS_AUTH_FAILED = new AtomicBoolean(false);
|
||||
|
||||
public AuthFailingZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
|
||||
throws IOException {
|
||||
super(connectString, sessionTimeout, watcher);
|
||||
}
|
||||
|
||||
/**
|
||||
* Causes AUTH_FAILED exceptions to be thrown by {@code this}.
|
||||
*/
|
||||
public void triggerAuthFailed() {
|
||||
FAILURE_LATCH.set(true);
|
||||
}
|
||||
|
||||
void check() throws KeeperException {
|
||||
// ZK state model states that once an AUTH_FAILED exception is thrown, it is thrown for
|
||||
// every subsequent operation
|
||||
if (IS_AUTH_FAILED.get()) {
|
||||
throw AUTH_FAILED_EXCEPTION;
|
||||
}
|
||||
// We're not yet throwing AUTH_FAILED
|
||||
if (!FAILURE_LATCH.get()) {
|
||||
return;
|
||||
}
|
||||
// Start throwing AUTH_FAILED
|
||||
IS_AUTH_FAILED.set(true);
|
||||
throw AUTH_FAILED_EXCEPTION;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException,
|
||||
InterruptedException {
|
||||
check();
|
||||
return super.getData(path, watcher, stat);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String create(String path, byte[] data, List<ACL> acl, CreateMode cmode)
|
||||
throws KeeperException, InterruptedException {
|
||||
check();
|
||||
return super.create(path, data, acl, cmode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
|
||||
check();
|
||||
return super.exists(path, watch);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
|
||||
check();
|
||||
return super.exists(path, watcher);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getChildren(String path, boolean watch)
|
||||
throws KeeperException, InterruptedException {
|
||||
check();
|
||||
return super.getChildren(path, watch);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* A RecoverableZooKeeper instance which gives broken connections a number of times, and then
|
||||
* returns good connections.
|
||||
*/
|
||||
public class SelfHealingRecoverableZooKeeper extends RecoverableZooKeeper {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SelfHealingRecoverableZooKeeper.class);
|
||||
private Watcher watcher;
|
||||
private int sessionTimeout;
|
||||
private String quorumServers;
|
||||
private final AtomicInteger counter;
|
||||
|
||||
public SelfHealingRecoverableZooKeeper(String quorumServers, int sessionTimeout, Watcher watcher,
|
||||
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier,
|
||||
int authFailedRetries, int authFailedPause, int numFailuresBeforeSuccess) throws IOException {
|
||||
super(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis, maxSleepTime,
|
||||
identifier, authFailedRetries, authFailedPause);
|
||||
this.quorumServers = quorumServers;
|
||||
this.sessionTimeout = sessionTimeout;
|
||||
this.watcher = watcher;
|
||||
this.counter = new AtomicInteger(numFailuresBeforeSuccess);
|
||||
}
|
||||
|
||||
@Override
|
||||
ZooKeeper createNewZooKeeper() throws KeeperException {
|
||||
try {
|
||||
int remaining = counter.getAndDecrement();
|
||||
// Construct our "special" ZooKeeper instance
|
||||
AuthFailingZooKeeper zk = new AuthFailingZooKeeper(quorumServers, sessionTimeout, watcher);
|
||||
if (remaining > 0) {
|
||||
zk.triggerAuthFailed();
|
||||
}
|
||||
return zk;
|
||||
} catch (IOException ex) {
|
||||
LOG.warn("Unable to create ZooKeeper Connection", ex);
|
||||
throw new KeeperException.OperationTimeoutException();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,188 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZooKeeperFactory;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestZKAuthFailedRecovery {
|
||||
final Logger LOG = LoggerFactory.getLogger(getClass());
|
||||
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
public static class AuthFailingZooKeeperFactory implements ZooKeeperFactory {
|
||||
@Override
|
||||
public RecoverableZooKeeper create(String quorumServers, int sessionTimeout, Watcher watcher,
|
||||
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier,
|
||||
int authFailedRetries, int authFailedPause) throws IOException {
|
||||
return new AuthFailingRecoverableZooKeeper(quorumServers, sessionTimeout, watcher, maxRetries,
|
||||
retryIntervalMillis, maxSleepTime, identifier, authFailedRetries, authFailedPause);
|
||||
}
|
||||
}
|
||||
|
||||
private static final int FAILURES_BEFORE_SUCCESS = 3;
|
||||
|
||||
public static class SelfHealingZooKeeperFactory implements ZooKeeperFactory {
|
||||
@Override
|
||||
public RecoverableZooKeeper create(String quorumServers, int sessionTimeout, Watcher watcher,
|
||||
int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier,
|
||||
int authFailedRetries, int authFailedPause) throws IOException {
|
||||
return new SelfHealingRecoverableZooKeeper(quorumServers, sessionTimeout, watcher, maxRetries,
|
||||
retryIntervalMillis, maxSleepTime, identifier, authFailedRetries, authFailedPause,
|
||||
FAILURES_BEFORE_SUCCESS);
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFaultyClientZK() throws Exception {
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
conf.setClass("zookeeper.factory.class", AuthFailingZooKeeperFactory.class,
|
||||
ZooKeeperFactory.class);
|
||||
LOG.debug("Reading meta first time");
|
||||
final Connection conn = ConnectionFactory.createConnection(conf);
|
||||
try (Table t = conn.getTable(TableName.valueOf("hbase:meta"))) {
|
||||
LOG.info(TEST_UTIL.countRows(t) + " rows in meta");
|
||||
}
|
||||
// Make sure we got our custom ZK wrapper class from the HConn
|
||||
ZooKeeper zk = HConnectionTestingUtility.unwrapZK(conn).checkZk();
|
||||
assertEquals(AuthFailingZooKeeper.class, zk.getClass());
|
||||
|
||||
((AuthFailingZooKeeper) zk).triggerAuthFailed();
|
||||
// Clear out the region cache to force a read to meta (and thus, a read to ZK)
|
||||
HConnectionTestingUtility.clearRegionCache(conn);
|
||||
|
||||
// Use the HConnection in a way that will talk to ZK
|
||||
ExecutorService svc = Executors.newSingleThreadExecutor();
|
||||
Future<Boolean> res = svc.submit(new Callable<Boolean>() {
|
||||
public Boolean call() {
|
||||
LOG.debug("Reading meta after clearing the Region caches");
|
||||
try (Table t = conn.getTable(TableName.valueOf("hbase:meta"))) {
|
||||
LOG.info(TEST_UTIL.countRows(t) + " rows in meta");
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to read hbase:meta", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
});
|
||||
// Without proper handling of AUTH_FAILED, this would spin indefinitely. With
|
||||
// the change introduced with this test, we should get a fresh ZK instance that
|
||||
// won't fail repeatedly.
|
||||
try {
|
||||
res.get(30, TimeUnit.SECONDS);
|
||||
} catch (ExecutionException e) {
|
||||
LOG.error("Failed to execute task", e);
|
||||
Assert.fail("Failed to recover from AUTH_FAILED state in zookeeper client");
|
||||
} catch (TimeoutException e) {
|
||||
LOG.error("Task timed out instead of recovering", e);
|
||||
Assert.fail("Failed to recover from AUTH_FAILED state in zookeeper client");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void eventuallyRecoveringZKClient() throws Exception {
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
conf.setClass("zookeeper.factory.class", SelfHealingZooKeeperFactory.class,
|
||||
ZooKeeperFactory.class);
|
||||
// Retry one more time than we fail, and validate that we succeed
|
||||
conf.setInt(ZKUtil.AUTH_FAILED_RETRIES_KEY, FAILURES_BEFORE_SUCCESS + 1);
|
||||
// Don't bother waiting
|
||||
conf.setInt(ZKUtil.AUTH_FAILED_PAUSE_KEY, 0);
|
||||
|
||||
final Connection conn = ConnectionFactory.createConnection(conf);
|
||||
|
||||
// Make sure we got our custom ZK wrapper class from the HConn
|
||||
RecoverableZooKeeper recoverableZk = HConnectionTestingUtility.unwrapZK(conn);
|
||||
assertEquals(SelfHealingRecoverableZooKeeper.class, recoverableZk.getClass());
|
||||
ZooKeeper zk = recoverableZk.checkZk();
|
||||
assertEquals(AuthFailingZooKeeper.class, zk.getClass());
|
||||
|
||||
try (Table t = conn.getTable(TableName.valueOf("hbase:meta"))) {
|
||||
LOG.info(TEST_UTIL.countRows(t) + " rows in meta");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void retriesExceededOnAuthFailed() throws Exception {
|
||||
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||
conf.setClass("zookeeper.factory.class", SelfHealingZooKeeperFactory.class,
|
||||
ZooKeeperFactory.class);
|
||||
// Retry one more time than we fail, and validate that we succeed
|
||||
conf.setInt(ZKUtil.AUTH_FAILED_RETRIES_KEY, FAILURES_BEFORE_SUCCESS - 1);
|
||||
// Don't bother waiting
|
||||
conf.setInt(ZKUtil.AUTH_FAILED_PAUSE_KEY, 0);
|
||||
|
||||
Connection conn = null;
|
||||
try {
|
||||
conn = ConnectionFactory.createConnection(conf);
|
||||
} catch (Exception e) {
|
||||
// Our first comms with ZK is to read the clusterId when creating the connection
|
||||
LOG.info("Caught exception, validating it", e);
|
||||
Throwable rootCause = Throwables.getRootCause(e);
|
||||
assertEquals(RuntimeException.class, rootCause.getClass());
|
||||
assertTrue("Expected the exception to contain the text 'AUTH_FAILED'",
|
||||
rootCause.getMessage().contains("AUTH_FAILED"));
|
||||
} finally {
|
||||
if (conn != null) {
|
||||
conn.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue