diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index dac9e71ae6b..673d3293b09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1086,6 +1086,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT = TimeUnit.SECONDS.toMillis(10); + // HDFS Router RPC client + public static final String DFS_ROUTER_CLIENT_THREADS_SIZE = + FEDERATION_ROUTER_PREFIX + "client.thread-size"; + public static final int DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT = 32; + public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS = + FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts"; + public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3; + // HDFS Router State Store connection public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS = FEDERATION_ROUTER_PREFIX + "file.resolver.client.class"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java index 00209e96784..3e031fefecb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java @@ -42,6 +42,8 @@ public interface FederationRPCMBean { long getProxyOpNotImplemented(); + long getProxyOpRetries(); + long getRouterFailureStateStoreOps(); long getRouterFailureReadOnlyOps(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java index 8995689b02f..94d3383d105 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java @@ -56,6 +56,8 @@ public class FederationRPCMetrics implements FederationRPCMBean { private MutableCounterLong proxyOpFailureCommunicate; @Metric("Number of operations not implemented") private MutableCounterLong proxyOpNotImplemented; + @Metric("Number of operation retries") + private MutableCounterLong proxyOpRetries; @Metric("Failed requests due to State Store unavailable") private MutableCounterLong routerFailureStateStore; @@ -126,6 +128,15 @@ public class FederationRPCMetrics implements FederationRPCMBean { return proxyOpNotImplemented.value(); } + public void incrProxyOpRetries() { + proxyOpRetries.incr(); + } + + @Override + public long getProxyOpRetries() { + return proxyOpRetries.value(); + } + public void incrRouterFailureStateStore() { routerFailureStateStore.incr(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java index e3a16b55c44..547ebb567d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java @@ -158,6 +158,11 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor { metrics.incrProxyOpNotImplemented(); } + @Override + public void proxyOpRetries() { + metrics.incrProxyOpRetries(); + } + @Override public void routerFailureStateStore() { metrics.incrRouterFailureStateStore(); @@ -208,4 +213,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor { } return -1; } + + @Override + public FederationRPCMetrics getRPCMetrics() { + return this.metrics; + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java index 555e2eebe22..9d2efe27e91 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java @@ -380,6 +380,14 @@ public class NamenodeStatusReport { return this.numOfBlocksPendingDeletion; } + /** + * Set the validity of registration. + * @param isValid The desired value to be set. + */ + public void setRegistrationValid(boolean isValid) { + this.registrationValid = isValid; + } + @Override public String toString() { return String.format("%s-%s:%s", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index cac37132ad2..06add71cbec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -46,12 +46,14 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; @@ -123,10 +125,14 @@ public class RouterRpcClient { this.connectionManager = new ConnectionManager(conf); this.connectionManager.start(); + int numThreads = conf.getInt( + DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, + DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT); ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("RPC Router Client-%d") .build(); - this.executorService = Executors.newCachedThreadPool(threadFactory); + this.executorService = Executors.newFixedThreadPool( + numThreads, threadFactory); this.rpcMonitor = monitor; @@ -134,8 +140,8 @@ public class RouterRpcClient { HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY, HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT); int maxRetryAttempts = conf.getInt( - HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY, - HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT); + DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS, + DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT); int failoverSleepBaseMillis = conf.getInt( HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY, HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT); @@ -266,11 +272,24 @@ public class RouterRpcClient { * * @param ioe IOException reported. * @param retryCount Number of retries. + * @param nsId Nameservice ID. * @return Retry decision. - * @throws IOException Original exception if the retry policy generates one. + * @throws IOException Original exception if the retry policy generates one + * or IOException for no available namenodes. */ - private RetryDecision shouldRetry(final IOException ioe, final int retryCount) - throws IOException { + private RetryDecision shouldRetry(final IOException ioe, final int retryCount, + final String nsId) throws IOException { + // check for the case of cluster unavailable state + if (isClusterUnAvailable(nsId)) { + // we allow to retry once if cluster is unavailable + if (retryCount == 0) { + return RetryDecision.RETRY; + } else { + throw new IOException("No namenode available under nameservice " + nsId, + ioe); + } + } + try { final RetryPolicy.RetryAction a = this.retryPolicy.shouldRetry(ioe, retryCount, 0, true); @@ -321,7 +340,7 @@ public class RouterRpcClient { connection = this.getConnection(ugi, nsId, rpcAddress); ProxyAndInfo client = connection.getClient(); ClientProtocol proxy = client.getProxy(); - ret = invoke(0, method, proxy, params); + ret = invoke(nsId, 0, method, proxy, params); if (failover) { // Success on alternate server, update InetSocketAddress address = client.getAddress(); @@ -392,6 +411,8 @@ public class RouterRpcClient { * Re-throws exceptions generated by the remote RPC call as either * RemoteException or IOException. * + * @param nsId Identifier for the namespace + * @param retryCount Current retry times * @param method Method to invoke * @param obj Target object for the method * @param params Variable parameters @@ -399,8 +420,8 @@ public class RouterRpcClient { * @throws IOException * @throws InterruptedException */ - private Object invoke(int retryCount, final Method method, final Object obj, - final Object... params) throws IOException { + private Object invoke(String nsId, int retryCount, final Method method, + final Object obj, final Object... params) throws IOException { try { return method.invoke(obj, params); } catch (IllegalAccessException e) { @@ -413,11 +434,16 @@ public class RouterRpcClient { Throwable cause = e.getCause(); if (cause instanceof IOException) { IOException ioe = (IOException) cause; + // Check if we should retry. - RetryDecision decision = shouldRetry(ioe, retryCount); + RetryDecision decision = shouldRetry(ioe, retryCount, nsId); if (decision == RetryDecision.RETRY) { + if (this.rpcMonitor != null) { + this.rpcMonitor.proxyOpRetries(); + } + // retry - return invoke(++retryCount, method, obj, params); + return invoke(nsId, ++retryCount, method, obj, params); } else if (decision == RetryDecision.FAILOVER_AND_RETRY) { // failover, invoker looks for standby exceptions for failover. if (ioe instanceof StandbyException) { @@ -439,6 +465,29 @@ public class RouterRpcClient { } } + /** + * Check if the cluster of given nameservice id is available. + * @param nsId nameservice ID. + * @return + * @throws IOException + */ + private boolean isClusterUnAvailable(String nsId) throws IOException { + List nnState = this.namenodeResolver + .getNamenodesForNameserviceId(nsId); + + if (nnState != null) { + for (FederationNamenodeContext nnContext : nnState) { + // Once we find one NN is in active state, we assume this + // cluster is available. + if (nnContext.getState() == FederationNamenodeServiceState.ACTIVE) { + return false; + } + } + } + + return true; + } + /** * Get a clean copy of the exception. Sometimes the exceptions returned by the * server contain the full stack trace in the message. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java index d889a56433d..df9aa111596 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.federation.router; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; /** @@ -35,6 +36,12 @@ public interface RouterRpcMonitor { void init( Configuration conf, RouterRpcServer server, StateStoreService store); + /** + * Get Router RPC metrics info. + * @return The instance of FederationRPCMetrics. + */ + FederationRPCMetrics getRPCMetrics(); + /** * Close the monitor. */ @@ -73,6 +80,12 @@ public interface RouterRpcMonitor { */ void proxyOpNotImplemented(); + /** + * Retry to proxy an operation to a Namenode because of an unexpected + * exception. + */ + void proxyOpRetries(); + /** * If the Router cannot contact the State Store in an operation. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 846d676e517..ba0a82baa74 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -92,6 +92,7 @@ import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo; import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; @@ -2005,4 +2006,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol { UserGroupInformation ugi = Server.getRemoteUser(); return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser(); } + + /** + * Get RPC metrics info. + * @return The instance of FederationRPCMetrics. + */ + public FederationRPCMetrics getRPCMetrics() { + return this.rpcMonitor.getRPCMetrics(); + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 5bbd8536144..eeff8060d2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4743,4 +4743,20 @@ + + dfs.federation.router.client.thread-size + 32 + + Max threads size for the RouterClient to execute concurrent + requests. + + + + + dfs.federation.router.client.retry.max.attempts + 3 + + Max retry attempts for the RouterClient talking to the Router. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java new file mode 100644 index 00000000000..dddcb5a9a5e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test retry behavior of the Router RPC Client. + */ +public class TestRouterRPCClientRetries { + + private static StateStoreDFSCluster cluster; + private static NamenodeContext nnContext1; + private static RouterContext routerContext; + private static MembershipNamenodeResolver resolver; + private static ClientProtocol routerProtocol; + + @Before + public void setUp() throws Exception { + // Build and start a federated cluster + cluster = new StateStoreDFSCluster(false, 2); + Configuration routerConf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + + cluster.addRouterOverrides(routerConf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + + nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null); + routerContext = cluster.getRandomRouter(); + resolver = (MembershipNamenodeResolver) routerContext.getRouter() + .getNamenodeResolver(); + routerProtocol = routerContext.getClient().getNamenode(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.stopRouter(routerContext); + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void testRetryWhenAllNameServiceDown() throws Exception { + // shutdown the dfs cluster + MiniDFSCluster dfsCluster = cluster.getCluster(); + dfsCluster.shutdown(); + + // register an invalid namenode report + registerInvalidNameReport(); + + // Create a directory via the router + String dirPath = "/testRetryWhenClusterisDown"; + FsPermission permission = new FsPermission("705"); + try { + routerProtocol.mkdirs(dirPath, permission, false); + fail("Should have thrown RemoteException error."); + } catch (RemoteException e) { + String ns0 = cluster.getNameservices().get(0); + GenericTestUtils.assertExceptionContains( + "No namenode available under nameservice " + ns0, e); + } + + // Verify the retry times, it should only retry one time. + FederationRPCMetrics rpcMetrics = routerContext.getRouter() + .getRpcServer().getRPCMetrics(); + assertEquals(1, rpcMetrics.getProxyOpRetries()); + } + + @Test + public void testRetryWhenOneNameServiceDown() throws Exception { + // shutdown the dfs cluster + MiniDFSCluster dfsCluster = cluster.getCluster(); + dfsCluster.shutdownNameNode(0); + + // register an invalid namenode report + registerInvalidNameReport(); + + DFSClient client = nnContext1.getClient(); + // Renew lease for the DFS client, it will succeed. + routerProtocol.renewLease(client.getClientName()); + + // Verify the retry times, it will retry one time for ns0. + FederationRPCMetrics rpcMetrics = routerContext.getRouter() + .getRpcServer().getRPCMetrics(); + assertEquals(1, rpcMetrics.getProxyOpRetries()); + } + + /** + * Register an invalid namenode report. + * @throws IOException + */ + private void registerInvalidNameReport() throws IOException { + String ns0 = cluster.getNameservices().get(0); + List origin = resolver + .getNamenodesForNameserviceId(ns0); + FederationNamenodeContext nnInfo = origin.get(0); + NamenodeStatusReport report = new NamenodeStatusReport(ns0, + nnInfo.getNamenodeId(), nnInfo.getRpcAddress(), + nnInfo.getServiceAddress(), nnInfo.getLifelineAddress(), + nnInfo.getWebAddress()); + report.setRegistrationValid(false); + assertTrue(resolver.registerNamenode(report)); + resolver.loadCache(true); + } +}