HDFS-15567. [SBN Read] HDFS should expose msync() API to allow downstream applications call it explicitly. Contributed by Konstantin V Shvachko.
(cherry picked from commit b3786d6c3c
)
This commit is contained in:
parent
800b1ed1c2
commit
b6423d2780
|
@ -865,6 +865,19 @@ public abstract class AbstractFileSystem implements PathCapabilities {
|
||||||
throws AccessControlException, FileNotFoundException,
|
throws AccessControlException, FileNotFoundException,
|
||||||
UnresolvedLinkException, IOException;
|
UnresolvedLinkException, IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Synchronize client metadata state.
|
||||||
|
* <p/>In some FileSystem implementations such as HDFS metadata
|
||||||
|
* synchronization is essential to guarantee consistency of read requests
|
||||||
|
* particularly in HA setting.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws UnsupportedOperationException
|
||||||
|
*/
|
||||||
|
public void msync() throws IOException, UnsupportedOperationException {
|
||||||
|
throw new UnsupportedOperationException(getClass().getCanonicalName() +
|
||||||
|
" does not support method msync");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The specification of this method matches that of
|
* The specification of this method matches that of
|
||||||
* {@link FileContext#access(Path, FsAction)}
|
* {@link FileContext#access(Path, FsAction)}
|
||||||
|
|
|
@ -1249,6 +1249,16 @@ public class FileContext implements PathCapabilities {
|
||||||
}.resolve(this, absF);
|
}.resolve(this, absF);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Synchronize client metadata state.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws UnsupportedOperationException
|
||||||
|
*/
|
||||||
|
public void msync() throws IOException, UnsupportedOperationException {
|
||||||
|
defaultFS.msync();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks if the user can access a path. The mode specifies which access
|
* Checks if the user can access a path. The mode specifies which access
|
||||||
* checks to perform. If the requested permissions are granted, then the
|
* checks to perform. If the requested permissions are granted, then the
|
||||||
|
|
|
@ -2674,6 +2674,19 @@ public abstract class FileSystem extends Configured
|
||||||
*/
|
*/
|
||||||
public abstract FileStatus getFileStatus(Path f) throws IOException;
|
public abstract FileStatus getFileStatus(Path f) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Synchronize client metadata state.
|
||||||
|
* <p/>In some FileSystem implementations such as HDFS metadata
|
||||||
|
* synchronization is essential to guarantee consistency of read requests
|
||||||
|
* particularly in HA setting.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws UnsupportedOperationException
|
||||||
|
*/
|
||||||
|
public void msync() throws IOException, UnsupportedOperationException {
|
||||||
|
throw new UnsupportedOperationException(getClass().getCanonicalName() +
|
||||||
|
" does not support method msync");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks if the user can access a path. The mode specifies which access
|
* Checks if the user can access a path. The mode specifies which access
|
||||||
* checks to perform. If the requested permissions are granted, then the
|
* checks to perform. If the requested permissions are granted, then the
|
||||||
|
|
|
@ -462,6 +462,11 @@ public class FilterFileSystem extends FileSystem {
|
||||||
return fs.getFileStatus(f);
|
return fs.getFileStatus(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void msync() throws IOException, UnsupportedOperationException {
|
||||||
|
fs.msync();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void access(Path path, FsAction mode) throws AccessControlException,
|
public void access(Path path, FsAction mode) throws AccessControlException,
|
||||||
FileNotFoundException, IOException {
|
FileNotFoundException, IOException {
|
||||||
|
|
|
@ -124,6 +124,11 @@ public abstract class FilterFs extends AbstractFileSystem {
|
||||||
return myFs.getFileStatus(f);
|
return myFs.getFileStatus(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void msync() throws IOException, UnsupportedOperationException {
|
||||||
|
myFs.msync();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void access(Path path, FsAction mode) throws AccessControlException,
|
public void access(Path path, FsAction mode) throws AccessControlException,
|
||||||
FileNotFoundException, UnresolvedLinkException, IOException {
|
FileNotFoundException, UnresolvedLinkException, IOException {
|
||||||
|
|
|
@ -676,6 +676,11 @@ public class HarFileSystem extends FileSystem {
|
||||||
return hstatus;
|
return hstatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void msync() throws IOException, UnsupportedOperationException {
|
||||||
|
fs.msync();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return null since no checksum algorithm is implemented.
|
* @return null since no checksum algorithm is implemented.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -153,7 +153,18 @@ public class Hdfs extends AbstractFileSystem {
|
||||||
throw new FileNotFoundException("File does not exist: " + f.toString());
|
throw new FileNotFoundException("File does not exist: " + f.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Synchronize client metadata state with Active NameNode.
|
||||||
|
* <p/>In HA the client synchronizes its state with the Active NameNode
|
||||||
|
* in order to guarantee subsequent read consistency from Observer Nodes.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void msync() throws IOException {
|
||||||
|
dfs.msync();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FileStatus getFileLinkStatus(Path f)
|
public FileStatus getFileLinkStatus(Path f)
|
||||||
throws IOException, UnresolvedLinkException {
|
throws IOException, UnresolvedLinkException {
|
||||||
|
|
|
@ -1764,6 +1764,17 @@ public class DistributedFileSystem extends FileSystem
|
||||||
}.resolve(this, absF);
|
}.resolve(this, absF);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Synchronize client metadata state with Active NameNode.
|
||||||
|
* <p/>In HA the client synchronizes its state with the Active NameNode
|
||||||
|
* in order to guarantee subsequent read consistency from Observer Nodes.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void msync() throws IOException {
|
||||||
|
dfs.msync();
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
@Override
|
@Override
|
||||||
public void createSymlink(final Path target, final Path link,
|
public void createSymlink(final Path target, final Path link,
|
||||||
|
|
|
@ -2679,7 +2679,8 @@ public class MiniDFSCluster implements AutoCloseable {
|
||||||
public void rollEditLogAndTail(int nnIndex) throws Exception {
|
public void rollEditLogAndTail(int nnIndex) throws Exception {
|
||||||
getNameNode(nnIndex).getRpcServer().rollEditLog();
|
getNameNode(nnIndex).getRpcServer().rollEditLog();
|
||||||
for (int i = 2; i < getNumNameNodes(); i++) {
|
for (int i = 2; i < getNumNameNodes(); i++) {
|
||||||
getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits();
|
long el = getNameNode(i).getNamesystem().getEditLogTailer().doTailEdits();
|
||||||
|
LOG.info("editsLoaded {}", el);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -30,9 +31,12 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
|
import org.apache.hadoop.ha.HAServiceStatus;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||||
|
@ -108,7 +112,8 @@ public class TestConsistentReadsObserver {
|
||||||
final int observerIdx = 2;
|
final int observerIdx = 2;
|
||||||
NameNode nn = dfsCluster.getNameNode(observerIdx);
|
NameNode nn = dfsCluster.getNameNode(observerIdx);
|
||||||
int port = nn.getNameNodeAddress().getPort();
|
int port = nn.getNameNodeAddress().getPort();
|
||||||
Configuration configuration = dfsCluster.getConfiguration(observerIdx);
|
Configuration originalConf = dfsCluster.getConfiguration(observerIdx);
|
||||||
|
Configuration configuration = new Configuration(originalConf);
|
||||||
String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + ".";
|
String prefix = CommonConfigurationKeys.IPC_NAMESPACE + "." + port + ".";
|
||||||
configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
|
configuration.set(prefix + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
|
||||||
TestRpcScheduler.class.getName());
|
TestRpcScheduler.class.getName());
|
||||||
|
@ -125,6 +130,8 @@ public class TestConsistentReadsObserver {
|
||||||
// be triggered and client should retry active NN.
|
// be triggered and client should retry active NN.
|
||||||
dfs.getFileStatus(testPath);
|
dfs.getFileStatus(testPath);
|
||||||
assertSentTo(0);
|
assertSentTo(0);
|
||||||
|
// reset the original call queue
|
||||||
|
NameNodeAdapter.getRpcServer(nn).refreshCallQueue(originalConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -194,7 +201,7 @@ public class TestConsistentReadsObserver {
|
||||||
// Therefore, the subsequent getFileStatus call should succeed.
|
// Therefore, the subsequent getFileStatus call should succeed.
|
||||||
if (!autoMsync) {
|
if (!autoMsync) {
|
||||||
// If not testing auto-msync, perform an explicit one here
|
// If not testing auto-msync, perform an explicit one here
|
||||||
dfs2.getClient().msync();
|
dfs2.msync();
|
||||||
} else if (autoMsyncPeriodMs > 0) {
|
} else if (autoMsyncPeriodMs > 0) {
|
||||||
Thread.sleep(autoMsyncPeriodMs);
|
Thread.sleep(autoMsyncPeriodMs);
|
||||||
}
|
}
|
||||||
|
@ -383,6 +390,35 @@ public class TestConsistentReadsObserver {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testMsyncFileContext() throws Exception {
|
||||||
|
NameNode nn0 = dfsCluster.getNameNode(0);
|
||||||
|
NameNode nn2 = dfsCluster.getNameNode(2);
|
||||||
|
HAServiceStatus st = nn0.getRpcServer().getServiceStatus();
|
||||||
|
assertEquals("nn0 is not active", HAServiceState.ACTIVE, st.getState());
|
||||||
|
st = nn2.getRpcServer().getServiceStatus();
|
||||||
|
assertEquals("nn2 is not observer", HAServiceState.OBSERVER, st.getState());
|
||||||
|
|
||||||
|
FileContext fc = FileContext.getFileContext(conf);
|
||||||
|
// initialize observer proxy for FileContext
|
||||||
|
fc.getFsStatus(testPath);
|
||||||
|
|
||||||
|
Path p = new Path(testPath, "testMsyncFileContext");
|
||||||
|
fc.mkdir(p, FsPermission.getDefault(), true);
|
||||||
|
fc.msync();
|
||||||
|
dfsCluster.rollEditLogAndTail(0);
|
||||||
|
LOG.info("State id active = {}, Stat id observer = {}",
|
||||||
|
nn0.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId(),
|
||||||
|
nn2.getNamesystem().getFSImage().getLastAppliedOrWrittenTxId());
|
||||||
|
try {
|
||||||
|
// if getFileStatus is taking too long due to server requeueing
|
||||||
|
// the test will time out
|
||||||
|
fc.getFileStatus(p);
|
||||||
|
} catch (FileNotFoundException e) {
|
||||||
|
fail("File should exist on Observer after msync");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void assertSentTo(int nnIdx) throws IOException {
|
private void assertSentTo(int nnIdx) throws IOException {
|
||||||
assertTrue("Request was not sent to the expected namenode " + nnIdx,
|
assertTrue("Request was not sent to the expected namenode " + nnIdx,
|
||||||
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
|
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
|
||||||
|
|
Loading…
Reference in New Issue