HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1552119 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
971418cdb5
commit
d6f80a1ce8
|
@ -230,6 +230,9 @@ Release 2.4.0 - UNRELEASED
|
||||||
HDFS-5580. Fix infinite loop in Balancer.waitForMoveCompletion.
|
HDFS-5580. Fix infinite loop in Balancer.waitForMoveCompletion.
|
||||||
(Binglin Chang via junping_du)
|
(Binglin Chang via junping_du)
|
||||||
|
|
||||||
|
HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not
|
||||||
|
(cmccabe)
|
||||||
|
|
||||||
Release 2.3.0 - UNRELEASED
|
Release 2.3.0 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -18,8 +18,10 @@
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.ByteBufferReadable;
|
import org.apache.hadoop.fs.ByteBufferReadable;
|
||||||
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.hdfs.client.ClientMmap;
|
import org.apache.hadoop.hdfs.client.ClientMmap;
|
||||||
import org.apache.hadoop.hdfs.client.ClientMmapManager;
|
import org.apache.hadoop.hdfs.client.ClientMmapManager;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
@ -89,10 +91,10 @@ public interface BlockReader extends ByteBufferReadable {
|
||||||
/**
|
/**
|
||||||
* Get a ClientMmap object for this BlockReader.
|
* Get a ClientMmap object for this BlockReader.
|
||||||
*
|
*
|
||||||
* @param curBlock The current block.
|
* @param opts The read options to use.
|
||||||
* @return The ClientMmap object, or null if mmap is not
|
* @return The ClientMmap object, or null if mmap is not
|
||||||
* supported.
|
* supported.
|
||||||
*/
|
*/
|
||||||
ClientMmap getClientMmap(LocatedBlock curBlock,
|
ClientMmap getClientMmap(EnumSet<ReadOption> opts,
|
||||||
ClientMmapManager mmapManager);
|
ClientMmapManager mmapManager);
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
@ -98,7 +99,7 @@ public class BlockReaderFactory {
|
||||||
// enabled, try to set up a BlockReaderLocal.
|
// enabled, try to set up a BlockReaderLocal.
|
||||||
BlockReader reader = newShortCircuitBlockReader(conf, file,
|
BlockReader reader = newShortCircuitBlockReader(conf, file,
|
||||||
block, blockToken, startOffset, len, peer, datanodeID,
|
block, blockToken, startOffset, len, peer, datanodeID,
|
||||||
domSockFactory, verifyChecksum, fisCache);
|
domSockFactory, verifyChecksum, fisCache, cachingStrategy);
|
||||||
if (reader != null) {
|
if (reader != null) {
|
||||||
// One we've constructed the short-circuit block reader, we don't
|
// One we've constructed the short-circuit block reader, we don't
|
||||||
// need the socket any more. So let's return it to the cache.
|
// need the socket any more. So let's return it to the cache.
|
||||||
|
@ -160,7 +161,8 @@ public class BlockReaderFactory {
|
||||||
* @param verifyChecksum True if we should verify the checksums.
|
* @param verifyChecksum True if we should verify the checksums.
|
||||||
* Note: even if this is true, when
|
* Note: even if this is true, when
|
||||||
* DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is
|
* DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is
|
||||||
* set, we will skip checksums.
|
* set or the block is mlocked, we will skip
|
||||||
|
* checksums.
|
||||||
*
|
*
|
||||||
* @return The BlockReaderLocal, or null if the
|
* @return The BlockReaderLocal, or null if the
|
||||||
* DataNode declined to provide short-circuit
|
* DataNode declined to provide short-circuit
|
||||||
|
@ -172,7 +174,8 @@ public class BlockReaderFactory {
|
||||||
Token<BlockTokenIdentifier> blockToken, long startOffset,
|
Token<BlockTokenIdentifier> blockToken, long startOffset,
|
||||||
long len, Peer peer, DatanodeID datanodeID,
|
long len, Peer peer, DatanodeID datanodeID,
|
||||||
DomainSocketFactory domSockFactory, boolean verifyChecksum,
|
DomainSocketFactory domSockFactory, boolean verifyChecksum,
|
||||||
FileInputStreamCache fisCache) throws IOException {
|
FileInputStreamCache fisCache,
|
||||||
|
CachingStrategy cachingStrategy) throws IOException {
|
||||||
final DataOutputStream out =
|
final DataOutputStream out =
|
||||||
new DataOutputStream(new BufferedOutputStream(
|
new DataOutputStream(new BufferedOutputStream(
|
||||||
peer.getOutputStream()));
|
peer.getOutputStream()));
|
||||||
|
@ -189,9 +192,18 @@ public class BlockReaderFactory {
|
||||||
FileInputStream fis[] = new FileInputStream[2];
|
FileInputStream fis[] = new FileInputStream[2];
|
||||||
sock.recvFileInputStreams(fis, buf, 0, buf.length);
|
sock.recvFileInputStreams(fis, buf, 0, buf.length);
|
||||||
try {
|
try {
|
||||||
reader = new BlockReaderLocal(conf, file, block,
|
reader = new BlockReaderLocal.Builder(conf).
|
||||||
startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum,
|
setFilename(file).
|
||||||
fisCache);
|
setBlock(block).
|
||||||
|
setStartOffset(startOffset).
|
||||||
|
setStreams(fis).
|
||||||
|
setDatanodeID(datanodeID).
|
||||||
|
setVerifyChecksum(verifyChecksum).
|
||||||
|
setBlockMetadataHeader(
|
||||||
|
BlockMetadataHeader.preadHeader(fis[1].getChannel())).
|
||||||
|
setFileInputStreamCache(fisCache).
|
||||||
|
setCachingStrategy(cachingStrategy).
|
||||||
|
build();
|
||||||
} finally {
|
} finally {
|
||||||
if (reader == null) {
|
if (reader == null) {
|
||||||
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
|
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -24,10 +24,12 @@ import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.hdfs.client.ClientMmap;
|
import org.apache.hadoop.hdfs.client.ClientMmap;
|
||||||
import org.apache.hadoop.hdfs.client.ClientMmapManager;
|
import org.apache.hadoop.hdfs.client.ClientMmapManager;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -706,8 +708,8 @@ class BlockReaderLocalLegacy implements BlockReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientMmap getClientMmap(LocatedBlock curBlock,
|
public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
|
||||||
ClientMmapManager mmapManager) {
|
ClientMmapManager mmapManager) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
||||||
import org.apache.hadoop.io.ByteBufferPool;
|
import org.apache.hadoop.io.ByteBufferPool;
|
||||||
|
@ -1073,9 +1074,18 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
|
DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
|
||||||
"the FileInputStreamCache.");
|
"the FileInputStreamCache.");
|
||||||
}
|
}
|
||||||
return new BlockReaderLocal(dfsClient.getConf(), file,
|
return new BlockReaderLocal.Builder(dfsClient.getConf()).
|
||||||
block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum,
|
setFilename(file).
|
||||||
fileInputStreamCache);
|
setBlock(block).
|
||||||
|
setStartOffset(startOffset).
|
||||||
|
setStreams(fis).
|
||||||
|
setDatanodeID(chosenNode).
|
||||||
|
setVerifyChecksum(verifyChecksum).
|
||||||
|
setBlockMetadataHeader(BlockMetadataHeader.
|
||||||
|
preadHeader(fis[1].getChannel())).
|
||||||
|
setFileInputStreamCache(fileInputStreamCache).
|
||||||
|
setCachingStrategy(cachingStrategy).
|
||||||
|
build();
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the legacy local block reader is enabled and we are reading a local
|
// If the legacy local block reader is enabled and we are reading a local
|
||||||
|
@ -1479,23 +1489,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
"at position " + pos);
|
"at position " + pos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
boolean canSkipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS);
|
ByteBuffer buffer = tryReadZeroCopy(maxLength, opts);
|
||||||
if (canSkipChecksums) {
|
if (buffer != null) {
|
||||||
ByteBuffer buffer = tryReadZeroCopy(maxLength);
|
return buffer;
|
||||||
if (buffer != null) {
|
|
||||||
return buffer;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
ByteBuffer buffer = ByteBufferUtil.
|
buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
|
||||||
fallbackRead(this, bufferPool, maxLength);
|
|
||||||
if (buffer != null) {
|
if (buffer != null) {
|
||||||
extendedReadBuffers.put(buffer, bufferPool);
|
extendedReadBuffers.put(buffer, bufferPool);
|
||||||
}
|
}
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized ByteBuffer tryReadZeroCopy(int maxLength)
|
private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
|
||||||
throws IOException {
|
EnumSet<ReadOption> opts) throws IOException {
|
||||||
// Java ByteBuffers can't be longer than 2 GB, because they use
|
// Java ByteBuffers can't be longer than 2 GB, because they use
|
||||||
// 4-byte signed integers to represent capacity, etc.
|
// 4-byte signed integers to represent capacity, etc.
|
||||||
// So we can't mmap the parts of the block higher than the 2 GB offset.
|
// So we can't mmap the parts of the block higher than the 2 GB offset.
|
||||||
|
@ -1518,8 +1524,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
long blockPos = curPos - blockStartInFile;
|
long blockPos = curPos - blockStartInFile;
|
||||||
long limit = blockPos + length;
|
long limit = blockPos + length;
|
||||||
ClientMmap clientMmap =
|
ClientMmap clientMmap =
|
||||||
blockReader.getClientMmap(currentLocatedBlock,
|
blockReader.getClientMmap(opts, dfsClient.getMmapManager());
|
||||||
dfsClient.getMmapManager());
|
|
||||||
if (clientMmap == null) {
|
if (clientMmap == null) {
|
||||||
if (DFSClient.LOG.isDebugEnabled()) {
|
if (DFSClient.LOG.isDebugEnabled()) {
|
||||||
DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
|
DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
|
||||||
|
|
|
@ -23,10 +23,12 @@ import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.FSInputChecker;
|
import org.apache.hadoop.fs.FSInputChecker;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.hdfs.client.ClientMmap;
|
import org.apache.hadoop.hdfs.client.ClientMmap;
|
||||||
import org.apache.hadoop.hdfs.client.ClientMmapManager;
|
import org.apache.hadoop.hdfs.client.ClientMmapManager;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
|
@ -490,8 +492,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientMmap getClientMmap(LocatedBlock curBlock,
|
public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
|
||||||
ClientMmapManager mmapManager) {
|
ClientMmapManager mmapManager) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,10 +25,12 @@ import java.io.OutputStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.ReadableByteChannel;
|
import java.nio.channels.ReadableByteChannel;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.hdfs.client.ClientMmap;
|
import org.apache.hadoop.hdfs.client.ClientMmap;
|
||||||
import org.apache.hadoop.hdfs.client.ClientMmapManager;
|
import org.apache.hadoop.hdfs.client.ClientMmapManager;
|
||||||
import org.apache.hadoop.hdfs.net.Peer;
|
import org.apache.hadoop.hdfs.net.Peer;
|
||||||
|
@ -455,8 +457,8 @@ public class RemoteBlockReader2 implements BlockReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientMmap getClientMmap(LocatedBlock curBlock,
|
public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
|
||||||
ClientMmapManager manager) {
|
ClientMmapManager mmapManager) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,10 +21,13 @@ import java.io.BufferedInputStream;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.EOFException;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.FileChannel;
|
||||||
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
@ -67,7 +70,29 @@ public class BlockMetadataHeader {
|
||||||
return checksum;
|
return checksum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the header without changing the position of the FileChannel.
|
||||||
|
*
|
||||||
|
* @param fc The FileChannel to read.
|
||||||
|
* @return the Metadata Header.
|
||||||
|
* @throws IOException on error.
|
||||||
|
*/
|
||||||
|
public static BlockMetadataHeader preadHeader(FileChannel fc)
|
||||||
|
throws IOException {
|
||||||
|
byte arr[] = new byte[2 + DataChecksum.HEADER_LEN];
|
||||||
|
ByteBuffer buf = ByteBuffer.wrap(arr);
|
||||||
|
|
||||||
|
while (buf.hasRemaining()) {
|
||||||
|
if (fc.read(buf, 0) <= 0) {
|
||||||
|
throw new EOFException("unexpected EOF while reading " +
|
||||||
|
"metadata file header");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
short version = (short)((arr[0] << 8) | (arr[1] & 0xff));
|
||||||
|
DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2);
|
||||||
|
return new BlockMetadataHeader(version, dataChecksum);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This reads all the fields till the beginning of checksum.
|
* This reads all the fields till the beginning of checksum.
|
||||||
* @param in
|
* @param in
|
||||||
|
|
|
@ -1394,12 +1394,15 @@
|
||||||
<name>dfs.client.cache.readahead</name>
|
<name>dfs.client.cache.readahead</name>
|
||||||
<value></value>
|
<value></value>
|
||||||
<description>
|
<description>
|
||||||
Just like dfs.datanode.readahead.bytes, this setting causes the datanode to
|
When using remote reads, this setting causes the datanode to
|
||||||
read ahead in the block file using posix_fadvise, potentially decreasing
|
read ahead in the block file using posix_fadvise, potentially decreasing
|
||||||
I/O wait times. Unlike dfs.datanode.readahead.bytes, this is a client-side
|
I/O wait times. Unlike dfs.datanode.readahead.bytes, this is a client-side
|
||||||
setting rather than a setting for the entire datanode. If present, this
|
setting rather than a setting for the entire datanode. If present, this
|
||||||
setting will override the DataNode default.
|
setting will override the DataNode default.
|
||||||
|
|
||||||
|
When using local reads, this setting determines how much readahead we do in
|
||||||
|
BlockReaderLocal.
|
||||||
|
|
||||||
If the native libraries are not available to the DataNode, this
|
If the native libraries are not available to the DataNode, this
|
||||||
configuration has no effect.
|
configuration has no effect.
|
||||||
</description>
|
</description>
|
||||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.util.VersionInfo;
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.*;
|
||||||
import java.net.*;
|
import java.net.*;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
@ -1018,4 +1019,10 @@ public class DFSTestUtil {
|
||||||
public static void abortStream(DFSOutputStream out) throws IOException {
|
public static void abortStream(DFSOutputStream out) throws IOException {
|
||||||
out.abort();
|
out.abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static byte[] asArray(ByteBuffer buf) {
|
||||||
|
byte arr[] = new byte[buf.remaining()];
|
||||||
|
buf.duplicate().get(arr);
|
||||||
|
return arr;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.net.unix.DomainSocket;
|
import org.apache.hadoop.net.unix.DomainSocket;
|
||||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||||
|
@ -92,22 +94,35 @@ public class TestBlockReaderLocal {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static interface BlockReaderLocalTest {
|
private static class BlockReaderLocalTest {
|
||||||
final int TEST_LENGTH = 12345;
|
final static int TEST_LENGTH = 12345;
|
||||||
|
final static int BYTES_PER_CHECKSUM = 512;
|
||||||
|
|
||||||
|
public void setConfiguration(HdfsConfiguration conf) {
|
||||||
|
// default: no-op
|
||||||
|
}
|
||||||
public void setup(File blockFile, boolean usingChecksums)
|
public void setup(File blockFile, boolean usingChecksums)
|
||||||
throws IOException;
|
throws IOException {
|
||||||
|
// default: no-op
|
||||||
|
}
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException;
|
throws IOException {
|
||||||
|
// default: no-op
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
|
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
|
||||||
boolean checksum) throws IOException {
|
boolean checksum, long readahead) throws IOException {
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
HdfsConfiguration conf = new HdfsConfiguration();
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
conf.setBoolean(DFSConfigKeys.
|
conf.setBoolean(DFSConfigKeys.
|
||||||
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
|
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
|
||||||
|
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
|
||||||
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
|
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
|
||||||
FileInputStream dataIn = null, checkIn = null;
|
conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
|
||||||
|
test.setConfiguration(conf);
|
||||||
|
FileInputStream dataIn = null, metaIn = null;
|
||||||
final Path TEST_PATH = new Path("/a");
|
final Path TEST_PATH = new Path("/a");
|
||||||
final long RANDOM_SEED = 4567L;
|
final long RANDOM_SEED = 4567L;
|
||||||
BlockReaderLocal blockReaderLocal = null;
|
BlockReaderLocal blockReaderLocal = null;
|
||||||
|
@ -143,45 +158,51 @@ public class TestBlockReaderLocal {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
cluster = null;
|
cluster = null;
|
||||||
test.setup(dataFile, checksum);
|
test.setup(dataFile, checksum);
|
||||||
dataIn = new FileInputStream(dataFile);
|
FileInputStream streams[] = {
|
||||||
checkIn = new FileInputStream(metaFile);
|
new FileInputStream(dataFile),
|
||||||
blockReaderLocal = new BlockReaderLocal(new DFSClient.Conf(conf),
|
new FileInputStream(metaFile)
|
||||||
TEST_PATH.getName(), block, 0, -1,
|
};
|
||||||
dataIn, checkIn, datanodeID, checksum, null);
|
dataIn = streams[0];
|
||||||
|
metaIn = streams[1];
|
||||||
|
blockReaderLocal = new BlockReaderLocal.Builder(
|
||||||
|
new DFSClient.Conf(conf)).
|
||||||
|
setFilename(TEST_PATH.getName()).
|
||||||
|
setBlock(block).
|
||||||
|
setStreams(streams).
|
||||||
|
setDatanodeID(datanodeID).
|
||||||
|
setCachingStrategy(new CachingStrategy(false, readahead)).
|
||||||
|
setVerifyChecksum(checksum).
|
||||||
|
setBlockMetadataHeader(BlockMetadataHeader.preadHeader(
|
||||||
|
metaIn.getChannel())).
|
||||||
|
build();
|
||||||
dataIn = null;
|
dataIn = null;
|
||||||
checkIn = null;
|
metaIn = null;
|
||||||
test.doTest(blockReaderLocal, original);
|
test.doTest(blockReaderLocal, original);
|
||||||
|
// BlockReaderLocal should not alter the file position.
|
||||||
|
Assert.assertEquals(0, streams[0].getChannel().position());
|
||||||
|
Assert.assertEquals(0, streams[1].getChannel().position());
|
||||||
} finally {
|
} finally {
|
||||||
if (fsIn != null) fsIn.close();
|
if (fsIn != null) fsIn.close();
|
||||||
if (fs != null) fs.close();
|
if (fs != null) fs.close();
|
||||||
if (cluster != null) cluster.shutdown();
|
if (cluster != null) cluster.shutdown();
|
||||||
if (dataIn != null) dataIn.close();
|
if (dataIn != null) dataIn.close();
|
||||||
if (checkIn != null) checkIn.close();
|
if (metaIn != null) metaIn.close();
|
||||||
if (blockReaderLocal != null) blockReaderLocal.close();
|
if (blockReaderLocal != null) blockReaderLocal.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestBlockReaderLocalImmediateClose
|
private static class TestBlockReaderLocalImmediateClose
|
||||||
implements BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
@Override
|
|
||||||
public void setup(File blockFile, boolean usingChecksums)
|
|
||||||
throws IOException { }
|
|
||||||
@Override
|
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
|
||||||
throws IOException { }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalImmediateClose() throws IOException {
|
public void testBlockReaderLocalImmediateClose() throws IOException {
|
||||||
runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true);
|
runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0);
|
||||||
runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false);
|
runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestBlockReaderSimpleReads
|
private static class TestBlockReaderSimpleReads
|
||||||
implements BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
@Override
|
|
||||||
public void setup(File blockFile, boolean usingChecksums)
|
|
||||||
throws IOException { }
|
|
||||||
@Override
|
@Override
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -194,24 +215,43 @@ public class TestBlockReaderLocal {
|
||||||
assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
|
assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
|
||||||
reader.readFully(buf, 1537, 514);
|
reader.readFully(buf, 1537, 514);
|
||||||
assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
|
assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
|
||||||
|
// Readahead is always at least the size of one chunk in this test.
|
||||||
|
Assert.assertTrue(reader.getMaxReadaheadLength() >=
|
||||||
|
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderSimpleReads() throws IOException {
|
public void testBlockReaderSimpleReads() throws IOException {
|
||||||
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true);
|
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderSimpleReadsShortReadahead() throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
|
||||||
|
BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
|
public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
|
||||||
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false);
|
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderSimpleReadsNoReadahead() throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestBlockReaderLocalArrayReads2
|
private static class TestBlockReaderLocalArrayReads2
|
||||||
implements BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
@Override
|
|
||||||
public void setup(File blockFile, boolean usingChecksums)
|
|
||||||
throws IOException { }
|
|
||||||
@Override
|
@Override
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -234,21 +274,30 @@ public class TestBlockReaderLocal {
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalArrayReads2() throws IOException {
|
public void testBlockReaderLocalArrayReads2() throws IOException {
|
||||||
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
|
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
|
||||||
true);
|
true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalArrayReads2NoChecksum()
|
public void testBlockReaderLocalArrayReads2NoChecksum()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
|
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
|
||||||
false);
|
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalArrayReads2NoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestBlockReaderLocalByteBufferReads
|
private static class TestBlockReaderLocalByteBufferReads
|
||||||
implements BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
@Override
|
|
||||||
public void setup(File blockFile, boolean usingChecksums)
|
|
||||||
throws IOException { }
|
|
||||||
@Override
|
@Override
|
||||||
public void doTest(BlockReaderLocal reader, byte original[])
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -268,19 +317,105 @@ public class TestBlockReaderLocal {
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalByteBufferReads()
|
public void testBlockReaderLocalByteBufferReads()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
runBlockReaderLocalTest(
|
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
|
||||||
new TestBlockReaderLocalByteBufferReads(), true);
|
true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalByteBufferReadsNoChecksum()
|
public void testBlockReaderLocalByteBufferReadsNoChecksum()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
runBlockReaderLocalTest(
|
runBlockReaderLocalTest(
|
||||||
new TestBlockReaderLocalByteBufferReads(), false);
|
new TestBlockReaderLocalByteBufferReads(),
|
||||||
|
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalByteBufferReadsNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
|
||||||
|
true, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
|
||||||
|
false, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test reads that bypass the bounce buffer (because they are aligned
|
||||||
|
* and bigger than the readahead).
|
||||||
|
*/
|
||||||
|
private static class TestBlockReaderLocalByteBufferFastLaneReads
|
||||||
|
extends BlockReaderLocalTest {
|
||||||
|
@Override
|
||||||
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
|
throws IOException {
|
||||||
|
ByteBuffer buf = ByteBuffer.allocateDirect(TEST_LENGTH);
|
||||||
|
readFully(reader, buf, 0, 5120);
|
||||||
|
buf.flip();
|
||||||
|
assertArrayRegionsEqual(original, 0,
|
||||||
|
DFSTestUtil.asArray(buf), 0,
|
||||||
|
5120);
|
||||||
|
reader.skip(1537);
|
||||||
|
readFully(reader, buf, 0, 1);
|
||||||
|
buf.flip();
|
||||||
|
assertArrayRegionsEqual(original, 6657,
|
||||||
|
DFSTestUtil.asArray(buf), 0,
|
||||||
|
1);
|
||||||
|
reader.setMlocked(true);
|
||||||
|
readFully(reader, buf, 0, 5120);
|
||||||
|
buf.flip();
|
||||||
|
assertArrayRegionsEqual(original, 6658,
|
||||||
|
DFSTestUtil.asArray(buf), 0,
|
||||||
|
5120);
|
||||||
|
reader.setMlocked(false);
|
||||||
|
readFully(reader, buf, 0, 513);
|
||||||
|
buf.flip();
|
||||||
|
assertArrayRegionsEqual(original, 11778,
|
||||||
|
DFSTestUtil.asArray(buf), 0,
|
||||||
|
513);
|
||||||
|
reader.skip(3);
|
||||||
|
readFully(reader, buf, 0, 50);
|
||||||
|
buf.flip();
|
||||||
|
assertArrayRegionsEqual(original, 12294,
|
||||||
|
DFSTestUtil.asArray(buf), 0,
|
||||||
|
50);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalByteBufferFastLaneReads()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
|
||||||
|
true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(
|
||||||
|
new TestBlockReaderLocalByteBufferFastLaneReads(),
|
||||||
|
false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
|
||||||
|
true, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
|
||||||
|
false, 0);
|
||||||
|
}
|
||||||
|
|
||||||
private static class TestBlockReaderLocalReadCorruptStart
|
private static class TestBlockReaderLocalReadCorruptStart
|
||||||
implements BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
boolean usingChecksums = false;
|
boolean usingChecksums = false;
|
||||||
@Override
|
@Override
|
||||||
public void setup(File blockFile, boolean usingChecksums)
|
public void setup(File blockFile, boolean usingChecksums)
|
||||||
|
@ -314,11 +449,12 @@ public class TestBlockReaderLocal {
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalReadCorruptStart()
|
public void testBlockReaderLocalReadCorruptStart()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true);
|
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class TestBlockReaderLocalReadCorrupt
|
private static class TestBlockReaderLocalReadCorrupt
|
||||||
implements BlockReaderLocalTest {
|
extends BlockReaderLocalTest {
|
||||||
boolean usingChecksums = false;
|
boolean usingChecksums = false;
|
||||||
@Override
|
@Override
|
||||||
public void setup(File blockFile, boolean usingChecksums)
|
public void setup(File blockFile, boolean usingChecksums)
|
||||||
|
@ -364,8 +500,136 @@ public class TestBlockReaderLocal {
|
||||||
@Test
|
@Test
|
||||||
public void testBlockReaderLocalReadCorrupt()
|
public void testBlockReaderLocalReadCorrupt()
|
||||||
throws IOException {
|
throws IOException {
|
||||||
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
|
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true,
|
||||||
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
|
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalReadCorruptNoChecksum()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalReadCorruptNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestBlockReaderLocalWithMlockChanges
|
||||||
|
extends BlockReaderLocalTest {
|
||||||
|
@Override
|
||||||
|
public void setup(File blockFile, boolean usingChecksums)
|
||||||
|
throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
|
throws IOException {
|
||||||
|
ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
|
||||||
|
reader.skip(1);
|
||||||
|
readFully(reader, buf, 1, 9);
|
||||||
|
assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
|
||||||
|
readFully(reader, buf, 10, 100);
|
||||||
|
assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
|
||||||
|
reader.setMlocked(true);
|
||||||
|
readFully(reader, buf, 110, 700);
|
||||||
|
assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
|
||||||
|
reader.setMlocked(false);
|
||||||
|
reader.skip(1); // skip from offset 810 to offset 811
|
||||||
|
readFully(reader, buf, 811, 5);
|
||||||
|
assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalWithMlockChanges()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
|
||||||
|
true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalWithMlockChangesNoChecksum()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
|
||||||
|
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalWithMlockChangesNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
|
||||||
|
true, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
|
||||||
|
false, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class TestBlockReaderLocalOnFileWithoutChecksum
|
||||||
|
extends BlockReaderLocalTest {
|
||||||
|
@Override
|
||||||
|
public void setConfiguration(HdfsConfiguration conf) {
|
||||||
|
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void doTest(BlockReaderLocal reader, byte original[])
|
||||||
|
throws IOException {
|
||||||
|
Assert.assertTrue(!reader.getVerifyChecksum());
|
||||||
|
ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
|
||||||
|
reader.skip(1);
|
||||||
|
readFully(reader, buf, 1, 9);
|
||||||
|
assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
|
||||||
|
readFully(reader, buf, 10, 100);
|
||||||
|
assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
|
||||||
|
reader.setMlocked(true);
|
||||||
|
readFully(reader, buf, 110, 700);
|
||||||
|
assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
|
||||||
|
reader.setMlocked(false);
|
||||||
|
reader.skip(1); // skip from offset 810 to offset 811
|
||||||
|
readFully(reader, buf, 811, 5);
|
||||||
|
assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalOnFileWithoutChecksum()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
|
||||||
|
true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
|
||||||
|
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
|
||||||
|
true, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
|
||||||
|
throws IOException {
|
||||||
|
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
|
||||||
|
false, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=60000)
|
@Test(timeout=60000)
|
||||||
|
|
|
@ -260,7 +260,6 @@ public class TestShortCircuitLocalRead {
|
||||||
assertTrue("/ should be a directory", fs.getFileStatus(path)
|
assertTrue("/ should be a directory", fs.getFileStatus(path)
|
||||||
.isDirectory() == true);
|
.isDirectory() == true);
|
||||||
|
|
||||||
// create a new file in home directory. Do not close it.
|
|
||||||
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
|
byte[] fileData = AppendTestUtil.randomBytes(seed, size);
|
||||||
Path file1 = fs.makeQualified(new Path("filelocal.dat"));
|
Path file1 = fs.makeQualified(new Path("filelocal.dat"));
|
||||||
FSDataOutputStream stm = createFile(fs, file1, 1);
|
FSDataOutputStream stm = createFile(fs, file1, 1);
|
||||||
|
|
Loading…
Reference in New Issue