HDFS-13119. RBF: Manage unavailable clusters. Contributed by Yiqun Lin.

This commit is contained in:
Yiqun Lin 2018-02-20 09:37:08 +08:00
parent 1d37cf675c
commit 8896d20b91
10 changed files with 289 additions and 11 deletions

View File

@ -1246,6 +1246,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT = public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(10); TimeUnit.SECONDS.toMillis(10);
// HDFS Router RPC client
public static final String DFS_ROUTER_CLIENT_THREADS_SIZE =
FEDERATION_ROUTER_PREFIX + "client.thread-size";
public static final int DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT = 32;
public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS =
FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts";
public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3;
// HDFS Router State Store connection // HDFS Router State Store connection
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS = public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
FEDERATION_ROUTER_PREFIX + "file.resolver.client.class"; FEDERATION_ROUTER_PREFIX + "file.resolver.client.class";

View File

@ -42,6 +42,8 @@ public interface FederationRPCMBean {
long getProxyOpNotImplemented(); long getProxyOpNotImplemented();
long getProxyOpRetries();
long getRouterFailureStateStoreOps(); long getRouterFailureStateStoreOps();
long getRouterFailureReadOnlyOps(); long getRouterFailureReadOnlyOps();

View File

@ -56,6 +56,8 @@ public class FederationRPCMetrics implements FederationRPCMBean {
private MutableCounterLong proxyOpFailureCommunicate; private MutableCounterLong proxyOpFailureCommunicate;
@Metric("Number of operations not implemented") @Metric("Number of operations not implemented")
private MutableCounterLong proxyOpNotImplemented; private MutableCounterLong proxyOpNotImplemented;
@Metric("Number of operation retries")
private MutableCounterLong proxyOpRetries;
@Metric("Failed requests due to State Store unavailable") @Metric("Failed requests due to State Store unavailable")
private MutableCounterLong routerFailureStateStore; private MutableCounterLong routerFailureStateStore;
@ -126,6 +128,15 @@ public class FederationRPCMetrics implements FederationRPCMBean {
return proxyOpNotImplemented.value(); return proxyOpNotImplemented.value();
} }
public void incrProxyOpRetries() {
proxyOpRetries.incr();
}
@Override
public long getProxyOpRetries() {
return proxyOpRetries.value();
}
public void incrRouterFailureStateStore() { public void incrRouterFailureStateStore() {
routerFailureStateStore.incr(); routerFailureStateStore.incr();
} }

View File

@ -158,6 +158,11 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
metrics.incrProxyOpNotImplemented(); metrics.incrProxyOpNotImplemented();
} }
@Override
public void proxyOpRetries() {
metrics.incrProxyOpRetries();
}
@Override @Override
public void routerFailureStateStore() { public void routerFailureStateStore() {
metrics.incrRouterFailureStateStore(); metrics.incrRouterFailureStateStore();
@ -208,4 +213,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
} }
return -1; return -1;
} }
@Override
public FederationRPCMetrics getRPCMetrics() {
return this.metrics;
}
} }

View File

@ -390,6 +390,14 @@ public class NamenodeStatusReport {
return this.numOfBlocksPendingDeletion; return this.numOfBlocksPendingDeletion;
} }
/**
* Set the validity of registration.
* @param isValid The desired value to be set.
*/
public void setRegistrationValid(boolean isValid) {
this.registrationValid = isValid;
}
@Override @Override
public String toString() { public String toString() {
return String.format("%s-%s:%s", return String.format("%s-%s:%s",

View File

@ -46,12 +46,14 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
@ -123,10 +125,14 @@ public class RouterRpcClient {
this.connectionManager = new ConnectionManager(conf); this.connectionManager = new ConnectionManager(conf);
this.connectionManager.start(); this.connectionManager.start();
int numThreads = conf.getInt(
DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
ThreadFactory threadFactory = new ThreadFactoryBuilder() ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RPC Router Client-%d") .setNameFormat("RPC Router Client-%d")
.build(); .build();
this.executorService = Executors.newCachedThreadPool(threadFactory); this.executorService = Executors.newFixedThreadPool(
numThreads, threadFactory);
this.rpcMonitor = monitor; this.rpcMonitor = monitor;
@ -134,8 +140,8 @@ public class RouterRpcClient {
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT); HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
int maxRetryAttempts = conf.getInt( int maxRetryAttempts = conf.getInt(
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS,
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT); DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT);
int failoverSleepBaseMillis = conf.getInt( int failoverSleepBaseMillis = conf.getInt(
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY, HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT); HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
@ -274,11 +280,24 @@ public class RouterRpcClient {
* *
* @param ioe IOException reported. * @param ioe IOException reported.
* @param retryCount Number of retries. * @param retryCount Number of retries.
* @param nsId Nameservice ID.
* @return Retry decision. * @return Retry decision.
* @throws IOException Original exception if the retry policy generates one. * @throws IOException Original exception if the retry policy generates one
* or IOException for no available namenodes.
*/ */
private RetryDecision shouldRetry(final IOException ioe, final int retryCount) private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
throws IOException { final String nsId) throws IOException {
// check for the case of cluster unavailable state
if (isClusterUnAvailable(nsId)) {
// we allow to retry once if cluster is unavailable
if (retryCount == 0) {
return RetryDecision.RETRY;
} else {
throw new IOException("No namenode available under nameservice " + nsId,
ioe);
}
}
try { try {
final RetryPolicy.RetryAction a = final RetryPolicy.RetryAction a =
this.retryPolicy.shouldRetry(ioe, retryCount, 0, true); this.retryPolicy.shouldRetry(ioe, retryCount, 0, true);
@ -329,7 +348,7 @@ public class RouterRpcClient {
connection = this.getConnection(ugi, nsId, rpcAddress); connection = this.getConnection(ugi, nsId, rpcAddress);
ProxyAndInfo<ClientProtocol> client = connection.getClient(); ProxyAndInfo<ClientProtocol> client = connection.getClient();
ClientProtocol proxy = client.getProxy(); ClientProtocol proxy = client.getProxy();
ret = invoke(0, method, proxy, params); ret = invoke(nsId, 0, method, proxy, params);
if (failover) { if (failover) {
// Success on alternate server, update // Success on alternate server, update
InetSocketAddress address = client.getAddress(); InetSocketAddress address = client.getAddress();
@ -400,6 +419,8 @@ public class RouterRpcClient {
* Re-throws exceptions generated by the remote RPC call as either * Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException. * RemoteException or IOException.
* *
* @param nsId Identifier for the namespace
* @param retryCount Current retry times
* @param method Method to invoke * @param method Method to invoke
* @param obj Target object for the method * @param obj Target object for the method
* @param params Variable parameters * @param params Variable parameters
@ -407,8 +428,8 @@ public class RouterRpcClient {
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
private Object invoke(int retryCount, final Method method, final Object obj, private Object invoke(String nsId, int retryCount, final Method method,
final Object... params) throws IOException { final Object obj, final Object... params) throws IOException {
try { try {
return method.invoke(obj, params); return method.invoke(obj, params);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
@ -421,11 +442,16 @@ public class RouterRpcClient {
Throwable cause = e.getCause(); Throwable cause = e.getCause();
if (cause instanceof IOException) { if (cause instanceof IOException) {
IOException ioe = (IOException) cause; IOException ioe = (IOException) cause;
// Check if we should retry. // Check if we should retry.
RetryDecision decision = shouldRetry(ioe, retryCount); RetryDecision decision = shouldRetry(ioe, retryCount, nsId);
if (decision == RetryDecision.RETRY) { if (decision == RetryDecision.RETRY) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpRetries();
}
// retry // retry
return invoke(++retryCount, method, obj, params); return invoke(nsId, ++retryCount, method, obj, params);
} else if (decision == RetryDecision.FAILOVER_AND_RETRY) { } else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
// failover, invoker looks for standby exceptions for failover. // failover, invoker looks for standby exceptions for failover.
if (ioe instanceof StandbyException) { if (ioe instanceof StandbyException) {
@ -447,6 +473,29 @@ public class RouterRpcClient {
} }
} }
/**
* Check if the cluster of given nameservice id is available.
* @param nsId nameservice ID.
* @return
* @throws IOException
*/
private boolean isClusterUnAvailable(String nsId) throws IOException {
List<? extends FederationNamenodeContext> nnState = this.namenodeResolver
.getNamenodesForNameserviceId(nsId);
if (nnState != null) {
for (FederationNamenodeContext nnContext : nnState) {
// Once we find one NN is in active state, we assume this
// cluster is available.
if (nnContext.getState() == FederationNamenodeServiceState.ACTIVE) {
return false;
}
}
}
return true;
}
/** /**
* Get a clean copy of the exception. Sometimes the exceptions returned by the * Get a clean copy of the exception. Sometimes the exceptions returned by the
* server contain the full stack trace in the message. * server contain the full stack trace in the message.

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.federation.router; package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
/** /**
@ -35,6 +36,12 @@ public interface RouterRpcMonitor {
void init( void init(
Configuration conf, RouterRpcServer server, StateStoreService store); Configuration conf, RouterRpcServer server, StateStoreService store);
/**
* Get Router RPC metrics info.
* @return The instance of FederationRPCMetrics.
*/
FederationRPCMetrics getRPCMetrics();
/** /**
* Close the monitor. * Close the monitor.
*/ */
@ -73,6 +80,12 @@ public interface RouterRpcMonitor {
*/ */
void proxyOpNotImplemented(); void proxyOpNotImplemented();
/**
* Retry to proxy an operation to a Namenode because of an unexpected
* exception.
*/
void proxyOpRetries();
/** /**
* If the Router cannot contact the State Store in an operation. * If the Router cannot contact the State Store in an operation.
*/ */

View File

@ -105,6 +105,7 @@ import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@ -2177,4 +2178,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
public Quota getQuotaModule() { public Quota getQuotaModule() {
return this.quotaCall; return this.quotaCall;
} }
/**
* Get RPC metrics info.
* @return The instance of FederationRPCMetrics.
*/
public FederationRPCMetrics getRPCMetrics() {
return this.rpcMonitor.getRPCMetrics();
}
} }

View File

@ -5234,4 +5234,21 @@
is assumed. is assumed.
</description> </description>
</property> </property>
<property>
<name>dfs.federation.router.client.thread-size</name>
<value>32</value>
<description>
Max threads size for the RouterClient to execute concurrent
requests.
</description>
</property>
<property>
<name>dfs.federation.router.client.retry.max.attempts</name>
<value>3</value>
<description>
Max retry attempts for the RouterClient talking to the Router.
</description>
</property>
</configuration> </configuration>

View File

@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test retry behavior of the Router RPC Client.
*/
public class TestRouterRPCClientRetries {
private static StateStoreDFSCluster cluster;
private static NamenodeContext nnContext1;
private static RouterContext routerContext;
private static MembershipNamenodeResolver resolver;
private static ClientProtocol routerProtocol;
@Before
public void setUp() throws Exception {
// Build and start a federated cluster
cluster = new StateStoreDFSCluster(false, 2);
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.admin()
.rpc()
.build();
cluster.addRouterOverrides(routerConf);
cluster.startCluster();
cluster.startRouters();
cluster.waitClusterUp();
nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
routerContext = cluster.getRandomRouter();
resolver = (MembershipNamenodeResolver) routerContext.getRouter()
.getNamenodeResolver();
routerProtocol = routerContext.getClient().getNamenode();
}
@After
public void tearDown() {
if (cluster != null) {
cluster.stopRouter(routerContext);
cluster.shutdown();
cluster = null;
}
}
@Test
public void testRetryWhenAllNameServiceDown() throws Exception {
// shutdown the dfs cluster
MiniDFSCluster dfsCluster = cluster.getCluster();
dfsCluster.shutdown();
// register an invalid namenode report
registerInvalidNameReport();
// Create a directory via the router
String dirPath = "/testRetryWhenClusterisDown";
FsPermission permission = new FsPermission("705");
try {
routerProtocol.mkdirs(dirPath, permission, false);
fail("Should have thrown RemoteException error.");
} catch (RemoteException e) {
String ns0 = cluster.getNameservices().get(0);
GenericTestUtils.assertExceptionContains(
"No namenode available under nameservice " + ns0, e);
}
// Verify the retry times, it should only retry one time.
FederationRPCMetrics rpcMetrics = routerContext.getRouter()
.getRpcServer().getRPCMetrics();
assertEquals(1, rpcMetrics.getProxyOpRetries());
}
@Test
public void testRetryWhenOneNameServiceDown() throws Exception {
// shutdown the dfs cluster
MiniDFSCluster dfsCluster = cluster.getCluster();
dfsCluster.shutdownNameNode(0);
// register an invalid namenode report
registerInvalidNameReport();
DFSClient client = nnContext1.getClient();
// Renew lease for the DFS client, it will succeed.
routerProtocol.renewLease(client.getClientName());
// Verify the retry times, it will retry one time for ns0.
FederationRPCMetrics rpcMetrics = routerContext.getRouter()
.getRpcServer().getRPCMetrics();
assertEquals(1, rpcMetrics.getProxyOpRetries());
}
/**
* Register an invalid namenode report.
* @throws IOException
*/
private void registerInvalidNameReport() throws IOException {
String ns0 = cluster.getNameservices().get(0);
List<? extends FederationNamenodeContext> origin = resolver
.getNamenodesForNameserviceId(ns0);
FederationNamenodeContext nnInfo = origin.get(0);
NamenodeStatusReport report = new NamenodeStatusReport(ns0,
nnInfo.getNamenodeId(), nnInfo.getRpcAddress(),
nnInfo.getServiceAddress(), nnInfo.getLifelineAddress(),
nnInfo.getWebAddress());
report.setRegistrationValid(false);
assertTrue(resolver.registerNamenode(report));
resolver.loadCache(true);
}
}