HDFS-13119. RBF: Manage unavailable clusters. Contributed by Yiqun Lin.
(cherry picked from commit 8896d20b91)
This commit is contained in:
parent
85c611ad7d
commit
7601489995
|
@ -1208,6 +1208,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
|
||||
TimeUnit.SECONDS.toMillis(10);
|
||||
|
||||
// HDFS Router RPC client
|
||||
public static final String DFS_ROUTER_CLIENT_THREADS_SIZE =
|
||||
FEDERATION_ROUTER_PREFIX + "client.thread-size";
|
||||
public static final int DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT = 32;
|
||||
public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS =
|
||||
FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts";
|
||||
public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3;
|
||||
|
||||
// HDFS Router State Store connection
|
||||
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
|
||||
FEDERATION_ROUTER_PREFIX + "file.resolver.client.class";
|
||||
|
|
|
@ -42,6 +42,8 @@ public interface FederationRPCMBean {
|
|||
|
||||
long getProxyOpNotImplemented();
|
||||
|
||||
long getProxyOpRetries();
|
||||
|
||||
long getRouterFailureStateStoreOps();
|
||||
|
||||
long getRouterFailureReadOnlyOps();
|
||||
|
|
|
@ -56,6 +56,8 @@ public class FederationRPCMetrics implements FederationRPCMBean {
|
|||
private MutableCounterLong proxyOpFailureCommunicate;
|
||||
@Metric("Number of operations not implemented")
|
||||
private MutableCounterLong proxyOpNotImplemented;
|
||||
@Metric("Number of operation retries")
|
||||
private MutableCounterLong proxyOpRetries;
|
||||
|
||||
@Metric("Failed requests due to State Store unavailable")
|
||||
private MutableCounterLong routerFailureStateStore;
|
||||
|
@ -126,6 +128,15 @@ public class FederationRPCMetrics implements FederationRPCMBean {
|
|||
return proxyOpNotImplemented.value();
|
||||
}
|
||||
|
||||
public void incrProxyOpRetries() {
|
||||
proxyOpRetries.incr();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getProxyOpRetries() {
|
||||
return proxyOpRetries.value();
|
||||
}
|
||||
|
||||
public void incrRouterFailureStateStore() {
|
||||
routerFailureStateStore.incr();
|
||||
}
|
||||
|
|
|
@ -158,6 +158,11 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
|
|||
metrics.incrProxyOpNotImplemented();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void proxyOpRetries() {
|
||||
metrics.incrProxyOpRetries();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void routerFailureStateStore() {
|
||||
metrics.incrRouterFailureStateStore();
|
||||
|
@ -208,4 +213,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
|
|||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FederationRPCMetrics getRPCMetrics() {
|
||||
return this.metrics;
|
||||
}
|
||||
}
|
|
@ -380,6 +380,14 @@ public class NamenodeStatusReport {
|
|||
return this.numOfBlocksPendingDeletion;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the validity of registration.
|
||||
* @param isValid The desired value to be set.
|
||||
*/
|
||||
public void setRegistrationValid(boolean isValid) {
|
||||
this.registrationValid = isValid;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%s-%s:%s",
|
||||
|
|
|
@ -46,12 +46,14 @@ import java.util.regex.Matcher;
|
|||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
|
||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
|
@ -123,10 +125,14 @@ public class RouterRpcClient {
|
|||
this.connectionManager = new ConnectionManager(conf);
|
||||
this.connectionManager.start();
|
||||
|
||||
int numThreads = conf.getInt(
|
||||
DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
|
||||
DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
|
||||
ThreadFactory threadFactory = new ThreadFactoryBuilder()
|
||||
.setNameFormat("RPC Router Client-%d")
|
||||
.build();
|
||||
this.executorService = Executors.newCachedThreadPool(threadFactory);
|
||||
this.executorService = Executors.newFixedThreadPool(
|
||||
numThreads, threadFactory);
|
||||
|
||||
this.rpcMonitor = monitor;
|
||||
|
||||
|
@ -134,8 +140,8 @@ public class RouterRpcClient {
|
|||
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
|
||||
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
|
||||
int maxRetryAttempts = conf.getInt(
|
||||
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
|
||||
HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
|
||||
DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS,
|
||||
DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT);
|
||||
int failoverSleepBaseMillis = conf.getInt(
|
||||
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
|
||||
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
|
||||
|
@ -274,11 +280,24 @@ public class RouterRpcClient {
|
|||
*
|
||||
* @param ioe IOException reported.
|
||||
* @param retryCount Number of retries.
|
||||
* @param nsId Nameservice ID.
|
||||
* @return Retry decision.
|
||||
* @throws IOException Original exception if the retry policy generates one.
|
||||
* @throws IOException Original exception if the retry policy generates one
|
||||
* or IOException for no available namenodes.
|
||||
*/
|
||||
private RetryDecision shouldRetry(final IOException ioe, final int retryCount)
|
||||
throws IOException {
|
||||
private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
|
||||
final String nsId) throws IOException {
|
||||
// check for the case of cluster unavailable state
|
||||
if (isClusterUnAvailable(nsId)) {
|
||||
// Allow exactly one retry while the cluster is unavailable.
|
||||
if (retryCount == 0) {
|
||||
return RetryDecision.RETRY;
|
||||
} else {
|
||||
throw new IOException("No namenode available under nameservice " + nsId,
|
||||
ioe);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
final RetryPolicy.RetryAction a =
|
||||
this.retryPolicy.shouldRetry(ioe, retryCount, 0, true);
|
||||
|
@ -329,7 +348,7 @@ public class RouterRpcClient {
|
|||
connection = this.getConnection(ugi, nsId, rpcAddress);
|
||||
ProxyAndInfo<ClientProtocol> client = connection.getClient();
|
||||
ClientProtocol proxy = client.getProxy();
|
||||
ret = invoke(0, method, proxy, params);
|
||||
ret = invoke(nsId, 0, method, proxy, params);
|
||||
if (failover) {
|
||||
// Success on alternate server, update
|
||||
InetSocketAddress address = client.getAddress();
|
||||
|
@ -400,6 +419,8 @@ public class RouterRpcClient {
|
|||
* Re-throws exceptions generated by the remote RPC call as either
|
||||
* RemoteException or IOException.
|
||||
*
|
||||
* @param nsId Identifier for the namespace
|
||||
* @param retryCount Number of retries already attempted
|
||||
* @param method Method to invoke
|
||||
* @param obj Target object for the method
|
||||
* @param params Variable parameters
|
||||
|
@ -407,8 +428,8 @@ public class RouterRpcClient {
|
|||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
private Object invoke(int retryCount, final Method method, final Object obj,
|
||||
final Object... params) throws IOException {
|
||||
private Object invoke(String nsId, int retryCount, final Method method,
|
||||
final Object obj, final Object... params) throws IOException {
|
||||
try {
|
||||
return method.invoke(obj, params);
|
||||
} catch (IllegalAccessException e) {
|
||||
|
@ -421,11 +442,16 @@ public class RouterRpcClient {
|
|||
Throwable cause = e.getCause();
|
||||
if (cause instanceof IOException) {
|
||||
IOException ioe = (IOException) cause;
|
||||
|
||||
// Check if we should retry.
|
||||
RetryDecision decision = shouldRetry(ioe, retryCount);
|
||||
RetryDecision decision = shouldRetry(ioe, retryCount, nsId);
|
||||
if (decision == RetryDecision.RETRY) {
|
||||
if (this.rpcMonitor != null) {
|
||||
this.rpcMonitor.proxyOpRetries();
|
||||
}
|
||||
|
||||
// retry
|
||||
return invoke(++retryCount, method, obj, params);
|
||||
return invoke(nsId, ++retryCount, method, obj, params);
|
||||
} else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
|
||||
// failover, invoker looks for standby exceptions for failover.
|
||||
if (ioe instanceof StandbyException) {
|
||||
|
@ -447,6 +473,29 @@ public class RouterRpcClient {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the cluster of the given nameservice ID is unavailable, i.e. none of its namenodes is in the ACTIVE state.
|
||||
* @param nsId nameservice ID.
|
||||
* @return True if no namenode of the nameservice is in the ACTIVE state (or none is known), false otherwise.
|
||||
* @throws IOException If the namenodes of the nameservice cannot be retrieved from the resolver.
|
||||
*/
|
||||
private boolean isClusterUnAvailable(String nsId) throws IOException {
|
||||
List<? extends FederationNamenodeContext> nnState = this.namenodeResolver
|
||||
.getNamenodesForNameserviceId(nsId);
|
||||
|
||||
if (nnState != null) {
|
||||
for (FederationNamenodeContext nnContext : nnState) {
|
||||
// Once we find one NN is in active state, we assume this
|
||||
// cluster is available.
|
||||
if (nnContext.getState() == FederationNamenodeServiceState.ACTIVE) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a clean copy of the exception. Sometimes the exceptions returned by the
|
||||
* server contain the full stack trace in the message.
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
|
||||
|
||||
/**
|
||||
|
@ -35,6 +36,12 @@ public interface RouterRpcMonitor {
|
|||
void init(
|
||||
Configuration conf, RouterRpcServer server, StateStoreService store);
|
||||
|
||||
/**
|
||||
* Get Router RPC metrics info.
|
||||
* @return The instance of FederationRPCMetrics.
|
||||
*/
|
||||
FederationRPCMetrics getRPCMetrics();
|
||||
|
||||
/**
|
||||
* Close the monitor.
|
||||
*/
|
||||
|
@ -73,6 +80,12 @@ public interface RouterRpcMonitor {
|
|||
*/
|
||||
void proxyOpNotImplemented();
|
||||
|
||||
/**
|
||||
* Retry to proxy an operation to a Namenode because of an unexpected
|
||||
* exception.
|
||||
*/
|
||||
void proxyOpRetries();
|
||||
|
||||
/**
|
||||
* If the Router cannot contact the State Store in an operation.
|
||||
*/
|
||||
|
|
|
@ -103,6 +103,7 @@ import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
|
|||
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
|
||||
|
@ -2143,4 +2144,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
|
|||
combinedData = set.toArray(combinedData);
|
||||
return combinedData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get RPC metrics info.
|
||||
* @return The instance of FederationRPCMetrics.
|
||||
*/
|
||||
public FederationRPCMetrics getRPCMetrics() {
|
||||
return this.rpcMonitor.getRPCMetrics();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5084,4 +5084,20 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.client.thread-size</name>
|
||||
<value>32</value>
|
||||
<description>
|
||||
Maximum number of threads the RouterClient uses to execute concurrent
|
||||
requests.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.federation.router.client.retry.max.attempts</name>
|
||||
<value>3</value>
|
||||
<description>
|
||||
Max retry attempts for the RouterClient when talking to the Namenodes.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
|
|
@ -0,0 +1,151 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.federation.router;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
||||
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test retry behavior of the Router RPC Client.
|
||||
*/
|
||||
public class TestRouterRPCClientRetries {
|
||||
|
||||
private static StateStoreDFSCluster cluster;
|
||||
private static NamenodeContext nnContext1;
|
||||
private static RouterContext routerContext;
|
||||
private static MembershipNamenodeResolver resolver;
|
||||
private static ClientProtocol routerProtocol;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
// Build and start a federated cluster
|
||||
cluster = new StateStoreDFSCluster(false, 2);
|
||||
Configuration routerConf = new RouterConfigBuilder()
|
||||
.stateStore()
|
||||
.admin()
|
||||
.rpc()
|
||||
.build();
|
||||
|
||||
cluster.addRouterOverrides(routerConf);
|
||||
cluster.startCluster();
|
||||
cluster.startRouters();
|
||||
cluster.waitClusterUp();
|
||||
|
||||
nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
|
||||
routerContext = cluster.getRandomRouter();
|
||||
resolver = (MembershipNamenodeResolver) routerContext.getRouter()
|
||||
.getNamenodeResolver();
|
||||
routerProtocol = routerContext.getClient().getNamenode();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
if (cluster != null) {
|
||||
cluster.stopRouter(routerContext);
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetryWhenAllNameServiceDown() throws Exception {
|
||||
// shutdown the dfs cluster
|
||||
MiniDFSCluster dfsCluster = cluster.getCluster();
|
||||
dfsCluster.shutdown();
|
||||
|
||||
// register an invalid namenode report
|
||||
registerInvalidNameReport();
|
||||
|
||||
// Create a directory via the router
|
||||
String dirPath = "/testRetryWhenClusterisDown";
|
||||
FsPermission permission = new FsPermission("705");
|
||||
try {
|
||||
routerProtocol.mkdirs(dirPath, permission, false);
|
||||
fail("Should have thrown RemoteException error.");
|
||||
} catch (RemoteException e) {
|
||||
String ns0 = cluster.getNameservices().get(0);
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"No namenode available under nameservice " + ns0, e);
|
||||
}
|
||||
|
||||
// Verify the number of retries: it should retry only once.
|
||||
FederationRPCMetrics rpcMetrics = routerContext.getRouter()
|
||||
.getRpcServer().getRPCMetrics();
|
||||
assertEquals(1, rpcMetrics.getProxyOpRetries());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetryWhenOneNameServiceDown() throws Exception {
|
||||
// Shut down only the first namenode (index 0) of the DFS cluster
|
||||
MiniDFSCluster dfsCluster = cluster.getCluster();
|
||||
dfsCluster.shutdownNameNode(0);
|
||||
|
||||
// register an invalid namenode report
|
||||
registerInvalidNameReport();
|
||||
|
||||
DFSClient client = nnContext1.getClient();
|
||||
// Renew lease for the DFS client, it will succeed.
|
||||
routerProtocol.renewLease(client.getClientName());
|
||||
|
||||
// Verify the retry times, it will retry one time for ns0.
|
||||
FederationRPCMetrics rpcMetrics = routerContext.getRouter()
|
||||
.getRpcServer().getRPCMetrics();
|
||||
assertEquals(1, rpcMetrics.getProxyOpRetries());
|
||||
}
|
||||
|
||||
/**
|
||||
* Register an invalid namenode report.
|
||||
* @throws IOException
|
||||
*/
|
||||
private void registerInvalidNameReport() throws IOException {
|
||||
String ns0 = cluster.getNameservices().get(0);
|
||||
List<? extends FederationNamenodeContext> origin = resolver
|
||||
.getNamenodesForNameserviceId(ns0);
|
||||
FederationNamenodeContext nnInfo = origin.get(0);
|
||||
NamenodeStatusReport report = new NamenodeStatusReport(ns0,
|
||||
nnInfo.getNamenodeId(), nnInfo.getRpcAddress(),
|
||||
nnInfo.getServiceAddress(), nnInfo.getLifelineAddress(),
|
||||
nnInfo.getWebAddress());
|
||||
report.setRegistrationValid(false);
|
||||
assertTrue(resolver.registerNamenode(report));
|
||||
resolver.loadCache(true);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue