diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
index 19cbb6f9354..934421a1884 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
@@ -51,7 +51,7 @@ abstract public class FSOutputSummer extends OutputStream {
protected FSOutputSummer(DataChecksum sum) {
this.sum = sum;
this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
- this.checksum = new byte[sum.getChecksumSize() * BUFFER_NUM_CHUNKS];
+ this.checksum = new byte[getChecksumSize() * BUFFER_NUM_CHUNKS];
this.count = 0;
}
@@ -188,7 +188,12 @@ abstract public class FSOutputSummer extends OutputStream {
protected synchronized int getBufferedDataSize() {
return count;
}
-
+
+ /** @return the size for a checksum. */
+ protected int getChecksumSize() {
+ return sum.getChecksumSize();
+ }
+
/** Generate checksums for the given data chunks and output chunks & checksums
* to the underlying output stream.
*/
@@ -197,9 +202,8 @@ abstract public class FSOutputSummer extends OutputStream {
sum.calculateChunkedSums(b, off, len, checksum, 0);
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
- int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize();
- writeChunk(b, off + i, chunkLen, checksum, ckOffset,
- sum.getChecksumSize());
+ int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
+ writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
}
}
@@ -226,8 +230,7 @@ abstract public class FSOutputSummer extends OutputStream {
*/
protected synchronized void setChecksumBufSize(int size) {
this.buf = new byte[size];
- this.checksum = new byte[((size - 1) / sum.getBytesPerChecksum() + 1) *
- sum.getChecksumSize()];
+ this.checksum = new byte[sum.getChecksumSize(size)];
this.count = 0;
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
index e070943bb2f..da75d1c058c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
@@ -234,15 +234,14 @@ public final class Options {
* This is used in FileSystem and FileContext to specify checksum options.
*/
public static class ChecksumOpt {
- private final int crcBlockSize;
- private final DataChecksum.Type crcType;
+ private final DataChecksum.Type checksumType;
+ private final int bytesPerChecksum;
/**
* Create an uninitialized one
*/
public ChecksumOpt() {
- crcBlockSize = -1;
- crcType = DataChecksum.Type.DEFAULT;
+ this(DataChecksum.Type.DEFAULT, -1);
}
/**
@@ -251,16 +250,21 @@ public final class Options {
* @param size bytes per checksum
*/
public ChecksumOpt(DataChecksum.Type type, int size) {
- crcBlockSize = size;
- crcType = type;
+ checksumType = type;
+ bytesPerChecksum = size;
}
public int getBytesPerChecksum() {
- return crcBlockSize;
+ return bytesPerChecksum;
}
public DataChecksum.Type getChecksumType() {
- return crcType;
+ return checksumType;
+ }
+
+ @Override
+ public String toString() {
+ return checksumType + ":" + bytesPerChecksum;
}
/**
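
For illustration only (editorial sketch, not part of the patch): the renamed fields keep the existing accessors, and the new toString() makes checksum options readable in logs and error messages, as used by the DFSClient change further below. The example class name and the CRC32C/512 values are arbitrary.

    import org.apache.hadoop.fs.Options.ChecksumOpt;
    import org.apache.hadoop.util.DataChecksum;

    public class ChecksumOptExample {              // hypothetical example class
      public static void main(String[] args) {
        ChecksumOpt opt = new ChecksumOpt(DataChecksum.Type.CRC32C, 512);
        System.out.println(opt.getChecksumType());     // CRC32C
        System.out.println(opt.getBytesPerChecksum()); // 512
        // toString() is checksumType + ":" + bytesPerChecksum.
        System.out.println(opt);                       // CRC32C:512
      }
    }
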
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 24009586a31..f0aca3aa53f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -869,7 +869,8 @@ public class NativeIO {
* @throws IOException
*/
public static void copyFileUnbuffered(File src, File dst) throws IOException {
- if ((nativeLoaded) && (Shell.WINDOWS || Shell.LINUX)) {
+ if ((nativeLoaded) &&
+ (Shell.WINDOWS || (Shell.isLinuxSendfileAvailable))) {
copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
} else {
FileUtils.copyFile(src, dst);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
index 9f0ee35711c..a38ec325fec 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/DataChecksum.java
@@ -37,9 +37,6 @@ import org.apache.hadoop.fs.ChecksumException;
@InterfaceStability.Evolving
public class DataChecksum implements Checksum {
- // Misc constants
- public static final int HEADER_LEN = 5; /// 1 byte type and 4 byte len
-
// checksum types
public static final int CHECKSUM_NULL = 0;
public static final int CHECKSUM_CRC32 = 1;
@@ -103,7 +100,7 @@ public class DataChecksum implements Checksum {
* @return DataChecksum of the type in the array or null in case of an error.
*/
public static DataChecksum newDataChecksum( byte bytes[], int offset ) {
- if ( offset < 0 || bytes.length < offset + HEADER_LEN ) {
+ if (offset < 0 || bytes.length < offset + getChecksumHeaderSize()) {
return null;
}
@@ -116,8 +113,8 @@ public class DataChecksum implements Checksum {
}
/**
- * This constructucts a DataChecksum by reading HEADER_LEN bytes from
- * input stream in
+ * This constructs a DataChecksum by reading HEADER_LEN bytes from input
+ * stream in
*/
public static DataChecksum newDataChecksum( DataInputStream in )
throws IOException {
@@ -141,7 +138,7 @@ public class DataChecksum implements Checksum {
}
public byte[] getHeader() {
- byte[] header = new byte[DataChecksum.HEADER_LEN];
+ byte[] header = new byte[getChecksumHeaderSize()];
header[0] = (byte) (type.id & 0xff);
// Writing in buffer just like DataOutput.WriteInt()
header[1+0] = (byte) ((bytesPerChecksum >>> 24) & 0xff);
@@ -229,13 +226,18 @@ public class DataChecksum implements Checksum {
bytesPerChecksum = chunkSize;
}
- // Accessors
+ /** @return the checksum algorithm type. */
public Type getChecksumType() {
return type;
}
+ /** @return the size for a checksum. */
public int getChecksumSize() {
return type.size;
}
+ /** @return the required checksum size given the data length. */
+ public int getChecksumSize(int dataSize) {
+ return ((dataSize - 1)/getBytesPerChecksum() + 1) * getChecksumSize();
+ }
public int getBytesPerChecksum() {
return bytesPerChecksum;
}
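
As a worked example (editorial, not part of the patch), the new getChecksumSize(int dataSize) returns the same value that callers such as FSOutputSummer.setChecksumBufSize and BlockReceiver previously computed inline. With CRC32C (4-byte checksums) and bytesPerChecksum = 512, a 1000-byte buffer spans two chunks:

    DataChecksum sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    // ((1000 - 1) / 512 + 1) * 4 = 2 chunks * 4 bytes = 8 bytes of checksum
    int checksumBytes = sum.getChecksumSize(1000);   // == 8
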
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
index 3aac27b06a6..9b2a8243c7c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Shell.java
@@ -377,6 +377,117 @@ abstract public class Shell {
return winUtilsPath;
}
+ public static class LinuxKernelVersion implements Comparable<LinuxKernelVersion> {
+ private final short major;
+ private final short minor;
+ private final short revision;
+
+ public LinuxKernelVersion(short major, short minor, short revision) {
+ this.major = major;
+ this.minor = minor;
+ this.revision = revision;
+ }
+
+ /**
+ * Parse a Linux kernel version string as reported by the POSIX command
+ * 'uname -r', e.g. '2.6.18-308.24.1.el5' on CentOS 5.8 or
+ * '3.13.0-32-generic' on Ubuntu 14.
+ * @param version version string from 'uname -r'
+ * @return the parsed LinuxKernelVersion
+ * @throws IllegalArgumentException if the version string cannot be parsed
+ */
+ public static LinuxKernelVersion parseLinuxKernelVersion(String version)
+ throws IllegalArgumentException {
+ if (version == null) {
+ throw new IllegalArgumentException();
+ }
+ String parts[] = version.split("-")[0].split("\\.");
+ if (parts.length != 3) {
+ throw new IllegalArgumentException(version);
+ }
+ short major = Short.parseShort(parts[0]);
+ short minor = Short.parseShort(parts[1]);
+ short revision = Short.parseShort(parts[2]);
+ return new LinuxKernelVersion(major, minor, revision);
+ }
+
+ @Override
+ public int compareTo(LinuxKernelVersion o) {
+ if (this.major == o.major) {
+ if (this.minor == o.minor) {
+ return this.revision - o.revision;
+ } else {
+ return this.minor - o.minor;
+ }
+ } else {
+ return this.major - o.major;
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof LinuxKernelVersion)) {
+ return false;
+ }
+ return compareTo((LinuxKernelVersion) other) == 0;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%d.%d.%d", major, minor, revision);
+ }
+
+ @Override
+ public int hashCode(){
+ int hash = 41;
+ hash = (19 * hash) + major;
+ hash = (53 * hash) + minor;
+ hash = (29 * hash) + revision;
+ return hash;
+ }
+ }
+
+ /*
+ * sendfile() API between two file descriptors
+ * is only supported on Linux Kernel version 2.6.33+
+ * according to http://man7.org/linux/man-pages/man2/sendfile.2.html
+ */
+ private static final LinuxKernelVersion minLkvSupportSendfile =
+ new LinuxKernelVersion((short)2, (short)6, (short)33);
+ public static final boolean isLinuxSendfileAvailable = isLinuxSendfileSupported();
+
+ private static boolean isLinuxSendfileSupported() {
+ if (!Shell.LINUX) {
+ return false;
+ }
+ ShellCommandExecutor shexec = null;
+ boolean sendfileSupported = false;
+ try {
+ String[] args = {"uname", "bash", "-r"};
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ String version = shexec.getOutput().trim();
+ LinuxKernelVersion lkv =
+ LinuxKernelVersion.parseLinuxKernelVersion(version);
+ if (lkv.compareTo(minLkvSupportSendfile) >= 0) {
+ sendfileSupported = true;
+ }
+ } catch (Exception e) {
+ LOG.warn("isLinuxSendfileSupported() failed unexpected: " + e);
+ } finally {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("uname exited with exit code "
+ + (shexec != null ? shexec.getExitCode() : "(null executor)"));
+ }
+ }
+ return sendfileSupported;
+ }
+
public static final boolean isSetsidAvailable = isSetsidSupported();
private static boolean isSetsidSupported() {
if (Shell.WINDOWS) {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
index d9dc9ef5fe6..19589f8195c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestShell.java
@@ -165,4 +165,24 @@ public class TestShell extends TestCase {
assertEquals(2, command.getRunCount());
}
}
+
+ public void testLinuxKernelVersion() throws IOException {
+ Shell.LinuxKernelVersion v2_6_18 =
+ new Shell.LinuxKernelVersion((short)2, (short)6, (short)18);
+ Shell.LinuxKernelVersion v2_6_32 =
+ new Shell.LinuxKernelVersion((short)2, (short)6, (short)32);
+ assertTrue(v2_6_18.compareTo(v2_6_32) < 0);
+ }
+
+ public void testParseLinuxKernelVersion() throws Exception {
+ String centOs58Ver = "2.6.18-308.24.1.el5";
+ String ubuntu14Ver = "3.13.0-32-generic";
+ Shell.LinuxKernelVersion lkvCentOs58 =
+ Shell.LinuxKernelVersion.parseLinuxKernelVersion(centOs58Ver);
+ Shell.LinuxKernelVersion lkvUbuntu14 =
+ Shell.LinuxKernelVersion.parseLinuxKernelVersion(ubuntu14Ver);
+ assertTrue(lkvUbuntu14.compareTo(lkvCentOs58) > 0);
+ assertFalse(lkvUbuntu14.equals(lkvCentOs58));
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 38695e01286..0eb2fe55eb1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -686,6 +686,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7090. Use unbuffered writes when persisting in-memory replicas.
(Xiaoyu Yao via cnauroth)
+ HDFS-6934. Move checksum computation off the hot path when writing to RAM
+ disk. (cnauroth)
+
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 3fb442b94a5..13e0a522685 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -109,6 +109,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
*/
private DatanodeInfo datanode;
+ /**
+ * StorageType of replica on DataNode.
+ */
+ private StorageType storageType;
+
/**
* If false, we won't try short-circuit local reads.
*/
@@ -201,6 +206,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
return this;
}
+ public BlockReaderFactory setStorageType(StorageType storageType) {
+ this.storageType = storageType;
+ return this;
+ }
+
public BlockReaderFactory setAllowShortCircuitLocalReads(
boolean allowShortCircuitLocalReads) {
this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
@@ -353,7 +363,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
try {
return BlockReaderLocalLegacy.newBlockReader(conf,
userGroupInformation, configuration, fileName, block, token,
- datanode, startOffset, length);
+ datanode, startOffset, length, storageType);
} catch (RemoteException remoteException) {
ioe = remoteException.unwrapRemoteException(
InvalidToken.class, AccessControlException.class);
@@ -415,6 +425,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
setShortCircuitReplica(info.getReplica()).
setVerifyChecksum(verifyChecksum).
setCachingStrategy(cachingStrategy).
+ setStorageType(storageType).
build();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index cd75e53b273..a3bfde78bba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -66,6 +66,7 @@ class BlockReaderLocal implements BlockReader {
private ShortCircuitReplica replica;
private long dataPos;
private ExtendedBlock block;
+ private StorageType storageType;
public Builder(Conf conf) {
this.maxReadahead = Integer.MAX_VALUE;
@@ -106,6 +107,11 @@ class BlockReaderLocal implements BlockReader {
return this;
}
+ public Builder setStorageType(StorageType storageType) {
+ this.storageType = storageType;
+ return this;
+ }
+
public BlockReaderLocal build() {
Preconditions.checkNotNull(replica);
return new BlockReaderLocal(this);
@@ -209,6 +215,11 @@ class BlockReaderLocal implements BlockReader {
*/
private ByteBuffer checksumBuf;
+ /**
+ * StorageType of replica on DataNode.
+ */
+ private StorageType storageType;
+
private BlockReaderLocal(Builder builder) {
this.replica = builder.replica;
this.dataIn = replica.getDataStream().getChannel();
@@ -237,6 +248,7 @@ class BlockReaderLocal implements BlockReader {
this.zeroReadaheadRequested = false;
}
this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
+ this.storageType = builder.storageType;
}
private synchronized void createDataBufIfNeeded() {
@@ -327,8 +339,8 @@ class BlockReaderLocal implements BlockReader {
int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
checksumBuf.clear();
checksumBuf.limit(checksumsNeeded * checksumSize);
- long checksumPos =
- 7 + ((startDataPos / bytesPerChecksum) * checksumSize);
+ long checksumPos = BlockMetadataHeader.getHeaderSize()
+ + ((startDataPos / bytesPerChecksum) * checksumSize);
while (checksumBuf.hasRemaining()) {
int nRead = checksumIn.read(checksumBuf, checksumPos);
if (nRead < 0) {
@@ -350,7 +362,14 @@ class BlockReaderLocal implements BlockReader {
private boolean createNoChecksumContext() {
if (verifyChecksum) {
- return replica.addNoChecksumAnchor();
+ if (storageType != null && storageType.isTransient()) {
+ // Checksums are not stored for replicas on transient storage. We do not
+ // anchor, because we do not intend for client activity to block eviction
+ // from transient storage on the DataNode side.
+ return true;
+ } else {
+ return replica.addNoChecksumAnchor();
+ }
} else {
return true;
}
@@ -358,7 +377,9 @@ class BlockReaderLocal implements BlockReader {
private void releaseNoChecksumContext() {
if (verifyChecksum) {
- replica.removeNoChecksumAnchor();
+ if (storageType == null || !storageType.isTransient()) {
+ replica.removeNoChecksumAnchor();
+ }
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index 47455754d72..95c7178cc3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -177,7 +177,8 @@ class BlockReaderLocalLegacy implements BlockReader {
UserGroupInformation userGroupInformation,
Configuration configuration, String file, ExtendedBlock blk,
Token<BlockTokenIdentifier> token, DatanodeInfo node,
- long startOffset, long length) throws IOException {
+ long startOffset, long length, StorageType storageType)
+ throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
.getIpcPort());
// check the cache first
@@ -188,7 +189,7 @@ class BlockReaderLocalLegacy implements BlockReader {
}
pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
configuration, conf.socketTimeout, token,
- conf.connectToDnViaHostname);
+ conf.connectToDnViaHostname, storageType);
}
// check to see if the file exists. It may so happen that the
@@ -200,7 +201,8 @@ class BlockReaderLocalLegacy implements BlockReader {
FileInputStream dataIn = null;
FileInputStream checksumIn = null;
BlockReaderLocalLegacy localBlockReader = null;
- boolean skipChecksumCheck = conf.skipShortCircuitChecksums;
+ boolean skipChecksumCheck = conf.skipShortCircuitChecksums ||
+ storageType.isTransient();
try {
// get a local file system
File blkfile = new File(pathinfo.getBlockPath());
@@ -217,15 +219,8 @@ class BlockReaderLocalLegacy implements BlockReader {
File metafile = new File(pathinfo.getMetaPath());
checksumIn = new FileInputStream(metafile);
- // read and handle the common header here. For now just a version
- BlockMetadataHeader header = BlockMetadataHeader
- .readHeader(new DataInputStream(checksumIn));
- short version = header.getVersion();
- if (version != BlockMetadataHeader.VERSION) {
- LOG.warn("Wrong version (" + version + ") for metadata file for "
- + blk + " ignoring ...");
- }
- DataChecksum checksum = header.getChecksum();
+ final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+ new DataInputStream(checksumIn), blk);
long firstChunkOffset = startOffset
- (startOffset % checksum.getBytesPerChecksum());
localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
@@ -266,8 +261,8 @@ class BlockReaderLocalLegacy implements BlockReader {
private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
- Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
- throws IOException {
+ Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname,
+ StorageType storageType) throws IOException {
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
BlockLocalPathInfo pathinfo = null;
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
@@ -275,7 +270,15 @@ class BlockReaderLocalLegacy implements BlockReader {
try {
// make RPC to local datanode to find local pathnames of blocks
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
- if (pathinfo != null) {
+ // We cannot cache the path information for a replica on transient storage.
+ // If the replica gets evicted, then it moves to a different path. Then,
+ // our next attempt to read from the cached path would fail to find the
+ // file. Additionally, the failure would cause us to disable legacy
+ // short-circuit read for all subsequent use in the ClientContext. Unlike
+ // the newer short-circuit read implementation, we have no communication
+ // channel for the DataNode to notify the client that the path has been
+ // invalidated. Therefore, our only option is to skip caching.
+ if (pathinfo != null && !storageType.isTransient()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cached location of block " + blk + " as " + pathinfo);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 2c75b5edb53..08cc58f38c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -96,6 +96,7 @@ import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CipherSuite;
@@ -513,8 +514,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return createChecksum(null);
}
- private DataChecksum createChecksum(ChecksumOpt userOpt)
- throws IOException {
+ private DataChecksum createChecksum(ChecksumOpt userOpt) {
// Fill in any missing field with the default.
ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
defaultChecksumOpt, userOpt);
@@ -522,8 +522,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
myOpt.getChecksumType(),
myOpt.getBytesPerChecksum());
if (dataChecksum == null) {
- throw new IOException("Invalid checksum type specified: "
- + myOpt.getChecksumType().name());
+ throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
+ + userOpt + ", default=" + defaultChecksumOpt
+ + ", effective=null");
}
return dataChecksum;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index af1ba14714a..ff65ebc5847 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@@ -567,6 +568,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
DNAddrPair retval = chooseDataNode(targetBlock, null);
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
+ StorageType storageType = retval.storageType;
try {
ExtendedBlock blk = targetBlock.getBlock();
@@ -575,6 +577,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
+ setStorageType(storageType).
setFileName(src).
setBlock(blk).
setBlockToken(accessToken).
@@ -872,12 +875,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
private DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
while (true) {
- DatanodeInfo[] nodes = block.getLocations();
try {
- return getBestNodeDNAddrPair(nodes, ignoredNodes);
+ return getBestNodeDNAddrPair(block, ignoredNodes);
} catch (IOException ie) {
- String errMsg =
- getBestNodeDNAddrPairErrorString(nodes, deadNodes, ignoredNodes);
+ String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
+ deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
String description = "Could not obtain block: " + blockInfo;
@@ -886,7 +888,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
throw new BlockMissingException(src, description,
block.getStartOffset());
}
-
+
+ DatanodeInfo[] nodes = block.getLocations();
if (nodes == null || nodes.length == 0) {
DFSClient.LOG.info("No node available for " + blockInfo);
}
@@ -920,22 +923,44 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
/**
- * Get the best node.
- * @param nodes Nodes to choose from.
- * @param ignoredNodes Do not chose nodes in this array (may be null)
+ * Get the best node from which to stream the data.
+ * @param block LocatedBlock, containing nodes in priority order.
+ * @param ignoredNodes Do not choose nodes in this array (may be null)
* @return The DNAddrPair of the best node.
* @throws IOException
*/
- private DNAddrPair getBestNodeDNAddrPair(final DatanodeInfo[] nodes,
+ private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
- DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
+ DatanodeInfo[] nodes = block.getLocations();
+ StorageType[] storageTypes = block.getStorageTypes();
+ DatanodeInfo chosenNode = null;
+ StorageType storageType = null;
+ if (nodes != null) {
+ for (int i = 0; i < nodes.length; i++) {
+ if (!deadNodes.containsKey(nodes[i])
+ && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
+ chosenNode = nodes[i];
+ // Storage types are ordered to correspond with nodes, so use the same
+ // index to get storage type.
+ if (storageTypes != null && i < storageTypes.length) {
+ storageType = storageTypes[i];
+ }
+ break;
+ }
+ }
+ }
+ if (chosenNode == null) {
+ throw new IOException("No live nodes contain block " + block.getBlock() +
+ " after checking nodes = " + Arrays.toString(nodes) +
+ ", ignoredNodes = " + ignoredNodes);
+ }
final String dnAddr =
chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
}
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
- return new DNAddrPair(chosenNode, targetAddr);
+ return new DNAddrPair(chosenNode, targetAddr, storageType);
}
private static String getBestNodeDNAddrPairErrorString(
@@ -1018,6 +1043,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
DatanodeInfo chosenNode = datanode.info;
InetSocketAddress targetAddr = datanode.addr;
+ StorageType storageType = datanode.storageType;
BlockReader reader = null;
try {
@@ -1028,6 +1054,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
setInetSocketAddress(targetAddr).
setRemotePeerFactory(dfsClient).
setDatanodeInfo(chosenNode).
+ setStorageType(storageType).
setFileName(src).
setBlock(block.getBlock()).
setBlockToken(blockToken).
@@ -1151,7 +1178,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
// If no nodes to do hedged reads against, pass.
try {
try {
- chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
+ chosenNode = getBestNodeDNAddrPair(block, ignored);
} catch (IOException ioe) {
chosenNode = chooseDataNode(block, ignored);
}
@@ -1494,31 +1521,17 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
throw new IOException("Mark/reset not supported");
}
- /**
- * Pick the best node from which to stream the data.
- * Entries in nodes are already in the priority order
- */
- static DatanodeInfo bestNode(DatanodeInfo nodes[],
- AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
- Collection<DatanodeInfo> ignoredNodes) throws IOException {
- if (nodes != null) {
- for (int i = 0; i < nodes.length; i++) {
- if (!deadNodes.containsKey(nodes[i])
- && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
- return nodes[i];
- }
- }
- }
- throw new IOException("No live nodes contain current block");
- }
-
/** Utility class to encapsulate data node info and its address. */
- static class DNAddrPair {
+ private static final class DNAddrPair {
final DatanodeInfo info;
final InetSocketAddress addr;
- DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
+ final StorageType storageType;
+
+ DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
+ StorageType storageType) {
this.info = info;
this.addr = addr;
+ this.storageType = storageType;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 60178c73dee..a83c854d035 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -42,6 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CreateFlag;
@@ -89,9 +91,9 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
-
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceScope;
@@ -148,7 +150,10 @@ public class DFSOutputStream extends FSOutputSummer
private String src;
private final long fileId;
private final long blockSize;
- private final DataChecksum checksum;
+ /** Only for DataTransferProtocol.writeBlock(..) */
+ private final DataChecksum checksum4WriteBlock;
+ private final int bytesPerChecksum;
+
// both dataQueue and ackQueue are protected by dataQueue lock
private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
@@ -245,6 +250,9 @@ public class DFSOutputStream extends FSOutputSummer
}
void writeChecksum(byte[] inarray, int off, int len) {
+ if (len == 0) {
+ return;
+ }
if (checksumPos + len > dataStart) {
throw new BufferOverflowException();
}
@@ -377,19 +385,12 @@ public class DFSOutputStream extends FSOutputSummer
private final Span traceSpan;
- /**
- * Default construction for file create
- */
- private DataStreamer() {
- this(null, null);
- }
-
/**
* construction with tracing info
*/
private DataStreamer(HdfsFileStatus stat, Span span) {
isAppend = false;
- isLazyPersistFile = initLazyPersist(stat);
+ isLazyPersistFile = isLazyPersist(stat);
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
traceSpan = span;
}
@@ -409,7 +410,7 @@ public class DFSOutputStream extends FSOutputSummer
block = lastBlock.getBlock();
bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken();
- isLazyPersistFile = initLazyPersist(stat);
+ isLazyPersistFile = isLazyPersist(stat);
long usedInLastBlock = stat.getLen() % blockSize;
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
@@ -452,13 +453,6 @@ public class DFSOutputStream extends FSOutputSummer
}
}
-
- private boolean initLazyPersist(HdfsFileStatus stat) {
- final BlockStoragePolicy lpPolicy = blockStoragePolicySuite
- .getPolicy(HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
- return lpPolicy != null &&
- stat.getStoragePolicy() == lpPolicy.getId();
- }
private void setPipeline(LocatedBlock lb) {
setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
@@ -553,7 +547,7 @@ public class DFSOutputStream extends FSOutputSummer
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
- one = new Packet(checksum.getChecksumSize()); // heartbeat packet
+ one = new Packet(getChecksumSize()); // heartbeat packet
} else {
one = dataQueue.getFirst(); // regular data packet
}
@@ -1408,8 +1402,8 @@ public class DFSOutputStream extends FSOutputSummer
// send the request
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
- nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
- cachingStrategy.get(), isLazyPersistFile);
+ nodes.length, block.getNumBytes(), bytesSent, newGS,
+ checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1618,9 +1612,23 @@ public class DFSOutputStream extends FSOutputSummer
return value;
}
+ /**
+ * @return the object for computing checksum.
+ * The type is NULL if checksum is not computed.
+ */
+ private static DataChecksum getChecksum4Compute(DataChecksum checksum,
+ HdfsFileStatus stat) {
+ if (isLazyPersist(stat) && stat.getReplication() == 1) {
+ // do not compute checksum when writing a single replica to memory
+ return DataChecksum.newDataChecksum(Type.NULL,
+ checksum.getBytesPerChecksum());
+ }
+ return checksum;
+ }
+
private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
HdfsFileStatus stat, DataChecksum checksum) throws IOException {
- super(checksum);
+ super(getChecksum4Compute(checksum, stat));
this.dfsClient = dfsClient;
this.src = src;
this.fileId = stat.getFileId();
@@ -1635,15 +1643,18 @@ public class DFSOutputStream extends FSOutputSummer
"Set non-null progress callback on DFSOutputStream " + src);
}
- final int bytesPerChecksum = checksum.getBytesPerChecksum();
- if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
- throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
- ") and blockSize(" + blockSize +
- ") do not match. " + "blockSize should be a " +
- "multiple of io.bytes.per.checksum");
-
+ this.bytesPerChecksum = checksum.getBytesPerChecksum();
+ if (bytesPerChecksum <= 0) {
+ throw new HadoopIllegalArgumentException(
+ "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
}
- this.checksum = checksum;
+ if (blockSize % bytesPerChecksum != 0) {
+ throw new HadoopIllegalArgumentException("Invalid values: "
+ + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
+ + ") must divide block size (=" + blockSize + ").");
+ }
+ this.checksum4WriteBlock = checksum;
+
this.dfsclientSlowLogThresholdMs =
dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
}
@@ -1655,8 +1666,7 @@ public class DFSOutputStream extends FSOutputSummer
this(dfsClient, src, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
- computePacketChunkSize(dfsClient.getConf().writePacketSize,
- checksum.getBytesPerChecksum());
+ computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
Span traceSpan = null;
if (Trace.isTracing()) {
@@ -1734,11 +1744,9 @@ public class DFSOutputStream extends FSOutputSummer
if (lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
- streamer = new DataStreamer(lastBlock, stat,
- checksum.getBytesPerChecksum(), traceSpan);
+ streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan);
} else {
- computePacketChunkSize(dfsClient.getConf().writePacketSize,
- checksum.getBytesPerChecksum());
+ computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
streamer = new DataStreamer(stat, traceSpan);
}
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
@@ -1752,9 +1760,15 @@ public class DFSOutputStream extends FSOutputSummer
out.start();
return out;
}
+
+ private static boolean isLazyPersist(HdfsFileStatus stat) {
+ final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy(
+ HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
+ return p != null && stat.getStoragePolicy() == p.getId();
+ }
private void computePacketChunkSize(int psize, int csize) {
- int chunkSize = csize + checksum.getChecksumSize();
+ final int chunkSize = csize + getChecksumSize();
chunksPerPacket = Math.max(psize/chunkSize, 1);
packetSize = chunkSize*chunksPerPacket;
if (DFSClient.LOG.isDebugEnabled()) {
@@ -1811,21 +1825,19 @@ public class DFSOutputStream extends FSOutputSummer
dfsClient.checkOpen();
checkClosed();
- int bytesPerChecksum = this.checksum.getBytesPerChecksum();
if (len > bytesPerChecksum) {
throw new IOException("writeChunk() buffer size is " + len +
" is larger than supported bytesPerChecksum " +
bytesPerChecksum);
}
- if (cklen != this.checksum.getChecksumSize()) {
+ if (cklen != 0 && cklen != getChecksumSize()) {
throw new IOException("writeChunk() checksum size is supposed to be " +
- this.checksum.getChecksumSize() +
- " but found to be " + cklen);
+ getChecksumSize() + " but found to be " + cklen);
}
if (currentPacket == null) {
currentPacket = new Packet(packetSize, chunksPerPacket,
- bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
+ bytesCurBlock, currentSeqno++, getChecksumSize());
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
currentPacket.seqno +
@@ -1873,7 +1885,7 @@ public class DFSOutputStream extends FSOutputSummer
//
if (bytesCurBlock == blockSize) {
currentPacket = new Packet(0, 0, bytesCurBlock,
- currentSeqno++, this.checksum.getChecksumSize());
+ currentSeqno++, getChecksumSize());
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
@@ -1967,7 +1979,7 @@ public class DFSOutputStream extends FSOutputSummer
// but sync was requested.
// Send an empty packet
currentPacket = new Packet(packetSize, chunksPerPacket,
- bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
+ bytesCurBlock, currentSeqno++, getChecksumSize());
}
} else {
if (isSync && bytesCurBlock > 0) {
@@ -1976,7 +1988,7 @@ public class DFSOutputStream extends FSOutputSummer
// and sync was requested.
// So send an empty sync packet.
currentPacket = new Packet(packetSize, chunksPerPacket,
- bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
+ bytesCurBlock, currentSeqno++, getChecksumSize());
} else {
// just discard the current packet since it is already been sent.
currentPacket = null;
@@ -2180,8 +2192,7 @@ public class DFSOutputStream extends FSOutputSummer
if (bytesCurBlock != 0) {
// send an empty packet to mark the end of the block
- currentPacket = new Packet(0, 0, bytesCurBlock,
- currentSeqno++, this.checksum.getChecksumSize());
+ currentPacket = new Packet(0, 0, bytesCurBlock, currentSeqno++, getChecksumSize());
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
}
@@ -2245,8 +2256,7 @@ public class DFSOutputStream extends FSOutputSummer
@VisibleForTesting
public synchronized void setChunksPerPacket(int value) {
chunksPerPacket = Math.min(chunksPerPacket, value);
- packetSize = (checksum.getBytesPerChecksum() +
- checksum.getChecksumSize()) * chunksPerPacket;
+ packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
}
synchronized void setTestFilename(String newname) {
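
A minimal sketch (editorial, not part of the patch) of the idea behind getChecksum4Compute: when a lazy-persist file has a single replica, the summer is constructed with a NULL checksum, which produces zero-length checksums, so writeChunk() receives cklen == 0 and the per-chunk CRC work disappears from the write hot path; checksum4WriteBlock still carries the real checksum type to the pipeline. The 512-byte chunk size below is an assumption for illustration.

    DataChecksum nullSum =
        DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 512);
    // NULL checksums occupy no space, so no checksum bytes are computed or sent.
    assert nullSum.getChecksumSize() == 0;
    assert nullSum.getChecksumSize(4096) == 0;
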
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index 16bcc0b93d1..30368f6eddc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.protocol;
+import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -185,7 +186,11 @@ public class LocatedBlock {
+ "; getBlockSize()=" + getBlockSize()
+ "; corrupt=" + corrupt
+ "; offset=" + offset
- + "; locs=" + java.util.Arrays.asList(locs)
+ + "; locs=" + Arrays.asList(locs)
+ + "; storageIDs=" +
+ (storageIDs != null ? Arrays.asList(storageIDs) : null)
+ + "; storageTypes=" +
+ (storageTypes != null ? Arrays.asList(storageTypes) : null)
+ "}";
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
index b86cad45be7..51a61343ab6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
@@ -29,10 +29,13 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.DataChecksum;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
@@ -46,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockMetadataHeader {
+ private static final Log LOG = LogFactory.getLog(BlockMetadataHeader.class);
public static final short VERSION = 1;
@@ -73,6 +77,37 @@ public class BlockMetadataHeader {
return checksum;
}
+ /**
+ * Read the checksum header from the meta file.
+ * @return the data checksum obtained from the header.
+ */
+ public static DataChecksum readDataChecksum(File metaFile) throws IOException {
+ DataInputStream in = null;
+ try {
+ in = new DataInputStream(new BufferedInputStream(
+ new FileInputStream(metaFile), HdfsConstants.IO_FILE_BUFFER_SIZE));
+ return readDataChecksum(in, metaFile);
+ } finally {
+ IOUtils.closeStream(in);
+ }
+ }
+
+ /**
+ * Read the checksum header from the meta input stream.
+ * @return the data checksum obtained from the header.
+ */
+ public static DataChecksum readDataChecksum(final DataInputStream metaIn,
+ final Object name) throws IOException {
+ // read and handle the common header here. For now just a version
+ final BlockMetadataHeader header = readHeader(metaIn);
+ if (header.getVersion() != VERSION) {
+ LOG.warn("Unexpected meta-file version for " + name
+ + ": version in file is " + header.getVersion()
+ + " but expected version is " + VERSION);
+ }
+ return header.getChecksum();
+ }
+
/**
* Read the header without changing the position of the FileChannel.
*
@@ -82,7 +117,7 @@ public class BlockMetadataHeader {
*/
public static BlockMetadataHeader preadHeader(FileChannel fc)
throws IOException {
- byte arr[] = new byte[2 + DataChecksum.HEADER_LEN];
+ final byte arr[] = new byte[getHeaderSize()];
ByteBuffer buf = ByteBuffer.wrap(arr);
while (buf.hasRemaining()) {
@@ -158,7 +193,7 @@ public class BlockMetadataHeader {
* Writes all the fields till the beginning of checksum.
* @throws IOException on error
*/
- static void writeHeader(DataOutputStream out, DataChecksum checksum)
+ public static void writeHeader(DataOutputStream out, DataChecksum checksum)
throws IOException {
writeHeader(out, new BlockMetadataHeader(VERSION, checksum));
}
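
A hedged usage sketch (the meta file path below is hypothetical): the new readDataChecksum helpers replace the repeated readHeader-then-check-version pattern that this patch removes from BlockReaderLocalLegacy, BlockSender, and BlockPoolSlice.

    File metaFile = new File("/data/dn/current/blk_1001_1001.meta"); // hypothetical path
    DataChecksum checksum = BlockMetadataHeader.readDataChecksum(metaFile);
    int bytesPerChecksum = checksum.getBytesPerChecksum();
    int checksumSize = checksum.getChecksumSize();
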
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 4d1cc6c256c..75f1c3656e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -81,12 +81,12 @@ class BlockReceiver implements Closeable {
* checksum polynomial than the block is stored with on disk,
* the DataNode needs to recalculate checksums before writing.
*/
- private boolean needsChecksumTranslation;
+ private final boolean needsChecksumTranslation;
private OutputStream out = null; // to block file at local disk
private FileDescriptor outFd;
private DataOutputStream checksumOut = null; // to crc file at local disk
- private int bytesPerChecksum;
- private int checksumSize;
+ private final int bytesPerChecksum;
+ private final int checksumSize;
private final PacketReceiver packetReceiver = new PacketReceiver(false);
@@ -98,7 +98,6 @@ class BlockReceiver implements Closeable {
private DataTransferThrottler throttler;
private ReplicaOutputStreams streams;
private DatanodeInfo srcDataNode = null;
- private Checksum partialCrc = null;
private final DataNode datanode;
volatile private boolean mirrorError;
@@ -489,7 +488,7 @@ class BlockReceiver implements Closeable {
long offsetInBlock = header.getOffsetInBlock();
long seqno = header.getSeqno();
boolean lastPacketInBlock = header.isLastPacketInBlock();
- int len = header.getDataLen();
+ final int len = header.getDataLen();
boolean syncBlock = header.getSyncBlock();
// avoid double sync'ing on close
@@ -498,7 +497,7 @@ class BlockReceiver implements Closeable {
}
// update received bytes
- long firstByteInBlock = offsetInBlock;
+ final long firstByteInBlock = offsetInBlock;
offsetInBlock += len;
if (replicaInfo.getNumBytes() < offsetInBlock) {
replicaInfo.setNumBytes(offsetInBlock);
@@ -538,16 +537,15 @@ class BlockReceiver implements Closeable {
flushOrSync(true);
}
} else {
- int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
- checksumSize;
+ final int checksumLen = diskChecksum.getChecksumSize(len);
+ final int checksumReceivedLen = checksumBuf.capacity();
- if ( checksumBuf.capacity() != checksumLen) {
- throw new IOException("Length of checksums in packet " +
- checksumBuf.capacity() + " does not match calculated checksum " +
- "length " + checksumLen);
+ if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
+ throw new IOException("Invalid checksum length: received length is "
+ + checksumReceivedLen + " but expected length is " + checksumLen);
}
- if (shouldVerifyChecksum()) {
+ if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
try {
verifyChunks(dataBuf, checksumBuf);
} catch (IOException ioe) {
@@ -571,11 +569,17 @@ class BlockReceiver implements Closeable {
translateChunks(dataBuf, checksumBuf);
}
}
+
+ if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
+ // checksum is missing, need to calculate it
+ checksumBuf = ByteBuffer.allocate(checksumLen);
+ diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
+ }
// by this point, the data in the buffer uses the disk checksum
- byte[] lastChunkChecksum;
-
+ final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
+ && streams.isTransientStorage();
try {
long onDiskLen = replicaInfo.getBytesOnDisk();
if (onDiskLen < offsetInBlock) {
+ final byte[] lastCrc;
+ if (shouldNotWriteChecksum) {
+ lastCrc = null;
+ } else if (partialCrc != null) {
if (len > bytesPerChecksum) {
- throw new IOException("Got wrong length during writeBlock(" +
- block + ") from " + inAddr + " " +
- "A packet can have only one partial chunk."+
- " len = " + len +
- " bytesPerChecksum " + bytesPerChecksum);
+ throw new IOException("Unexpected packet data length for "
+ + block + " from " + inAddr + ": a partial chunk must be "
+ + " sent in an individual packet (data length = " + len
+ + " > bytesPerChecksum = " + bytesPerChecksum + ")");
}
partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
- lastChunkChecksum = Arrays.copyOfRange(
- buf, buf.length - checksumSize, buf.length
- );
+ lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
checksumOut.write(buf);
if(LOG.isDebugEnabled()) {
LOG.debug("Writing out partial crc for data len " + len);
}
partialCrc = null;
} else {
- lastChunkChecksum = Arrays.copyOfRange(
- checksumBuf.array(),
- checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize,
- checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen);
- checksumOut.write(checksumBuf.array(),
- checksumBuf.arrayOffset() + checksumBuf.position(),
- checksumLen);
+ // write checksum
+ final int offset = checksumBuf.arrayOffset() +
+ checksumBuf.position();
+ final int end = offset + checksumLen;
+ lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
+ end);
+ checksumOut.write(checksumBuf.array(), offset, checksumLen);
}
+
/// flush entire packet, sync if requested
flushOrSync(syncBlock);
- replicaInfo.setLastChecksumAndDataLen(
- offsetInBlock, lastChunkChecksum
- );
+ replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
datanode.metrics.incrBytesWritten(len);
@@ -685,6 +690,10 @@ class BlockReceiver implements Closeable {
return lastPacketInBlock?-1:len;
}
+ private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
+ return Arrays.copyOfRange(array, end - size, end);
+ }
+
private void manageWriterOsCache(long offsetInBlock) {
try {
if (outFd != null &&
@@ -920,18 +929,19 @@ class BlockReceiver implements Closeable {
* reads in the partial crc chunk and computes checksum
* of pre-existing data in partial chunk.
*/
- private void computePartialChunkCrc(long blkoff, long ckoff,
- int bytesPerChecksum) throws IOException {
+ private Checksum computePartialChunkCrc(long blkoff, long ckoff)
+ throws IOException {
// find offset of the beginning of partial chunk.
//
int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
- int checksumSize = diskChecksum.getChecksumSize();
blkoff = blkoff - sizePartialChunk;
- LOG.info("computePartialChunkCrc sizePartialChunk " +
- sizePartialChunk + " " + block +
- " block offset " + blkoff +
- " metafile offset " + ckoff);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("computePartialChunkCrc for " + block
+ + ": sizePartialChunk=" + sizePartialChunk
+ + ", block offset=" + blkoff
+ + ", metafile offset=" + ckoff);
+ }
// create an input stream from the block file
// and read in partial crc chunk into temporary buffer
@@ -950,10 +960,12 @@ class BlockReceiver implements Closeable {
}
// compute crc of partial chunk from data read in the block file.
- partialCrc = DataChecksum.newDataChecksum(
+ final Checksum partialCrc = DataChecksum.newDataChecksum(
diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
partialCrc.update(buf, 0, sizePartialChunk);
- LOG.info("Read in partial CRC chunk from disk for " + block);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Read in partial CRC chunk from disk for " + block);
+ }
// paranoia! verify that the pre-computed crc matches what we
// recalculated just now
@@ -964,6 +976,7 @@ class BlockReceiver implements Closeable {
checksum2long(crcbuf);
throw new IOException(msg);
}
+ return partialCrc;
}
private static enum PacketResponderType {
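
Standalone sketch (editorial; buffer and chunk sizes are arbitrary) of the recomputation path above: when a packet arrives with no checksums (checksumReceivedLen == 0) and the replica is not on transient storage, the receiver computes the chunked sums itself before writing them to the meta file.

    DataChecksum diskChecksum =
        DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
    ByteBuffer dataBuf = ByteBuffer.wrap(new byte[1024]);        // packet payload
    ByteBuffer checksumBuf =
        ByteBuffer.allocate(diskChecksum.getChecksumSize(1024)); // 8 bytes
    diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
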
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index febf2de0c3a..c8855d7b463 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
@@ -262,26 +263,37 @@ class BlockSender implements java.io.Closeable {
*/
DataChecksum csum = null;
if (verifyChecksum || sendChecksum) {
- final InputStream metaIn = datanode.data.getMetaDataInputStream(block);
- if (!corruptChecksumOk || metaIn != null) {
- if (metaIn == null) {
- //need checksum but meta-data not found
- throw new FileNotFoundException("Meta-data not found for " + block);
- }
+ LengthInputStream metaIn = null;
+ boolean keepMetaInOpen = false;
+ try {
+ metaIn = datanode.data.getMetaDataInputStream(block);
+ if (!corruptChecksumOk || metaIn != null) {
+ if (metaIn == null) {
+ //need checksum but meta-data not found
+ throw new FileNotFoundException("Meta-data not found for " +
+ block);
+ }
- checksumIn = new DataInputStream(
- new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
+ // The meta file will contain only the header if the NULL checksum
+ // type was used, or if the replica was written to transient storage.
+ // Checksum verification is not performed for replicas on transient
+ // storage. The header is important for determining the checksum
+ // type later when lazy persistence copies the block to non-transient
+ // storage and computes the checksum.
+ if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) {
+ checksumIn = new DataInputStream(new BufferedInputStream(
+ metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
- // read and handle the common header here. For now just a version
- BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
- short version = header.getVersion();
- if (version != BlockMetadataHeader.VERSION) {
- LOG.warn("Wrong version (" + version + ") for metadata file for "
- + block + " ignoring ...");
+ csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
+ keepMetaInOpen = true;
+ }
+ } else {
+ LOG.warn("Could not find metadata file for " + block);
+ }
+ } finally {
+ if (!keepMetaInOpen) {
+ IOUtils.closeStream(metaIn);
}
- csum = header.getChecksum();
- } else {
- LOG.warn("Could not find metadata file for " + block);
}
}
if (csum == null) {
@@ -340,7 +352,7 @@ class BlockSender implements java.io.Closeable {
endOffset = end;
// seek to the right offsets
- if (offset > 0) {
+ if (offset > 0 && checksumIn != null) {
long checksumSkip = (offset / chunkSize) * checksumSize;
// note blockInStream is seeked when created below
if (checksumSkip > 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index 45862ca7713..6a2664011d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -213,7 +213,7 @@ public class ReplicaInPipeline extends ReplicaInfo
// the checksum that should actually be used -- this
// may differ from requestedChecksum for appends.
- DataChecksum checksum;
+ final DataChecksum checksum;
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
@@ -250,7 +250,7 @@ public class ReplicaInPipeline extends ReplicaInfo
}
}
} else {
- // for create, we can use the requested checksum
+ // for create, we can use the requested checksum
checksum = requestedChecksum;
}
@@ -264,7 +264,8 @@ public class ReplicaInPipeline extends ReplicaInfo
blockOut.getChannel().position(blockDiskSize);
crcOut.getChannel().position(crcDiskSize);
}
- return new ReplicaOutputStreams(blockOut, crcOut, checksum);
+ return new ReplicaOutputStreams(blockOut, crcOut, checksum,
+ getVolume().isTransientStorage());
} catch (IOException e) {
IOUtils.closeStream(blockOut);
IOUtils.closeStream(metaRAF);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
index 95044c825df..bd1461a25fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java
@@ -32,16 +32,18 @@ public class ReplicaOutputStreams implements Closeable {
private final OutputStream dataOut;
private final OutputStream checksumOut;
private final DataChecksum checksum;
+ private final boolean isTransientStorage;
/**
* Create an object with a data output stream, a checksum output stream
* and a checksum.
*/
public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut,
- DataChecksum checksum) {
+ DataChecksum checksum, boolean isTransientStorage) {
this.dataOut = dataOut;
this.checksumOut = checksumOut;
this.checksum = checksum;
+ this.isTransientStorage = isTransientStorage;
}
/** @return the data output stream. */
@@ -59,6 +61,11 @@ public class ReplicaOutputStreams implements Closeable {
return checksum;
}
+ /** @return is writing to a transient storage? */
+ public boolean isTransientStorage() {
+ return isTransientStorage;
+ }
+
@Override
public void close() {
IOUtils.closeStream(dataOut);
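
ReplicaOutputStreams now records whether the replica lives on transient storage. One plausible consumer on the write path is sketched below, under the assumption that a receiver writes only the meta header for transient replicas and defers checksum generation until lazy persistence; the writer class is hypothetical, and getChecksumOut() is assumed to be the accessor for the checksum stream shown above.

import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.util.DataChecksum;

// Hypothetical packet writer; only ReplicaOutputStreams and DataChecksum come
// from this patch, the rest is illustrative.
final class TransientAwareWriterSketch {
  static void writePacket(ReplicaOutputStreams streams, byte[] data, int off,
      int len, byte[] crcBuf) throws IOException {
    streams.getDataOut().write(data, off, len);
    if (!streams.isTransientStorage()) {
      // Durable storage gets per-chunk checksums; a transient (RAM disk)
      // replica keeps a header-only meta file and is checksummed later,
      // when the lazy writer persists it to disk.
      DataChecksum sum = streams.getChecksum();
      sum.calculateChunkedSums(data, off, len, crcBuf, 0);
      streams.getChecksumOut().write(crcBuf, 0, sum.getChecksumSize(len));
    }
  }
}
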
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index dce2ff85482..e3d1607a92a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -599,13 +599,8 @@ class BlockPoolSlice {
HdfsConstants.IO_FILE_BUFFER_SIZE));
// read and handle the common header here. For now just a version
- BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
- short version = header.getVersion();
- if (version != BlockMetadataHeader.VERSION) {
- FsDatasetImpl.LOG.warn("Wrong version (" + version + ") for metadata file "
- + metaFile + " ignoring ...");
- }
- DataChecksum checksum = header.getChecksum();
+ final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
+ checksumIn, metaFile);
int bytesPerChecksum = checksum.getBytesPerChecksum();
int checksumSize = checksum.getChecksumSize();
long numChunks = Math.min(
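
BlockPoolSlice now takes the DataChecksum straight from the meta header and derives the number of verifiable chunks from the two file lengths. The easy-to-get-wrong part is the chunk arithmetic, spelled out here as a small standalone helper; the names are illustrative.

// Illustrative helper: how many chunks are covered by both the block file
// and its meta file.
final class ChunkMathSketch {
  static long validChunks(long blockFileLen, long metaFileLen, int headerSize,
      int bytesPerChecksum, int checksumSize) {
    // Round the data length up: a trailing partial chunk still needs a CRC.
    long dataChunks = (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum;
    // Whatever follows the header is a flat array of fixed-size checksums.
    long storedChecksums = (metaFileLen - headerSize) / checksumSize;
    // A chunk only counts if both its data and its checksum are present.
    return Math.min(dataChunks, storedChecksums);
  }
}

For instance, a 1000-byte block at 512 bytes per checksum spans 2 chunks, so full coverage needs the header plus 2 * 4 = 8 checksum bytes in the meta file.
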
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e77ea34c553..f130d058c5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -92,6 +95,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.util.MBeans;
@@ -634,7 +638,7 @@ class FsDatasetImpl implements FsDatasetSpi {
* Get the meta info of a block stored in volumeMap. To find a block,
* block pool Id, block Id and generation stamp must match.
* @param b extended block
- * @return the meta replica information; null if block was not found
+ * @return the meta replica information
* @throws ReplicaNotFoundException if no entry is in the map or
* there is a generation stamp mismatch
*/
@@ -722,23 +726,80 @@ class FsDatasetImpl implements FsDatasetSpi {
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
final File dstFile = new File(destDir, srcFile.getName());
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
- try {
- Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
- } catch (IOException e) {
- throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
- }
+ computeChecksum(srcMeta, dstMeta, srcFile);
+
try {
Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
} catch (IOException e) {
throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Copied " + srcMeta + " to " + dstMeta);
+ LOG.debug("Copied " + srcMeta + " to " + dstMeta +
+ " and calculated checksum");
LOG.debug("Copied " + srcFile + " to " + dstFile);
}
return new File[] {dstMeta, dstFile};
}
+ /**
+ * Compute and store the checksum for a block file that does not already have
+ * its checksum computed.
+ *
+ * @param srcMeta source meta file, containing only the checksum header, not a
+ * calculated checksum
+ * @param dstMeta destination meta file, into which this method will write a
+ * full computed checksum
+ * @param blockFile block file for which the checksum will be computed
+ * @throws IOException
+ */
+ private static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
+ throws IOException {
+ final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
+ final byte[] data = new byte[1 << 16];
+ final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
+
+ DataOutputStream metaOut = null;
+ InputStream dataIn = null;
+ try {
+ File parentFile = dstMeta.getParentFile();
+ if (parentFile != null) {
+ if (!parentFile.mkdirs() && !parentFile.isDirectory()) {
+ throw new IOException("Destination '" + parentFile
+ + "' directory cannot be created");
+ }
+ }
+ metaOut = new DataOutputStream(new BufferedOutputStream(
+ new FileOutputStream(dstMeta), HdfsConstants.SMALL_BUFFER_SIZE));
+ BlockMetadataHeader.writeHeader(metaOut, checksum);
+
+ dataIn = isNativeIOAvailable ?
+ NativeIO.getShareDeleteFileInputStream(blockFile) :
+ new FileInputStream(blockFile);
+
+ int offset = 0;
+ for(int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
+ if (n > 0) {
+ n += offset;
+ offset = n % checksum.getBytesPerChecksum();
+ final int length = n - offset;
+
+ if (length > 0) {
+ checksum.calculateChunkedSums(data, 0, length, crcs, 0);
+ metaOut.write(crcs, 0, checksum.getChecksumSize(length));
+
+ System.arraycopy(data, length, data, 0, offset);
+ }
+ }
+ }
+
+ // calculate and write the last crc
+ checksum.calculateChunkedSums(data, 0, offset, crcs, 0);
+ metaOut.write(crcs, 0, 4);
+ } finally {
+ IOUtils.cleanup(LOG, dataIn, metaOut);
+ }
+ }
+
static private void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException {
LOG.info("truncateBlock: blockFile=" + blockFile
@@ -1641,6 +1702,7 @@ class FsDatasetImpl implements FsDatasetSpi {
}
}
+ @Override
public boolean isCached(String bpid, long blockId) {
return cacheManager.isCached(bpid, blockId);
}
@@ -2556,8 +2618,14 @@ class FsDatasetImpl implements FsDatasetSpi {
// Before deleting the files from transient storage we must notify the
// NN that the files are on the new storage. Else a blockReport from
// the transient storage might cause the NN to think the blocks are lost.
+ // Replicas must be evicted from client short-circuit caches, because the
+ // storage will no longer be transient, and thus will require checksum
+ // validation. This also stops a client from holding file descriptors,
+ // which would prevent the OS from reclaiming the memory.
ExtendedBlock extendedBlock =
new ExtendedBlock(bpid, newReplicaInfo);
+ datanode.getShortCircuitRegistry().processBlockInvalidation(
+ ExtendedBlockId.fromExtendedBlock(extendedBlock));
datanode.notifyNamenodeReceivedBlock(
extendedBlock, null, newReplicaInfo.getStorageUuid());
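
The computeChecksum method added earlier in this file is the heart of the lazy-persist change: it streams the block in 64 KB reads, carries any incomplete chunk to the front of the buffer for the next read, and appends one CRC per chunk after the header. A condensed standalone sketch of the same carry-over loop follows, using only DataChecksum; file handling and the Hadoop header writer are omitted and the names are illustrative.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.util.DataChecksum;

// Illustrative chunked-CRC loop mirroring the shape of computeChecksum.
final class ChunkedCrcSketch {
  static byte[] checksumStream(InputStream dataIn, DataChecksum checksum)
      throws IOException {
    final byte[] data = new byte[1 << 16];
    final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
    final ByteArrayOutputStream out = new ByteArrayOutputStream();

    int offset = 0;  // bytes of an incomplete chunk carried from the last read
    for (int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
      if (n > 0) {
        n += offset;
        offset = n % checksum.getBytesPerChecksum(); // new partial remainder
        final int length = n - offset;               // whole chunks only
        if (length > 0) {
          checksum.calculateChunkedSums(data, 0, length, crcs, 0);
          out.write(crcs, 0, checksum.getChecksumSize(length));
          // Slide the partial chunk to the front for the next iteration.
          System.arraycopy(data, length, data, 0, offset);
        }
      }
    }
    // The final partial chunk, if any, still gets one checksum.
    if (offset > 0) {
      checksum.calculateChunkedSums(data, 0, offset, crcs, 0);
      out.write(crcs, 0, checksum.getChecksumSize());
    }
    return out.toByteArray();
  }
}

Unlike the patch, which writes a trailing 4-byte CRC unconditionally, the sketch only emits the last checksum when a partial chunk remains; either way a reader that clamps to the smaller of the data and checksum chunk counts stays safe.
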
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index 76acbea42b6..5fdcc2f9d87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -241,7 +241,7 @@ class RamDiskAsyncLazyPersistService {
} catch (Exception e){
FsDatasetImpl.LOG.warn(
"LazyWriter failed to async persist RamDisk block pool id: "
- + bpId + "block Id: " + blockId);
+ + bpId + "block Id: " + blockId, e);
} finally {
if (!succeeded) {
datanode.getFSDataset().onFailLazyPersist(bpId, blockId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
index a843d9abd86..c01a6cf3772 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
@@ -168,9 +168,9 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
@Override
synchronized RamDiskReplicaLru getNextCandidateForEviction() {
- Iterator it = replicasPersisted.values().iterator();
+ final Iterator it = replicasPersisted.values().iterator();
while (it.hasNext()) {
- RamDiskReplicaLru ramDiskReplicaLru = (RamDiskReplicaLru) it.next();
+ final RamDiskReplicaLru ramDiskReplicaLru = it.next();
it.remove();
Map replicaMap =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 0786bc69e58..83b476f36c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -248,7 +248,8 @@ public class SimulatedFSDataset implements FsDatasetSpi {
+ theBlock);
} else {
SimulatedOutputStream crcStream = new SimulatedOutputStream();
- return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum);
+ return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
+ volume.isTransientStorage());
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
new file mode 100644
index 00000000000..c7628495a4a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.tools.JMXGet;
+import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+public abstract class LazyPersistTestCase {
+
+ static {
+ ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
+ protected static final int BUFFER_LENGTH = 4096;
+ protected static final int EVICTION_LOW_WATERMARK = 1;
+ private static final long HEARTBEAT_INTERVAL_SEC = 1;
+ private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
+ private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+ private static final String JMX_SERVICE_NAME = "DataNode";
+ protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
+ protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
+ protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
+ protected static final short REPL_FACTOR = 1;
+
+ protected MiniDFSCluster cluster;
+ protected DistributedFileSystem fs;
+ protected DFSClient client;
+ protected JMXGet jmx;
+ protected TemporarySocketDirectory sockDir;
+
+ @After
+ public void shutDownCluster() throws Exception {
+
+ // Dump all RamDisk JMX metrics before shutting down the cluster
+ printRamDiskJMXMetrics();
+
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ client = null;
+ }
+
+ if (cluster != null) {
+ cluster.shutdownDataNodes();
+ cluster.shutdown();
+ cluster = null;
+ }
+
+ if (jmx != null) {
+ jmx = null;
+ }
+
+ IOUtils.closeQuietly(sockDir);
+ sockDir = null;
+ }
+
+ @Rule
+ public Timeout timeout = new Timeout(300000);
+
+ protected final LocatedBlocks ensureFileReplicasOnStorageType(
+ Path path, StorageType storageType) throws IOException {
+ // Ensure that the returned block locations are correct!
+ LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+ assertThat(fs.exists(path), is(true));
+ long fileLength = client.getFileInfo(path.toString()).getLen();
+ LocatedBlocks locatedBlocks =
+ client.getLocatedBlocks(path.toString(), 0, fileLength);
+ for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+ assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
+ }
+ return locatedBlocks;
+ }
+
+ protected final void makeRandomTestFile(Path path, long length,
+ boolean isLazyPersist, long seed) throws IOException {
+ DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
+ BLOCK_SIZE, REPL_FACTOR, seed, true);
+ }
+
+ protected final void makeTestFile(Path path, long length,
+ boolean isLazyPersist) throws IOException {
+
+ EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+
+ if (isLazyPersist) {
+ createFlags.add(LAZY_PERSIST);
+ }
+
+ FSDataOutputStream fos = null;
+ try {
+ fos =
+ fs.create(path,
+ FsPermission.getFileDefault(),
+ createFlags,
+ BUFFER_LENGTH,
+ REPL_FACTOR,
+ BLOCK_SIZE,
+ null);
+
+ // Allocate a block.
+ byte[] buffer = new byte[BUFFER_LENGTH];
+ for (int bytesWritten = 0; bytesWritten < length; ) {
+ fos.write(buffer, 0, buffer.length);
+ bytesWritten += buffer.length;
+ }
+ if (length > 0) {
+ fos.hsync();
+ }
+ } finally {
+ IOUtils.closeQuietly(fos);
+ }
+ }
+
+ /**
+ * If ramDiskReplicaCapacity is >= 0, then RAM_DISK capacity is artificially
+ * capped to roughly that many block replicas. If it is negative, it is ignored.
+ */
+ protected final void startUpCluster(boolean hasTransientStorage,
+ final int ramDiskReplicaCapacity,
+ final boolean useSCR,
+ final boolean useLegacyBlockReaderLocal)
+ throws IOException {
+
+ Configuration conf = new Configuration();
+ conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+ LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+ conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+ conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ HEARTBEAT_RECHECK_INTERVAL_MSEC);
+ conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+ LAZY_WRITER_INTERVAL_SEC);
+ conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
+ EVICTION_LOW_WATERMARK * BLOCK_SIZE);
+
+ if (useSCR) {
+ conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+ // Do not share a client context across tests.
+ conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
+ if (useLegacyBlockReaderLocal) {
+ conf.setBoolean(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
+ conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ } else {
+ sockDir = new TemporarySocketDirectory();
+ conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+ this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
+ }
+ }
+
+ long[] capacities = null;
+ if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
+ // Convert replica count to byte count, add some delta for .meta and
+ // VERSION files.
+ long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) +
+ (BLOCK_SIZE - 1);
+ capacities = new long[] { ramDiskStorageLimit, -1 };
+ }
+
+ cluster = new MiniDFSCluster
+ .Builder(conf)
+ .numDataNodes(REPL_FACTOR)
+ .storageCapacities(capacities)
+ .storageTypes(hasTransientStorage ?
+ new StorageType[]{ RAM_DISK, DEFAULT } : null)
+ .build();
+ fs = cluster.getFileSystem();
+ client = fs.getClient();
+ try {
+ jmx = initJMX();
+ } catch (Exception e) {
+ fail("Failed initialize JMX for testing: " + e);
+ }
+ LOG.info("Cluster startup complete");
+ }
+
+ /**
+ * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
+ * capped. If ramDiskStorageLimit < 0 then it is ignored.
+ */
+ protected final void startUpCluster(final int numDataNodes,
+ final StorageType[] storageTypes,
+ final long ramDiskStorageLimit,
+ final boolean useSCR)
+ throws IOException {
+
+ Configuration conf = new Configuration();
+ conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
+ LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+ conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
+ conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ HEARTBEAT_RECHECK_INTERVAL_MSEC);
+ conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+ LAZY_WRITER_INTERVAL_SEC);
+
+ if (useSCR)
+ {
+ conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
+ conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
+ sockDir = new TemporarySocketDirectory();
+ conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+ this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
+ conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ }
+
+ cluster = new MiniDFSCluster
+ .Builder(conf)
+ .numDataNodes(numDataNodes)
+ .storageTypes(storageTypes != null ?
+ storageTypes : new StorageType[] { DEFAULT, DEFAULT })
+ .build();
+ fs = cluster.getFileSystem();
+ client = fs.getClient();
+
+ // Artificially cap the storage capacity of the RAM_DISK volume.
+ if (ramDiskStorageLimit >= 0) {
+ List<? extends FsVolumeSpi> volumes =
+ cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+ for (FsVolumeSpi volume : volumes) {
+ if (volume.getStorageType() == RAM_DISK) {
+ ((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
+ }
+ }
+ }
+
+ LOG.info("Cluster startup complete");
+ }
+
+ protected final void startUpCluster(boolean hasTransientStorage,
+ final int ramDiskReplicaCapacity)
+ throws IOException {
+ startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false, false);
+ }
+
+ protected final void triggerBlockReport()
+ throws IOException, InterruptedException {
+ // Trigger block report to NN
+ DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+ Thread.sleep(10 * 1000);
+ }
+
+ protected final boolean verifyBlockDeletedFromDir(File dir,
+ LocatedBlocks locatedBlocks) {
+
+ for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+ File targetDir =
+ DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
+
+ File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+ if (blockFile.exists()) {
+ LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
+ " exists after deletion.");
+ return false;
+ }
+ File metaFile = new File(targetDir,
+ DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
+ lb.getBlock().getGenerationStamp()));
+ if (metaFile.exists()) {
+ LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
+ " exists after deletion.");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
+ throws IOException, InterruptedException {
+
+ LOG.info("Verifying replica has no saved copy after deletion.");
+ triggerBlockReport();
+
+ while(
+ DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
+ > 0L){
+ Thread.sleep(1000);
+ }
+
+ final String bpid = cluster.getNamesystem().getBlockPoolId();
+ List<? extends FsVolumeSpi> volumes =
+ cluster.getDataNodes().get(0).getFSDataset().getVolumes();
+
+ // Make sure the deleted replica does not have a copy left in either the
+ // finalized dir of the transient volume or the lazyPersist dir of the non-transient volume
+ for (FsVolumeSpi v : volumes) {
+ FsVolumeImpl volume = (FsVolumeImpl) v;
+ File targetDir = (v.isTransientStorage()) ?
+ volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+ volume.getBlockPoolSlice(bpid).getLazypersistDir();
+ if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected final void verifyRamDiskJMXMetric(String metricName,
+ long expectedValue) throws Exception {
+ assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
+ }
+
+ protected final boolean verifyReadRandomFile(
+ Path path, int fileLength, int seed) throws IOException {
+ byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
+ byte expected[] = DFSTestUtil.
+ calculateFileContentsFromSeed(seed, fileLength);
+ return Arrays.equals(contents, expected);
+ }
+
+ private JMXGet initJMX() throws Exception {
+ JMXGet jmx = new JMXGet();
+ jmx.setService(JMX_SERVICE_NAME);
+ jmx.init();
+ return jmx;
+ }
+
+ private void printRamDiskJMXMetrics() {
+ try {
+ if (jmx != null) {
+ jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
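
To show how the new base class is meant to be consumed (the two concrete suites below in this patch do exactly this), here is a minimal hypothetical subclass; the class name, test name, and path are illustrative.

package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;

import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;

// Hypothetical subclass illustrating the intended use of LazyPersistTestCase.
public class TestLazyPersistExample extends LazyPersistTestCase {
  @Test
  public void testSingleBlockLandsOnRamDisk() throws IOException {
    // One DN with a RAM_DISK volume, capacity not artificially capped, no SCR.
    startUpCluster(true, -1);
    final String methodName = GenericTestUtils.getMethodName();
    Path path = new Path("/" + methodName + ".dat");

    // Write one block with LAZY_PERSIST and confirm where the replica landed.
    makeTestFile(path, BLOCK_SIZE, true);
    ensureFileReplicasOnStorageType(path, RAM_DISK);
  }
}

The shared @After hook in the base class dumps the RamDisk JMX metrics and tears the cluster down, so subclasses only set up state and assert.
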
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 444afed2a5d..771609cf24d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -17,103 +17,45 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.*;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.tools.JMXGet;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.log4j.Level;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
-import static org.apache.hadoop.fs.CreateFlag.CREATE;
-import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class TestLazyPersistFiles {
- public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
-
- static {
- ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
- ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
- ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
- }
-
+public class TestLazyPersistFiles extends LazyPersistTestCase {
private static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
private static final int THREADPOOL_SIZE = 10;
- private static final short REPL_FACTOR = 1;
- private static final int BLOCK_SIZE = 5 * 1024 * 1024;
- private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
- private static final long HEARTBEAT_INTERVAL_SEC = 1;
- private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
- private static final int LAZY_WRITER_INTERVAL_SEC = 1;
- private static final int BUFFER_LENGTH = 4096;
- private static final int EVICTION_LOW_WATERMARK = 1;
- private static final String JMX_SERVICE_NAME = "DataNode";
- private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
-
- private MiniDFSCluster cluster;
- private DistributedFileSystem fs;
- private DFSClient client;
- private Configuration conf;
- private JMXGet jmx;
-
- @After
- public void shutDownCluster() throws Exception {
-
- // Dump all RamDisk JMX metrics before shutdown the cluster
- printRamDiskJMXMetrics();
-
- if (fs != null) {
- fs.close();
- fs = null;
- client = null;
- }
-
- if (cluster != null) {
- cluster.shutdownDataNodes();
- cluster.shutdown();
- cluster = null;
- }
-
- if (jmx != null) {
- jmx = null;
- }
- }
-
- @Test (timeout=300000)
+ @Test
public void testPolicyNotSetByDefault() throws IOException {
startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -126,7 +68,7 @@ public class TestLazyPersistFiles {
assertThat(status.getStoragePolicy(), not(LAZY_PERSIST_POLICY_ID));
}
- @Test (timeout=300000)
+ @Test
public void testPolicyPropagation() throws IOException {
startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -138,7 +80,7 @@ public class TestLazyPersistFiles {
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
}
- @Test (timeout=300000)
+ @Test
public void testPolicyPersistenceInEditLog() throws IOException {
startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -152,7 +94,7 @@ public class TestLazyPersistFiles {
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
}
- @Test (timeout=300000)
+ @Test
public void testPolicyPersistenceInFsImage() throws IOException {
startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -170,7 +112,7 @@ public class TestLazyPersistFiles {
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
}
- @Test (timeout=300000)
+ @Test
public void testPlacementOnRamDisk() throws IOException {
startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -180,7 +122,7 @@ public class TestLazyPersistFiles {
ensureFileReplicasOnStorageType(path, RAM_DISK);
}
- @Test (timeout=300000)
+ @Test
public void testPlacementOnSizeLimitedRamDisk() throws IOException {
startUpCluster(true, 3);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -199,7 +141,7 @@ public class TestLazyPersistFiles {
* Write should default to disk. No error.
* @throws IOException
*/
- @Test (timeout=300000)
+ @Test
public void testFallbackToDisk() throws IOException {
startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -213,7 +155,7 @@ public class TestLazyPersistFiles {
* File can not fit in RamDisk even with eviction
* @throws IOException
*/
- @Test (timeout=300000)
+ @Test
public void testFallbackToDiskFull() throws Exception {
startUpCluster(false, 0);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -231,7 +173,7 @@ public class TestLazyPersistFiles {
* Expect 2 or less blocks are on RamDisk and 3 or more on disk.
* @throws IOException
*/
- @Test (timeout=300000)
+ @Test
public void testFallbackToDiskPartial()
throws IOException, InterruptedException {
startUpCluster(true, 2);
@@ -271,7 +213,7 @@ public class TestLazyPersistFiles {
*
* @throws IOException
*/
- @Test (timeout=300000)
+ @Test
public void testRamDiskNotChosenByDefault() throws IOException {
startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -289,7 +231,7 @@ public class TestLazyPersistFiles {
* Append to lazy persist file is denied.
* @throws IOException
*/
- @Test (timeout=300000)
+ @Test
public void testAppendIsDenied() throws IOException {
startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -310,7 +252,7 @@ public class TestLazyPersistFiles {
* must be discarded by the NN, instead of being kept around as a
* 'corrupt' file.
*/
- @Test (timeout=300000)
+ @Test
public void testLazyPersistFilesAreDiscarded()
throws IOException, InterruptedException {
startUpCluster(true, 2);
@@ -344,7 +286,7 @@ public class TestLazyPersistFiles {
is(0L));
}
- @Test (timeout=300000)
+ @Test
public void testLazyPersistBlocksAreSaved()
throws IOException, InterruptedException {
startUpCluster(true, -1);
@@ -399,7 +341,7 @@ public class TestLazyPersistFiles {
* RamDisk eviction after lazy persist to disk.
* @throws Exception
*/
- @Test (timeout=300000)
+ @Test
public void testRamDiskEviction() throws Exception {
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -434,7 +376,7 @@ public class TestLazyPersistFiles {
* @throws IOException
* @throws InterruptedException
*/
- @Test (timeout=300000)
+ @Test
public void testRamDiskEvictionBeforePersist()
throws IOException, InterruptedException {
startUpCluster(true, 1);
@@ -459,7 +401,7 @@ public class TestLazyPersistFiles {
assert(fs.exists(path1));
assert(fs.exists(path2));
- verifyReadRandomFile(path1, BLOCK_SIZE, SEED);
+ assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
}
/**
@@ -467,7 +409,7 @@ public class TestLazyPersistFiles {
* @throws IOException
* @throws InterruptedException
*/
- @Test (timeout=300000)
+ @Test
public void testRamDiskEvictionIsLru()
throws Exception {
final int NUM_PATHS = 5;
@@ -529,7 +471,7 @@ public class TestLazyPersistFiles {
* Memory is freed up and file is gone.
* @throws IOException
*/
- @Test // (timeout=300000)
+ @Test
public void testDeleteBeforePersist()
throws Exception {
startUpCluster(true, -1);
@@ -556,7 +498,7 @@ public class TestLazyPersistFiles {
* @throws IOException
* @throws InterruptedException
*/
- @Test (timeout=300000)
+ @Test
public void testDeleteAfterPersist()
throws Exception {
startUpCluster(true, -1);
@@ -584,7 +526,7 @@ public class TestLazyPersistFiles {
* @throws IOException
* @throws InterruptedException
*/
- @Test (timeout=300000)
+ @Test
public void testDfsUsageCreateDelete()
throws IOException, InterruptedException {
startUpCluster(true, 4);
@@ -615,7 +557,7 @@ public class TestLazyPersistFiles {
/**
* Concurrent read from the same node and verify the contents.
*/
- @Test (timeout=300000)
+ @Test
public void testConcurrentRead()
throws Exception {
startUpCluster(true, 2);
@@ -666,7 +608,7 @@ public class TestLazyPersistFiles {
* @throws IOException
* @throws InterruptedException
*/
- @Test (timeout=300000)
+ @Test
public void testConcurrentWrites()
throws IOException, InterruptedException {
startUpCluster(true, 9);
@@ -702,7 +644,7 @@ public class TestLazyPersistFiles {
assertThat(testFailed.get(), is(false));
}
- @Test (timeout=300000)
+ @Test
public void testDnRestartWithSavedReplicas()
throws IOException, InterruptedException {
@@ -726,7 +668,7 @@ public class TestLazyPersistFiles {
ensureFileReplicasOnStorageType(path1, DEFAULT);
}
- @Test (timeout=300000)
+ @Test
public void testDnRestartWithUnsavedReplicas()
throws IOException, InterruptedException {
@@ -746,183 +688,6 @@ public class TestLazyPersistFiles {
ensureFileReplicasOnStorageType(path1, RAM_DISK);
}
- // ---- Utility functions for all test cases -------------------------------
-
- /**
- * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
- * capped. If ramDiskStorageLimit < 0 then it is ignored.
- */
- private void startUpCluster(boolean hasTransientStorage,
- final int ramDiskReplicaCapacity,
- final boolean useSCR)
- throws IOException {
-
- conf = new Configuration();
- conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
- conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
- LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
- conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
- conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
- HEARTBEAT_RECHECK_INTERVAL_MSEC);
- conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
- LAZY_WRITER_INTERVAL_SEC);
- conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
- EVICTION_LOW_WATERMARK * BLOCK_SIZE);
-
- conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
-
- long[] capacities = null;
- if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
- // Convert replica count to byte count, add some delta for .meta and VERSION files.
- long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1);
- capacities = new long[] { ramDiskStorageLimit, -1 };
- }
-
- cluster = new MiniDFSCluster
- .Builder(conf)
- .numDataNodes(REPL_FACTOR)
- .storageCapacities(capacities)
- .storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null)
- .build();
- fs = cluster.getFileSystem();
- client = fs.getClient();
- try {
- jmx = initJMX();
- } catch (Exception e) {
- fail("Failed initialize JMX for testing: " + e);
- }
- LOG.info("Cluster startup complete");
- }
-
- private void startUpCluster(boolean hasTransientStorage,
- final int ramDiskReplicaCapacity)
- throws IOException {
- startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false);
- }
-
- private void makeTestFile(Path path, long length, final boolean isLazyPersist)
- throws IOException {
-
- EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
-
- if (isLazyPersist) {
- createFlags.add(LAZY_PERSIST);
- }
-
- FSDataOutputStream fos = null;
- try {
- fos =
- fs.create(path,
- FsPermission.getFileDefault(),
- createFlags,
- BUFFER_LENGTH,
- REPL_FACTOR,
- BLOCK_SIZE,
- null);
-
- // Allocate a block.
- byte[] buffer = new byte[BUFFER_LENGTH];
- for (int bytesWritten = 0; bytesWritten < length; ) {
- fos.write(buffer, 0, buffer.length);
- bytesWritten += buffer.length;
- }
- if (length > 0) {
- fos.hsync();
- }
- } finally {
- IOUtils.closeQuietly(fos);
- }
- }
-
- private LocatedBlocks ensureFileReplicasOnStorageType(
- Path path, StorageType storageType) throws IOException {
- // Ensure that returned block locations returned are correct!
- LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
- assertThat(fs.exists(path), is(true));
- long fileLength = client.getFileInfo(path.toString()).getLen();
- LocatedBlocks locatedBlocks =
- client.getLocatedBlocks(path.toString(), 0, fileLength);
- for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
- assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
- }
- return locatedBlocks;
- }
-
- private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
- long seed) throws IOException {
- DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
- BLOCK_SIZE, REPL_FACTOR, seed, true);
- }
-
- private boolean verifyReadRandomFile(
- Path path, int fileLength, int seed) throws IOException {
- byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
- byte expected[] = DFSTestUtil.
- calculateFileContentsFromSeed(seed, fileLength);
- return Arrays.equals(contents, expected);
- }
-
- private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
- throws IOException, InterruptedException {
-
- LOG.info("Verifying replica has no saved copy after deletion.");
- triggerBlockReport();
-
- while(
- DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
- > 0L){
- Thread.sleep(1000);
- }
-
- final String bpid = cluster.getNamesystem().getBlockPoolId();
- List<? extends FsVolumeSpi> volumes =
- cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-
- // Make sure deleted replica does not have a copy on either finalized dir of
- // transient volume or finalized dir of non-transient volume
- for (FsVolumeSpi v : volumes) {
- FsVolumeImpl volume = (FsVolumeImpl) v;
- File targetDir = (v.isTransientStorage()) ?
- volume.getBlockPoolSlice(bpid).getFinalizedDir() :
- volume.getBlockPoolSlice(bpid).getLazypersistDir();
- if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
- return false;
- }
- }
- return true;
- }
-
- private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) {
-
- for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
- File targetDir =
- DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
-
- File blockFile = new File(targetDir, lb.getBlock().getBlockName());
- if (blockFile.exists()) {
- LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
- " exists after deletion.");
- return false;
- }
- File metaFile = new File(targetDir,
- DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
- lb.getBlock().getGenerationStamp()));
- if (metaFile.exists()) {
- LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
- " exists after deletion.");
- return false;
- }
- }
- return true;
- }
-
- private void triggerBlockReport()
- throws IOException, InterruptedException {
- // Trigger block report to NN
- DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
- Thread.sleep(10 * 1000);
- }
-
class WriterRunnable implements Runnable {
private final int id;
private final Path paths[];
@@ -960,27 +725,4 @@ public class TestLazyPersistFiles {
}
}
}
-
- JMXGet initJMX() throws Exception
- {
- JMXGet jmx = new JMXGet();
- jmx.setService(JMX_SERVICE_NAME);
- jmx.init();
- return jmx;
- }
-
- void printRamDiskJMXMetrics() {
- try {
- if (jmx != null) {
- jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- void verifyRamDiskJMXMetric(String metricName, long expectedValue)
- throws Exception {
- assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
- }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
index b6ac2870f58..efc6dcb374c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
@@ -15,84 +15,44 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
- package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
- import org.apache.commons.io.IOUtils;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.commons.logging.impl.Log4JLogger;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.CreateFlag;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.permission.FsPermission;
- import org.apache.hadoop.hdfs.*;
- import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
- import org.apache.hadoop.hdfs.protocol.LocatedBlock;
- import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
- import org.apache.hadoop.hdfs.server.datanode.DataNode;
- import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
- import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
- import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
- import org.apache.hadoop.hdfs.server.namenode.NameNode;
- import org.apache.hadoop.net.unix.DomainSocket;
- import org.apache.hadoop.net.unix.TemporarySocketDirectory;
- import org.apache.hadoop.security.UserGroupInformation;
- import org.apache.hadoop.test.GenericTestUtils;
- import org.apache.hadoop.util.NativeCodeLoader;
- import org.apache.log4j.Level;
- import org.junit.*;
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
- import java.io.File;
- import java.io.IOException;
- import java.util.Arrays;
- import java.util.EnumSet;
- import java.util.List;
- import java.util.UUID;
+import java.io.File;
+import java.io.IOException;
- import static org.apache.hadoop.fs.CreateFlag.CREATE;
- import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
- import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
- import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
- import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
- import static org.hamcrest.CoreMatchers.equalTo;
- import static org.hamcrest.core.Is.is;
- import static org.junit.Assert.assertThat;
+import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
+import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
-public class TestScrLazyPersistFiles {
- public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
-
- static {
- ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
- ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
- ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
- }
-
- private static short REPL_FACTOR = 1;
- private static final int BLOCK_SIZE = 10485760; // 10 MB
- private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
- private static final long HEARTBEAT_INTERVAL_SEC = 1;
- private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
- private static final int LAZY_WRITER_INTERVAL_SEC = 1;
- private static final int BUFFER_LENGTH = 4096;
- private static TemporarySocketDirectory sockDir;
-
- private MiniDFSCluster cluster;
- private DistributedFileSystem fs;
- private DFSClient client;
- private Configuration conf;
+public class TestScrLazyPersistFiles extends LazyPersistTestCase {
@BeforeClass
public static void init() {
- sockDir = new TemporarySocketDirectory();
DomainSocket.disableBindPathValidation();
}
- @AfterClass
- public static void shutdown() throws IOException {
- sockDir.close();
- }
-
@Before
public void before() {
Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS,
@@ -100,26 +60,14 @@ public class TestScrLazyPersistFiles {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
}
- @After
- public void shutDownCluster() throws IOException {
- if (fs != null) {
- fs.close();
- fs = null;
- client = null;
- }
-
- if (cluster != null) {
- cluster.shutdownDataNodes();
- cluster.shutdown();
- cluster = null;
- }
- }
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
/**
* Read in-memory block with Short Circuit Read
* Note: the test uses faked RAM_DISK from physical disk.
*/
- @Test (timeout=300000)
+ @Test
public void testRamDiskShortCircuitRead()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR,
@@ -160,7 +108,7 @@ public class TestScrLazyPersistFiles {
* @throws IOException
* @throws InterruptedException
*/
- @Test (timeout=300000000)
+ @Test
public void testRamDiskEvictionWithShortCircuitReadHandle()
throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
@@ -204,123 +152,149 @@ public class TestScrLazyPersistFiles {
ensureFileReplicasOnStorageType(path1, DEFAULT);
}
- // ---- Utility functions for all test cases -------------------------------
-
- /**
- * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
- * capped. If ramDiskStorageLimit < 0 then it is ignored.
- */
- private void startUpCluster(final int numDataNodes,
- final StorageType[] storageTypes,
- final long ramDiskStorageLimit,
- final boolean useSCR)
- throws IOException {
-
- conf = new Configuration();
- conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
- conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
- LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
- conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
- conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
- HEARTBEAT_RECHECK_INTERVAL_MSEC);
- conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
- LAZY_WRITER_INTERVAL_SEC);
-
- if (useSCR)
- {
- conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
- conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
- UUID.randomUUID().toString());
- conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
- new File(sockDir.getDir(),
- "TestShortCircuitLocalReadHandle._PORT.sock").getAbsolutePath());
- conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
- UserGroupInformation.getCurrentUser().getShortUserName());
- }
-
- REPL_FACTOR = 1; //Reset in case a test has modified the value
-
- cluster = new MiniDFSCluster
- .Builder(conf)
- .numDataNodes(numDataNodes)
- .storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT })
- .build();
- fs = cluster.getFileSystem();
- client = fs.getClient();
-
- // Artificially cap the storage capacity of the RAM_DISK volume.
- if (ramDiskStorageLimit >= 0) {
- List<? extends FsVolumeSpi> volumes =
- cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-
- for (FsVolumeSpi volume : volumes) {
- if (volume.getStorageType() == RAM_DISK) {
- ((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
- }
- }
- }
-
- LOG.info("Cluster startup complete");
+ @Test
+ public void testShortCircuitReadAfterEviction()
+ throws IOException, InterruptedException {
+ Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+ startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
+ doShortCircuitReadAfterEvictionTest();
}
- private void makeTestFile(Path path, long length, final boolean isLazyPersist)
- throws IOException {
+ @Test
+ public void testLegacyShortCircuitReadAfterEviction()
+ throws IOException, InterruptedException {
+ startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
+ doShortCircuitReadAfterEvictionTest();
+ }
- EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
+ private void doShortCircuitReadAfterEvictionTest() throws IOException,
+ InterruptedException {
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
- if (isLazyPersist) {
- createFlags.add(LAZY_PERSIST);
- }
+ final int SEED = 0xFADED;
+ makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
- FSDataOutputStream fos = null;
- try {
- fos =
- fs.create(path,
- FsPermission.getFileDefault(),
- createFlags,
- BUFFER_LENGTH,
- REPL_FACTOR,
- BLOCK_SIZE,
- null);
+ // Verify short-circuit read from RAM_DISK.
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
+ DFSTestUtil.getFirstBlock(fs, path1));
+ assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
+ assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
- // Allocate a block.
- byte[] buffer = new byte[BUFFER_LENGTH];
- for (int bytesWritten = 0; bytesWritten < length; ) {
- fos.write(buffer, 0, buffer.length);
- bytesWritten += buffer.length;
- }
- if (length > 0) {
- fos.hsync();
- }
- } finally {
- IOUtils.closeQuietly(fos);
+ // Sleep for a short time to allow the lazy writer thread to do its job.
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ // Verify short-circuit read from RAM_DISK once again.
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ metaFile = MiniDFSCluster.getBlockMetadataFile(0,
+ DFSTestUtil.getFirstBlock(fs, path1));
+ assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
+ assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
+
+ // Create another file with a replica on RAM_DISK, which evicts the first.
+ makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ triggerBlockReport();
+
+ // Verify short-circuit read still works from DEFAULT storage. This time,
+ // we'll have a checksum written during lazy persistence.
+ ensureFileReplicasOnStorageType(path1, DEFAULT);
+ metaFile = MiniDFSCluster.getBlockMetadataFile(0,
+ DFSTestUtil.getFirstBlock(fs, path1));
+ assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
+ assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
+
+ // With legacy short-circuit reads, any failure is trapped silently: the
+ // client falls back to a remote read and disables all subsequent legacy
+ // short-circuit reads in the ClientContext. If the test uses the legacy
+ // path, assert that it did not get disabled.
+ ClientContext clientContext = client.getClientContext();
+ if (clientContext.getUseLegacyBlockReaderLocal()) {
+ Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
}
}
- private LocatedBlocks ensureFileReplicasOnStorageType(
- Path path, StorageType storageType) throws IOException {
- // Ensure that returned block locations returned are correct!
- LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
- assertThat(fs.exists(path), is(true));
- long fileLength = client.getFileInfo(path.toString()).getLen();
- LocatedBlocks locatedBlocks =
- client.getLocatedBlocks(path.toString(), 0, fileLength);
- for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
- assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
- }
- return locatedBlocks;
+ @Test
+ public void testShortCircuitReadBlockFileCorruption() throws IOException,
+ InterruptedException {
+ Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+ startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
+ doShortCircuitReadBlockFileCorruptionTest();
}
- private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
- long seed) throws IOException {
- DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
- BLOCK_SIZE, REPL_FACTOR, seed, true);
+ @Test
+ public void testLegacyShortCircuitReadBlockFileCorruption() throws IOException,
+ InterruptedException {
+ startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
+ doShortCircuitReadBlockFileCorruptionTest();
}
- private void triggerBlockReport()
- throws IOException, InterruptedException {
- // Trigger block report to NN
- DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
- Thread.sleep(10 * 1000);
+ public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
+ InterruptedException {
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+ final int SEED = 0xFADED;
+ makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ // Create another file with a replica on RAM_DISK, which evicts the first.
+ makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job.
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ triggerBlockReport();
+
+ // Corrupt the lazy-persisted block file, and verify that checksum
+ // verification catches it.
+ ensureFileReplicasOnStorageType(path1, DEFAULT);
+ MiniDFSCluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, path1));
+ exception.expect(ChecksumException.class);
+ DFSTestUtil.readFileBuffer(fs, path1);
+ }
+
+ @Test
+ public void testShortCircuitReadMetaFileCorruption() throws IOException,
+ InterruptedException {
+ Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+ startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
+ doShortCircuitReadMetaFileCorruptionTest();
+ }
+
+ @Test
+ public void testLegacyShortCircuitReadMetaFileCorruption() throws IOException,
+ InterruptedException {
+ startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
+ doShortCircuitReadMetaFileCorruptionTest();
+ }
+
+ public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
+ InterruptedException {
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+ final int SEED = 0xFADED;
+ makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ // Create another file with a replica on RAM_DISK, which evicts the first.
+ makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job.
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ triggerBlockReport();
+
+ // Corrupt the lazy-persisted checksum file, and verify that checksum
+ // verification catches it.
+ ensureFileReplicasOnStorageType(path1, DEFAULT);
+ File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
+ DFSTestUtil.getFirstBlock(fs, path1));
+ MiniDFSCluster.corruptBlock(metaFile);
+ exception.expect(ChecksumException.class);
+ DFSTestUtil.readFileBuffer(fs, path1);
}
}