HDFS-13119. RBF: Manage unavailable clusters. Contributed by Yiqun Lin.

(cherry picked from commit 8896d20b91)
This commit is contained in:
Yiqun Lin 2018-02-20 09:37:08 +08:00 committed by Inigo Goiri
parent c16b91fde0
commit 14ddac3174
10 changed files with 288 additions and 11 deletions

View File

@ -1086,6 +1086,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(10);
// HDFS Router RPC client
public static final String DFS_ROUTER_CLIENT_THREADS_SIZE =
FEDERATION_ROUTER_PREFIX + "client.thread-size";
public static final int DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT = 32;
public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS =
FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts";
public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3;
// HDFS Router State Store connection
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
FEDERATION_ROUTER_PREFIX + "file.resolver.client.class";

View File

@ -42,6 +42,8 @@ public interface FederationRPCMBean {
long getProxyOpNotImplemented();
long getProxyOpRetries();
long getRouterFailureStateStoreOps();
long getRouterFailureReadOnlyOps();

View File

@ -56,6 +56,8 @@ public class FederationRPCMetrics implements FederationRPCMBean {
private MutableCounterLong proxyOpFailureCommunicate;
@Metric("Number of operations not implemented")
private MutableCounterLong proxyOpNotImplemented;
@Metric("Number of operation retries")
private MutableCounterLong proxyOpRetries;
@Metric("Failed requests due to State Store unavailable")
private MutableCounterLong routerFailureStateStore;
@ -126,6 +128,15 @@ public class FederationRPCMetrics implements FederationRPCMBean {
return proxyOpNotImplemented.value();
}
public void incrProxyOpRetries() {
proxyOpRetries.incr();
}
@Override
public long getProxyOpRetries() {
return proxyOpRetries.value();
}
public void incrRouterFailureStateStore() {
routerFailureStateStore.incr();
}

View File

@ -158,6 +158,11 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
metrics.incrProxyOpNotImplemented();
}
@Override
public void proxyOpRetries() {
metrics.incrProxyOpRetries();
}
@Override
public void routerFailureStateStore() {
metrics.incrRouterFailureStateStore();
@ -208,4 +213,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
}
return -1;
}
@Override
public FederationRPCMetrics getRPCMetrics() {
return this.metrics;
}
}

View File

@ -380,6 +380,14 @@ public class NamenodeStatusReport {
return this.numOfBlocksPendingDeletion;
}
/**
* Set the validity of registration.
* @param isValid The desired value to be set.
*/
public void setRegistrationValid(boolean isValid) {
this.registrationValid = isValid;
}
@Override
public String toString() {
return String.format("%s-%s:%s",

View File

@ -46,12 +46,14 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
@ -123,10 +125,14 @@ public class RouterRpcClient {
this.connectionManager = new ConnectionManager(conf);
this.connectionManager.start();
int numThreads = conf.getInt(
DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RPC Router Client-%d")
.build();
this.executorService = Executors.newCachedThreadPool(threadFactory);
this.executorService = Executors.newFixedThreadPool(
numThreads, threadFactory);
this.rpcMonitor = monitor;
@ -134,8 +140,8 @@ public class RouterRpcClient {
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
int maxRetryAttempts = conf.getInt(
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS,
DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT);
int failoverSleepBaseMillis = conf.getInt(
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
@ -266,11 +272,24 @@ public class RouterRpcClient {
*
* @param ioe IOException reported.
* @param retryCount Number of retries.
* @param nsId Nameservice ID.
* @return Retry decision.
* @throws IOException Original exception if the retry policy generates one.
* @throws IOException Original exception if the retry policy generates one
* or IOException for no available namenodes.
*/
private RetryDecision shouldRetry(final IOException ioe, final int retryCount)
throws IOException {
private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
final String nsId) throws IOException {
// check for the case of cluster unavailable state
if (isClusterUnAvailable(nsId)) {
// we allow to retry once if cluster is unavailable
if (retryCount == 0) {
return RetryDecision.RETRY;
} else {
throw new IOException("No namenode available under nameservice " + nsId,
ioe);
}
}
try {
final RetryPolicy.RetryAction a =
this.retryPolicy.shouldRetry(ioe, retryCount, 0, true);
@ -321,7 +340,7 @@ public class RouterRpcClient {
connection = this.getConnection(ugi, nsId, rpcAddress);
ProxyAndInfo<ClientProtocol> client = connection.getClient();
ClientProtocol proxy = client.getProxy();
ret = invoke(0, method, proxy, params);
ret = invoke(nsId, 0, method, proxy, params);
if (failover) {
// Success on alternate server, update
InetSocketAddress address = client.getAddress();
@ -392,6 +411,8 @@ public class RouterRpcClient {
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param nsId Identifier for the namespace
* @param retryCount Current retry times
* @param method Method to invoke
* @param obj Target object for the method
* @param params Variable parameters
@ -399,8 +420,8 @@ public class RouterRpcClient {
* @throws IOException
* @throws InterruptedException
*/
private Object invoke(int retryCount, final Method method, final Object obj,
final Object... params) throws IOException {
private Object invoke(String nsId, int retryCount, final Method method,
final Object obj, final Object... params) throws IOException {
try {
return method.invoke(obj, params);
} catch (IllegalAccessException e) {
@ -413,11 +434,16 @@ public class RouterRpcClient {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
IOException ioe = (IOException) cause;
// Check if we should retry.
RetryDecision decision = shouldRetry(ioe, retryCount);
RetryDecision decision = shouldRetry(ioe, retryCount, nsId);
if (decision == RetryDecision.RETRY) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpRetries();
}
// retry
return invoke(++retryCount, method, obj, params);
return invoke(nsId, ++retryCount, method, obj, params);
} else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
// failover, invoker looks for standby exceptions for failover.
if (ioe instanceof StandbyException) {
@ -439,6 +465,29 @@ public class RouterRpcClient {
}
}
/**
* Check if the cluster of given nameservice id is available.
* @param nsId nameservice ID.
* @return
* @throws IOException
*/
private boolean isClusterUnAvailable(String nsId) throws IOException {
List<? extends FederationNamenodeContext> nnState = this.namenodeResolver
.getNamenodesForNameserviceId(nsId);
if (nnState != null) {
for (FederationNamenodeContext nnContext : nnState) {
// Once we find one NN is in active state, we assume this
// cluster is available.
if (nnContext.getState() == FederationNamenodeServiceState.ACTIVE) {
return false;
}
}
}
return true;
}
/**
* Get a clean copy of the exception. Sometimes the exceptions returned by the
* server contain the full stack trace in the message.

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
/**
@ -35,6 +36,12 @@ public interface RouterRpcMonitor {
void init(
Configuration conf, RouterRpcServer server, StateStoreService store);
/**
* Get Router RPC metrics info.
* @return The instance of FederationRPCMetrics.
*/
FederationRPCMetrics getRPCMetrics();
/**
* Close the monitor.
*/
@ -73,6 +80,12 @@ public interface RouterRpcMonitor {
*/
void proxyOpNotImplemented();
/**
* Retry to proxy an operation to a Namenode because of an unexpected
* exception.
*/
void proxyOpRetries();
/**
* If the Router cannot contact the State Store in an operation.
*/

View File

@ -92,6 +92,7 @@ import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@ -2005,4 +2006,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
UserGroupInformation ugi = Server.getRemoteUser();
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
}
/**
* Get RPC metrics info.
* @return The instance of FederationRPCMetrics.
*/
public FederationRPCMetrics getRPCMetrics() {
return this.rpcMonitor.getRPCMetrics();
}
}

View File

@ -4743,4 +4743,20 @@
</description>
</property>
<property>
<name>dfs.federation.router.client.thread-size</name>
<value>32</value>
<description>
Max threads size for the RouterClient to execute concurrent
requests.
</description>
</property>
<property>
<name>dfs.federation.router.client.retry.max.attempts</name>
<value>3</value>
<description>
Max retry attempts for the RouterClient talking to the Router.
</description>
</property>
</configuration>

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test retry behavior of the Router RPC Client.
*/
public class TestRouterRPCClientRetries {
private static StateStoreDFSCluster cluster;
private static NamenodeContext nnContext1;
private static RouterContext routerContext;
private static MembershipNamenodeResolver resolver;
private static ClientProtocol routerProtocol;
@Before
public void setUp() throws Exception {
// Build and start a federated cluster
cluster = new StateStoreDFSCluster(false, 2);
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.admin()
.rpc()
.build();
cluster.addRouterOverrides(routerConf);
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
routerContext = cluster.getRandomRouter();
resolver = (MembershipNamenodeResolver) routerContext.getRouter()
.getNamenodeResolver();
routerProtocol = routerContext.getClient().getNamenode();
}
@After
public void tearDown() {
if (cluster != null) {
cluster.stopRouter(routerContext);
cluster.shutdown();
cluster = null;
}
}
@Test
public void testRetryWhenAllNameServiceDown() throws Exception {
// shutdown the dfs cluster
MiniDFSCluster dfsCluster = cluster.getCluster();
dfsCluster.shutdown();
// register an invalid namenode report
registerInvalidNameReport();
// Create a directory via the router
String dirPath = "/testRetryWhenClusterisDown";
FsPermission permission = new FsPermission("705");
try {
routerProtocol.mkdirs(dirPath, permission, false);
fail("Should have thrown RemoteException error.");
} catch (RemoteException e) {
String ns0 = cluster.getNameservices().get(0);
GenericTestUtils.assertExceptionContains(
"No namenode available under nameservice " + ns0, e);
}
// Verify the retry times, it should only retry one time.
FederationRPCMetrics rpcMetrics = routerContext.getRouter()
.getRpcServer().getRPCMetrics();
assertEquals(1, rpcMetrics.getProxyOpRetries());
}
@Test
public void testRetryWhenOneNameServiceDown() throws Exception {
// shutdown the dfs cluster
MiniDFSCluster dfsCluster = cluster.getCluster();
dfsCluster.shutdownNameNode(0);
// register an invalid namenode report
registerInvalidNameReport();
DFSClient client = nnContext1.getClient();
// Renew lease for the DFS client, it will succeed.
routerProtocol.renewLease(client.getClientName());
// Verify the retry times, it will retry one time for ns0.
FederationRPCMetrics rpcMetrics = routerContext.getRouter()
.getRpcServer().getRPCMetrics();
assertEquals(1, rpcMetrics.getProxyOpRetries());
}
/**
* Register an invalid namenode report.
* @throws IOException
*/
private void registerInvalidNameReport() throws IOException {
String ns0 = cluster.getNameservices().get(0);
List<? extends FederationNamenodeContext> origin = resolver
.getNamenodesForNameserviceId(ns0);
FederationNamenodeContext nnInfo = origin.get(0);
NamenodeStatusReport report = new NamenodeStatusReport(ns0,
nnInfo.getNamenodeId(), nnInfo.getRpcAddress(),
nnInfo.getServiceAddress(), nnInfo.getLifelineAddress(),
nnInfo.getWebAddress());
report.setRegistrationValid(false);
assertTrue(resolver.registerNamenode(report));
resolver.loadCache(true);
}
}