diff --git a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java
index b6b8fd0cbd4..b04e7fca74d 100644
--- a/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java
+++ b/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/nfs/nfs3/request/WRITE3Request.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.oncrpc.XDR;
  * WRITE3 Request
  */
 public class WRITE3Request extends RequestWithHandle {
-  private final long offset;
-  private final int count;
+  private long offset;
+  private int count;
   private final WriteStableHow stableHow;
   private final ByteBuffer data;
 
@@ -54,10 +54,18 @@ public class WRITE3Request extends RequestWithHandle {
     return this.offset;
   }
 
+  public void setOffset(long offset) {
+    this.offset = offset;
+  }
+
   public int getCount() {
     return this.count;
   }
 
+  public void setCount(int count) {
+    this.count = count;
+  }
+
   public WriteStableHow getStableHow() {
     return this.stableHow;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
index ff2b33b1bf4..1aef083cc30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.security.InvalidParameterException;
 import java.util.EnumSet;
@@ -55,6 +56,7 @@ import org.apache.hadoop.oncrpc.security.VerifierNone;
 import org.apache.hadoop.util.Daemon;
 
 import org.jboss.netty.channel.Channel;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -360,6 +362,30 @@ class OpenFileCtx {
     }
   }
 
+  @VisibleForTesting
+  public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
+    long offset = request.getOffset();
+    int count = request.getCount();
+    long smallerCount = offset + count - cachedOffset;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
+          + " current offset %d," + " drop the overlapped section (%d-%d)"
+          + " and append new data (%d-%d).", offset, (offset + count - 1),
+          cachedOffset, offset, (cachedOffset - 1), cachedOffset,
+          (offset + count - 1)));
+    }
+
+    ByteBuffer data = request.getData();
+    Preconditions.checkState(data.position() == 0,
+        "The write request data has non-zero position");
+    data.position((int) (cachedOffset - offset));
+    Preconditions.checkState(data.limit() - data.position() == smallerCount,
+        "The write request buffer has wrong limit/position regarding count");
+
+    request.setOffset(cachedOffset);
+    request.setCount((int) smallerCount);
+  }
+
   /**
    * Creates and adds a WriteCtx into the pendingWrites map. This is a
    * synchronized method to handle concurrent writes.
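The core of alterWriteRequest is the position adjustment: java.nio.ByteBuffer leaves the backing array untouched and simply moves the read window forward, so the overlapped prefix is skipped without copying. Below is a minimal standalone sketch of the same arithmetic; the class and variable names are illustrative only and not part of the patch:

    import java.nio.ByteBuffer;

    public class TrimSketch {
      // Mirrors what alterWriteRequest does to the request buffer: hide the
      // prefix of a write that overlaps data already written up to cachedOffset.
      static void trim(ByteBuffer data, long offset, int count, long cachedOffset) {
        long newCount = offset + count - cachedOffset; // bytes beyond cachedOffset
        data.position((int) (cachedOffset - offset));  // skip the overlapped section
        assert data.remaining() == newCount;
      }

      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0, 1, 2, 3, 4 });
        // The request claims file offsets 10-14, but the file is already
        // written up to offset 12, so only 3 bytes are genuinely new.
        trim(buf, 10, 5, 12);
        System.out.println(buf.remaining()); // prints 3
      }
    }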
@@ -372,12 +398,40 @@ class OpenFileCtx {
     long offset = request.getOffset();
     int count = request.getCount();
     long cachedOffset = nextOffset.get();
-
+    int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("requesed offset=" + offset + " and current offset="
           + cachedOffset);
     }
 
+    // Handle a special case first
+    if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
+      // One Linux client behavior: after a file is closed and reopened to
+      // write, the client sometimes combines previously written data (which
+      // could still be in the kernel buffer) with newly appended data in one
+      // write. This is usually the first write after the file is reopened.
+      // In this case, we log the event and drop the overlapped section.
+      LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
+          + " current offset %d," + " drop the overlapped section (%d-%d)"
+          + " and append new data (%d-%d).", offset, (offset + count - 1),
+          cachedOffset, offset, (cachedOffset - 1), cachedOffset,
+          (offset + count - 1)));
+
+      if (!pendingWrites.isEmpty()) {
+        LOG.warn("There are other pending writes, fail this jumbo write");
+        return null;
+      }
+
+      LOG.warn("Modify this write to write only the appended data");
+      alterWriteRequest(request, cachedOffset);
+
+      // Update the local variables
+      originalCount = count;
+      offset = request.getOffset();
+      count = request.getCount();
+    }
+
     // Fail non-append call
     if (offset < cachedOffset) {
       LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + ","
@@ -387,8 +441,9 @@ class OpenFileCtx {
     DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
         : WriteCtx.DataState.ALLOW_DUMP;
     WriteCtx writeCtx = new WriteCtx(request.getHandle(),
-        request.getOffset(), request.getCount(), request.getStableHow(),
-        request.getData().array(), channel, xid, false, dataState);
+        request.getOffset(), request.getCount(), originalCount,
+        request.getStableHow(), request.getData(), channel, xid, false,
+        dataState);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Add new write to the list with nextOffset " + cachedOffset
           + " and requesed offset=" + offset);
@@ -419,8 +474,7 @@ class OpenFileCtx {
     WRITE3Response response;
     long cachedOffset = nextOffset.get();
     if (offset + count > cachedOffset) {
-      LOG.warn("Haven't noticed any partial overwrite for a sequential file"
-          + " write requests. Treat it as a real random write, no support.");
+      LOG.warn("Treat this jumbo write as a real random write, no support.");
       response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
           WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
     } else {
@@ -633,6 +687,7 @@ class OpenFileCtx {
   private void addWrite(WriteCtx writeCtx) {
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
+    // For the offset range (min, max), min is inclusive and max is exclusive
     pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
   }
 
@@ -745,19 +800,7 @@ class OpenFileCtx {
     long offset = writeCtx.getOffset();
     int count = writeCtx.getCount();
     WriteStableHow stableHow = writeCtx.getStableHow();
-    byte[] data = null;
-    try {
-      data = writeCtx.getData();
-    } catch (Exception e1) {
-      LOG.error("Failed to get request data offset:" + offset + " count:"
-          + count + " error:" + e1);
-      // Cleanup everything
-      cleanup();
-      return;
-    }
-    Preconditions.checkState(data.length == count);
-
     FileHandle handle = writeCtx.getHandle();
     if (LOG.isDebugEnabled()) {
       LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
@@ -766,8 +809,8 @@ class OpenFileCtx {
 
     try {
       // The write is not protected by lock. asyncState is used to make sure
-      // there is one thread doing write back at any time
-      fos.write(data, 0, count);
+      // there is one thread doing write back at any time
+      writeCtx.writeData(fos);
 
       long flushedOffset = getFlushedOffset();
       if (flushedOffset != (offset + count)) {
@@ -776,10 +819,6 @@ class OpenFileCtx {
             + (offset + count));
       }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("After writing " + handle.getFileId() + " at offset "
-            + offset + ", update the memory count.");
-      }
 
       // Reduce memory occupation size if request was allowed dumped
       if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
@@ -787,6 +826,11 @@ class OpenFileCtx {
         if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
           writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
           updateNonSequentialWriteInMemory(-count);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("After writing " + handle.getFileId() + " at offset "
+                + offset + ", updated the memory count, new value:"
+                + nonSequentialWriteInMemory.get());
+          }
         }
       }
 
@@ -794,6 +838,11 @@ class OpenFileCtx {
       if (!writeCtx.getReplied()) {
         WccAttr preOpAttr = latestAttr.getWccAttr();
         WccData fileWcc = new WccData(preOpAttr, latestAttr);
+        if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
+          LOG.warn("Return original count:" + writeCtx.getOriginalCount()
+              + " instead of real data count:" + count);
+          count = writeCtx.getOriginalCount();
+        }
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
             fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
         Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
@@ -801,7 +850,7 @@ class OpenFileCtx {
     } catch (IOException e) {
       LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
-          + offset + " and length " + data.length, e);
+          + offset + " and length " + count, e);
       if (!writeCtx.getReplied()) {
         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
         Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse(
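Taken together, addWritesToCache now distinguishes four relationships between a request and cachedOffset (the next expected sequential offset). A worked example: with cachedOffset = 100, a request (offset = 80, count = 40) covers bytes 80-119; bytes 80-99 duplicate data already written, so the request is trimmed to (100, 20), while originalCount remembers 40 so the eventual WRITE3 reply still acknowledges every byte the client sent. A minimal sketch of the case analysis follows; the class, enum, and comments are illustrative only, and the overwrite/random-write labels summarize branches of OpenFileCtx not fully shown in this hunk:

    public class WriteCaseSketch {
      enum WriteCase { SEQUENTIAL_APPEND, OVERLAP_AND_APPEND, FULL_OVERWRITE, RANDOM_WRITE }

      // cachedOffset is the next expected sequential offset of the open file.
      static WriteCase classify(long offset, int count, long cachedOffset) {
        if (offset == cachedOffset) {
          return WriteCase.SEQUENTIAL_APPEND;   // NO_DUMP, written back directly
        } else if (offset > cachedOffset) {
          return WriteCase.RANDOM_WRITE;        // cached with ALLOW_DUMP for later
        } else if (offset + count > cachedOffset) {
          return WriteCase.OVERLAP_AND_APPEND;  // the "jumbo" write trimmed above
        } else {
          return WriteCase.FULL_OVERWRITE;      // caught by the non-append check
        }
      }

      public static void main(String[] args) {
        System.out.println(classify(80, 40, 100));   // OVERLAP_AND_APPEND
        System.out.println(classify(100, 40, 100));  // SEQUENTIAL_APPEND
      }
    }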
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
index f1af6520940..05e0fb7c2c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java
@@ -20,13 +20,16 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.jboss.netty.channel.Channel;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -50,8 +53,17 @@ class WriteCtx {
   private final FileHandle handle;
   private final long offset;
   private final int count;
+
+  // Only needed for overlapped write; see OpenFileCtx.addWritesToCache()
+  private final int originalCount;
+  public static final int INVALID_ORIGINAL_COUNT = -1;
+
+  public int getOriginalCount() {
+    return originalCount;
+  }
+
   private final WriteStableHow stableHow;
-  private volatile byte[] data;
+  private volatile ByteBuffer data;
 
   private final Channel channel;
   private final int xid;
@@ -89,9 +101,13 @@ class WriteCtx {
       }
       return 0;
     }
+
+    // Resized write should not allow dump
+    Preconditions.checkState(originalCount == INVALID_ORIGINAL_COUNT);
+
     this.raf = raf;
     dumpFileOffset = dumpOut.getChannel().position();
-    dumpOut.write(data, 0, count);
+    dumpOut.write(data.array(), 0, count);
     if (LOG.isDebugEnabled()) {
       LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset);
     }
@@ -127,7 +143,8 @@ class WriteCtx {
     return stableHow;
   }
 
-  byte[] getData() throws IOException {
+  @VisibleForTesting
+  ByteBuffer getData() throws IOException {
     if (dataState != DataState.DUMPED) {
       synchronized (this) {
         if (dataState != DataState.DUMPED) {
@@ -143,15 +160,45 @@ class WriteCtx {
 
   private void loadData() throws IOException {
     Preconditions.checkState(data == null);
-    data = new byte[count];
+    byte[] rawData = new byte[count];
     raf.seek(dumpFileOffset);
-    int size = raf.read(data, 0, count);
+    int size = raf.read(rawData, 0, count);
     if (size != count) {
       throw new IOException("Data count is " + count + ", but read back "
          + size + "bytes");
     }
+    data = ByteBuffer.wrap(rawData);
   }
 
+  public void writeData(HdfsDataOutputStream fos) throws IOException {
+    Preconditions.checkState(fos != null);
+
+    ByteBuffer dataBuffer = null;
+    try {
+      dataBuffer = getData();
+    } catch (Exception e1) {
+      LOG.error("Failed to get request data offset:" + offset + " count:"
+          + count + " error:" + e1);
+      throw new IOException("Can't get WriteCtx.data");
+    }
+
+    byte[] data = dataBuffer.array();
+    int position = dataBuffer.position();
+    int limit = dataBuffer.limit();
+    Preconditions.checkState(limit - position == count);
+    // Modified write has a valid original count
+    if (position != 0) {
+      if (limit != getOriginalCount()) {
+        throw new IOException("Modified write has different original size."
+            + " buff position:" + position + " buff limit:" + limit + ". "
+            + toString());
+      }
+    }
+
+    // Now write data
+    fos.write(data, position, count);
+  }
+
   Channel getChannel() {
     return channel;
   }
@@ -168,11 +215,13 @@ class WriteCtx {
     this.replied = replied;
   }
 
-  WriteCtx(FileHandle handle, long offset, int count, WriteStableHow stableHow,
-      byte[] data, Channel channel, int xid, boolean replied, DataState dataState) {
+  WriteCtx(FileHandle handle, long offset, int count, int originalCount,
+      WriteStableHow stableHow, ByteBuffer data, Channel channel, int xid,
+      boolean replied, DataState dataState) {
     this.handle = handle;
     this.offset = offset;
     this.count = count;
+    this.originalCount = originalCount;
     this.stableHow = stableHow;
     this.data = data;
     this.channel = channel;
@@ -185,7 +234,7 @@ class WriteCtx {
   @Override
   public String toString() {
     return "Id:" + handle.getFileId() + " offset:" + offset + " count:" + count
-        + " stableHow:" + stableHow + " replied:" + replied + " dataState:"
-        + dataState + " xid:" + xid;
+        + " originalCount:" + originalCount + " stableHow:" + stableHow
+        + " replied:" + replied + " dataState:" + dataState + " xid:" + xid;
   }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
new file mode 100644
index 00000000000..d24e5d1fa8a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
+import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
+import org.junit.Test;
+
+public class TestWrites {
+  @Test
+  public void testAlterWriteRequest() throws IOException {
+    int len = 20;
+    byte[] data = new byte[len];
+    ByteBuffer buffer = ByteBuffer.wrap(data);
+
+    for (int i = 0; i < len; i++) {
+      buffer.put((byte) i);
+    }
+    buffer.flip();
+    int originalCount = buffer.array().length;
+    WRITE3Request request = new WRITE3Request(new FileHandle(), 0, data.length,
+        WriteStableHow.UNSTABLE, buffer);
+
+    WriteCtx writeCtx1 = new WriteCtx(request.getHandle(), request.getOffset(),
+        request.getCount(), WriteCtx.INVALID_ORIGINAL_COUNT,
+        request.getStableHow(), request.getData(), null, 1, false,
+        WriteCtx.DataState.NO_DUMP);
+
+    Assert.assertTrue(writeCtx1.getData().array().length == originalCount);
+
+    // Now change the write request
+    OpenFileCtx.alterWriteRequest(request, 12);
+
+    WriteCtx writeCtx2 = new WriteCtx(request.getHandle(), request.getOffset(),
+        request.getCount(), originalCount, request.getStableHow(),
+        request.getData(), null, 2, false, WriteCtx.DataState.NO_DUMP);
+    ByteBuffer appendedData = writeCtx2.getData();
+
+    int position = appendedData.position();
+    int limit = appendedData.limit();
+    Assert.assertTrue(position == 12);
+    Assert.assertTrue(limit - position == 8);
+    Assert.assertTrue(appendedData.get(position) == (byte) 12);
+    Assert.assertTrue(appendedData.get(position + 1) == (byte) 13);
+    Assert.assertTrue(appendedData.get(position + 2) == (byte) 14);
+    Assert.assertTrue(appendedData.get(position + 7) == (byte) 19);
+
+    // Test when the current file write offset is at the boundaries
+    buffer.position(0);
+    request = new WRITE3Request(new FileHandle(), 0, data.length,
+        WriteStableHow.UNSTABLE, buffer);
+    OpenFileCtx.alterWriteRequest(request, 1);
+    WriteCtx writeCtx3 = new WriteCtx(request.getHandle(), request.getOffset(),
+        request.getCount(), originalCount, request.getStableHow(),
+        request.getData(), null, 2, false, WriteCtx.DataState.NO_DUMP);
+    appendedData = writeCtx3.getData();
+    position = appendedData.position();
+    limit = appendedData.limit();
+    Assert.assertTrue(position == 1);
+    Assert.assertTrue(limit - position == 19);
+    Assert.assertTrue(appendedData.get(position) == (byte) 1);
+    Assert.assertTrue(appendedData.get(position + 18) == (byte) 19);
+
+    // Reset the buffer position before testing the other boundary
+    buffer.position(0);
+    request = new WRITE3Request(new FileHandle(), 0, data.length,
+        WriteStableHow.UNSTABLE, buffer);
+    OpenFileCtx.alterWriteRequest(request, 19);
+    WriteCtx writeCtx4 = new WriteCtx(request.getHandle(), request.getOffset(),
+        request.getCount(), originalCount, request.getStableHow(),
+        request.getData(), null, 2, false, WriteCtx.DataState.NO_DUMP);
+    appendedData = writeCtx4.getData();
+    position = appendedData.position();
+    limit = appendedData.limit();
+    Assert.assertTrue(position == 19);
+    Assert.assertTrue(limit - position == 1);
+    Assert.assertTrue(appendedData.get(position) == (byte) 19);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e92325720dd..a30f32a6b7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -403,6 +403,9 @@ Release 2.1.2 - UNRELEASED
     HDFS-5299. DFS client hangs in updatePipeline RPC when failover happened.
     (Vinay via jing9)
 
+    HDFS-5259. Support a client which combines appended data with old data
+    before sending it to the NFS server. (brandonli)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES