HDFS-6934. Move checksum computation off the hot path when writing to RAM disk. Contributed by Chris Nauroth.
(cherry picked from commit 463aec1171
)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
This commit is contained in:
parent
57e82e647b
commit
3d67da502a
|
@ -51,7 +51,7 @@ abstract public class FSOutputSummer extends OutputStream {
|
|||
protected FSOutputSummer(DataChecksum sum) {
|
||||
this.sum = sum;
|
||||
this.buf = new byte[sum.getBytesPerChecksum() * BUFFER_NUM_CHUNKS];
|
||||
this.checksum = new byte[sum.getChecksumSize() * BUFFER_NUM_CHUNKS];
|
||||
this.checksum = new byte[getChecksumSize() * BUFFER_NUM_CHUNKS];
|
||||
this.count = 0;
|
||||
}
|
||||
|
||||
|
@ -189,6 +189,11 @@ abstract public class FSOutputSummer extends OutputStream {
|
|||
return count;
|
||||
}
|
||||
|
||||
/** @return the size for a checksum. */
|
||||
protected int getChecksumSize() {
|
||||
return sum.getChecksumSize();
|
||||
}
|
||||
|
||||
/** Generate checksums for the given data chunks and output chunks & checksums
|
||||
* to the underlying output stream.
|
||||
*/
|
||||
|
@ -197,9 +202,8 @@ abstract public class FSOutputSummer extends OutputStream {
|
|||
sum.calculateChunkedSums(b, off, len, checksum, 0);
|
||||
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
|
||||
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
|
||||
int ckOffset = i / sum.getBytesPerChecksum() * sum.getChecksumSize();
|
||||
writeChunk(b, off + i, chunkLen, checksum, ckOffset,
|
||||
sum.getChecksumSize());
|
||||
int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
|
||||
writeChunk(b, off + i, chunkLen, checksum, ckOffset, getChecksumSize());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -226,8 +230,7 @@ abstract public class FSOutputSummer extends OutputStream {
|
|||
*/
|
||||
protected synchronized void setChecksumBufSize(int size) {
|
||||
this.buf = new byte[size];
|
||||
this.checksum = new byte[((size - 1) / sum.getBytesPerChecksum() + 1) *
|
||||
sum.getChecksumSize()];
|
||||
this.checksum = new byte[sum.getChecksumSize(size)];
|
||||
this.count = 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -234,15 +234,14 @@ public final class Options {
|
|||
* This is used in FileSystem and FileContext to specify checksum options.
|
||||
*/
|
||||
public static class ChecksumOpt {
|
||||
private final int crcBlockSize;
|
||||
private final DataChecksum.Type crcType;
|
||||
private final DataChecksum.Type checksumType;
|
||||
private final int bytesPerChecksum;
|
||||
|
||||
/**
|
||||
* Create a uninitialized one
|
||||
*/
|
||||
public ChecksumOpt() {
|
||||
crcBlockSize = -1;
|
||||
crcType = DataChecksum.Type.DEFAULT;
|
||||
this(DataChecksum.Type.DEFAULT, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -251,16 +250,21 @@ public final class Options {
|
|||
* @param size bytes per checksum
|
||||
*/
|
||||
public ChecksumOpt(DataChecksum.Type type, int size) {
|
||||
crcBlockSize = size;
|
||||
crcType = type;
|
||||
checksumType = type;
|
||||
bytesPerChecksum = size;
|
||||
}
|
||||
|
||||
public int getBytesPerChecksum() {
|
||||
return crcBlockSize;
|
||||
return bytesPerChecksum;
|
||||
}
|
||||
|
||||
public DataChecksum.Type getChecksumType() {
|
||||
return crcType;
|
||||
return checksumType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return checksumType + ":" + bytesPerChecksum;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -869,7 +869,8 @@ public class NativeIO {
|
|||
* @throws IOException
|
||||
*/
|
||||
public static void copyFileUnbuffered(File src, File dst) throws IOException {
|
||||
if ((nativeLoaded) && (Shell.WINDOWS || Shell.LINUX)) {
|
||||
if ((nativeLoaded) &&
|
||||
(Shell.WINDOWS || (Shell.isLinuxSendfileAvailable))) {
|
||||
copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
|
||||
} else {
|
||||
FileUtils.copyFile(src, dst);
|
||||
|
|
|
@ -37,9 +37,6 @@ import org.apache.hadoop.fs.ChecksumException;
|
|||
@InterfaceStability.Evolving
|
||||
public class DataChecksum implements Checksum {
|
||||
|
||||
// Misc constants
|
||||
public static final int HEADER_LEN = 5; /// 1 byte type and 4 byte len
|
||||
|
||||
// checksum types
|
||||
public static final int CHECKSUM_NULL = 0;
|
||||
public static final int CHECKSUM_CRC32 = 1;
|
||||
|
@ -103,7 +100,7 @@ public class DataChecksum implements Checksum {
|
|||
* @return DataChecksum of the type in the array or null in case of an error.
|
||||
*/
|
||||
public static DataChecksum newDataChecksum( byte bytes[], int offset ) {
|
||||
if ( offset < 0 || bytes.length < offset + HEADER_LEN ) {
|
||||
if (offset < 0 || bytes.length < offset + getChecksumHeaderSize()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
@ -116,8 +113,8 @@ public class DataChecksum implements Checksum {
|
|||
}
|
||||
|
||||
/**
|
||||
* This constructucts a DataChecksum by reading HEADER_LEN bytes from
|
||||
* input stream <i>in</i>
|
||||
* This constructs a DataChecksum by reading HEADER_LEN bytes from input
|
||||
* stream <i>in</i>
|
||||
*/
|
||||
public static DataChecksum newDataChecksum( DataInputStream in )
|
||||
throws IOException {
|
||||
|
@ -141,7 +138,7 @@ public class DataChecksum implements Checksum {
|
|||
}
|
||||
|
||||
public byte[] getHeader() {
|
||||
byte[] header = new byte[DataChecksum.HEADER_LEN];
|
||||
byte[] header = new byte[getChecksumHeaderSize()];
|
||||
header[0] = (byte) (type.id & 0xff);
|
||||
// Writing in buffer just like DataOutput.WriteInt()
|
||||
header[1+0] = (byte) ((bytesPerChecksum >>> 24) & 0xff);
|
||||
|
@ -229,13 +226,18 @@ public class DataChecksum implements Checksum {
|
|||
bytesPerChecksum = chunkSize;
|
||||
}
|
||||
|
||||
// Accessors
|
||||
/** @return the checksum algorithm type. */
|
||||
public Type getChecksumType() {
|
||||
return type;
|
||||
}
|
||||
/** @return the size for a checksum. */
|
||||
public int getChecksumSize() {
|
||||
return type.size;
|
||||
}
|
||||
/** @return the required checksum size given the data length. */
|
||||
public int getChecksumSize(int dataSize) {
|
||||
return ((dataSize - 1)/getBytesPerChecksum() + 1) * getChecksumSize();
|
||||
}
|
||||
public int getBytesPerChecksum() {
|
||||
return bytesPerChecksum;
|
||||
}
|
||||
|
|
|
@ -377,6 +377,117 @@ abstract public class Shell {
|
|||
return winUtilsPath;
|
||||
}
|
||||
|
||||
public static class LinuxKernelVersion implements Comparable<LinuxKernelVersion>{
|
||||
private final short major;
|
||||
private final short minor;
|
||||
private final short revision;
|
||||
|
||||
public LinuxKernelVersion(short major, short minor, short revision) {
|
||||
this.major = major;
|
||||
this.minor = minor;
|
||||
this.revision = revision;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse Linux kernel version string from output of POSIX command 'uname -r'
|
||||
* @param version version string from POSIX command 'uname -r'
|
||||
* @return LinuxKernelVersion
|
||||
* @throws IllegalArgumentException
|
||||
*
|
||||
* Note:
|
||||
* On CentOS 5.8: '2.6.18-308.24.1.el5'
|
||||
* On Ubuntu 14: '3.13.0-32-generic'
|
||||
*/
|
||||
public static LinuxKernelVersion parseLinuxKernelVersion(String version)
|
||||
throws IllegalArgumentException {
|
||||
if (version == null) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
String parts[] = version.split("-")[0].split("\\.");
|
||||
if (parts.length != 3) {
|
||||
throw new IllegalArgumentException(version);
|
||||
}
|
||||
short major = Short.parseShort(parts[0]);
|
||||
short minor = Short.parseShort(parts[1]);
|
||||
short revision = Short.parseShort(parts[2]);
|
||||
return new LinuxKernelVersion(major, minor, revision);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(LinuxKernelVersion o) {
|
||||
if (this.major == o.major) {
|
||||
if (this.minor == o.minor) {
|
||||
return this.revision - o.revision;
|
||||
} else {
|
||||
return this.minor - o.minor;
|
||||
}
|
||||
} else {
|
||||
return this.major - o.major;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (this == other) {
|
||||
return true;
|
||||
}
|
||||
if (!(other instanceof LinuxKernelVersion)) {
|
||||
return false;
|
||||
}
|
||||
return compareTo((LinuxKernelVersion) other) == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("%d.%d.%d", major, minor, revision);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode(){
|
||||
int hash = 41;
|
||||
hash = (19 * hash) + major;
|
||||
hash = (53 * hash) + minor;
|
||||
hash = (29 * hash) + revision;
|
||||
return hash;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* sendfile() API between two file descriptors
|
||||
* is only supported on Linux Kernel version 2.6.33+
|
||||
* according to http://man7.org/linux/man-pages/man2/sendfile.2.html
|
||||
*/
|
||||
public static final boolean isLinuxSendfileAvailable = isLinuxSendfileSupported();
|
||||
private static LinuxKernelVersion minLkvSupportSendfile =
|
||||
new LinuxKernelVersion((short)2, (short)6, (short)33);
|
||||
|
||||
private static boolean isLinuxSendfileSupported() {
|
||||
if (!Shell.LINUX) {
|
||||
return false;
|
||||
}
|
||||
ShellCommandExecutor shexec = null;
|
||||
boolean sendfileSupported = false;
|
||||
try {
|
||||
String[] args = {"uname", "bash", "-r"};
|
||||
shexec = new ShellCommandExecutor(args);
|
||||
shexec.execute();
|
||||
String version = shexec.getOutput();
|
||||
LinuxKernelVersion lkv =
|
||||
LinuxKernelVersion.parseLinuxKernelVersion(version);
|
||||
if (lkv.compareTo(minLkvSupportSendfile) > 0) {
|
||||
sendfileSupported = true;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("isLinuxSendfileSupported() failed unexpected: " + e);
|
||||
} finally {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("uname exited with exit code "
|
||||
+ (shexec != null ? shexec.getExitCode() : "(null executor)"));
|
||||
}
|
||||
}
|
||||
return sendfileSupported;
|
||||
}
|
||||
|
||||
public static final boolean isSetsidAvailable = isSetsidSupported();
|
||||
private static boolean isSetsidSupported() {
|
||||
if (Shell.WINDOWS) {
|
||||
|
|
|
@ -165,4 +165,24 @@ public class TestShell extends TestCase {
|
|||
assertEquals(2, command.getRunCount());
|
||||
}
|
||||
}
|
||||
|
||||
public void testLinuxKernelVersion() throws IOException {
|
||||
Shell.LinuxKernelVersion v2_6_18 =
|
||||
new Shell.LinuxKernelVersion((short)2, (short)6, (short)18);
|
||||
Shell.LinuxKernelVersion v2_6_32 =
|
||||
new Shell.LinuxKernelVersion((short)2, (short)6, (short)32);
|
||||
assertTrue(v2_6_18.compareTo(v2_6_32) < 0);
|
||||
}
|
||||
|
||||
public void testParseLinuxKernelVersion() throws Exception {
|
||||
String centOs58Ver = new String("2.6.18-308.24.1.el5");
|
||||
String ubuntu14Ver = new String("3.13.0-32-generic");
|
||||
Shell.LinuxKernelVersion lkvCentOs58 =
|
||||
Shell.LinuxKernelVersion.parseLinuxKernelVersion(centOs58Ver);
|
||||
Shell.LinuxKernelVersion lkvUnbuntu14 =
|
||||
Shell.LinuxKernelVersion.parseLinuxKernelVersion(ubuntu14Ver);
|
||||
assertTrue(lkvUnbuntu14.compareTo(lkvCentOs58) > 0);
|
||||
assertFalse(lkvUnbuntu14.equals(lkvCentOs58));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -778,6 +778,9 @@ Release 2.6.0 - UNRELEASED
|
|||
HDFS-7090. Use unbuffered writes when persisting in-memory replicas.
|
||||
(Xiaoyu Yao via cnauroth)
|
||||
|
||||
HDFS-6934. Move checksum computation off the hot path when writing to RAM
|
||||
disk. (cnauroth)
|
||||
|
||||
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-6387. HDFS CLI admin tool for creating & deleting an
|
||||
|
|
|
@ -109,6 +109,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
*/
|
||||
private DatanodeInfo datanode;
|
||||
|
||||
/**
|
||||
* StorageType of replica on DataNode.
|
||||
*/
|
||||
private StorageType storageType;
|
||||
|
||||
/**
|
||||
* If false, we won't try short-circuit local reads.
|
||||
*/
|
||||
|
@ -201,6 +206,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
return this;
|
||||
}
|
||||
|
||||
public BlockReaderFactory setStorageType(StorageType storageType) {
|
||||
this.storageType = storageType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BlockReaderFactory setAllowShortCircuitLocalReads(
|
||||
boolean allowShortCircuitLocalReads) {
|
||||
this.allowShortCircuitLocalReads = allowShortCircuitLocalReads;
|
||||
|
@ -353,7 +363,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
try {
|
||||
return BlockReaderLocalLegacy.newBlockReader(conf,
|
||||
userGroupInformation, configuration, fileName, block, token,
|
||||
datanode, startOffset, length);
|
||||
datanode, startOffset, length, storageType);
|
||||
} catch (RemoteException remoteException) {
|
||||
ioe = remoteException.unwrapRemoteException(
|
||||
InvalidToken.class, AccessControlException.class);
|
||||
|
@ -415,6 +425,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
setShortCircuitReplica(info.getReplica()).
|
||||
setVerifyChecksum(verifyChecksum).
|
||||
setCachingStrategy(cachingStrategy).
|
||||
setStorageType(storageType).
|
||||
build();
|
||||
}
|
||||
|
||||
|
|
|
@ -69,6 +69,7 @@ class BlockReaderLocal implements BlockReader {
|
|||
private ShortCircuitReplica replica;
|
||||
private long dataPos;
|
||||
private ExtendedBlock block;
|
||||
private StorageType storageType;
|
||||
|
||||
public Builder(Conf conf) {
|
||||
this.maxReadahead = Integer.MAX_VALUE;
|
||||
|
@ -109,6 +110,11 @@ class BlockReaderLocal implements BlockReader {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setStorageType(StorageType storageType) {
|
||||
this.storageType = storageType;
|
||||
return this;
|
||||
}
|
||||
|
||||
public BlockReaderLocal build() {
|
||||
Preconditions.checkNotNull(replica);
|
||||
return new BlockReaderLocal(this);
|
||||
|
@ -212,6 +218,11 @@ class BlockReaderLocal implements BlockReader {
|
|||
*/
|
||||
private ByteBuffer checksumBuf;
|
||||
|
||||
/**
|
||||
* StorageType of replica on DataNode.
|
||||
*/
|
||||
private StorageType storageType;
|
||||
|
||||
private BlockReaderLocal(Builder builder) {
|
||||
this.replica = builder.replica;
|
||||
this.dataIn = replica.getDataStream().getChannel();
|
||||
|
@ -240,6 +251,7 @@ class BlockReaderLocal implements BlockReader {
|
|||
this.zeroReadaheadRequested = false;
|
||||
}
|
||||
this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
|
||||
this.storageType = builder.storageType;
|
||||
}
|
||||
|
||||
private synchronized void createDataBufIfNeeded() {
|
||||
|
@ -333,8 +345,8 @@ class BlockReaderLocal implements BlockReader {
|
|||
int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
|
||||
checksumBuf.clear();
|
||||
checksumBuf.limit(checksumsNeeded * checksumSize);
|
||||
long checksumPos =
|
||||
7 + ((startDataPos / bytesPerChecksum) * checksumSize);
|
||||
long checksumPos = BlockMetadataHeader.getHeaderSize()
|
||||
+ ((startDataPos / bytesPerChecksum) * checksumSize);
|
||||
while (checksumBuf.hasRemaining()) {
|
||||
int nRead = checksumIn.read(checksumBuf, checksumPos);
|
||||
if (nRead < 0) {
|
||||
|
@ -359,7 +371,14 @@ class BlockReaderLocal implements BlockReader {
|
|||
|
||||
private boolean createNoChecksumContext() {
|
||||
if (verifyChecksum) {
|
||||
if (storageType != null && storageType.isTransient()) {
|
||||
// Checksums are not stored for replicas on transient storage. We do not
|
||||
// anchor, because we do not intend for client activity to block eviction
|
||||
// from transient storage on the DataNode side.
|
||||
return true;
|
||||
} else {
|
||||
return replica.addNoChecksumAnchor();
|
||||
}
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
|
@ -367,9 +386,11 @@ class BlockReaderLocal implements BlockReader {
|
|||
|
||||
private void releaseNoChecksumContext() {
|
||||
if (verifyChecksum) {
|
||||
if (storageType == null || !storageType.isTransient()) {
|
||||
replica.removeNoChecksumAnchor();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int read(ByteBuffer buf) throws IOException {
|
||||
|
|
|
@ -181,7 +181,8 @@ class BlockReaderLocalLegacy implements BlockReader {
|
|||
UserGroupInformation userGroupInformation,
|
||||
Configuration configuration, String file, ExtendedBlock blk,
|
||||
Token<BlockTokenIdentifier> token, DatanodeInfo node,
|
||||
long startOffset, long length) throws IOException {
|
||||
long startOffset, long length, StorageType storageType)
|
||||
throws IOException {
|
||||
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
|
||||
.getIpcPort());
|
||||
// check the cache first
|
||||
|
@ -192,7 +193,7 @@ class BlockReaderLocalLegacy implements BlockReader {
|
|||
}
|
||||
pathinfo = getBlockPathInfo(userGroupInformation, blk, node,
|
||||
configuration, conf.socketTimeout, token,
|
||||
conf.connectToDnViaHostname);
|
||||
conf.connectToDnViaHostname, storageType);
|
||||
}
|
||||
|
||||
// check to see if the file exists. It may so happen that the
|
||||
|
@ -204,7 +205,8 @@ class BlockReaderLocalLegacy implements BlockReader {
|
|||
FileInputStream dataIn = null;
|
||||
FileInputStream checksumIn = null;
|
||||
BlockReaderLocalLegacy localBlockReader = null;
|
||||
boolean skipChecksumCheck = conf.skipShortCircuitChecksums;
|
||||
boolean skipChecksumCheck = conf.skipShortCircuitChecksums ||
|
||||
storageType.isTransient();
|
||||
try {
|
||||
// get a local file system
|
||||
File blkfile = new File(pathinfo.getBlockPath());
|
||||
|
@ -221,15 +223,8 @@ class BlockReaderLocalLegacy implements BlockReader {
|
|||
File metafile = new File(pathinfo.getMetaPath());
|
||||
checksumIn = new FileInputStream(metafile);
|
||||
|
||||
// read and handle the common header here. For now just a version
|
||||
BlockMetadataHeader header = BlockMetadataHeader
|
||||
.readHeader(new DataInputStream(checksumIn));
|
||||
short version = header.getVersion();
|
||||
if (version != BlockMetadataHeader.VERSION) {
|
||||
LOG.warn("Wrong version (" + version + ") for metadata file for "
|
||||
+ blk + " ignoring ...");
|
||||
}
|
||||
DataChecksum checksum = header.getChecksum();
|
||||
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
|
||||
new DataInputStream(checksumIn), blk);
|
||||
long firstChunkOffset = startOffset
|
||||
- (startOffset % checksum.getBytesPerChecksum());
|
||||
localBlockReader = new BlockReaderLocalLegacy(conf, file, blk, token,
|
||||
|
@ -270,8 +265,8 @@ class BlockReaderLocalLegacy implements BlockReader {
|
|||
|
||||
private static BlockLocalPathInfo getBlockPathInfo(UserGroupInformation ugi,
|
||||
ExtendedBlock blk, DatanodeInfo node, Configuration conf, int timeout,
|
||||
Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
|
||||
throws IOException {
|
||||
Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname,
|
||||
StorageType storageType) throws IOException {
|
||||
LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
|
||||
BlockLocalPathInfo pathinfo = null;
|
||||
ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(ugi, node,
|
||||
|
@ -279,7 +274,15 @@ class BlockReaderLocalLegacy implements BlockReader {
|
|||
try {
|
||||
// make RPC to local datanode to find local pathnames of blocks
|
||||
pathinfo = proxy.getBlockLocalPathInfo(blk, token);
|
||||
if (pathinfo != null) {
|
||||
// We cannot cache the path information for a replica on transient storage.
|
||||
// If the replica gets evicted, then it moves to a different path. Then,
|
||||
// our next attempt to read from the cached path would fail to find the
|
||||
// file. Additionally, the failure would cause us to disable legacy
|
||||
// short-circuit read for all subsequent use in the ClientContext. Unlike
|
||||
// the newer short-circuit read implementation, we have no communication
|
||||
// channel for the DataNode to notify the client that the path has been
|
||||
// invalidated. Therefore, our only option is to skip caching.
|
||||
if (pathinfo != null && !storageType.isTransient()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Cached location of block " + blk + " as " + pathinfo);
|
||||
}
|
||||
|
|
|
@ -98,6 +98,7 @@ import javax.net.SocketFactory;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.CipherSuite;
|
||||
|
@ -526,8 +527,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
return createChecksum(null);
|
||||
}
|
||||
|
||||
private DataChecksum createChecksum(ChecksumOpt userOpt)
|
||||
throws IOException {
|
||||
private DataChecksum createChecksum(ChecksumOpt userOpt) {
|
||||
// Fill in any missing field with the default.
|
||||
ChecksumOpt myOpt = ChecksumOpt.processChecksumOpt(
|
||||
defaultChecksumOpt, userOpt);
|
||||
|
@ -535,8 +535,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
myOpt.getChecksumType(),
|
||||
myOpt.getBytesPerChecksum());
|
||||
if (dataChecksum == null) {
|
||||
throw new IOException("Invalid checksum type specified: "
|
||||
+ myOpt.getChecksumType().name());
|
||||
throw new HadoopIllegalArgumentException("Invalid checksum type: userOpt="
|
||||
+ userOpt + ", default=" + defaultChecksumOpt
|
||||
+ ", effective=null");
|
||||
}
|
||||
return dataChecksum;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
|
@ -570,6 +571,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|||
DNAddrPair retval = chooseDataNode(targetBlock, null);
|
||||
chosenNode = retval.info;
|
||||
InetSocketAddress targetAddr = retval.addr;
|
||||
StorageType storageType = retval.storageType;
|
||||
|
||||
try {
|
||||
ExtendedBlock blk = targetBlock.getBlock();
|
||||
|
@ -578,6 +580,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|||
setInetSocketAddress(targetAddr).
|
||||
setRemotePeerFactory(dfsClient).
|
||||
setDatanodeInfo(chosenNode).
|
||||
setStorageType(storageType).
|
||||
setFileName(src).
|
||||
setBlock(blk).
|
||||
setBlockToken(accessToken).
|
||||
|
@ -885,12 +888,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|||
private DNAddrPair chooseDataNode(LocatedBlock block,
|
||||
Collection<DatanodeInfo> ignoredNodes) throws IOException {
|
||||
while (true) {
|
||||
DatanodeInfo[] nodes = block.getLocations();
|
||||
try {
|
||||
return getBestNodeDNAddrPair(nodes, ignoredNodes);
|
||||
return getBestNodeDNAddrPair(block, ignoredNodes);
|
||||
} catch (IOException ie) {
|
||||
String errMsg =
|
||||
getBestNodeDNAddrPairErrorString(nodes, deadNodes, ignoredNodes);
|
||||
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
|
||||
deadNodes, ignoredNodes);
|
||||
String blockInfo = block.getBlock() + " file=" + src;
|
||||
if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
|
||||
String description = "Could not obtain block: " + blockInfo;
|
||||
|
@ -900,6 +902,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|||
block.getStartOffset());
|
||||
}
|
||||
|
||||
DatanodeInfo[] nodes = block.getLocations();
|
||||
if (nodes == null || nodes.length == 0) {
|
||||
DFSClient.LOG.info("No node available for " + blockInfo);
|
||||
}
|
||||
|
@ -933,22 +936,44 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the best node.
|
||||
* @param nodes Nodes to choose from.
|
||||
* @param ignoredNodes Do not chose nodes in this array (may be null)
|
||||
* Get the best node from which to stream the data.
|
||||
* @param block LocatedBlock, containing nodes in priority order.
|
||||
* @param ignoredNodes Do not choose nodes in this array (may be null)
|
||||
* @return The DNAddrPair of the best node.
|
||||
* @throws IOException
|
||||
*/
|
||||
private DNAddrPair getBestNodeDNAddrPair(final DatanodeInfo[] nodes,
|
||||
private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
|
||||
Collection<DatanodeInfo> ignoredNodes) throws IOException {
|
||||
DatanodeInfo chosenNode = bestNode(nodes, deadNodes, ignoredNodes);
|
||||
DatanodeInfo[] nodes = block.getLocations();
|
||||
StorageType[] storageTypes = block.getStorageTypes();
|
||||
DatanodeInfo chosenNode = null;
|
||||
StorageType storageType = null;
|
||||
if (nodes != null) {
|
||||
for (int i = 0; i < nodes.length; i++) {
|
||||
if (!deadNodes.containsKey(nodes[i])
|
||||
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
|
||||
chosenNode = nodes[i];
|
||||
// Storage types are ordered to correspond with nodes, so use the same
|
||||
// index to get storage type.
|
||||
if (storageTypes != null && i < storageTypes.length) {
|
||||
storageType = storageTypes[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (chosenNode == null) {
|
||||
throw new IOException("No live nodes contain block " + block.getBlock() +
|
||||
" after checking nodes = " + Arrays.toString(nodes) +
|
||||
", ignoredNodes = " + ignoredNodes);
|
||||
}
|
||||
final String dnAddr =
|
||||
chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
|
||||
}
|
||||
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
|
||||
return new DNAddrPair(chosenNode, targetAddr);
|
||||
return new DNAddrPair(chosenNode, targetAddr, storageType);
|
||||
}
|
||||
|
||||
private static String getBestNodeDNAddrPairErrorString(
|
||||
|
@ -1039,6 +1064,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|||
}
|
||||
DatanodeInfo chosenNode = datanode.info;
|
||||
InetSocketAddress targetAddr = datanode.addr;
|
||||
StorageType storageType = datanode.storageType;
|
||||
BlockReader reader = null;
|
||||
|
||||
try {
|
||||
|
@ -1049,6 +1075,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|||
setInetSocketAddress(targetAddr).
|
||||
setRemotePeerFactory(dfsClient).
|
||||
setDatanodeInfo(chosenNode).
|
||||
setStorageType(storageType).
|
||||
setFileName(src).
|
||||
setBlock(block.getBlock()).
|
||||
setBlockToken(blockToken).
|
||||
|
@ -1174,7 +1201,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|||
// If no nodes to do hedged reads against, pass.
|
||||
try {
|
||||
try {
|
||||
chosenNode = getBestNodeDNAddrPair(block.getLocations(), ignored);
|
||||
chosenNode = getBestNodeDNAddrPair(block, ignored);
|
||||
} catch (IOException ioe) {
|
||||
chosenNode = chooseDataNode(block, ignored);
|
||||
}
|
||||
|
@ -1529,31 +1556,17 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
|||
throw new IOException("Mark/reset not supported");
|
||||
}
|
||||
|
||||
/**
|
||||
* Pick the best node from which to stream the data.
|
||||
* Entries in <i>nodes</i> are already in the priority order
|
||||
*/
|
||||
static DatanodeInfo bestNode(DatanodeInfo nodes[],
|
||||
AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes,
|
||||
Collection<DatanodeInfo> ignoredNodes) throws IOException {
|
||||
if (nodes != null) {
|
||||
for (int i = 0; i < nodes.length; i++) {
|
||||
if (!deadNodes.containsKey(nodes[i])
|
||||
&& (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
|
||||
return nodes[i];
|
||||
}
|
||||
}
|
||||
}
|
||||
throw new IOException("No live nodes contain current block");
|
||||
}
|
||||
|
||||
/** Utility class to encapsulate data node info and its address. */
|
||||
static class DNAddrPair {
|
||||
private static final class DNAddrPair {
|
||||
final DatanodeInfo info;
|
||||
final InetSocketAddress addr;
|
||||
DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
|
||||
final StorageType storageType;
|
||||
|
||||
DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
|
||||
StorageType storageType) {
|
||||
this.info = info;
|
||||
this.addr = addr;
|
||||
this.storageType = storageType;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -42,6 +42,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.CanSetDropBehind;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
|
@ -89,9 +91,9 @@ import org.apache.hadoop.security.AccessControlException;
|
|||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.DataChecksum.Type;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import org.htrace.Span;
|
||||
import org.htrace.Trace;
|
||||
import org.htrace.TraceScope;
|
||||
|
@ -148,7 +150,10 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
private String src;
|
||||
private final long fileId;
|
||||
private final long blockSize;
|
||||
private final DataChecksum checksum;
|
||||
/** Only for DataTransferProtocol.writeBlock(..) */
|
||||
private final DataChecksum checksum4WriteBlock;
|
||||
private final int bytesPerChecksum;
|
||||
|
||||
// both dataQueue and ackQueue are protected by dataQueue lock
|
||||
private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
|
||||
private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
|
||||
|
@ -245,6 +250,9 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
|
||||
void writeChecksum(byte[] inarray, int off, int len) {
|
||||
if (len == 0) {
|
||||
return;
|
||||
}
|
||||
if (checksumPos + len > dataStart) {
|
||||
throw new BufferOverflowException();
|
||||
}
|
||||
|
@ -377,19 +385,12 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
|
||||
private final Span traceSpan;
|
||||
|
||||
/**
|
||||
* Default construction for file create
|
||||
*/
|
||||
private DataStreamer() {
|
||||
this(null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* construction with tracing info
|
||||
*/
|
||||
private DataStreamer(HdfsFileStatus stat, Span span) {
|
||||
isAppend = false;
|
||||
isLazyPersistFile = initLazyPersist(stat);
|
||||
isLazyPersistFile = isLazyPersist(stat);
|
||||
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||
traceSpan = span;
|
||||
}
|
||||
|
@ -409,7 +410,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
block = lastBlock.getBlock();
|
||||
bytesSent = block.getNumBytes();
|
||||
accessToken = lastBlock.getBlockToken();
|
||||
isLazyPersistFile = initLazyPersist(stat);
|
||||
isLazyPersistFile = isLazyPersist(stat);
|
||||
long usedInLastBlock = stat.getLen() % blockSize;
|
||||
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
|
||||
|
||||
|
@ -453,13 +454,6 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
}
|
||||
|
||||
private boolean initLazyPersist(HdfsFileStatus stat) {
|
||||
final BlockStoragePolicy lpPolicy = blockStoragePolicySuite
|
||||
.getPolicy(HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
|
||||
return lpPolicy != null &&
|
||||
stat.getStoragePolicy() == lpPolicy.getId();
|
||||
}
|
||||
|
||||
private void setPipeline(LocatedBlock lb) {
|
||||
setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
|
||||
}
|
||||
|
@ -553,7 +547,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
// get packet to be sent.
|
||||
if (dataQueue.isEmpty()) {
|
||||
one = new Packet(checksum.getChecksumSize()); // heartbeat packet
|
||||
one = new Packet(getChecksumSize()); // heartbeat packet
|
||||
} else {
|
||||
one = dataQueue.getFirst(); // regular data packet
|
||||
}
|
||||
|
@ -1408,8 +1402,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// send the request
|
||||
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
|
||||
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
|
||||
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
|
||||
cachingStrategy.get(), isLazyPersistFile);
|
||||
nodes.length, block.getNumBytes(), bytesSent, newGS,
|
||||
checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile);
|
||||
|
||||
// receive ack for connect
|
||||
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
|
||||
|
@ -1618,9 +1612,23 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the object for computing checksum.
|
||||
* The type is NULL if checksum is not computed.
|
||||
*/
|
||||
private static DataChecksum getChecksum4Compute(DataChecksum checksum,
|
||||
HdfsFileStatus stat) {
|
||||
if (isLazyPersist(stat) && stat.getReplication() == 1) {
|
||||
// do not compute checksum for writing to single replica to memory
|
||||
return DataChecksum.newDataChecksum(Type.NULL,
|
||||
checksum.getBytesPerChecksum());
|
||||
}
|
||||
return checksum;
|
||||
}
|
||||
|
||||
private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
|
||||
HdfsFileStatus stat, DataChecksum checksum) throws IOException {
|
||||
super(checksum);
|
||||
super(getChecksum4Compute(checksum, stat));
|
||||
this.dfsClient = dfsClient;
|
||||
this.src = src;
|
||||
this.fileId = stat.getFileId();
|
||||
|
@ -1635,15 +1643,18 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
"Set non-null progress callback on DFSOutputStream " + src);
|
||||
}
|
||||
|
||||
final int bytesPerChecksum = checksum.getBytesPerChecksum();
|
||||
if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
|
||||
throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
|
||||
") and blockSize(" + blockSize +
|
||||
") do not match. " + "blockSize should be a " +
|
||||
"multiple of io.bytes.per.checksum");
|
||||
|
||||
this.bytesPerChecksum = checksum.getBytesPerChecksum();
|
||||
if (bytesPerChecksum <= 0) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
|
||||
}
|
||||
this.checksum = checksum;
|
||||
if (blockSize % bytesPerChecksum != 0) {
|
||||
throw new HadoopIllegalArgumentException("Invalid values: "
|
||||
+ DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
|
||||
+ ") must divide block size (=" + blockSize + ").");
|
||||
}
|
||||
this.checksum4WriteBlock = checksum;
|
||||
|
||||
this.dfsclientSlowLogThresholdMs =
|
||||
dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
|
||||
}
|
||||
|
@ -1655,8 +1666,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
this(dfsClient, src, progress, stat, checksum);
|
||||
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
|
||||
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize,
|
||||
checksum.getBytesPerChecksum());
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
|
||||
|
||||
Span traceSpan = null;
|
||||
if (Trace.isTracing()) {
|
||||
|
@ -1734,11 +1744,9 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
if (lastBlock != null) {
|
||||
// indicate that we are appending to an existing block
|
||||
bytesCurBlock = lastBlock.getBlockSize();
|
||||
streamer = new DataStreamer(lastBlock, stat,
|
||||
checksum.getBytesPerChecksum(), traceSpan);
|
||||
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan);
|
||||
} else {
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize,
|
||||
checksum.getBytesPerChecksum());
|
||||
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
|
||||
streamer = new DataStreamer(stat, traceSpan);
|
||||
}
|
||||
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
|
||||
|
@ -1753,8 +1761,14 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
return out;
|
||||
}
|
||||
|
||||
private static boolean isLazyPersist(HdfsFileStatus stat) {
|
||||
final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy(
|
||||
HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
|
||||
return p != null && stat.getStoragePolicy() == p.getId();
|
||||
}
|
||||
|
||||
private void computePacketChunkSize(int psize, int csize) {
|
||||
int chunkSize = csize + checksum.getChecksumSize();
|
||||
final int chunkSize = csize + getChecksumSize();
|
||||
chunksPerPacket = Math.max(psize/chunkSize, 1);
|
||||
packetSize = chunkSize*chunksPerPacket;
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
|
@ -1811,21 +1825,19 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
dfsClient.checkOpen();
|
||||
checkClosed();
|
||||
|
||||
int bytesPerChecksum = this.checksum.getBytesPerChecksum();
|
||||
if (len > bytesPerChecksum) {
|
||||
throw new IOException("writeChunk() buffer size is " + len +
|
||||
" is larger than supported bytesPerChecksum " +
|
||||
bytesPerChecksum);
|
||||
}
|
||||
if (cklen != this.checksum.getChecksumSize()) {
|
||||
if (cklen != 0 && cklen != getChecksumSize()) {
|
||||
throw new IOException("writeChunk() checksum size is supposed to be " +
|
||||
this.checksum.getChecksumSize() +
|
||||
" but found to be " + cklen);
|
||||
getChecksumSize() + " but found to be " + cklen);
|
||||
}
|
||||
|
||||
if (currentPacket == null) {
|
||||
currentPacket = new Packet(packetSize, chunksPerPacket,
|
||||
bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
|
||||
bytesCurBlock, currentSeqno++, getChecksumSize());
|
||||
if (DFSClient.LOG.isDebugEnabled()) {
|
||||
DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
|
||||
currentPacket.seqno +
|
||||
|
@ -1873,7 +1885,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
//
|
||||
if (bytesCurBlock == blockSize) {
|
||||
currentPacket = new Packet(0, 0, bytesCurBlock,
|
||||
currentSeqno++, this.checksum.getChecksumSize());
|
||||
currentSeqno++, getChecksumSize());
|
||||
currentPacket.lastPacketInBlock = true;
|
||||
currentPacket.syncBlock = shouldSyncBlock;
|
||||
waitAndQueueCurrentPacket();
|
||||
|
@ -1967,7 +1979,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// but sync was requested.
|
||||
// Send an empty packet
|
||||
currentPacket = new Packet(packetSize, chunksPerPacket,
|
||||
bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
|
||||
bytesCurBlock, currentSeqno++, getChecksumSize());
|
||||
}
|
||||
} else {
|
||||
if (isSync && bytesCurBlock > 0) {
|
||||
|
@ -1976,7 +1988,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
// and sync was requested.
|
||||
// So send an empty sync packet.
|
||||
currentPacket = new Packet(packetSize, chunksPerPacket,
|
||||
bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
|
||||
bytesCurBlock, currentSeqno++, getChecksumSize());
|
||||
} else {
|
||||
// just discard the current packet since it is already been sent.
|
||||
currentPacket = null;
|
||||
|
@ -2180,8 +2192,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
|
||||
if (bytesCurBlock != 0) {
|
||||
// send an empty packet to mark the end of the block
|
||||
currentPacket = new Packet(0, 0, bytesCurBlock,
|
||||
currentSeqno++, this.checksum.getChecksumSize());
|
||||
currentPacket = new Packet(0, 0, bytesCurBlock, currentSeqno++, getChecksumSize());
|
||||
currentPacket.lastPacketInBlock = true;
|
||||
currentPacket.syncBlock = shouldSyncBlock;
|
||||
}
|
||||
|
@ -2245,8 +2256,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
@VisibleForTesting
|
||||
public synchronized void setChunksPerPacket(int value) {
|
||||
chunksPerPacket = Math.min(chunksPerPacket, value);
|
||||
packetSize = (checksum.getBytesPerChecksum() +
|
||||
checksum.getChecksumSize()) * chunksPerPacket;
|
||||
packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
|
||||
}
|
||||
|
||||
synchronized void setTestFilename(String newname) {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.protocol;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -185,7 +186,11 @@ public class LocatedBlock {
|
|||
+ "; getBlockSize()=" + getBlockSize()
|
||||
+ "; corrupt=" + corrupt
|
||||
+ "; offset=" + offset
|
||||
+ "; locs=" + java.util.Arrays.asList(locs)
|
||||
+ "; locs=" + Arrays.asList(locs)
|
||||
+ "; storageIDs=" +
|
||||
(storageIDs != null ? Arrays.asList(storageIDs) : null)
|
||||
+ "; storageTypes=" +
|
||||
(storageTypes != null ? Arrays.asList(storageTypes) : null)
|
||||
+ "}";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,10 +29,13 @@ import java.io.RandomAccessFile;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
|
@ -46,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class BlockMetadataHeader {
|
||||
private static final Log LOG = LogFactory.getLog(BlockMetadataHeader.class);
|
||||
|
||||
public static final short VERSION = 1;
|
||||
|
||||
|
@ -73,6 +77,37 @@ public class BlockMetadataHeader {
|
|||
return checksum;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the checksum header from the meta file.
|
||||
* @return the data checksum obtained from the header.
|
||||
*/
|
||||
public static DataChecksum readDataChecksum(File metaFile) throws IOException {
|
||||
DataInputStream in = null;
|
||||
try {
|
||||
in = new DataInputStream(new BufferedInputStream(
|
||||
new FileInputStream(metaFile), HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||
return readDataChecksum(in, metaFile);
|
||||
} finally {
|
||||
IOUtils.closeStream(in);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the checksum header from the meta input stream.
|
||||
* @return the data checksum obtained from the header.
|
||||
*/
|
||||
public static DataChecksum readDataChecksum(final DataInputStream metaIn,
|
||||
final Object name) throws IOException {
|
||||
// read and handle the common header here. For now just a version
|
||||
final BlockMetadataHeader header = readHeader(metaIn);
|
||||
if (header.getVersion() != VERSION) {
|
||||
LOG.warn("Unexpected meta-file version for " + name
|
||||
+ ": version in file is " + header.getVersion()
|
||||
+ " but expected version is " + VERSION);
|
||||
}
|
||||
return header.getChecksum();
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the header without changing the position of the FileChannel.
|
||||
*
|
||||
|
@ -82,7 +117,7 @@ public class BlockMetadataHeader {
|
|||
*/
|
||||
public static BlockMetadataHeader preadHeader(FileChannel fc)
|
||||
throws IOException {
|
||||
byte arr[] = new byte[2 + DataChecksum.HEADER_LEN];
|
||||
final byte arr[] = new byte[getHeaderSize()];
|
||||
ByteBuffer buf = ByteBuffer.wrap(arr);
|
||||
|
||||
while (buf.hasRemaining()) {
|
||||
|
@ -158,7 +193,7 @@ public class BlockMetadataHeader {
|
|||
* Writes all the fields till the beginning of checksum.
|
||||
* @throws IOException on error
|
||||
*/
|
||||
static void writeHeader(DataOutputStream out, DataChecksum checksum)
|
||||
public static void writeHeader(DataOutputStream out, DataChecksum checksum)
|
||||
throws IOException {
|
||||
writeHeader(out, new BlockMetadataHeader(VERSION, checksum));
|
||||
}
|
||||
|
|
|
@ -82,12 +82,12 @@ class BlockReceiver implements Closeable {
|
|||
* checksum polynomial than the block is stored with on disk,
|
||||
* the DataNode needs to recalculate checksums before writing.
|
||||
*/
|
||||
private boolean needsChecksumTranslation;
|
||||
private final boolean needsChecksumTranslation;
|
||||
private OutputStream out = null; // to block file at local disk
|
||||
private FileDescriptor outFd;
|
||||
private DataOutputStream checksumOut = null; // to crc file at local disk
|
||||
private int bytesPerChecksum;
|
||||
private int checksumSize;
|
||||
private final int bytesPerChecksum;
|
||||
private final int checksumSize;
|
||||
|
||||
private final PacketReceiver packetReceiver = new PacketReceiver(false);
|
||||
|
||||
|
@ -99,7 +99,6 @@ class BlockReceiver implements Closeable {
|
|||
private DataTransferThrottler throttler;
|
||||
private ReplicaOutputStreams streams;
|
||||
private DatanodeInfo srcDataNode = null;
|
||||
private Checksum partialCrc = null;
|
||||
private final DataNode datanode;
|
||||
volatile private boolean mirrorError;
|
||||
|
||||
|
@ -490,7 +489,7 @@ class BlockReceiver implements Closeable {
|
|||
long offsetInBlock = header.getOffsetInBlock();
|
||||
long seqno = header.getSeqno();
|
||||
boolean lastPacketInBlock = header.isLastPacketInBlock();
|
||||
int len = header.getDataLen();
|
||||
final int len = header.getDataLen();
|
||||
boolean syncBlock = header.getSyncBlock();
|
||||
|
||||
// avoid double sync'ing on close
|
||||
|
@ -499,7 +498,7 @@ class BlockReceiver implements Closeable {
|
|||
}
|
||||
|
||||
// update received bytes
|
||||
long firstByteInBlock = offsetInBlock;
|
||||
final long firstByteInBlock = offsetInBlock;
|
||||
offsetInBlock += len;
|
||||
if (replicaInfo.getNumBytes() < offsetInBlock) {
|
||||
replicaInfo.setNumBytes(offsetInBlock);
|
||||
|
@ -539,16 +538,15 @@ class BlockReceiver implements Closeable {
|
|||
flushOrSync(true);
|
||||
}
|
||||
} else {
|
||||
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
|
||||
checksumSize;
|
||||
final int checksumLen = diskChecksum.getChecksumSize(len);
|
||||
final int checksumReceivedLen = checksumBuf.capacity();
|
||||
|
||||
if ( checksumBuf.capacity() != checksumLen) {
|
||||
throw new IOException("Length of checksums in packet " +
|
||||
checksumBuf.capacity() + " does not match calculated checksum " +
|
||||
"length " + checksumLen);
|
||||
if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
|
||||
throw new IOException("Invalid checksum length: received length is "
|
||||
+ checksumReceivedLen + " but expected length is " + checksumLen);
|
||||
}
|
||||
|
||||
if (shouldVerifyChecksum()) {
|
||||
if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
|
||||
try {
|
||||
verifyChunks(dataBuf, checksumBuf);
|
||||
} catch (IOException ioe) {
|
||||
|
@ -573,10 +571,16 @@ class BlockReceiver implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
|
||||
// checksum is missing, need to calculate it
|
||||
checksumBuf = ByteBuffer.allocate(checksumLen);
|
||||
diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
|
||||
}
|
||||
|
||||
// by this point, the data in the buffer uses the disk checksum
|
||||
|
||||
byte[] lastChunkChecksum;
|
||||
|
||||
final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
|
||||
&& streams.isTransientStorage();
|
||||
try {
|
||||
long onDiskLen = replicaInfo.getBytesOnDisk();
|
||||
if (onDiskLen<offsetInBlock) {
|
||||
|
@ -588,14 +592,16 @@ class BlockReceiver implements Closeable {
|
|||
}
|
||||
|
||||
// If this is a partial chunk, then read in pre-existing checksum
|
||||
if (firstByteInBlock % bytesPerChecksum != 0) {
|
||||
LOG.info("Packet starts at " + firstByteInBlock +
|
||||
" for " + block +
|
||||
" which is not a multiple of bytesPerChecksum " +
|
||||
bytesPerChecksum);
|
||||
Checksum partialCrc = null;
|
||||
if (!shouldNotWriteChecksum && firstByteInBlock % bytesPerChecksum != 0) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("receivePacket for " + block
|
||||
+ ": bytesPerChecksum=" + bytesPerChecksum
|
||||
+ " does not divide firstByteInBlock=" + firstByteInBlock);
|
||||
}
|
||||
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
|
||||
onDiskLen / bytesPerChecksum * checksumSize;
|
||||
computePartialChunkCrc(onDiskLen, offsetInChecksum, bytesPerChecksum);
|
||||
partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
|
||||
}
|
||||
|
||||
int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
|
||||
|
@ -612,41 +618,40 @@ class BlockReceiver implements Closeable {
|
|||
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
|
||||
}
|
||||
|
||||
final byte[] lastCrc;
|
||||
if (shouldNotWriteChecksum) {
|
||||
lastCrc = null;
|
||||
} else if (partialCrc != null) {
|
||||
// If this is a partial chunk, then verify that this is the only
|
||||
// chunk in the packet. Calculate new crc for this chunk.
|
||||
if (partialCrc != null) {
|
||||
if (len > bytesPerChecksum) {
|
||||
throw new IOException("Got wrong length during writeBlock(" +
|
||||
block + ") from " + inAddr + " " +
|
||||
"A packet can have only one partial chunk."+
|
||||
" len = " + len +
|
||||
" bytesPerChecksum " + bytesPerChecksum);
|
||||
throw new IOException("Unexpected packet data length for "
|
||||
+ block + " from " + inAddr + ": a partial chunk must be "
|
||||
+ " sent in an individual packet (data length = " + len
|
||||
+ " > bytesPerChecksum = " + bytesPerChecksum + ")");
|
||||
}
|
||||
partialCrc.update(dataBuf.array(), startByteToDisk, numBytesToDisk);
|
||||
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
|
||||
lastChunkChecksum = Arrays.copyOfRange(
|
||||
buf, buf.length - checksumSize, buf.length
|
||||
);
|
||||
lastCrc = copyLastChunkChecksum(buf, checksumSize, buf.length);
|
||||
checksumOut.write(buf);
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Writing out partial crc for data len " + len);
|
||||
}
|
||||
partialCrc = null;
|
||||
} else {
|
||||
lastChunkChecksum = Arrays.copyOfRange(
|
||||
checksumBuf.array(),
|
||||
checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen - checksumSize,
|
||||
checksumBuf.arrayOffset() + checksumBuf.position() + checksumLen);
|
||||
checksumOut.write(checksumBuf.array(),
|
||||
checksumBuf.arrayOffset() + checksumBuf.position(),
|
||||
checksumLen);
|
||||
// write checksum
|
||||
final int offset = checksumBuf.arrayOffset() +
|
||||
checksumBuf.position();
|
||||
final int end = offset + checksumLen;
|
||||
lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize,
|
||||
end);
|
||||
checksumOut.write(checksumBuf.array(), offset, checksumLen);
|
||||
}
|
||||
|
||||
/// flush entire packet, sync if requested
|
||||
flushOrSync(syncBlock);
|
||||
|
||||
replicaInfo.setLastChecksumAndDataLen(
|
||||
offsetInBlock, lastChunkChecksum
|
||||
);
|
||||
replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
|
||||
|
||||
datanode.metrics.incrBytesWritten(len);
|
||||
|
||||
|
@ -686,6 +691,10 @@ class BlockReceiver implements Closeable {
|
|||
return lastPacketInBlock?-1:len;
|
||||
}
|
||||
|
||||
private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
|
||||
return Arrays.copyOfRange(array, end - size, end);
|
||||
}
|
||||
|
||||
private void manageWriterOsCache(long offsetInBlock) {
|
||||
try {
|
||||
if (outFd != null &&
|
||||
|
@ -921,18 +930,19 @@ class BlockReceiver implements Closeable {
|
|||
* reads in the partial crc chunk and computes checksum
|
||||
* of pre-existing data in partial chunk.
|
||||
*/
|
||||
private void computePartialChunkCrc(long blkoff, long ckoff,
|
||||
int bytesPerChecksum) throws IOException {
|
||||
private Checksum computePartialChunkCrc(long blkoff, long ckoff)
|
||||
throws IOException {
|
||||
|
||||
// find offset of the beginning of partial chunk.
|
||||
//
|
||||
int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
|
||||
int checksumSize = diskChecksum.getChecksumSize();
|
||||
blkoff = blkoff - sizePartialChunk;
|
||||
LOG.info("computePartialChunkCrc sizePartialChunk " +
|
||||
sizePartialChunk + " " + block +
|
||||
" block offset " + blkoff +
|
||||
" metafile offset " + ckoff);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("computePartialChunkCrc for " + block
|
||||
+ ": sizePartialChunk=" + sizePartialChunk
|
||||
+ ", block offset=" + blkoff
|
||||
+ ", metafile offset=" + ckoff);
|
||||
}
|
||||
|
||||
// create an input stream from the block file
|
||||
// and read in partial crc chunk into temporary buffer
|
||||
|
@ -951,10 +961,12 @@ class BlockReceiver implements Closeable {
|
|||
}
|
||||
|
||||
// compute crc of partial chunk from data read in the block file.
|
||||
partialCrc = DataChecksum.newDataChecksum(
|
||||
final Checksum partialCrc = DataChecksum.newDataChecksum(
|
||||
diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
|
||||
partialCrc.update(buf, 0, sizePartialChunk);
|
||||
LOG.info("Read in partial CRC chunk from disk for " + block);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Read in partial CRC chunk from disk for " + block);
|
||||
}
|
||||
|
||||
// paranoia! verify that the pre-computed crc matches what we
|
||||
// recalculated just now
|
||||
|
@ -965,6 +977,7 @@ class BlockReceiver implements Closeable {
|
|||
checksum2long(crcbuf);
|
||||
throw new IOException(msg);
|
||||
}
|
||||
return partialCrc;
|
||||
}
|
||||
|
||||
private static enum PacketResponderType {
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.fs.ChecksumException;
|
|||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
|
@ -265,27 +266,38 @@ class BlockSender implements java.io.Closeable {
|
|||
*/
|
||||
DataChecksum csum = null;
|
||||
if (verifyChecksum || sendChecksum) {
|
||||
final InputStream metaIn = datanode.data.getMetaDataInputStream(block);
|
||||
LengthInputStream metaIn = null;
|
||||
boolean keepMetaInOpen = false;
|
||||
try {
|
||||
metaIn = datanode.data.getMetaDataInputStream(block);
|
||||
if (!corruptChecksumOk || metaIn != null) {
|
||||
if (metaIn == null) {
|
||||
//need checksum but meta-data not found
|
||||
throw new FileNotFoundException("Meta-data not found for " + block);
|
||||
throw new FileNotFoundException("Meta-data not found for " +
|
||||
block);
|
||||
}
|
||||
|
||||
checksumIn = new DataInputStream(
|
||||
new BufferedInputStream(metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||
// The meta file will contain only the header if the NULL checksum
|
||||
// type was used, or if the replica was written to transient storage.
|
||||
// Checksum verification is not performed for replicas on transient
|
||||
// storage. The header is important for determining the checksum
|
||||
// type later when lazy persistence copies the block to non-transient
|
||||
// storage and computes the checksum.
|
||||
if (metaIn.getLength() > BlockMetadataHeader.getHeaderSize()) {
|
||||
checksumIn = new DataInputStream(new BufferedInputStream(
|
||||
metaIn, HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||
|
||||
// read and handle the common header here. For now just a version
|
||||
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
|
||||
short version = header.getVersion();
|
||||
if (version != BlockMetadataHeader.VERSION) {
|
||||
LOG.warn("Wrong version (" + version + ") for metadata file for "
|
||||
+ block + " ignoring ...");
|
||||
csum = BlockMetadataHeader.readDataChecksum(checksumIn, block);
|
||||
keepMetaInOpen = true;
|
||||
}
|
||||
csum = header.getChecksum();
|
||||
} else {
|
||||
LOG.warn("Could not find metadata file for " + block);
|
||||
}
|
||||
} finally {
|
||||
if (!keepMetaInOpen) {
|
||||
IOUtils.closeStream(metaIn);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (csum == null) {
|
||||
// The number of bytes per checksum here determines the alignment
|
||||
|
@ -343,7 +355,7 @@ class BlockSender implements java.io.Closeable {
|
|||
endOffset = end;
|
||||
|
||||
// seek to the right offsets
|
||||
if (offset > 0) {
|
||||
if (offset > 0 && checksumIn != null) {
|
||||
long checksumSkip = (offset / chunkSize) * checksumSize;
|
||||
// note blockInStream is seeked when created below
|
||||
if (checksumSkip > 0) {
|
||||
|
|
|
@ -213,7 +213,7 @@ public class ReplicaInPipeline extends ReplicaInfo
|
|||
|
||||
// the checksum that should actually be used -- this
|
||||
// may differ from requestedChecksum for appends.
|
||||
DataChecksum checksum;
|
||||
final DataChecksum checksum;
|
||||
|
||||
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
|
||||
|
||||
|
@ -264,7 +264,8 @@ public class ReplicaInPipeline extends ReplicaInfo
|
|||
blockOut.getChannel().position(blockDiskSize);
|
||||
crcOut.getChannel().position(crcDiskSize);
|
||||
}
|
||||
return new ReplicaOutputStreams(blockOut, crcOut, checksum);
|
||||
return new ReplicaOutputStreams(blockOut, crcOut, checksum,
|
||||
getVolume().isTransientStorage());
|
||||
} catch (IOException e) {
|
||||
IOUtils.closeStream(blockOut);
|
||||
IOUtils.closeStream(metaRAF);
|
||||
|
|
|
@ -32,16 +32,18 @@ public class ReplicaOutputStreams implements Closeable {
|
|||
private final OutputStream dataOut;
|
||||
private final OutputStream checksumOut;
|
||||
private final DataChecksum checksum;
|
||||
private final boolean isTransientStorage;
|
||||
|
||||
/**
|
||||
* Create an object with a data output stream, a checksum output stream
|
||||
* and a checksum.
|
||||
*/
|
||||
public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut,
|
||||
DataChecksum checksum) {
|
||||
DataChecksum checksum, boolean isTransientStorage) {
|
||||
this.dataOut = dataOut;
|
||||
this.checksumOut = checksumOut;
|
||||
this.checksum = checksum;
|
||||
this.isTransientStorage = isTransientStorage;
|
||||
}
|
||||
|
||||
/** @return the data output stream. */
|
||||
|
@ -59,6 +61,11 @@ public class ReplicaOutputStreams implements Closeable {
|
|||
return checksum;
|
||||
}
|
||||
|
||||
/** @return is writing to a transient storage? */
|
||||
public boolean isTransientStorage() {
|
||||
return isTransientStorage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
IOUtils.closeStream(dataOut);
|
||||
|
|
|
@ -599,13 +599,8 @@ class BlockPoolSlice {
|
|||
HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||
|
||||
// read and handle the common header here. For now just a version
|
||||
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
|
||||
short version = header.getVersion();
|
||||
if (version != BlockMetadataHeader.VERSION) {
|
||||
FsDatasetImpl.LOG.warn("Wrong version (" + version + ") for metadata file "
|
||||
+ metaFile + " ignoring ...");
|
||||
}
|
||||
DataChecksum checksum = header.getChecksum();
|
||||
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
|
||||
checksumIn, metaFile);
|
||||
int bytesPerChecksum = checksum.getBytesPerChecksum();
|
||||
int checksumSize = checksum.getChecksumSize();
|
||||
long numChunks = Math.min(
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileDescriptor;
|
||||
import java.io.FileInputStream;
|
||||
|
@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
|
@ -92,6 +95,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
|
|||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.MultipleIOException;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
|
@ -634,7 +638,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
* Get the meta info of a block stored in volumeMap. To find a block,
|
||||
* block pool Id, block Id and generation stamp must match.
|
||||
* @param b extended block
|
||||
* @return the meta replica information; null if block was not found
|
||||
* @return the meta replica information
|
||||
* @throws ReplicaNotFoundException if no entry is in the map or
|
||||
* there is a generation stamp mismatch
|
||||
*/
|
||||
|
@ -722,23 +726,80 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
final File destDir = DatanodeUtil.idToBlockDir(destRoot, blockId);
|
||||
final File dstFile = new File(destDir, srcFile.getName());
|
||||
final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, genStamp);
|
||||
try {
|
||||
Storage.nativeCopyFileUnbuffered(srcMeta, dstMeta, true);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
|
||||
}
|
||||
computeChecksum(srcMeta, dstMeta, srcFile);
|
||||
|
||||
try {
|
||||
Storage.nativeCopyFileUnbuffered(srcFile, dstFile, true);
|
||||
} catch (IOException e) {
|
||||
throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Copied " + srcMeta + " to " + dstMeta);
|
||||
LOG.debug("Copied " + srcMeta + " to " + dstMeta +
|
||||
" and calculated checksum");
|
||||
LOG.debug("Copied " + srcFile + " to " + dstFile);
|
||||
}
|
||||
return new File[] {dstMeta, dstFile};
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute and store the checksum for a block file that does not already have
|
||||
* its checksum computed.
|
||||
*
|
||||
* @param srcMeta source meta file, containing only the checksum header, not a
|
||||
* calculated checksum
|
||||
* @param dstMeta destination meta file, into which this method will write a
|
||||
* full computed checksum
|
||||
* @param blockFile block file for which the checksum will be computed
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void computeChecksum(File srcMeta, File dstMeta, File blockFile)
|
||||
throws IOException {
|
||||
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(srcMeta);
|
||||
final byte[] data = new byte[1 << 16];
|
||||
final byte[] crcs = new byte[checksum.getChecksumSize(data.length)];
|
||||
|
||||
DataOutputStream metaOut = null;
|
||||
InputStream dataIn = null;
|
||||
try {
|
||||
File parentFile = dstMeta.getParentFile();
|
||||
if (parentFile != null) {
|
||||
if (!parentFile.mkdirs() && !parentFile.isDirectory()) {
|
||||
throw new IOException("Destination '" + parentFile
|
||||
+ "' directory cannot be created");
|
||||
}
|
||||
}
|
||||
metaOut = new DataOutputStream(new BufferedOutputStream(
|
||||
new FileOutputStream(dstMeta), HdfsConstants.SMALL_BUFFER_SIZE));
|
||||
BlockMetadataHeader.writeHeader(metaOut, checksum);
|
||||
|
||||
dataIn = isNativeIOAvailable ?
|
||||
NativeIO.getShareDeleteFileInputStream(blockFile) :
|
||||
new FileInputStream(blockFile);
|
||||
|
||||
int offset = 0;
|
||||
for(int n; (n = dataIn.read(data, offset, data.length - offset)) != -1; ) {
|
||||
if (n > 0) {
|
||||
n += offset;
|
||||
offset = n % checksum.getBytesPerChecksum();
|
||||
final int length = n - offset;
|
||||
|
||||
if (length > 0) {
|
||||
checksum.calculateChunkedSums(data, 0, length, crcs, 0);
|
||||
metaOut.write(crcs, 0, checksum.getChecksumSize(length));
|
||||
|
||||
System.arraycopy(data, length, data, 0, offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// calculate and write the last crc
|
||||
checksum.calculateChunkedSums(data, 0, offset, crcs, 0);
|
||||
metaOut.write(crcs, 0, 4);
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, dataIn, metaOut);
|
||||
}
|
||||
}
|
||||
|
||||
static private void truncateBlock(File blockFile, File metaFile,
|
||||
long oldlen, long newlen) throws IOException {
|
||||
LOG.info("truncateBlock: blockFile=" + blockFile
|
||||
|
@ -1641,6 +1702,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCached(String bpid, long blockId) {
|
||||
return cacheManager.isCached(bpid, blockId);
|
||||
}
|
||||
|
@ -2556,8 +2618,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
// Before deleting the files from transient storage we must notify the
|
||||
// NN that the files are on the new storage. Else a blockReport from
|
||||
// the transient storage might cause the NN to think the blocks are lost.
|
||||
// Replicas must be evicted from client short-circuit caches, because the
|
||||
// storage will no longer be transient, and thus will require validating
|
||||
// checksum. This also stops a client from holding file descriptors,
|
||||
// which would prevent the OS from reclaiming the memory.
|
||||
ExtendedBlock extendedBlock =
|
||||
new ExtendedBlock(bpid, newReplicaInfo);
|
||||
datanode.getShortCircuitRegistry().processBlockInvalidation(
|
||||
ExtendedBlockId.fromExtendedBlock(extendedBlock));
|
||||
datanode.notifyNamenodeReceivedBlock(
|
||||
extendedBlock, null, newReplicaInfo.getStorageUuid());
|
||||
|
||||
|
|
|
@ -241,7 +241,7 @@ class RamDiskAsyncLazyPersistService {
|
|||
} catch (Exception e){
|
||||
FsDatasetImpl.LOG.warn(
|
||||
"LazyWriter failed to async persist RamDisk block pool id: "
|
||||
+ bpId + "block Id: " + blockId);
|
||||
+ bpId + "block Id: " + blockId, e);
|
||||
} finally {
|
||||
if (!succeeded) {
|
||||
datanode.getFSDataset().onFailLazyPersist(bpId, blockId);
|
||||
|
|
|
@ -168,9 +168,9 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
|
|||
|
||||
@Override
|
||||
synchronized RamDiskReplicaLru getNextCandidateForEviction() {
|
||||
Iterator it = replicasPersisted.values().iterator();
|
||||
final Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
|
||||
while (it.hasNext()) {
|
||||
RamDiskReplicaLru ramDiskReplicaLru = (RamDiskReplicaLru) it.next();
|
||||
final RamDiskReplicaLru ramDiskReplicaLru = it.next();
|
||||
it.remove();
|
||||
|
||||
Map<Long, RamDiskReplicaLru> replicaMap =
|
||||
|
|
|
@ -248,7 +248,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
+ theBlock);
|
||||
} else {
|
||||
SimulatedOutputStream crcStream = new SimulatedOutputStream();
|
||||
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum);
|
||||
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
|
||||
volume.isTransientStorage());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,389 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.tools.JMXGet;
|
||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
import org.junit.Rule;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public abstract class LazyPersistTestCase {
|
||||
|
||||
static {
|
||||
((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
|
||||
((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
|
||||
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
|
||||
}
|
||||
|
||||
protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
|
||||
protected static final int BUFFER_LENGTH = 4096;
|
||||
protected static final int EVICTION_LOW_WATERMARK = 1;
|
||||
private static final long HEARTBEAT_INTERVAL_SEC = 1;
|
||||
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
|
||||
private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
|
||||
private static final String JMX_SERVICE_NAME = "DataNode";
|
||||
protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
|
||||
protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
|
||||
protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
|
||||
protected static final short REPL_FACTOR = 1;
|
||||
|
||||
protected MiniDFSCluster cluster;
|
||||
protected DistributedFileSystem fs;
|
||||
protected DFSClient client;
|
||||
protected JMXGet jmx;
|
||||
protected TemporarySocketDirectory sockDir;
|
||||
|
||||
@After
|
||||
public void shutDownCluster() throws Exception {
|
||||
|
||||
// Dump all RamDisk JMX metrics before shutdown the cluster
|
||||
printRamDiskJMXMetrics();
|
||||
|
||||
if (fs != null) {
|
||||
fs.close();
|
||||
fs = null;
|
||||
client = null;
|
||||
}
|
||||
|
||||
if (cluster != null) {
|
||||
cluster.shutdownDataNodes();
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
|
||||
if (jmx != null) {
|
||||
jmx = null;
|
||||
}
|
||||
|
||||
IOUtils.closeQuietly(sockDir);
|
||||
sockDir = null;
|
||||
}
|
||||
|
||||
@Rule
|
||||
public Timeout timeout = new Timeout(300000);
|
||||
|
||||
protected final LocatedBlocks ensureFileReplicasOnStorageType(
|
||||
Path path, StorageType storageType) throws IOException {
|
||||
// Ensure that returned block locations returned are correct!
|
||||
LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
|
||||
assertThat(fs.exists(path), is(true));
|
||||
long fileLength = client.getFileInfo(path.toString()).getLen();
|
||||
LocatedBlocks locatedBlocks =
|
||||
client.getLocatedBlocks(path.toString(), 0, fileLength);
|
||||
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
|
||||
assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
|
||||
}
|
||||
return locatedBlocks;
|
||||
}
|
||||
|
||||
protected final void makeRandomTestFile(Path path, long length,
|
||||
boolean isLazyPersist, long seed) throws IOException {
|
||||
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
|
||||
BLOCK_SIZE, REPL_FACTOR, seed, true);
|
||||
}
|
||||
|
||||
protected final void makeTestFile(Path path, long length,
|
||||
boolean isLazyPersist) throws IOException {
|
||||
|
||||
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
|
||||
|
||||
if (isLazyPersist) {
|
||||
createFlags.add(LAZY_PERSIST);
|
||||
}
|
||||
|
||||
FSDataOutputStream fos = null;
|
||||
try {
|
||||
fos =
|
||||
fs.create(path,
|
||||
FsPermission.getFileDefault(),
|
||||
createFlags,
|
||||
BUFFER_LENGTH,
|
||||
REPL_FACTOR,
|
||||
BLOCK_SIZE,
|
||||
null);
|
||||
|
||||
// Allocate a block.
|
||||
byte[] buffer = new byte[BUFFER_LENGTH];
|
||||
for (int bytesWritten = 0; bytesWritten < length; ) {
|
||||
fos.write(buffer, 0, buffer.length);
|
||||
bytesWritten += buffer.length;
|
||||
}
|
||||
if (length > 0) {
|
||||
fos.hsync();
|
||||
}
|
||||
} finally {
|
||||
IOUtils.closeQuietly(fos);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
|
||||
* capped. If ramDiskStorageLimit < 0 then it is ignored.
|
||||
*/
|
||||
protected final void startUpCluster(boolean hasTransientStorage,
|
||||
final int ramDiskReplicaCapacity,
|
||||
final boolean useSCR,
|
||||
final boolean useLegacyBlockReaderLocal)
|
||||
throws IOException {
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
|
||||
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
|
||||
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
|
||||
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
||||
HEARTBEAT_RECHECK_INTERVAL_MSEC);
|
||||
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
||||
LAZY_WRITER_INTERVAL_SEC);
|
||||
conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
|
||||
EVICTION_LOW_WATERMARK * BLOCK_SIZE);
|
||||
|
||||
if (useSCR) {
|
||||
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
||||
// Do not share a client context across tests.
|
||||
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
|
||||
if (useLegacyBlockReaderLocal) {
|
||||
conf.setBoolean(DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL, true);
|
||||
conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
||||
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||
} else {
|
||||
sockDir = new TemporarySocketDirectory();
|
||||
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
|
||||
this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
||||
long[] capacities = null;
|
||||
if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
|
||||
// Convert replica count to byte count, add some delta for .meta and
|
||||
// VERSION files.
|
||||
long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) +
|
||||
(BLOCK_SIZE - 1);
|
||||
capacities = new long[] { ramDiskStorageLimit, -1 };
|
||||
}
|
||||
|
||||
cluster = new MiniDFSCluster
|
||||
.Builder(conf)
|
||||
.numDataNodes(REPL_FACTOR)
|
||||
.storageCapacities(capacities)
|
||||
.storageTypes(hasTransientStorage ?
|
||||
new StorageType[]{ RAM_DISK, DEFAULT } : null)
|
||||
.build();
|
||||
fs = cluster.getFileSystem();
|
||||
client = fs.getClient();
|
||||
try {
|
||||
jmx = initJMX();
|
||||
} catch (Exception e) {
|
||||
fail("Failed initialize JMX for testing: " + e);
|
||||
}
|
||||
LOG.info("Cluster startup complete");
|
||||
}
|
||||
|
||||
/**
|
||||
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
|
||||
* capped. If ramDiskStorageLimit < 0 then it is ignored.
|
||||
*/
|
||||
protected final void startUpCluster(final int numDataNodes,
|
||||
final StorageType[] storageTypes,
|
||||
final long ramDiskStorageLimit,
|
||||
final boolean useSCR)
|
||||
throws IOException {
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
|
||||
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
|
||||
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
|
||||
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
||||
HEARTBEAT_RECHECK_INTERVAL_MSEC);
|
||||
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
||||
LAZY_WRITER_INTERVAL_SEC);
|
||||
|
||||
if (useSCR)
|
||||
{
|
||||
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
|
||||
conf.set(DFS_CLIENT_CONTEXT, UUID.randomUUID().toString());
|
||||
sockDir = new TemporarySocketDirectory();
|
||||
conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
|
||||
this.getClass().getSimpleName() + "._PORT.sock").getAbsolutePath());
|
||||
conf.set(DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
||||
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||
}
|
||||
|
||||
cluster = new MiniDFSCluster
|
||||
.Builder(conf)
|
||||
.numDataNodes(numDataNodes)
|
||||
.storageTypes(storageTypes != null ?
|
||||
storageTypes : new StorageType[] { DEFAULT, DEFAULT })
|
||||
.build();
|
||||
fs = cluster.getFileSystem();
|
||||
client = fs.getClient();
|
||||
|
||||
// Artificially cap the storage capacity of the RAM_DISK volume.
|
||||
if (ramDiskStorageLimit >= 0) {
|
||||
List<? extends FsVolumeSpi> volumes =
|
||||
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
|
||||
|
||||
for (FsVolumeSpi volume : volumes) {
|
||||
if (volume.getStorageType() == RAM_DISK) {
|
||||
((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG.info("Cluster startup complete");
|
||||
}
|
||||
|
||||
protected final void startUpCluster(boolean hasTransientStorage,
|
||||
final int ramDiskReplicaCapacity)
|
||||
throws IOException {
|
||||
startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false, false);
|
||||
}
|
||||
|
||||
protected final void triggerBlockReport()
|
||||
throws IOException, InterruptedException {
|
||||
// Trigger block report to NN
|
||||
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
|
||||
Thread.sleep(10 * 1000);
|
||||
}
|
||||
|
||||
protected final boolean verifyBlockDeletedFromDir(File dir,
|
||||
LocatedBlocks locatedBlocks) {
|
||||
|
||||
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
|
||||
File targetDir =
|
||||
DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
|
||||
|
||||
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
|
||||
if (blockFile.exists()) {
|
||||
LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
|
||||
" exists after deletion.");
|
||||
return false;
|
||||
}
|
||||
File metaFile = new File(targetDir,
|
||||
DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
|
||||
lb.getBlock().getGenerationStamp()));
|
||||
if (metaFile.exists()) {
|
||||
LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
|
||||
" exists after deletion.");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
LOG.info("Verifying replica has no saved copy after deletion.");
|
||||
triggerBlockReport();
|
||||
|
||||
while(
|
||||
DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
|
||||
> 0L){
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
final String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
List<? extends FsVolumeSpi> volumes =
|
||||
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
|
||||
|
||||
// Make sure deleted replica does not have a copy on either finalized dir of
|
||||
// transient volume or finalized dir of non-transient volume
|
||||
for (FsVolumeSpi v : volumes) {
|
||||
FsVolumeImpl volume = (FsVolumeImpl) v;
|
||||
File targetDir = (v.isTransientStorage()) ?
|
||||
volume.getBlockPoolSlice(bpid).getFinalizedDir() :
|
||||
volume.getBlockPoolSlice(bpid).getLazypersistDir();
|
||||
if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
protected final void verifyRamDiskJMXMetric(String metricName,
|
||||
long expectedValue) throws Exception {
|
||||
assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
|
||||
}
|
||||
|
||||
protected final boolean verifyReadRandomFile(
|
||||
Path path, int fileLength, int seed) throws IOException {
|
||||
byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
|
||||
byte expected[] = DFSTestUtil.
|
||||
calculateFileContentsFromSeed(seed, fileLength);
|
||||
return Arrays.equals(contents, expected);
|
||||
}
|
||||
|
||||
private JMXGet initJMX() throws Exception {
|
||||
JMXGet jmx = new JMXGet();
|
||||
jmx.setService(JMX_SERVICE_NAME);
|
||||
jmx.init();
|
||||
return jmx;
|
||||
}
|
||||
|
||||
private void printRamDiskJMXMetrics() {
|
||||
try {
|
||||
if (jmx != null) {
|
||||
jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -17,103 +17,45 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.*;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.tools.JMXGet;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.*;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.IsNot.not;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestLazyPersistFiles {
|
||||
public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
|
||||
|
||||
static {
|
||||
((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
|
||||
((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
|
||||
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
|
||||
}
|
||||
|
||||
public class TestLazyPersistFiles extends LazyPersistTestCase {
|
||||
private static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
|
||||
|
||||
private static final int THREADPOOL_SIZE = 10;
|
||||
|
||||
private static final short REPL_FACTOR = 1;
|
||||
private static final int BLOCK_SIZE = 5 * 1024 * 1024;
|
||||
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
|
||||
private static final long HEARTBEAT_INTERVAL_SEC = 1;
|
||||
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
|
||||
private static final int LAZY_WRITER_INTERVAL_SEC = 1;
|
||||
private static final int BUFFER_LENGTH = 4096;
|
||||
private static final int EVICTION_LOW_WATERMARK = 1;
|
||||
private static final String JMX_SERVICE_NAME = "DataNode";
|
||||
private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
|
||||
|
||||
private MiniDFSCluster cluster;
|
||||
private DistributedFileSystem fs;
|
||||
private DFSClient client;
|
||||
private Configuration conf;
|
||||
private JMXGet jmx;
|
||||
|
||||
@After
|
||||
public void shutDownCluster() throws Exception {
|
||||
|
||||
// Dump all RamDisk JMX metrics before shutdown the cluster
|
||||
printRamDiskJMXMetrics();
|
||||
|
||||
if (fs != null) {
|
||||
fs.close();
|
||||
fs = null;
|
||||
client = null;
|
||||
}
|
||||
|
||||
if (cluster != null) {
|
||||
cluster.shutdownDataNodes();
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
|
||||
if (jmx != null) {
|
||||
jmx = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testPolicyNotSetByDefault() throws IOException {
|
||||
startUpCluster(false, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
|
@ -126,7 +68,7 @@ public class TestLazyPersistFiles {
|
|||
assertThat(status.getStoragePolicy(), not(LAZY_PERSIST_POLICY_ID));
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testPolicyPropagation() throws IOException {
|
||||
startUpCluster(false, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
|
@ -138,7 +80,7 @@ public class TestLazyPersistFiles {
|
|||
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testPolicyPersistenceInEditLog() throws IOException {
|
||||
startUpCluster(false, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
|
@ -152,7 +94,7 @@ public class TestLazyPersistFiles {
|
|||
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testPolicyPersistenceInFsImage() throws IOException {
|
||||
startUpCluster(false, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
|
@ -170,7 +112,7 @@ public class TestLazyPersistFiles {
|
|||
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testPlacementOnRamDisk() throws IOException {
|
||||
startUpCluster(true, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
|
@ -180,7 +122,7 @@ public class TestLazyPersistFiles {
|
|||
ensureFileReplicasOnStorageType(path, RAM_DISK);
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testPlacementOnSizeLimitedRamDisk() throws IOException {
|
||||
startUpCluster(true, 3);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
|
@ -199,7 +141,7 @@ public class TestLazyPersistFiles {
|
|||
* Write should default to disk. No error.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testFallbackToDisk() throws IOException {
|
||||
startUpCluster(false, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
|
@ -213,7 +155,7 @@ public class TestLazyPersistFiles {
|
|||
* File can not fit in RamDisk even with eviction
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testFallbackToDiskFull() throws Exception {
|
||||
startUpCluster(false, 0);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
|
@ -231,7 +173,7 @@ public class TestLazyPersistFiles {
|
|||
* Expect 2 or less blocks are on RamDisk and 3 or more on disk.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testFallbackToDiskPartial()
|
||||
throws IOException, InterruptedException {
|
||||
startUpCluster(true, 2);
|
||||
|
@ -271,7 +213,7 @@ public class TestLazyPersistFiles {
|
|||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testRamDiskNotChosenByDefault() throws IOException {
|
||||
startUpCluster(true, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
|
@ -289,7 +231,7 @@ public class TestLazyPersistFiles {
|
|||
* Append to lazy persist file is denied.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testAppendIsDenied() throws IOException {
|
||||
startUpCluster(true, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
|
@ -310,7 +252,7 @@ public class TestLazyPersistFiles {
|
|||
* must be discarded by the NN, instead of being kept around as a
|
||||
* 'corrupt' file.
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testLazyPersistFilesAreDiscarded()
|
||||
throws IOException, InterruptedException {
|
||||
startUpCluster(true, 2);
|
||||
|
@ -344,7 +286,7 @@ public class TestLazyPersistFiles {
|
|||
is(0L));
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testLazyPersistBlocksAreSaved()
|
||||
throws IOException, InterruptedException {
|
||||
startUpCluster(true, -1);
|
||||
|
@ -399,7 +341,7 @@ public class TestLazyPersistFiles {
|
|||
* RamDisk eviction after lazy persist to disk.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testRamDiskEviction() throws Exception {
|
||||
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
|
@ -434,7 +376,7 @@ public class TestLazyPersistFiles {
|
|||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testRamDiskEvictionBeforePersist()
|
||||
throws IOException, InterruptedException {
|
||||
startUpCluster(true, 1);
|
||||
|
@ -459,7 +401,7 @@ public class TestLazyPersistFiles {
|
|||
|
||||
assert(fs.exists(path1));
|
||||
assert(fs.exists(path2));
|
||||
verifyReadRandomFile(path1, BLOCK_SIZE, SEED);
|
||||
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -467,7 +409,7 @@ public class TestLazyPersistFiles {
|
|||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testRamDiskEvictionIsLru()
|
||||
throws Exception {
|
||||
final int NUM_PATHS = 5;
|
||||
|
@ -529,7 +471,7 @@ public class TestLazyPersistFiles {
|
|||
* Memory is freed up and file is gone.
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test // (timeout=300000)
|
||||
@Test
|
||||
public void testDeleteBeforePersist()
|
||||
throws Exception {
|
||||
startUpCluster(true, -1);
|
||||
|
@ -556,7 +498,7 @@ public class TestLazyPersistFiles {
|
|||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testDeleteAfterPersist()
|
||||
throws Exception {
|
||||
startUpCluster(true, -1);
|
||||
|
@ -584,7 +526,7 @@ public class TestLazyPersistFiles {
|
|||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testDfsUsageCreateDelete()
|
||||
throws IOException, InterruptedException {
|
||||
startUpCluster(true, 4);
|
||||
|
@ -615,7 +557,7 @@ public class TestLazyPersistFiles {
|
|||
/**
|
||||
* Concurrent read from the same node and verify the contents.
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testConcurrentRead()
|
||||
throws Exception {
|
||||
startUpCluster(true, 2);
|
||||
|
@ -666,7 +608,7 @@ public class TestLazyPersistFiles {
|
|||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testConcurrentWrites()
|
||||
throws IOException, InterruptedException {
|
||||
startUpCluster(true, 9);
|
||||
|
@ -702,7 +644,7 @@ public class TestLazyPersistFiles {
|
|||
assertThat(testFailed.get(), is(false));
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testDnRestartWithSavedReplicas()
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
|
@ -726,7 +668,7 @@ public class TestLazyPersistFiles {
|
|||
ensureFileReplicasOnStorageType(path1, DEFAULT);
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testDnRestartWithUnsavedReplicas()
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
|
@ -746,183 +688,6 @@ public class TestLazyPersistFiles {
|
|||
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||
}
|
||||
|
||||
// ---- Utility functions for all test cases -------------------------------
|
||||
|
||||
/**
|
||||
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
|
||||
* capped. If ramDiskStorageLimit < 0 then it is ignored.
|
||||
*/
|
||||
private void startUpCluster(boolean hasTransientStorage,
|
||||
final int ramDiskReplicaCapacity,
|
||||
final boolean useSCR)
|
||||
throws IOException {
|
||||
|
||||
conf = new Configuration();
|
||||
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
|
||||
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
|
||||
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
|
||||
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
||||
HEARTBEAT_RECHECK_INTERVAL_MSEC);
|
||||
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
||||
LAZY_WRITER_INTERVAL_SEC);
|
||||
conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
|
||||
EVICTION_LOW_WATERMARK * BLOCK_SIZE);
|
||||
|
||||
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
|
||||
|
||||
long[] capacities = null;
|
||||
if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
|
||||
// Convert replica count to byte count, add some delta for .meta and VERSION files.
|
||||
long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1);
|
||||
capacities = new long[] { ramDiskStorageLimit, -1 };
|
||||
}
|
||||
|
||||
cluster = new MiniDFSCluster
|
||||
.Builder(conf)
|
||||
.numDataNodes(REPL_FACTOR)
|
||||
.storageCapacities(capacities)
|
||||
.storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null)
|
||||
.build();
|
||||
fs = cluster.getFileSystem();
|
||||
client = fs.getClient();
|
||||
try {
|
||||
jmx = initJMX();
|
||||
} catch (Exception e) {
|
||||
fail("Failed initialize JMX for testing: " + e);
|
||||
}
|
||||
LOG.info("Cluster startup complete");
|
||||
}
|
||||
|
||||
private void startUpCluster(boolean hasTransientStorage,
|
||||
final int ramDiskReplicaCapacity)
|
||||
throws IOException {
|
||||
startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false);
|
||||
}
|
||||
|
||||
private void makeTestFile(Path path, long length, final boolean isLazyPersist)
|
||||
throws IOException {
|
||||
|
||||
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
|
||||
|
||||
if (isLazyPersist) {
|
||||
createFlags.add(LAZY_PERSIST);
|
||||
}
|
||||
|
||||
FSDataOutputStream fos = null;
|
||||
try {
|
||||
fos =
|
||||
fs.create(path,
|
||||
FsPermission.getFileDefault(),
|
||||
createFlags,
|
||||
BUFFER_LENGTH,
|
||||
REPL_FACTOR,
|
||||
BLOCK_SIZE,
|
||||
null);
|
||||
|
||||
// Allocate a block.
|
||||
byte[] buffer = new byte[BUFFER_LENGTH];
|
||||
for (int bytesWritten = 0; bytesWritten < length; ) {
|
||||
fos.write(buffer, 0, buffer.length);
|
||||
bytesWritten += buffer.length;
|
||||
}
|
||||
if (length > 0) {
|
||||
fos.hsync();
|
||||
}
|
||||
} finally {
|
||||
IOUtils.closeQuietly(fos);
|
||||
}
|
||||
}
|
||||
|
||||
private LocatedBlocks ensureFileReplicasOnStorageType(
|
||||
Path path, StorageType storageType) throws IOException {
|
||||
// Ensure that returned block locations returned are correct!
|
||||
LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
|
||||
assertThat(fs.exists(path), is(true));
|
||||
long fileLength = client.getFileInfo(path.toString()).getLen();
|
||||
LocatedBlocks locatedBlocks =
|
||||
client.getLocatedBlocks(path.toString(), 0, fileLength);
|
||||
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
|
||||
assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
|
||||
}
|
||||
return locatedBlocks;
|
||||
}
|
||||
|
||||
private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
|
||||
long seed) throws IOException {
|
||||
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
|
||||
BLOCK_SIZE, REPL_FACTOR, seed, true);
|
||||
}
|
||||
|
||||
private boolean verifyReadRandomFile(
|
||||
Path path, int fileLength, int seed) throws IOException {
|
||||
byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
|
||||
byte expected[] = DFSTestUtil.
|
||||
calculateFileContentsFromSeed(seed, fileLength);
|
||||
return Arrays.equals(contents, expected);
|
||||
}
|
||||
|
||||
private boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
|
||||
throws IOException, InterruptedException {
|
||||
|
||||
LOG.info("Verifying replica has no saved copy after deletion.");
|
||||
triggerBlockReport();
|
||||
|
||||
while(
|
||||
DataNodeTestUtils.getPendingAsyncDeletions(cluster.getDataNodes().get(0))
|
||||
> 0L){
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
final String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
List<? extends FsVolumeSpi> volumes =
|
||||
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
|
||||
|
||||
// Make sure deleted replica does not have a copy on either finalized dir of
|
||||
// transient volume or finalized dir of non-transient volume
|
||||
for (FsVolumeSpi v : volumes) {
|
||||
FsVolumeImpl volume = (FsVolumeImpl) v;
|
||||
File targetDir = (v.isTransientStorage()) ?
|
||||
volume.getBlockPoolSlice(bpid).getFinalizedDir() :
|
||||
volume.getBlockPoolSlice(bpid).getLazypersistDir();
|
||||
if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean verifyBlockDeletedFromDir(File dir, LocatedBlocks locatedBlocks) {
|
||||
|
||||
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
|
||||
File targetDir =
|
||||
DatanodeUtil.idToBlockDir(dir, lb.getBlock().getBlockId());
|
||||
|
||||
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
|
||||
if (blockFile.exists()) {
|
||||
LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
|
||||
" exists after deletion.");
|
||||
return false;
|
||||
}
|
||||
File metaFile = new File(targetDir,
|
||||
DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
|
||||
lb.getBlock().getGenerationStamp()));
|
||||
if (metaFile.exists()) {
|
||||
LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
|
||||
" exists after deletion.");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void triggerBlockReport()
|
||||
throws IOException, InterruptedException {
|
||||
// Trigger block report to NN
|
||||
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
|
||||
Thread.sleep(10 * 1000);
|
||||
}
|
||||
|
||||
class WriterRunnable implements Runnable {
|
||||
private final int id;
|
||||
private final Path paths[];
|
||||
|
@ -960,27 +725,4 @@ public class TestLazyPersistFiles {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
JMXGet initJMX() throws Exception
|
||||
{
|
||||
JMXGet jmx = new JMXGet();
|
||||
jmx.setService(JMX_SERVICE_NAME);
|
||||
jmx.init();
|
||||
return jmx;
|
||||
}
|
||||
|
||||
void printRamDiskJMXMetrics() {
|
||||
try {
|
||||
if (jmx != null) {
|
||||
jmx.printAllMatchedAttributes(JMX_RAM_DISK_METRICS_PATTERN);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
void verifyRamDiskJMXMetric(String metricName, long expectedValue)
|
||||
throws Exception {
|
||||
assertEquals(expectedValue, Integer.parseInt(jmx.getValue(metricName)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,84 +15,44 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.*;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.net.unix.DomainSocket;
|
||||
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.NativeCodeLoader;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.*;
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.fs.ChecksumException;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.ClientContext;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.net.unix.DomainSocket;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.NativeCodeLoader;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
||||
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class TestScrLazyPersistFiles {
|
||||
public static final Log LOG = LogFactory.getLog(TestLazyPersistFiles.class);
|
||||
|
||||
static {
|
||||
((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
|
||||
((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
|
||||
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
|
||||
}
|
||||
|
||||
private static short REPL_FACTOR = 1;
|
||||
private static final int BLOCK_SIZE = 10485760; // 10 MB
|
||||
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
|
||||
private static final long HEARTBEAT_INTERVAL_SEC = 1;
|
||||
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
|
||||
private static final int LAZY_WRITER_INTERVAL_SEC = 1;
|
||||
private static final int BUFFER_LENGTH = 4096;
|
||||
private static TemporarySocketDirectory sockDir;
|
||||
|
||||
private MiniDFSCluster cluster;
|
||||
private DistributedFileSystem fs;
|
||||
private DFSClient client;
|
||||
private Configuration conf;
|
||||
public class TestScrLazyPersistFiles extends LazyPersistTestCase {
|
||||
|
||||
@BeforeClass
|
||||
public static void init() {
|
||||
sockDir = new TemporarySocketDirectory();
|
||||
DomainSocket.disableBindPathValidation();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdown() throws IOException {
|
||||
sockDir.close();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS,
|
||||
|
@ -100,26 +60,14 @@ public class TestScrLazyPersistFiles {
|
|||
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
||||
}
|
||||
|
||||
@After
|
||||
public void shutDownCluster() throws IOException {
|
||||
if (fs != null) {
|
||||
fs.close();
|
||||
fs = null;
|
||||
client = null;
|
||||
}
|
||||
|
||||
if (cluster != null) {
|
||||
cluster.shutdownDataNodes();
|
||||
cluster.shutdown();
|
||||
cluster = null;
|
||||
}
|
||||
}
|
||||
@Rule
|
||||
public ExpectedException exception = ExpectedException.none();
|
||||
|
||||
/**
|
||||
* Read in-memory block with Short Circuit Read
|
||||
* Note: the test uses faked RAM_DISK from physical disk.
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
@Test
|
||||
public void testRamDiskShortCircuitRead()
|
||||
throws IOException, InterruptedException {
|
||||
startUpCluster(REPL_FACTOR,
|
||||
|
@ -160,7 +108,7 @@ public class TestScrLazyPersistFiles {
|
|||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test (timeout=300000000)
|
||||
@Test
|
||||
public void testRamDiskEvictionWithShortCircuitReadHandle()
|
||||
throws IOException, InterruptedException {
|
||||
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
|
||||
|
@ -204,123 +152,149 @@ public class TestScrLazyPersistFiles {
|
|||
ensureFileReplicasOnStorageType(path1, DEFAULT);
|
||||
}
|
||||
|
||||
// ---- Utility functions for all test cases -------------------------------
|
||||
|
||||
/**
|
||||
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
|
||||
* capped. If ramDiskStorageLimit < 0 then it is ignored.
|
||||
*/
|
||||
private void startUpCluster(final int numDataNodes,
|
||||
final StorageType[] storageTypes,
|
||||
final long ramDiskStorageLimit,
|
||||
final boolean useSCR)
|
||||
throws IOException {
|
||||
|
||||
conf = new Configuration();
|
||||
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC,
|
||||
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
|
||||
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
|
||||
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
|
||||
HEARTBEAT_RECHECK_INTERVAL_MSEC);
|
||||
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
|
||||
LAZY_WRITER_INTERVAL_SEC);
|
||||
|
||||
if (useSCR)
|
||||
{
|
||||
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
|
||||
conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT,
|
||||
UUID.randomUUID().toString());
|
||||
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
|
||||
new File(sockDir.getDir(),
|
||||
"TestShortCircuitLocalReadHandle._PORT.sock").getAbsolutePath());
|
||||
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
||||
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||
}
|
||||
|
||||
REPL_FACTOR = 1; //Reset in case a test has modified the value
|
||||
|
||||
cluster = new MiniDFSCluster
|
||||
.Builder(conf)
|
||||
.numDataNodes(numDataNodes)
|
||||
.storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT })
|
||||
.build();
|
||||
fs = cluster.getFileSystem();
|
||||
client = fs.getClient();
|
||||
|
||||
// Artificially cap the storage capacity of the RAM_DISK volume.
|
||||
if (ramDiskStorageLimit >= 0) {
|
||||
List<? extends FsVolumeSpi> volumes =
|
||||
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
|
||||
|
||||
for (FsVolumeSpi volume : volumes) {
|
||||
if (volume.getStorageType() == RAM_DISK) {
|
||||
((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG.info("Cluster startup complete");
|
||||
}
|
||||
|
||||
private void makeTestFile(Path path, long length, final boolean isLazyPersist)
|
||||
throws IOException {
|
||||
|
||||
EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE);
|
||||
|
||||
if (isLazyPersist) {
|
||||
createFlags.add(LAZY_PERSIST);
|
||||
}
|
||||
|
||||
FSDataOutputStream fos = null;
|
||||
try {
|
||||
fos =
|
||||
fs.create(path,
|
||||
FsPermission.getFileDefault(),
|
||||
createFlags,
|
||||
BUFFER_LENGTH,
|
||||
REPL_FACTOR,
|
||||
BLOCK_SIZE,
|
||||
null);
|
||||
|
||||
// Allocate a block.
|
||||
byte[] buffer = new byte[BUFFER_LENGTH];
|
||||
for (int bytesWritten = 0; bytesWritten < length; ) {
|
||||
fos.write(buffer, 0, buffer.length);
|
||||
bytesWritten += buffer.length;
|
||||
}
|
||||
if (length > 0) {
|
||||
fos.hsync();
|
||||
}
|
||||
} finally {
|
||||
IOUtils.closeQuietly(fos);
|
||||
}
|
||||
}
|
||||
|
||||
private LocatedBlocks ensureFileReplicasOnStorageType(
|
||||
Path path, StorageType storageType) throws IOException {
|
||||
// Ensure that returned block locations returned are correct!
|
||||
LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
|
||||
assertThat(fs.exists(path), is(true));
|
||||
long fileLength = client.getFileInfo(path.toString()).getLen();
|
||||
LocatedBlocks locatedBlocks =
|
||||
client.getLocatedBlocks(path.toString(), 0, fileLength);
|
||||
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
|
||||
assertThat(locatedBlock.getStorageTypes()[0], is(storageType));
|
||||
}
|
||||
return locatedBlocks;
|
||||
}
|
||||
|
||||
private void makeRandomTestFile(Path path, long length, final boolean isLazyPersist,
|
||||
long seed) throws IOException {
|
||||
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
|
||||
BLOCK_SIZE, REPL_FACTOR, seed, true);
|
||||
}
|
||||
|
||||
private void triggerBlockReport()
|
||||
@Test
|
||||
public void testShortCircuitReadAfterEviction()
|
||||
throws IOException, InterruptedException {
|
||||
// Trigger block report to NN
|
||||
DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
|
||||
Thread.sleep(10 * 1000);
|
||||
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
||||
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
|
||||
doShortCircuitReadAfterEvictionTest();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLegacyShortCircuitReadAfterEviction()
|
||||
throws IOException, InterruptedException {
|
||||
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
|
||||
doShortCircuitReadAfterEvictionTest();
|
||||
}
|
||||
|
||||
private void doShortCircuitReadAfterEvictionTest() throws IOException,
|
||||
InterruptedException {
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
|
||||
|
||||
final int SEED = 0xFADED;
|
||||
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
|
||||
|
||||
// Verify short-circuit read from RAM_DISK.
|
||||
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||
File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
|
||||
DFSTestUtil.getFirstBlock(fs, path1));
|
||||
assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
|
||||
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
|
||||
|
||||
// Sleep for a short time to allow the lazy writer thread to do its job.
|
||||
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||
|
||||
// Verify short-circuit read from RAM_DISK once again.
|
||||
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||
metaFile = MiniDFSCluster.getBlockMetadataFile(0,
|
||||
DFSTestUtil.getFirstBlock(fs, path1));
|
||||
assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
|
||||
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
|
||||
|
||||
// Create another file with a replica on RAM_DISK, which evicts the first.
|
||||
makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
|
||||
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||
triggerBlockReport();
|
||||
|
||||
// Verify short-circuit read still works from DEFAULT storage. This time,
|
||||
// we'll have a checksum written during lazy persistence.
|
||||
ensureFileReplicasOnStorageType(path1, DEFAULT);
|
||||
metaFile = MiniDFSCluster.getBlockMetadataFile(0,
|
||||
DFSTestUtil.getFirstBlock(fs, path1));
|
||||
assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
|
||||
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
|
||||
|
||||
// In the implementation of legacy short-circuit reads, any failure is
|
||||
// trapped silently, reverts back to a remote read, and also disables all
|
||||
// subsequent legacy short-circuit reads in the ClientContext. If the test
|
||||
// uses legacy, then assert that it didn't get disabled.
|
||||
ClientContext clientContext = client.getClientContext();
|
||||
if (clientContext.getUseLegacyBlockReaderLocal()) {
|
||||
Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShortCircuitReadBlockFileCorruption() throws IOException,
|
||||
InterruptedException {
|
||||
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
||||
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
|
||||
doShortCircuitReadBlockFileCorruptionTest();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLegacyShortCircuitReadBlockFileCorruption() throws IOException,
|
||||
InterruptedException {
|
||||
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
|
||||
doShortCircuitReadBlockFileCorruptionTest();
|
||||
}
|
||||
|
||||
public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
|
||||
InterruptedException {
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
|
||||
|
||||
final int SEED = 0xFADED;
|
||||
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
|
||||
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||
|
||||
// Create another file with a replica on RAM_DISK, which evicts the first.
|
||||
makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
|
||||
|
||||
// Sleep for a short time to allow the lazy writer thread to do its job.
|
||||
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||
triggerBlockReport();
|
||||
|
||||
// Corrupt the lazy-persisted block file, and verify that checksum
|
||||
// verification catches it.
|
||||
ensureFileReplicasOnStorageType(path1, DEFAULT);
|
||||
MiniDFSCluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, path1));
|
||||
exception.expect(ChecksumException.class);
|
||||
DFSTestUtil.readFileBuffer(fs, path1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShortCircuitReadMetaFileCorruption() throws IOException,
|
||||
InterruptedException {
|
||||
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
|
||||
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, false);
|
||||
doShortCircuitReadMetaFileCorruptionTest();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLegacyShortCircuitReadMetaFileCorruption() throws IOException,
|
||||
InterruptedException {
|
||||
startUpCluster(true, 1 + EVICTION_LOW_WATERMARK, true, true);
|
||||
doShortCircuitReadMetaFileCorruptionTest();
|
||||
}
|
||||
|
||||
public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
|
||||
InterruptedException {
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
|
||||
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
|
||||
|
||||
final int SEED = 0xFADED;
|
||||
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
|
||||
ensureFileReplicasOnStorageType(path1, RAM_DISK);
|
||||
|
||||
// Create another file with a replica on RAM_DISK, which evicts the first.
|
||||
makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
|
||||
|
||||
// Sleep for a short time to allow the lazy writer thread to do its job.
|
||||
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
|
||||
triggerBlockReport();
|
||||
|
||||
// Corrupt the lazy-persisted checksum file, and verify that checksum
|
||||
// verification catches it.
|
||||
ensureFileReplicasOnStorageType(path1, DEFAULT);
|
||||
File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
|
||||
DFSTestUtil.getFirstBlock(fs, path1));
|
||||
MiniDFSCluster.corruptBlock(metaFile);
|
||||
exception.expect(ChecksumException.class);
|
||||
DFSTestUtil.readFileBuffer(fs, path1);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue