diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index ef8aa5a02c0..5de5460f566 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; @@ -101,6 +102,8 @@ import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceInfo; import org.apache.htrace.TraceScope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -137,6 +140,8 @@ import com.google.common.cache.RemovalNotification; public class DFSOutputStream extends FSOutputSummer implements Syncable, CanSetDropBehind { private final long dfsclientSlowLogThresholdMs; + static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class); + /** * Number of times to retry creating a file when there are transient * errors (typically related to encryption zones and KeyProvider operations). @@ -186,6 +191,7 @@ public class DFSOutputStream extends FSOutputSummer private FileEncryptionInfo fileEncryptionInfo; private static final BlockStoragePolicySuite blockStoragePolicySuite = BlockStoragePolicySuite.createDefaultSuite(); + private int writePacketSize; /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock, @@ -1669,7 +1675,9 @@ public class DFSOutputStream extends FSOutputSummer DFSClient.LOG.debug( "Set non-null progress callback on DFSOutputStream " + src); } - + + initWritePacketSize(); + this.bytesPerChecksum = checksum.getBytesPerChecksum(); if (bytesPerChecksum <= 0) { throw new HadoopIllegalArgumentException( @@ -1687,6 +1695,21 @@ public class DFSOutputStream extends FSOutputSummer this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager(); } + /** + * Ensures the configured writePacketSize never exceeds + * PacketReceiver.MAX_PACKET_SIZE. + */ + private void initWritePacketSize() { + writePacketSize = dfsClient.getConf().writePacketSize; + if (writePacketSize > PacketReceiver.MAX_PACKET_SIZE) { + LOG.warn( + "Configured write packet exceeds {} bytes as max," + + " using {} bytes.", + PacketReceiver.MAX_PACKET_SIZE, PacketReceiver.MAX_PACKET_SIZE); + writePacketSize = PacketReceiver.MAX_PACKET_SIZE; + } + } + /** Construct a new output stream for creating a file. */ private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet flag, Progressable progress, @@ -1936,18 +1959,8 @@ public class DFSOutputStream extends FSOutputSummer } waitAndQueueCurrentPacket(); - // If the reopened file did not end at chunk boundary and the above - // write filled up its partial chunk. Tell the summer to generate full - // crc chunks from now on. - if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) { - appendChunk = false; - resetChecksumBufSize(); - } + adjustChunkBoundary(); - if (!appendChunk) { - int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize); - computePacketChunkSize(psize, bytesPerChecksum); - } // // if encountering a block boundary, send an empty packet to // indicate the end of block and reset bytesCurBlock. @@ -1962,6 +1975,40 @@ public class DFSOutputStream extends FSOutputSummer } } + /** + * If the reopened file did not end at chunk boundary and the above + * write filled up its partial chunk. Tell the summer to generate full + * crc chunks from now on. + */ + protected void adjustChunkBoundary() { + if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) { + appendChunk = false; + resetChecksumBufSize(); + } + + if (!appendChunk) { + final int psize = (int) Math.min(blockSize - bytesCurBlock, + writePacketSize); + computePacketChunkSize(psize, bytesPerChecksum); + } + } + + /** + * Used in test only. + */ + @VisibleForTesting + void setAppendChunk(final boolean appendChunk) { + this.appendChunk = appendChunk; + } + + /** + * Used in test only. + */ + @VisibleForTesting + void setBytesCurBlock(final long bytesCurBlock) { + this.bytesCurBlock = bytesCurBlock; + } + @Deprecated public void sync() throws IOException { hflush(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java index 3045a13b200..784c3053727 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java @@ -45,7 +45,7 @@ public class PacketReceiver implements Closeable { * The max size of any single packet. This prevents OOMEs when * invalid data is sent. */ - private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024; + public static final int MAX_PACKET_SIZE = 16 * 1024 * 1024; static final Log LOG = LogFactory.getLog(PacketReceiver.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index 7269e3910d2..8c46564dd6e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -17,20 +17,28 @@ */ package org.apache.hadoop.hdfs; +import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.PathUtils; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.internal.util.reflection.Whitebox; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; + public class TestDFSOutputStream { static MiniDFSCluster cluster; @@ -97,6 +105,124 @@ public class TestDFSOutputStream { Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize); } + /** + * This tests preventing overflows of package size and bodySize. + *

+ * See also https://issues.apache.org/jira/browse/HDFS-11608. + *

+ * @throws IOException + * @throws SecurityException + * @throws NoSuchFieldException + * @throws InvocationTargetException + * @throws IllegalArgumentException + * @throws IllegalAccessException + * @throws NoSuchMethodException + */ + @Test(timeout=60000) + public void testPreventOverflow() throws IOException, NoSuchFieldException, + SecurityException, IllegalAccessException, IllegalArgumentException, + InvocationTargetException, NoSuchMethodException { + + final int defaultWritePacketSize = DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; + int configuredWritePacketSize = defaultWritePacketSize; + int finalWritePacketSize = defaultWritePacketSize; + + /* test default WritePacketSize, e.g. 64*1024 */ + runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize); + + /* test large WritePacketSize, e.g. 1G */ + configuredWritePacketSize = 1000 * 1024 * 1024; + finalWritePacketSize = PacketReceiver.MAX_PACKET_SIZE; + runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize); + } + + /** + * @configuredWritePacketSize the configured WritePacketSize. + * @finalWritePacketSize the final WritePacketSize picked by + * {@link DFSOutputStream#adjustChunkBoundary} + */ + private void runAdjustChunkBoundary( + final int configuredWritePacketSize, + final int finalWritePacketSize) throws IOException, NoSuchFieldException, + SecurityException, IllegalAccessException, IllegalArgumentException, + InvocationTargetException, NoSuchMethodException { + + final boolean appendChunk = false; + final long blockSize = 3221225500L; + final long bytesCurBlock = 1073741824L; + final int bytesPerChecksum = 512; + final int checksumSize = 4; + final int chunkSize = bytesPerChecksum + checksumSize; + final int packateMaxHeaderLength = 33; + + MiniDFSCluster dfsCluster = null; + final File baseDir = new File(PathUtils.getTestDir(getClass()), + GenericTestUtils.getMethodName()); + + try { + final Configuration dfsConf = new Configuration(); + dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, + baseDir.getAbsolutePath()); + dfsConf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, + configuredWritePacketSize); + dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(1).build(); + dfsCluster.waitActive(); + + final FSDataOutputStream os = dfsCluster.getFileSystem() + .create(new Path(baseDir.getAbsolutePath(), "testPreventOverflow")); + final DFSOutputStream dos = (DFSOutputStream) Whitebox + .getInternalState(os, "wrappedStream"); + + /* set appendChunk */ + final Method setAppendChunkMethod = dos.getClass() + .getDeclaredMethod("setAppendChunk", boolean.class); + setAppendChunkMethod.setAccessible(true); + setAppendChunkMethod.invoke(dos, appendChunk); + + /* set bytesCurBlock */ + final Method setBytesCurBlockMethod = dos.getClass() + .getDeclaredMethod("setBytesCurBlock", long.class); + setBytesCurBlockMethod.setAccessible(true); + setBytesCurBlockMethod.invoke(dos, bytesCurBlock); + + /* set blockSize */ + final Field blockSizeField = dos.getClass().getDeclaredField("blockSize"); + blockSizeField.setAccessible(true); + blockSizeField.setLong(dos, blockSize); + + /* call adjustChunkBoundary */ + final Method method = dos.getClass() + .getDeclaredMethod("adjustChunkBoundary"); + method.setAccessible(true); + method.invoke(dos); + + /* get and verify writePacketSize */ + final Field writePacketSizeField = dos.getClass() + .getDeclaredField("writePacketSize"); + writePacketSizeField.setAccessible(true); + Assert.assertEquals(writePacketSizeField.getInt(dos), + finalWritePacketSize); + + /* get and verify chunksPerPacket */ + final Field chunksPerPacketField = dos.getClass() + .getDeclaredField("chunksPerPacket"); + chunksPerPacketField.setAccessible(true); + Assert.assertEquals(chunksPerPacketField.getInt(dos), + (finalWritePacketSize - packateMaxHeaderLength) / chunkSize); + + /* get and verify packetSize */ + final Field packetSizeField = dos.getClass() + .getDeclaredField("packetSize"); + packetSizeField.setAccessible(true); + Assert.assertEquals(packetSizeField.getInt(dos), + chunksPerPacketField.getInt(dos) * chunkSize); + } finally { + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + } + } + @AfterClass public static void tearDown() { cluster.shutdown();