diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index 285a703f687..8ebb5acd433 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -710,15 +710,28 @@ class OpenFileCtx { } return response; } - + + /** + * Check the commit status with the given offset + * @param commitOffset the offset to commit + * @param channel the channel to return response + * @param xid the xid of the commit request + * @param preOpAttr the preOp attribute + * @param fromRead whether the commit is triggered from read request + * @return one commit status: COMMIT_FINISHED, COMMIT_WAIT, + * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR + */ public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset, - Channel channel, int xid, Nfs3FileAttributes preOpAttr) { - // Keep stream active - updateLastAccessTime(); + Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) { + if (!fromRead) { + Preconditions.checkState(channel != null && preOpAttr != null); + // Keep stream active + updateLastAccessTime(); + } Preconditions.checkState(commitOffset >= 0); COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid, - preOpAttr); + preOpAttr, fromRead); if (LOG.isDebugEnabled()) { LOG.debug("Got commit status: " + ret.name()); } @@ -745,14 +758,10 @@ class OpenFileCtx { } return ret; } - - /** - * return one commit status: COMMIT_FINISHED, COMMIT_WAIT, - * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR - */ + @VisibleForTesting synchronized COMMIT_STATUS checkCommitInternal(long commitOffset, - Channel channel, int xid, Nfs3FileAttributes preOpAttr) { + Channel channel, int xid, 
Nfs3FileAttributes preOpAttr, boolean fromRead) { if (!activeState) { if (pendingWrites.isEmpty()) { return COMMIT_STATUS.COMMIT_INACTIVE_CTX; @@ -775,9 +784,11 @@ class OpenFileCtx { if (commitOffset > 0) { if (commitOffset > flushed) { - CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, - preOpAttr); - pendingCommits.put(commitOffset, commitCtx); + if (!fromRead) { + CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, + preOpAttr); + pendingCommits.put(commitOffset, commitCtx); + } return COMMIT_STATUS.COMMIT_WAIT; } else { return COMMIT_STATUS.COMMIT_DO_SYNC; @@ -792,11 +803,13 @@ class OpenFileCtx { // do a sync here though the output stream might be closed. return COMMIT_STATUS.COMMIT_FINISHED; } else { - // Insert commit - long maxOffset = key.getKey().getMax() - 1; - Preconditions.checkState(maxOffset > 0); - CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr); - pendingCommits.put(maxOffset, commitCtx); + if (!fromRead) { + // Insert commit + long maxOffset = key.getKey().getMax() - 1; + Preconditions.checkState(maxOffset > 0); + CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr); + pendingCommits.put(maxOffset, commitCtx); + } return COMMIT_STATUS.COMMIT_WAIT; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 91ce8ef24dc..17670a9afe8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -628,6 +628,14 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { } } + // In case there is buffered data for the same file, flush it. This can be + // optimized later by reading from the cache. 
+ int ret = writeManager.commitBeforeRead(dfsClient, handle, offset + count); + if (ret != Nfs3Status.NFS3_OK) { + LOG.warn("commitBeforeRead didn't succeed with ret=" + ret + + ". Read may not get most recent data."); + } + try { int buffSize = Math.min(MAX_READ_TRANSFER_SIZE, count); byte[] readbuffer = new byte[buffSize]; diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java index aa6a8a3650b..01b3dac8648 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import java.io.IOException; -import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,11 +40,9 @@ import org.apache.hadoop.nfs.nfs3.response.WRITE3Response; import org.apache.hadoop.nfs.nfs3.response.WccData; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.VerifierNone; -import org.apache.hadoop.util.Daemon; import org.jboss.netty.channel.Channel; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; /** * Manage the writes and responds asynchronously. @@ -207,6 +204,51 @@ public class WriteManager { return; } + // Do a possible commit before read request in case there is buffered data + // inside DFSClient which has been flushed but not synced. + int commitBeforeRead(DFSClient dfsClient, FileHandle fileHandle, + long commitOffset) { + int status; + OpenFileCtx openFileCtx = fileContextCache.get(fileHandle); + + if (openFileCtx == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("No opened stream for fileId:" + fileHandle.getFileId() + + " commitOffset=" + commitOffset + + ". 
Return success in this case."); + } + status = Nfs3Status.NFS3_OK; + + } else { + COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset, + null, 0, null, true); + switch (ret) { + case COMMIT_FINISHED: + case COMMIT_INACTIVE_CTX: + status = Nfs3Status.NFS3_OK; + break; + case COMMIT_INACTIVE_WITH_PENDING_WRITE: + case COMMIT_ERROR: + status = Nfs3Status.NFS3ERR_IO; + break; + case COMMIT_WAIT: + /** + * This should happen rarely in some possible cases, such as when a read + * request arrives before DFSClient is able to quickly flush data to DN, + * or when prerequisite writes are not available. Won't wait since we don't + * want to block read. + */ + status = Nfs3Status.NFS3ERR_JUKEBOX; + break; + default: + LOG.error("Should not get commit return code:" + ret.name()); + throw new RuntimeException("Should not get commit return code:" + + ret.name()); + } + } + return status; + } + void handleCommit(DFSClient dfsClient, FileHandle fileHandle, long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) { int status; @@ -219,9 +261,8 @@ public class WriteManager { } else { COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset, - channel, xid, preOpAttr); + channel, xid, preOpAttr, false); switch (ret) { - case COMMIT_DO_SYNC: case COMMIT_FINISHED: case COMMIT_INACTIVE_CTX: status = Nfs3Status.NFS3_OK; @@ -234,6 +275,7 @@ public class WriteManager { // Do nothing. Commit is async now. 
return; default: + LOG.error("Should not get commit return code:" + ret.name()); throw new RuntimeException("Should not get commit return code:" + ret.name()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java index d16b268382c..2ef614a1edf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.nfs.nfs3; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -26,6 +27,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.ConcurrentNavigableMap; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -39,6 +41,7 @@ import org.apache.hadoop.nfs.nfs3.IdUserGroup; import org.apache.hadoop.nfs.nfs3.Nfs3Constant; import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow; import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes; +import org.apache.hadoop.nfs.nfs3.Nfs3Status; import org.apache.hadoop.nfs.nfs3.request.CREATE3Request; import org.apache.hadoop.nfs.nfs3.request.READ3Request; import org.apache.hadoop.nfs.nfs3.request.SetAttr3; @@ -47,6 +50,7 @@ import org.apache.hadoop.nfs.nfs3.response.CREATE3Response; import org.apache.hadoop.nfs.nfs3.response.READ3Response; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.SecurityHandler; +import org.jboss.netty.channel.Channel; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -139,32 +143,33 @@ public class TestWrites { // Test inactive open file context 
ctx.setActiveStatusForTest(false); - ret = ctx.checkCommit(dfsClient, 0, null, 1, attr); + Channel ch = Mockito.mock(Channel.class); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX); ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); - ret = ctx.checkCommit(dfsClient, 0, null, 1, attr); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE); // Test request with non zero commit offset ctx.setActiveStatusForTest(true); Mockito.when(fos.getPos()).thenReturn((long) 10); - COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr); + COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); // Do_SYNC state will be updated to FINISHED after data sync - ret = ctx.checkCommit(dfsClient, 5, null, 1, attr); + ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); - status = ctx.checkCommitInternal(10, null, 1, attr); + status = ctx.checkCommitInternal(10, ch, 1, attr, false); Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); - ret = ctx.checkCommit(dfsClient, 10, null, 1, attr); + ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); ConcurrentNavigableMap commits = ctx .getPendingCommitsForTest(); Assert.assertTrue(commits.size() == 0); - ret = ctx.checkCommit(dfsClient, 11, null, 1, attr); + ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT); Assert.assertTrue(commits.size() == 1); long key = commits.firstKey(); @@ -173,7 +178,7 @@ public class TestWrites { // Test request with zero commit offset commits.remove(new Long(11)); // There is one pending write [5,10] - ret = 
ctx.checkCommit(dfsClient, 0, null, 1, attr); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT); Assert.assertTrue(commits.size() == 1); key = commits.firstKey(); @@ -181,10 +186,79 @@ public class TestWrites { // Empty pending writes ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); - ret = ctx.checkCommit(dfsClient, 0, null, 1, attr); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false); Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED); } + @Test + // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which + // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX, + // COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC. + public void testCheckCommitFromRead() throws IOException { + DFSClient dfsClient = Mockito.mock(DFSClient.class); + Nfs3FileAttributes attr = new Nfs3FileAttributes(); + HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); + Mockito.when(fos.getPos()).thenReturn((long) 0); + + OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient, + new IdUserGroup()); + + FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath" + COMMIT_STATUS ret; + WriteManager wm = new WriteManager(new IdUserGroup(), new Configuration()); + assertTrue(wm.addOpenFileStream(h, ctx)); + + // Test inactive open file context + ctx.setActiveStatusForTest(false); + Channel ch = Mockito.mock(Channel.class); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); + assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret); + assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); + + ctx.getPendingWritesForTest().put(new OffsetRange(5, 10), + new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret); + assertEquals(Nfs3Status.NFS3ERR_IO, 
wm.commitBeforeRead(dfsClient, h, 0)); + + // Test request with non zero commit offset + ctx.setActiveStatusForTest(true); + Mockito.when(fos.getPos()).thenReturn((long) 10); + COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false); + assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status); + // Do_SYNC state will be updated to FINISHED after data sync + ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); + assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5)); + + status = ctx.checkCommitInternal(10, ch, 1, attr, true); + assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC); + ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); + assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10)); + + ConcurrentNavigableMap commits = ctx + .getPendingCommitsForTest(); + assertTrue(commits.size() == 0); + ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret); + assertEquals(0, commits.size()); // commit triggered by read doesn't wait + assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11)); + + // Test request with zero commit offset + // There is one pending write [5,10] + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret); + assertEquals(0, commits.size()); + assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0)); + + // Empty pending writes + ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10)); + ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true); + assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret); + assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0)); + } + private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime) throws InterruptedException { int waitedTime = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index caa74ef7a11..b579ef44c1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -337,6 +337,9 @@ Release 2.2.1 - UNRELEASED HDFS-5577. NFS user guide update (brandonli) + HDFS-5563. NFS gateway should commit the buffered data when read request comes + after write to the same file (brandonli) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES