diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index b04c21cfc80..b31baf58f5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -816,6 +816,42 @@ public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset, return ret; } + // Check if the to-commit range is sequential + @VisibleForTesting + synchronized boolean checkSequential(final long commitOffset, + final long nextOffset) { + Preconditions.checkState(commitOffset >= nextOffset, "commitOffset " + + commitOffset + " less than nextOffset " + nextOffset); + long offset = nextOffset; + Iterator it = pendingWrites.descendingKeySet().iterator(); + while (it.hasNext()) { + OffsetRange range = it.next(); + if (range.getMin() != offset) { + // got a hole + return false; + } + offset = range.getMax(); + if (offset > commitOffset) { + return true; + } + } + // there is gap between the last pending write and commitOffset + return false; + } + + private COMMIT_STATUS handleSpecialWait(boolean fromRead, long commitOffset, + Channel channel, int xid, Nfs3FileAttributes preOpAttr) { + if (!fromRead) { + // let client retry the same request, add pending commit to sync later + CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, preOpAttr); + pendingCommits.put(commitOffset, commitCtx); + } + if (LOG.isDebugEnabled()) { + LOG.debug("return COMMIT_SPECIAL_WAIT"); + } + return COMMIT_STATUS.COMMIT_SPECIAL_WAIT; + } + @VisibleForTesting synchronized COMMIT_STATUS checkCommitInternal(long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) { @@ -827,17 +863,34 @@ synchronized COMMIT_STATUS checkCommitInternal(long commitOffset, return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE; } } - if (pendingWrites.isEmpty()) { - // Note that, there is no guarantee data is synced. Caller should still - // do a sync here though the output stream might be closed. - return COMMIT_STATUS.COMMIT_FINISHED; - } long flushed = getFlushedOffset(); if (LOG.isDebugEnabled()) { - LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset); + LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset + + "nextOffset=" + nextOffset.get()); } - + + if (pendingWrites.isEmpty()) { + if (aixCompatMode) { + // Note that, there is no guarantee data is synced. Caller should still + // do a sync here though the output stream might be closed. + return COMMIT_STATUS.COMMIT_FINISHED; + } else { + if (flushed < nextOffset.get()) { + if (LOG.isDebugEnabled()) { + LOG.debug("get commit while still writing to the requested offset," + + " with empty queue"); + } + return handleSpecialWait(fromRead, nextOffset.get(), channel, xid, + preOpAttr); + } else { + return COMMIT_STATUS.COMMIT_FINISHED; + } + } + } + + Preconditions.checkState(flushed <= nextOffset.get(), "flushed " + flushed + + " is larger than nextOffset " + nextOffset.get()); // Handle large file upload if (uploadLargeFile && !aixCompatMode) { long co = (commitOffset > 0) ? commitOffset : pendingWrites.firstEntry() @@ -846,21 +899,20 @@ synchronized COMMIT_STATUS checkCommitInternal(long commitOffset, if (co <= flushed) { return COMMIT_STATUS.COMMIT_DO_SYNC; } else if (co < nextOffset.get()) { - if (!fromRead) { - // let client retry the same request, add pending commit to sync later - CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, - preOpAttr); - pendingCommits.put(commitOffset, commitCtx); - } if (LOG.isDebugEnabled()) { - LOG.debug("return COMMIT_SPECIAL_WAIT"); + LOG.debug("get commit while still writing to the requested offset"); } - return COMMIT_STATUS.COMMIT_SPECIAL_WAIT; + return handleSpecialWait(fromRead, co, channel, xid, preOpAttr); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("return COMMIT_SPECIAL_SUCCESS"); + // co >= nextOffset + if (checkSequential(co, nextOffset.get())) { + return handleSpecialWait(fromRead, co, channel, xid, preOpAttr); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("return COMMIT_SPECIAL_SUCCESS"); + } + return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS; } - return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java index 96e63937e28..56603b90ecd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java @@ -217,14 +217,14 @@ public void testCheckCommitLargeFileUpload() throws IOException { ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX); - ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), + ctx.getPendingWritesForTest().put(new OffsetRange(10, 15), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); - Mockito.when(fos.getPos()).thenReturn((long) 10); + Mockito.when(fos.getPos()).thenReturn((long) 8); ctx.setNextOffsetForTest(10); COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); @@ -232,35 +232,40 @@ public void testCheckCommitLargeFileUpload() throws IOException { ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); + // Test commit sequential writes status = ctx.checkCommitInternal(10, ch, 1, attr, false); - Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); + Assert.assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false); - Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); + Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); + // Test commit non-sequential writes ConcurrentNavigableMap commits = ctx .getPendingCommitsForTest(); - Assert.assertTrue(commits.size() == 0); - ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false); + Assert.assertTrue(commits.size() == 1); + ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS); - Assert.assertTrue(commits.size() == 0); + Assert.assertTrue(commits.size() == 1); // Test request with zero commit offset - commits.remove(new Long(11)); - // There is one pending write [5,10] + commits.remove(new Long(10)); + // There is one pending write [10,15] ret = ctx.checkCommitInternal(0, ch, 1, attr, false); - Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC); - - Mockito.when(fos.getPos()).thenReturn((long) 6); - ret = ctx.checkCommitInternal(8, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); - Assert.assertTrue(commits.size() == 1); - long key = commits.firstKey(); - Assert.assertTrue(key == 8); + + ret = ctx.checkCommitInternal(9, ch, 1, attr, false); + Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); + Assert.assertTrue(commits.size() == 2); + // Empty pending writes. nextOffset=10, flushed pos=8 + ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15)); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); + Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); + // Empty pending writes - ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); + ctx.setNextOffsetForTest((long) 8); // flushed pos = 8 ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); + } @Test @@ -286,6 +291,7 @@ public void testCheckCommitAixCompatMode() throws IOException { ctx.getPendingWritesForTest().put(new OffsetRange(0, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); Mockito.when(fos.getPos()).thenReturn((long) 10); + ctx.setNextOffsetForTest((long)10); status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); } @@ -317,7 +323,7 @@ public void testCheckCommitFromRead() throws IOException { assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); - ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), + ctx.getPendingWritesForTest().put(new OffsetRange(10, 15), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret); @@ -326,6 +332,7 @@ public void testCheckCommitFromRead() throws IOException { // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 10); + ctx.setNextOffsetForTest((long)10); COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false); assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status); // Do_SYNC state will be updated to FINISHED after data sync @@ -355,7 +362,7 @@ public void testCheckCommitFromRead() throws IOException { assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0)); // Empty pending writes - ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); + ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); @@ -386,7 +393,7 @@ public void testCheckCommitFromReadLargeFileUpload() throws IOException { assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); - ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), + ctx.getPendingWritesForTest().put(new OffsetRange(10, 15), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret); @@ -394,7 +401,8 @@ public void testCheckCommitFromReadLargeFileUpload() throws IOException { // Test request with non zero commit offset ctx.setActiveStatusForTest(true); - Mockito.when(fos.getPos()).thenReturn((long) 10); + Mockito.when(fos.getPos()).thenReturn((long) 6); + ctx.setNextOffsetForTest((long)10); COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false); assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status); // Do_SYNC state will be updated to FINISHED after data sync @@ -402,32 +410,34 @@ public void testCheckCommitFromReadLargeFileUpload() throws IOException { assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5)); - status = ctx.checkCommitInternal(10, ch, 1, attr, true); - assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); - ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true); - assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); - assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10)); + // Test request with sequential writes + status = ctx.checkCommitInternal(9, ch, 1, attr, true); + assertTrue(status == COMMIT_STATUS.COMMIT_SPECIAL_WAIT); + ret = ctx.checkCommit(dfsClient, 9, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret); + assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 9)); + // Test request with non-sequential writes ConcurrentNavigableMap commits = ctx .getPendingCommitsForTest(); assertTrue(commits.size() == 0); - ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true); + ret = ctx.checkCommit(dfsClient, 16, ch, 1, attr, true); assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS, ret); assertEquals(0, commits.size()); // commit triggered by read doesn't wait - assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 11)); + assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 16)); // Test request with zero commit offset - // There is one pending write [5,10] + // There is one pending write [10,15] ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); - assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); + assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret); assertEquals(0, commits.size()); - assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); + assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0)); // Empty pending writes - ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); + ctx.getPendingWritesForTest().remove(new OffsetRange(10, 15)); ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); - assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); - assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); + assertEquals(COMMIT_STATUS.COMMIT_SPECIAL_WAIT, ret); + assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0)); } private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime) @@ -629,4 +639,33 @@ securityHandler, new InetSocketAddress("localhost", config.getInt( } } } + + @Test + public void testCheckSequential() throws IOException { + DFSClient dfsClient = Mockito.mock(DFSClient.class); + Nfs3FileAttributes attr = new Nfs3FileAttributes(); + HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); + Mockito.when(fos.getPos()).thenReturn((long) 0); + NfsConfiguration config = new NfsConfiguration(); + + config.setBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, false); + OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, + new ShellBasedIdMapping(config), false, config); + + ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), + new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); + ctx.getPendingWritesForTest().put(new OffsetRange(10, 15), + new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); + ctx.getPendingWritesForTest().put(new OffsetRange(20, 25), + new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); + + assertTrue(!ctx.checkSequential(5, 4)); + assertTrue(ctx.checkSequential(9, 5)); + assertTrue(ctx.checkSequential(10, 5)); + assertTrue(ctx.checkSequential(14, 5)); + assertTrue(!ctx.checkSequential(15, 5)); + assertTrue(!ctx.checkSequential(20, 5)); + assertTrue(!ctx.checkSequential(25, 5)); + assertTrue(!ctx.checkSequential(999, 5)); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 8922a0f4b0b..b1837313467 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -410,6 +410,9 @@ Release 2.7.0 - UNRELEASED HDFS-7366. BlockInfo should take replication as an short in the constructor. (Li Lu via wheat9) + HDFS-7387. NFS may only do partial commit due to a race between COMMIT and write + (brandonli) + Release 2.6.0 - 2014-11-15 INCOMPATIBLE CHANGES