HDFS-7387. NFS may only do partial commit due to a race between COMMIT and write. Contributed by Brandon Li

(cherry picked from commit 99d9d0c2d1)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java

(cherry picked from commit 0c3b7887e6)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
This commit is contained in:
Brandon Li 2014-11-11 13:03:31 -08:00 committed by cnauroth
parent 1535e7211e
commit d0ca88c5a1
2 changed files with 144 additions and 52 deletions

View File

@ -818,6 +818,42 @@ class OpenFileCtx {
return ret;
}
// Check if the to-commit range is sequential
@VisibleForTesting
synchronized boolean checkSequential(final long commitOffset,
final long nextOffset) {
Preconditions.checkState(commitOffset >= nextOffset, "commitOffset "
+ commitOffset + " less than nextOffset " + nextOffset);
long offset = nextOffset;
Iterator<OffsetRange> it = pendingWrites.descendingKeySet().iterator();
while (it.hasNext()) {
OffsetRange range = it.next();
if (range.getMin() != offset) {
// got a hole
return false;
}
offset = range.getMax();
if (offset > commitOffset) {
return true;
}
}
// there is gap between the last pending write and commitOffset
return false;
}
private COMMIT_STATUS handleSpecialWait(boolean fromRead, long commitOffset,
Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
if (!fromRead) {
// let client retry the same request, add pending commit to sync later
CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, preOpAttr);
pendingCommits.put(commitOffset, commitCtx);
}
if (LOG.isDebugEnabled()) {
LOG.debug("return COMMIT_SPECIAL_WAIT");
}
return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
}
@VisibleForTesting
synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
@ -829,11 +865,6 @@ class OpenFileCtx {
return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
}
}
if (pendingWrites.isEmpty()) {
// Note that, there is no guarantee data is synced. Caller should still
// do a sync here though the output stream might be closed.
return COMMIT_STATUS.COMMIT_FINISHED;
}
long flushed = 0;
try {
@ -842,10 +873,33 @@ class OpenFileCtx {
LOG.error("Can't get flushed offset, error:" + e);
return COMMIT_STATUS.COMMIT_ERROR;
}
if (LOG.isDebugEnabled()) {
LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset
+ "nextOffset=" + nextOffset.get());
}
if (pendingWrites.isEmpty()) {
if (aixCompatMode) {
// Note that, there is no guarantee data is synced. Caller should still
// do a sync here though the output stream might be closed.
return COMMIT_STATUS.COMMIT_FINISHED;
} else {
if (flushed < nextOffset.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug("get commit while still writing to the requested offset,"
+ " with empty queue");
}
return handleSpecialWait(fromRead, nextOffset.get(), channel, xid,
preOpAttr);
} else {
return COMMIT_STATUS.COMMIT_FINISHED;
}
}
}
Preconditions.checkState(flushed <= nextOffset.get(), "flushed " + flushed
+ " is larger than nextOffset " + nextOffset.get());
// Handle large file upload
if (uploadLargeFile && !aixCompatMode) {
long co = (commitOffset > 0) ? commitOffset : pendingWrites.firstEntry()
@ -854,21 +908,20 @@ class OpenFileCtx {
if (co <= flushed) {
return COMMIT_STATUS.COMMIT_DO_SYNC;
} else if (co < nextOffset.get()) {
if (!fromRead) {
// let client retry the same request, add pending commit to sync later
CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
preOpAttr);
pendingCommits.put(commitOffset, commitCtx);
}
if (LOG.isDebugEnabled()) {
LOG.debug("return COMMIT_SPECIAL_WAIT");
LOG.debug("get commit while still writing to the requested offset");
}
return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
return handleSpecialWait(fromRead, co, channel, xid, preOpAttr);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("return COMMIT_SPECIAL_SUCCESS");
// co >= nextOffset
if (checkSequential(co, nextOffset.get())) {
return handleSpecialWait(fromRead, co, channel, xid, preOpAttr);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("return COMMIT_SPECIAL_SUCCESS");
}
return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
}
return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
}
}

View File

@ -217,14 +217,14 @@ public class TestWrites {
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);
// Test request with non zero commit offset
ctx.setActiveStatusForTest(true);
Mockito.when(fos.getPos()).thenReturn((long) 10);
Mockito.when(fos.getPos()).thenReturn((long) 8);
ctx.setNextOffsetForTest(10);
COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
@ -232,35 +232,40 @@ public class TestWrites {
ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
// Test commit sequential writes
status = ctx.checkCommitInternal(10, ch, 1, attr, false);
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
// Test commit non-sequential writes
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
.getPendingCommitsForTest();
Assert.assertTrue(commits.size() == 0);
ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
Assert.assertTrue(commits.size() == 1);
ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS);
Assert.assertTrue(commits.size() == 0);
Assert.assertTrue(commits.size() == 1);
// Test request with zero commit offset
commits.remove(new Long(11));
// There is one pending write [5,10]
commits.remove(new Long(10));
// There is one pending write [10,15]
ret = ctx.checkCommitInternal(0, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
Mockito.when(fos.getPos()).thenReturn((long) 6);
ret = ctx.checkCommitInternal(8, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
Assert.assertTrue(commits.size() == 1);
long key = commits.firstKey();
Assert.assertTrue(key == 8);
ret = ctx.checkCommitInternal(9, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
Assert.assertTrue(commits.size() == 2);
// Empty pending writes. nextOffset=10, flushed pos=8
ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
// Empty pending writes
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
ctx.setNextOffsetForTest((long) 8); // flushed pos = 8
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
}
@Test
@ -286,6 +291,7 @@ public class TestWrites {
ctx.getPendingWritesForTest().put(new OffsetRange(0, 10),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
Mockito.when(fos.getPos()).thenReturn((long) 10);
ctx.setNextOffsetForTest((long)10);
status = ctx.checkCommitInternal(5, null, 1, attr, false);
Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
}
@ -317,7 +323,7 @@ public class TestWrites {
assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
@ -326,6 +332,7 @@ public class TestWrites {
// Test request with non zero commit offset
ctx.setActiveStatusForTest(true);
Mockito.when(fos.getPos()).thenReturn((long) 10);
ctx.setNextOffsetForTest((long)10);
COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
// Do_SYNC state will be updated to FINISHED after data sync
@ -355,7 +362,7 @@ public class TestWrites {
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
// Empty pending writes
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
@ -386,7 +393,7 @@ public class TestWrites {
assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
@ -394,7 +401,8 @@ public class TestWrites {
// Test request with non zero commit offset
ctx.setActiveStatusForTest(true);
Mockito.when(fos.getPos()).thenReturn((long) 10);
Mockito.when(fos.getPos()).thenReturn((long) 6);
ctx.setNextOffsetForTest((long)10);
COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
// Do_SYNC state will be updated to FINISHED after data sync
@ -402,32 +410,34 @@ public class TestWrites {
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));
status = ctx.checkCommitInternal(10, ch, 1, attr, true);
assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));
// Test request with sequential writes
status = ctx.checkCommitInternal(9, ch, 1, attr, true);
assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT);
ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9));
// Test request with non-sequential writes
ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
.getPendingCommitsForTest();
assertTrue(commits.size() == 0);
ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret);
assertEquals(0, commits.size()); // commit triggered by read doesn't wait
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 11));
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16));
// Test request with zero commit offset
// There is one pending write [5,10]
// There is one pending write [10,15]
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
assertEquals(0, commits.size());
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
// Empty pending writes
ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15));
ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret);
assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
}
private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
@ -629,4 +639,33 @@ public class TestWrites {
}
}
}
@Test
public void testCheckSequential() throws IOException {
DFSClient dfsClient = Mockito.mock(DFSClient.class);
Nfs3FileAttributes attr = new Nfs3FileAttributes();
HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
Mockito.when(fos.getPos()).thenReturn((long) 0);
NfsConfiguration config = new NfsConfiguration();
config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false);
OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
new ShellBasedIdMapping(config), false, config);
ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
ctx.getPendingWritesForTest().put(new OffsetRange(10, 15),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
ctx.getPendingWritesForTest().put(new OffsetRange(20, 25),
new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
assertTrue(!ctx.checkSequential(5, 4));
assertTrue(ctx.checkSequential(9, 5));
assertTrue(ctx.checkSequential(10, 5));
assertTrue(ctx.checkSequential(14, 5));
assertTrue(!ctx.checkSequential(15, 5));
assertTrue(!ctx.checkSequential(20, 5));
assertTrue(!ctx.checkSequential(25, 5));
assertTrue(!ctx.checkSequential(999, 5));
}
}