HDFS-7308. Change the packet chunk size computation in DFSOutputStream in order to enforce packet size <= 64kB. Contributed by Takuya Fukudome
This commit is contained in:
parent
bc60404eaf
commit
657b027bb2
|
@ -384,6 +384,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
HDFS-7819. Log WARN message for the blocks which are not in Block ID based
|
HDFS-7819. Log WARN message for the blocks which are not in Block ID based
|
||||||
layout (Rakesh R via Colin P. McCabe)
|
layout (Rakesh R via Colin P. McCabe)
|
||||||
|
|
||||||
|
HDFS-7308. Change the packet chunk size computation in DFSOutputStream in
|
||||||
|
order to enforce packet size <= 64kB. (Takuya Fukudome via szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
||||||
|
|
|
@ -1851,8 +1851,9 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
}
|
}
|
||||||
|
|
||||||
private void computePacketChunkSize(int psize, int csize) {
|
private void computePacketChunkSize(int psize, int csize) {
|
||||||
|
final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
|
||||||
final int chunkSize = csize + getChecksumSize();
|
final int chunkSize = csize + getChecksumSize();
|
||||||
chunksPerPacket = Math.max(psize/chunkSize, 1);
|
chunksPerPacket = Math.max(bodySize/chunkSize, 1);
|
||||||
packetSize = chunkSize*chunksPerPacket;
|
packetSize = chunkSize*chunksPerPacket;
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
if (DFSClient.LOG.isDebugEnabled()) {
|
||||||
DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
|
DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -66,6 +68,35 @@ public class TestDFSOutputStream {
|
||||||
dos.close();
|
dos.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The computePacketChunkSize() method of DFSOutputStream should set the actual
|
||||||
|
* packet size < 64kB. See HDFS-7308 for details.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testComputePacketChunkSize()
|
||||||
|
throws Exception {
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
FSDataOutputStream os = fs.create(new Path("/test"));
|
||||||
|
DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,
|
||||||
|
"wrappedStream");
|
||||||
|
|
||||||
|
final int packetSize = 64*1024;
|
||||||
|
final int bytesPerChecksum = 512;
|
||||||
|
|
||||||
|
Method method = dos.getClass().getDeclaredMethod("computePacketChunkSize",
|
||||||
|
int.class, int.class);
|
||||||
|
method.setAccessible(true);
|
||||||
|
method.invoke(dos, packetSize, bytesPerChecksum);
|
||||||
|
|
||||||
|
Field field = dos.getClass().getDeclaredField("packetSize");
|
||||||
|
field.setAccessible(true);
|
||||||
|
|
||||||
|
Assert.assertTrue((Integer) field.get(dos) + 33 < packetSize);
|
||||||
|
// If PKT_MAX_HEADER_LEN is 257, actual packet size come to over 64KB
|
||||||
|
// without a fix on HDFS-7308.
|
||||||
|
Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
|
||||||
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void tearDown() {
|
public static void tearDown() {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
|
Loading…
Reference in New Issue