HDFS-6934. Move checksum computation off the hot path when writing to RAM disk. Contributed by Chris Nauroth.

(cherry picked from commit 463aec1171)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

(cherry picked from commit 3d67da502a)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
This commit is contained in:
cnauroth 2014-10-27 09:38:30 -07:00
parent 6724c2f7e4
commit a9f31af29c
27 changed files with 1152 additions and 707 deletions

View File

@ -51,7 +51,7 @@ abstract public class FSOutputSummer extends OutputStream {
protected FSOutputSummer(DataChecksum sum) {
this.sum = sum;
this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
this.checksum = new byte[sum.getChecksumSize() * BUFFER_NUM_CHUNKS];
this.checksum = new byte[getChecksumSize() * BUFFER_NUM_CHUNKS];
this.count = 0;
}
@ -188,7 +188,12 @@ abstract public class FSOutputSummer extends OutputStream {
protected synchronized int getBufferedDataSize() {
return count;
}
/** @return the size for a checksum. */
protected int getChecksumSize() {
return sum.getChecksumSize();
}
/** Generate checksums for the given data chunks and output chunks & checksums
* to the underlying output stream.
*/
@ -197,9 +202,8 @@ abstract public class FSOutputSummer extends OutputStream {
sum.calculateChunkedSums(b, off, len, checksum, 0);
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize();
writeChunk(b, off + i, chunkLen, checksum, ckOffset,
sum.getChecksumSize());
int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
}
}
@ -226,8 +230,7 @@ abstract public class FSOutputSummer extends OutputStream {
*/
protected synchronized void setChecksumBufSize(int size) {
this.buf = new byte[size];
this.checksum = new byte[((size - 1) / sum.getBytesPerChecksum() + 1) *
sum.getChecksumSize()];
this.checksum = new byte[sum.getChecksumSize(size)];
this.count = 0;
}

View File

@ -234,15 +234,14 @@ public final class Options {
* This is used in FileSystem and FileContext to specify checksum options.
*/
public static class ChecksumOpt {
private final int crcBlockSize;
private final DataChecksum.Type crcType;
private final DataChecksum.Type checksumType;
private final int bytesPerChecksum;
/**
* Create a uninitialized one
*/
public ChecksumOpt() {
crcBlockSize = -1;
crcType = DataChecksum.Type.DEFAULT;
this(DataChecksum.Type.DEFAULT, -1);
}
/**
@ -251,16 +250,21 @@ public final class Options {
* @param size bytes per checksum
*/
public ChecksumOpt(DataChecksum.Type type, int size) {
crcBlockSize = size;
crcType = type;
checksumType = type;
bytesPerChecksum = size;
}
public int getBytesPerChecksum() {
return crcBlockSize;
return bytesPerChecksum;
}
public DataChecksum.Type getChecksumType() {
return crcType;
return checksumType;
}
@Override
public String toString() {
return checksumType + ":" + bytesPerChecksum;
}
/**

View File

@ -869,7 +869,8 @@ public class NativeIO {
* @throws IOException
*/
public static void copyFileUnbuffered(File src, File dst) throws IOException {
if ((nativeLoaded) && (Shell.WINDOWS || Shell.LINUX)) {
if ((nativeLoaded) &&
(Shell.WINDOWS || (Shell.isLinuxSendfileAvailable))) {
copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
} else {
FileUtils.copyFile(src, dst);

View File

@ -37,9 +37,6 @@ import org.apache.hadoop.fs.ChecksumException;
@InterfaceStability.Evolving
public class DataChecksum implements Checksum {
// Misc constants
public static final int HEADER_LEN = 5; /// 1 byte type and 4 byte len
// checksum types
public static final int CHECKSUM_NULL = 0;
public static final int CHECKSUM_CRC32 = 1;
@ -103,7 +100,7 @@ public class DataChecksum implements Checksum {
* @return DataChecksum of the type in the array or null in case of an error.
*/
public static DataChecksum newDataChecksum( byte bytes[], int offset ) {
if ( offset < 0 || bytes.length < offset + HEADER_LEN ) {
if (offset < 0 || bytes.length < offset + getChecksumHeaderSize()) {
return null;
}
@ -116,8 +113,8 @@ public class DataChecksum implements Checksum {
}
/**
* This constructucts a DataChecksum by reading HEADER_LEN bytes from
* input stream <i>in</i>
* This constructs a DataChecksum by reading HEADER_LEN bytes from input
* stream <i>in</i>
*/
public static DataChecksum newDataChecksum( DataInputStream in )
throws IOException {
@ -141,7 +138,7 @@ public class DataChecksum implements Checksum {
}
public byte[] getHeader() {
byte[] header = new byte[DataChecksum.HEADER_LEN];
byte[] header = new byte[getChecksumHeaderSize()];
header[0] = (byte) (type.id & 0xff);
// Writing in buffer just like DataOutput.WriteInt()
header[1+0] = (byte) ((bytesPerChecksum >>> 24) & 0xff);
@ -229,13 +226,18 @@ public class DataChecksum implements Checksum {
bytesPerChecksum = chunkSize;
}
// Accessors
/** @return the checksum algorithm type. */
public Type getChecksumType() {
return type;
}
/** @return the size for a checksum. */
public int getChecksumSize() {
return type.size;
}
/** @return the required checksum size given the data length. */
public int getChecksumSize(int dataSize) {
return ((dataSize - 1)/getBytesPerChecksum() + 1) * getChecksumSize();
}
public int getBytesPerChecksum() {
return bytesPerChecksum;
}

View File

@ -377,6 +377,117 @@ abstract public class Shell {
return winUtilsPath;
}
public static class LinuxKernelVersion implements Comparable<LinuxKernelVersion>{
private final short major;
private final short minor;
private final short revision;
public LinuxKernelVersion(short major, short minor, short revision) {
this.major = major;
this.minor = minor;
this.revision = revision;
}
/**
* Parse Linux kernel version string from output of POSIX command 'uname -r'
* @param version version string from POSIX command 'uname -r'
* @return LinuxKernelVersion
* @throws IllegalArgumentException
*
* Note:
* On CentOS 5.8: '2.6.18-308.24.1.el5'
* On Ubuntu 14: '3.13.0-32-generic'
*/
public static LinuxKernelVersion parseLinuxKernelVersion(String version)
throws IllegalArgumentException {
if (version == null) {
throw new IllegalArgumentException();
}
String parts[] = version.split("-")[0].split("\\.");
if (parts.length != 3) {
throw new IllegalArgumentException(version);
}
short major = Short.parseShort(parts[0]);
short minor = Short.parseShort(parts[1]);
short revision = Short.parseShort(parts[2]);
return new LinuxKernelVersion(major, minor, revision);
}
@Override
public int compareTo(LinuxKernelVersion o) {
if (this.major == o.major) {
if (this.minor == o.minor) {
return this.revision - o.revision;
} else {
return this.minor - o.minor;
}
} else {
return this.major - o.major;
}
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof LinuxKernelVersion)) {
return false;
}
return compareTo((LinuxKernelVersion) other) == 0;
}
@Override
public String toString() {
return String.format("%d.%d.%d", major, minor, revision);
}
@Override
public int hashCode(){
int hash = 41;
hash = (19 * hash) + major;
hash = (53 * hash) + minor;
hash = (29 * hash) + revision;
return hash;
}
}
/*
* sendfile() API between two file descriptors
* is only supported on Linux Kernel version 2.6.33+
* according to http://man7.org/linux/man-pages/man2/sendfile.2.html
*/
public static final boolean isLinuxSendfileAvailable = isLinuxSendfileSupported();
private static LinuxKernelVersion minLkvSupportSendfile =
new LinuxKernelVersion((short)2, (short)6, (short)33);
private static boolean isLinuxSendfileSupported() {
if (!Shell.LINUX) {
return false;
}
ShellCommandExecutor shexec = null;
boolean sendfileSupported = false;
try {
String[] args = {"uname", "bash", "-r"};
shexec = new ShellCommandExecutor(args);
shexec.execute();
String version = shexec.getOutput();
LinuxKernelVersion lkv =
LinuxKernelVersion.parseLinuxKernelVersion(version);
if (lkv.compareTo(minLkvSupportSendfile) > 0) {
sendfileSupported = true;
}
} catch (Exception e) {
LOG.warn("isLinuxSendfileSupported() failed unexpected: " + e);
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug("uname exited with exit code "
+ (shexec != null ? shexec.getExitCode() : "(null executor)"));
}
}
return sendfileSupported;
}
public static final boolean isSetsidAvailable = isSetsidSupported();
private static boolean isSetsidSupported() {
if (Shell.WINDOWS) {

View File

@ -165,4 +165,24 @@ public class TestShell extends TestCase {
assertEquals(2, command.getRunCount());
}
}
public void testLinuxKernelVersion() throws IOException {
Shell.LinuxKernelVersion v2_6_18 =
new Shell.LinuxKernelVersion((short)2, (short)6, (short)18);
Shell.LinuxKernelVersion v2_6_32 =
new Shell.LinuxKernelVersion((short)2, (short)6, (short)32);
assertTrue(v2_6_18.compareTo(v2_6_32) < 0);
}
public void testParseLinuxKernelVersion() throws Exception {
String centOs58Ver = new String("2.6.18-308.24.1.el5");
String ubuntu14Ver = new String("3.13.0-32-generic");
Shell.LinuxKernelVersion lkvCentOs58 =
Shell.LinuxKernelVersion.parseLinuxKernelVersion(centOs58Ver);
Shell.LinuxKernelVersion lkvUnbuntu14 =
Shell.LinuxKernelVersion.parseLinuxKernelVersion(ubuntu14Ver);
assertTrue(lkvUnbuntu14.compareTo(lkvCentOs58) > 0);
assertFalse(lkvUnbuntu14.equals(lkvCentOs58));
}
}

View File

@ -686,6 +686,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7090. Use unbuffered writes when persisting in-memory replicas.
(Xiaoyu Yao via cnauroth)
HDFS-6934. Move checksum computation off the hot path when writing to RAM
disk. (cnauroth)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -109,6 +109,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
*/
private DatanodeInfo datanode;
/**
* StorageType of replica on DataNode.
*/
private StorageType storageType;
/**
* If false, we won't try short-circuit local reads.
*/
@ -201,6 +206,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
return this;
}
public BlockReaderFactory setStorageType(StorageType storageType) {
this.storageType = storageType;
return this;
}
public BlockReaderFactory setAllowShortCircuitLocalReads(
boolean allowShortCircuitLocalReads) {
this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
@ -353,7 +363,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
try {
return BlockReaderLocalLegacy.newBlockReader(conf,
userGroupInformation, configuration, fileName, block, token,
datanode, startOffset, length);
datanode, startOffset, length, storageType);
} catch (RemoteException remoteException) {
ioe = remoteException.unwrapRemoteException(
InvalidToken.class, AccessControlException.class);
@ -415,6 +425,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
setShortCircuitReplica(info.getReplica()).
setVerifyChecksum(verifyChecksum).
setCachingStrategy(cachingStrategy).
setStorageType(storageType).
build();
}

View File

@ -66,6 +66,7 @@ class BlockReaderLocal implements BlockReader {
private ShortCircuitReplica replica;
private long dataPos;
private ExtendedBlock block;
private StorageType storageType;
public Builder(Conf conf) {
this.maxReadahead = Integer.MAX_VALUE;
@ -106,6 +107,11 @@ class BlockReaderLocal implements BlockReader {
return this;
}
public Builder setStorageType(StorageType storageType) {
this.storageType = storageType;
return this;
}
public BlockReaderLocal build() {
Preconditions.checkNotNull(replica);
return new BlockReaderLocal(this);
@ -209,6 +215,11 @@ class BlockReaderLocal implements BlockReader {
*/
private ByteBuffer checksumBuf;
/**
* StorageType of replica on DataNode.
*/
private StorageType storageType;
private BlockReaderLocal(Builder builder) {
this.replica = builder.replica;
this.dataIn = replica.getDataStream().getChannel();
@ -237,6 +248,7 @@ class BlockReaderLocal implements BlockReader {
this.zeroReadaheadRequested = false;
}
this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
this.storageType = builder.storageType;
}
private synchronized void createDataBufIfNeeded() {
@ -327,8 +339,8 @@ class BlockReaderLocal implements BlockReader {
int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
checksumBuf.clear();
checksumBuf.limit(checksumsNeeded * checksumSize);
long checksumPos =
7 + ((startDataPos / bytesPerChecksum) * checksumSize);
long checksumPos = BlockMetadataHeader.getHeaderSize()
+ ((startDataPos / bytesPerChecksum) * checksumSize);
while (checksumBuf.hasRemaining()) {
int nRead = checksumIn.read(checksumBuf, checksumPos);
if (nRead < 0) {
@ -350,7 +362,14 @@ class BlockReaderLocal implements BlockReader {
private boolean createNoChecksumContext() {
if (verifyChecksum) {
return replica.addNoChecksumAnchor();
if (storageType != null && storageType.isTransient()) {
// Checksums are not stored for replicas on transient storage. We do not
// anchor, because we do not intend for client activity to block eviction
// from transient storage on the DataNode side.
return true;
} else {
return replica.addNoChecksumAnchor();
}
} else {
return true;
}
@ -358,7 +377,9 @@ class BlockReaderLocal implements BlockReader {
private void releaseNoChecksumContext() {
if (verifyChecksum) {
replica.removeNoChecksumAnchor();
if (storageType == null || !storageType.isTransient()) {
replica.removeNoChecksumAnchor();
}
}
}

View File

@ -177,7 +177,8 @@ class BlockReaderLocalLegacy implements BlockReader {
UserGroupInformation userGroupInformation,
Configuration configuration, String file, ExtendedBlock blk,
Token<BlockTokenIdentifier> token, DatanodeInfo node,
long startOffset, long length) throws IOException {
long startOffset, long length, StorageType storageType)
throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
.getIpcPort());
// check the cache first
@ -188,7 +189,7 @@ class BlockReaderLocalLegacy implements BlockReader {
}
pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
configuration, conf.socketTimeout, token,
conf.connectToDnViaHostname);
conf.connectToDnViaHostname, storageType);
}
// check to see if the file exists. It may so happen that the
@ -200,7 +201,8 @@ class BlockReaderLocalLegacy implements BlockReader {
FileInputStream dataIn = null;
FileInputStream checksumIn = null;
BlockReaderLocalLegacy localBlockReader = null;
boolean skipChecksumCheck = conf.skipShortCircuitChecksums;
boolean skipChecksumCheck = conf.skipShortCircuitChecksums ||
storageType.isTransient();
try {
// get a local file system
File blkfile = new File(pathinfo.getBlockPath());
@ -217,15 +219,8 @@ class BlockReaderLocalLegacy implements BlockReader {
File metafile = new File(pathinfo.getMetaPath());
checksumIn = new FileInputStream(metafile);
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader
.readHeader(new DataInputStream(checksumIn));
short version = header.getVersion();
if (version != BlockMetadataHeader.VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for "
+ blk + " ignoring ...");
}
DataChecksum checksum = header.getChecksum();
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
new DataInputStream(checksumIn), blk);
long firstChunkOffset = startOffset
- (startOffset % checksum.getBytesPerChecksum());
localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
@ -266,8 +261,8 @@ class BlockReaderLocalLegacy implements BlockReader {
private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
throws IOException {
Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname,
StorageType storageType) throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
BlockLocalPathInfo pathinfo = null;
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
@ -275,7 +270,15 @@ class BlockReaderLocalLegacy implements BlockReader {
try {
// make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
if (pathinfo != null) {
// We cannot cache the path information for a replica on transient storage.
// If the replica gets evicted, then it moves to a different path. Then,
// our next attempt to read from the cached path would fail to find the
// file. Additionally, the failure would cause us to disable legacy
// short-circuit read for all subsequent use in the ClientContext. Unlike
// the newer short-circuit read implementation, we have no communication
// channel for the DataNode to notify the client that the path has been
// invalidated. Therefore, our only option is to skip caching.
if (pathinfo != null && !storageType.isTransient()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cached location of block " + blk + " as " + pathinfo);
}

View File

@ -96,6 +96,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
@ -513,8 +514,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return createChecksum(null);
}
private DataChecksum createChecksum(ChecksumOpt userOpt)
throws IOException {
private DataChecksum createChecksum(ChecksumOpt userOpt) {
// Fill in any missing field with the default.
ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
defaultChecksumOpt, userOpt);
@ -522,8 +522,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
myOpt.getChecksumType(),
myOpt.getBytesPerChecksum());
if (dataChecksum == null) {
throw new IOException("Invalid checksum type specified: "
+ myOpt.getChecksumType().name());
throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
+ userOpt + ", default=" + defaultChecksumOpt
+ ", effective=null");
}
return dataChecksum;
}

View File

@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@ -567,6 +568,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
DNAddrPair retval = chooseDataNode(targetBlock, null);
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
StorageType storageType = retval.storageType;
try {
ExtendedBlock blk = targetBlock.getBlock();
@ -575,6 +577,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
@ -872,12 +875,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
private DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
while (true) {
DatanodeInfo[] nodes = block.getLocations();
try {
return getBestNodeDNAddrPair(nodes, ignoredNodes);
return getBestNodeDNAddrPair(block, ignoredNodes);
} catch (IOException ie) {
String errMsg =
getBestNodeDNAddrPairErrorString(nodes, deadNodes, ignoredNodes);
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
String description = "Could not obtain block: " + blockInfo;
@ -886,7 +888,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
throw new BlockMissingException(src, description,
block.getStartOffset());
}
DatanodeInfo[] nodes = block.getLocations();
if (nodes == null || nodes.length == 0) {
DFSClient.LOG.info("No node available for " + blockInfo);
}
@ -920,22 +923,44 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/**
* Get the best node.
* @param nodes Nodes to choose from.
* @param ignoredNodes Do not chose nodes in this array (may be null)
* Get the best node from which to stream the data.
* @param block LocatedBlock, containing nodes in priority order.
* @param ignoredNodes Do not choose nodes in this array (may be null)
* @return The DNAddrPair of the best node.
* @throws IOException
*/
private DNAddrPair getBestNodeDNAddrPair(final DatanodeInfo[] nodes,
private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
DatanodeInfo[] nodes = block.getLocations();
StorageType[] storageTypes = block.getStorageTypes();
DatanodeInfo chosenNode = null;
StorageType storageType = null;
if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
if (!deadNodes.containsKey(nodes[i])
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
chosenNode = nodes[i];
// Storage types are ordered to correspond with nodes, so use the same
// index to get storage type.
if (storageTypes != null && i < storageTypes.length) {
storageType = storageTypes[i];
}
break;
}
}
}
if (chosenNode == null) {
throw new IOException("No live nodes contain block " + block.getBlock() +
" after checking nodes = " + Arrays.toString(nodes) +
", ignoredNodes = " + ignoredNodes);
}
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr);
return new DNAddrPair(chosenNode, targetAddr, storageType);
}
private static String getBestNodeDNAddrPairErrorString(
@ -1018,6 +1043,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
DatanodeInfo chosenNode = datanode.info;
InetSocketAddress targetAddr = datanode.addr;
StorageType storageType = datanode.storageType;
BlockReader reader = null;
try {
@ -1028,6 +1054,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
setStorageType(storageType).
setFileName(src).
setBlock(block.getBlock()).
setBlockToken(blockToken).
@ -1151,7 +1178,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
// If no nodes to do hedged reads against, pass.
try {
try {
chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
chosenNode = getBestNodeDNAddrPair(block, ignored);
} catch (IOException ioe) {
chosenNode = chooseDataNode(block, ignored);
}
@ -1494,31 +1521,17 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
throw new IOException("Mark/reset not supported");
}
/**
* Pick the best node from which to stream the data.
* Entries in <i>nodes</i> are already in the priority order
*/
static DatanodeInfo bestNode(DatanodeInfo nodes[],
AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
if (nodes != null) {
for (int i = 0; i < nodes.length; i++) {
if (!deadNodes.containsKey(nodes[i])
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
return nodes[i];
}
}
}
throw new IOException("No live nodes contain current block");
}
/** Utility class to encapsulate data node info and its address. */
static class DNAddrPair {
private static final class DNAddrPair {
final DatanodeInfo info;
final InetSocketAddress addr;
DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
final StorageType storageType;
DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
StorageType storageType) {
this.info = info;
this.addr = addr;
this.storageType = storageType;
}
}

View File

@ -42,6 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CreateFlag;
@ -89,9 +91,9 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceScope;
@ -148,7 +150,10 @@ public class DFSOutputStream extends FSOutputSummer
private String src;
private final long fileId;
private final long blockSize;
private final DataChecksum checksum;
/** Only for DataTransferProtocol.writeBlock(..) */
private final DataChecksum checksum4WriteBlock;
private final int bytesPerChecksum;
// both dataQueue and ackQueue are protected by dataQueue lock
private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
@ -245,6 +250,9 @@ public class DFSOutputStream extends FSOutputSummer
}
void writeChecksum(byte[] inarray, int off, int len) {
if (len == 0) {
return;
}
if (checksumPos + len > dataStart) {
throw new BufferOverflowException();
}
@ -377,19 +385,12 @@ public class DFSOutputStream extends FSOutputSummer
private final Span traceSpan;
/**
* Default construction for file create
*/
private DataStreamer() {
this(null, null);
}
/**
* construction with tracing info
*/
private DataStreamer(HdfsFileStatus stat, Span span) {
isAppend = false;
isLazyPersistFile = initLazyPersist(stat);
isLazyPersistFile = isLazyPersist(stat);
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
traceSpan = span;
}
@ -409,7 +410,7 @@ public class DFSOutputStream extends FSOutputSummer
block = lastBlock.getBlock();
bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken();
isLazyPersistFile = initLazyPersist(stat);
isLazyPersistFile = isLazyPersist(stat);
long usedInLastBlock = stat.getLen() % blockSize;
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
@ -452,13 +453,6 @@ public class DFSOutputStream extends FSOutputSummer
}
}
private boolean initLazyPersist(HdfsFileStatus stat) {
final BlockStoragePolicy lpPolicy = blockStoragePolicySuite
.getPolicy(HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
return lpPolicy != null &&
stat.getStoragePolicy() == lpPolicy.getId();
}
private void setPipeline(LocatedBlock lb) {
setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
@ -553,7 +547,7 @@ public class DFSOutputStream extends FSOutputSummer
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
one = new Packet(checksum.getChecksumSize()); // heartbeat packet
one = new Packet(getChecksumSize()); // heartbeat packet
} else {
one = dataQueue.getFirst(); // regular data packet
}
@ -1408,8 +1402,8 @@ public class DFSOutputStream extends FSOutputSummer
// send the request
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
cachingStrategy.get(), isLazyPersistFile);
nodes.length, block.getNumBytes(), bytesSent, newGS,
checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@ -1618,9 +1612,23 @@ public class DFSOutputStream extends FSOutputSummer
return value;
}
/**
* @return the object for computing checksum.
* The type is NULL if checksum is not computed.
*/
private static DataChecksum getChecksum4Compute(DataChecksum checksum,
HdfsFileStatus stat) {
if (isLazyPersist(stat) && stat.getReplication() == 1) {
// do not compute checksum for writing to single replica to memory
return DataChecksum.newDataChecksum(Type.NULL,
checksum.getBytesPerChecksum());
}
return checksum;
}
private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
HdfsFileStatus stat, DataChecksum checksum) throws IOException {
super(checksum);
super(getChecksum4Compute(checksum, stat));
this.dfsClient = dfsClient;
this.src = src;
this.fileId = stat.getFileId();
@ -1635,15 +1643,18 @@ public class DFSOutputStream extends FSOutputSummer
"Set non-null progress callback on DFSOutputStream " + src);
}
final int bytesPerChecksum = checksum.getBytesPerChecksum();
if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
") and blockSize(" + blockSize +
") do not match. " + "blockSize should be a " +
"multiple of io.bytes.per.checksum");
this.bytesPerChecksum = checksum.getBytesPerChecksum();
if (bytesPerChecksum <= 0) {
throw new HadoopIllegalArgumentException(
"Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
}
this.checksum = checksum;
if (blockSize % bytesPerChecksum != 0) {
throw new HadoopIllegalArgumentException("Invalid values: "
+ DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+ ") must divide block size (=" + blockSize + ").");
}
this.checksum4WriteBlock = checksum;
this.dfsclientSlowLogThresholdMs =
dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
}
@ -1655,8 +1666,7 @@ public class DFSOutputStream extends FSOutputSummer
this(dfsClient, src, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
Span traceSpan = null;
if (Trace.isTracing()) {
@ -1734,11 +1744,9 @@ public class DFSOutputStream extends FSOutputSummer
if (lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat,
checksum.getBytesPerChecksum(), traceSpan);
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan);
} else {
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
streamer = new DataStreamer(stat, traceSpan);
}
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
@ -1752,9 +1760,15 @@ public class DFSOutputStream extends FSOutputSummer
out.start();
return out;
}
private static boolean isLazyPersist(HdfsFileStatus stat) {
final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy(
HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
return p != null && stat.getStoragePolicy() == p.getId();
}
private void computePacketChunkSize(int psize, int csize) {
int chunkSize = csize + checksum.getChecksumSize();
final int chunkSize = csize + getChecksumSize();
chunksPerPacket = Math.max(psize/chunkSize, 1);
packetSize = chunkSize*chunksPerPacket;
if (DFSClient.LOG.isDebugEnabled()) {
@ -1811,21 +1825,19 @@ public class DFSOutputStream extends FSOutputSummer
dfsClient.checkOpen();
checkClosed();
int bytesPerChecksum = this.checksum.getBytesPerChecksum();
if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len +
" is larger than supported bytesPerChecksum " +
bytesPerChecksum);
}
if (cklen != this.checksum.getChecksumSize()) {
if (cklen != 0 && cklen != getChecksumSize()) {
throw new IOException("writeChunk() checksum size is supposed to be " +
this.checksum.getChecksumSize() +
" but found to be " + cklen);
getChecksumSize() + " but found to be " + cklen);
}
if (currentPacket == null) {
currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
bytesCurBlock, currentSeqno++, getChecksumSize());
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.seqno +
@ -1873,7 +1885,7 @@ public class DFSOutputStream extends FSOutputSummer
//
if (bytesCurBlock == blockSize) {
currentPacket = new Packet(0, 0, bytesCurBlock,
currentSeqno++, this.checksum.getChecksumSize());
currentSeqno++, getChecksumSize());
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
@ -1967,7 +1979,7 @@ public class DFSOutputStream extends FSOutputSummer
// but sync was requested.
// Send an empty packet
currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
bytesCurBlock, currentSeqno++, getChecksumSize());
}
} else {
if (isSync && bytesCurBlock > 0) {
@ -1976,7 +1988,7 @@ public class DFSOutputStream extends FSOutputSummer
// and sync was requested.
// So send an empty sync packet.
currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
bytesCurBlock, currentSeqno++, getChecksumSize());
} else {
// just discard the current packet since it is already been sent.
currentPacket = null;
@ -2180,8 +2192,7 @@ public class DFSOutputStream extends FSOutputSummer
if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block
currentPacket = new Packet(0, 0, bytesCurBlock,
currentSeqno++, this.checksum.getChecksumSize());
currentPacket = new Packet(0, 0, bytesCurBlock, currentSeqno++, getChecksumSize());
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
}
@ -2245,8 +2256,7 @@ public class DFSOutputStream extends FSOutputSummer
@VisibleForTesting
public synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value);
packetSize = (checksum.getBytesPerChecksum() +
checksum.getChecksumSize()) * chunksPerPacket;
packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
}
synchronized void setTestFilename(String newname) {

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.protocol;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@ -185,7 +186,11 @@ public class LocatedBlock {
+ "; getBlockSize()=" + getBlockSize()
+ "; corrupt=" + corrupt
+ "; offset=" + offset
+ "; locs=" + java.util.Arrays.asList(locs)
+ "; locs=" + Arrays.asList(locs)
+ "; storageIDs=" +
(storageIDs != null ? Arrays.asList(storageIDs) : null)
+ "; storageTypes=" +
(storageTypes != null ? Arrays.asList(storageTypes) : null)
+ "}";
}
}

View File

@ -29,10 +29,13 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
@ -46,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockMetadataHeader {
private static final Log LOG = LogFactory.getLog(BlockMetadataHeader.class);
public static final short VERSION = 1;
@ -73,6 +77,37 @@ public class BlockMetadataHeader {
return checksum;
}
/**
* Read the checksum header from the meta file.
* @return the data checksum obtained from the header.
*/
public static DataChecksum readDataChecksum(File metaFile) throws IOException {
DataInputStream in = null;
try {
in = new DataInputStream(new BufferedInputStream(
new FileInputStream(metaFile), HdfsConstants.IO_FILE_BUFFER_SIZE));
return readDataChecksum(in, metaFile);
} finally {
IOUtils.closeStream(in);
}
}
/**
* Read the checksum header from the meta input stream.
* @return the data checksum obtained from the header.
*/
public static DataChecksum readDataChecksum(final DataInputStream metaIn,
final Object name) throws IOException {
// read and handle the common header here. For now just a version
final BlockMetadataHeader header = readHeader(metaIn);
if (header.getVersion() != VERSION) {
LOG.warn("Unexpected meta-file version for " + name
+ ": version in file is " + header.getVersion()
+ " but expected version is " + VERSION);
}
return header.getChecksum();
}
/**
* Read the header without changing the position of the FileChannel.
*
@ -82,7 +117,7 @@ public class BlockMetadataHeader {
*/
public static BlockMetadataHeader preadHeader(FileChannel fc)
throws IOException {
byte arr[] = new byte[2 + DataChecksum.HEADER_LEN];
final byte arr[] = new byte[getHeaderSize()];
ByteBuffer buf = ByteBuffer.wrap(arr);
while (buf.hasRemaining()) {
@ -158,7 +193,7 @@ public class BlockMetadataHeader {
* Writes all the fields till the beginning of checksum.
* @throws IOException on error
*/
static void writeHeader(DataOutputStream out, DataChecksum checksum)
public static void writeHeader(DataOutputStream out, DataChecksum checksum)
throws IOException {
writeHeader(out, new BlockMetadataHeader(VERSION, checksum));
}

View File

@ -81,12 +81,12 @@ class BlockReceiver implements Closeable {
* checksum polynomial than the block is stored with on disk,
* the DataNode needs to recalculate checksums before writing.
*/
private boolean needsChecksumTranslation;
private final boolean needsChecksumTranslation;
private OutputStream out = null; // to block file at local disk
private FileDescriptor outFd;
private DataOutputStream checksumOut = null; // to crc file at local disk
private int bytesPerChecksum;
private int checksumSize;
private final int bytesPerChecksum;
private final int checksumSize;
private final PacketReceiver packetReceiver = new PacketReceiver(false);
@ -98,7 +98,6 @@ class BlockReceiver implements Closeable {
private DataTransferThrottler throttler;
private ReplicaOutputStreams streams;
private DatanodeInfo srcDataNode = null;
private Checksum partialCrc = null;
private final DataNode datanode;
volatile private boolean mirrorError;
@ -489,7 +488,7 @@ class BlockReceiver implements Closeable {
long offsetInBlock = header.getOffsetInBlock();
long seqno = header.getSeqno();
boolean lastPacketInBlock = header.isLastPacketInBlock();
int len = header.getDataLen();
final int len = header.getDataLen();
boolean syncBlock = header.getSyncBlock();
// avoid double sync'ing on close
@ -498,7 +497,7 @@ class BlockReceiver implements Closeable {
}
// update received bytes
long firstByteInBlock = offsetInBlock;
final long firstByteInBlock = offsetInBlock;
offsetInBlock += len;
if (replicaInfo.getNumBytes() < offsetInBlock) {
replicaInfo.setNumBytes(offsetInBlock);
@ -538,16 +537,15 @@ class BlockReceiver implements Closeable {
flushOrSync(true);
}
} else {
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
final int checksumLen = diskChecksum.getChecksumSize(len);
final int checksumReceivedLen = checksumBuf.capacity();
if ( checksumBuf.capacity() != checksumLen) {
throw new IOException("Length of checksums in packet " +
checksumBuf.capacity() + " does not match calculated checksum " +
"length " + checksumLen);
if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
throw new IOException("Invalid checksum length: received length is "
+ checksumReceivedLen + " but expected length is " + checksumLen);
}
if (shouldVerifyChecksum()) {
if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
try {
verifyChunks(dataBuf, checksumBuf);
} catch (IOException ioe) {
@ -571,11 +569,17 @@ class BlockReceiver implements Closeable {
translateChunks(dataBuf, checksumBuf);
}
}
if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
// checksum is missing, need to calculate it
checksumBuf = ByteBuffer.allocate(checksumLen);
diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
}
// by this point, the data in the buffer uses the disk checksum
byte[] lastChunkChecksum;
final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
&& streams.isTransientStorage();
try {
long onDiskLen = replicaInfo.getBytesOnDisk();
if (onDiskLen<offsetInBlock) {
@ -587,14 +591,16 @@ class BlockReceiver implements Closeable {
}
// If this is a partial chunk, then read in pre-existing checksum
if (firstByteInBlock % bytesPerChecksum != 0) {
LOG.info("Packet starts at " + firstByteInBlock +
" for " + block +
" which is not a multiple of bytesPerChecksum " +
bytesPerChecksum);
Checksum partialCrc = null;
if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("receivePacket for " + block
+ ": bytesPerChecksum=" + bytesPerChecksum
+ " does not divide firstByteInBlock=" + firstByteInBlock);
}
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
onDiskLen / bytesPerChecksum * checksumSize;
computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
}
int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
@ -611,41 +617,40 @@ class BlockReceiver implements Closeable {
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
}
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
if (partialCrc != null) {
final byte[] lastCrc;
if (shouldNotWriteChecksum) {
lastCrc = null;
} else if (partialCrc != null) {
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. Calculate new crc for this chunk.
if (len > bytesPerChecksum) {
throw new IOException("Got wrong length during writeBlock(" +
block + ") from " + inAddr + " " +
"A packet can have only one partial chunk."+
" len = " + len +
" bytesPerChecksum " + bytesPerChecksum);
throw new IOException("Unexpected packet data length for "
+ block + " from " + inAddr + ": a partial chunk must be "
+ " sent in an individual packet (data length = " + len
+ " > bytesPerChecksum = " + bytesPerChecksum + ")");
}
partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
lastChunkChecksum = Arrays.copyOfRange(
buf, buf.length - checksumSize, buf.length
);
lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
checksumOut.write(buf);
if(LOG.isDebugEnabled()) {
LOG.debug("Writing out partial crc for data len " + len);
}
partialCrc = null;
} else {
lastChunkChecksum = Arrays.copyOfRange(
checksumBuf.array(),
checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize,
checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen);
checksumOut.write(checksumBuf.array(),
checksumBuf.arrayOffset() + checksumBuf.position(),
checksumLen);
// write checksum
final int offset = checksumBuf.arrayOffset() +
checksumBuf.position();
final int end = offset + checksumLen;
lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
end);
checksumOut.write(checksumBuf.array(), offset, checksumLen);
}
/// flush entire packet, sync if requested
flushOrSync(syncBlock);
replicaInfo.setLastChecksumAndDataLen(
offsetInBlock, lastChunkChecksum
);
replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
datanode.metrics.incrBytesWritten(len);
@ -685,6 +690,10 @@ class BlockReceiver implements Closeable {
return lastPacketInBlock?-1:len;
}
private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
return Arrays.copyOfRange(array, end - size, end);
}
private void manageWriterOsCache(long offsetInBlock) {
try {
if (outFd != null &&
@ -920,18 +929,19 @@ class BlockReceiver implements Closeable {
* reads in the partial crc chunk and computes checksum
* of pre-existing data in partial chunk.
*/
private void computePartialChunkCrc(long blkoff, long ckoff,
int bytesPerChecksum) throws IOException {
private Checksum computePartialChunkCrc(long blkoff, long ckoff)
throws IOException {
// find offset of the beginning of partial chunk.
//
int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
int checksumSize = diskChecksum.getChecksumSize();
blkoff = blkoff - sizePartialChunk;
LOG.info("computePartialChunkCrc sizePartialChunk " +
sizePartialChunk + " " + block +
" block offset " + blkoff +
" metafile offset " + ckoff);
if (LOG.isDebugEnabled()) {
LOG.debug("computePartialChunkCrc for " + block
+ ": sizePartialChunk=" + sizePartialChunk
+ ", block offset=" + blkoff
+ ", metafile offset=" + ckoff);
}
// create an input stream from the block file
// and read in partial crc chunk into temporary buffer
@ -950,10 +960,12 @@ class BlockReceiver implements Closeable {
}
// compute crc of partial chunk from data read in the block file.
partialCrc = DataChecksum.newDataChecksum(
final Checksum partialCrc = DataChecksum.newDataChecksum(
diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
partialCrc.update(buf, 0, sizePartialChunk);
LOG.info("Read in partial CRC chunk from disk for " + block);
if (LOG.isDebugEnabled()) {
LOG.debug("Read in partial CRC chunk from disk for " + block);
}
// paranoia! verify that the pre-computed crc matches what we
// recalculated just now
@ -964,6 +976,7 @@ class BlockReceiver implements Closeable {
checksum2long(crcbuf);
throw new IOException(msg);
}
return partialCrc;
}
private static enum PacketResponderType {

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
@ -262,26 +263,37 @@ class BlockSender implements java.io.Closeable {
*/
DataChecksum csum = null;
if (verifyChecksum || sendChecksum) {
final InputStream metaIn = datanode.data.getMetaDataInputStream(block);
if (!corruptChecksumOk || metaIn != null) {
if (metaIn == null) {
//need checksum but meta-data not found
throw new FileNotFoundException("Meta-data not found for " + block);
}
LengthInputStream metaIn = null;
boolean keepMetaInOpen = false;
try {
metaIn = datanode.data.getMetaDataInputStream(block);
if (!corruptChecksumOk || metaIn != null) {
if (metaIn == null) {
//need checksum but meta-data not found
throw new FileNotFoundException("Meta-data not found for " +
block);
}
checksumIn = new DataInputStream(
new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
// The meta file will contain only the header if the NULL checksum
// type was used, or if the replica was written to transient storage.
// Checksum verification is not performed for replicas on transient
// storage. The header is important for determining the checksum
// type later when lazy persistence copies the block to non-transient
// storage and computes the checksum.
if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) {
checksumIn = new DataInputStream(new BufferedInputStream(
metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
short version = header.getVersion();
if (version != BlockMetadataHeader.VERSION) {
LOG.warn("Wrong version (" + version + ") for metadata file for "
+ block + " ignoring ...");
csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
keepMetaInOpen = true;
}
} else {
LOG.warn("Could not find metadata file for " + block);
}
} finally {
if (!keepMetaInOpen) {
IOUtils.closeStream(metaIn);
}
csum = header.getChecksum();
} else {
LOG.warn("Could not find metadata file for " + block);
}
}
if (csum == null) {
@ -340,7 +352,7 @@ class BlockSender implements java.io.Closeable {
endOffset = end;
// seek to the right offsets
if (offset > 0) {
if (offset > 0 && checksumIn != null) {
long checksumSkip = (offset / chunkSize) * checksumSize;
// note blockInStream is seeked when created below
if (checksumSkip > 0) {

View File

@ -213,7 +213,7 @@ public class ReplicaInPipeline extends ReplicaInfo
// the checksum that should actually be used -- this
// may differ from requestedChecksum for appends.
DataChecksum checksum;
final DataChecksum checksum;
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
@ -250,7 +250,7 @@ public class ReplicaInPipeline extends ReplicaInfo
}
}
} else {
// for create, we can use the requested checksum
// for create, we can use the requested checksum
checksum = requestedChecksum;
}
@ -264,7 +264,8 @@ public class ReplicaInPipeline extends ReplicaInfo
blockOut.getChannel().position(blockDiskSize);
crcOut.getChannel().position(crcDiskSize);
}
return new ReplicaOutputStreams(blockOut, crcOut, checksum);
return new ReplicaOutputStreams(blockOut, crcOut, checksum,
getVolume().isTransientStorage());
} catch (IOException e) {
IOUtils.closeStream(blockOut);
IOUtils.closeStream(metaRAF);

View File

@ -32,16 +32,18 @@ public class ReplicaOutputStreams implements Closeable {
private final OutputStream dataOut;
private final OutputStream checksumOut;
private final DataChecksum checksum;
private final boolean isTransientStorage;
/**
* Create an object with a data output stream, a checksum output stream
* and a checksum.
*/
public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut,
DataChecksum checksum) {
DataChecksum checksum, boolean isTransientStorage) {
this.dataOut = dataOut;
this.checksumOut = checksumOut;
this.checksum = checksum;
this.isTransientStorage = isTransientStorage;
}
/** @return the data output stream. */
@ -59,6 +61,11 @@ public class ReplicaOutputStreams implements Closeable {
return checksum;
}
/** @return is writing to a transient storage? */
public boolean isTransientStorage() {
return isTransientStorage;
}
@Override
public void close() {
IOUtils.closeStream(dataOut);

View File

@ -599,13 +599,8 @@ class BlockPoolSlice {
HdfsConstants.IO_FILE_BUFFER_SIZE));
// read and handle the common header here. For now just a version
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
short version = header.getVersion();
if (version != BlockMetadataHeader.VERSION) {
FsDatasetImpl.LOG.warn("Wrong version (" + version + ") for metadata file "
+ metaFile + " ignoring ...");
}
DataChecksum checksum = header.getChecksum();
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
checksumIn, metaFile);
int bytesPerChecksum = checksum.getBytesPerChecksum();
int checksumSize = checksum.getChecksumSize();
long numChunks = Math.min(

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@ -92,6 +95,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.util.MBeans;
@ -634,7 +638,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Get the meta info of a block stored in volumeMap. To find a block,
* block pool Id, block Id and generation stamp must match.
* @param b extended block
* @return the meta replica information; null if block was not found
* @return the meta replica information
* @throws ReplicaNotFoundException if no entry is in the map or
* there is a generation stamp mismatch
*/
@ -722,23 +726,80 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
final File dstFile = new File(destDir, srcFile.getName());
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
try {
Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
} catch (IOException e) {
throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
}
computeChecksum(srcMeta, dstMeta, srcFile);
try {
Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
} catch (IOException e) {
throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Copied " + srcMeta + " to " + dstMeta);
LOG.debug("Copied " + srcMeta + " to " + dstMeta +
" and calculated checksum");
LOG.debug("Copied " + srcFile + " to " + dstFile);
}
return new File[] {dstMeta, dstFile};
}
/**
* Compute and store the checksum for a block file that does not already have
* its checksum computed.
*
* @param srcMeta source meta file, containing only the checksum header, not a
* calculated checksum
* @param dstMeta destination meta file, into which this method will write a
* full computed checksum
* @param blockFile block file for which the checksum will be computed
* @throws IOException
*/
private static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
throws IOException {
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
final byte[] data = new byte[1 << 16];
final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
DataOutputStream metaOut = null;
InputStream dataIn = null;
try {
File parentFile = dstMeta.getParentFile();
if (parentFile != null) {
if (!parentFile.mkdirs() && !parentFile.isDirectory()) {
throw new IOException("Destination '" + parentFile
+ "' directory cannot be created");
}
}
metaOut = new DataOutputStream(new BufferedOutputStream(
new FileOutputStream(dstMeta), HdfsConstants.SMALL_BUFFER_SIZE));
BlockMetadataHeader.writeHeader(metaOut, checksum);
dataIn = isNativeIOAvailable ?
NativeIO.getShareDeleteFileInputStream(blockFile) :
new FileInputStream(blockFile);
int offset = 0;
for(int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
if (n > 0) {
n += offset;
offset = n % checksum.getBytesPerChecksum();
final int length = n - offset;
if (length > 0) {
checksum.calculateChunkedSums(data, 0, length, crcs, 0);
metaOut.write(crcs, 0, checksum.getChecksumSize(length));
System.arraycopy(data, length, data, 0, offset);
}
}
}
// calculate and write the last crc
checksum.calculateChunkedSums(data, 0, offset, crcs, 0);
metaOut.write(crcs, 0, 4);
} finally {
IOUtils.cleanup(LOG, dataIn, metaOut);
}
}
static private void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException {
LOG.info("truncateBlock: blockFile=" + blockFile
@ -1641,6 +1702,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
@Override
public boolean isCached(String bpid, long blockId) {
return cacheManager.isCached(bpid, blockId);
}
@ -2556,8 +2618,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Before deleting the files from transient storage we must notify the
// NN that the files are on the new storage. Else a blockReport from
// the transient storage might cause the NN to think the blocks are lost.
// Replicas must be evicted from client short-circuit caches, because the
// storage will no longer be transient, and thus will require validating
// checksum. This also stops a client from holding file descriptors,
// which would prevent the OS from reclaiming the memory.
ExtendedBlock extendedBlock =
new ExtendedBlock(bpid, newReplicaInfo);
datanode.getShortCircuitRegistry().processBlockInvalidation(
ExtendedBlockId.fromExtendedBlock(extendedBlock));
datanode.notifyNamenodeReceivedBlock(
extendedBlock, null, newReplicaInfo.getStorageUuid());

View File

@ -241,7 +241,7 @@ class RamDiskAsyncLazyPersistService {
} catch (Exception e){
FsDatasetImpl.LOG.warn(
"LazyWriter failed to async persist RamDisk block pool id: "
+ bpId + "block Id: " + blockId);
+ bpId + "block Id: " + blockId, e);
} finally {
if (!succeeded) {
datanode.getFSDataset().onFailLazyPersist(bpId, blockId);

View File

@ -168,9 +168,9 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
@Override
synchronized RamDiskReplicaLru getNextCandidateForEviction() {
Iterator it = replicasPersisted.values().iterator();
final Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
while (it.hasNext()) {
RamDiskReplicaLru ramDiskReplicaLru = (RamDiskReplicaLru) it.next();
final RamDiskReplicaLru ramDiskReplicaLru = it.next();
it.remove();
Map<Long, RamDiskReplicaLru> replicaMap =

View File

@ -248,7 +248,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
+ theBlock);
} else {
SimulatedOutputStream crcStream = new SimulatedOutputStream();
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum);
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
volume.isTransientStorage());
}
}

View File

@ -0,0 +1,389 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.JMXGet;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Rule;
import org.junit.rules.Timeout;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.UUID;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public abstract class LazyPersistTestCase {
static {
((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
}
protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
protected static final int BUFFER_LENGTH = 4096;
protected static final int EVICTION_LOW_WATERMARK = 1;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
private static final String JMX_SERVICE_NAME = "DataNode";
protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
protected static final short REPL_FACTOR = 1;
protected MiniDFSCluster cluster;
protected DistributedFileSystem fs;
protected DFSClient client;
protected JMXGet jmx;
protected TemporarySocketDirectory sockDir;
@After
public void shutDownCluster() throws Exception {
// Dump all RamDisk JMX metrics before shutdown the cluster
printRamDiskJMXMetrics();
if (fs != null) {
fs.close();
fs = null;
client = null;
}
if (cluster != null) {
cluster.shutdownDataNodes();
cluster.shutdown();
cluster = null;
}
if (jmx != null) {
jmx = null;
}
IOUtils.closeQuietly(sockDir);
sockDir = null;
}
@Rule
public Timeout timeout = new Timeout(300000);
protected final LocatedBlocks ensureFileReplicasOnStorageType(
Path path, StorageType storageType) throws IOException {
// Ensure that returned block locations returned are correct!
LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
assertThat(fs.exists(path), is(true));
long fileLength = client.getFileInfo(path.toString()).getLen();
LocatedBlocks locatedBlocks =
client.getLocatedBlocks(path.toString(), 0, fileLength);
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
}
return locatedBlocks;
}
protected final void makeRandomTestFile(Path path, long length,
boolean isLazyPersist, long seed) throws IOException {
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
BLOCK_SIZE, REPL_FACTOR, seed, true);
}
protected final void makeTestFile(Path path, long length,
boolean isLazyPersist) throws IOException {
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
if (isLazyPersist) {
createFlags.add(LAZY_PERSIST);
}
FSDataOutputStream fos = null;
try {
fos =
fs.create(path,
FsPermission.getFileDefault(),
createFlags,
BUFFER_LENGTH,
REPL_FACTOR,
BLOCK_SIZE,
null);
// Allocate a block.
byte[] buffer = new byte[BUFFER_LENGTH];
for (int bytesWritten = 0; bytesWritten < length; ) {
fos.write(buffer, 0, buffer.length);
bytesWritten += buffer.length;
}
if (length > 0) {
fos.hsync();
}
} finally {
IOUtils.closeQuietly(fos);
}
}
/**
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
* capped. If ramDiskStorageLimit < 0 then it is ignored.
*/
protected final void startUpCluster(boolean hasTransientStorage,
final int ramDiskReplicaCapacity,
final boolean useSCR,
final boolean useLegacyBlockReaderLocal)
throws IOException {
Configuration conf = new Configuration();
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC);
conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
EVICTION_LOW_WATERMARK * BLOCK_SIZE);
if (useSCR) {
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
// Do not share a client context across tests.
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
if (useLegacyBlockReaderLocal) {
conf.setBoolean(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName());
} else {
sockDir = new TemporarySocketDirectory();
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
}
}
long[] capacities = null;
if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
// Convert replica count to byte count, add some delta for .meta and
// VERSION files.
long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) +
(BLOCK_SIZE - 1);
capacities = new long[] { ramDiskStorageLimit, -1 };
}
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(REPL_FACTOR)
.storageCapacities(capacities)
.storageTypes(hasTransientStorage ?
new StorageType[]{ RAM_DISK, DEFAULT } : null)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
try {
jmx = initJMX();
} catch (Exception e) {
fail("Failed initialize JMX for testing: " + e);
}
LOG.info("Cluster startup complete");
}
/**
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
* capped. If ramDiskStorageLimit < 0 then it is ignored.
*/
protected final void startUpCluster(final int numDataNodes,
final StorageType[] storageTypes,
final long ramDiskStorageLimit,
final boolean useSCR)
throws IOException {
Configuration conf = new Configuration();
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC);
if (useSCR)
{
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
sockDir = new TemporarySocketDirectory();
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName());
}
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(numDataNodes)
.storageTypes(storageTypes != null ?
storageTypes : new StorageType[] { DEFAULT, DEFAULT })
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
// Artificially cap the storage capacity of the RAM_DISK volume.
if (ramDiskStorageLimit >= 0) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
for (FsVolumeSpi volume : volumes) {
if (volume.getStorageType() == RAM_DISK) {
((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
}
}
}
LOG.info("Cluster startup complete");
}
protected final void startUpCluster(boolean hasTransientStorage,
final int ramDiskReplicaCapacity)
throws IOException {
startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false, false);
}
protected final void triggerBlockReport()
throws IOException, InterruptedException {
// Trigger block report to NN
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
Thread.sleep(10 * 1000);
}
protected final boolean verifyBlockDeletedFromDir(File dir,
LocatedBlocks locatedBlocks) {
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
File targetDir =
DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
" exists after deletion.");
return false;
}
File metaFile = new File(targetDir,
DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
lb.getBlock().getGenerationStamp()));
if (metaFile.exists()) {
LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
" exists after deletion.");
return false;
}
}
return true;
}
protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
throws IOException, InterruptedException {
LOG.info("Verifying replica has no saved copy after deletion.");
triggerBlockReport();
while(
DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
> 0L){
Thread.sleep(1000);
}
final String bpid = cluster.getNamesystem().getBlockPoolId();
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
// Make sure deleted replica does not have a copy on either finalized dir of
// transient volume or finalized dir of non-transient volume
for (FsVolumeSpi v : volumes) {
FsVolumeImpl volume = (FsVolumeImpl) v;
File targetDir = (v.isTransientStorage()) ?
volume.getBlockPoolSlice(bpid).getFinalizedDir() :
volume.getBlockPoolSlice(bpid).getLazypersistDir();
if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
return false;
}
}
return true;
}
protected final void verifyRamDiskJMXMetric(String metricName,
long expectedValue) throws Exception {
assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
}
protected final boolean verifyReadRandomFile(
Path path, int fileLength, int seed) throws IOException {
byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(seed, fileLength);
return Arrays.equals(contents, expected);
}
private JMXGet initJMX() throws Exception {
JMXGet jmx = new JMXGet();
jmx.setService(JMX_SERVICE_NAME);
jmx.init();
return jmx;
}
private void printRamDiskJMXMetrics() {
try {
if (jmx != null) {
jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -17,103 +17,45 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.JMXGet;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.io.*;
import java.util.*;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestLazyPersistFiles {
public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
static {
((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
}
public class TestLazyPersistFiles extends LazyPersistTestCase {
private static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
private static final int THREADPOOL_SIZE = 10;
private static final short REPL_FACTOR = 1;
private static final int BLOCK_SIZE = 5 * 1024 * 1024;
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
private static final int LAZY_WRITER_INTERVAL_SEC = 1;
private static final int BUFFER_LENGTH = 4096;
private static final int EVICTION_LOW_WATERMARK = 1;
private static final String JMX_SERVICE_NAME = "DataNode";
private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private DFSClient client;
private Configuration conf;
private JMXGet jmx;
@After
public void shutDownCluster() throws Exception {
// Dump all RamDisk JMX metrics before shutdown the cluster
printRamDiskJMXMetrics();
if (fs != null) {
fs.close();
fs = null;
client = null;
}
if (cluster != null) {
cluster.shutdownDataNodes();
cluster.shutdown();
cluster = null;
}
if (jmx != null) {
jmx = null;
}
}
@Test (timeout=300000)
@Test
public void testPolicyNotSetByDefault() throws IOException {
startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -126,7 +68,7 @@ public class TestLazyPersistFiles {
assertThat(status.getStoragePolicy(), not(LAZY_PERSIST_POLICY_ID));
}
@Test (timeout=300000)
@Test
public void testPolicyPropagation() throws IOException {
startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -138,7 +80,7 @@ public class TestLazyPersistFiles {
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
}
@Test (timeout=300000)
@Test
public void testPolicyPersistenceInEditLog() throws IOException {
startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -152,7 +94,7 @@ public class TestLazyPersistFiles {
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
}
@Test (timeout=300000)
@Test
public void testPolicyPersistenceInFsImage() throws IOException {
startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -170,7 +112,7 @@ public class TestLazyPersistFiles {
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
}
@Test (timeout=300000)
@Test
public void testPlacementOnRamDisk() throws IOException {
startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -180,7 +122,7 @@ public class TestLazyPersistFiles {
ensureFileReplicasOnStorageType(path, RAM_DISK);
}
@Test (timeout=300000)
@Test
public void testPlacementOnSizeLimitedRamDisk() throws IOException {
startUpCluster(true, 3);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -199,7 +141,7 @@ public class TestLazyPersistFiles {
* Write should default to disk. No error.
* @throws IOException
*/
@Test (timeout=300000)
@Test
public void testFallbackToDisk() throws IOException {
startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -213,7 +155,7 @@ public class TestLazyPersistFiles {
* File can not fit in RamDisk even with eviction
* @throws IOException
*/
@Test (timeout=300000)
@Test
public void testFallbackToDiskFull() throws Exception {
startUpCluster(false, 0);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -231,7 +173,7 @@ public class TestLazyPersistFiles {
* Expect 2 or less blocks are on RamDisk and 3 or more on disk.
* @throws IOException
*/
@Test (timeout=300000)
@Test
public void testFallbackToDiskPartial()
throws IOException, InterruptedException {
startUpCluster(true, 2);
@ -271,7 +213,7 @@ public class TestLazyPersistFiles {
*
* @throws IOException
*/
@Test (timeout=300000)
@Test
public void testRamDiskNotChosenByDefault() throws IOException {
startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -289,7 +231,7 @@ public class TestLazyPersistFiles {
* Append to lazy persist file is denied.
* @throws IOException
*/
@Test (timeout=300000)
@Test
public void testAppendIsDenied() throws IOException {
startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -310,7 +252,7 @@ public class TestLazyPersistFiles {
* must be discarded by the NN, instead of being kept around as a
* 'corrupt' file.
*/
@Test (timeout=300000)
@Test
public void testLazyPersistFilesAreDiscarded()
throws IOException, InterruptedException {
startUpCluster(true, 2);
@ -344,7 +286,7 @@ public class TestLazyPersistFiles {
is(0L));
}
@Test (timeout=300000)
@Test
public void testLazyPersistBlocksAreSaved()
throws IOException, InterruptedException {
startUpCluster(true, -1);
@ -399,7 +341,7 @@ public class TestLazyPersistFiles {
* RamDisk eviction after lazy persist to disk.
* @throws Exception
*/
@Test (timeout=300000)
@Test
public void testRamDiskEviction() throws Exception {
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -434,7 +376,7 @@ public class TestLazyPersistFiles {
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
@Test
public void testRamDiskEvictionBeforePersist()
throws IOException, InterruptedException {
startUpCluster(true, 1);
@ -459,7 +401,7 @@ public class TestLazyPersistFiles {
assert(fs.exists(path1));
assert(fs.exists(path2));
verifyReadRandomFile(path1, BLOCK_SIZE, SEED);
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
}
/**
@ -467,7 +409,7 @@ public class TestLazyPersistFiles {
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
@Test
public void testRamDiskEvictionIsLru()
throws Exception {
final int NUM_PATHS = 5;
@ -529,7 +471,7 @@ public class TestLazyPersistFiles {
* Memory is freed up and file is gone.
* @throws IOException
*/
@Test // (timeout=300000)
@Test
public void testDeleteBeforePersist()
throws Exception {
startUpCluster(true, -1);
@ -556,7 +498,7 @@ public class TestLazyPersistFiles {
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
@Test
public void testDeleteAfterPersist()
throws Exception {
startUpCluster(true, -1);
@ -584,7 +526,7 @@ public class TestLazyPersistFiles {
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
@Test
public void testDfsUsageCreateDelete()
throws IOException, InterruptedException {
startUpCluster(true, 4);
@ -615,7 +557,7 @@ public class TestLazyPersistFiles {
/**
* Concurrent read from the same node and verify the contents.
*/
@Test (timeout=300000)
@Test
public void testConcurrentRead()
throws Exception {
startUpCluster(true, 2);
@ -666,7 +608,7 @@ public class TestLazyPersistFiles {
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000)
@Test
public void testConcurrentWrites()
throws IOException, InterruptedException {
startUpCluster(true, 9);
@ -702,7 +644,7 @@ public class TestLazyPersistFiles {
assertThat(testFailed.get(), is(false));
}
@Test (timeout=300000)
@Test
public void testDnRestartWithSavedReplicas()
throws IOException, InterruptedException {
@ -726,7 +668,7 @@ public class TestLazyPersistFiles {
ensureFileReplicasOnStorageType(path1, DEFAULT);
}
@Test (timeout=300000)
@Test
public void testDnRestartWithUnsavedReplicas()
throws IOException, InterruptedException {
@ -746,183 +688,6 @@ public class TestLazyPersistFiles {
ensureFileReplicasOnStorageType(path1, RAM_DISK);
}
// ---- Utility functions for all test cases -------------------------------
/**
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
* capped. If ramDiskStorageLimit < 0 then it is ignored.
*/
private void startUpCluster(boolean hasTransientStorage,
final int ramDiskReplicaCapacity,
final boolean useSCR)
throws IOException {
conf = new Configuration();
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC);
conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
EVICTION_LOW_WATERMARK * BLOCK_SIZE);
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
long[] capacities = null;
if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
// Convert replica count to byte count, add some delta for .meta and VERSION files.
long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1);
capacities = new long[] { ramDiskStorageLimit, -1 };
}
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(REPL_FACTOR)
.storageCapacities(capacities)
.storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
try {
jmx = initJMX();
} catch (Exception e) {
fail("Failed initialize JMX for testing: " + e);
}
LOG.info("Cluster startup complete");
}
private void startUpCluster(boolean hasTransientStorage,
final int ramDiskReplicaCapacity)
throws IOException {
startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false);
}
private void makeTestFile(Path path, long length, final boolean isLazyPersist)
throws IOException {
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
if (isLazyPersist) {
createFlags.add(LAZY_PERSIST);
}
FSDataOutputStream fos = null;
try {
fos =
fs.create(path,
FsPermission.getFileDefault(),
createFlags,
BUFFER_LENGTH,
REPL_FACTOR,
BLOCK_SIZE,
null);
// Allocate a block.
byte[] buffer = new byte[BUFFER_LENGTH];
for (int bytesWritten = 0; bytesWritten < length; ) {
fos.write(buffer, 0, buffer.length);
bytesWritten += buffer.length;
}
if (length > 0) {
fos.hsync();
}
} finally {
IOUtils.closeQuietly(fos);
}
}
private LocatedBlocks ensureFileReplicasOnStorageType(
Path path, StorageType storageType) throws IOException {
// Ensure that returned block locations returned are correct!
LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
assertThat(fs.exists(path), is(true));
long fileLength = client.getFileInfo(path.toString()).getLen();
LocatedBlocks locatedBlocks =
client.getLocatedBlocks(path.toString(), 0, fileLength);
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
}
return locatedBlocks;
}
private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
long seed) throws IOException {
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
BLOCK_SIZE, REPL_FACTOR, seed, true);
}
private boolean verifyReadRandomFile(
Path path, int fileLength, int seed) throws IOException {
byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(seed, fileLength);
return Arrays.equals(contents, expected);
}
private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
throws IOException, InterruptedException {
LOG.info("Verifying replica has no saved copy after deletion.");
triggerBlockReport();
while(
DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
> 0L){
Thread.sleep(1000);
}
final String bpid = cluster.getNamesystem().getBlockPoolId();
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
// Make sure deleted replica does not have a copy on either finalized dir of
// transient volume or finalized dir of non-transient volume
for (FsVolumeSpi v : volumes) {
FsVolumeImpl volume = (FsVolumeImpl) v;
File targetDir = (v.isTransientStorage()) ?
volume.getBlockPoolSlice(bpid).getFinalizedDir() :
volume.getBlockPoolSlice(bpid).getLazypersistDir();
if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
return false;
}
}
return true;
}
private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) {
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
File targetDir =
DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
" exists after deletion.");
return false;
}
File metaFile = new File(targetDir,
DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
lb.getBlock().getGenerationStamp()));
if (metaFile.exists()) {
LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
" exists after deletion.");
return false;
}
}
return true;
}
private void triggerBlockReport()
throws IOException, InterruptedException {
// Trigger block report to NN
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
Thread.sleep(10 * 1000);
}
class WriterRunnable implements Runnable {
private final int id;
private final Path paths[];
@ -960,27 +725,4 @@ public class TestLazyPersistFiles {
}
}
}
JMXGet initJMX() throws Exception
{
JMXGet jmx = new JMXGet();
jmx.setService(JMX_SERVICE_NAME);
jmx.init();
return jmx;
}
void printRamDiskJMXMetrics() {
try {
if (jmx != null) {
jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
}
} catch (Exception e) {
e.printStackTrace();
}
}
void verifyRamDiskJMXMetric(String metricName, long expectedValue)
throws Exception {
assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
}
}

View File

@ -15,84 +15,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.log4j.Level;
import org.junit.*;
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.NativeCodeLoader;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.UUID;
import java.io.File;
import java.io.IOException;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class TestScrLazyPersistFiles {
public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
static {
((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
}
private static short REPL_FACTOR = 1;
private static final int BLOCK_SIZE = 10485760; // 10 MB
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
private static final int LAZY_WRITER_INTERVAL_SEC = 1;
private static final int BUFFER_LENGTH = 4096;
private static TemporarySocketDirectory sockDir;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
private DFSClient client;
private Configuration conf;
public class TestScrLazyPersistFiles extends LazyPersistTestCase {
@BeforeClass
public static void init() {
sockDir = new TemporarySocketDirectory();
DomainSocket.disableBindPathValidation();
}
@AfterClass
public static void shutdown() throws IOException {
sockDir.close();
}
@Before
public void before() {
Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS,
@ -100,26 +60,14 @@ public class TestScrLazyPersistFiles {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
}
@After
public void shutDownCluster() throws IOException {
if (fs != null) {
fs.close();
fs = null;
client = null;
}
if (cluster != null) {
cluster.shutdownDataNodes();
cluster.shutdown();
cluster = null;
}
}
@Rule
public ExpectedException exception = ExpectedException.none();
/**
* Read in-memory block with Short Circuit Read
* Note: the test uses faked RAM_DISK from physical disk.
*/
@Test (timeout=300000)
@Test
public void testRamDiskShortCircuitRead()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR,
@ -160,7 +108,7 @@ public class TestScrLazyPersistFiles {
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=300000000)
@Test
public void testRamDiskEvictionWithShortCircuitReadHandle()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
@ -204,123 +152,149 @@ public class TestScrLazyPersistFiles {
ensureFileReplicasOnStorageType(path1, DEFAULT);
}
// ---- Utility functions for all test cases -------------------------------
/**
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
* capped. If ramDiskStorageLimit < 0 then it is ignored.
*/
private void startUpCluster(final int numDataNodes,
final StorageType[] storageTypes,
final long ramDiskStorageLimit,
final boolean useSCR)
throws IOException {
conf = new Configuration();
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC);
if (useSCR)
{
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
UUID.randomUUID().toString());
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(),
"TestShortCircuitLocalReadHandle._PORT.sock").getAbsolutePath());
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
UserGroupInformation.getCurrentUser().getShortUserName());
}
REPL_FACTOR = 1; //Reset in case a test has modified the value
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(numDataNodes)
.storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT })
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
// Artificially cap the storage capacity of the RAM_DISK volume.
if (ramDiskStorageLimit >= 0) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
for (FsVolumeSpi volume : volumes) {
if (volume.getStorageType() == RAM_DISK) {
((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
}
}
}
LOG.info("Cluster startup complete");
@Test
public void testShortCircuitReadAfterEviction()
throws IOException, InterruptedException {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
doShortCircuitReadAfterEvictionTest();
}
private void makeTestFile(Path path, long length, final boolean isLazyPersist)
throws IOException {
@Test
public void testLegacyShortCircuitReadAfterEviction()
throws IOException, InterruptedException {
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
doShortCircuitReadAfterEvictionTest();
}
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
private void doShortCircuitReadAfterEvictionTest() throws IOException,
InterruptedException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
if (isLazyPersist) {
createFlags.add(LAZY_PERSIST);
}
final int SEED = 0xFADED;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
FSDataOutputStream fos = null;
try {
fos =
fs.create(path,
FsPermission.getFileDefault(),
createFlags,
BUFFER_LENGTH,
REPL_FACTOR,
BLOCK_SIZE,
null);
// Verify short-circuit read from RAM_DISK.
ensureFileReplicasOnStorageType(path1, RAM_DISK);
File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
// Allocate a block.
byte[] buffer = new byte[BUFFER_LENGTH];
for (int bytesWritten = 0; bytesWritten < length; ) {
fos.write(buffer, 0, buffer.length);
bytesWritten += buffer.length;
}
if (length > 0) {
fos.hsync();
}
} finally {
IOUtils.closeQuietly(fos);
// Sleep for a short time to allow the lazy writer thread to do its job.
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
// Verify short-circuit read from RAM_DISK once again.
ensureFileReplicasOnStorageType(path1, RAM_DISK);
metaFile = MiniDFSCluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
// Create another file with a replica on RAM_DISK, which evicts the first.
makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport();
// Verify short-circuit read still works from DEFAULT storage. This time,
// we'll have a checksum written during lazy persistence.
ensureFileReplicasOnStorageType(path1, DEFAULT);
metaFile = MiniDFSCluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
// In the implementation of legacy short-circuit reads, any failure is
// trapped silently, reverts back to a remote read, and also disables all
// subsequent legacy short-circuit reads in the ClientContext. If the test
// uses legacy, then assert that it didn't get disabled.
ClientContext clientContext = client.getClientContext();
if (clientContext.getUseLegacyBlockReaderLocal()) {
Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
}
}
private LocatedBlocks ensureFileReplicasOnStorageType(
Path path, StorageType storageType) throws IOException {
// Ensure that returned block locations returned are correct!
LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
assertThat(fs.exists(path), is(true));
long fileLength = client.getFileInfo(path.toString()).getLen();
LocatedBlocks locatedBlocks =
client.getLocatedBlocks(path.toString(), 0, fileLength);
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
}
return locatedBlocks;
@Test
public void testShortCircuitReadBlockFileCorruption() throws IOException,
InterruptedException {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
doShortCircuitReadBlockFileCorruptionTest();
}
private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
long seed) throws IOException {
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
BLOCK_SIZE, REPL_FACTOR, seed, true);
@Test
public void testLegacyShortCircuitReadBlockFileCorruption() throws IOException,
InterruptedException {
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
doShortCircuitReadBlockFileCorruptionTest();
}
private void triggerBlockReport()
throws IOException, InterruptedException {
// Trigger block report to NN
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
Thread.sleep(10 * 1000);
public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
InterruptedException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
final int SEED = 0xFADED;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Create another file with a replica on RAM_DISK, which evicts the first.
makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
// Sleep for a short time to allow the lazy writer thread to do its job.
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport();
// Corrupt the lazy-persisted block file, and verify that checksum
// verification catches it.
ensureFileReplicasOnStorageType(path1, DEFAULT);
MiniDFSCluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, path1));
exception.expect(ChecksumException.class);
DFSTestUtil.readFileBuffer(fs, path1);
}
@Test
public void testShortCircuitReadMetaFileCorruption() throws IOException,
InterruptedException {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
doShortCircuitReadMetaFileCorruptionTest();
}
@Test
public void testLegacyShortCircuitReadMetaFileCorruption() throws IOException,
InterruptedException {
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
doShortCircuitReadMetaFileCorruptionTest();
}
public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
InterruptedException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
final int SEED = 0xFADED;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Create another file with a replica on RAM_DISK, which evicts the first.
makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
// Sleep for a short time to allow the lazy writer thread to do its job.
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport();
// Corrupt the lazy-persisted checksum file, and verify that checksum
// verification catches it.
ensureFileReplicasOnStorageType(path1, DEFAULT);
File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
MiniDFSCluster.corruptBlock(metaFile);
exception.expect(ChecksumException.class);
DFSTestUtil.readFileBuffer(fs, path1);
}
}