HDFS-11608. HDFS write crashed with block size greater than 2 GB. Contributed by Xiaobing Zhou.
This commit is contained in:
parent
a49fac5302
commit
0eacd4c13b
|
@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException;
|
|||
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
|
||||
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
||||
|
@ -122,6 +123,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
private final EnumSet<AddBlockFlag> addBlockFlags;
|
||||
protected final AtomicReference<CachingStrategy> cachingStrategy;
|
||||
private FileEncryptionInfo fileEncryptionInfo;
|
||||
private int writePacketSize;
|
||||
|
||||
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
|
||||
protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
|
||||
|
@ -202,6 +204,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
+"{}", src);
|
||||
}
|
||||
|
||||
initWritePacketSize();
|
||||
|
||||
this.bytesPerChecksum = checksum.getBytesPerChecksum();
|
||||
if (bytesPerChecksum <= 0) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
|
@ -216,6 +220,21 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensures the configured writePacketSize never exceeds
|
||||
* PacketReceiver.MAX_PACKET_SIZE.
|
||||
*/
|
||||
private void initWritePacketSize() {
|
||||
writePacketSize = dfsClient.getConf().getWritePacketSize();
|
||||
if (writePacketSize > PacketReceiver.MAX_PACKET_SIZE) {
|
||||
LOG.warn(
|
||||
"Configured write packet exceeds {} bytes as max,"
|
||||
+ " using {} bytes.",
|
||||
PacketReceiver.MAX_PACKET_SIZE, PacketReceiver.MAX_PACKET_SIZE);
|
||||
writePacketSize = PacketReceiver.MAX_PACKET_SIZE;
|
||||
}
|
||||
}
|
||||
|
||||
/** Construct a new output stream for creating a file. */
|
||||
protected DFSOutputStream(DFSClient dfsClient, String src,
|
||||
HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
|
||||
|
@ -489,12 +508,28 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
|
||||
if (!getStreamer().getAppendChunk()) {
|
||||
int psize = Math.min((int)(blockSize- getStreamer().getBytesCurBlock()),
|
||||
dfsClient.getConf().getWritePacketSize());
|
||||
final int psize = (int) Math
|
||||
.min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize);
|
||||
computePacketChunkSize(psize, bytesPerChecksum);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used in test only.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
void setAppendChunk(final boolean appendChunk) {
|
||||
getStreamer().setAppendChunk(appendChunk);
|
||||
}
|
||||
|
||||
/**
|
||||
* Used in test only.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
void setBytesCurBlock(final long bytesCurBlock) {
|
||||
getStreamer().setBytesCurBlock(bytesCurBlock);
|
||||
}
|
||||
|
||||
/**
|
||||
* if encountering a block boundary, send an empty packet to
|
||||
* indicate the end of block and reset bytesCurBlock.
|
||||
|
|
|
@ -45,7 +45,7 @@ public class PacketReceiver implements Closeable {
|
|||
* The max size of any single packet. This prevents OOMEs when
|
||||
* invalid data is sent.
|
||||
*/
|
||||
private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
|
||||
public static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
|
||||
|
||||
static final Logger LOG = LoggerFactory.getLogger(PacketReceiver.class);
|
||||
|
||||
|
|
|
@ -18,8 +18,10 @@
|
|||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
|
@ -41,10 +43,13 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.PathUtils;
|
||||
import org.apache.htrace.core.SpanId;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -64,6 +69,9 @@ import static org.mockito.Mockito.doThrow;
|
|||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
||||
|
||||
public class TestDFSOutputStream {
|
||||
static MiniDFSCluster cluster;
|
||||
|
||||
|
@ -133,6 +141,124 @@ public class TestDFSOutputStream {
|
|||
Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* This tests preventing overflows of package size and bodySize.
|
||||
* <p>
|
||||
* See also https://issues.apache.org/jira/browse/HDFS-11608.
|
||||
* </p>
|
||||
* @throws IOException
|
||||
* @throws SecurityException
|
||||
* @throws NoSuchFieldException
|
||||
* @throws InvocationTargetException
|
||||
* @throws IllegalArgumentException
|
||||
* @throws IllegalAccessException
|
||||
* @throws NoSuchMethodException
|
||||
*/
|
||||
@Test(timeout=60000)
|
||||
public void testPreventOverflow() throws IOException, NoSuchFieldException,
|
||||
SecurityException, IllegalAccessException, IllegalArgumentException,
|
||||
InvocationTargetException, NoSuchMethodException {
|
||||
|
||||
final int defaultWritePacketSize = DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
||||
int configuredWritePacketSize = defaultWritePacketSize;
|
||||
int finalWritePacketSize = defaultWritePacketSize;
|
||||
|
||||
/* test default WritePacketSize, e.g. 64*1024 */
|
||||
runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
|
||||
|
||||
/* test large WritePacketSize, e.g. 1G */
|
||||
configuredWritePacketSize = 1000 * 1024 * 1024;
|
||||
finalWritePacketSize = PacketReceiver.MAX_PACKET_SIZE;
|
||||
runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* @configuredWritePacketSize the configured WritePacketSize.
|
||||
* @finalWritePacketSize the final WritePacketSize picked by
|
||||
* {@link DFSOutputStream#adjustChunkBoundary}
|
||||
*/
|
||||
private void runAdjustChunkBoundary(
|
||||
final int configuredWritePacketSize,
|
||||
final int finalWritePacketSize) throws IOException, NoSuchFieldException,
|
||||
SecurityException, IllegalAccessException, IllegalArgumentException,
|
||||
InvocationTargetException, NoSuchMethodException {
|
||||
|
||||
final boolean appendChunk = false;
|
||||
final long blockSize = 3221225500L;
|
||||
final long bytesCurBlock = 1073741824L;
|
||||
final int bytesPerChecksum = 512;
|
||||
final int checksumSize = 4;
|
||||
final int chunkSize = bytesPerChecksum + checksumSize;
|
||||
final int packateMaxHeaderLength = 33;
|
||||
|
||||
MiniDFSCluster dfsCluster = null;
|
||||
final File baseDir = new File(PathUtils.getTestDir(getClass()),
|
||||
GenericTestUtils.getMethodName());
|
||||
|
||||
try {
|
||||
final Configuration dfsConf = new Configuration();
|
||||
dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
|
||||
baseDir.getAbsolutePath());
|
||||
dfsConf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
||||
configuredWritePacketSize);
|
||||
dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(1).build();
|
||||
dfsCluster.waitActive();
|
||||
|
||||
final FSDataOutputStream os = dfsCluster.getFileSystem()
|
||||
.create(new Path(baseDir.getAbsolutePath(), "testPreventOverflow"));
|
||||
final DFSOutputStream dos = (DFSOutputStream) Whitebox
|
||||
.getInternalState(os, "wrappedStream");
|
||||
|
||||
/* set appendChunk */
|
||||
final Method setAppendChunkMethod = dos.getClass()
|
||||
.getDeclaredMethod("setAppendChunk", boolean.class);
|
||||
setAppendChunkMethod.setAccessible(true);
|
||||
setAppendChunkMethod.invoke(dos, appendChunk);
|
||||
|
||||
/* set bytesCurBlock */
|
||||
final Method setBytesCurBlockMethod = dos.getClass()
|
||||
.getDeclaredMethod("setBytesCurBlock", long.class);
|
||||
setBytesCurBlockMethod.setAccessible(true);
|
||||
setBytesCurBlockMethod.invoke(dos, bytesCurBlock);
|
||||
|
||||
/* set blockSize */
|
||||
final Field blockSizeField = dos.getClass().getDeclaredField("blockSize");
|
||||
blockSizeField.setAccessible(true);
|
||||
blockSizeField.setLong(dos, blockSize);
|
||||
|
||||
/* call adjustChunkBoundary */
|
||||
final Method method = dos.getClass()
|
||||
.getDeclaredMethod("adjustChunkBoundary");
|
||||
method.setAccessible(true);
|
||||
method.invoke(dos);
|
||||
|
||||
/* get and verify writePacketSize */
|
||||
final Field writePacketSizeField = dos.getClass()
|
||||
.getDeclaredField("writePacketSize");
|
||||
writePacketSizeField.setAccessible(true);
|
||||
Assert.assertEquals(writePacketSizeField.getInt(dos),
|
||||
finalWritePacketSize);
|
||||
|
||||
/* get and verify chunksPerPacket */
|
||||
final Field chunksPerPacketField = dos.getClass()
|
||||
.getDeclaredField("chunksPerPacket");
|
||||
chunksPerPacketField.setAccessible(true);
|
||||
Assert.assertEquals(chunksPerPacketField.getInt(dos),
|
||||
(finalWritePacketSize - packateMaxHeaderLength) / chunkSize);
|
||||
|
||||
/* get and verify packetSize */
|
||||
final Field packetSizeField = dos.getClass()
|
||||
.getDeclaredField("packetSize");
|
||||
packetSizeField.setAccessible(true);
|
||||
Assert.assertEquals(packetSizeField.getInt(dos),
|
||||
chunksPerPacketField.getInt(dos) * chunkSize);
|
||||
} finally {
|
||||
if (dfsCluster != null) {
|
||||
dfsCluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCongestionBackoff() throws IOException {
|
||||
DfsClientConf dfsClientConf = mock(DfsClientConf.class);
|
||||
|
|
Loading…
Reference in New Issue