HDFS-6097. Zero-copy reads are incorrectly disabled on file offsets above 2GB (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1577350 13f79535-47bb-0310-9956-ffa450edef68

parent 842aa2bc94
commit f730fa919e
@@ -611,6 +611,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5244. TestNNStorageRetentionManager#testPurgeMultipleDirs fails.
     (Jinghui Wang via suresh)
 
+    HDFS-6097. zero-copy reads are incorrectly disabled on file offsets above
+    2GB (cmccabe)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
@@ -1601,28 +1601,63 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
 
   private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
       EnumSet<ReadOption> opts) throws IOException {
+    // Copy 'pos' and 'blockEnd' to local variables to make it easier for the
+    // JVM to optimize this function.
+    final long curPos = pos;
+    final long curEnd = blockEnd;
+    final long blockStartInFile = currentLocatedBlock.getStartOffset();
+    final long blockPos = curPos - blockStartInFile;
+
+    // Shorten this read if the end of the block is nearby.
+    long length63;
+    if ((curPos + maxLength) <= (curEnd + 1)) {
+      length63 = maxLength;
+    } else {
+      length63 = 1 + curEnd - curPos;
+      if (length63 <= 0) {
+        if (DFSClient.LOG.isDebugEnabled()) {
+          DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
+            curPos + " of " + src + "; " + length63 + " bytes left in block.  " +
+            "blockPos=" + blockPos + "; curPos=" + curPos +
+            "; curEnd=" + curEnd);
+        }
+        return null;
+      }
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("Reducing read length from " + maxLength +
+          " to " + length63 + " to avoid going more than one byte " +
+          "past the end of the block.  blockPos=" + blockPos +
+          "; curPos=" + curPos + "; curEnd=" + curEnd);
+      }
+    }
+    // Make sure that don't go beyond 31-bit offsets in the MappedByteBuffer.
+    int length;
+    if (blockPos + length63 <= Integer.MAX_VALUE) {
+      length = (int)length63;
+    } else {
+      long length31 = Integer.MAX_VALUE - blockPos;
+      if (length31 <= 0) {
         // Java ByteBuffers can't be longer than 2 GB, because they use
         // 4-byte signed integers to represent capacity, etc.
         // So we can't mmap the parts of the block higher than the 2 GB offset.
         // FIXME: we could work around this with multiple memory maps.
         // See HDFS-5101.
-    long blockEnd32 = Math.min(Integer.MAX_VALUE, blockEnd);
-    long curPos = pos;
-    long blockLeft = blockEnd32 - curPos + 1;
-    if (blockLeft <= 0) {
         if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
-        curPos + " of " + src + "; blockLeft = " + blockLeft +
-        "; blockEnd32 = " + blockEnd32 + ", blockEnd = " + blockEnd +
-        "; maxLength = " + maxLength);
+          DFSClient.LOG.debug("Unable to perform a zero-copy read from offset " +
+            curPos + " of " + src + "; 31-bit MappedByteBuffer limit " +
+            "exceeded.  blockPos=" + blockPos + ", curEnd=" + curEnd);
         }
         return null;
       }
-    int length = Math.min((int)blockLeft, maxLength);
-    long blockStartInFile = currentLocatedBlock.getStartOffset();
-    long blockPos = curPos - blockStartInFile;
-    long limit = blockPos + length;
-    ClientMmap clientMmap = blockReader.getClientMmap(opts);
+      length = (int)length31;
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("Reducing read length from " + maxLength +
+          " to " + length + " to avoid 31-bit limit.  " +
+          "blockPos=" + blockPos + "; curPos=" + curPos +
+          "; curEnd=" + curEnd);
+      }
+    }
+    final ClientMmap clientMmap = blockReader.getClientMmap(opts);
     if (clientMmap == null) {
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
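The heart of the fix is that a read crossing the 31-bit boundary of the block's MappedByteBuffer is now shortened rather than rejected outright. Below is a minimal standalone sketch of that clamping logic; the variable names mirror the patch, but the class and main method are invented for illustration and are not part of the commit.

```java
// Sketch of the two-stage clamping in the new tryReadZeroCopy(): first trim
// the request to the block boundary, then trim it again so that
// blockPos + length never exceeds Integer.MAX_VALUE.
final class ZeroCopyClamp {
  /**
   * @param blockPos         read position relative to the start of the block
   * @param bytesLeftInBlock bytes remaining in the block from blockPos
   * @param maxLength        caller-requested maximum read length
   * @return the clamped read length, or -1 if zero-copy is impossible here
   */
  static int clamp(long blockPos, long bytesLeftInBlock, int maxLength) {
    long length63 = Math.min(maxLength, bytesLeftInBlock);
    if (length63 <= 0) {
      return -1;                       // nothing left in this block
    }
    if (blockPos + length63 <= Integer.MAX_VALUE) {
      return (int) length63;           // fits below the 2 GB mmap offset
    }
    long length31 = Integer.MAX_VALUE - blockPos;
    if (length31 <= 0) {
      return -1;                       // already past the 2 GB offset
    }
    return (int) length31;             // shorten instead of disabling zero-copy
  }

  public static void main(String[] args) {
    // At block offset 2147483640 with 1024 bytes requested, only 7 bytes fit
    // below Integer.MAX_VALUE -- the same value the new test asserts.
    System.out.println(clamp(2147483640L, 1L << 30, 1024));
  }
}
```

By contrast, the removed code computed blockLeft against a blockEnd clamped to Integer.MAX_VALUE, so the zero-copy path bailed out entirely once the stream position passed the 2 GB mark.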
@@ -1634,16 +1669,16 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     boolean success = false;
     ByteBuffer buffer;
     try {
-      seek(pos + length);
+      seek(curPos + length);
       buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
       buffer.position((int)blockPos);
-      buffer.limit((int)limit);
+      buffer.limit((int)(blockPos + length));
       extendedReadBuffers.put(buffer, clientMmap);
       readStatistics.addZeroCopyBytes(length);
       if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("readZeroCopy read " + maxLength + " bytes from " +
-          "offset " + curPos + " via the zero-copy read path.  " +
-          "blockEnd = " + blockEnd);
+        DFSClient.LOG.debug("readZeroCopy read " + length +
+          " bytes from offset " + curPos + " via the zero-copy read " +
+          "path.  blockEnd = " + blockEnd);
       }
       success = true;
     } finally {
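This second hunk only adjusts bookkeeping: the stream is advanced by the clamped length, and the returned buffer is restricted to exactly [blockPos, blockPos + length). A small heap-buffer sketch of that position/limit carving; the real code slices the block's MappedByteBuffer, and the offsets here are made up for the example.

```java
import java.nio.ByteBuffer;

public class SliceDemo {
  public static void main(String[] args) {
    ByteBuffer mmapStandIn = ByteBuffer.allocate(4096); // stands in for the block mmap
    int blockPos = 100;   // made-up offset into the block
    int length = 7;       // clamped read length
    ByteBuffer buffer = mmapStandIn.asReadOnlyBuffer();
    buffer.position(blockPos);
    buffer.limit(blockPos + length);
    System.out.println(buffer.remaining()); // 7: exactly the clamped length
  }
}
```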
@@ -285,7 +285,8 @@ public class ShortCircuitReplica {
   MappedByteBuffer loadMmapInternal() {
     try {
       FileChannel channel = dataStream.getChannel();
-      MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, channel.size());
+      MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0,
+          Math.min(Integer.MAX_VALUE, channel.size()));
       if (LOG.isTraceEnabled()) {
         LOG.trace(this + ": created mmap of size " + channel.size());
       }
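The cap on the mapping size matters because FileChannel.map() rejects sizes above Integer.MAX_VALUE, so a replica file longer than 2 GB can only be mapped up to that limit. A hedged sketch of the capped mapping outside HDFS; the file path and its existence are assumptions of the example, not part of the patch.

```java
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

public class MmapCapDemo {
  public static void main(String[] args) throws Exception {
    // Assumed demo file; any local file works.
    try (RandomAccessFile raf = new RandomAccessFile("/tmp/replica-demo", "r")) {
      FileChannel channel = raf.getChannel();
      // Map at most Integer.MAX_VALUE bytes, mirroring the fixed loadMmapInternal().
      long mapSize = Math.min(Integer.MAX_VALUE, channel.size());
      MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, mapSize);
      System.out.println("mapped " + mmap.capacity() + " of " +
          channel.size() + " bytes");
    }
  }
}
```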
@@ -776,4 +776,80 @@ public class TestEnhancedByteBufferAccess {
       if (cluster != null) cluster.shutdown();
     }
   }
+
+  @Test
+  public void test2GBMmapLimit() throws Exception {
+    Assume.assumeTrue(BlockReaderTestUtil.shouldTestLargeFiles());
+    HdfsConfiguration conf = initZeroCopyTest();
+    final long TEST_FILE_LENGTH = 2469605888L;
+    conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, TEST_FILE_LENGTH);
+    MiniDFSCluster cluster = null;
+    final Path TEST_PATH = new Path("/a");
+    final String CONTEXT = "test2GBMmapLimit";
+    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT);
+
+    FSDataInputStream fsIn = null, fsIn2 = null;
+    ByteBuffer buf1 = null, buf2 = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 0xB);
+      DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
+
+      fsIn = fs.open(TEST_PATH);
+      buf1 = fsIn.read(null, 1, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(1, buf1.remaining());
+      fsIn.releaseBuffer(buf1);
+      buf1 = null;
+      fsIn.seek(2147483640L);
+      buf1 = fsIn.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(7, buf1.remaining());
+      Assert.assertEquals(Integer.MAX_VALUE, buf1.limit());
+      fsIn.releaseBuffer(buf1);
+      buf1 = null;
+      Assert.assertEquals(2147483647L, fsIn.getPos());
+      try {
+        buf1 = fsIn.read(null, 1024,
+            EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+        Assert.fail("expected UnsupportedOperationException");
+      } catch (UnsupportedOperationException e) {
+        // expected; can't read past 2GB boundary.
+      }
+      fsIn.close();
+      fsIn = null;
+
+      // Now create another file with normal-sized blocks, and verify we
+      // can read past 2GB
+      final Path TEST_PATH2 = new Path("/b");
+      conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 268435456L);
+      DFSTestUtil.createFile(fs, TEST_PATH2, 1024 * 1024, TEST_FILE_LENGTH,
+          268435456L, (short)1, 0xA);
+
+      fsIn2 = fs.open(TEST_PATH2);
+      fsIn2.seek(2147483640L);
+      buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(8, buf2.remaining());
+      Assert.assertEquals(2147483648L, fsIn2.getPos());
+      fsIn2.releaseBuffer(buf2);
+      buf2 = null;
+      buf2 = fsIn2.read(null, 1024, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
+      Assert.assertEquals(1024, buf2.remaining());
+      Assert.assertEquals(2147484672L, fsIn2.getPos());
+      fsIn2.releaseBuffer(buf2);
+      buf2 = null;
+    } finally {
+      if (buf1 != null) {
+        fsIn.releaseBuffer(buf1);
+      }
+      if (buf2 != null) {
+        fsIn2.releaseBuffer(buf2);
+      }
+      IOUtils.cleanup(null, fsIn, fsIn2);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
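The expected values in test2GBMmapLimit follow directly from the clamping rules: for /a (a single 2469605888-byte block) the 31-bit mmap limit trims the read, while for /b (268435456-byte blocks) the block boundary trims it. A quick arithmetic sketch, not test code:

```java
public class TestMath {
  public static void main(String[] args) {
    // /a: one huge block, so the 31-bit mmap limit is what trims the read.
    System.out.println(Integer.MAX_VALUE - 2147483640L);  // 7 bytes remaining
    // /b: 268435456-byte blocks; 8 * 268435456 = 2147483648 is a block boundary.
    long blockSize = 268435456L;
    System.out.println(8 * blockSize - 2147483640L);      // 8 bytes remaining
    // After the next 1024-byte zero-copy read from the following block:
    System.out.println(8 * blockSize + 1024L);            // position 2147484672
  }
}
```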
@@ -54,6 +54,15 @@ import org.apache.log4j.LogManager;
  * A helper class to setup the cluster, and get to BlockReader and DataNode for a block.
  */
 public class BlockReaderTestUtil {
+  /**
+   * Returns true if we should run tests that generate large files (> 1GB)
+   */
+  static public boolean shouldTestLargeFiles() {
+    String property = System.getProperty("hdfs.test.large.files");
+    if (property == null) return false;
+    if (property.isEmpty()) return true;
+    return Boolean.parseBoolean(property);
+  }
 
   private HdfsConfiguration conf = null;
   private MiniDFSCluster cluster = null;
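shouldTestLargeFiles() gates the new test on the hdfs.test.large.files system property. A small sketch of how the three cases are interpreted; the demo class is invented, only the property name and parsing rules come from the code above.

```java
public class LargeFileFlagDemo {
  public static void main(String[] args) {
    // Same rules as shouldTestLargeFiles():
    //   property unset                 -> false (large-file tests are skipped)
    //   -Dhdfs.test.large.files        -> true  (empty value enables them)
    //   -Dhdfs.test.large.files=true   -> true
    //   -Dhdfs.test.large.files=false  -> false
    String property = System.getProperty("hdfs.test.large.files");
    boolean enabled;
    if (property == null) {
      enabled = false;
    } else if (property.isEmpty()) {
      enabled = true;
    } else {
      enabled = Boolean.parseBoolean(property);
    }
    System.out.println("large-file tests enabled: " + enabled);
  }
}
```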