diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java index 67dbc2e6fef..23672d6ce81 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java @@ -865,6 +865,19 @@ public abstract class AbstractFileSystem implements PathCapabilities { throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException; + /** + * Synchronize client metadata state. + * <p>
In some FileSystem implementations such as HDFS metadata + * synchronization is essential to guarantee consistency of read requests + * particularly in HA setting. + * @throws IOException + * @throws UnsupportedOperationException + */ + public void msync() throws IOException, UnsupportedOperationException { + throw new UnsupportedOperationException(getClass().getCanonicalName() + + " does not support method msync"); + } + /** * The specification of this method matches that of * {@link FileContext#access(Path, FsAction)} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java index e376efcde94..cddac5222da 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java @@ -1249,6 +1249,16 @@ public class FileContext implements PathCapabilities { }.resolve(this, absF); } + /** + * Synchronize client metadata state. + * + * @throws IOException + * @throws UnsupportedOperationException + */ + public void msync() throws IOException, UnsupportedOperationException { + defaultFS.msync(); + } + /** * Checks if the user can access a path. The mode specifies which access * checks to perform. 
If the requested permissions are granted, then the diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 74619097c37..0b84257cbbc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -2674,6 +2674,19 @@ public abstract class FileSystem extends Configured */ public abstract FileStatus getFileStatus(Path f) throws IOException; + /** + * Synchronize client metadata state. + * In some FileSystem implementations such as HDFS metadata + * synchronization is essential to guarantee consistency of read requests + * particularly in HA setting. + * @throws IOException + * @throws UnsupportedOperationException + */ + public void msync() throws IOException, UnsupportedOperationException { + throw new UnsupportedOperationException(getClass().getCanonicalName() + + " does not support method msync"); + } + /** * Checks if the user can access a path. The mode specifies which access * checks to perform. 
If the requested permissions are granted, then the diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index 42410974db1..607aa263622 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -462,6 +462,11 @@ public class FilterFileSystem extends FileSystem { return fs.getFileStatus(f); } + @Override + public void msync() throws IOException, UnsupportedOperationException { + fs.msync(); + } + @Override public void access(Path path, FsAction mode) throws AccessControlException, FileNotFoundException, IOException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java index 27e75d8a25d..7d979b37b4a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFs.java @@ -124,6 +124,11 @@ public abstract class FilterFs extends AbstractFileSystem { return myFs.getFileStatus(f); } + @Override + public void msync() throws IOException, UnsupportedOperationException { + myFs.msync(); + } + @Override public void access(Path path, FsAction mode) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java index 5f4c4a236e9..02c0916e544 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java +++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java @@ -676,6 +676,11 @@ public class HarFileSystem extends FileSystem { return hstatus; } + @Override + public void msync() throws IOException, UnsupportedOperationException { + fs.msync(); + } + /** * @return null since no checksum algorithm is implemented. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java index 4162b198fb1..60ce8cd3de3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/fs/Hdfs.java @@ -153,7 +153,18 @@ public class Hdfs extends AbstractFileSystem { throw new FileNotFoundException("File does not exist: " + f.toString()); } } - + + /** + * Synchronize client metadata state with Active NameNode. + * In HA the client synchronizes its state with the Active NameNode + * in order to guarantee subsequent read consistency from Observer Nodes. + * @throws IOException + */ + @Override + public void msync() throws IOException { + dfs.msync(); + } + @Override public FileStatus getFileLinkStatus(Path f) throws IOException, UnresolvedLinkException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 89232380ad4..97e9c58544d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -1764,6 +1764,17 @@ public class DistributedFileSystem extends FileSystem }.resolve(this, absF); } + /** + * Synchronize client metadata state with Active NameNode. 
+ * In HA the client synchronizes its state with the Active NameNode + * in order to guarantee subsequent read consistency from Observer Nodes. + * @throws IOException + */ + @Override + public void msync() throws IOException { + dfs.msync(); + } + @SuppressWarnings("deprecation") @Override public void createSymlink(final Path target, final Path link, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 7c9c8996477..a2a90669bbd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2679,7 +2679,8 @@ public class MiniDFSCluster implements AutoCloseable { public void rollEditLogAndTail(int nnIndex) throws Exception { getNameNode(nnIndex).getRpcServer().rollEditLog(); for (int i = 2; i < getNumNameNodes(); i++) { - getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits(); + long el = getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits(); + LOG.info("editsLoaded {}", el); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java index 96067858c9d..854027a1164 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collections; import 
java.util.concurrent.TimeUnit; @@ -30,9 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -108,7 +112,8 @@ public class TestConsistentReadsObserver { final int observerIdx = 2; NameNode nn = dfsCluster.getNameNode(observerIdx); int port = nn.getNameNodeAddress().getPort(); - Configuration configuration = dfsCluster.getConfiguration(observerIdx); + Configuration originalConf = dfsCluster.getConfiguration(observerIdx); + Configuration configuration = new Configuration(originalConf); String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + "."; configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY, TestRpcScheduler.class.getName()); @@ -125,6 +130,8 @@ public class TestConsistentReadsObserver { // be triggered and client should retry active NN. dfs.getFileStatus(testPath); assertSentTo(0); + // reset the original call queue + NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf); } @Test @@ -194,7 +201,7 @@ public class TestConsistentReadsObserver { // Therefore, the subsequent getFileStatus call should succeed. 
if (!autoMsync) { // If not testing auto-msync, perform an explicit one here - dfs2.getClient().msync(); + dfs2.msync(); } else if (autoMsyncPeriodMs > 0) { Thread.sleep(autoMsyncPeriodMs); } @@ -383,6 +390,35 @@ public class TestConsistentReadsObserver { } } + @Test(timeout=10000) + public void testMsyncFileContext() throws Exception { + NameNode nn0 = dfsCluster.getNameNode(0); + NameNode nn2 = dfsCluster.getNameNode(2); + HAServiceStatus st = nn0.getRpcServer().getServiceStatus(); + assertEquals("nn0 is not active", HAServiceState.ACTIVE, st.getState()); + st = nn2.getRpcServer().getServiceStatus(); + assertEquals("nn2 is not observer", HAServiceState.OBSERVER, st.getState()); + + FileContext fc = FileContext.getFileContext(conf); + // initialize observer proxy for FileContext + fc.getFsStatus(testPath); + + Path p = new Path(testPath, "testMsyncFileContext"); + fc.mkdir(p, FsPermission.getDefault(), true); + fc.msync(); + dfsCluster.rollEditLogAndTail(0); + LOG.info("State id active = {}, Stat id observer = {}", + nn0.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId(), + nn2.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId()); + try { + // if getFileStatus is taking too long due to server requeueing + // the test will time out + fc.getFileStatus(p); + } catch (FileNotFoundException e) { + fail("File should exist on Observer after msync"); + } + } + private void assertSentTo(int nnIdx) throws IOException { assertTrue("Request was not sent to the expected namenode " + nnIdx, HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));