From 104dd85ad84829758fbe63a1b7300e639e02ee90 Mon Sep 17 00:00:00 2001 From: Konstantin V Shvachko Date: Mon, 12 Oct 2020 17:26:24 -0700 Subject: [PATCH] HDFS-15567. [SBN Read] HDFS should expose msync() API to allow downstream applications call it explicitly. Contributed by Konstantin V Shvachko. (cherry picked from commit b3786d6c3cc13b0b92b9f42da1731c4ce35c9ded) --- .../apache/hadoop/fs/AbstractFileSystem.java | 13 ++++++ .../org/apache/hadoop/fs/FileContext.java | 10 +++++ .../java/org/apache/hadoop/fs/FileSystem.java | 13 ++++++ .../apache/hadoop/fs/FilterFileSystem.java | 5 +++ .../java/org/apache/hadoop/fs/FilterFs.java | 5 +++ .../org/apache/hadoop/fs/HarFileSystem.java | 5 +++ .../main/java/org/apache/hadoop/fs/Hdfs.java | 13 +++++- .../hadoop/hdfs/DistributedFileSystem.java | 11 +++++ .../apache/hadoop/hdfs/MiniDFSCluster.java | 3 +- .../ha/TestConsistentReadsObserver.java | 41 ++++++++++++++++++- 10 files changed, 115 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java index df14ee8762b..bae14543eb8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java @@ -844,6 +844,19 @@ public abstract FileStatus getFileStatus(final Path f) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException; + /** + * Synchronize client metadata state. + *
<p>
In some FileSystem implementations such as HDFS metadata + * synchronization is essential to guarantee consistency of read requests + * particularly in HA setting. + * @throws IOException + * @throws UnsupportedOperationException + */ + public void msync() throws IOException, UnsupportedOperationException { + throw new UnsupportedOperationException(getClass().getCanonicalName() + + " does not support method msync"); + } + /** * The specification of this method matches that of * {@link FileContext#access(Path, FsAction)} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index 12b39bb8b9e..4f0f2fc81fc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -1188,6 +1188,16 @@ public FileStatus next(final AbstractFileSystem fs, final Path p) }.resolve(this, absF); } + /** + * Synchronize client metadata state. + * + * @throws IOException + * @throws UnsupportedOperationException + */ + public void msync() throws IOException, UnsupportedOperationException { + defaultFS.msync(); + } + /** * Checks if the user can access a path. The mode specifies which access * checks to perform. 
If the requested permissions are granted, then the diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index c32f4cd3d05..176362c54f7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -2507,6 +2507,19 @@ public short getDefaultReplication(Path path) { */ public abstract FileStatus getFileStatus(Path f) throws IOException; + /** + * Synchronize client metadata state. + *
<p>
In some FileSystem implementations such as HDFS metadata + * synchronization is essential to guarantee consistency of read requests + * particularly in HA setting. + * @throws IOException + * @throws UnsupportedOperationException + */ + public void msync() throws IOException, UnsupportedOperationException { + throw new UnsupportedOperationException(getClass().getCanonicalName() + + " does not support method msync"); + } + /** * Checks if the user can access a path. The mode specifies which access * checks to perform. If the requested permissions are granted, then the diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index 1c38df86f87..0a346d128bc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -446,6 +446,11 @@ public FileStatus getFileStatus(Path f) throws IOException { return fs.getFileStatus(f); } + @Override + public void msync() throws IOException, UnsupportedOperationException { + fs.msync(); + } + @Override public void access(Path path, FsAction mode) throws AccessControlException, FileNotFoundException, IOException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java index b2a9aab75d7..cccf681858a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java @@ -122,6 +122,11 @@ public FileStatus getFileStatus(Path f) return myFs.getFileStatus(f); } + @Override + public void msync() throws IOException, UnsupportedOperationException { + myFs.msync(); + } + @Override public void access(Path path, 
FsAction mode) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java index aa58706ed11..807fb6b4e87 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java @@ -673,6 +673,11 @@ private HarStatus getFileHarStatus(Path f) throws IOException { return hstatus; } + @Override + public void msync() throws IOException, UnsupportedOperationException { + fs.msync(); + } + /** * @return null since no checksum algorithm is implemented. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java index 645f1ad833a..17246cf1537 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java @@ -137,7 +137,18 @@ public FileStatus getFileStatus(Path f) throw new FileNotFoundException("File does not exist: " + f.toString()); } } - + + /** + * Synchronize client metadata state with Active NameNode. + *
<p>
In HA the client synchronizes its state with the Active NameNode + * in order to guarantee subsequent read consistency from Observer Nodes. + * @throws IOException + */ + @Override + public void msync() throws IOException { + dfs.msync(); + } + @Override public FileStatus getFileLinkStatus(Path f) throws IOException, UnresolvedLinkException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 362768d8d8a..9babc9ce74b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -1536,6 +1536,17 @@ public FileStatus next(final FileSystem fs, final Path p) }.resolve(this, absF); } + /** + * Synchronize client metadata state with Active NameNode. + *
<p>
In HA the client synchronizes its state with the Active NameNode + * in order to guarantee subsequent read consistency from Observer Nodes. + * @throws IOException + */ + @Override + public void msync() throws IOException { + dfs.msync(); + } + @SuppressWarnings("deprecation") @Override public void createSymlink(final Path target, final Path link, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index b0586d7cfaf..d905e95587f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2656,7 +2656,8 @@ public void transitionToObserver(int nnIndex) throws IOException, public void rollEditLogAndTail(int nnIndex) throws Exception { getNameNode(nnIndex).getRpcServer().rollEditLog(); for (int i = 2; i < getNumNameNodes(); i++) { - getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits(); + long el = getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits(); + LOG.info("editsLoaded " + el); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java index 780f807b994..f01a51157cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java @@ -23,6 +23,8 @@ import static org.junit.Assert.fail; import com.google.common.base.Supplier; + +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collections; import java.util.concurrent.TimeUnit; @@ -31,9 +33,12 @@ 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -110,7 +115,8 @@ public void testRequeueCall() throws Exception { final int observerIdx = 2; NameNode nn = dfsCluster.getNameNode(observerIdx); int port = nn.getNameNodeAddress().getPort(); - Configuration configuration = dfsCluster.getConfiguration(observerIdx); + Configuration originalConf = dfsCluster.getConfiguration(observerIdx); + Configuration configuration = new Configuration(originalConf); String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + "."; configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY, TestRpcScheduler.class.getName()); @@ -127,6 +133,8 @@ public void testRequeueCall() throws Exception { // be triggered and client should retry active NN. dfs.getFileStatus(testPath); assertSentTo(0); + // reset the original call queue + NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf); } @Test @@ -207,7 +215,7 @@ public void run() { // Therefore, the subsequent getFileStatus call should succeed. 
if (!autoMsync) { // If not testing auto-msync, perform an explicit one here - dfs2.getClient().msync(); + dfs2.msync(); } else if (autoMsyncPeriodMs > 0) { Thread.sleep(autoMsyncPeriodMs); } @@ -413,6 +421,35 @@ public void testRequestFromNonObserverProxyProvider() throws Exception { } } + @Test(timeout=10000) + public void testMsyncFileContext() throws Exception { + NameNode nn0 = dfsCluster.getNameNode(0); + NameNode nn2 = dfsCluster.getNameNode(2); + HAServiceStatus st = nn0.getRpcServer().getServiceStatus(); + assertEquals("nn0 is not active", HAServiceState.ACTIVE, st.getState()); + st = nn2.getRpcServer().getServiceStatus(); + assertEquals("nn2 is not observer", HAServiceState.OBSERVER, st.getState()); + + FileContext fc = FileContext.getFileContext(conf); + // initialize observer proxy for FileContext + fc.getFsStatus(testPath); + + Path p = new Path(testPath, "testMsyncFileContext"); + fc.mkdir(p, FsPermission.getDefault(), true); + fc.msync(); + dfsCluster.rollEditLogAndTail(0); + LOG.info("State id active = {}, Stat id observer = {}", + nn0.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId(), + nn2.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId()); + try { + // if getFileStatus is taking too long due to server requeueing + // the test will time out + fc.getFileStatus(p); + } catch (FileNotFoundException e) { + fail("File should exist on Observer after msync"); + } + } + private void assertSentTo(int nnIdx) throws IOException { assertTrue("Request was not sent to the expected namenode " + nnIdx, HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));