HDFS-7259. Unresponseive NFS mount point due to deferred COMMIT response. Contributed by Brandon Li

This commit is contained in:
Brandon Li 2014-10-21 10:20:29 -07:00
parent 3820bf055e
commit 33e020cb19
8 changed files with 237 additions and 27 deletions

View File

@ -372,7 +372,7 @@ public class IdUserGroup {
uid = getUid(user);
} catch (IOException e) {
uid = user.hashCode();
LOG.info("Can't map user " + user + ". Use its string hashcode:" + uid, e);
LOG.info("Can't map user " + user + ". Use its string hashcode:" + uid);
}
return uid;
}
@ -385,7 +385,7 @@ public class IdUserGroup {
gid = getGid(group);
} catch (IOException e) {
gid = group.hashCode();
LOG.info("Can't map group " + group + ". Use its string hashcode:" + gid, e);
LOG.info("Can't map group " + group + ". Use its string hashcode:" + gid);
}
return gid;
}

View File

@ -57,4 +57,7 @@ public class NfsConfigKeys {
public static final String AIX_COMPAT_MODE_KEY = "nfs.aix.compatibility.mode.enabled";
public static final boolean AIX_COMPAT_MODE_DEFAULT = false;
public final static String LARGE_FILE_UPLOAD = "nfs.large.file.upload";
public final static boolean LARGE_FILE_UPLOAD_DEFAULT = true;
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.nfs.nfs3.WriteCtx.DataState;
import org.apache.hadoop.io.BytesWritable.Comparator;
import org.apache.hadoop.io.IOUtils;
@ -77,7 +78,26 @@ class OpenFileCtx {
COMMIT_INACTIVE_CTX,
COMMIT_INACTIVE_WITH_PENDING_WRITE,
COMMIT_ERROR,
COMMIT_DO_SYNC;
COMMIT_DO_SYNC,
/**
* Deferred COMMIT response could fail file uploading. The following two
* status are introduced as a solution. 1. if client asks to commit
* non-sequential trunk of data, NFS gateway return success with the hope
* that client will send the prerequisite writes. 2. if client asks to
* commit a sequential trunk(means it can be flushed to HDFS), NFS gateway
* return a special error NFS3ERR_JUKEBOX indicating the client needs to
* retry. Meanwhile, NFS gateway keeps flush data to HDFS and do sync
* eventually.
*
* The reason to let client wait is that, we want the client to wait for the
* last commit. Otherwise, client thinks file upload finished (e.g., cp
* command returns success) but NFS could be still flushing staged data to
* HDFS. However, we don't know which one is the last commit. We make the
* assumption that a commit after sequential writes may be the last.
* Referring HDFS-7259 for more details.
* */
COMMIT_SPECIAL_WAIT, // scoped pending writes is sequential
COMMIT_SPECIAL_SUCCESS;// scoped pending writes is not sequential
}
private final DFSClient client;
@ -159,6 +179,7 @@ class OpenFileCtx {
private RandomAccessFile raf;
private final String dumpFilePath;
private Daemon dumpThread;
private final boolean uploadLargeFile;
private void updateLastAccessTime() {
lastAccessTime = Time.monotonicNow();
@ -200,12 +221,13 @@ class OpenFileCtx {
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
String dumpFilePath, DFSClient client, IdUserGroup iug) {
this(fos, latestAttr, dumpFilePath, client, iug, false);
this(fos, latestAttr, dumpFilePath, client, iug, false,
new NfsConfiguration());
}
OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
String dumpFilePath, DFSClient client, IdUserGroup iug,
boolean aixCompatMode) {
boolean aixCompatMode, NfsConfiguration config) {
this.fos = fos;
this.latestAttr = latestAttr;
this.aixCompatMode = aixCompatMode;
@ -235,6 +257,8 @@ class OpenFileCtx {
dumpThread = null;
this.client = client;
this.iug = iug;
this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD,
NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT);
}
public Nfs3FileAttributes getLatestAttr() {
@ -783,6 +807,11 @@ class OpenFileCtx {
return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
}
}
if (pendingWrites.isEmpty()) {
// Note that, there is no guarantee data is synced. Caller should still
// do a sync here though the output stream might be closed.
return COMMIT_STATUS.COMMIT_FINISHED;
}
long flushed = 0;
try {
@ -795,6 +824,32 @@ class OpenFileCtx {
LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
}
// Handle large file upload
if (uploadLargeFile && !aixCompatMode) {
long co = (commitOffset > 0) ? commitOffset : pendingWrites.firstEntry()
.getKey().getMax() - 1;
if (co <= flushed) {
return COMMIT_STATUS.COMMIT_DO_SYNC;
} else if (co < nextOffset.get()) {
if (!fromRead) {
// let client retry the same request, add pending commit to sync later
CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
preOpAttr);
pendingCommits.put(commitOffset, commitCtx);
}
if (LOG.isDebugEnabled()) {
LOG.debug("return COMMIT_SPECIAL_WAIT");
}
return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("return COMMIT_SPECIAL_SUCCESS");
}
return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
}
}
if (commitOffset > 0) {
if (aixCompatMode) {
// The AIX NFS client misinterprets RFC-1813 and will always send 4096
@ -825,20 +880,14 @@ class OpenFileCtx {
Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
// Commit whole file, commitOffset == 0
if (pendingWrites.isEmpty()) {
// Note that, there is no guarantee data is synced. TODO: We could still
// do a sync here though the output stream might be closed.
return COMMIT_STATUS.COMMIT_FINISHED;
} else {
if (!fromRead) {
// Insert commit
long maxOffset = key.getKey().getMax() - 1;
Preconditions.checkState(maxOffset > 0);
CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
pendingCommits.put(maxOffset, commitCtx);
}
return COMMIT_STATUS.COMMIT_WAIT;
if (!fromRead) {
// Insert commit
long maxOffset = key.getKey().getMax() - 1;
Preconditions.checkState(maxOffset > 0);
CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
pendingCommits.put(maxOffset, commitCtx);
}
return COMMIT_STATUS.COMMIT_WAIT;
}
private void addWrite(WriteCtx writeCtx) {

View File

@ -944,7 +944,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
// Add open stream
OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug,
aixCompatMode);
aixCompatMode, config);
fileHandle = new FileHandle(postOpObjAttr.getFileId());
if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
LOG.warn("Can't add more stream, close it."

View File

@ -41,7 +41,7 @@ class WriteCtx {
/**
* In memory write data has 3 states. ALLOW_DUMP: not sequential write, still
* wait for prerequisit writes. NO_DUMP: sequential write, no need to dump
* wait for prerequisite writes. NO_DUMP: sequential write, no need to dump
* since it will be written to HDFS soon. DUMPED: already dumped to a file.
*/
public static enum DataState {

View File

@ -178,7 +178,7 @@ public class WriteManager {
String writeDumpDir = config.get(NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_KEY,
NfsConfigKeys.DFS_NFS_FILE_DUMP_DIR_DEFAULT);
openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
+ fileHandle.getFileId(), dfsClient, iug, aixCompatMode);
+ fileHandle.getFileId(), dfsClient, iug, aixCompatMode, config);
if (!addOpenFileStream(fileHandle, openFileCtx)) {
LOG.info("Can't add new stream. Close it. Tell client to retry.");
@ -236,6 +236,7 @@ public class WriteManager {
status = Nfs3Status.NFS3ERR_IO;
break;
case COMMIT_WAIT:
case COMMIT_SPECIAL_WAIT:
/**
* This should happen rarely in some possible cases, such as read
* request arrives before DFSClient is able to quickly flush data to DN,
@ -244,6 +245,10 @@ public class WriteManager {
*/
status = Nfs3Status.NFS3ERR_JUKEBOX;
break;
case COMMIT_SPECIAL_SUCCESS:
// Read beyond eof could result in partial read
status = Nfs3Status.NFS3_OK;
break;
default:
LOG.error("Should not get commit return code:" + ret.name());
throw new RuntimeException("Should not get commit return code:"
@ -278,6 +283,12 @@ public class WriteManager {
case COMMIT_WAIT:
// Do nothing. Commit is async now.
return;
case COMMIT_SPECIAL_WAIT:
status = Nfs3Status.NFS3ERR_JUKEBOX;
break;
case COMMIT_SPECIAL_SUCCESS:
status = Nfs3Status.NFS3_OK;
break;
default:
LOG.error("Should not get commit return code:" + ret.name());
throw new RuntimeException("Should not get commit return code:"
@ -288,8 +299,7 @@ public class WriteManager {
// Send out the response
Nfs3FileAttributes postOpAttr = null;
try {
String fileIdPath = Nfs3Utils.getFileIdPath(preOpAttr.getFileId());
postOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
postOpAttr = getFileAttr(dfsClient, new FileHandle(preOpAttr.getFileId()), iug);
} catch (IOException e1) {
LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileId(), e1);
}

View File

@ -138,8 +138,10 @@ public class TestWrites {
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
Mockito.when(fos.getPos()).thenReturn((long) 0);
NfsConfiguration conf = new NfsConfiguration();
conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
new IdUserGroup(new NfsConfiguration()));
new IdUserGroup(conf), false, conf);
COMMIT_STATUS ret;
@ -157,6 +159,7 @@ public class TestWrites {
// Test request with non zero commit offset
ctx.setActiveStatusForTest(true);
Mockito.when(fos.getPos()).thenReturn((long) 10);
ctx.setNextOffsetForTest(10);
COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
// Do_SYNC state will be updated to FINISHED after data sync
@ -192,15 +195,85 @@ public class TestWrites {
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
}
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with
// large file upload option.
public void testCheckCommitLargeFileUpload() throws IOException {
DFSClient dfsClient = Mockito.mock(DFSClient.class);
Nfs3FileAttributes attr = new Nfs3FileAttributes();
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
Mockito.when(fos.getPos()).thenReturn((long) 0);
NfsConfiguration conf = new NfsConfiguration();
conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
new IdUserGroup(conf), false, conf);
COMMIT_STATUS ret;
// Test inactive open file context
ctx.setActiveStatusForTest(false);
Channel ch = Mockito.mock(Channel.class);
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);
// Test request with non zero commit offset
ctx.setActiveStatusForTest(true);
Mockito.when(fos.getPos()).thenReturn((long) 10);
ctx.setNextOffsetForTest(10);
COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
// Do_SYNC state will be updated to FINISHED after data sync
ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
status = ctx.checkCommitInternal(10, ch, 1, attr, false);
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
.getPendingCommitsForTest();
Assert.assertTrue(commits.size() == 0);
ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS);
Assert.assertTrue(commits.size() == 0);
// Test request with zero commit offset
commits.remove(new Long(11));
// There is one pending write [5,10]
ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
Mockito.when(fos.getPos()).thenReturn((long) 6);
ret = ctx.checkCommitInternal(8, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
Assert.assertTrue(commits.size() == 1);
long key = commits.firstKey();
Assert.assertTrue(key == 8);
// Empty pending writes
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
}
@Test
public void testCheckCommitAixCompatMode() throws IOException {
DFSClient dfsClient = Mockito.mock(DFSClient.class);
Nfs3FileAttributes attr = new Nfs3FileAttributes();
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
// Last argument "true" here to enable AIX compatibility mode.
NfsConfiguration conf = new NfsConfiguration();
conf.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
// Enable AIX compatibility mode.
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
new IdUserGroup(new NfsConfiguration()), true);
new IdUserGroup(new NfsConfiguration()), true, conf);
// Test fall-through to pendingWrites check in the event that commitOffset
// is greater than the number of bytes we've so far flushed.
@ -210,6 +283,8 @@ public class TestWrites {
// Test the case when we actually have received more bytes than we're trying
// to commit.
ctx.getPendingWritesForTest().put(new OffsetRange(0, 10),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
Mockito.when(fos.getPos()).thenReturn((long) 10);
status = ctx.checkCommitInternal(5, null, 1, attr, false);
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
@ -226,8 +301,9 @@ public class TestWrites {
Mockito.when(fos.getPos()).thenReturn((long) 0);
NfsConfiguration config = new NfsConfiguration();
config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
new IdUserGroup(config));
new IdUserGroup(config), false, config);
FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
COMMIT_STATUS ret;
@ -285,6 +361,75 @@ public class TestWrites {
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
}
@Test
// Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS with large file upload option
public void testCheckCommitFromReadLargeFileUpload() throws IOException {
DFSClient dfsClient = Mockito.mock(DFSClient.class);
Nfs3FileAttributes attr = new Nfs3FileAttributes();
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
Mockito.when(fos.getPos()).thenReturn((long) 0);
NfsConfiguration config = new NfsConfiguration();
config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
new IdUserGroup(config), false, config);
FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
COMMIT_STATUS ret;
WriteManager wm = new WriteManager(new IdUserGroup(config), config, false);
assertTrue(wm.addOpenFileStream(h, ctx));
// Test inactive open file context
ctx.setActiveStatusForTest(false);
Channel ch = Mockito.mock(Channel.class);
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));
// Test request with non zero commit offset
ctx.setActiveStatusForTest(true);
Mockito.when(fos.getPos()).thenReturn((long) 10);
COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
// Do_SYNC state will be updated to FINISHED after data sync
ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));
status = ctx.checkCommitInternal(10, ch, 1, attr, true);
assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
.getPendingCommitsForTest();
assertTrue(commits.size() == 0);
ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
assertEquals(0, commits.size()); // commit triggered by read doesn't wait
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 11));
// Test request with zero commit offset
// There is one pending write [5,10]
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
assertEquals(0, commits.size());
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
// Empty pending writes
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
}
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
throws InterruptedException {
int waitedTime = 0;

View File

@ -624,6 +624,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7260. Change DFSOutputStream.MAX_PACKETS to be configurable. (szetszwo)
HDFS-7259. Unresponseive NFS mount point due to deferred COMMIT response.
(brandonli)
BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS
HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)