HDFS-11551. Handle SlowDiskReport from DataNode at the NameNode. Contributed by Xiaobing Zhou.
This commit is contained in:
parent
60a3a63990
commit
ac76dc10dd
|
@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
||||||
|
@ -101,6 +102,8 @@ import org.apache.htrace.Span;
|
||||||
import org.apache.htrace.Trace;
|
import org.apache.htrace.Trace;
|
||||||
import org.apache.htrace.TraceInfo;
|
import org.apache.htrace.TraceInfo;
|
||||||
import org.apache.htrace.TraceScope;
|
import org.apache.htrace.TraceScope;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
@ -137,6 +140,8 @@ import com.google.common.cache.RemovalNotification;
|
||||||
public class DFSOutputStream extends FSOutputSummer
|
public class DFSOutputStream extends FSOutputSummer
|
||||||
implements Syncable, CanSetDropBehind {
|
implements Syncable, CanSetDropBehind {
|
||||||
private final long dfsclientSlowLogThresholdMs;
|
private final long dfsclientSlowLogThresholdMs;
|
||||||
|
static final Logger LOG = LoggerFactory.getLogger(DFSOutputStream.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Number of times to retry creating a file when there are transient
|
* Number of times to retry creating a file when there are transient
|
||||||
* errors (typically related to encryption zones and KeyProvider operations).
|
* errors (typically related to encryption zones and KeyProvider operations).
|
||||||
|
@ -186,6 +191,7 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
private FileEncryptionInfo fileEncryptionInfo;
|
private FileEncryptionInfo fileEncryptionInfo;
|
||||||
private static final BlockStoragePolicySuite blockStoragePolicySuite =
|
private static final BlockStoragePolicySuite blockStoragePolicySuite =
|
||||||
BlockStoragePolicySuite.createDefaultSuite();
|
BlockStoragePolicySuite.createDefaultSuite();
|
||||||
|
private int writePacketSize;
|
||||||
|
|
||||||
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
|
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
|
||||||
private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
|
private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
|
||||||
|
@ -1670,6 +1676,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
"Set non-null progress callback on DFSOutputStream " + src);
|
"Set non-null progress callback on DFSOutputStream " + src);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
initWritePacketSize();
|
||||||
|
|
||||||
this.bytesPerChecksum = checksum.getBytesPerChecksum();
|
this.bytesPerChecksum = checksum.getBytesPerChecksum();
|
||||||
if (bytesPerChecksum <= 0) {
|
if (bytesPerChecksum <= 0) {
|
||||||
throw new HadoopIllegalArgumentException(
|
throw new HadoopIllegalArgumentException(
|
||||||
|
@ -1687,6 +1695,21 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
|
this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensures the configured writePacketSize never exceeds
|
||||||
|
* PacketReceiver.MAX_PACKET_SIZE.
|
||||||
|
*/
|
||||||
|
private void initWritePacketSize() {
|
||||||
|
writePacketSize = dfsClient.getConf().writePacketSize;
|
||||||
|
if (writePacketSize > PacketReceiver.MAX_PACKET_SIZE) {
|
||||||
|
LOG.warn(
|
||||||
|
"Configured write packet exceeds {} bytes as max,"
|
||||||
|
+ " using {} bytes.",
|
||||||
|
PacketReceiver.MAX_PACKET_SIZE, PacketReceiver.MAX_PACKET_SIZE);
|
||||||
|
writePacketSize = PacketReceiver.MAX_PACKET_SIZE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Construct a new output stream for creating a file. */
|
/** Construct a new output stream for creating a file. */
|
||||||
private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
|
private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
|
||||||
EnumSet<CreateFlag> flag, Progressable progress,
|
EnumSet<CreateFlag> flag, Progressable progress,
|
||||||
|
@ -1936,18 +1959,8 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
}
|
}
|
||||||
waitAndQueueCurrentPacket();
|
waitAndQueueCurrentPacket();
|
||||||
|
|
||||||
// If the reopened file did not end at chunk boundary and the above
|
adjustChunkBoundary();
|
||||||
// write filled up its partial chunk. Tell the summer to generate full
|
|
||||||
// crc chunks from now on.
|
|
||||||
if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
|
|
||||||
appendChunk = false;
|
|
||||||
resetChecksumBufSize();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!appendChunk) {
|
|
||||||
int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
|
|
||||||
computePacketChunkSize(psize, bytesPerChecksum);
|
|
||||||
}
|
|
||||||
//
|
//
|
||||||
// if encountering a block boundary, send an empty packet to
|
// if encountering a block boundary, send an empty packet to
|
||||||
// indicate the end of block and reset bytesCurBlock.
|
// indicate the end of block and reset bytesCurBlock.
|
||||||
|
@ -1962,6 +1975,40 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the reopened file did not end at chunk boundary and the above
|
||||||
|
* write filled up its partial chunk. Tell the summer to generate full
|
||||||
|
* crc chunks from now on.
|
||||||
|
*/
|
||||||
|
protected void adjustChunkBoundary() {
|
||||||
|
if (appendChunk && bytesCurBlock % bytesPerChecksum == 0) {
|
||||||
|
appendChunk = false;
|
||||||
|
resetChecksumBufSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!appendChunk) {
|
||||||
|
final int psize = (int) Math.min(blockSize - bytesCurBlock,
|
||||||
|
writePacketSize);
|
||||||
|
computePacketChunkSize(psize, bytesPerChecksum);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used in test only.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
void setAppendChunk(final boolean appendChunk) {
|
||||||
|
this.appendChunk = appendChunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used in test only.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
void setBytesCurBlock(final long bytesCurBlock) {
|
||||||
|
this.bytesCurBlock = bytesCurBlock;
|
||||||
|
}
|
||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public void sync() throws IOException {
|
public void sync() throws IOException {
|
||||||
hflush();
|
hflush();
|
||||||
|
|
|
@ -45,7 +45,7 @@ public class PacketReceiver implements Closeable {
|
||||||
* The max size of any single packet. This prevents OOMEs when
|
* The max size of any single packet. This prevents OOMEs when
|
||||||
* invalid data is sent.
|
* invalid data is sent.
|
||||||
*/
|
*/
|
||||||
private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
|
public static final int MAX_PACKET_SIZE = 16 * 1024 * 1024;
|
||||||
|
|
||||||
static final Log LOG = LogFactory.getLog(PacketReceiver.class);
|
static final Log LOG = LogFactory.getLog(PacketReceiver.class);
|
||||||
|
|
||||||
|
|
|
@ -17,20 +17,28 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.test.PathUtils;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.internal.util.reflection.Whitebox;
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
|
||||||
|
|
||||||
public class TestDFSOutputStream {
|
public class TestDFSOutputStream {
|
||||||
static MiniDFSCluster cluster;
|
static MiniDFSCluster cluster;
|
||||||
|
|
||||||
|
@ -97,6 +105,124 @@ public class TestDFSOutputStream {
|
||||||
Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
|
Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This tests preventing overflows of package size and bodySize.
|
||||||
|
* <p>
|
||||||
|
* See also https://issues.apache.org/jira/browse/HDFS-11608.
|
||||||
|
* </p>
|
||||||
|
* @throws IOException
|
||||||
|
* @throws SecurityException
|
||||||
|
* @throws NoSuchFieldException
|
||||||
|
* @throws InvocationTargetException
|
||||||
|
* @throws IllegalArgumentException
|
||||||
|
* @throws IllegalAccessException
|
||||||
|
* @throws NoSuchMethodException
|
||||||
|
*/
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testPreventOverflow() throws IOException, NoSuchFieldException,
|
||||||
|
SecurityException, IllegalAccessException, IllegalArgumentException,
|
||||||
|
InvocationTargetException, NoSuchMethodException {
|
||||||
|
|
||||||
|
final int defaultWritePacketSize = DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
|
||||||
|
int configuredWritePacketSize = defaultWritePacketSize;
|
||||||
|
int finalWritePacketSize = defaultWritePacketSize;
|
||||||
|
|
||||||
|
/* test default WritePacketSize, e.g. 64*1024 */
|
||||||
|
runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
|
||||||
|
|
||||||
|
/* test large WritePacketSize, e.g. 1G */
|
||||||
|
configuredWritePacketSize = 1000 * 1024 * 1024;
|
||||||
|
finalWritePacketSize = PacketReceiver.MAX_PACKET_SIZE;
|
||||||
|
runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @configuredWritePacketSize the configured WritePacketSize.
|
||||||
|
* @finalWritePacketSize the final WritePacketSize picked by
|
||||||
|
* {@link DFSOutputStream#adjustChunkBoundary}
|
||||||
|
*/
|
||||||
|
private void runAdjustChunkBoundary(
|
||||||
|
final int configuredWritePacketSize,
|
||||||
|
final int finalWritePacketSize) throws IOException, NoSuchFieldException,
|
||||||
|
SecurityException, IllegalAccessException, IllegalArgumentException,
|
||||||
|
InvocationTargetException, NoSuchMethodException {
|
||||||
|
|
||||||
|
final boolean appendChunk = false;
|
||||||
|
final long blockSize = 3221225500L;
|
||||||
|
final long bytesCurBlock = 1073741824L;
|
||||||
|
final int bytesPerChecksum = 512;
|
||||||
|
final int checksumSize = 4;
|
||||||
|
final int chunkSize = bytesPerChecksum + checksumSize;
|
||||||
|
final int packateMaxHeaderLength = 33;
|
||||||
|
|
||||||
|
MiniDFSCluster dfsCluster = null;
|
||||||
|
final File baseDir = new File(PathUtils.getTestDir(getClass()),
|
||||||
|
GenericTestUtils.getMethodName());
|
||||||
|
|
||||||
|
try {
|
||||||
|
final Configuration dfsConf = new Configuration();
|
||||||
|
dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
|
||||||
|
baseDir.getAbsolutePath());
|
||||||
|
dfsConf.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
|
||||||
|
configuredWritePacketSize);
|
||||||
|
dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(1).build();
|
||||||
|
dfsCluster.waitActive();
|
||||||
|
|
||||||
|
final FSDataOutputStream os = dfsCluster.getFileSystem()
|
||||||
|
.create(new Path(baseDir.getAbsolutePath(), "testPreventOverflow"));
|
||||||
|
final DFSOutputStream dos = (DFSOutputStream) Whitebox
|
||||||
|
.getInternalState(os, "wrappedStream");
|
||||||
|
|
||||||
|
/* set appendChunk */
|
||||||
|
final Method setAppendChunkMethod = dos.getClass()
|
||||||
|
.getDeclaredMethod("setAppendChunk", boolean.class);
|
||||||
|
setAppendChunkMethod.setAccessible(true);
|
||||||
|
setAppendChunkMethod.invoke(dos, appendChunk);
|
||||||
|
|
||||||
|
/* set bytesCurBlock */
|
||||||
|
final Method setBytesCurBlockMethod = dos.getClass()
|
||||||
|
.getDeclaredMethod("setBytesCurBlock", long.class);
|
||||||
|
setBytesCurBlockMethod.setAccessible(true);
|
||||||
|
setBytesCurBlockMethod.invoke(dos, bytesCurBlock);
|
||||||
|
|
||||||
|
/* set blockSize */
|
||||||
|
final Field blockSizeField = dos.getClass().getDeclaredField("blockSize");
|
||||||
|
blockSizeField.setAccessible(true);
|
||||||
|
blockSizeField.setLong(dos, blockSize);
|
||||||
|
|
||||||
|
/* call adjustChunkBoundary */
|
||||||
|
final Method method = dos.getClass()
|
||||||
|
.getDeclaredMethod("adjustChunkBoundary");
|
||||||
|
method.setAccessible(true);
|
||||||
|
method.invoke(dos);
|
||||||
|
|
||||||
|
/* get and verify writePacketSize */
|
||||||
|
final Field writePacketSizeField = dos.getClass()
|
||||||
|
.getDeclaredField("writePacketSize");
|
||||||
|
writePacketSizeField.setAccessible(true);
|
||||||
|
Assert.assertEquals(writePacketSizeField.getInt(dos),
|
||||||
|
finalWritePacketSize);
|
||||||
|
|
||||||
|
/* get and verify chunksPerPacket */
|
||||||
|
final Field chunksPerPacketField = dos.getClass()
|
||||||
|
.getDeclaredField("chunksPerPacket");
|
||||||
|
chunksPerPacketField.setAccessible(true);
|
||||||
|
Assert.assertEquals(chunksPerPacketField.getInt(dos),
|
||||||
|
(finalWritePacketSize - packateMaxHeaderLength) / chunkSize);
|
||||||
|
|
||||||
|
/* get and verify packetSize */
|
||||||
|
final Field packetSizeField = dos.getClass()
|
||||||
|
.getDeclaredField("packetSize");
|
||||||
|
packetSizeField.setAccessible(true);
|
||||||
|
Assert.assertEquals(packetSizeField.getInt(dos),
|
||||||
|
chunksPerPacketField.getInt(dos) * chunkSize);
|
||||||
|
} finally {
|
||||||
|
if (dfsCluster != null) {
|
||||||
|
dfsCluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDown() {
|
public static void tearDown() {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
|
Loading…
Reference in New Issue