diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java index f02dcc0e77a..764524a8ff6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java @@ -70,4 +70,8 @@ public boolean equals(Object o) { } return false; } + + public String toString() { + return "[" + getMin() + ", " + getMax() + ")"; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index eb1482011d5..3df80a14253 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -492,11 +492,11 @@ public static void alterWriteRequest(WRITE3Request request, long cachedOffset) { int count = request.getCount(); long smallerCount = offset + count - cachedOffset; if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Got overwrite with appended data (%d-%d)," - + " current offset %d," + " drop the overlapped section (%d-%d)" - + " and append new data (%d-%d).", offset, (offset + count - 1), - cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset - + count - 1))); + LOG.debug(String.format("Got overwrite with appended data [%d-%d)," + + " current offset %d," + " drop the overlapped section [%d-%d)" + + " and append new data [%d-%d).", offset, (offset + count), + cachedOffset, offset, cachedOffset, cachedOffset, (offset + + count))); } ByteBuffer data = request.getData(); @@ -510,6 +510,22 @@ public static void alterWriteRequest(WRITE3Request request, long cachedOffset) { request.setCount((int) smallerCount); } + @VisibleForTesting + private static void trimWriteRequest(WriteCtx writeCtx, + long currentOffset) { + long offset = writeCtx.getOffset(); + if (LOG.isDebugEnabled()) { + int count = writeCtx.getCount(); + LOG.debug(String.format("Trim request [%d-%d)," + + " current offset %d," + " drop the overlapped section [%d-%d)" + + " and write new data [%d-%d)", + offset, (offset + count), + currentOffset, offset, (currentOffset), + currentOffset, (offset + count))); + } + writeCtx.trimWrite((int)(currentOffset - offset)); + } + /** * Creates and adds a WriteCtx into the pendingWrites map. This is a * synchronized method to handle concurrent writes. @@ -529,23 +545,27 @@ private synchronized WriteCtx addWritesToCache(WRITE3Request request, + cachedOffset); } - // Handle a special case first + // Ignore write request with range below the current offset + if (offset + count <= cachedOffset) { + LOG.warn(String.format("Got overwrite [%d-%d) smaller than" + + " current offset %d," + " drop the request.", + offset, (offset + count), cachedOffset)); + return null; + } + + // Handle a special case: trim request whose offset is smaller than + // the current offset if ((offset < cachedOffset) && (offset + count > cachedOffset)) { // One Linux client behavior: after a file is closed and reopened to // write, the client sometimes combines previous written data(could still // be in kernel buffer) with newly appended data in one write. This is // usually the first write after file reopened. In this // case, we log the event and drop the overlapped section. - LOG.warn(String.format("Got overwrite with appended data (%d-%d)," - + " current offset %d," + " drop the overlapped section (%d-%d)" - + " and append new data (%d-%d).", offset, (offset + count - 1), - cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset - + count - 1))); - - if (!pendingWrites.isEmpty()) { - LOG.warn("There are other pending writes, fail this jumbo write"); - return null; - } + LOG.warn(String.format("Got overwrite with appended data [%d-%d)," + + " current offset %d," + " drop the overlapped section [%d-%d)" + + " and append new data [%d-%d).", offset, (offset + count), + cachedOffset, offset, cachedOffset, cachedOffset, (offset + + count))); LOG.warn("Modify this write to write only the appended data"); alterWriteRequest(request, cachedOffset); @@ -1011,45 +1031,56 @@ private synchronized WriteCtx offerNextToWrite() { this.asyncStatus = false; return null; } - - Entry lastEntry = pendingWrites.lastEntry(); - OffsetRange range = lastEntry.getKey(); - WriteCtx toWrite = lastEntry.getValue(); - - if (LOG.isTraceEnabled()) { - LOG.trace("range.getMin()=" + range.getMin() + " nextOffset=" - + nextOffset); + + Entry lastEntry = pendingWrites.lastEntry(); + OffsetRange range = lastEntry.getKey(); + WriteCtx toWrite = lastEntry.getValue(); + + if (LOG.isTraceEnabled()) { + LOG.trace("range.getMin()=" + range.getMin() + " nextOffset=" + + nextOffset); + } + + long offset = nextOffset.get(); + if (range.getMin() > offset) { + if (LOG.isDebugEnabled()) { + LOG.debug("The next sequential write has not arrived yet"); } - - long offset = nextOffset.get(); - if (range.getMin() > offset) { - if (LOG.isDebugEnabled()) { - LOG.debug("The next sequential write has not arrived yet"); - } - processCommits(nextOffset.get()); // handle race - this.asyncStatus = false; - } else if (range.getMin() < offset && range.getMax() > offset) { - // shouldn't happen since we do sync for overlapped concurrent writers - LOG.warn("Got an overlapping write (" + range.getMin() + ", " - + range.getMax() + "), nextOffset=" + offset - + ". Silently drop it now"); - pendingWrites.remove(range); - processCommits(nextOffset.get()); // handle race - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax() - + ") from the list"); - } - // after writing, remove the WriteCtx from cache - pendingWrites.remove(range); - // update nextOffset - nextOffset.addAndGet(toWrite.getCount()); - if (LOG.isDebugEnabled()) { - LOG.debug("Change nextOffset to " + nextOffset.get()); - } - return toWrite; + processCommits(nextOffset.get()); // handle race + this.asyncStatus = false; + } else if (range.getMax() <= offset) { + if (LOG.isDebugEnabled()) { + LOG.debug("Remove write " + range.toString() + + " which is already written from the list"); } - + // remove the WriteCtx from cache + pendingWrites.remove(range); + } else if (range.getMin() < offset && range.getMax() > offset) { + LOG.warn("Got an overlapping write " + range.toString() + + ", nextOffset=" + offset + + ". Remove and trim it"); + pendingWrites.remove(range); + trimWriteRequest(toWrite, offset); + // update nextOffset + nextOffset.addAndGet(toWrite.getCount()); + if (LOG.isDebugEnabled()) { + LOG.debug("Change nextOffset (after trim) to " + nextOffset.get()); + } + return toWrite; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Remove write " + range.toString() + + " from the list"); + } + // after writing, remove the WriteCtx from cache + pendingWrites.remove(range); + // update nextOffset + nextOffset.addAndGet(toWrite.getCount()); + if (LOG.isDebugEnabled()) { + LOG.debug("Change nextOffset to " + nextOffset.get()); + } + return toWrite; + } return null; } @@ -1283,8 +1314,8 @@ synchronized void cleanup() { WccAttr preOpAttr = latestAttr.getWccAttr(); while (!pendingWrites.isEmpty()) { OffsetRange key = pendingWrites.firstKey(); - LOG.info("Fail pending write: (" + key.getMin() + ", " + key.getMax() - + "), nextOffset=" + nextOffset.get()); + LOG.info("Fail pending write: " + key.toString() + + ", nextOffset=" + nextOffset.get()); WriteCtx writeCtx = pendingWrites.remove(key); if (!writeCtx.getReplied()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java index 82c826fda1e..8c2c7ee7b67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java @@ -51,8 +51,8 @@ public static enum DataState { } private final FileHandle handle; - private final long offset; - private final int count; + private long offset; + private int count; /** * Some clients can send a write that includes previously written data along @@ -61,13 +61,61 @@ public static enum DataState { * request before it was modified to write only the new data. * @see OpenFileCtx#addWritesToCache for more details */ - private final int originalCount; + private int originalCount; public static final int INVALID_ORIGINAL_COUNT = -1; + /** + * Overlapping Write Request Handling + * A write request can be in three states: + * s0. just created, with data != null + * s1. dumped as length "count", and data set to null + * s2. read back from dumped area as length "count" + * + * Write requests may have overlapping range, we detect this by comparing + * the data offset range of the request against the current offset of data + * already written to HDFS. There are two categories: + * + * 1. If the beginning part of a new write request data is already written + * due to an earlier request, we alter the new request by trimming this + * portion before the new request enters state s0, and the originalCount is + * remembered. + * + * 2. If the lower end of the write request range is beyond the current + * offset of data already written, we put the request into cache, and detect + * the overlapping when taking the request out from cache. + * + * For category 2, if we find out that a write request overlap with another, + * this write request is already in state s0, s1, or s3. We trim the + * beginning part of this request, by remembering the size of this portion + * as trimDelta. So the resulted offset of the write request is + * "offset + trimDelta" and the resulted size of the write request is + * "count - trimDelta". + * + * What important to notice is, if the request is in s1 when we do the + * trimming, the data dumped is of size "count", so when we load + * the data back from dumped area, we should set the position of the data + * buffer to trimDelta. + */ + private int trimDelta; + public int getOriginalCount() { return originalCount; } + public void trimWrite(int delta) { + Preconditions.checkState(delta < count); + if (LOG.isDebugEnabled()) { + LOG.debug("Trim write request by delta:" + delta + " " + toString()); + } + synchronized(this) { + trimDelta = delta; + if (originalCount == INVALID_ORIGINAL_COUNT) { + originalCount = count; + } + trimData(); + } + } + private final WriteStableHow stableHow; private volatile ByteBuffer data; @@ -139,11 +187,17 @@ FileHandle getHandle() { } long getOffset() { - return offset; + synchronized(this) { + // See comment "Overlapping Write Request Handling" above + return offset + trimDelta; + } } int getCount() { - return count; + synchronized(this) { + // See comment "Overlapping Write Request Handling" above + return count - trimDelta; + } } WriteStableHow getStableHow() { @@ -174,7 +228,22 @@ private void loadData() throws IOException { throw new IOException("Data count is " + count + ", but read back " + size + "bytes"); } - data = ByteBuffer.wrap(rawData); + synchronized(this) { + data = ByteBuffer.wrap(rawData); + trimData(); + } + } + + private void trimData() { + if (data != null && trimDelta > 0) { + // make it not dump-able since the data will be used + // shortly + dataState = DataState.NO_DUMP; + data.position(data.position() + trimDelta); + offset += trimDelta; + count -= trimDelta; + trimDelta = 0; + } } public void writeData(HdfsDataOutputStream fos) throws IOException { @@ -229,6 +298,7 @@ void setReplied(boolean replied) { this.offset = offset; this.count = count; this.originalCount = originalCount; + this.trimDelta = 0; this.stableHow = stableHow; this.data = data; this.channel = channel; diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java index 3c193aefffe..9c327c425e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java @@ -640,7 +640,97 @@ securityHandler, new InetSocketAddress("localhost", config.getInt( } } } - + + @Test + public void testOverlappingWrites() throws IOException, InterruptedException { + NfsConfiguration config = new NfsConfiguration(); + MiniDFSCluster cluster = null; + RpcProgramNfs3 nfsd; + final int bufSize = 32; + SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class); + Mockito.when(securityHandler.getUser()).thenReturn( + System.getProperty("user.name")); + String currentUser = System.getProperty("user.name"); + config.set( + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserGroupConfKey(currentUser), + "*"); + config.set( + DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(currentUser), + "*"); + ProxyUsers.refreshSuperUserGroupsConfiguration(config); + // Use emphral port in case tests are running in parallel + config.setInt("nfs3.mountd.port", 0); + config.setInt("nfs3.server.port", 0); + + try { + cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); + cluster.waitActive(); + + Nfs3 nfs3 = new Nfs3(config); + nfs3.startServiceInternal(false); + nfsd = (RpcProgramNfs3) nfs3.getRpcProgram(); + + DFSClient dfsClient = new DFSClient(DFSUtilClient.getNNAddress(config), + config); + HdfsFileStatus status = dfsClient.getFileInfo("/"); + FileHandle rootHandle = new FileHandle(status.getFileId()); + + CREATE3Request createReq = new CREATE3Request(rootHandle, + "overlapping-writes" + System.currentTimeMillis(), + Nfs3Constant.CREATE_UNCHECKED, new SetAttr3(), 0); + XDR createXdr = new XDR(); + createReq.serialize(createXdr); + CREATE3Response createRsp = nfsd.create(createXdr.asReadOnlyWrap(), + securityHandler, new InetSocketAddress("localhost", 1234)); + FileHandle handle = createRsp.getObjHandle(); + byte[] buffer = new byte[bufSize]; + for (int i = 0; i < bufSize; i++) { + buffer[i] = (byte) i; + } + int[][] ranges = new int[][] { + {0, 10}, + {5, 7}, + {5, 5}, + {10, 6}, + {18, 6}, + {20, 6}, + {28, 4}, + {16, 2}, + {25, 4} + }; + for (int i = 0; i < ranges.length; i++) { + int x[] = ranges[i]; + byte[] tbuffer = new byte[x[1]]; + for (int j = 0; j < x[1]; j++) { + tbuffer[j] = buffer[x[0] + j]; + } + WRITE3Request writeReq = new WRITE3Request(handle, (long)x[0], x[1], + WriteStableHow.UNSTABLE, ByteBuffer.wrap(tbuffer)); + XDR writeXdr = new XDR(); + writeReq.serialize(writeXdr); + nfsd.write(writeXdr.asReadOnlyWrap(), null, 1, securityHandler, + new InetSocketAddress("localhost", 1234)); + } + + waitWrite(nfsd, handle, 60000); + READ3Request readReq = new READ3Request(handle, 0, bufSize); + XDR readXdr = new XDR(); + readReq.serialize(readXdr); + READ3Response readRsp = nfsd.read(readXdr.asReadOnlyWrap(), + securityHandler, new InetSocketAddress("localhost", config.getInt( + NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY, + NfsConfigKeys.DFS_NFS_SERVER_PORT_DEFAULT))); + + assertTrue(Arrays.equals(buffer, readRsp.getData().array())); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + @Test public void testCheckSequential() throws IOException { DFSClient dfsClient = Mockito.mock(DFSClient.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index e523b417dba..20428691bb6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1100,6 +1100,9 @@ Release 2.8.0 - UNRELEASED HDFS-9147. Fix the setting of visibleLength in ExternalBlockReader. (Colin P. McCabe via Lei (Eddy) Xu) + HDFS-9092. Nfs silently drops overlapping write requests and causes data + copying to fail. (Yongjun Zhang) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES