HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1551701 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2013-12-17 20:57:00 +00:00
parent 5792d59da3
commit 124e507674
13 changed files with 989 additions and 506 deletions

View File

@ -256,6 +256,9 @@ Trunk (Unreleased)
HDFS-5431. Support cachepool-based limit management in path-based caching HDFS-5431. Support cachepool-based limit management in path-based caching
(awang via cmccabe) (awang via cmccabe)
HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
(cmccabe)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import java.io.IOException; import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.client.ClientMmap; import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -89,10 +91,10 @@ public interface BlockReader extends ByteBufferReadable {
/** /**
* Get a ClientMmap object for this BlockReader. * Get a ClientMmap object for this BlockReader.
* *
* @param curBlock The current block. * @param opts The read options to use.
* @return The ClientMmap object, or null if mmap is not * @return The ClientMmap object, or null if mmap is not
* supported. * supported.
*/ */
ClientMmap getClientMmap(LocatedBlock curBlock, ClientMmap getClientMmap(EnumSet<ReadOption> opts,
ClientMmapManager mmapManager); ClientMmapManager mmapManager);
} }

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
@ -98,7 +99,7 @@ public class BlockReaderFactory {
// enabled, try to set up a BlockReaderLocal. // enabled, try to set up a BlockReaderLocal.
BlockReader reader = newShortCircuitBlockReader(conf, file, BlockReader reader = newShortCircuitBlockReader(conf, file,
block, blockToken, startOffset, len, peer, datanodeID, block, blockToken, startOffset, len, peer, datanodeID,
domSockFactory, verifyChecksum, fisCache); domSockFactory, verifyChecksum, fisCache, cachingStrategy);
if (reader != null) { if (reader != null) {
// One we've constructed the short-circuit block reader, we don't // One we've constructed the short-circuit block reader, we don't
// need the socket any more. So let's return it to the cache. // need the socket any more. So let's return it to the cache.
@ -160,7 +161,8 @@ public class BlockReaderFactory {
* @param verifyChecksum True if we should verify the checksums. * @param verifyChecksum True if we should verify the checksums.
* Note: even if this is true, when * Note: even if this is true, when
* DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is * DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is
* set, we will skip checksums. * set or the block is mlocked, we will skip
* checksums.
* *
* @return The BlockReaderLocal, or null if the * @return The BlockReaderLocal, or null if the
* DataNode declined to provide short-circuit * DataNode declined to provide short-circuit
@ -172,7 +174,8 @@ public class BlockReaderFactory {
Token<BlockTokenIdentifier> blockToken, long startOffset, Token<BlockTokenIdentifier> blockToken, long startOffset,
long len, Peer peer, DatanodeID datanodeID, long len, Peer peer, DatanodeID datanodeID,
DomainSocketFactory domSockFactory, boolean verifyChecksum, DomainSocketFactory domSockFactory, boolean verifyChecksum,
FileInputStreamCache fisCache) throws IOException { FileInputStreamCache fisCache,
CachingStrategy cachingStrategy) throws IOException {
final DataOutputStream out = final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream( new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream())); peer.getOutputStream()));
@ -189,9 +192,18 @@ public class BlockReaderFactory {
FileInputStream fis[] = new FileInputStream[2]; FileInputStream fis[] = new FileInputStream[2];
sock.recvFileInputStreams(fis, buf, 0, buf.length); sock.recvFileInputStreams(fis, buf, 0, buf.length);
try { try {
reader = new BlockReaderLocal(conf, file, block, reader = new BlockReaderLocal.Builder(conf).
startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum, setFilename(file).
fisCache); setBlock(block).
setStartOffset(startOffset).
setStreams(fis).
setDatanodeID(datanodeID).
setVerifyChecksum(verifyChecksum).
setBlockMetadataHeader(
BlockMetadataHeader.preadHeader(fis[1].getChannel())).
setFileInputStreamCache(fisCache).
setCachingStrategy(cachingStrategy).
build();
} finally { } finally {
if (reader == null) { if (reader == null) {
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]); IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);

View File

@ -24,10 +24,12 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.client.ClientMmap; import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -706,8 +708,8 @@ class BlockReaderLocalLegacy implements BlockReader {
} }
@Override @Override
public ClientMmap getClientMmap(LocatedBlock curBlock, public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
ClientMmapManager mmapManager) { ClientMmapManager mmapManager) {
return null; return null;
} }
} }

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.ByteBufferPool;
@ -1073,9 +1074,18 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
DFSClient.LOG.debug("got FileInputStreams for " + block + " from " + DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
"the FileInputStreamCache."); "the FileInputStreamCache.");
} }
return new BlockReaderLocal(dfsClient.getConf(), file, return new BlockReaderLocal.Builder(dfsClient.getConf()).
block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum, setFilename(file).
fileInputStreamCache); setBlock(block).
setStartOffset(startOffset).
setStreams(fis).
setDatanodeID(chosenNode).
setVerifyChecksum(verifyChecksum).
setBlockMetadataHeader(BlockMetadataHeader.
preadHeader(fis[1].getChannel())).
setFileInputStreamCache(fileInputStreamCache).
setCachingStrategy(cachingStrategy).
build();
} }
// If the legacy local block reader is enabled and we are reading a local // If the legacy local block reader is enabled and we are reading a local
@ -1479,23 +1489,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
"at position " + pos); "at position " + pos);
} }
} }
boolean canSkipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS); ByteBuffer buffer = tryReadZeroCopy(maxLength, opts);
if (canSkipChecksums) { if (buffer != null) {
ByteBuffer buffer = tryReadZeroCopy(maxLength); return buffer;
if (buffer != null) {
return buffer;
}
} }
ByteBuffer buffer = ByteBufferUtil. buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
fallbackRead(this, bufferPool, maxLength);
if (buffer != null) { if (buffer != null) {
extendedReadBuffers.put(buffer, bufferPool); extendedReadBuffers.put(buffer, bufferPool);
} }
return buffer; return buffer;
} }
private synchronized ByteBuffer tryReadZeroCopy(int maxLength) private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
throws IOException { EnumSet<ReadOption> opts) throws IOException {
// Java ByteBuffers can't be longer than 2 GB, because they use // Java ByteBuffers can't be longer than 2 GB, because they use
// 4-byte signed integers to represent capacity, etc. // 4-byte signed integers to represent capacity, etc.
// So we can't mmap the parts of the block higher than the 2 GB offset. // So we can't mmap the parts of the block higher than the 2 GB offset.
@ -1518,8 +1524,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
long blockPos = curPos - blockStartInFile; long blockPos = curPos - blockStartInFile;
long limit = blockPos + length; long limit = blockPos + length;
ClientMmap clientMmap = ClientMmap clientMmap =
blockReader.getClientMmap(currentLocatedBlock, blockReader.getClientMmap(opts, dfsClient.getMmapManager());
dfsClient.getMmapManager());
if (clientMmap == null) { if (clientMmap == null) {
if (DFSClient.LOG.isDebugEnabled()) { if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("unable to perform a zero-copy read from offset " + DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +

View File

@ -23,10 +23,12 @@ import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker; import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.client.ClientMmap; import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
@ -490,8 +492,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
} }
@Override @Override
public ClientMmap getClientMmap(LocatedBlock curBlock, public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
ClientMmapManager mmapManager) { ClientMmapManager mmapManager) {
return null; return null;
} }
} }

View File

@ -25,10 +25,12 @@ import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.util.EnumSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.client.ClientMmap; import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
@ -455,8 +457,8 @@ public class RemoteBlockReader2 implements BlockReader {
} }
@Override @Override
public ClientMmap getClientMmap(LocatedBlock curBlock, public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
ClientMmapManager manager) { ClientMmapManager mmapManager) {
return null; return null;
} }
} }

View File

@ -21,10 +21,13 @@ import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -67,7 +70,29 @@ public class BlockMetadataHeader {
return checksum; return checksum;
} }
/**
* Read the header without changing the position of the FileChannel.
*
* @param fc The FileChannel to read.
* @return the Metadata Header.
* @throws IOException on error.
*/
public static BlockMetadataHeader preadHeader(FileChannel fc)
throws IOException {
byte arr[] = new byte[2 + DataChecksum.HEADER_LEN];
ByteBuffer buf = ByteBuffer.wrap(arr);
while (buf.hasRemaining()) {
if (fc.read(buf, 0) <= 0) {
throw new EOFException("unexpected EOF while reading " +
"metadata file header");
}
}
short version = (short)((arr[0] << 8) | (arr[1] & 0xff));
DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2);
return new BlockMetadataHeader(version, dataChecksum);
}
/** /**
* This reads all the fields till the beginning of checksum. * This reads all the fields till the beginning of checksum.
* @param in * @param in

View File

@ -1394,12 +1394,15 @@
<name>dfs.client.cache.readahead</name> <name>dfs.client.cache.readahead</name>
<value></value> <value></value>
<description> <description>
Just like dfs.datanode.readahead.bytes, this setting causes the datanode to When using remote reads, this setting causes the datanode to
read ahead in the block file using posix_fadvise, potentially decreasing read ahead in the block file using posix_fadvise, potentially decreasing
I/O wait times. Unlike dfs.datanode.readahead.bytes, this is a client-side I/O wait times. Unlike dfs.datanode.readahead.bytes, this is a client-side
setting rather than a setting for the entire datanode. If present, this setting rather than a setting for the entire datanode. If present, this
setting will override the DataNode default. setting will override the DataNode default.
When using local reads, this setting determines how much readahead we do in
BlockReaderLocal.
If the native libraries are not available to the DataNode, this If the native libraries are not available to the DataNode, this
configuration has no effect. configuration has no effect.
</description> </description>

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.util.VersionInfo;
import java.io.*; import java.io.*;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -1059,4 +1060,10 @@ public class DFSTestUtil {
public static void abortStream(DFSOutputStream out) throws IOException { public static void abortStream(DFSOutputStream out) throws IOException {
out.abort(); out.abort();
} }
public static byte[] asArray(ByteBuffer buf) {
byte arr[] = new byte[buf.remaining()];
buf.duplicate().get(arr);
return arr;
}
} }

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.net.unix.TemporarySocketDirectory;
@ -92,22 +94,35 @@ public class TestBlockReaderLocal {
} }
} }
private static interface BlockReaderLocalTest { private static class BlockReaderLocalTest {
final int TEST_LENGTH = 12345; final static int TEST_LENGTH = 12345;
final static int BYTES_PER_CHECKSUM = 512;
public void setConfiguration(HdfsConfiguration conf) {
// default: no-op
}
public void setup(File blockFile, boolean usingChecksums) public void setup(File blockFile, boolean usingChecksums)
throws IOException; throws IOException {
// default: no-op
}
public void doTest(BlockReaderLocal reader, byte original[]) public void doTest(BlockReaderLocal reader, byte original[])
throws IOException; throws IOException {
// default: no-op
}
} }
public void runBlockReaderLocalTest(BlockReaderLocalTest test, public void runBlockReaderLocalTest(BlockReaderLocalTest test,
boolean checksum) throws IOException { boolean checksum, long readahead) throws IOException {
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys. conf.setBoolean(DFSConfigKeys.
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum); DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C"); conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
FileInputStream dataIn = null, checkIn = null; conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
test.setConfiguration(conf);
FileInputStream dataIn = null, metaIn = null;
final Path TEST_PATH = new Path("/a"); final Path TEST_PATH = new Path("/a");
final long RANDOM_SEED = 4567L; final long RANDOM_SEED = 4567L;
BlockReaderLocal blockReaderLocal = null; BlockReaderLocal blockReaderLocal = null;
@ -143,45 +158,51 @@ public class TestBlockReaderLocal {
cluster.shutdown(); cluster.shutdown();
cluster = null; cluster = null;
test.setup(dataFile, checksum); test.setup(dataFile, checksum);
dataIn = new FileInputStream(dataFile); FileInputStream streams[] = {
checkIn = new FileInputStream(metaFile); new FileInputStream(dataFile),
blockReaderLocal = new BlockReaderLocal(new DFSClient.Conf(conf), new FileInputStream(metaFile)
TEST_PATH.getName(), block, 0, -1, };
dataIn, checkIn, datanodeID, checksum, null); dataIn = streams[0];
metaIn = streams[1];
blockReaderLocal = new BlockReaderLocal.Builder(
new DFSClient.Conf(conf)).
setFilename(TEST_PATH.getName()).
setBlock(block).
setStreams(streams).
setDatanodeID(datanodeID).
setCachingStrategy(new CachingStrategy(false, readahead)).
setVerifyChecksum(checksum).
setBlockMetadataHeader(BlockMetadataHeader.preadHeader(
metaIn.getChannel())).
build();
dataIn = null; dataIn = null;
checkIn = null; metaIn = null;
test.doTest(blockReaderLocal, original); test.doTest(blockReaderLocal, original);
// BlockReaderLocal should not alter the file position.
Assert.assertEquals(0, streams[0].getChannel().position());
Assert.assertEquals(0, streams[1].getChannel().position());
} finally { } finally {
if (fsIn != null) fsIn.close(); if (fsIn != null) fsIn.close();
if (fs != null) fs.close(); if (fs != null) fs.close();
if (cluster != null) cluster.shutdown(); if (cluster != null) cluster.shutdown();
if (dataIn != null) dataIn.close(); if (dataIn != null) dataIn.close();
if (checkIn != null) checkIn.close(); if (metaIn != null) metaIn.close();
if (blockReaderLocal != null) blockReaderLocal.close(); if (blockReaderLocal != null) blockReaderLocal.close();
} }
} }
private static class TestBlockReaderLocalImmediateClose private static class TestBlockReaderLocalImmediateClose
implements BlockReaderLocalTest { extends BlockReaderLocalTest {
@Override
public void setup(File blockFile, boolean usingChecksums)
throws IOException { }
@Override
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { }
} }
@Test @Test
public void testBlockReaderLocalImmediateClose() throws IOException { public void testBlockReaderLocalImmediateClose() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true); runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0);
runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false); runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0);
} }
private static class TestBlockReaderSimpleReads private static class TestBlockReaderSimpleReads
implements BlockReaderLocalTest { extends BlockReaderLocalTest {
@Override
public void setup(File blockFile, boolean usingChecksums)
throws IOException { }
@Override @Override
public void doTest(BlockReaderLocal reader, byte original[]) public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { throws IOException {
@ -194,24 +215,43 @@ public class TestBlockReaderLocal {
assertArrayRegionsEqual(original, 1024, buf, 1024, 513); assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
reader.readFully(buf, 1537, 514); reader.readFully(buf, 1537, 514);
assertArrayRegionsEqual(original, 1537, buf, 1537, 514); assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
// Readahead is always at least the size of one chunk in this test.
Assert.assertTrue(reader.getMaxReadaheadLength() >=
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
} }
} }
@Test @Test
public void testBlockReaderSimpleReads() throws IOException { public void testBlockReaderSimpleReads() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true); runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderSimpleReadsShortReadahead() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1);
} }
@Test @Test
public void testBlockReaderSimpleReadsNoChecksum() throws IOException { public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false); runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderSimpleReadsNoReadahead() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0);
}
@Test
public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0);
} }
private static class TestBlockReaderLocalArrayReads2 private static class TestBlockReaderLocalArrayReads2
implements BlockReaderLocalTest { extends BlockReaderLocalTest {
@Override
public void setup(File blockFile, boolean usingChecksums)
throws IOException { }
@Override @Override
public void doTest(BlockReaderLocal reader, byte original[]) public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { throws IOException {
@ -234,21 +274,30 @@ public class TestBlockReaderLocal {
@Test @Test
public void testBlockReaderLocalArrayReads2() throws IOException { public void testBlockReaderLocalArrayReads2() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
true); true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
public void testBlockReaderLocalArrayReads2NoChecksum() public void testBlockReaderLocalArrayReads2NoChecksum()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
false); false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderLocalArrayReads2NoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0);
}
@Test
public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0);
} }
private static class TestBlockReaderLocalByteBufferReads private static class TestBlockReaderLocalByteBufferReads
implements BlockReaderLocalTest { extends BlockReaderLocalTest {
@Override
public void setup(File blockFile, boolean usingChecksums)
throws IOException { }
@Override @Override
public void doTest(BlockReaderLocal reader, byte original[]) public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { throws IOException {
@ -268,19 +317,105 @@ public class TestBlockReaderLocal {
@Test @Test
public void testBlockReaderLocalByteBufferReads() public void testBlockReaderLocalByteBufferReads()
throws IOException { throws IOException {
runBlockReaderLocalTest( runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
new TestBlockReaderLocalByteBufferReads(), true); true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
public void testBlockReaderLocalByteBufferReadsNoChecksum() public void testBlockReaderLocalByteBufferReadsNoChecksum()
throws IOException { throws IOException {
runBlockReaderLocalTest( runBlockReaderLocalTest(
new TestBlockReaderLocalByteBufferReads(), false); new TestBlockReaderLocalByteBufferReads(),
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test
public void testBlockReaderLocalByteBufferReadsNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
true, 0);
}
@Test
public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
false, 0);
}
/**
* Test reads that bypass the bounce buffer (because they are aligned
* and bigger than the readahead).
*/
private static class TestBlockReaderLocalByteBufferFastLaneReads
extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
ByteBuffer buf = ByteBuffer.allocateDirect(TEST_LENGTH);
readFully(reader, buf, 0, 5120);
buf.flip();
assertArrayRegionsEqual(original, 0,
DFSTestUtil.asArray(buf), 0,
5120);
reader.skip(1537);
readFully(reader, buf, 0, 1);
buf.flip();
assertArrayRegionsEqual(original, 6657,
DFSTestUtil.asArray(buf), 0,
1);
reader.setMlocked(true);
readFully(reader, buf, 0, 5120);
buf.flip();
assertArrayRegionsEqual(original, 6658,
DFSTestUtil.asArray(buf), 0,
5120);
reader.setMlocked(false);
readFully(reader, buf, 0, 513);
buf.flip();
assertArrayRegionsEqual(original, 11778,
DFSTestUtil.asArray(buf), 0,
513);
reader.skip(3);
readFully(reader, buf, 0, 50);
buf.flip();
assertArrayRegionsEqual(original, 12294,
DFSTestUtil.asArray(buf), 0,
50);
}
}
@Test
public void testBlockReaderLocalByteBufferFastLaneReads()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
}
@Test
public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum()
throws IOException {
runBlockReaderLocalTest(
new TestBlockReaderLocalByteBufferFastLaneReads(),
false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
}
@Test
public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
true, 0);
}
@Test
public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
false, 0);
}
private static class TestBlockReaderLocalReadCorruptStart private static class TestBlockReaderLocalReadCorruptStart
implements BlockReaderLocalTest { extends BlockReaderLocalTest {
boolean usingChecksums = false; boolean usingChecksums = false;
@Override @Override
public void setup(File blockFile, boolean usingChecksums) public void setup(File blockFile, boolean usingChecksums)
@ -314,11 +449,12 @@ public class TestBlockReaderLocal {
@Test @Test
public void testBlockReaderLocalReadCorruptStart() public void testBlockReaderLocalReadCorruptStart()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true); runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
private static class TestBlockReaderLocalReadCorrupt private static class TestBlockReaderLocalReadCorrupt
implements BlockReaderLocalTest { extends BlockReaderLocalTest {
boolean usingChecksums = false; boolean usingChecksums = false;
@Override @Override
public void setup(File blockFile, boolean usingChecksums) public void setup(File blockFile, boolean usingChecksums)
@ -364,8 +500,136 @@ public class TestBlockReaderLocal {
@Test @Test
public void testBlockReaderLocalReadCorrupt() public void testBlockReaderLocalReadCorrupt()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true); runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true,
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false); DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderLocalReadCorruptNoChecksum()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderLocalReadCorruptNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0);
}
@Test
public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0);
}
private static class TestBlockReaderLocalWithMlockChanges
extends BlockReaderLocalTest {
@Override
public void setup(File blockFile, boolean usingChecksums)
throws IOException {
}
@Override
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
reader.skip(1);
readFully(reader, buf, 1, 9);
assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
readFully(reader, buf, 10, 100);
assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
reader.setMlocked(true);
readFully(reader, buf, 110, 700);
assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
reader.setMlocked(false);
reader.skip(1); // skip from offset 810 to offset 811
readFully(reader, buf, 811, 5);
assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
}
}
@Test
public void testBlockReaderLocalWithMlockChanges()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderLocalWithMlockChangesNoChecksum()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderLocalWithMlockChangesNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
true, 0);
}
@Test
public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
false, 0);
}
private static class TestBlockReaderLocalOnFileWithoutChecksum
extends BlockReaderLocalTest {
@Override
public void setConfiguration(HdfsConfiguration conf) {
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
}
@Override
public void doTest(BlockReaderLocal reader, byte original[])
throws IOException {
Assert.assertTrue(!reader.getVerifyChecksum());
ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
reader.skip(1);
readFully(reader, buf, 1, 9);
assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
readFully(reader, buf, 10, 100);
assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
reader.setMlocked(true);
readFully(reader, buf, 110, 700);
assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
reader.setMlocked(false);
reader.skip(1); // skip from offset 810 to offset 811
readFully(reader, buf, 811, 5);
assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
}
}
@Test
public void testBlockReaderLocalOnFileWithoutChecksum()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
}
@Test
public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
true, 0);
}
@Test
public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
false, 0);
} }
@Test(timeout=60000) @Test(timeout=60000)

View File

@ -259,7 +259,6 @@ public class TestShortCircuitLocalRead {
assertTrue("/ should be a directory", fs.getFileStatus(path) assertTrue("/ should be a directory", fs.getFileStatus(path)
.isDirectory() == true); .isDirectory() == true);
// create a new file in home directory. Do not close it.
byte[] fileData = AppendTestUtil.randomBytes(seed, size); byte[] fileData = AppendTestUtil.randomBytes(seed, size);
Path file1 = fs.makeQualified(new Path("filelocal.dat")); Path file1 = fs.makeQualified(new Path("filelocal.dat"));
FSDataOutputStream stm = createFile(fs, file1, 1); FSDataOutputStream stm = createFile(fs, file1, 1);