HDFS-11608. HDFS write crashed with block size greater than 2 GB. Contributed by Xiaobing Zhou.
parent f2d1d3d159
commit 9f4585c95f
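The crash comes from the packet-size computation in the client write path: the old code in writeChunk() cast blockSize - bytesCurBlock to int before taking the minimum with the configured packet size, so once more than Integer.MAX_VALUE bytes remained in a block the cast wrapped to a negative value. Below is a minimal standalone sketch (not part of this commit; it only reuses the numbers from the new test) contrasting the old and new expressions:

/** Minimal sketch (not from the patch): old vs. new psize computation. */
public class PacketSizeOverflowDemo {
  public static void main(String[] args) {
    final long blockSize = 3221225500L;     // ~3 GB, as in the new test
    final long bytesCurBlock = 1073741824L; // 1 GB already written
    final int writePacketSize = 64 * 1024;  // dfs.client-write-packet-size default

    // Old code: cast first, then min. 2147483676 does not fit in an int,
    // so the cast wraps to -2147483620 and psize ends up negative.
    int oldPsize = Math.min((int) (blockSize - bytesCurBlock), writePacketSize);

    // New code (adjustChunkBoundary): min on longs first, then a safe cast.
    int newPsize = (int) Math.min(blockSize - bytesCurBlock, writePacketSize);

    System.out.println("old psize = " + oldPsize); // -2147483620
    System.out.println("new psize = " + newPsize); // 65536
  }
}

The patch moves the computation into a new adjustChunkBoundary() method, takes the minimum on longs before narrowing, and additionally caps the configured packet size at PacketReceiver.MAX_PACKET_SIZE.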
DFSOutputStream.java

@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;

@@ -101,6 +102,8 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceInfo;
import org.apache.htrace.TraceScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -137,6 +140,8 @@ import com.google.common.cache.RemovalNotification;
public class DFSOutputStream extends FSOutputSummer
    implements Syncable, CanSetDropBehind {
  private final long dfsclientSlowLogThresholdMs;
  static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);

  /**
   * Number of times to retry creating a file when there are transient
   * errors (typically related to encryption zones and KeyProvider operations).

@@ -186,6 +191,7 @@ public class DFSOutputStream extends FSOutputSummer
  private FileEncryptionInfo fileEncryptionInfo;
  private static final BlockStoragePolicySuite blockStoragePolicySuite =
      BlockStoragePolicySuite.createDefaultSuite();
  private int writePacketSize;

  /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
  private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
@@ -1669,7 +1675,9 @@ public class DFSOutputStream extends FSOutputSummer
      DFSClient.LOG.debug(
          "Set non-null progress callback on DFSOutputStream " + src);
    }

    initWritePacketSize();

    this.bytesPerChecksum = checksum.getBytesPerChecksum();
    if (bytesPerChecksum <= 0) {
      throw new HadoopIllegalArgumentException(
@@ -1687,6 +1695,21 @@ public class DFSOutputStream extends FSOutputSummer
    this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
  }

  /**
   * Ensures the configured writePacketSize never exceeds
   * PacketReceiver.MAX_PACKET_SIZE.
   */
  private void initWritePacketSize() {
    writePacketSize = dfsClient.getConf().writePacketSize;
    if (writePacketSize > PacketReceiver.MAX_PACKET_SIZE) {
      LOG.warn(
          "Configured write packet exceeds {} bytes as max,"
              + " using {} bytes.",
          PacketReceiver.MAX_PACKET_SIZE, PacketReceiver.MAX_PACKET_SIZE);
      writePacketSize = PacketReceiver.MAX_PACKET_SIZE;
    }
  }

  /** Construct a new output stream for creating a file. */
  private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
      EnumSet<CreateFlag> flag, Progressable progress,
@@ -1936,18 +1959,8 @@ public class DFSOutputStream extends FSOutputSummer
        }
        waitAndQueueCurrentPacket();

        // If the reopened file did not end at chunk boundary and the above
        // write filled up its partial chunk. Tell the summer to generate full
        // crc chunks from now on.
        if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
          appendChunk = false;
          resetChecksumBufSize();
        }
        adjustChunkBoundary();

        if (!appendChunk) {
          int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
          computePacketChunkSize(psize, bytesPerChecksum);
        }
        //
        // if encountering a block boundary, send an empty packet to
        // indicate the end of block and reset bytesCurBlock.
@@ -1962,6 +1975,40 @@ public class DFSOutputStream extends FSOutputSummer
      }
    }

  /**
   * If the reopened file did not end at chunk boundary and the above
   * write filled up its partial chunk. Tell the summer to generate full
   * crc chunks from now on.
   */
  protected void adjustChunkBoundary() {
    if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) {
      appendChunk = false;
      resetChecksumBufSize();
    }

    if (!appendChunk) {
      final int psize = (int) Math.min(blockSize - bytesCurBlock,
          writePacketSize);
      computePacketChunkSize(psize, bytesPerChecksum);
    }
  }

  /**
   * Used in test only.
   */
  @VisibleForTesting
  void setAppendChunk(final boolean appendChunk) {
    this.appendChunk = appendChunk;
  }

  /**
   * Used in test only.
   */
  @VisibleForTesting
  void setBytesCurBlock(final long bytesCurBlock) {
    this.bytesCurBlock = bytesCurBlock;
  }

  @Deprecated
  public void sync() throws IOException {
    hflush();
PacketReceiver.java

@@ -45,7 +45,7 @@ public class PacketReceiver implements Closeable {
   * The max size of any single packet. This prevents OOMEs when
   * invalid data is sent.
   */
  private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
  public static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;

  static final Log LOG = LogFactory.getLog(PacketReceiver.class);
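Making MAX_PACKET_SIZE public lets the new DFSOutputStream.initWritePacketSize() and the test below reference the same 16 MB cap. As a hedged usage sketch (assuming fs.defaultFS points at a running HDFS cluster; the class name and output path are illustrative, not part of the patch), a client configured with a block size above 2 GB and an oversized write packet size now gets a logged warning and a capped packet size instead of a failed write:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BigBlockWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // A block size above 2 GB used to crash the client write path (HDFS-11608).
    conf.setLong("dfs.blocksize", 3L * 1024 * 1024 * 1024);
    // Oversized packet sizes are now capped at PacketReceiver.MAX_PACKET_SIZE
    // (16 MB) by initWritePacketSize(), which logs a warning.
    conf.setInt("dfs.client-write-packet-size", 1000 * 1024 * 1024);
    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/bigBlockFile"))) {
      out.write(new byte[4 * 1024]);
    }
  }
}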
TestDFSOutputStream.java

@@ -17,20 +17,28 @@
 */
package org.apache.hadoop.hdfs;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;

public class TestDFSOutputStream {
  static MiniDFSCluster cluster;
@@ -97,6 +105,124 @@ public class TestDFSOutputStream {
    Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
  }

  /**
   * This tests preventing overflows of package size and bodySize.
   * <p>
   * See also https://issues.apache.org/jira/browse/HDFS-11608.
   * </p>
   * @throws IOException
   * @throws SecurityException
   * @throws NoSuchFieldException
   * @throws InvocationTargetException
   * @throws IllegalArgumentException
   * @throws IllegalAccessException
   * @throws NoSuchMethodException
   */
  @Test(timeout=60000)
  public void testPreventOverflow() throws IOException, NoSuchFieldException,
      SecurityException, IllegalAccessException, IllegalArgumentException,
      InvocationTargetException, NoSuchMethodException {

    final int defaultWritePacketSize = DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
    int configuredWritePacketSize = defaultWritePacketSize;
    int finalWritePacketSize = defaultWritePacketSize;

    /* test default WritePacketSize, e.g. 64*1024 */
    runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);

    /* test large WritePacketSize, e.g. 1G */
    configuredWritePacketSize = 1000 * 1024 * 1024;
    finalWritePacketSize = PacketReceiver.MAX_PACKET_SIZE;
    runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
  }

  /**
   * @configuredWritePacketSize the configured WritePacketSize.
   * @finalWritePacketSize the final WritePacketSize picked by
   *   {@link DFSOutputStream#adjustChunkBoundary}
   */
  private void runAdjustChunkBoundary(
      final int configuredWritePacketSize,
      final int finalWritePacketSize) throws IOException, NoSuchFieldException,
      SecurityException, IllegalAccessException, IllegalArgumentException,
      InvocationTargetException, NoSuchMethodException {

    final boolean appendChunk = false;
    final long blockSize = 3221225500L;
    final long bytesCurBlock = 1073741824L;
    final int bytesPerChecksum = 512;
    final int checksumSize = 4;
    final int chunkSize = bytesPerChecksum + checksumSize;
    final int packateMaxHeaderLength = 33;

    MiniDFSCluster dfsCluster = null;
    final File baseDir = new File(PathUtils.getTestDir(getClass()),
        GenericTestUtils.getMethodName());

    try {
      final Configuration dfsConf = new Configuration();
      dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
          baseDir.getAbsolutePath());
      dfsConf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
          configuredWritePacketSize);
      dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(1).build();
      dfsCluster.waitActive();

      final FSDataOutputStream os = dfsCluster.getFileSystem()
          .create(new Path(baseDir.getAbsolutePath(), "testPreventOverflow"));
      final DFSOutputStream dos = (DFSOutputStream) Whitebox
          .getInternalState(os, "wrappedStream");

      /* set appendChunk */
      final Method setAppendChunkMethod = dos.getClass()
          .getDeclaredMethod("setAppendChunk", boolean.class);
      setAppendChunkMethod.setAccessible(true);
      setAppendChunkMethod.invoke(dos, appendChunk);

      /* set bytesCurBlock */
      final Method setBytesCurBlockMethod = dos.getClass()
          .getDeclaredMethod("setBytesCurBlock", long.class);
      setBytesCurBlockMethod.setAccessible(true);
      setBytesCurBlockMethod.invoke(dos, bytesCurBlock);

      /* set blockSize */
      final Field blockSizeField = dos.getClass().getDeclaredField("blockSize");
      blockSizeField.setAccessible(true);
      blockSizeField.setLong(dos, blockSize);

      /* call adjustChunkBoundary */
      final Method method = dos.getClass()
          .getDeclaredMethod("adjustChunkBoundary");
      method.setAccessible(true);
      method.invoke(dos);

      /* get and verify writePacketSize */
      final Field writePacketSizeField = dos.getClass()
          .getDeclaredField("writePacketSize");
      writePacketSizeField.setAccessible(true);
      Assert.assertEquals(writePacketSizeField.getInt(dos),
          finalWritePacketSize);

      /* get and verify chunksPerPacket */
      final Field chunksPerPacketField = dos.getClass()
          .getDeclaredField("chunksPerPacket");
      chunksPerPacketField.setAccessible(true);
      Assert.assertEquals(chunksPerPacketField.getInt(dos),
          (finalWritePacketSize - packateMaxHeaderLength) / chunkSize);

      /* get and verify packetSize */
      final Field packetSizeField = dos.getClass()
          .getDeclaredField("packetSize");
      packetSizeField.setAccessible(true);
      Assert.assertEquals(packetSizeField.getInt(dos),
          chunksPerPacketField.getInt(dos) * chunkSize);
    } finally {
      if (dfsCluster != null) {
        dfsCluster.shutdown();
      }
    }
  }

  @AfterClass
  public static void tearDown() {
    cluster.shutdown();
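The assertions at the end of runAdjustChunkBoundary spell out the packet layout arithmetic: a chunk carries bytesPerChecksum data bytes plus checksumSize CRC bytes, and chunksPerPacket is however many such chunks fit in writePacketSize once the packet header budget is reserved. A small standalone sketch (not part of this commit) of the expected values for the capped 16 MB case:

/** Sketch of the arithmetic the new test asserts for the capped case. */
public class PacketChunkMath {
  public static void main(String[] args) {
    final int bytesPerChecksum = 512;
    final int checksumSize = 4;
    final int chunkSize = bytesPerChecksum + checksumSize; // 516
    final int packetMaxHeaderLength = 33;                  // header budget used by the test
    final int writePacketSize = 16 * 1024 * 1024;          // PacketReceiver.MAX_PACKET_SIZE

    final int chunksPerPacket = (writePacketSize - packetMaxHeaderLength) / chunkSize;
    final int packetSize = chunksPerPacket * chunkSize;

    System.out.println("chunksPerPacket = " + chunksPerPacket); // 32513
    System.out.println("packetSize = " + packetSize);           // 16776708
  }
}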