HDFS-15567. [SBN Read] HDFS should expose msync() API to allow downstream applications call it explicitly. Contributed by Konstantin V Shvachko.

(cherry picked from commit b3786d6c3c)
This commit is contained in:
Konstantin V Shvachko 2020-10-12 17:26:24 -07:00
parent 55f01bda8e
commit 23b6ab67f7
10 changed files with 114 additions and 4 deletions

View File

@ -846,6 +846,19 @@ public abstract class AbstractFileSystem implements PathCapabilities {
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException;
/**
* Synchronize client metadata state.
* <p/>In some FileSystem implementations such as HDFS metadata
* synchronization is essential to guarantee consistency of read requests
* particularly in HA setting.
* @throws IOException
* @throws UnsupportedOperationException
*/
public void msync() throws IOException, UnsupportedOperationException {
throw new UnsupportedOperationException(getClass().getCanonicalName() +
" does not support method msync");
}
/**
* The specification of this method matches that of
* {@link FileContext#access(Path, FsAction)}

View File

@ -1247,6 +1247,16 @@ public class FileContext implements PathCapabilities {
}.resolve(this, absF);
}
/**
* Synchronize client metadata state.
*
* @throws IOException
* @throws UnsupportedOperationException
*/
public void msync() throws IOException, UnsupportedOperationException {
defaultFS.msync();
}
/**
* Checks if the user can access a path. The mode specifies which access
* checks to perform. If the requested permissions are granted, then the

View File

@ -2590,6 +2590,19 @@ public abstract class FileSystem extends Configured
*/
public abstract FileStatus getFileStatus(Path f) throws IOException;
/**
* Synchronize client metadata state.
* <p/>In some FileSystem implementations such as HDFS metadata
* synchronization is essential to guarantee consistency of read requests
* particularly in HA setting.
* @throws IOException
* @throws UnsupportedOperationException
*/
public void msync() throws IOException, UnsupportedOperationException {
throw new UnsupportedOperationException(getClass().getCanonicalName() +
" does not support method msync");
}
/**
* Checks if the user can access a path. The mode specifies which access
* checks to perform. If the requested permissions are granted, then the

View File

@ -458,6 +458,11 @@ public class FilterFileSystem extends FileSystem {
return fs.getFileStatus(f);
}
@Override
public void msync() throws IOException, UnsupportedOperationException {
fs.msync();
}
@Override
public void access(Path path, FsAction mode) throws AccessControlException,
FileNotFoundException, IOException {

View File

@ -122,6 +122,11 @@ public abstract class FilterFs extends AbstractFileSystem {
return myFs.getFileStatus(f);
}
@Override
public void msync() throws IOException, UnsupportedOperationException {
myFs.msync();
}
@Override
public void access(Path path, FsAction mode) throws AccessControlException,
FileNotFoundException, UnresolvedLinkException, IOException {

View File

@ -676,6 +676,11 @@ public class HarFileSystem extends FileSystem {
return hstatus;
}
@Override
public void msync() throws IOException, UnsupportedOperationException {
fs.msync();
}
/**
* @return null since no checksum algorithm is implemented.
*/

View File

@ -153,7 +153,18 @@ public class Hdfs extends AbstractFileSystem {
throw new FileNotFoundException("File does not exist: " + f.toString());
}
}
/**
* Synchronize client metadata state with Active NameNode.
* <p/>In HA the client synchronizes its state with the Active NameNode
* in order to guarantee subsequent read consistency from Observer Nodes.
* @throws IOException
*/
@Override
public void msync() throws IOException {
dfs.msync();
}
@Override
public FileStatus getFileLinkStatus(Path f)
throws IOException, UnresolvedLinkException {

View File

@ -1614,6 +1614,17 @@ public class DistributedFileSystem extends FileSystem
}.resolve(this, absF);
}
/**
* Synchronize client metadata state with Active NameNode.
* <p/>In HA the client synchronizes its state with the Active NameNode
* in order to guarantee subsequent read consistency from Observer Nodes.
* @throws IOException
*/
@Override
public void msync() throws IOException {
dfs.msync();
}
@SuppressWarnings("deprecation")
@Override
public void createSymlink(final Path target, final Path link,

View File

@ -2664,7 +2664,8 @@ public class MiniDFSCluster implements AutoCloseable {
public void rollEditLogAndTail(int nnIndex) throws Exception {
getNameNode(nnIndex).getRpcServer().rollEditLog();
for (int i = 2; i < getNumNameNodes(); i++) {
getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits();
long el = getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits();
LOG.info("editsLoaded {}", el);
}
}

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@ -30,9 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@ -108,7 +112,8 @@ public class TestConsistentReadsObserver {
final int observerIdx = 2;
NameNode nn = dfsCluster.getNameNode(observerIdx);
int port = nn.getNameNodeAddress().getPort();
Configuration configuration = dfsCluster.getConfiguration(observerIdx);
Configuration originalConf = dfsCluster.getConfiguration(observerIdx);
Configuration configuration = new Configuration(originalConf);
String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + ".";
configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
TestRpcScheduler.class.getName());
@ -125,6 +130,8 @@ public class TestConsistentReadsObserver {
// be triggered and client should retry active NN.
dfs.getFileStatus(testPath);
assertSentTo(0);
// reset the original call queue
NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
}
@Test
@ -194,7 +201,7 @@ public class TestConsistentReadsObserver {
// Therefore, the subsequent getFileStatus call should succeed.
if (!autoMsync) {
// If not testing auto-msync, perform an explicit one here
dfs2.getClient().msync();
dfs2.msync();
} else if (autoMsyncPeriodMs > 0) {
Thread.sleep(autoMsyncPeriodMs);
}
@ -383,6 +390,35 @@ public class TestConsistentReadsObserver {
}
}
@Test(timeout=10000)
public void testMsyncFileContext() throws Exception {
NameNode nn0 = dfsCluster.getNameNode(0);
NameNode nn2 = dfsCluster.getNameNode(2);
HAServiceStatus st = nn0.getRpcServer().getServiceStatus();
assertEquals("nn0 is not active", HAServiceState.ACTIVE, st.getState());
st = nn2.getRpcServer().getServiceStatus();
assertEquals("nn2 is not observer", HAServiceState.OBSERVER, st.getState());
FileContext fc = FileContext.getFileContext(conf);
// initialize observer proxy for FileContext
fc.getFsStatus(testPath);
Path p = new Path(testPath, "testMsyncFileContext");
fc.mkdir(p, FsPermission.getDefault(), true);
fc.msync();
dfsCluster.rollEditLogAndTail(0);
LOG.info("State id active = {}, Stat id observer = {}",
nn0.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId(),
nn2.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId());
try {
// if getFileStatus is taking too long due to server requeueing
// the test will time out
fc.getFileStatus(p);
} catch (FileNotFoundException e) {
fail("File should exist on Observer after msync");
}
}
private void assertSentTo(int nnIdx) throws IOException {
assertTrue("Request was not sent to the expected namenode " + nnIdx,
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));