HDFS-11608. HDFS write crashed with block size greater than 2 GB. Contributed by Xiaobing Zhou.

(cherry picked from commit 0eacd4c13b)
(cherry picked from commit 0391c92022)
Xiaoyu Yao 2017-04-06 16:11:55 -07:00
parent b1dfdea5b4
commit 6367bc1f09
3 changed files with 165 additions and 4 deletions
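For context on the failure mode named in the commit message: before this patch, DFSOutputStream#adjustChunkBoundary cast the remaining bytes of the block to int before taking the minimum with the configured packet size, so with a block size above 2 GB the subtraction can exceed Integer.MAX_VALUE and wrap to a negative packet size. A minimal standalone sketch, not part of the patch, with constants borrowed from the new test below:

public class Hdfs11608OverflowSketch {
  public static void main(String[] args) {
    final long blockSize = 3221225500L;       // ~3 GB block, as used in the new test
    final long bytesCurBlock = 1073741824L;   // 1 GB already written into the block
    final int writePacketSize = 64 * 1024;    // default dfs.client-write-packet-size

    // Old behavior: cast to int first, then Math.min. The remaining bytes
    // (2147483676) exceed Integer.MAX_VALUE and wrap to -2147483620.
    int oldPsize = Math.min((int) (blockSize - bytesCurBlock), writePacketSize);
    System.out.println("old psize = " + oldPsize);  // negative packet size, crashes the write

    // New behavior: take the minimum in long arithmetic, then cast.
    int newPsize = (int) Math.min(blockSize - bytesCurBlock, writePacketSize);
    System.out.println("new psize = " + newPsize);  // 65536
  }
}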

DFSOutputStream.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
@@ -120,6 +121,7 @@ public class DFSOutputStream extends FSOutputSummer
  private final EnumSet<AddBlockFlag> addBlockFlags;
  protected final AtomicReference<CachingStrategy> cachingStrategy;
  private FileEncryptionInfo fileEncryptionInfo;
  private int writePacketSize;

  /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
  protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -200,6 +202,8 @@ public class DFSOutputStream extends FSOutputSummer
          +"{}", src);
    }

    initWritePacketSize();

    this.bytesPerChecksum = checksum.getBytesPerChecksum();
    if (bytesPerChecksum <= 0) {
      throw new HadoopIllegalArgumentException(
@@ -214,6 +218,21 @@ public class DFSOutputStream extends FSOutputSummer
    this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
  }

  /**
   * Ensures the configured writePacketSize never exceeds
   * PacketReceiver.MAX_PACKET_SIZE.
   */
  private void initWritePacketSize() {
    writePacketSize = dfsClient.getConf().getWritePacketSize();
    if (writePacketSize > PacketReceiver.MAX_PACKET_SIZE) {
      LOG.warn(
          "Configured write packet exceeds {} bytes as max,"
              + " using {} bytes.",
          PacketReceiver.MAX_PACKET_SIZE, PacketReceiver.MAX_PACKET_SIZE);
      writePacketSize = PacketReceiver.MAX_PACKET_SIZE;
    }
  }

  /** Construct a new output stream for creating a file. */
  protected DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
      EnumSet<CreateFlag> flag, Progressable progress,
@@ -450,12 +469,28 @@ public class DFSOutputStream extends FSOutputSummer
    }
    if (!getStreamer().getAppendChunk()) {
-     int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
-         dfsClient.getConf().getWritePacketSize());
+     final int psize = (int) Math
+         .min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize);
      computePacketChunkSize(psize, bytesPerChecksum);
    }
  }

  /**
   * Used in test only.
   */
  @VisibleForTesting
  void setAppendChunk(final boolean appendChunk) {
    getStreamer().setAppendChunk(appendChunk);
  }

  /**
   * Used in test only.
   */
  @VisibleForTesting
  void setBytesCurBlock(final long bytesCurBlock) {
    getStreamer().setBytesCurBlock(bytesCurBlock);
  }

  /**
   * if encountering a block boundary, send an empty packet to
   * indicate the end of block and reset bytesCurBlock.
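The two pieces above work together: initWritePacketSize() caps the configured packet size at PacketReceiver.MAX_PACKET_SIZE (made public in the next file), and adjustChunkBoundary() now takes the minimum in long arithmetic against that capped value. A rough standalone sketch of the combined effect; the constants mirror the new test, and this is not the patched code verbatim:

public class Hdfs11608ClampSketch {
  // 16 MB cap, exposed as PacketReceiver.MAX_PACKET_SIZE by this patch
  static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;

  public static void main(String[] args) {
    int configured = 1000 * 1024 * 1024;      // oversized ~1 GB dfs.client-write-packet-size
    // initWritePacketSize(): fall back to the 16 MB cap when the config exceeds it
    int writePacketSize = configured > MAX_PACKET_SIZE ? MAX_PACKET_SIZE : configured;

    long blockSize = 3221225500L;             // block larger than 2 GB
    long bytesCurBlock = 1073741824L;         // 1 GB already written

    // adjustChunkBoundary(): minimum taken in long, then cast; never negative
    int psize = (int) Math.min(blockSize - bytesCurBlock, writePacketSize);
    System.out.println(psize);                // 16777216
  }
}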

PacketReceiver.java

@@ -45,7 +45,7 @@ public class PacketReceiver implements Closeable {
   * The max size of any single packet. This prevents OOMEs when
   * invalid data is sent.
   */
-  private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
+  public static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;

  static final Logger LOG = LoggerFactory.getLogger(PacketReceiver.class);

TestDFSOutputStream.java

@@ -18,8 +18,10 @@
package org.apache.hadoop.hdfs;

import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -41,10 +43,13 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.htrace.core.SpanId;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -64,6 +69,9 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;

public class TestDFSOutputStream {
  static MiniDFSCluster cluster;
@@ -133,6 +141,124 @@ public class TestDFSOutputStream {
    Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
  }
  /**
   * This tests preventing overflows of package size and bodySize.
   * <p>
   * See also https://issues.apache.org/jira/browse/HDFS-11608.
   * </p>
   * @throws IOException
   * @throws SecurityException
   * @throws NoSuchFieldException
   * @throws InvocationTargetException
   * @throws IllegalArgumentException
   * @throws IllegalAccessException
   * @throws NoSuchMethodException
   */
  @Test(timeout=60000)
  public void testPreventOverflow() throws IOException, NoSuchFieldException,
      SecurityException, IllegalAccessException, IllegalArgumentException,
      InvocationTargetException, NoSuchMethodException {

    final int defaultWritePacketSize = DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
    int configuredWritePacketSize = defaultWritePacketSize;
    int finalWritePacketSize = defaultWritePacketSize;

    /* test default WritePacketSize, e.g. 64*1024 */
    runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);

    /* test large WritePacketSize, e.g. 1G */
    configuredWritePacketSize = 1000 * 1024 * 1024;
    finalWritePacketSize = PacketReceiver.MAX_PACKET_SIZE;
    runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
  }

  /**
   * @configuredWritePacketSize the configured WritePacketSize.
   * @finalWritePacketSize the final WritePacketSize picked by
   *   {@link DFSOutputStream#adjustChunkBoundary}
   */
  private void runAdjustChunkBoundary(
      final int configuredWritePacketSize,
      final int finalWritePacketSize) throws IOException, NoSuchFieldException,
      SecurityException, IllegalAccessException, IllegalArgumentException,
      InvocationTargetException, NoSuchMethodException {

    final boolean appendChunk = false;
    final long blockSize = 3221225500L;
    final long bytesCurBlock = 1073741824L;
    final int bytesPerChecksum = 512;
    final int checksumSize = 4;
    final int chunkSize = bytesPerChecksum + checksumSize;
    final int packateMaxHeaderLength = 33;

    MiniDFSCluster dfsCluster = null;
    final File baseDir = new File(PathUtils.getTestDir(getClass()),
        GenericTestUtils.getMethodName());
    try {
      final Configuration dfsConf = new Configuration();
      dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
          baseDir.getAbsolutePath());
      dfsConf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
          configuredWritePacketSize);
      dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(1).build();
      dfsCluster.waitActive();

      final FSDataOutputStream os = dfsCluster.getFileSystem()
          .create(new Path(baseDir.getAbsolutePath(), "testPreventOverflow"));
      final DFSOutputStream dos = (DFSOutputStream) Whitebox
          .getInternalState(os, "wrappedStream");

      /* set appendChunk */
      final Method setAppendChunkMethod = dos.getClass()
          .getDeclaredMethod("setAppendChunk", boolean.class);
      setAppendChunkMethod.setAccessible(true);
      setAppendChunkMethod.invoke(dos, appendChunk);

      /* set bytesCurBlock */
      final Method setBytesCurBlockMethod = dos.getClass()
          .getDeclaredMethod("setBytesCurBlock", long.class);
      setBytesCurBlockMethod.setAccessible(true);
      setBytesCurBlockMethod.invoke(dos, bytesCurBlock);

      /* set blockSize */
      final Field blockSizeField = dos.getClass().getDeclaredField("blockSize");
      blockSizeField.setAccessible(true);
      blockSizeField.setLong(dos, blockSize);

      /* call adjustChunkBoundary */
      final Method method = dos.getClass()
          .getDeclaredMethod("adjustChunkBoundary");
      method.setAccessible(true);
      method.invoke(dos);

      /* get and verify writePacketSize */
      final Field writePacketSizeField = dos.getClass()
          .getDeclaredField("writePacketSize");
      writePacketSizeField.setAccessible(true);
      Assert.assertEquals(writePacketSizeField.getInt(dos),
          finalWritePacketSize);

      /* get and verify chunksPerPacket */
      final Field chunksPerPacketField = dos.getClass()
          .getDeclaredField("chunksPerPacket");
      chunksPerPacketField.setAccessible(true);
      Assert.assertEquals(chunksPerPacketField.getInt(dos),
          (finalWritePacketSize - packateMaxHeaderLength) / chunkSize);

      /* get and verify packetSize */
      final Field packetSizeField = dos.getClass()
          .getDeclaredField("packetSize");
      packetSizeField.setAccessible(true);
      Assert.assertEquals(packetSizeField.getInt(dos),
          chunksPerPacketField.getInt(dos) * chunkSize);
    } finally {
      if (dfsCluster != null) {
        dfsCluster.shutdown();
      }
    }
  }
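  // Worked numbers behind the assertions above, derived only from this test's own
  // constants (chunkSize = 512 + 4 = 516, header length = 33), not taken from elsewhere:
  //   configured 1000 MB -> finalWritePacketSize = 16 MB = 16777216,
  //     chunksPerPacket = (16777216 - 33) / 516 = 32513,
  //     packetSize = 32513 * 516 = 16776708;
  //   default 64 KB -> finalWritePacketSize = 65536,
  //     chunksPerPacket = (65536 - 33) / 516 = 126,
  //     packetSize = 126 * 516 = 65016.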
  @Test
  public void testCongestionBackoff() throws IOException {
    DfsClientConf dfsClientConf = mock(DfsClientConf.class);