diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java
index 8486e2ae50e..24aa8f0b14e 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/Nfs3Interface.java
@@ -97,6 +97,6 @@ public interface Nfs3Interface {
       InetAddress client);
 
   /** COMMIT: Commit cached data on a server to stable storage */
-  public NFS3Response commit(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client);
+  public NFS3Response commit(XDR xdr, Channel channel, int xid,
+      SecurityHandler securityHandler, InetAddress client);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
index 2cf6216ef13..a8def777317 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3Utils.java
@@ -115,6 +115,14 @@ public class Nfs3Utils {
     ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
     channel.write(outBuf);
   }
+
+  public static void writeChannelCommit(Channel channel, XDR out, int xid) {
+    if (RpcProgramNfs3.LOG.isDebugEnabled()) {
+      RpcProgramNfs3.LOG.debug("Commit done:" + xid);
+    }
+    ChannelBuffer outBuf = XDR.writeMessageTcp(out, true);
+    channel.write(outBuf);
+  }
 
   private static boolean isSet(int access, int bits) {
     return (access & bits) == bits;
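The new writeChannelCommit helper mirrors writeChannel but logs the transaction id of the finished COMMIT, since the reply is now written outside the RPC dispatch path. A minimal sketch of the deferred-reply pattern it enables (the wrapper method here is hypothetical; COMMIT3Response, XDR, VerifierNone, and the constants are the real classes used throughout this patch):

void replyToCommit(Channel channel, int xid, int status, WccData fileWcc) {
  COMMIT3Response response = new COMMIT3Response(status, fileWcc,
      Nfs3Constant.WRITE_COMMIT_VERF);
  // Serialize the RPC header plus the COMMIT3 body, then write it back on
  // the Netty channel the original request arrived on.
  Nfs3Utils.writeChannelCommit(channel,
      response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
      xid);
}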
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
index 1aef083cc30..8306360e6b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
 import org.apache.hadoop.nfs.nfs3.response.WccAttr;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
@@ -69,12 +70,18 @@ class OpenFileCtx {
   // Pending writes water mark for dump, 1MB
   private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
 
-  public final static int COMMIT_FINISHED = 0;
-  public final static int COMMIT_WAIT = 1;
-  public final static int COMMIT_INACTIVE_CTX = 2;
-  public final static int COMMIT_INACTIVE_WITH_PENDING_WRITE = 3;
-  public final static int COMMIT_ERROR = 4;
+  static enum COMMIT_STATUS {
+    COMMIT_FINISHED,
+    COMMIT_WAIT,
+    COMMIT_INACTIVE_CTX,
+    COMMIT_INACTIVE_WITH_PENDING_WRITE,
+    COMMIT_ERROR,
+    COMMIT_DO_SYNC;
+  }
 
+  private final DFSClient client;
+  private final IdUserGroup iug;
+
   // The stream status. False means the stream is closed.
   private volatile boolean activeState;
   // The stream write-back status. True means one thread is doing write back.
@@ -87,11 +94,58 @@ class OpenFileCtx {
   private AtomicLong nextOffset;
   private final HdfsDataOutputStream fos;
 
-  // TODO: make it mutable and update it after each writing back to HDFS
-  private final Nfs3FileAttributes latestAttr;
+  // It's updated after each sync to HDFS
+  private Nfs3FileAttributes latestAttr;
 
   private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
 
+  private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
+
+  static class CommitCtx {
+    private final long offset;
+    private final Channel channel;
+    private final int xid;
+    private final Nfs3FileAttributes preOpAttr;
+
+    // Remember the time for debugging purposes
+    private final long startTime;
+
+    long getOffset() {
+      return offset;
+    }
+
+    Channel getChannel() {
+      return channel;
+    }
+
+    int getXid() {
+      return xid;
+    }
+
+    Nfs3FileAttributes getPreOpAttr() {
+      return preOpAttr;
+    }
+
+    long getStartTime() {
+      return startTime;
+    }
+
+    CommitCtx(long offset, Channel channel, int xid,
+        Nfs3FileAttributes preOpAttr) {
+      this.offset = offset;
+      this.channel = channel;
+      this.xid = xid;
+      this.preOpAttr = preOpAttr;
+      this.startTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("offset: %d xid: %d startTime: %d", offset, xid,
+          startTime);
+    }
+  }
+
   // The last write, commit request or write-back event. Updating time to keep
   // output steam alive.
   private long lastAccessTime;
@@ -130,7 +184,7 @@ class OpenFileCtx {
   }
 
   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
-      String dumpFilePath) {
+      String dumpFilePath, DFSClient client, IdUserGroup iug) {
     this.fos = fos;
     this.latestAttr = latestAttr;
     // We use the ReverseComparatorOnMin as the comparator of the map. In this
@@ -138,6 +192,9 @@ class OpenFileCtx {
     // retrieve the last element to write back to HDFS.
     pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
        OffsetRange.ReverseComparatorOnMin);
+
+    pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();
+
     updateLastAccessTime();
     activeState = true;
     asyncStatus = false;
@@ -151,6 +208,8 @@ class OpenFileCtx {
     nextOffset.set(latestAttr.getSize());
     assert(nextOffset.get() == this.fos.getPos());
     dumpThread = null;
+    this.client = client;
+    this.iug = iug;
   }
 
   public Nfs3FileAttributes getLatestAttr() {
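Each queued COMMIT is captured as a CommitCtx and keyed by its absolute offset in a sorted concurrent map, so the write-back path can cheaply find every commit satisfied by the current flush point. A self-contained sketch of that bookkeeping (String stands in for CommitCtx so the snippet compiles on its own):

import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class PendingCommitsSketch {
  public static void main(String[] args) {
    ConcurrentNavigableMap<Long, String> pendingCommits =
        new ConcurrentSkipListMap<Long, String>();
    pendingCommits.put(11L, "commit waiting for offset 11");
    pendingCommits.put(9L, "commit waiting for offset 9");
    // Once the flushed offset reaches 10, exactly the commits with
    // key <= 10 are ready; the sorted map makes that prefix cheap to drain.
    long flushedOffset = 10;
    for (Map.Entry<Long, String> e :
        pendingCommits.headMap(flushedOffset, true).entrySet()) {
      System.out.println("ready: " + e.getValue()); // offset 9 only
      pendingCommits.remove(e.getKey());
    }
  }
}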
@@ -545,19 +604,23 @@ class OpenFileCtx {
       // of reordered writes and won't send more writes until it gets
       // responses of the previous batch. So here send response immediately
       // for unstable non-sequential write
-      if (request.getStableHow() == WriteStableHow.UNSTABLE) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("UNSTABLE write request, send response for offset: "
-              + writeCtx.getOffset());
-        }
-        WccData fileWcc = new WccData(preOpAttr, latestAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils
-            .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
-                xid, new VerifierNone()), xid);
-        writeCtx.setReplied(true);
+      if (stableHow != WriteStableHow.UNSTABLE) {
+        LOG.info("Have to change stable write to unstable write:"
+            + request.getStableHow());
+        stableHow = WriteStableHow.UNSTABLE;
       }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("UNSTABLE write request, send response for offset: "
+            + writeCtx.getOffset());
+      }
+      WccData fileWcc = new WccData(preOpAttr, latestAttr);
+      WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
+          fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
+      Nfs3Utils
+          .writeChannel(channel, response.writeHeaderAndResponse(new XDR(),
+              xid, new VerifierNone()), xid);
+      writeCtx.setReplied(true);
       }
     }
   }
@@ -635,53 +698,85 @@ class OpenFileCtx {
     return response;
   }
 
+  public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    // Keep stream active
+    updateLastAccessTime();
+    Preconditions.checkState(commitOffset >= 0);
+
+    COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid,
+        preOpAttr);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got commit status: " + ret.name());
+    }
+    // Do the sync outside the lock
+    if (ret == COMMIT_STATUS.COMMIT_DO_SYNC) {
+      try {
+        // Sync file data and length
+        fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+        // Nothing to do for metadata since attr related change is pass-through
+      } catch (ClosedChannelException cce) {
+        if (pendingWrites.isEmpty()) {
+          ret = COMMIT_STATUS.COMMIT_FINISHED;
+        } else {
+          ret = COMMIT_STATUS.COMMIT_ERROR;
+        }
+      } catch (IOException e) {
+        LOG.error("Got stream error during data sync:" + e);
+        // Do nothing. Stream will be closed eventually by StreamMonitor.
+        // status = Nfs3Status.NFS3ERR_IO;
+        ret = COMMIT_STATUS.COMMIT_ERROR;
+      }
+    }
+    return ret;
+  }
+
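The COMMIT_DO_SYNC path relies on HdfsDataOutputStream.hsync with SyncFlag.UPDATE_LENGTH, which persists the data and updates the file length at the NameNode so a subsequent attribute lookup sees the committed size. A self-contained sketch of that flush primitive (the path and variable names here are hypothetical, not from the patch):

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

public class HsyncExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    HdfsDataOutputStream out = (HdfsDataOutputStream) fs
        .create(new Path("/tmp/hsync-example")); // cast valid on HDFS
    out.write("hello".getBytes("UTF-8"));
    // Flush to the DataNodes and make the new length durable and visible
    out.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
    out.close();
  }
}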
   /**
    * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
-   * COMMIT_INACTIVE_CTX, COMMIT_ERROR
+   * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR
    */
-  public int checkCommit(long commitOffset) {
-    return activeState ? checkCommitInternal(commitOffset)
-        : COMMIT_INACTIVE_CTX;
-  }
-
-  private int checkCommitInternal(long commitOffset) {
-    if (commitOffset == 0) {
-      // Commit whole file
-      commitOffset = nextOffset.get();
+  private synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    if (!activeState) {
+      if (pendingWrites.isEmpty()) {
+        return COMMIT_STATUS.COMMIT_INACTIVE_CTX;
+      } else {
+        // TODO: return success if already committed
+        return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
+      }
     }
 
     long flushed = getFlushedOffset();
     if (LOG.isDebugEnabled()) {
       LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset);
     }
-    if (flushed < commitOffset) {
-      // Keep stream active
-      updateLastAccessTime();
-      return COMMIT_WAIT;
-    }
 
-    int ret = COMMIT_WAIT;
-    try {
-      // Sync file data and length
-      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
-      // Nothing to do for metadata since attr related change is pass-through
-      ret = COMMIT_FINISHED;
-    } catch (ClosedChannelException cce) {
-      ret = COMMIT_INACTIVE_CTX;
-      if (pendingWrites.isEmpty()) {
-        ret = COMMIT_INACTIVE_CTX;
+    if (commitOffset > 0) {
+      if (commitOffset > flushed) {
+        CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
+            preOpAttr);
+        pendingCommits.put(commitOffset, commitCtx);
+        return COMMIT_STATUS.COMMIT_WAIT;
       } else {
-        ret = COMMIT_INACTIVE_WITH_PENDING_WRITE;
+        return COMMIT_STATUS.COMMIT_DO_SYNC;
       }
-    } catch (IOException e) {
-      LOG.error("Got stream error during data sync:" + e);
-      // Do nothing. Stream will be closed eventually by StreamMonitor.
-      ret = COMMIT_ERROR;
     }
 
-    // Keep stream active
-    updateLastAccessTime();
-    return ret;
+    Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
+
+    // Commit whole file, commitOffset == 0
+    if (pendingWrites.isEmpty()) {
+      // Note that there is no guarantee data is synced. TODO: We could still
+      // do a sync here though the output stream might be closed.
+      return COMMIT_STATUS.COMMIT_FINISHED;
+    } else {
+      // Insert commit
+      long maxOffset = key.getKey().getMax() - 1;
+      Preconditions.checkState(maxOffset > 0);
+      CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
+      pendingCommits.put(maxOffset, commitCtx);
+      return COMMIT_STATUS.COMMIT_WAIT;
+    }
   }
 
   private void addWrite(WriteCtx writeCtx) {
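The decision above can be condensed into a pure function for reference (the helper name decide is hypothetical; the real method is synchronized, enqueues CommitCtx entries for COMMIT_WAIT, and derives the offset for whole-file commits from the first pending write):

static COMMIT_STATUS decide(boolean active, boolean hasPendingWrites,
    long commitOffset, long flushed) {
  if (!active) {
    return hasPendingWrites
        ? COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE
        : COMMIT_STATUS.COMMIT_INACTIVE_CTX;
  }
  if (commitOffset > 0) {
    // Not flushed far enough yet -> queue; otherwise sync right away
    return (commitOffset > flushed) ? COMMIT_STATUS.COMMIT_WAIT
        : COMMIT_STATUS.COMMIT_DO_SYNC;
  }
  // commitOffset == 0 means "commit the whole file"
  return hasPendingWrites ? COMMIT_STATUS.COMMIT_WAIT
      : COMMIT_STATUS.COMMIT_FINISHED;
}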
@@ -726,8 +821,18 @@ class OpenFileCtx {
           LOG.debug("The asyn write task has no pending writes, fileId: "
               + latestAttr.getFileId());
         }
+        // process pending commit again to handle this race: a commit is added
+        // to pendingCommits map just after the last doSingleWrite returns.
+        // There is no pending write and the commit should be handled by the
+        // last doSingleWrite. Due to the race, the commit is left alone and
+        // can't be processed until cleanup. Therefore, we should do another
+        // processCommits to fix the race issue.
+        processCommits(nextOffset.get()); // nextOffset has same value as
+                                          // flushedOffset
         this.asyncStatus = false;
-      } else {
+        return null;
+      }
+
       Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
       OffsetRange range = lastEntry.getKey();
       WriteCtx toWrite = lastEntry.getValue();
@@ -742,6 +847,7 @@ class OpenFileCtx {
         if (LOG.isDebugEnabled()) {
           LOG.debug("The next sequencial write has not arrived yet");
         }
+        processCommits(nextOffset.get()); // handle race
         this.asyncStatus = false;
       } else if (range.getMin() < offset && range.getMax() > offset) {
         // shouldn't happen since we do sync for overlapped concurrent writers
@@ -749,6 +855,7 @@ class OpenFileCtx {
             + range.getMax() + "), nextOffset=" + offset
             + ". Silently drop it now");
         pendingWrites.remove(range);
+        processCommits(nextOffset.get()); // handle race
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
@@ -763,7 +870,7 @@ class OpenFileCtx {
       }
       return toWrite;
     }
-  }
+    return null;
   }
 
@@ -785,7 +892,7 @@ class OpenFileCtx {
 
       if (!activeState && LOG.isDebugEnabled()) {
         LOG.debug("The openFileCtx is not active anymore, fileId: "
-            + +latestAttr.getFileId());
+            + latestAttr.getFileId());
       }
     } finally {
       // make sure we reset asyncStatus to false
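The race the extra processCommits() calls close is easiest to see as an interleaving (thread labels here are illustrative, not from the patch):

// write-back thread                        RPC handler thread
// -----------------                        ------------------
// doSingleWrite(lastWrite)
// processCommits(...)    // sees none
//                                          checkCommitInternal(...)
//                                          pendingCommits.put(offset, ctx)
// doAsyncWrite: no pending writes left
// -> without the calls above, ctx would sit in pendingCommits until stream
//    cleanup; calling processCommits() once more services it promptly.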
@@ -793,6 +900,69 @@ class OpenFileCtx {
     }
   }
 
+  private void processCommits(long offset) {
+    Preconditions.checkState(offset > 0);
+    long flushedOffset = getFlushedOffset();
+    Entry<Long, CommitCtx> entry = pendingCommits.firstEntry();
+
+    if (entry == null || entry.getValue().offset > flushedOffset) {
+      return;
+    }
+
+    // Now do sync for the ready commits
+    int status = Nfs3Status.NFS3ERR_IO;
+    try {
+      // Sync file data and length
+      fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
+      status = Nfs3Status.NFS3_OK;
+    } catch (ClosedChannelException cce) {
+      if (!pendingWrites.isEmpty()) {
+        LOG.error("Can't sync for fileId: " + latestAttr.getFileId()
+            + ". Channel closed with writes pending");
+      }
+      status = Nfs3Status.NFS3ERR_IO;
+    } catch (IOException e) {
+      LOG.error("Got stream error during data sync:" + e);
+      // Do nothing. Stream will be closed eventually by StreamMonitor.
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+
+    // Update latestAttr
+    try {
+      latestAttr = Nfs3Utils.getFileAttr(client,
+          Nfs3Utils.getFileIdPath(latestAttr.getFileId()), iug);
+    } catch (IOException e) {
+      LOG.error("Can't get new file attr for fileId: " + latestAttr.getFileId());
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+
+    if (latestAttr.getSize() != offset) {
+      LOG.error("After sync, the expected file size: " + offset
+          + ", however the actual file size is: " + latestAttr.getSize());
+      status = Nfs3Status.NFS3ERR_IO;
+    }
+    WccData wccData = new WccData(Nfs3Utils.getWccAttr(latestAttr), latestAttr);
+
+    // Send response for the ready commits
+    while (entry != null && entry.getValue().offset <= flushedOffset) {
+      pendingCommits.remove(entry.getKey());
+      CommitCtx commit = entry.getValue();
+
+      COMMIT3Response response = new COMMIT3Response(status, wccData,
+          Nfs3Constant.WRITE_COMMIT_VERF);
+      Nfs3Utils.writeChannelCommit(commit.getChannel(), response
+          .writeHeaderAndResponse(new XDR(), commit.getXid(),
+              new VerifierNone()), commit.getXid());
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("FileId: " + latestAttr.getFileId() + " Service time:"
+            + (System.currentTimeMillis() - commit.getStartTime())
+            + "ms. Sent response for commit:" + commit);
+      }
+      entry = pendingCommits.firstEntry();
+    }
+  }
+
   private void doSingleWrite(final WriteCtx writeCtx) {
     Channel channel = writeCtx.getChannel();
     int xid = writeCtx.getXid();
@@ -848,6 +1018,10 @@ class OpenFileCtx {
         Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
             new XDR(), xid, new VerifierNone()), xid);
       }
+
+      // Handle the waiting commits without holding any lock
+      processCommits(writeCtx.getOffset() + writeCtx.getCount());
+
     } catch (IOException e) {
       LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
           + offset + " and length " + count, e);
@@ -929,4 +1103,29 @@ class OpenFileCtx {
       }
     }
   }
+
+  @VisibleForTesting
+  ConcurrentNavigableMap<OffsetRange, WriteCtx> getPendingWritesForTest(){
+    return pendingWrites;
+  }
+
+  @VisibleForTesting
+  ConcurrentNavigableMap<Long, CommitCtx> getPendingCommitsForTest(){
+    return pendingCommits;
+  }
+
+  @VisibleForTesting
+  long getNextOffsetForTest() {
+    return nextOffset.get();
+  }
+
+  @VisibleForTesting
+  void setNextOffsetForTest(long newValue) {
+    nextOffset.set(newValue);
+  }
+
+  @VisibleForTesting
+  void setActiveStatusForTest(boolean activeState) {
+    this.activeState = activeState;
+  }
 }
\ No newline at end of file
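The next file adapts the RPC entry point. Per RFC 1813, a COMMIT covers the byte range [offset, offset + count), and a count of 0 asks the server to commit the entire file; the patch folds both cases into a single long, with 0 as the whole-file marker. A sketch of that mapping (the helper name is hypothetical):

static long toCommitOffset(long offset, int count) {
  // count == 0 asks the server to flush all cached data for the file
  return (count == 0) ? 0 : (offset + count);
}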
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
index a1f5c10406a..5f3a079e6cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
@@ -840,7 +840,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
 
     // Add open stream
     OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir
-        + "/" + postOpObjAttr.getFileId());
+        + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
     fileHandle = new FileHandle(postOpObjAttr.getFileId());
     writeManager.addOpenFileStream(fileHandle, openFileCtx);
     if (LOG.isDebugEnabled()) {
@@ -1706,8 +1706,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
   }
 
   @Override
-  public COMMIT3Response commit(XDR xdr, SecurityHandler securityHandler,
-      InetAddress client) {
+  public COMMIT3Response commit(XDR xdr, Channel channel, int xid,
+      SecurityHandler securityHandler, InetAddress client) {
     COMMIT3Response response = new COMMIT3Response(Nfs3Status.NFS3_OK);
     DFSClient dfsClient = clientCache.get(securityHandler.getUser());
     if (dfsClient == null) {
@@ -1748,18 +1748,10 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       long commitOffset = (request.getCount() == 0) ? 0
           : (request.getOffset() + request.getCount());
 
-      int status;
-      if (writeManager.handleCommit(handle, commitOffset)) {
-        status = Nfs3Status.NFS3_OK;
-      } else {
-        status = Nfs3Status.NFS3ERR_IO;
-      }
-      Nfs3FileAttributes postOpAttr = writeManager.getFileAttr(dfsClient,
-          handle, iug);
-      WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
-      return new COMMIT3Response(status, fileWcc,
-          Nfs3Constant.WRITE_COMMIT_VERF);
-
+      // Insert commit as an async request
+      writeManager.handleCommit(dfsClient, handle, commitOffset, channel, xid,
+          preOpAttr);
+      return null;
     } catch (IOException e) {
       LOG.warn("Exception ", e);
       Nfs3FileAttributes postOpAttr = null;
@@ -1892,7 +1884,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     } else if (nfsproc3 == NFSPROC3.PATHCONF) {
       response = pathconf(xdr, securityHandler, client);
     } else if (nfsproc3 == NFSPROC3.COMMIT) {
-      response = commit(xdr, securityHandler, client);
+      response = commit(xdr, channel, xid, securityHandler, client);
     } else {
       // Invalid procedure
       RpcAcceptedReply.getInstance(xid,
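Returning null from commit() is the signal that no synchronous reply should be written; the response is sent later on the saved channel and xid. A hypothetical sketch of how a dispatcher honors that convention (the real dispatch code in RpcProgramNfs3 is more involved):

NFS3Response response = commit(xdr, channel, xid, securityHandler, client);
if (response == null) {
  // COMMIT was queued; OpenFileCtx.processCommits() (or the non-wait paths
  // in WriteManager.handleCommit()) will reply via writeChannelCommit().
  return;
}
// Otherwise serialize and send the reply inline, as for other procedures.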
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
index 1471ddfc1f1..c1969894e48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
 import org.apache.hadoop.nfs.NfsFileType;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
@@ -36,6 +37,7 @@ import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.XDR;
@@ -166,7 +168,7 @@ public class WriteManager {
       String writeDumpDir = config.get(Nfs3Constant.FILE_DUMP_DIR_KEY,
           Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
       openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
-          + fileHandle.getFileId());
+          + fileHandle.getFileId(), dfsClient, iug);
       addOpenFileStream(fileHandle, openFileCtx);
       if (LOG.isDebugEnabled()) {
         LOG.debug("opened stream for file:" + fileHandle.getFileId());
@@ -176,71 +178,53 @@ public class WriteManager {
     // Add write into the async job queue
     openFileCtx.receivedNewWrite(dfsClient, request, channel, xid,
         asyncDataService, iug);
-
-    // Block stable write
-    if (request.getStableHow() != WriteStableHow.UNSTABLE) {
-      if (handleCommit(fileHandle, offset + count)) {
-        Nfs3FileAttributes postOpAttr = getFileAttr(dfsClient, handle, iug);
-        WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr),
-            postOpAttr);
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
-            fileWcc, count, request.getStableHow(),
-            Nfs3Constant.WRITE_COMMIT_VERF);
-        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-            new XDR(), xid, new VerifierNone()), xid);
-      } else {
-        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
-        Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
-            new XDR(), xid, new VerifierNone()), xid);
-      }
-    }
-
     return;
   }
 
-  boolean handleCommit(FileHandle fileHandle, long commitOffset) {
+  void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
+      long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+    int status;
     OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+
     if (openFileCtx == null) {
       LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
-          + " commitOffset=" + commitOffset);
-      return true;
-    }
-
-    long timeout = 30 * 1000; // 30 seconds
-    long startCommit = System.currentTimeMillis();
-    while (true) {
-      int ret = openFileCtx.checkCommit(commitOffset);
-      if (ret == OpenFileCtx.COMMIT_FINISHED) {
-        // Committed
-        return true;
-      } else if (ret == OpenFileCtx.COMMIT_INACTIVE_CTX) {
-        LOG.info("Inactive stream, fileId=" + fileHandle.getFileId()
-            + " commitOffset=" + commitOffset);
-        return true;
-      } else if (ret == OpenFileCtx.COMMIT_INACTIVE_WITH_PENDING_WRITE) {
-        LOG.info("Inactive stream with pending writes, fileId="
-            + fileHandle.getFileId() + " commitOffset=" + commitOffset);
-        return false;
-      }
-      assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR);
-      if (ret == OpenFileCtx.COMMIT_ERROR) {
-        return false;
-      }
+          + " commitOffset=" + commitOffset + ". Return success in this case.");
+      status = Nfs3Status.NFS3_OK;
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not committed yet, wait., fileId=" + fileHandle.getFileId()
-            + " commitOffset=" + commitOffset);
+    } else {
+      COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset,
+          channel, xid, preOpAttr);
+      switch (ret) {
+      case COMMIT_FINISHED:
+      case COMMIT_INACTIVE_CTX:
+        status = Nfs3Status.NFS3_OK;
+        break;
+      case COMMIT_INACTIVE_WITH_PENDING_WRITE:
+      case COMMIT_ERROR:
+        status = Nfs3Status.NFS3ERR_IO;
+        break;
+      case COMMIT_WAIT:
+        // Do nothing. Commit is async now.
+        return;
+      default:
+        throw new RuntimeException("Wrong error code:" + ret.name());
       }
-      if (System.currentTimeMillis() - startCommit > timeout) {
-        // Commit took too long, return error
-        return false;
-      }
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        LOG.info("Commit is interrupted, fileId=" + fileHandle.getFileId()
-            + " commitOffset=" + commitOffset);
-        return false;
-      }
-    }// while
+    }
+
+    // Send out the response
+    Nfs3FileAttributes postOpAttr = null;
+    try {
+      String fileIdPath = Nfs3Utils.getFileIdPath(preOpAttr.getFileId());
+      postOpAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
+    } catch (IOException e1) {
+      LOG.info("Can't get postOpAttr for fileId: " + preOpAttr.getFileId());
+    }
+    WccData fileWcc = new WccData(Nfs3Utils.getWccAttr(preOpAttr), postOpAttr);
+    COMMIT3Response response = new COMMIT3Response(status, fileWcc,
+        Nfs3Constant.WRITE_COMMIT_VERF);
+    Nfs3Utils.writeChannelCommit(channel,
+        response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+        xid);
  }
 
   /**
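The status mapping in handleCommit, condensed for reference (the helper name is hypothetical; the real code also returns early for COMMIT_WAIT, since that reply is written later by processCommits):

static int toNfs3Status(COMMIT_STATUS ret) {
  switch (ret) {
  case COMMIT_FINISHED:
  case COMMIT_INACTIVE_CTX:
    return Nfs3Status.NFS3_OK;
  case COMMIT_INACTIVE_WITH_PENDING_WRITE:
  case COMMIT_ERROR:
    return Nfs3Status.NFS3ERR_IO;
  default:
    throw new IllegalArgumentException("Unexpected status: " + ret);
  }
}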
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
index d24e5d1fa8a..5119474c277 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
@@ -19,13 +19,23 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentNavigableMap;
 
 import junit.framework.Assert;
 
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
+import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.IdUserGroup;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
+import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.jboss.netty.channel.Channel;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestWrites {
   @Test
@@ -97,4 +107,61 @@ public class TestWrites {
     Assert.assertTrue(limit - position == 1);
     Assert.assertTrue(appendedData.get(position) == (byte) 19);
   }
+
+  @Test
+  // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
+  // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
+  // COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
+  public void testCheckCommit() throws IOException {
+    DFSClient dfsClient = Mockito.mock(DFSClient.class);
+    Nfs3FileAttributes attr = new Nfs3FileAttributes();
+    HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
+    Mockito.when(fos.getPos()).thenReturn((long) 0);
+
+    OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
+        new IdUserGroup());
+
+    COMMIT_STATUS ret;
+
+    // Test inactive open file context
+    ctx.setActiveStatusForTest(false);
+    ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);
+
+    ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
+        new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
+    ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);
+
+    // Test request with non zero commit offset
+    ctx.setActiveStatusForTest(true);
+    Mockito.when(fos.getPos()).thenReturn((long) 10);
+    ret = ctx.checkCommit(dfsClient, 5, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
+    ret = ctx.checkCommit(dfsClient, 10, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_DO_SYNC);
+
+    ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
+        .getPendingCommitsForTest();
+    Assert.assertTrue(commits.size() == 0);
+    ret = ctx.checkCommit(dfsClient, 11, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
+    Assert.assertTrue(commits.size() == 1);
+    long key = commits.firstKey();
+    Assert.assertTrue(key == 11);
+
+    // Test request with zero commit offset
+    commits.remove(new Long(11));
+    // There is one pending write [5,10]
+    ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
+    Assert.assertTrue(commits.size() == 1);
+    key = commits.firstKey();
+    Assert.assertTrue(key == 9);
+
+    // Empty pending writes
+    ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
+    ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
+    Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7174e3af8a3..9b5680f458b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -354,6 +354,8 @@ Release 2.2.1 - UNRELEASED
     HDFS-5316. Namenode ignores the default https port (Haohui Mai via
     brandonli)
 
+    HDFS-5281. COMMIT request should not block. (brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES