HDFS-6482. Use block ID-based block layout on datanodes (James Thomas via Colin Patrick McCabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1615223 13f79535-47bb-0310-9956-ffa450edef68
parent 07860b1c9e
commit 1ba3f89714
@@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
 import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.hadoop.util.Shell;
@@ -823,6 +824,14 @@ public class NativeIO {
     }
   }
 
+  public static void link(File src, File dst) throws IOException {
+    if (!nativeLoaded) {
+      HardLink.createHardLink(src, dst);
+    } else {
+      link0(src.getAbsolutePath(), dst.getAbsolutePath());
+    }
+  }
+
   /**
    * A version of renameTo that throws a descriptive exception when it fails.
    *
@@ -833,4 +842,7 @@ public class NativeIO {
    */
   private static native void renameTo0(String src, String dst)
       throws NativeIOException;
+
+  private static native void link0(String src, String dst)
+      throws NativeIOException;
 }

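For readers unfamiliar with the new NativeIO entry point, the following is a minimal usage sketch, not part of the patch above; the block file paths are hypothetical. When the native library is loaded the call goes through link0 (link(2) on UNIX, CreateHardLink on Windows); otherwise it falls back to HardLink.createHardLink, as the hunk above shows.

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.io.nativeio.NativeIO;

public class HardLinkSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical block file and its new ID-based location (illustration only).
    File src = new File("/data/dn/current/finalized/blk_1073741825");
    File dst = new File("/data/dn/current/finalized/subdir0/subdir0/blk_1073741825");
    // Creates dst as a hard link to src.
    NativeIO.link(src, dst);
  }
}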
@@ -79,6 +79,20 @@ public class DiskChecker {
                        (canonDir.mkdir() || canonDir.exists()));
   }
 
+  /**
+   * Recurse down a directory tree, checking all child directories.
+   * @param dir
+   * @throws DiskErrorException
+   */
+  public static void checkDirs(File dir) throws DiskErrorException {
+    checkDir(dir);
+    for (File child : dir.listFiles()) {
+      if (child.isDirectory()) {
+        checkDirs(child);
+      }
+    }
+  }
+
   /**
    * Create the directory if it doesn't exist and check that dir is readable,
    * writable and executable

@@ -1054,6 +1054,43 @@ done:
 #endif
 }
 
+JNIEXPORT void JNICALL
+Java_org_apache_hadoop_io_nativeio_NativeIO_link0(JNIEnv *env,
+  jclass clazz, jstring jsrc, jstring jdst)
+{
+#ifdef UNIX
+  const char *src = NULL, *dst = NULL;
+
+  src = (*env)->GetStringUTFChars(env, jsrc, NULL);
+  if (!src) goto done; // exception was thrown
+  dst = (*env)->GetStringUTFChars(env, jdst, NULL);
+  if (!dst) goto done; // exception was thrown
+  if (link(src, dst)) {
+    throw_ioe(env, errno);
+  }
+
+done:
+  if (src) (*env)->ReleaseStringUTFChars(env, jsrc, src);
+  if (dst) (*env)->ReleaseStringUTFChars(env, jdst, dst);
+#endif
+
+#ifdef WINDOWS
+  LPCTSTR src = NULL, dst = NULL;
+
+  src = (LPCTSTR) (*env)->GetStringChars(env, jsrc, NULL);
+  if (!src) goto done; // exception was thrown
+  dst = (LPCTSTR) (*env)->GetStringChars(env, jdst, NULL);
+  if (!dst) goto done; // exception was thrown
+  if (!CreateHardLink(dst, src)) {
+    throw_ioe(env, GetLastError());
+  }
+
+done:
+  if (src) (*env)->ReleaseStringChars(env, jsrc, src);
+  if (dst) (*env)->ReleaseStringChars(env, jdst, dst);
+#endif
+}
+
 JNIEXPORT jlong JNICALL
 Java_org_apache_hadoop_io_nativeio_NativeIO_getMemlockLimit0(
   JNIEnv *env, jclass clazz)

@@ -130,6 +130,9 @@ Trunk (Unreleased)
     HDFS-6609. Use DirectorySnapshottableFeature to represent a snapshottable
     directory. (Jing Zhao via wheat9)
 
+    HDFS-6482. Use block ID-based block layout on datanodes (James Thomas via
+    Colin Patrick McCabe)
+
   OPTIMIZATIONS
 
   BUG FIXES

@@ -381,8 +381,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT;
   public static final String DFS_DATANODE_MAX_RECEIVER_THREADS_KEY = "dfs.datanode.max.transfer.threads";
   public static final int DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096;
-  public static final String DFS_DATANODE_NUMBLOCKS_KEY = "dfs.datanode.numblocks";
-  public static final int DFS_DATANODE_NUMBLOCKS_DEFAULT = 64;
   public static final String DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
   public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0;
   public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
@@ -666,4 +664,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
       "dfs.datanode.slow.io.warning.threshold.ms";
   public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300;
+
+  public static final String DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY =
+      "dfs.datanode.block.id.layout.upgrade.threads";
+  public static final int DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS = 12;
 }

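A hedged sketch of how the new upgrade-thread key could be read or overridden; the override value of 24 is arbitrary and the surrounding setup is assumed rather than taken from this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class UpgradeThreadsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Override the default of 12 layout-upgrade threads (illustrative value).
    conf.setInt(DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY, 24);
    int threads = conf.getInt(
        DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
        DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
    System.out.println("layout upgrade threads = " + threads); // prints 24
  }
}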
@@ -50,6 +50,9 @@ public class Block implements Writable, Comparable<Block> {
   public static final Pattern metaFilePattern = Pattern
       .compile(BLOCK_FILE_PREFIX + "(-??\\d++)_(\\d++)\\" + METADATA_EXTENSION
           + "$");
+  public static final Pattern metaOrBlockFilePattern = Pattern
+      .compile(BLOCK_FILE_PREFIX + "(-??\\d++)(_(\\d++)\\" + METADATA_EXTENSION
+          + ")?$");
 
   public static boolean isBlockFilename(File f) {
     String name = f.getName();
@@ -65,6 +68,11 @@ public class Block implements Writable, Comparable<Block> {
     return metaFilePattern.matcher(name).matches();
   }
 
+  public static File metaToBlockFile(File metaFile) {
+    return new File(metaFile.getParent(), metaFile.getName().substring(
+        0, metaFile.getName().lastIndexOf('_')));
+  }
+
   /**
    * Get generation stamp from the name of the metafile name
    */
@@ -75,10 +83,10 @@ public class Block implements Writable, Comparable<Block> {
   }
 
   /**
-   * Get the blockId from the name of the metafile name
+   * Get the blockId from the name of the meta or block file
    */
-  public static long getBlockId(String metaFile) {
-    Matcher m = metaFilePattern.matcher(metaFile);
+  public static long getBlockId(String metaOrBlockFile) {
+    Matcher m = metaOrBlockFilePattern.matcher(metaOrBlockFile);
     return m.matches() ? Long.parseLong(m.group(1)) : 0;
   }
 

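To make the pattern change concrete, a small self-contained sketch (regex copied from the hunk above, with BLOCK_FILE_PREFIX assumed to be "blk_" and METADATA_EXTENSION assumed to be ".meta") showing that getBlockId now accepts both block files and meta files:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BlockIdPatternSketch {
  // Assumed constants: BLOCK_FILE_PREFIX = "blk_", METADATA_EXTENSION = ".meta".
  static final Pattern META_OR_BLOCK = Pattern.compile(
      "blk_" + "(-??\\d++)(_(\\d++)\\" + ".meta" + ")?$");

  static long getBlockId(String fileName) {
    Matcher m = META_OR_BLOCK.matcher(fileName);
    return m.matches() ? Long.parseLong(m.group(1)) : 0;
  }

  public static void main(String[] args) {
    System.out.println(getBlockId("blk_1073741825"));           // 1073741825
    System.out.println(getBlockId("blk_1073741825_1001.meta")); // 1073741825
  }
}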
@@ -152,7 +152,7 @@ public class BlockPoolSliceStorage extends Storage {
     // During startup some of them can upgrade or roll back
     // while others could be up-to-date for the regular startup.
     for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-      doTransition(getStorageDir(idx), nsInfo, startOpt);
+      doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
       assert getCTime() == nsInfo.getCTime()
           : "Data-node and name-node CTimes must be the same.";
     }
@@ -242,7 +242,7 @@ public class BlockPoolSliceStorage extends Storage {
    * @param startOpt startup option
    * @throws IOException
    */
-  private void doTransition(StorageDirectory sd,
+  private void doTransition(DataNode datanode, StorageDirectory sd,
       NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
     if (startOpt == StartupOption.ROLLBACK) {
       doRollback(sd, nsInfo); // rollback if applicable
@@ -275,7 +275,7 @@ public class BlockPoolSliceStorage extends Storage {
     }
     if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
-      doUpgrade(sd, nsInfo); // upgrade
+      doUpgrade(datanode, sd, nsInfo); // upgrade
       return;
     }
     // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
@@ -304,7 +304,8 @@ public class BlockPoolSliceStorage extends Storage {
    * @param nsInfo Namespace Info from the namenode
    * @throws IOException on error
    */
-  void doUpgrade(StorageDirectory bpSd, NamespaceInfo nsInfo) throws IOException {
+  void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo)
+      throws IOException {
     // Upgrading is applicable only to release with federation or after
     if (!DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.FEDERATION, layoutVersion)) {
@@ -340,7 +341,7 @@ public class BlockPoolSliceStorage extends Storage {
     rename(bpCurDir, bpTmpDir);
 
     // 3. Create new <SD>/current with block files hardlinks and VERSION
-    linkAllBlocks(bpTmpDir, bpCurDir);
+    linkAllBlocks(datanode, bpTmpDir, bpCurDir);
     this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
     assert this.namespaceID == nsInfo.getNamespaceID()
         : "Data-node and name-node layout versions must be the same.";
@@ -517,14 +518,15 @@ public class BlockPoolSliceStorage extends Storage {
    * @param toDir the current data directory
    * @throws IOException if error occurs during hardlink
    */
-  private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+  private void linkAllBlocks(DataNode datanode, File fromDir, File toDir)
+      throws IOException {
     // do the link
     int diskLayoutVersion = this.getLayoutVersion();
     // hardlink finalized blocks in tmpDir
     HardLink hardLink = new HardLink();
-    DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED),
+    DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED),
         new File(toDir,DataStorage.STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
-    DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_RBW),
+    DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_RBW),
         new File(toDir, DataStorage.STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
     LOG.info( hardLink.linkStats.report() );
   }

@@ -62,7 +62,10 @@ public class DataNodeLayoutVersion {
    * </ul>
    */
   public static enum Feature implements LayoutFeature {
-    FIRST_LAYOUT(-55, -53, "First datanode layout", false);
+    FIRST_LAYOUT(-55, -53, "First datanode layout", false),
+    BLOCKID_BASED_LAYOUT(-56,
+        "The block ID of a finalized block uniquely determines its position " +
+        "in the directory structure");
 
     private final FeatureInfo info;
 

@@ -18,13 +18,19 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -35,13 +41,30 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 /**
  * Data storage information file.
@@ -261,6 +284,7 @@ public class DataStorage extends Storage {
           STORAGE_DIR_CURRENT));
       bpDataDirs.add(bpRoot);
     }
+
     // mkdir for the list of BlockPoolStorage
     makeBlockPoolDataDir(bpDataDirs, null);
     BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
@@ -488,7 +512,7 @@ public class DataStorage extends Storage {
 
     // do upgrade
     if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
-      doUpgrade(sd, nsInfo); // upgrade
+      doUpgrade(datanode, sd, nsInfo); // upgrade
       return;
     }
 
@@ -523,7 +547,8 @@ public class DataStorage extends Storage {
    * @param sd storage directory
    * @throws IOException on error
    */
-  void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
+  void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo)
+      throws IOException {
     // If the existing on-disk layout version supportes federation, simply
     // update its layout version.
     if (DataNodeLayoutVersion.supports(
@@ -568,7 +593,8 @@ public class DataStorage extends Storage {
     BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(),
         nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
     bpStorage.format(curDir, nsInfo);
-    linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT));
+    linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir,
+        STORAGE_DIR_CURRENT));
 
     // 4. Write version file under <SD>/current
     layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
@@ -746,22 +772,22 @@ public class DataStorage extends Storage {
    *
    * @throws IOException If error occurs during hardlink
    */
-  private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir)
-      throws IOException {
+  private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir,
+      File toDir) throws IOException {
     HardLink hardLink = new HardLink();
     // do the link
     int diskLayoutVersion = this.getLayoutVersion();
     if (DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
       // hardlink finalized blocks in tmpDir/finalized
-      linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED),
+      linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED),
          new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
       // hardlink rbw blocks in tmpDir/rbw
-      linkBlocks(new File(fromDir, STORAGE_DIR_RBW),
+      linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW),
          new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
     } else { // pre-RBW version
       // hardlink finalized blocks in tmpDir
-      linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
+      linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
          diskLayoutVersion, hardLink);
       if (fromBbwDir.exists()) {
         /*
@@ -770,15 +796,67 @@ public class DataStorage extends Storage {
          * NOT underneath the 'current' directory in those releases. See
          * HDFS-3731 for details.
          */
-        linkBlocks(fromBbwDir,
+        linkBlocks(datanode, fromBbwDir,
             new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
       }
     }
     LOG.info( hardLink.linkStats.report() );
   }
 
-  static void linkBlocks(File from, File to, int oldLV, HardLink hl)
-      throws IOException {
+  private static class LinkArgs {
+    public File src;
+    public File dst;
+
+    public LinkArgs(File src, File dst) {
+      this.src = src;
+      this.dst = dst;
+    }
+  }
+
+  static void linkBlocks(DataNode datanode, File from, File to, int oldLV,
+      HardLink hl) throws IOException {
+    boolean upgradeToIdBasedLayout = false;
+    // If we are upgrading from a version older than the one where we introduced
+    // block ID-based layout AND we're working with the finalized directory,
+    // we'll need to upgrade from the old flat layout to the block ID-based one
+    if (oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo().
+        getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED)) {
+      upgradeToIdBasedLayout = true;
+    }
+
+    final List<LinkArgs> idBasedLayoutSingleLinks = Lists.newArrayList();
+    linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to,
+        idBasedLayoutSingleLinks);
+    int numLinkWorkers = datanode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
+        DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
+    ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers);
+    final int step = idBasedLayoutSingleLinks.size() / numLinkWorkers + 1;
+    List<Future<Void>> futures = Lists.newArrayList();
+    for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) {
+      final int iCopy = i;
+      futures.add(linkWorkers.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          int upperBound = Math.min(iCopy + step,
+              idBasedLayoutSingleLinks.size());
+          for (int j = iCopy; j < upperBound; j++) {
+            LinkArgs cur = idBasedLayoutSingleLinks.get(j);
+            NativeIO.link(cur.src, cur.dst);
+          }
+          return null;
+        }
+      }));
+    }
+    linkWorkers.shutdown();
+    for (Future<Void> f : futures) {
+      Futures.get(f, IOException.class);
+    }
+  }
+
+  static void linkBlocksHelper(File from, File to, int oldLV, HardLink hl,
+      boolean upgradeToIdBasedLayout, File blockRoot,
+      List<LinkArgs> idBasedLayoutSingleLinks) throws IOException {
     if (!from.exists()) {
       return;
     }
@@ -805,9 +883,6 @@ public class DataStorage extends Storage {
     // from is a directory
     hl.linkStats.countDirs++;
 
-    if (!to.mkdirs())
-      throw new IOException("Cannot create directory " + to);
-
     String[] blockNames = from.list(new java.io.FilenameFilter() {
       @Override
       public boolean accept(File dir, String name) {
@@ -815,12 +890,36 @@ public class DataStorage extends Storage {
       }
     });
 
+    // If we are upgrading to block ID-based layout, we don't want to recreate
+    // any subdirs from the source that contain blocks, since we have a new
+    // directory structure
+    if (!upgradeToIdBasedLayout || !to.getName().startsWith(
+        BLOCK_SUBDIR_PREFIX)) {
+      if (!to.mkdirs())
+        throw new IOException("Cannot create directory " + to);
+    }
+
     // Block files just need hard links with the same file names
     // but a different directory
     if (blockNames.length > 0) {
-      HardLink.createHardLinkMult(from, blockNames, to);
-      hl.linkStats.countMultLinks++;
-      hl.linkStats.countFilesMultLinks += blockNames.length;
+      if (upgradeToIdBasedLayout) {
+        for (String blockName : blockNames) {
+          long blockId = Block.getBlockId(blockName);
+          File blockLocation = DatanodeUtil.idToBlockDir(blockRoot, blockId);
+          if (!blockLocation.exists()) {
+            if (!blockLocation.mkdirs()) {
+              throw new IOException("Failed to mkdirs " + blockLocation);
+            }
+          }
+          idBasedLayoutSingleLinks.add(new LinkArgs(new File(from, blockName),
+              new File(blockLocation, blockName)));
+          hl.linkStats.countSingleLinks++;
+        }
+      } else {
+        HardLink.createHardLinkMult(from, blockNames, to);
+        hl.linkStats.countMultLinks++;
+        hl.linkStats.countFilesMultLinks += blockNames.length;
+      }
     } else {
       hl.linkStats.countEmptyDirs++;
     }
@@ -834,8 +933,9 @@ public class DataStorage extends Storage {
       }
     });
     for(int i = 0; i < otherNames.length; i++)
-      linkBlocks(new File(from, otherNames[i]),
-          new File(to, otherNames[i]), oldLV, hl);
+      linkBlocksHelper(new File(from, otherNames[i]),
+          new File(to, otherNames[i]), oldLV, hl, upgradeToIdBasedLayout,
+          blockRoot, idBasedLayoutSingleLinks);
   }
 
   /**

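A small illustration, separate from the patch, of the chunking arithmetic that linkBlocks uses to fan single hard links out to its worker pool; the link count and worker count below are made-up numbers.

public class LinkPartitionSketch {
  public static void main(String[] args) {
    int totalLinks = 1_000_000; // hypothetical number of finalized block files
    int numLinkWorkers = 12;    // default of dfs.datanode.block.id.layout.upgrade.threads
    int step = totalLinks / numLinkWorkers + 1;
    for (int i = 0; i < totalLinks; i += step) {
      int upperBound = Math.min(i + step, totalLinks);
      // Each Callable submitted by linkBlocks would hard-link entries [i, upperBound).
      System.out.println("worker range: [" + i + ", " + upperBound + ")");
    }
  }
}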
@@ -30,6 +30,8 @@ public class DatanodeUtil {
 
   public static final String DISK_ERROR = "Possible disk error: ";
 
+  private static final String SEP = System.getProperty("file.separator");
+
   /** Get the cause of an I/O exception if caused by a possible disk error
    * @param ioe an I/O exception
    * @return cause if the I/O exception is caused by a possible disk error;
@@ -78,4 +80,38 @@ public class DatanodeUtil {
   public static File getUnlinkTmpFile(File f) {
     return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
   }
+
+  /**
+   * Checks whether there are any files anywhere in the directory tree rooted
+   * at dir (directories don't count as files). dir must exist
+   * @return true if there are no files
+   * @throws IOException if unable to list subdirectories
+   */
+  public static boolean dirNoFilesRecursive(File dir) throws IOException {
+    File[] contents = dir.listFiles();
+    if (contents == null) {
+      throw new IOException("Cannot list contents of " + dir);
+    }
+    for (File f : contents) {
+      if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Get the directory where a finalized block with this ID should be stored.
+   * Do not attempt to create the directory.
+   * @param root the root directory where finalized blocks are stored
+   * @param blockId
+   * @return
+   */
+  public static File idToBlockDir(File root, long blockId) {
+    int d1 = (int)((blockId >> 16) & 0xff);
+    int d2 = (int)((blockId >> 8) & 0xff);
+    String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
+        DataStorage.BLOCK_SUBDIR_PREFIX + d2;
+    return new File(root, path);
+  }
 }

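A worked example of the two-level mapping introduced by idToBlockDir: bits 16-23 of the block ID pick the first-level subdir and bits 8-15 the second level, so a finalized block's directory is computable from its ID alone. The block ID and root path below are hypothetical, and BLOCK_SUBDIR_PREFIX is assumed to be "subdir".

import java.io.File;

public class IdToBlockDirSketch {
  // Mirrors DatanodeUtil.idToBlockDir from the hunk above.
  static File idToBlockDir(File root, long blockId) {
    int d1 = (int) ((blockId >> 16) & 0xff);
    int d2 = (int) ((blockId >> 8) & 0xff);
    return new File(root, "subdir" + d1 + File.separator + "subdir" + d2);
  }

  public static void main(String[] args) {
    long blockId = 1073741825L; // 0x40000001, hypothetical
    File root = new File("/data/dn/current/BP-1/current/finalized"); // hypothetical
    // bits 16-23 of 0x40000001 are 0x00 -> subdir0; bits 8-15 are 0x00 -> subdir0
    System.out.println(idToBlockDir(root, blockId)); // .../finalized/subdir0/subdir0
  }
}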
@@ -54,10 +54,10 @@ abstract public class ReplicaInfo extends Block implements Replica {
   private File baseDir;
 
   /**
-   * Ints representing the sub directory path from base dir to the directory
-   * containing this replica.
+   * Whether or not this replica's parent directory includes subdirs, in which
+   * case we can generate them based on the replica's block ID
    */
-  private int[] subDirs;
+  private boolean hasSubdirs;
 
   private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
 
@@ -151,18 +151,8 @@ abstract public class ReplicaInfo extends Block implements Replica {
    * @return the parent directory path where this replica is located
    */
   File getDir() {
-    if (subDirs == null) {
-      return null;
-    }
-
-    StringBuilder sb = new StringBuilder();
-    for (int i : subDirs) {
-      sb.append(DataStorage.BLOCK_SUBDIR_PREFIX);
-      sb.append(i);
-      sb.append("/");
-    }
-    File ret = new File(baseDir, sb.toString());
-    return ret;
+    return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
+        getBlockId()) : baseDir;
   }
 
   /**
@@ -175,54 +165,46 @@ abstract public class ReplicaInfo extends Block implements Replica {
 
   private void setDirInternal(File dir) {
     if (dir == null) {
-      subDirs = null;
       baseDir = null;
       return;
     }
 
-    ReplicaDirInfo replicaDirInfo = parseSubDirs(dir);
-    this.subDirs = replicaDirInfo.subDirs;
+    ReplicaDirInfo dirInfo = parseBaseDir(dir);
+    this.hasSubdirs = dirInfo.hasSubidrs;
 
     synchronized (internedBaseDirs) {
-      if (!internedBaseDirs.containsKey(replicaDirInfo.baseDirPath)) {
+      if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
         // Create a new String path of this file and make a brand new File object
         // to guarantee we drop the reference to the underlying char[] storage.
-        File baseDir = new File(replicaDirInfo.baseDirPath);
-        internedBaseDirs.put(replicaDirInfo.baseDirPath, baseDir);
+        File baseDir = new File(dirInfo.baseDirPath);
+        internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
       }
-      this.baseDir = internedBaseDirs.get(replicaDirInfo.baseDirPath);
+      this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
     }
   }
 
   @VisibleForTesting
   public static class ReplicaDirInfo {
-    @VisibleForTesting
     public String baseDirPath;
+    public boolean hasSubidrs;
 
-    @VisibleForTesting
-    public int[] subDirs;
+    public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) {
+      this.baseDirPath = baseDirPath;
+      this.hasSubidrs = hasSubidrs;
+    }
   }
 
   @VisibleForTesting
-  public static ReplicaDirInfo parseSubDirs(File dir) {
-    ReplicaDirInfo ret = new ReplicaDirInfo();
-
+  public static ReplicaDirInfo parseBaseDir(File dir) {
     File currentDir = dir;
-    List<Integer> subDirList = new ArrayList<Integer>();
+    boolean hasSubdirs = false;
     while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
-      // Prepend the integer into the list.
-      subDirList.add(0, Integer.parseInt(currentDir.getName().replaceFirst(
-          DataStorage.BLOCK_SUBDIR_PREFIX, "")));
+      hasSubdirs = true;
       currentDir = currentDir.getParentFile();
     }
-    ret.subDirs = new int[subDirList.size()];
-    for (int i = 0; i < subDirList.size(); i++) {
-      ret.subDirs[i] = subDirList.get(i);
-    }
-
-    ret.baseDirPath = currentDir.getAbsolutePath();
-
-    return ret;
+    return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
   }
 
   /**

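A minimal sketch of what the reworked parseBaseDir computes: it walks up past any subdir components and records only whether such components were present, instead of the old integer array; the paths here are hypothetical.

import java.io.File;

public class ParseBaseDirSketch {
  static void parse(File dir) {
    File currentDir = dir;
    boolean hasSubdirs = false;
    // Strip trailing subdirNN components, as ReplicaInfo.parseBaseDir now does.
    while (currentDir.getName().startsWith("subdir")) {
      hasSubdirs = true;
      currentDir = currentDir.getParentFile();
    }
    System.out.println("baseDir=" + currentDir + " hasSubdirs=" + hasSubdirs);
  }

  public static void main(String[] args) {
    parse(new File("/data/dn/current/BP-1/current/finalized/subdir0/subdir16"));
    parse(new File("/data/dn/current/BP-1/current/finalized"));
  }
}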
@@ -59,7 +59,8 @@ class BlockPoolSlice {
   private final String bpid;
   private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
   private final File currentDir; // StorageDirectory/current/bpid/current
-  private final LDir finalizedDir; // directory store Finalized replica
+  // directory where finalized replicas are stored
+  private final File finalizedDir;
   private final File rbwDir; // directory store RBW replica
   private final File tmpDir; // directory store Temporary replica
   private static final String DU_CACHE_FILE = "dfsUsed";
@@ -82,8 +83,13 @@ class BlockPoolSlice {
     this.bpid = bpid;
     this.volume = volume;
     this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
-    final File finalizedDir = new File(
+    this.finalizedDir = new File(
         currentDir, DataStorage.STORAGE_DIR_FINALIZED);
+    if (!this.finalizedDir.exists()) {
+      if (!this.finalizedDir.mkdirs()) {
+        throw new IOException("Failed to mkdirs " + this.finalizedDir);
+      }
+    }
 
     // Files that were being written when the datanode was last shutdown
     // are now moved back to the data directory. It is possible that
@@ -95,10 +101,6 @@ class BlockPoolSlice {
       FileUtil.fullyDelete(tmpDir);
     }
     this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
-    final int maxBlocksPerDir = conf.getInt(
-        DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
-        DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
-    this.finalizedDir = new LDir(finalizedDir, maxBlocksPerDir);
     if (!rbwDir.mkdirs()) { // create rbw directory if not exist
       if (!rbwDir.isDirectory()) {
         throw new IOException("Mkdirs failed to create " + rbwDir.toString());
@@ -131,7 +133,7 @@ class BlockPoolSlice {
   }
 
   File getFinalizedDir() {
-    return finalizedDir.dir;
+    return finalizedDir;
   }
 
   File getRbwDir() {
@@ -239,25 +241,56 @@ class BlockPoolSlice {
   }
 
   File addBlock(Block b, File f) throws IOException {
-    File blockFile = finalizedDir.addBlock(b, f);
+    File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
+    if (!blockDir.exists()) {
+      if (!blockDir.mkdirs()) {
+        throw new IOException("Failed to mkdirs " + blockDir);
+      }
+    }
+    File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
     File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
     dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
     return blockFile;
   }
 
   void checkDirs() throws DiskErrorException {
-    finalizedDir.checkDirTree();
+    DiskChecker.checkDirs(finalizedDir);
     DiskChecker.checkDir(tmpDir);
     DiskChecker.checkDir(rbwDir);
   }
 
   void getVolumeMap(ReplicaMap volumeMap) throws IOException {
     // add finalized replicas
-    finalizedDir.getVolumeMap(bpid, volumeMap, volume);
+    addToReplicasMap(volumeMap, finalizedDir, true);
     // add rbw replicas
     addToReplicasMap(volumeMap, rbwDir, false);
   }
 
+  /**
+   * Recover an unlinked tmp file on datanode restart. If the original block
+   * does not exist, then the tmp file is renamed to be the
+   * original file name and the original name is returned; otherwise the tmp
+   * file is deleted and null is returned.
+   */
+  File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException {
+    File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
+    if (blockFile.exists()) {
+      // If the original block file still exists, then no recovery is needed.
+      if (!unlinkedTmp.delete()) {
+        throw new IOException("Unable to cleanup unlinked tmp file " +
+            unlinkedTmp);
+      }
+      return null;
+    } else {
+      if (!unlinkedTmp.renameTo(blockFile)) {
+        throw new IOException("Unable to rename unlinked tmp file " +
+            unlinkedTmp);
+      }
+      return blockFile;
+    }
+  }
+
+
   /**
    * Add replicas under the given directory to the volume map
    * @param volumeMap the replicas map
@@ -267,23 +300,34 @@ class BlockPoolSlice {
    */
   void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
       ) throws IOException {
-    File blockFiles[] = FileUtil.listFiles(dir);
-    for (File blockFile : blockFiles) {
-      if (!Block.isBlockFilename(blockFile))
+    File files[] = FileUtil.listFiles(dir);
+    for (File file : files) {
+      if (file.isDirectory()) {
+        addToReplicasMap(volumeMap, file, isFinalized);
+      }
+
+      if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
+        file = recoverTempUnlinkedBlock(file);
+        if (file == null) { // the original block still exists, so we cover it
+          // in another iteration and can continue here
+          continue;
+        }
+      }
+      if (!Block.isBlockFilename(file))
         continue;
 
       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
-          blockFiles, blockFile);
-      long blockId = Block.filename2id(blockFile.getName());
+          files, file);
+      long blockId = Block.filename2id(file.getName());
       ReplicaInfo newReplica = null;
       if (isFinalized) {
         newReplica = new FinalizedReplica(blockId,
-            blockFile.length(), genStamp, volume, blockFile.getParentFile());
+            file.length(), genStamp, volume, file.getParentFile());
       } else {
 
         boolean loadRwr = true;
-        File restartMeta = new File(blockFile.getParent() +
-            File.pathSeparator + "." + blockFile.getName() + ".restart");
+        File restartMeta = new File(file.getParent() +
+            File.pathSeparator + "." + file.getName() + ".restart");
         Scanner sc = null;
         try {
           sc = new Scanner(restartMeta);
@@ -291,8 +335,8 @@ class BlockPoolSlice {
           if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
             // It didn't expire. Load the replica as a RBW.
             newReplica = new ReplicaBeingWritten(blockId,
-                validateIntegrityAndSetLength(blockFile, genStamp),
-                genStamp, volume, blockFile.getParentFile(), null);
+                validateIntegrityAndSetLength(file, genStamp),
+                genStamp, volume, file.getParentFile(), null);
             loadRwr = false;
           }
           sc.close();
@@ -301,7 +345,7 @@ class BlockPoolSlice {
                 restartMeta.getPath());
           }
         } catch (FileNotFoundException fnfe) {
-          // nothing to do here
+          // nothing to do hereFile dir =
         } finally {
           if (sc != null) {
             sc.close();
@@ -310,15 +354,15 @@ class BlockPoolSlice {
         // Restart meta doesn't exist or expired.
         if (loadRwr) {
           newReplica = new ReplicaWaitingToBeRecovered(blockId,
-              validateIntegrityAndSetLength(blockFile, genStamp),
-              genStamp, volume, blockFile.getParentFile());
+              validateIntegrityAndSetLength(file, genStamp),
+              genStamp, volume, file.getParentFile());
         }
       }
 
       ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
       if (oldReplica != null) {
         FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
-            "on disk: " + oldReplica.getBlockFile() + " and " + blockFile );
+            "on disk: " + oldReplica.getBlockFile() + " and " + file );
       }
     }
   }
@@ -405,10 +449,6 @@ class BlockPoolSlice {
     }
   }
 
-  void clearPath(File f) {
-    finalizedDir.clearPath(f);
-  }
-
   @Override
   public String toString() {
     return currentDir.getAbsolutePath();

@@ -1224,13 +1224,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             + ". Parent not found for file " + f);
         continue;
       }
-      ReplicaState replicaState = info.getState();
-      if (replicaState == ReplicaState.FINALIZED ||
-          (replicaState == ReplicaState.RUR &&
-              ((ReplicaUnderRecovery)info).getOriginalReplica().getState() ==
-                  ReplicaState.FINALIZED)) {
-        v.clearPath(bpid, parent);
-      }
       volumeMap.remove(bpid, invalidBlks[i]);
     }
 

@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -236,10 +237,6 @@ class FsVolumeImpl implements FsVolumeSpi {
     bp.addToReplicasMap(volumeMap, dir, isFinalized);
   }
 
-  void clearPath(String bpid, File f) throws IOException {
-    getBlockPoolSlice(bpid).clearPath(f);
-  }
-
   @Override
   public String toString() {
     return currentDir.getAbsolutePath();
@@ -274,7 +271,8 @@ class FsVolumeImpl implements FsVolumeSpi {
     File finalizedDir = new File(bpCurrentDir,
         DataStorage.STORAGE_DIR_FINALIZED);
     File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
-    if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) {
+    if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
+        finalizedDir)) {
       return false;
     }
     if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
@@ -301,7 +299,8 @@ class FsVolumeImpl implements FsVolumeSpi {
     if (!rbwDir.delete()) {
       throw new IOException("Failed to delete " + rbwDir);
     }
-    if (!finalizedDir.delete()) {
+    if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
+        !FileUtil.fullyDelete(finalizedDir)) {
       throw new IOException("Failed to delete " + finalizedDir);
     }
     FileUtil.fullyDelete(tmpDir);

@ -1,228 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
|
||||||
import org.apache.hadoop.util.DiskChecker;
|
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A node type that can be built into a tree reflecting the
|
|
||||||
* hierarchy of replicas on the local disk.
|
|
||||||
*/
|
|
||||||
class LDir {
|
|
||||||
final File dir;
|
|
||||||
final int maxBlocksPerDir;
|
|
||||||
|
|
||||||
private int numBlocks = 0;
|
|
||||||
private LDir[] children = null;
|
|
||||||
private int lastChildIdx = 0;
|
|
||||||
|
|
||||||
LDir(File dir, int maxBlocksPerDir) throws IOException {
|
|
||||||
this.dir = dir;
|
|
    this.maxBlocksPerDir = maxBlocksPerDir;

    if (!dir.exists()) {
      if (!dir.mkdirs()) {
        throw new IOException("Failed to mkdirs " + dir);
      }
    } else {
      File[] files = FileUtil.listFiles(dir);
      List<LDir> dirList = new ArrayList<LDir>();
      for (int idx = 0; idx < files.length; idx++) {
        if (files[idx].isDirectory()) {
          dirList.add(new LDir(files[idx], maxBlocksPerDir));
        } else if (Block.isBlockFilename(files[idx])) {
          numBlocks++;
        }
      }
      if (dirList.size() > 0) {
        children = dirList.toArray(new LDir[dirList.size()]);
      }
    }
  }

  File addBlock(Block b, File src) throws IOException {
    //First try without creating subdirectories
    File file = addBlock(b, src, false, false);
    return (file != null) ? file : addBlock(b, src, true, true);
  }

  private File addBlock(Block b, File src, boolean createOk, boolean resetIdx
      ) throws IOException {
    if (numBlocks < maxBlocksPerDir) {
      final File dest = FsDatasetImpl.moveBlockFiles(b, src, dir);
      numBlocks += 1;
      return dest;
    }

    if (lastChildIdx < 0 && resetIdx) {
      //reset so that all children will be checked
      lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
    }

    if (lastChildIdx >= 0 && children != null) {
      //Check if any child-tree has room for a block.
      for (int i=0; i < children.length; i++) {
        int idx = (lastChildIdx + i)%children.length;
        File file = children[idx].addBlock(b, src, false, resetIdx);
        if (file != null) {
          lastChildIdx = idx;
          return file;
        }
      }
      lastChildIdx = -1;
    }

    if (!createOk) {
      return null;
    }

    if (children == null || children.length == 0) {
      children = new LDir[maxBlocksPerDir];
      for (int idx = 0; idx < maxBlocksPerDir; idx++) {
        final File sub = new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx);
        children[idx] = new LDir(sub, maxBlocksPerDir);
      }
    }

    //now pick a child randomly for creating a new set of subdirs.
    lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
    return children[ lastChildIdx ].addBlock(b, src, true, false);
  }

  void getVolumeMap(String bpid, ReplicaMap volumeMap, FsVolumeImpl volume
      ) throws IOException {
    if (children != null) {
      for (int i = 0; i < children.length; i++) {
        children[i].getVolumeMap(bpid, volumeMap, volume);
      }
    }

    recoverTempUnlinkedBlock();
    volume.addToReplicasMap(bpid, volumeMap, dir, true);
  }

  /**
   * Recover unlinked tmp files on datanode restart. If the original block
   * does not exist, then the tmp file is renamed to be the
   * original file name; otherwise the tmp file is deleted.
   */
  private void recoverTempUnlinkedBlock() throws IOException {
    File files[] = FileUtil.listFiles(dir);
    for (File file : files) {
      if (!FsDatasetUtil.isUnlinkTmpFile(file)) {
        continue;
      }
      File blockFile = FsDatasetUtil.getOrigFile(file);
      if (blockFile.exists()) {
        // If the original block file still exists, then no recovery is needed.
        if (!file.delete()) {
          throw new IOException("Unable to cleanup unlinked tmp file " + file);
        }
      } else {
        if (!file.renameTo(blockFile)) {
          throw new IOException("Unable to cleanup detached file " + file);
        }
      }
    }
  }

  /**
   * check if a data diretory is healthy
   * @throws DiskErrorException
   */
  void checkDirTree() throws DiskErrorException {
    DiskChecker.checkDir(dir);

    if (children != null) {
      for (int i = 0; i < children.length; i++) {
        children[i].checkDirTree();
      }
    }
  }

  void clearPath(File f) {
    String root = dir.getAbsolutePath();
    String dir = f.getAbsolutePath();
    if (dir.startsWith(root)) {
      String[] dirNames = dir.substring(root.length()).
          split(File.separator + DataStorage.BLOCK_SUBDIR_PREFIX);
      if (clearPath(f, dirNames, 1))
        return;
    }
    clearPath(f, null, -1);
  }

  /**
   * dirNames is an array of string integers derived from
   * usual directory structure data/subdirN/subdirXY/subdirM ...
   * If dirName array is non-null, we only check the child at
   * the children[dirNames[idx]]. This avoids iterating over
   * children in common case. If directory structure changes
   * in later versions, we need to revisit this.
   */
  private boolean clearPath(File f, String[] dirNames, int idx) {
    if ((dirNames == null || idx == dirNames.length) &&
        dir.compareTo(f) == 0) {
      numBlocks--;
      return true;
    }

    if (dirNames != null) {
      //guess the child index from the directory name
      if (idx > (dirNames.length - 1) || children == null) {
        return false;
      }
      int childIdx;
      try {
        childIdx = Integer.parseInt(dirNames[idx]);
      } catch (NumberFormatException ignored) {
        // layout changed? we could print a warning.
        return false;
      }
      return (childIdx >= 0 && childIdx < children.length) ?
          children[childIdx].clearPath(f, dirNames, idx+1) : false;
    }

    //guesses failed. back to blind iteration.
    if (children != null) {
      for(int i=0; i < children.length; i++) {
        if (children[i].clearPath(f, null, -1)){
          return true;
        }
      }
    }
    return false;
  }

  @Override
  public String toString() {
    return "FSDir{dir=" + dir + ", children="
        + (children == null ? null : Arrays.asList(children)) + "}";
  }
}
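Aside (editorial note): the LDir code above spreads replicas across numbered subdirectories and has to track per-directory block counts at runtime. The replacement layout introduced by this patch instead derives a block's directory purely from its ID via DatanodeUtil.idToBlockDir, so placement is computable without scanning. The following is only an illustrative sketch of that style of mapping; the method name idToBlockDirSketch and the shift/mask constants are assumptions for illustration, not the values used by the actual implementation.

// Illustrative sketch only, not the real DatanodeUtil.idToBlockDir.
// Assumption: two subdirectory levels selected from bits of the block ID.
static File idToBlockDirSketch(File finalizedDir, long blockId) {
  int d1 = (int) ((blockId >> 16) & 0xff); // assumed first-level selector
  int d2 = (int) ((blockId >> 8) & 0xff);  // assumed second-level selector
  return new File(finalizedDir,
      "subdir" + d1 + File.separator + "subdir" + d2);
}

With a scheme like this, a replica's location follows from its ID alone, so the datanode no longer needs the round-robin allocation and per-directory bookkeeping that LDir.addBlock performs.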
@@ -2052,4 +2052,12 @@
   </description>
 </property>

+<property>
+  <name>dfs.datanode.block.id.layout.upgrade.threads</name>
+  <value>12</value>
+  <description>The number of threads to use when creating hard links from
+    current to previous blocks during upgrade of a DataNode to block ID-based
+    block layout (see HDFS-6482 for details on the layout).</description>
+</property>
+
 </configuration>
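For operators, the new key can be overridden in hdfs-site.xml like any other property. The snippet below is a hypothetical override shown only to illustrate the shape of the setting; the value 24 is arbitrary, and the shipped default above is 12.

<!-- hypothetical hdfs-site.xml override; 24 is an illustrative value -->
<property>
  <name>dfs.datanode.block.id.layout.upgrade.threads</name>
  <value>24</value>
</property>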
@@ -2353,8 +2353,8 @@ public class MiniDFSCluster {
   * @return data file corresponding to the block
   */
  public static File getBlockFile(File storageDir, ExtendedBlock blk) {
-   return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()),
-       blk.getBlockName());
+   return new File(DatanodeUtil.idToBlockDir(getFinalizedDir(storageDir,
+       blk.getBlockPoolId()), blk.getBlockId()), blk.getBlockName());
  }

  /**
@@ -2364,10 +2364,32 @@ public class MiniDFSCluster {
   * @return metadata file corresponding to the block
   */
  public static File getBlockMetadataFile(File storageDir, ExtendedBlock blk) {
-   return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()),
-       blk.getBlockName() + "_" + blk.getGenerationStamp() +
-       Block.METADATA_EXTENSION);
+   return new File(DatanodeUtil.idToBlockDir(getFinalizedDir(storageDir,
+       blk.getBlockPoolId()), blk.getBlockId()), blk.getBlockName() + "_" +
+       blk.getGenerationStamp() + Block.METADATA_EXTENSION);
+ }
+
+ /**
+  * Return all block metadata files in given directory (recursive search)
+  */
+ public static List<File> getAllBlockMetadataFiles(File storageDir) {
+   List<File> results = new ArrayList<File>();
+   File[] files = storageDir.listFiles();
+   if (files == null) {
+     return null;
+   }
+   for (File f : files) {
+     if (f.getName().startsWith("blk_") && f.getName().endsWith(
+         Block.METADATA_EXTENSION)) {
+       results.add(f);
+     } else if (f.isDirectory()) {
+       List<File> subdirResults = getAllBlockMetadataFiles(f);
+       if (subdirResults != null) {
+         results.addAll(subdirResults);
+       }
+     }
+   }
+   return results;
  }

  /**
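The recursive helper above is what the updated tests later in this patch use to locate replicas under the nested, block ID-based layout. A minimal sketch of that usage pattern follows; printReplicas is a hypothetical method name, and storageDir and bpid are assumed to come from a running MiniDFSCluster, as in the tests below.

// Sketch: list every finalized replica under a storage dir using the helpers
// this patch relies on. storageDir and bpid are assumed inputs.
static void printReplicas(File storageDir, String bpid) {
  File finalized = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
  List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(finalized);
  if (metaFiles == null) {
    return;
  }
  for (File metaFile : metaFiles) {
    // Block.metaToBlockFile maps blk_<id>_<genstamp>.meta back to blk_<id>
    File blockFile = Block.metaToBlockFile(metaFile);
    System.out.println("replica: " + blockFile.getName());
  }
}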
@@ -79,8 +79,8 @@ public class TestDFSFinalize {
    File dnCurDirs[] = new File[dataNodeDirs.length];
    for (int i = 0; i < dataNodeDirs.length; i++) {
      dnCurDirs[i] = new File(dataNodeDirs[i],"current");
-     assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, dnCurDirs[i]),
-                  UpgradeUtilities.checksumMasterDataNodeContents());
+     assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, dnCurDirs[i],
+                  false), UpgradeUtilities.checksumMasterDataNodeContents());
    }
    for (int i = 0; i < nameNodeDirs.length; i++) {
      assertFalse(new File(nameNodeDirs[i],"previous").isDirectory());
@@ -96,8 +96,9 @@ public class TestDFSFinalize {
        assertFalse(new File(bpRoot,"previous").isDirectory());

        File bpCurFinalizeDir = new File(bpRoot,"current/"+DataStorage.STORAGE_DIR_FINALIZED);
-       assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, bpCurFinalizeDir),
-                    UpgradeUtilities.checksumMasterBlockPoolFinalizedContents());
+       assertEquals(UpgradeUtilities.checksumContents(DATA_NODE,
+                    bpCurFinalizeDir, true),
+                    UpgradeUtilities.checksumMasterBlockPoolFinalizedContents());
      }
    }
  }
@@ -81,7 +81,7 @@ public class TestDFSRollback {
        break;
      case DATA_NODE:
        assertEquals(
-         UpgradeUtilities.checksumContents(nodeType, curDir),
+         UpgradeUtilities.checksumContents(nodeType, curDir, false),
          UpgradeUtilities.checksumMasterDataNodeContents());
        break;
      }
@@ -239,7 +239,7 @@ public class TestDFSStorageStateRecovery {
        assertTrue(new File(baseDirs[i],"previous").isDirectory());
        assertEquals(
                     UpgradeUtilities.checksumContents(
-                    NAME_NODE, new File(baseDirs[i],"previous")),
+                    NAME_NODE, new File(baseDirs[i],"previous"), false),
                     UpgradeUtilities.checksumMasterNameNodeContents());
      }
    }
@@ -259,7 +259,8 @@ public class TestDFSStorageStateRecovery {
    if (currentShouldExist) {
      for (int i = 0; i < baseDirs.length; i++) {
        assertEquals(
-                    UpgradeUtilities.checksumContents(DATA_NODE, new File(baseDirs[i],"current")),
+                    UpgradeUtilities.checksumContents(DATA_NODE,
+                    new File(baseDirs[i],"current"), false),
                     UpgradeUtilities.checksumMasterDataNodeContents());
      }
    }
@@ -267,7 +268,8 @@ public class TestDFSStorageStateRecovery {
      for (int i = 0; i < baseDirs.length; i++) {
        assertTrue(new File(baseDirs[i],"previous").isDirectory());
        assertEquals(
-                    UpgradeUtilities.checksumContents(DATA_NODE, new File(baseDirs[i],"previous")),
+                    UpgradeUtilities.checksumContents(DATA_NODE,
+                    new File(baseDirs[i],"previous"), false),
                     UpgradeUtilities.checksumMasterDataNodeContents());
      }
    }
@@ -290,8 +292,8 @@ public class TestDFSStorageStateRecovery {
    if (currentShouldExist) {
      for (int i = 0; i < baseDirs.length; i++) {
        File bpCurDir = new File(baseDirs[i], Storage.STORAGE_DIR_CURRENT);
-       assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, bpCurDir),
-                    UpgradeUtilities.checksumMasterBlockPoolContents());
+       assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, bpCurDir,
+                    false), UpgradeUtilities.checksumMasterBlockPoolContents());
      }
    }
    if (previousShouldExist) {
@@ -299,8 +301,8 @@ public class TestDFSStorageStateRecovery {
        File bpPrevDir = new File(baseDirs[i], Storage.STORAGE_DIR_PREVIOUS);
        assertTrue(bpPrevDir.isDirectory());
        assertEquals(
-                    UpgradeUtilities.checksumContents(DATA_NODE, bpPrevDir),
-                    UpgradeUtilities.checksumMasterBlockPoolContents());
+                    UpgradeUtilities.checksumContents(DATA_NODE, bpPrevDir,
+                    false), UpgradeUtilities.checksumMasterBlockPoolContents());
      }
    }
  }
@@ -100,7 +100,7 @@ public class TestDFSUpgrade {

      File previous = new File(baseDir, "previous");
      assertExists(previous);
-     assertEquals(UpgradeUtilities.checksumContents(NAME_NODE, previous),
+     assertEquals(UpgradeUtilities.checksumContents(NAME_NODE, previous, false),
          UpgradeUtilities.checksumMasterNameNodeContents());
    }
  }
@@ -114,23 +114,25 @@ public class TestDFSUpgrade {
  void checkDataNode(String[] baseDirs, String bpid) throws IOException {
    for (int i = 0; i < baseDirs.length; i++) {
      File current = new File(baseDirs[i], "current/" + bpid + "/current");
-     assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, current),
+     assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, current, false),
          UpgradeUtilities.checksumMasterDataNodeContents());

      // block files are placed under <sd>/current/<bpid>/current/finalized
      File currentFinalized =
          MiniDFSCluster.getFinalizedDir(new File(baseDirs[i]), bpid);
-     assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, currentFinalized),
+     assertEquals(UpgradeUtilities.checksumContents(DATA_NODE,
+         currentFinalized, true),
          UpgradeUtilities.checksumMasterBlockPoolFinalizedContents());

      File previous = new File(baseDirs[i], "current/" + bpid + "/previous");
      assertTrue(previous.isDirectory());
-     assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, previous),
+     assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, previous, false),
          UpgradeUtilities.checksumMasterDataNodeContents());

      File previousFinalized =
          new File(baseDirs[i], "current/" + bpid + "/previous"+"/finalized");
-     assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, previousFinalized),
+     assertEquals(UpgradeUtilities.checksumContents(DATA_NODE,
+         previousFinalized, true),
          UpgradeUtilities.checksumMasterBlockPoolFinalizedContents());

    }
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;

import java.io.BufferedReader;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
@@ -80,7 +81,7 @@ public class TestDFSUpgradeFromImage {
    long checksum;
  }

- private static final Configuration upgradeConf;
+ static final Configuration upgradeConf;

  static {
    upgradeConf = new HdfsConfiguration();
@@ -95,7 +96,7 @@ public class TestDFSUpgradeFromImage {

  boolean printChecksum = false;

- private void unpackStorage(String tarFileName)
+ void unpackStorage(String tarFileName, String referenceName)
      throws IOException {
    String tarFile = System.getProperty("test.cache.data", "build/test/cache")
        + "/" + tarFileName;
@@ -110,7 +111,7 @@ public class TestDFSUpgradeFromImage {

    BufferedReader reader = new BufferedReader(new FileReader(
        System.getProperty("test.cache.data", "build/test/cache")
-       + "/" + HADOOP_DFS_DIR_TXT));
+       + "/" + referenceName));
    String line;
    while ( (line = reader.readLine()) != null ) {

@@ -285,7 +286,7 @@ public class TestDFSUpgradeFromImage {
   */
  @Test
  public void testUpgradeFromRel22Image() throws IOException {
-   unpackStorage(HADOOP22_IMAGE);
+   unpackStorage(HADOOP22_IMAGE, HADOOP_DFS_DIR_TXT);
    upgradeAndVerify(new MiniDFSCluster.Builder(upgradeConf).
        numDataNodes(4));
  }
@@ -296,7 +297,7 @@ public class TestDFSUpgradeFromImage {
   */
  @Test
  public void testUpgradeFromCorruptRel22Image() throws IOException {
-   unpackStorage(HADOOP22_IMAGE);
+   unpackStorage(HADOOP22_IMAGE, HADOOP_DFS_DIR_TXT);

    // Overwrite the md5 stored in the VERSION files
    File baseDir = new File(MiniDFSCluster.getBaseDirectory());
@@ -333,7 +334,7 @@ public class TestDFSUpgradeFromImage {
   */
  @Test
  public void testUpgradeFromRel1ReservedImage() throws Exception {
-   unpackStorage(HADOOP1_RESERVED_IMAGE);
+   unpackStorage(HADOOP1_RESERVED_IMAGE, HADOOP_DFS_DIR_TXT);
    MiniDFSCluster cluster = null;
    // Try it once without setting the upgrade flag to ensure it fails
    final Configuration conf = new Configuration();
@@ -403,7 +404,7 @@ public class TestDFSUpgradeFromImage {
   */
  @Test
  public void testUpgradeFromRel023ReservedImage() throws Exception {
-   unpackStorage(HADOOP023_RESERVED_IMAGE);
+   unpackStorage(HADOOP023_RESERVED_IMAGE, HADOOP_DFS_DIR_TXT);
    MiniDFSCluster cluster = null;
    // Try it once without setting the upgrade flag to ensure it fails
    final Configuration conf = new Configuration();
@@ -468,7 +469,7 @@ public class TestDFSUpgradeFromImage {
   */
  @Test
  public void testUpgradeFromRel2ReservedImage() throws Exception {
-   unpackStorage(HADOOP2_RESERVED_IMAGE);
+   unpackStorage(HADOOP2_RESERVED_IMAGE, HADOOP_DFS_DIR_TXT);
    MiniDFSCluster cluster = null;
    // Try it once without setting the upgrade flag to ensure it fails
    final Configuration conf = new Configuration();
@@ -572,7 +573,7 @@ public class TestDFSUpgradeFromImage {
    } while (dirList.hasMore());
  }

- private void upgradeAndVerify(MiniDFSCluster.Builder bld)
+ void upgradeAndVerify(MiniDFSCluster.Builder bld)
      throws IOException {
    MiniDFSCluster cluster = null;
    try {
@@ -601,7 +602,7 @@ public class TestDFSUpgradeFromImage {
   */
  @Test
  public void testUpgradeFromRel1BBWImage() throws IOException {
-   unpackStorage(HADOOP1_BBW_IMAGE);
+   unpackStorage(HADOOP1_BBW_IMAGE, HADOOP_DFS_DIR_TXT);
    Configuration conf = new Configuration(upgradeConf);
    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
        System.getProperty("test.build.data") + File.separator +
@@ -445,19 +445,14 @@ public class TestDatanodeBlockScanner {

  @Test
  public void testReplicaInfoParsing() throws Exception {
-   testReplicaInfoParsingSingle(BASE_PATH, new int[0]);
-   testReplicaInfoParsingSingle(BASE_PATH + "/subdir1", new int[]{1});
-   testReplicaInfoParsingSingle(BASE_PATH + "/subdir43", new int[]{43});
-   testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir3", new int[]{1, 2, 3});
-   testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir43", new int[]{1, 2, 43});
-   testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir23/subdir3", new int[]{1, 23, 3});
-   testReplicaInfoParsingSingle(BASE_PATH + "/subdir13/subdir2/subdir3", new int[]{13, 2, 3});
+   testReplicaInfoParsingSingle(BASE_PATH);
+   testReplicaInfoParsingSingle(BASE_PATH + "/subdir1");
+   testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir3");
  }

- private static void testReplicaInfoParsingSingle(String subDirPath, int[] expectedSubDirs) {
+ private static void testReplicaInfoParsingSingle(String subDirPath) {
    File testFile = new File(subDirPath);
-   assertArrayEquals(expectedSubDirs, ReplicaInfo.parseSubDirs(testFile).subDirs);
-   assertEquals(BASE_PATH, ReplicaInfo.parseSubDirs(testFile).baseDirPath);
+   assertEquals(BASE_PATH, ReplicaInfo.parseBaseDir(testFile).baseDirPath);
  }

  @Test
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+public class TestDatanodeLayoutUpgrade {
+  private static final String HADOOP_DATANODE_DIR_TXT =
+      "hadoop-datanode-dir.txt";
+  private static final String HADOOP24_DATANODE = "hadoop-24-datanode-dir.tgz";
+
+  @Test
+  // Upgrade from LDir-based layout to block ID-based layout -- change described
+  // in HDFS-6482
+  public void testUpgradeToIdBasedLayout() throws IOException {
+    TestDFSUpgradeFromImage upgrade = new TestDFSUpgradeFromImage();
+    upgrade.unpackStorage(HADOOP24_DATANODE, HADOOP_DATANODE_DIR_TXT);
+    Configuration conf = new Configuration(TestDFSUpgradeFromImage.upgradeConf);
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+        System.getProperty("test.build.data") + File.separator +
+        "dfs" + File.separator + "data");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        System.getProperty("test.build.data") + File.separator +
+        "dfs" + File.separator + "name");
+    upgrade.upgradeAndVerify(new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .manageDataDfsDirs(false).manageNameDfsDirs(false));
+  }
+}
@@ -27,6 +27,7 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.util.ArrayList;
+import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
@@ -137,13 +139,15 @@ public class TestFileCorruption {
      final String bpid = cluster.getNamesystem().getBlockPoolId();
      File storageDir = cluster.getInstanceStorageDir(0, 0);
      File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
+     assertTrue("Data directory does not exist", dataDir.exists());
      ExtendedBlock blk = getBlock(bpid, dataDir);
      if (blk == null) {
        storageDir = cluster.getInstanceStorageDir(0, 1);
        dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
        blk = getBlock(bpid, dataDir);
      }
-     assertFalse(blk==null);
+     assertFalse("Data directory does not contain any blocks or there was an "
+         + "IO error", blk==null);

      // start a third datanode
      cluster.startDataNodes(conf, 1, true, null, null);
@@ -174,33 +178,15 @@ public class TestFileCorruption {

  }

- private ExtendedBlock getBlock(String bpid, File dataDir) {
-   assertTrue("data directory does not exist", dataDir.exists());
-   File[] blocks = dataDir.listFiles();
-   assertTrue("Blocks do not exist in dataDir", (blocks != null) && (blocks.length > 0));
-
-   int idx = 0;
-   String blockFileName = null;
-   for (; idx < blocks.length; idx++) {
-     blockFileName = blocks[idx].getName();
-     if (blockFileName.startsWith("blk_") && !blockFileName.endsWith(".meta")) {
-       break;
-     }
-   }
-   if (blockFileName == null) {
+ public static ExtendedBlock getBlock(String bpid, File dataDir) {
+   List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(dataDir);
+   if (metadataFiles == null || metadataFiles.isEmpty()) {
      return null;
    }
-   long blockId = Long.parseLong(blockFileName.substring("blk_".length()));
-   long blockTimeStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
-   for (idx=0; idx < blocks.length; idx++) {
-     String fileName = blocks[idx].getName();
-     if (fileName.startsWith(blockFileName) && fileName.endsWith(".meta")) {
-       int startIndex = blockFileName.length()+1;
-       int endIndex = fileName.length() - ".meta".length();
-       blockTimeStamp = Long.parseLong(fileName.substring(startIndex, endIndex));
-       break;
-     }
-   }
-   return new ExtendedBlock(bpid, blockId, blocks[idx].length(), blockTimeStamp);
+   File metadataFile = metadataFiles.get(0);
+   File blockFile = Block.metaToBlockFile(metadataFile);
+   return new ExtendedBlock(bpid, Block.getBlockId(blockFile.getName()),
+       blockFile.length(), Block.getGenerationStamp(metadataFile.getName()));
  }

}
@@ -158,21 +158,23 @@ public class UpgradeUtilities {
      FileUtil.fullyDelete(new File(datanodeStorage,"in_use.lock"));
    }
    namenodeStorageChecksum = checksumContents(NAME_NODE,
-       new File(namenodeStorage, "current"));
+       new File(namenodeStorage, "current"), false);
    File dnCurDir = new File(datanodeStorage, "current");
-   datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir);
+   datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir, false);

    File bpCurDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
        "current");
-   blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir);
+   blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir, false);

    File bpCurFinalizeDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
        "current/"+DataStorage.STORAGE_DIR_FINALIZED);
-   blockPoolFinalizedStorageChecksum = checksumContents(DATA_NODE, bpCurFinalizeDir);
+   blockPoolFinalizedStorageChecksum = checksumContents(DATA_NODE,
+       bpCurFinalizeDir, true);

    File bpCurRbwDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
        "current/"+DataStorage.STORAGE_DIR_RBW);
-   blockPoolRbwStorageChecksum = checksumContents(DATA_NODE, bpCurRbwDir);
+   blockPoolRbwStorageChecksum = checksumContents(DATA_NODE, bpCurRbwDir,
+       false);
  }

  // Private helper method that writes a file to the given file system.
@@ -266,35 +268,46 @@ public class UpgradeUtilities {

  /**
   * Compute the checksum of all the files in the specified directory.
-  * The contents of subdirectories are not included. This method provides
-  * an easy way to ensure equality between the contents of two directories.
+  * This method provides an easy way to ensure equality between the contents
+  * of two directories.
   *
   * @param nodeType if DATA_NODE then any file named "VERSION" is ignored.
   *          This is because this file file is changed every time
   *          the Datanode is started.
-  * @param dir must be a directory. Subdirectories are ignored.
+  * @param dir must be a directory
+  * @param recursive whether or not to consider subdirectories
   *
   * @throws IllegalArgumentException if specified directory is not a directory
   * @throws IOException if an IOException occurs while reading the files
   * @return the computed checksum value
   */
- public static long checksumContents(NodeType nodeType, File dir) throws IOException {
+ public static long checksumContents(NodeType nodeType, File dir,
+     boolean recursive) throws IOException {
+   CRC32 checksum = new CRC32();
+   checksumContentsHelper(nodeType, dir, checksum, recursive);
+   return checksum.getValue();
+ }
+
+ public static void checksumContentsHelper(NodeType nodeType, File dir,
+     CRC32 checksum, boolean recursive) throws IOException {
    if (!dir.isDirectory()) {
      throw new IllegalArgumentException(
          "Given argument is not a directory:" + dir);
    }
    File[] list = dir.listFiles();
    Arrays.sort(list);
-   CRC32 checksum = new CRC32();
    for (int i = 0; i < list.length; i++) {
      if (!list[i].isFile()) {
+       if (recursive) {
+         checksumContentsHelper(nodeType, list[i], checksum, recursive);
+       }
        continue;
      }

      // skip VERSION and dfsUsed file for DataNodes
      if (nodeType == DATA_NODE &&
          (list[i].getName().equals("VERSION") ||
          list[i].getName().equals("dfsUsed"))) {
        continue;
      }

@@ -312,7 +325,6 @@ public class UpgradeUtilities {
        }
      }
    }
-   return checksum.getValue();
  }

  /**
@@ -25,6 +25,7 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -384,7 +385,7 @@ public class TestDataNodeVolumeFailure {
          continue;
        }

-       String [] res = metaFilesInDir(dir);
+       List<File> res = MiniDFSCluster.getAllBlockMetadataFiles(dir);
        if(res == null) {
          System.out.println("res is null for dir = " + dir + " i=" + i + " and j=" + j);
          continue;
@@ -392,7 +393,8 @@ public class TestDataNodeVolumeFailure {
        //System.out.println("for dn" + i + "." + j + ": " + dir + "=" + res.length+ " files");

        //int ii = 0;
-       for(String s: res) {
+       for(File f: res) {
+         String s = f.getName();
          // cut off "blk_-" at the beginning and ".meta" at the end
          assertNotNull("Block file name should not be null", s);
          String bid = s.substring(s.indexOf("_")+1, s.lastIndexOf("_"));
@@ -408,25 +410,9 @@ public class TestDataNodeVolumeFailure {
        //System.out.println("dir1="+dir.getPath() + "blocks=" + res.length);
        //System.out.println("dir2="+dir2.getPath() + "blocks=" + res2.length);

-       total += res.length;
+       total += res.size();
      }
    }
    return total;
  }

- /*
-  * count how many files *.meta are in the dir
-  */
- private String [] metaFilesInDir(File dir) {
-   String [] res = dir.list(
-       new FilenameFilter() {
-         @Override
-         public boolean accept(File dir, String name) {
-           return name.startsWith("blk_") &&
-               name.endsWith(Block.METADATA_EXTENSION);
-         }
-       }
-   );
-   return res;
- }
}
@@ -103,9 +103,10 @@ public class TestDeleteBlockPool {
      fs1.delete(new Path("/alpha"), true);

      // Wait till all blocks are deleted from the dn2 for bpid1.
-     while ((MiniDFSCluster.getFinalizedDir(dn2StorageDir1,
-         bpid1).list().length != 0) || (MiniDFSCluster.getFinalizedDir(
-         dn2StorageDir2, bpid1).list().length != 0)) {
+     File finalDir1 = MiniDFSCluster.getFinalizedDir(dn2StorageDir1, bpid1);
+     File finalDir2 = MiniDFSCluster.getFinalizedDir(dn2StorageDir1, bpid2);
+     while ((!DatanodeUtil.dirNoFilesRecursive(finalDir1)) ||
+         (!DatanodeUtil.dirNoFilesRecursive(finalDir2))) {
        try {
          Thread.sleep(3000);
        } catch (Exception ignored) {
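DatanodeUtil.dirNoFilesRecursive is added elsewhere in this patch and is not shown in this section; conceptually it answers "does this directory tree contain any regular files?", which matters now that finalized blocks live in nested subdirectories. The following is only a rough sketch of that kind of check, written as an assumption about its behavior; the name noFilesRecursiveSketch is hypothetical and this is not the actual implementation.

// Assumed-behavior sketch, not the real DatanodeUtil.dirNoFilesRecursive.
static boolean noFilesRecursiveSketch(File dir) {
  File[] contents = dir.listFiles();
  if (contents == null) {
    return true; // treat an empty or unreadable listing as "no files" here
  }
  for (File f : contents) {
    if (f.isFile()) {
      return false;
    }
    if (f.isDirectory() && !noFilesRecursiveSketch(f)) {
      return false;
    }
  }
  return true;
}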
@@ -41,6 +41,7 @@ import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -63,6 +64,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -750,15 +752,14 @@ public class TestFsck {
      for (int j=0; j<=1; j++) {
        File storageDir = cluster.getInstanceStorageDir(i, j);
        File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
-       File[] blocks = data_dir.listFiles();
-       if (blocks == null)
+       List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(
+           data_dir);
+       if (metadataFiles == null)
          continue;
-
-       for (int idx = 0; idx < blocks.length; idx++) {
-         if (!blocks[idx].getName().startsWith("blk_")) {
-           continue;
-         }
-         assertTrue("Cannot remove file.", blocks[idx].delete());
+       for (File metadataFile : metadataFiles) {
+         File blockFile = Block.metaToBlockFile(metadataFile);
+         assertTrue("Cannot remove file.", blockFile.delete());
+         assertTrue("Cannot remove file.", metadataFile.delete());
        }
      }
    }
@@ -25,6 +25,7 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Collection;
+import java.util.List;
import java.util.Random;

import org.apache.commons.logging.Log;
@@ -39,7 +40,11 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.TestFileCorruption;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;

@@ -87,36 +92,29 @@ public class TestListCorruptFileBlocks {
      File storageDir = cluster.getInstanceStorageDir(0, 1);
      File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
      assertTrue("data directory does not exist", data_dir.exists());
-     File[] blocks = data_dir.listFiles();
-     assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0));
-     for (int idx = 0; idx < blocks.length; idx++) {
-       if (blocks[idx].getName().startsWith("blk_") &&
-           blocks[idx].getName().endsWith(".meta")) {
-         //
-         // shorten .meta file
-         //
-         RandomAccessFile file = new RandomAccessFile(blocks[idx], "rw");
-         FileChannel channel = file.getChannel();
-         long position = channel.size() - 2;
-         int length = 2;
-         byte[] buffer = new byte[length];
-         random.nextBytes(buffer);
-         channel.write(ByteBuffer.wrap(buffer), position);
-         file.close();
-         LOG.info("Deliberately corrupting file " + blocks[idx].getName() +
-             " at offset " + position + " length " + length);
+     List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
+     assertTrue("Data directory does not contain any blocks or there was an "
+         + "IO error", metaFiles != null && !metaFiles.isEmpty());
+     File metaFile = metaFiles.get(0);
+     RandomAccessFile file = new RandomAccessFile(metaFile, "rw");
+     FileChannel channel = file.getChannel();
+     long position = channel.size() - 2;
+     int length = 2;
+     byte[] buffer = new byte[length];
+     random.nextBytes(buffer);
+     channel.write(ByteBuffer.wrap(buffer), position);
+     file.close();
+     LOG.info("Deliberately corrupting file " + metaFile.getName() +
+         " at offset " + position + " length " + length);

      // read all files to trigger detection of corrupted replica
      try {
        util.checkFiles(fs, "/srcdat10");
      } catch (BlockMissingException e) {
        System.out.println("Received BlockMissingException as expected.");
      } catch (IOException e) {
        assertTrue("Corrupted replicas not handled properly. Expecting BlockMissingException " +
            " but received IOException " + e, false);
-         }
-         break;
-       }
      }

      // fetch bad file list from namenode. There should be one file.
@@ -174,38 +172,30 @@ public class TestListCorruptFileBlocks {
      File data_dir = MiniDFSCluster.getFinalizedDir(storageDir,
          cluster.getNamesystem().getBlockPoolId());
      assertTrue("data directory does not exist", data_dir.exists());
-     File[] blocks = data_dir.listFiles();
-     assertTrue("Blocks do not exist in data-dir", (blocks != null) &&
-         (blocks.length > 0));
-     for (int idx = 0; idx < blocks.length; idx++) {
-       if (blocks[idx].getName().startsWith("blk_") &&
-           blocks[idx].getName().endsWith(".meta")) {
-         //
-         // shorten .meta file
-         //
-         RandomAccessFile file = new RandomAccessFile(blocks[idx], "rw");
-         FileChannel channel = file.getChannel();
-         long position = channel.size() - 2;
-         int length = 2;
-         byte[] buffer = new byte[length];
-         random.nextBytes(buffer);
-         channel.write(ByteBuffer.wrap(buffer), position);
-         file.close();
-         LOG.info("Deliberately corrupting file " + blocks[idx].getName() +
-             " at offset " + position + " length " + length);
+     List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
+     assertTrue("Data directory does not contain any blocks or there was an "
+         + "IO error", metaFiles != null && !metaFiles.isEmpty());
+     File metaFile = metaFiles.get(0);
+     RandomAccessFile file = new RandomAccessFile(metaFile, "rw");
+     FileChannel channel = file.getChannel();
+     long position = channel.size() - 2;
+     int length = 2;
+     byte[] buffer = new byte[length];
+     random.nextBytes(buffer);
+     channel.write(ByteBuffer.wrap(buffer), position);
+     file.close();
+     LOG.info("Deliberately corrupting file " + metaFile.getName() +
+         " at offset " + position + " length " + length);

      // read all files to trigger detection of corrupted replica
      try {
        util.checkFiles(fs, "/srcdat10");
      } catch (BlockMissingException e) {
        System.out.println("Received BlockMissingException as expected.");
      } catch (IOException e) {
        assertTrue("Corrupted replicas not handled properly. " +
            "Expecting BlockMissingException " +
            " but received IOException " + e, false);
      }
-       break;
-     }

      // fetch bad file list from namenode. There should be one file.
@@ -295,17 +285,18 @@ public class TestListCorruptFileBlocks {
      for (int j = 0; j <= 1; j++) {
        File storageDir = cluster.getInstanceStorageDir(i, j);
        File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
-       File[] blocks = data_dir.listFiles();
-       if (blocks == null)
+       List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(
+           data_dir);
+       if (metadataFiles == null)
          continue;
        // assertTrue("Blocks do not exist in data-dir", (blocks != null) &&
        // (blocks.length > 0));
-       for (int idx = 0; idx < blocks.length; idx++) {
-         if (!blocks[idx].getName().startsWith("blk_")) {
-           continue;
-         }
-         LOG.info("Deliberately removing file " + blocks[idx].getName());
-         assertTrue("Cannot remove file.", blocks[idx].delete());
+       for (File metadataFile : metadataFiles) {
+         File blockFile = Block.metaToBlockFile(metadataFile);
+         LOG.info("Deliberately removing file " + blockFile.getName());
+         assertTrue("Cannot remove file.", blockFile.delete());
+         LOG.info("Deliberately removing file " + metadataFile.getName());
+         assertTrue("Cannot remove file.", metadataFile.delete());
          // break;
        }
      }
@@ -405,17 +396,18 @@ public class TestListCorruptFileBlocks {
    for (int i = 0; i < 2; i++) {
      File storageDir = cluster.getInstanceStorageDir(0, i);
      File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
-     File[] blocks = data_dir.listFiles();
-     if (blocks == null)
+     List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(
+         data_dir);
+     if (metadataFiles == null)
        continue;
      // assertTrue("Blocks do not exist in data-dir", (blocks != null) &&
      // (blocks.length > 0));
-     for (int idx = 0; idx < blocks.length; idx++) {
-       if (!blocks[idx].getName().startsWith("blk_")) {
-         continue;
-       }
-       LOG.info("Deliberately removing file " + blocks[idx].getName());
-       assertTrue("Cannot remove file.", blocks[idx].delete());
+     for (File metadataFile : metadataFiles) {
+       File blockFile = Block.metaToBlockFile(metadataFile);
+       LOG.info("Deliberately removing file " + blockFile.getName());
+       assertTrue("Cannot remove file.", blockFile.delete());
+       LOG.info("Deliberately removing file " + metadataFile.getName());
+       assertTrue("Cannot remove file.", metadataFile.delete());
        // break;
      }
    }
@@ -482,15 +474,14 @@ public class TestListCorruptFileBlocks {
          File storageDir = cluster.getInstanceStorageDir(i, j);
          File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
          LOG.info("Removing files from " + data_dir);
-         File[] blocks = data_dir.listFiles();
-         if (blocks == null)
+         List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(
+             data_dir);
+         if (metadataFiles == null)
            continue;
-
-         for (int idx = 0; idx < blocks.length; idx++) {
-           if (!blocks[idx].getName().startsWith("blk_")) {
-             continue;
-           }
-           assertTrue("Cannot remove file.", blocks[idx].delete());
+         for (File metadataFile : metadataFiles) {
+           File blockFile = Block.metaToBlockFile(metadataFile);
+           assertTrue("Cannot remove file.", blockFile.delete());
+           assertTrue("Cannot remove file.", metadataFile.delete());
          }
        }
      }
Binary file not shown.

@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Similar to hadoop-dfs-dir.txt, except this is used for a datanode layout
+# upgrade test.
+# Uncomment the following line to produce checksum info for a new DFS image.
+#printChecksums
+
+/small 2976363016
+overallCRC 4099869518