From 61f369c43e254796f997ec034a35ca764d723e38 Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Mon, 27 Feb 2023 09:56:24 -0800 Subject: [PATCH] HDFS-16890: RBF: Ensures router periodically refreshes its record of a namespace's state. (#5298) --- .../federation/router/RBFConfigKeys.java | 4 ++ .../federation/router/RouterRpcClient.java | 58 +++++++++++++++++-- .../src/main/resources/hdfs-rbf-default.xml | 10 ++++ .../router/TestObserverWithRouter.java | 45 +++++++++++++- 4 files changed, 110 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 7e07d7b6549..c0ee9504597 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -201,6 +201,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize"; public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5; + public static final String DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY = + FEDERATION_ROUTER_PREFIX + "observer.state.id.refresh.period"; + public static final String DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_DEFAULT = "15s"; + public static final String FEDERATION_STORE_SERIALIZER_CLASS = FEDERATION_STORE_PREFIX + "serializer"; public static final Class diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 06e64439011..92f1fc06a81 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -57,6 +57,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -86,6 +87,7 @@ import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -136,6 +138,14 @@ public class RouterRpcClient { private final boolean observerReadEnabledDefault; /** Nameservice specific overrides of the default setting for enabling observer reads. */ private HashSet observerReadEnabledOverrides = new HashSet<>(); + /** + * Period to refresh namespace stateID using active namenode. + * This ensures the namespace stateID is fresh even when an + * observer is trailing behind. + */ + private long activeNNStateIdRefreshPeriodMs; + /** Last msync times for each namespace. */ + private final ConcurrentHashMap lastActiveNNRefreshTimes; /** Pattern to parse a stack trace line. */ private static final Pattern STACK_TRACE_PATTERN = @@ -211,13 +221,25 @@ public class RouterRpcClient { this.observerReadEnabledDefault = conf.getBoolean( RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE); - String[] observerReadOverrides = conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES); + String[] observerReadOverrides = + conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES); if (observerReadOverrides != null) { observerReadEnabledOverrides.addAll(Arrays.asList(observerReadOverrides)); } if (this.observerReadEnabledDefault) { LOG.info("Observer read is enabled for router."); } + this.activeNNStateIdRefreshPeriodMs = conf.getTimeDuration( + RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY, + RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_DEFAULT, + TimeUnit.SECONDS, TimeUnit.MILLISECONDS); + if (activeNNStateIdRefreshPeriodMs < 0) { + LOG.info("Periodic stateId freshness check is disabled" + + " since '{}' is {}ms, which is less than 0.", + RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY, + activeNNStateIdRefreshPeriodMs); + } + this.lastActiveNNRefreshTimes = new ConcurrentHashMap<>(); } /** @@ -1707,10 +1729,13 @@ public class RouterRpcClient { boolean isObserverRead) throws IOException { final List namenodes; - if (RouterStateIdContext.getClientStateIdFromCurrentCall(nsId) > Long.MIN_VALUE) { - namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, isObserverRead); - } else { - namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, false); + boolean listObserverNamenodesFirst = isObserverRead + && isNamespaceStateIdFresh(nsId) + && (RouterStateIdContext.getClientStateIdFromCurrentCall(nsId) > Long.MIN_VALUE); + namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, listObserverNamenodesFirst); + if (!listObserverNamenodesFirst) { + // Refresh time of last call to active NameNode. + getTimeOfLastCallToActive(nsId).accumulate(Time.monotonicNow()); } if (namenodes == null || namenodes.isEmpty()) { @@ -1721,7 +1746,8 @@ public class RouterRpcClient { } private boolean isObserverReadEligible(String nsId, Method method) { - boolean isReadEnabledForNamespace = observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId); + boolean isReadEnabledForNamespace = + observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId); return isReadEnabledForNamespace && isReadCall(method); } @@ -1735,4 +1761,24 @@ public class RouterRpcClient { } return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly(); } + + /** + * Checks and sets last refresh time for a namespace's stateId. + * Returns true if refresh time is newer than threshold. + * Otherwise, return false and call should be handled by active namenode. + * @param nsId namespaceID + */ + @VisibleForTesting + boolean isNamespaceStateIdFresh(String nsId) { + if (activeNNStateIdRefreshPeriodMs < 0) { + return true; + } + long timeSinceRefreshMs = Time.monotonicNow() - getTimeOfLastCallToActive(nsId).get(); + return (timeSinceRefreshMs <= activeNNStateIdRefreshPeriodMs); + } + + private LongAccumulator getTimeOfLastCallToActive(String namespaceId) { + return lastActiveNNRefreshTimes + .computeIfAbsent(namespaceId, key -> new LongAccumulator(Math::max, 0)); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index b5096cd253d..79a16cc2022 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -884,4 +884,14 @@ of namespaces in use and the latency of the msync requests. + + + dfs.federation.router.observer.state.id.refresh.period + 15s + + Period to refresh namespace stateID using active namenode. This ensures the + namespace stateID is refresh even when an observer is trailing behind. + If this is below 0, the auto-refresh is disabled. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index 45001b461ba..72e8f8f66d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -34,9 +34,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAccumulator; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.ClientGSIContext; +import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto; @@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServi import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.AfterEach; @@ -95,7 +98,9 @@ public class TestObserverWithRouter { conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true); if (confOverrides != null) { - conf.addResource(confOverrides); + confOverrides + .iterator() + .forEachRemaining(entry -> conf.set(entry.getKey(), entry.getValue())); } cluster = new MiniRouterDFSCluster(true, 2, numberOfNamenode); cluster.addNamenodeOverrides(conf); @@ -639,4 +644,42 @@ public class TestObserverWithRouter { assertEquals("ns0", namespace1.get(0)); assertTrue(namespace2.isEmpty()); } + + @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testPeriodicStateRefreshUsingActiveNamenode() throws Exception { + Path rootPath = new Path("/"); + + Configuration confOverride = new Configuration(false); + confOverride.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY, "500ms"); + confOverride.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "3s"); + startUpCluster(1, confOverride); + + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + fileSystem.listStatus(rootPath); + int initialLengthOfRootListing = fileSystem.listStatus(rootPath).length; + + DFSClient activeClient = cluster.getNamenodes("ns0") + .stream() + .filter(nnContext -> nnContext.getNamenode().isActiveState()) + .findFirst().orElseThrow(() -> new IllegalStateException("No active namenode.")) + .getClient(); + + for (int i = 0; i < 10; i++) { + activeClient.mkdirs("/dir" + i, null, false); + } + activeClient.close(); + + // Wait long enough for state in router to be considered stale. + GenericTestUtils.waitFor( + () -> !routerContext + .getRouterRpcClient() + .isNamespaceStateIdFresh("ns0"), + 100, + 10000, + "Timeout: Namespace state was never considered stale."); + FileStatus[] rootFolderAfterMkdir = fileSystem.listStatus(rootPath); + assertEquals("List-status should show newly created directories.", + initialLengthOfRootListing + 10, rootFolderAfterMkdir.length); + } }