From d92a5815f45b7c2c33a550f150c7616f7954b8df Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Wed, 31 May 2023 10:20:19 -0700 Subject: [PATCH] HDFS-17027. RBF: Adds auto-msync support for clients connecting to routers. (#5693) --- .../ha/RouterObserverReadProxyProvider.java | 223 +++++++++++++++ hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml | 5 + .../router/TestObserverWithRouter.java | 267 +++++++++++++++--- 3 files changed, 456 insertions(+), 39 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java new file mode 100644 index 00000000000..e494e524299 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RouterObserverReadProxyProvider.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ClientGSIContext;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RpcInvocationHandler;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_DEFAULT;
+
+/**
+ * A {@link org.apache.hadoop.io.retry.FailoverProxyProvider} implementation
+ * to support automatic msync-ing when using routers.
+ *
+ * This constructs a wrapper proxy around an internal one, and
+ * injects msync calls when necessary via the InvocationHandler.
+ */
+public class RouterObserverReadProxyProvider<T> extends AbstractNNFailoverProxyProvider<T> {
+  @VisibleForTesting
+  static final Logger LOG = LoggerFactory.getLogger(RouterObserverReadProxyProvider.class);
+
+  /** Client-side context for syncing with the NameNode server side. */
+  private final AlignmentContext alignmentContext;
+
+  /** The inner proxy provider used for active/standby failover. */
+  private final AbstractNNFailoverProxyProvider<T> innerProxy;
+
+  /** The proxy which redirects the internal one. */
+  private final ProxyInfo<T> wrapperProxy;
+
+  /**
+   * Whether reading from observer is enabled. If this is false, this proxy
+   * will not call msync.
+   */
+  private final boolean observerReadEnabled;
+
+  /**
+   * This adjusts how frequently this proxy provider should auto-msync to the
+   * Active NameNode, automatically performing an msync() call to the active
+   * to fetch the current transaction ID before submitting read requests to
+   * observer nodes. See HDFS-14211 for more description of this feature.
+   * If this is below 0, never auto-msync. If this is 0, perform an msync on
+   * every read operation. If this is above 0, perform an msync after this many
+   * ms have elapsed since the last msync.
+   */
+  private final long autoMsyncPeriodMs;
+
+  /**
+   * The monotonic time in milliseconds (see {@link Time#monotonicNow()}) of the
+   * last msync sent by the periodic auto-msync path. Other RPCs (including
+   * writes) do not update it. Initially -1.
+   */
+  private volatile long lastMsyncTimeMs = -1;
+
+  public RouterObserverReadProxyProvider(Configuration conf, URI uri, Class<T> xface,
+      HAProxyFactory<T> factory) {
+    this(conf, uri, xface, factory, new IPFailoverProxyProvider<>(conf, uri, xface, factory));
+  }
+
+  @SuppressWarnings("unchecked")
+  public RouterObserverReadProxyProvider(Configuration conf, URI uri, Class<T> xface,
+      HAProxyFactory<T> factory, AbstractNNFailoverProxyProvider<T> failoverProxy) {
+    super(conf, uri, xface, factory);
+    this.alignmentContext = new ClientGSIContext();
+    factory.setAlignmentContext(alignmentContext);
+    this.innerProxy = failoverProxy;
+
+    String proxyInfoString = "RouterObserverReadProxyProvider for " + innerProxy.getProxy();
+
+    T wrappedProxy = (T) Proxy.newProxyInstance(
+        RouterObserverReadInvocationHandler.class.getClassLoader(),
+        new Class<?>[]{xface}, new RouterObserverReadInvocationHandler());
+    this.wrapperProxy = new ProxyInfo<>(wrappedProxy, proxyInfoString);
+
+    autoMsyncPeriodMs = conf.getTimeDuration(
+        // The host of the URI is the name service ID
+        AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(),
+        AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+
+    if (wrappedProxy instanceof ClientProtocol) {
+      this.observerReadEnabled = true;
+    } else {
+      LOG.info("Disabling observer reads for {} because the requested proxy "
+          + "class does not implement {}", uri, ClientProtocol.class.getName());
+      this.observerReadEnabled = false;
+    }
+  }
+
+
+  public AlignmentContext getAlignmentContext() {
+    return alignmentContext;
+  }
+
+  @Override
+  public ProxyInfo<T> getProxy() {
+    return wrapperProxy;
+  }
+
+  @Override
+  public void performFailover(T currentProxy) {
+    innerProxy.performFailover(currentProxy);
+  }
+
+  @Override
+  public boolean useLogicalURI() {
+    return innerProxy.useLogicalURI();
+  }
+
+  @Override
+  public void close() throws IOException {
+    innerProxy.close();
+  }
+
+  /**
+   * Return the input proxy, cast as a {@link ClientProtocol}. When assertions
+   * are enabled, a proxy of any other type fails an assertion with a helpful
+   * message; otherwise the cast throws a {@link ClassCastException}. This should
+   * ONLY be called if the caller is certain the proxy is a {@link ClientProtocol}.
+   */
+  private ClientProtocol getProxyAsClientProtocol(T proxy) {
+    assert proxy instanceof ClientProtocol : "BUG: Attempted to use proxy of class "
+        + proxy.getClass()
+        + " as if it was a ClientProtocol.";
+    return (ClientProtocol) proxy;
+  }
+
+  /**
+   * Calls {@link ClientProtocol#msync()} through the {@link #innerProxy} to
+   * update the state of this client: on every call if the period is 0, only
+   * when more than {@link #autoMsyncPeriodMs} ms have elapsed since the last
+   * periodic msync if it is positive, and never if it is negative.
+   *
+   * @see #autoMsyncPeriodMs
+   */
+  private void autoMsyncIfNecessary() throws IOException {
+    if (autoMsyncPeriodMs == 0) {
+      // Always msync
+      getProxyAsClientProtocol(innerProxy.getProxy().proxy).msync();
+    } else if (autoMsyncPeriodMs > 0) {
+      if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+        synchronized (this) {
+          // Use a synchronized block so that only one thread will msync
+          // if many operations are submitted around the same time.
+          // Re-check the entry criterion since the status may have changed
+          // while waiting for the lock.
+          if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+            getProxyAsClientProtocol(innerProxy.getProxy().proxy).msync();
+            lastMsyncTimeMs = Time.monotonicNow();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Check if a method is read-only: annotated with {@link ReadOnly} and not active-only.
+   *
+   * @return whether the 'method' is a read-only operation.
+   */
+  private static boolean isRead(Method method) {
+    if (!method.isAnnotationPresent(ReadOnly.class)) {
+      return false;
+    }
+    return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly();
+  }
+
+  private class RouterObserverReadInvocationHandler implements RpcInvocationHandler {
+
+    @Override
+    public Client.ConnectionId getConnectionId() {
+      return RPC.getConnectionIdForProxy(innerProxy.getProxy().proxy);
+    }
+
+    @Override
+    public void close() throws IOException {
+      innerProxy.close();
+    }
+
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      if (observerReadEnabled && isRead(method)) {
+        autoMsyncIfNecessary();
+      }
+
+      try {
+        return method.invoke(innerProxy.getProxy().proxy, args);
+      } catch (InvocationTargetException e) {
+        // This exception will be handled by higher layers
+        throw e.getCause();
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
index c9e69456b80..a5bf5c1c318 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
@@ -177,6 +177,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>assertj-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
index 72e8f8f66d5..437b330f60c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeConte
 import
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadProxyProvider; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.test.GenericTestUtils; import org.junit.jupiter.api.Assertions; @@ -58,10 +59,13 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.api.TestInfo; public class TestObserverWithRouter { + private static final int NUM_NAMESERVICES = 2; private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup"; private MiniRouterDFSCluster cluster; private RouterContext routerContext; @@ -102,7 +106,7 @@ public class TestObserverWithRouter { .iterator() .forEachRemaining(entry -> conf.set(entry.getKey(), entry.getValue())); } - cluster = new MiniRouterDFSCluster(true, 2, numberOfNamenode); + cluster = new MiniRouterDFSCluster(true, NUM_NAMESERVICES, numberOfNamenode); cluster.addNamenodeOverrides(conf); // Start NNs and DNs and wait until ready cluster.startCluster(); @@ -139,15 +143,34 @@ public class TestObserverWithRouter { routerContext = cluster.getRandomRouter(); } - private static Configuration getConfToEnableObserverReads() { + public enum ConfigSetting { + USE_NAMENODE_PROXY_FLAG, + USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER + } + + private Configuration getConfToEnableObserverReads(ConfigSetting configSetting) { Configuration conf = new Configuration(); - conf.setBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, true); + switch (configSetting) { + case USE_NAMENODE_PROXY_FLAG: + 
conf.setBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, true); + break; + case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + + "." + + routerContext.getRouter() + .getRpcServerAddress() + .getHostName(), RouterObserverReadProxyProvider.class.getName()); + break; + default: + Assertions.fail("Unknown config setting: " + configSetting); + } return conf; } - @Test - public void testObserverRead() throws Exception { - fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + @EnumSource(ConfigSetting.class) + @ParameterizedTest + public void testObserverRead(ConfigSetting configSetting) throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); internalTestObserverRead(); } @@ -187,13 +210,15 @@ public class TestObserverWithRouter { assertEquals("One call should be sent to observer", 1, rpcCountForObserver); } - @Test + @EnumSource(ConfigSetting.class) + @ParameterizedTest @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) - public void testObserverReadWithoutFederatedStatePropagation() throws Exception { + public void testObserverReadWithoutFederatedStatePropagation(ConfigSetting configSetting) + throws Exception { Configuration confOverrides = new Configuration(false); confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0); startUpCluster(2, confOverrides); - fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); List namenodes = routerContext .getRouter().getNamenodeResolver() .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); @@ -216,14 +241,16 @@ public class TestObserverWithRouter { assertEquals("No call should be sent to observer", 0, rpcCountForObserver); } - @Test + @EnumSource(ConfigSetting.class) + @ParameterizedTest @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) - public 
void testDisablingObserverReadUsingNameserviceOverride() throws Exception { + public void testDisablingObserverReadUsingNameserviceOverride(ConfigSetting configSetting) + throws Exception { // Disable observer reads using per-nameservice override Configuration confOverrides = new Configuration(false); confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0"); startUpCluster(2, confOverrides); - fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); Path path = new Path("/testFile"); fileSystem.create(path).close(); @@ -239,9 +266,10 @@ public class TestObserverWithRouter { assertEquals("Zero calls should be sent to observer", 0, rpcCountForObserver); } - @Test - public void testReadWhenObserverIsDown() throws Exception { - fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + @EnumSource(ConfigSetting.class) + @ParameterizedTest + public void testReadWhenObserverIsDown(ConfigSetting configSetting) throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); Path path = new Path("/testFile1"); // Send Create call to active fileSystem.create(path).close(); @@ -267,9 +295,10 @@ public class TestObserverWithRouter { rpcCountForObserver); } - @Test - public void testMultipleObserver() throws Exception { - fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + @EnumSource(ConfigSetting.class) + @ParameterizedTest + public void testMultipleObserver(ConfigSetting configSetting) throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); Path path = new Path("/testFile1"); // Send Create call to active fileSystem.create(path).close(); @@ -406,9 +435,10 @@ public class TestObserverWithRouter { innerCluster.shutdown(); } - @Test - public void testUnavailableObserverNN() throws Exception { - fileSystem = 
routerContext.getFileSystem(getConfToEnableObserverReads()); + @EnumSource(ConfigSetting.class) + @ParameterizedTest + public void testUnavailableObserverNN(ConfigSetting configSetting) throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); stopObserver(2); Path path = new Path("/testFile"); @@ -442,9 +472,10 @@ public class TestObserverWithRouter { assertTrue("There must be unavailable namenodes", hasUnavailable); } - @Test - public void testRouterMsync() throws Exception { - fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + @EnumSource(ConfigSetting.class) + @ParameterizedTest + public void testRouterMsync(ConfigSetting configSetting) throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); Path path = new Path("/testFile"); // Send Create call to active @@ -464,9 +495,10 @@ public class TestObserverWithRouter { rpcCountForActive); } - @Test - public void testSingleRead() throws Exception { - fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + @EnumSource(ConfigSetting.class) + @ParameterizedTest + public void testSingleRead(ConfigSetting configSetting) throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); List namenodes = routerContext .getRouter().getNamenodeResolver() .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); @@ -554,10 +586,11 @@ public class TestObserverWithRouter { Assertions.assertEquals(10L, latestFederateState.get("ns0")); } - @Test - public void testStateIdProgressionInRouter() throws Exception { + @EnumSource(ConfigSetting.class) + @ParameterizedTest + public void testStateIdProgressionInRouter(ConfigSetting configSetting) throws Exception { Path rootPath = new Path("/"); - fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + fileSystem = 
routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); RouterStateIdContext routerStateIdContext = routerContext .getRouterRpcServer() .getRouterStateIdContext(); @@ -570,9 +603,10 @@ public class TestObserverWithRouter { assertEquals("Router's shared should have progressed.", 21, namespaceStateId.get()); } - @Test + @EnumSource(ConfigSetting.class) + @ParameterizedTest @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) - public void testSharedStateInRouterStateIdContext() throws Exception { + public void testSharedStateInRouterStateIdContext(ConfigSetting configSetting) throws Exception { Path rootPath = new Path("/"); long cleanupPeriodMs = 1000; @@ -580,7 +614,7 @@ public class TestObserverWithRouter { conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN, cleanupPeriodMs); conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS, cleanupPeriodMs / 10); startUpCluster(1, conf); - fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer() .getRouterStateIdContext(); @@ -616,9 +650,10 @@ public class TestObserverWithRouter { } - @Test + @EnumSource(ConfigSetting.class) + @ParameterizedTest @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) - public void testRouterStateIdContextCleanup() throws Exception { + public void testRouterStateIdContextCleanup(ConfigSetting configSetting) throws Exception { Path rootPath = new Path("/"); long recordExpiry = TimeUnit.SECONDS.toMillis(1); @@ -626,7 +661,7 @@ public class TestObserverWithRouter { confOverride.setLong(RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, recordExpiry); startUpCluster(1, confOverride); - fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); RouterStateIdContext routerStateIdContext = 
routerContext.getRouterRpcServer() .getRouterStateIdContext(); @@ -645,9 +680,11 @@ public class TestObserverWithRouter { assertTrue(namespace2.isEmpty()); } - @Test + @EnumSource(ConfigSetting.class) + @ParameterizedTest @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) - public void testPeriodicStateRefreshUsingActiveNamenode() throws Exception { + public void testPeriodicStateRefreshUsingActiveNamenode(ConfigSetting configSetting) + throws Exception { Path rootPath = new Path("/"); Configuration confOverride = new Configuration(false); @@ -655,7 +692,7 @@ public class TestObserverWithRouter { confOverride.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "3s"); startUpCluster(1, confOverride); - fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads()); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); fileSystem.listStatus(rootPath); int initialLengthOfRootListing = fileSystem.listStatus(rootPath).length; @@ -682,4 +719,156 @@ public class TestObserverWithRouter { assertEquals("List-status should show newly created directories.", initialLengthOfRootListing + 10, rootFolderAfterMkdir.length); } + + @EnumSource(ConfigSetting.class) + @ParameterizedTest + public void testAutoMsyncEqualsZero(ConfigSetting configSetting) throws Exception { + Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); + clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." 
+ + routerContext.getRouter().getRpcServerAddress().getHostName(), 0); + fileSystem = routerContext.getFileSystem(clientConfiguration); + + List namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + Path path = new Path("/"); + + long rpcCountForActive; + long rpcCountForObserver; + + // Send read requests + int numListings = 15; + for (int i = 0; i < numListings; i++) { + fileSystem.listFiles(path, false); + } + fileSystem.close(); + + rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + + rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + + switch (configSetting) { + case USE_NAMENODE_PROXY_FLAG: + // First read goes to active. + assertEquals("Calls sent to the active", 1, rpcCountForActive); + // The rest of the reads are sent to the observer. + assertEquals("Reads sent to observer", numListings - 1, rpcCountForObserver); + break; + case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + // An msync is sent to each active namenode for each read. + // Total msyncs will be (numListings * num_of_nameservices). + assertEquals("Msyncs sent to the active namenodes", + NUM_NAMESERVICES * numListings, rpcCountForActive); + // All reads should be sent to the observer. + assertEquals("Reads sent to observer", numListings, rpcCountForObserver); + break; + default: + Assertions.fail("Unknown config setting: " + configSetting); + } + } + + @EnumSource(ConfigSetting.class) + @ParameterizedTest + public void testAutoMsyncNonZero(ConfigSetting configSetting) throws Exception { + Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); + clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." 
+ + routerContext.getRouter().getRpcServerAddress().getHostName(), 3000); + fileSystem = routerContext.getFileSystem(clientConfiguration); + + List namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + Path path = new Path("/"); + + long rpcCountForActive; + long rpcCountForObserver; + + fileSystem.listFiles(path, false); + fileSystem.listFiles(path, false); + Thread.sleep(5000); + fileSystem.listFiles(path, false); + fileSystem.close(); + + rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + + rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + + switch (configSetting) { + case USE_NAMENODE_PROXY_FLAG: + // First read goes to active. + assertEquals("Calls sent to the active", 1, rpcCountForActive); + // The rest of the reads are sent to the observer. + assertEquals("Reads sent to observer", 2, rpcCountForObserver); + break; + case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + // 4 msyncs expected. 2 for the first read, and 2 for the third read + // after the auto-msync period has elapsed during the sleep. + assertEquals("Msyncs sent to the active namenodes", + 4, rpcCountForActive); + // All three reads should be sent to the observer. + assertEquals("Reads sent to observer", 3, rpcCountForObserver); + break; + default: + Assertions.fail("Unknown config setting: " + configSetting); + } + } + + @EnumSource(ConfigSetting.class) + @ParameterizedTest + public void testThatWriteDoesntBypassNeedForMsync(ConfigSetting configSetting) throws Exception { + Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); + clientConfiguration.setLong("dfs.client.failover.observer.auto-msync-period." 
+ + routerContext.getRouter().getRpcServerAddress().getHostName(), 3000); + fileSystem = routerContext.getFileSystem(clientConfiguration); + + List namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + Path path = new Path("/"); + + long rpcCountForActive; + long rpcCountForObserver; + + fileSystem.listFiles(path, false); + Thread.sleep(5000); + fileSystem.mkdirs(new Path(path, "mkdirLocation")); + fileSystem.listFiles(path, false); + fileSystem.close(); + + rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + + rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + + switch (configSetting) { + case USE_NAMENODE_PROXY_FLAG: + // First listing and mkdir go to the active. + assertEquals("Calls sent to the active namenodes", 2, rpcCountForActive); + // Second listing goes to the observer. + assertEquals("Read sent to observer", 1, rpcCountForObserver); + break; + case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + // 5 calls to the active namenodes expected. 4 msyncs and a mkdir. + // Each of the 2 reads results in an msync to 2 nameservices. + // The mkdir also goes to the active. + assertEquals("Calls sent to the active namenodes", + 5, rpcCountForActive); + // Both reads should be sent to the observer. + assertEquals("Reads sent to observer", 2, rpcCountForObserver); + break; + default: + Assertions.fail("Unknown config setting: " + configSetting); + } + } }