HDFS-6482. Use block ID-based block layout on datanodes (James Thomas via Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1615223 13f79535-47bb-0310-9956-ffa450edef68

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
This commit is contained in:
Colin McCabe 2014-08-01 20:41:05 +00:00 committed by arp
parent 176b07b480
commit da118bb113
32 changed files with 612 additions and 543 deletions

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException; import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
@ -824,6 +825,14 @@ public class NativeIO {
} }
} }
public static void link(File src, File dst) throws IOException {
if (!nativeLoaded) {
HardLink.createHardLink(src, dst);
} else {
link0(src.getAbsolutePath(), dst.getAbsolutePath());
}
}
/** /**
* A version of renameTo that throws a descriptive exception when it fails. * A version of renameTo that throws a descriptive exception when it fails.
* *
@ -834,4 +843,7 @@ public class NativeIO {
*/ */
private static native void renameTo0(String src, String dst) private static native void renameTo0(String src, String dst)
throws NativeIOException; throws NativeIOException;
private static native void link0(String src, String dst)
throws NativeIOException;
} }

View File

@ -79,6 +79,20 @@ public class DiskChecker {
(canonDir.mkdir() || canonDir.exists())); (canonDir.mkdir() || canonDir.exists()));
} }
/**
* Recurse down a directory tree, checking all child directories.
* @param dir
* @throws DiskErrorException
*/
public static void checkDirs(File dir) throws DiskErrorException {
checkDir(dir);
for (File child : dir.listFiles()) {
if (child.isDirectory()) {
checkDirs(child);
}
}
}
/** /**
* Create the directory if it doesn't exist and check that dir is readable, * Create the directory if it doesn't exist and check that dir is readable,
* writable and executable * writable and executable

View File

@ -1087,6 +1087,43 @@ done:
#endif #endif
} }
JNIEXPORT void JNICALL
Java_org_apache_hadoop_io_nativeio_NativeIO_link0(JNIEnv *env,
jclass clazz, jstring jsrc, jstring jdst)
{
#ifdef UNIX
const char *src = NULL, *dst = NULL;
src = (*env)->GetStringUTFChars(env, jsrc, NULL);
if (!src) goto done; // exception was thrown
dst = (*env)->GetStringUTFChars(env, jdst, NULL);
if (!dst) goto done; // exception was thrown
if (link(src, dst)) {
throw_ioe(env, errno);
}
done:
if (src) (*env)->ReleaseStringUTFChars(env, jsrc, src);
if (dst) (*env)->ReleaseStringUTFChars(env, jdst, dst);
#endif
#ifdef WINDOWS
LPCTSTR src = NULL, dst = NULL;
src = (LPCTSTR) (*env)->GetStringChars(env, jsrc, NULL);
if (!src) goto done; // exception was thrown
dst = (LPCTSTR) (*env)->GetStringChars(env, jdst, NULL);
if (!dst) goto done; // exception was thrown
if (!CreateHardLink(dst, src)) {
throw_ioe(env, GetLastError());
}
done:
if (src) (*env)->ReleaseStringChars(env, jsrc, src);
if (dst) (*env)->ReleaseStringChars(env, jdst, dst);
#endif
}
JNIEXPORT jlong JNICALL JNIEXPORT jlong JNICALL
Java_org_apache_hadoop_io_nativeio_NativeIO_getMemlockLimit0( Java_org_apache_hadoop_io_nativeio_NativeIO_getMemlockLimit0(
JNIEnv *env, jclass clazz) JNIEnv *env, jclass clazz)

View File

@ -196,6 +196,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6036. Forcibly timeout misbehaving DFSClients that try to do HDFS-6036. Forcibly timeout misbehaving DFSClients that try to do
no-checksum reads that extend too long (cmccabe) no-checksum reads that extend too long (cmccabe)
HDFS-6482. Use block ID-based block layout on datanodes (James Thomas via
Colin Patrick McCabe)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang) HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -396,8 +396,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT; public static final String DFS_DATANODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_HTTP_DEFAULT_PORT;
public static final String DFS_DATANODE_MAX_RECEIVER_THREADS_KEY = "dfs.datanode.max.transfer.threads"; public static final String DFS_DATANODE_MAX_RECEIVER_THREADS_KEY = "dfs.datanode.max.transfer.threads";
public static final int DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096; public static final int DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096;
public static final String DFS_DATANODE_NUMBLOCKS_KEY = "dfs.datanode.numblocks";
public static final int DFS_DATANODE_NUMBLOCKS_DEFAULT = 64;
public static final String DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours"; public static final String DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0; public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0;
public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed"; public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
@ -694,4 +692,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT = public static final int DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT =
1000; 1000;
public static final String DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY =
"dfs.datanode.block.id.layout.upgrade.threads";
public static final int DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS = 12;
} }

View File

@ -50,6 +50,9 @@ public class Block implements Writable, Comparable<Block> {
public static final Pattern metaFilePattern = Pattern public static final Pattern metaFilePattern = Pattern
.compile(BLOCK_FILE_PREFIX + "(-??\\d++)_(\\d++)\\" + METADATA_EXTENSION .compile(BLOCK_FILE_PREFIX + "(-??\\d++)_(\\d++)\\" + METADATA_EXTENSION
+ "$"); + "$");
public static final Pattern metaOrBlockFilePattern = Pattern
.compile(BLOCK_FILE_PREFIX + "(-??\\d++)(_(\\d++)\\" + METADATA_EXTENSION
+ ")?$");
public static boolean isBlockFilename(File f) { public static boolean isBlockFilename(File f) {
String name = f.getName(); String name = f.getName();
@ -65,6 +68,11 @@ public class Block implements Writable, Comparable<Block> {
return metaFilePattern.matcher(name).matches(); return metaFilePattern.matcher(name).matches();
} }
public static File metaToBlockFile(File metaFile) {
return new File(metaFile.getParent(), metaFile.getName().substring(
0, metaFile.getName().lastIndexOf('_')));
}
/** /**
* Get generation stamp from the name of the metafile name * Get generation stamp from the name of the metafile name
*/ */
@ -75,10 +83,10 @@ public class Block implements Writable, Comparable<Block> {
} }
/** /**
* Get the blockId from the name of the metafile name * Get the blockId from the name of the meta or block file
*/ */
public static long getBlockId(String metaFile) { public static long getBlockId(String metaOrBlockFile) {
Matcher m = metaFilePattern.matcher(metaFile); Matcher m = metaOrBlockFilePattern.matcher(metaOrBlockFile);
return m.matches() ? Long.parseLong(m.group(1)) : 0; return m.matches() ? Long.parseLong(m.group(1)) : 0;
} }

View File

@ -163,7 +163,7 @@ public class BlockPoolSliceStorage extends Storage {
// During startup some of them can upgrade or roll back // During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup. // while others could be up-to-date for the regular startup.
for (int idx = 0; idx < getNumStorageDirs(); idx++) { for (int idx = 0; idx < getNumStorageDirs(); idx++) {
doTransition(getStorageDir(idx), nsInfo, startOpt); doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
assert getCTime() == nsInfo.getCTime() assert getCTime() == nsInfo.getCTime()
: "Data-node and name-node CTimes must be the same."; : "Data-node and name-node CTimes must be the same.";
} }
@ -267,7 +267,7 @@ public class BlockPoolSliceStorage extends Storage {
* @param startOpt startup option * @param startOpt startup option
* @throws IOException * @throws IOException
*/ */
private void doTransition(StorageDirectory sd, private void doTransition(DataNode datanode, StorageDirectory sd,
NamespaceInfo nsInfo, StartupOption startOpt) throws IOException { NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
if (startOpt == StartupOption.ROLLBACK) { if (startOpt == StartupOption.ROLLBACK) {
doRollback(sd, nsInfo); // rollback if applicable doRollback(sd, nsInfo); // rollback if applicable
@ -300,7 +300,7 @@ public class BlockPoolSliceStorage extends Storage {
} }
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) { || this.cTime < nsInfo.getCTime()) {
doUpgrade(sd, nsInfo); // upgrade doUpgrade(datanode, sd, nsInfo); // upgrade
return; return;
} }
// layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
@ -329,7 +329,8 @@ public class BlockPoolSliceStorage extends Storage {
* @param nsInfo Namespace Info from the namenode * @param nsInfo Namespace Info from the namenode
* @throws IOException on error * @throws IOException on error
*/ */
void doUpgrade(StorageDirectory bpSd, NamespaceInfo nsInfo) throws IOException { void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo)
throws IOException {
// Upgrading is applicable only to release with federation or after // Upgrading is applicable only to release with federation or after
if (!DataNodeLayoutVersion.supports( if (!DataNodeLayoutVersion.supports(
LayoutVersion.Feature.FEDERATION, layoutVersion)) { LayoutVersion.Feature.FEDERATION, layoutVersion)) {
@ -365,7 +366,7 @@ public class BlockPoolSliceStorage extends Storage {
rename(bpCurDir, bpTmpDir); rename(bpCurDir, bpTmpDir);
// 3. Create new <SD>/current with block files hardlinks and VERSION // 3. Create new <SD>/current with block files hardlinks and VERSION
linkAllBlocks(bpTmpDir, bpCurDir); linkAllBlocks(datanode, bpTmpDir, bpCurDir);
this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION; this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
assert this.namespaceID == nsInfo.getNamespaceID() assert this.namespaceID == nsInfo.getNamespaceID()
: "Data-node and name-node layout versions must be the same."; : "Data-node and name-node layout versions must be the same.";
@ -542,14 +543,15 @@ public class BlockPoolSliceStorage extends Storage {
* @param toDir the current data directory * @param toDir the current data directory
* @throws IOException if error occurs during hardlink * @throws IOException if error occurs during hardlink
*/ */
private void linkAllBlocks(File fromDir, File toDir) throws IOException { private void linkAllBlocks(DataNode datanode, File fromDir, File toDir)
throws IOException {
// do the link // do the link
int diskLayoutVersion = this.getLayoutVersion(); int diskLayoutVersion = this.getLayoutVersion();
// hardlink finalized blocks in tmpDir // hardlink finalized blocks in tmpDir
HardLink hardLink = new HardLink(); HardLink hardLink = new HardLink();
DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED), DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED),
new File(toDir,DataStorage.STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); new File(toDir,DataStorage.STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_RBW), DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_RBW),
new File(toDir, DataStorage.STORAGE_DIR_RBW), diskLayoutVersion, hardLink); new File(toDir, DataStorage.STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
LOG.info( hardLink.linkStats.report() ); LOG.info( hardLink.linkStats.report() );
} }

View File

@ -62,7 +62,10 @@ public class DataNodeLayoutVersion {
* </ul> * </ul>
*/ */
public static enum Feature implements LayoutFeature { public static enum Feature implements LayoutFeature {
FIRST_LAYOUT(-55, -53, "First datanode layout", false); FIRST_LAYOUT(-55, -53, "First datanode layout", false),
BLOCKID_BASED_LAYOUT(-56,
"The block ID of a finalized block uniquely determines its position " +
"in the directory structure");
private final FeatureInfo info; private final FeatureInfo info;

View File

@ -18,13 +18,19 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@ -35,13 +41,18 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker;
import java.io.*; import java.io.*;
import java.nio.channels.FileLock; import java.nio.channels.FileLock;
import java.util.*; import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/** /**
* Data storage information file. * Data storage information file.
@ -394,6 +405,7 @@ public class DataStorage extends Storage {
STORAGE_DIR_CURRENT)); STORAGE_DIR_CURRENT));
bpDataDirs.add(bpRoot); bpDataDirs.add(bpRoot);
} }
// mkdir for the list of BlockPoolStorage // mkdir for the list of BlockPoolStorage
makeBlockPoolDataDir(bpDataDirs, null); makeBlockPoolDataDir(bpDataDirs, null);
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage( BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
@ -621,7 +633,7 @@ public class DataStorage extends Storage {
// do upgrade // do upgrade
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) { if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
doUpgrade(sd, nsInfo); // upgrade doUpgrade(datanode, sd, nsInfo); // upgrade
return; return;
} }
@ -656,7 +668,8 @@ public class DataStorage extends Storage {
* @param sd storage directory * @param sd storage directory
* @throws IOException on error * @throws IOException on error
*/ */
void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException { void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo)
throws IOException {
// If the existing on-disk layout version supportes federation, simply // If the existing on-disk layout version supportes federation, simply
// update its layout version. // update its layout version.
if (DataNodeLayoutVersion.supports( if (DataNodeLayoutVersion.supports(
@ -701,7 +714,8 @@ public class DataStorage extends Storage {
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(),
nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID()); nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
bpStorage.format(curDir, nsInfo); bpStorage.format(curDir, nsInfo);
linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT)); linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir,
STORAGE_DIR_CURRENT));
// 4. Write version file under <SD>/current // 4. Write version file under <SD>/current
layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION; layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
@ -879,22 +893,22 @@ public class DataStorage extends Storage {
* *
* @throws IOException If error occurs during hardlink * @throws IOException If error occurs during hardlink
*/ */
private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir) private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir,
throws IOException { File toDir) throws IOException {
HardLink hardLink = new HardLink(); HardLink hardLink = new HardLink();
// do the link // do the link
int diskLayoutVersion = this.getLayoutVersion(); int diskLayoutVersion = this.getLayoutVersion();
if (DataNodeLayoutVersion.supports( if (DataNodeLayoutVersion.supports(
LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) { LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
// hardlink finalized blocks in tmpDir/finalized // hardlink finalized blocks in tmpDir/finalized
linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED), linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED),
new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
// hardlink rbw blocks in tmpDir/rbw // hardlink rbw blocks in tmpDir/rbw
linkBlocks(new File(fromDir, STORAGE_DIR_RBW), linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW),
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink); new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
} else { // pre-RBW version } else { // pre-RBW version
// hardlink finalized blocks in tmpDir // hardlink finalized blocks in tmpDir
linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED), linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
diskLayoutVersion, hardLink); diskLayoutVersion, hardLink);
if (fromBbwDir.exists()) { if (fromBbwDir.exists()) {
/* /*
@ -903,15 +917,67 @@ public class DataStorage extends Storage {
* NOT underneath the 'current' directory in those releases. See * NOT underneath the 'current' directory in those releases. See
* HDFS-3731 for details. * HDFS-3731 for details.
*/ */
linkBlocks(fromBbwDir, linkBlocks(datanode, fromBbwDir,
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink); new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
} }
} }
LOG.info( hardLink.linkStats.report() ); LOG.info( hardLink.linkStats.report() );
} }
static void linkBlocks(File from, File to, int oldLV, HardLink hl) private static class LinkArgs {
throws IOException { public File src;
public File dst;
public LinkArgs(File src, File dst) {
this.src = src;
this.dst = dst;
}
}
static void linkBlocks(DataNode datanode, File from, File to, int oldLV,
HardLink hl) throws IOException {
boolean upgradeToIdBasedLayout = false;
// If we are upgrading from a version older than the one where we introduced
// block ID-based layout AND we're working with the finalized directory,
// we'll need to upgrade from the old flat layout to the block ID-based one
if (oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo().
getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED)) {
upgradeToIdBasedLayout = true;
}
final List<LinkArgs> idBasedLayoutSingleLinks = Lists.newArrayList();
linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to,
idBasedLayoutSingleLinks);
int numLinkWorkers = datanode.getConf().getInt(
DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS);
ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers);
final int step = idBasedLayoutSingleLinks.size() / numLinkWorkers + 1;
List<Future<Void>> futures = Lists.newArrayList();
for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) {
final int iCopy = i;
futures.add(linkWorkers.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
int upperBound = Math.min(iCopy + step,
idBasedLayoutSingleLinks.size());
for (int j = iCopy; j < upperBound; j++) {
LinkArgs cur = idBasedLayoutSingleLinks.get(j);
NativeIO.link(cur.src, cur.dst);
}
return null;
}
}));
}
linkWorkers.shutdown();
for (Future<Void> f : futures) {
Futures.get(f, IOException.class);
}
}
static void linkBlocksHelper(File from, File to, int oldLV, HardLink hl,
boolean upgradeToIdBasedLayout, File blockRoot,
List<LinkArgs> idBasedLayoutSingleLinks) throws IOException {
if (!from.exists()) { if (!from.exists()) {
return; return;
} }
@ -938,9 +1004,6 @@ public class DataStorage extends Storage {
// from is a directory // from is a directory
hl.linkStats.countDirs++; hl.linkStats.countDirs++;
if (!to.mkdirs())
throw new IOException("Cannot create directory " + to);
String[] blockNames = from.list(new java.io.FilenameFilter() { String[] blockNames = from.list(new java.io.FilenameFilter() {
@Override @Override
public boolean accept(File dir, String name) { public boolean accept(File dir, String name) {
@ -948,12 +1011,36 @@ public class DataStorage extends Storage {
} }
}); });
// If we are upgrading to block ID-based layout, we don't want to recreate
// any subdirs from the source that contain blocks, since we have a new
// directory structure
if (!upgradeToIdBasedLayout || !to.getName().startsWith(
BLOCK_SUBDIR_PREFIX)) {
if (!to.mkdirs())
throw new IOException("Cannot create directory " + to);
}
// Block files just need hard links with the same file names // Block files just need hard links with the same file names
// but a different directory // but a different directory
if (blockNames.length > 0) { if (blockNames.length > 0) {
if (upgradeToIdBasedLayout) {
for (String blockName : blockNames) {
long blockId = Block.getBlockId(blockName);
File blockLocation = DatanodeUtil.idToBlockDir(blockRoot, blockId);
if (!blockLocation.exists()) {
if (!blockLocation.mkdirs()) {
throw new IOException("Failed to mkdirs " + blockLocation);
}
}
idBasedLayoutSingleLinks.add(new LinkArgs(new File(from, blockName),
new File(blockLocation, blockName)));
hl.linkStats.countSingleLinks++;
}
} else {
HardLink.createHardLinkMult(from, blockNames, to); HardLink.createHardLinkMult(from, blockNames, to);
hl.linkStats.countMultLinks++; hl.linkStats.countMultLinks++;
hl.linkStats.countFilesMultLinks += blockNames.length; hl.linkStats.countFilesMultLinks += blockNames.length;
}
} else { } else {
hl.linkStats.countEmptyDirs++; hl.linkStats.countEmptyDirs++;
} }
@ -967,8 +1054,9 @@ public class DataStorage extends Storage {
} }
}); });
for(int i = 0; i < otherNames.length; i++) for(int i = 0; i < otherNames.length; i++)
linkBlocks(new File(from, otherNames[i]), linkBlocksHelper(new File(from, otherNames[i]),
new File(to, otherNames[i]), oldLV, hl); new File(to, otherNames[i]), oldLV, hl, upgradeToIdBasedLayout,
blockRoot, idBasedLayoutSingleLinks);
} }
/** /**

View File

@ -30,6 +30,8 @@ public class DatanodeUtil {
public static final String DISK_ERROR = "Possible disk error: "; public static final String DISK_ERROR = "Possible disk error: ";
private static final String SEP = System.getProperty("file.separator");
/** Get the cause of an I/O exception if caused by a possible disk error /** Get the cause of an I/O exception if caused by a possible disk error
* @param ioe an I/O exception * @param ioe an I/O exception
* @return cause if the I/O exception is caused by a possible disk error; * @return cause if the I/O exception is caused by a possible disk error;
@ -78,4 +80,38 @@ public class DatanodeUtil {
public static File getUnlinkTmpFile(File f) { public static File getUnlinkTmpFile(File f) {
return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX); return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
} }
/**
* Checks whether there are any files anywhere in the directory tree rooted
* at dir (directories don't count as files). dir must exist
* @return true if there are no files
* @throws IOException if unable to list subdirectories
*/
public static boolean dirNoFilesRecursive(File dir) throws IOException {
File[] contents = dir.listFiles();
if (contents == null) {
throw new IOException("Cannot list contents of " + dir);
}
for (File f : contents) {
if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) {
return false;
}
}
return true;
}
/**
* Get the directory where a finalized block with this ID should be stored.
* Do not attempt to create the directory.
* @param root the root directory where finalized blocks are stored
* @param blockId
* @return
*/
public static File idToBlockDir(File root, long blockId) {
int d1 = (int)((blockId >> 16) & 0xff);
int d2 = (int)((blockId >> 8) & 0xff);
String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP +
DataStorage.BLOCK_SUBDIR_PREFIX + d2;
return new File(root, path);
}
} }

View File

@ -54,10 +54,10 @@ abstract public class ReplicaInfo extends Block implements Replica {
private File baseDir; private File baseDir;
/** /**
* Ints representing the sub directory path from base dir to the directory * Whether or not this replica's parent directory includes subdirs, in which
* containing this replica. * case we can generate them based on the replica's block ID
*/ */
private int[] subDirs; private boolean hasSubdirs;
private static final Map<String, File> internedBaseDirs = new HashMap<String, File>(); private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
@ -151,18 +151,8 @@ abstract public class ReplicaInfo extends Block implements Replica {
* @return the parent directory path where this replica is located * @return the parent directory path where this replica is located
*/ */
File getDir() { File getDir() {
if (subDirs == null) { return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir,
return null; getBlockId()) : baseDir;
}
StringBuilder sb = new StringBuilder();
for (int i : subDirs) {
sb.append(DataStorage.BLOCK_SUBDIR_PREFIX);
sb.append(i);
sb.append("/");
}
File ret = new File(baseDir, sb.toString());
return ret;
} }
/** /**
@ -175,54 +165,46 @@ abstract public class ReplicaInfo extends Block implements Replica {
private void setDirInternal(File dir) { private void setDirInternal(File dir) {
if (dir == null) { if (dir == null) {
subDirs = null;
baseDir = null; baseDir = null;
return; return;
} }
ReplicaDirInfo replicaDirInfo = parseSubDirs(dir); ReplicaDirInfo dirInfo = parseBaseDir(dir);
this.subDirs = replicaDirInfo.subDirs; this.hasSubdirs = dirInfo.hasSubidrs;
synchronized (internedBaseDirs) { synchronized (internedBaseDirs) {
if (!internedBaseDirs.containsKey(replicaDirInfo.baseDirPath)) { if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) {
// Create a new String path of this file and make a brand new File object // Create a new String path of this file and make a brand new File object
// to guarantee we drop the reference to the underlying char[] storage. // to guarantee we drop the reference to the underlying char[] storage.
File baseDir = new File(replicaDirInfo.baseDirPath); File baseDir = new File(dirInfo.baseDirPath);
internedBaseDirs.put(replicaDirInfo.baseDirPath, baseDir); internedBaseDirs.put(dirInfo.baseDirPath, baseDir);
} }
this.baseDir = internedBaseDirs.get(replicaDirInfo.baseDirPath); this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath);
} }
} }
@VisibleForTesting @VisibleForTesting
public static class ReplicaDirInfo { public static class ReplicaDirInfo {
@VisibleForTesting
public String baseDirPath; public String baseDirPath;
public boolean hasSubidrs;
@VisibleForTesting public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) {
public int[] subDirs; this.baseDirPath = baseDirPath;
this.hasSubidrs = hasSubidrs;
}
} }
@VisibleForTesting @VisibleForTesting
public static ReplicaDirInfo parseSubDirs(File dir) { public static ReplicaDirInfo parseBaseDir(File dir) {
ReplicaDirInfo ret = new ReplicaDirInfo();
File currentDir = dir; File currentDir = dir;
List<Integer> subDirList = new ArrayList<Integer>(); boolean hasSubdirs = false;
while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) { while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) {
// Prepend the integer into the list. hasSubdirs = true;
subDirList.add(0, Integer.parseInt(currentDir.getName().replaceFirst(
DataStorage.BLOCK_SUBDIR_PREFIX, "")));
currentDir = currentDir.getParentFile(); currentDir = currentDir.getParentFile();
} }
ret.subDirs = new int[subDirList.size()];
for (int i = 0; i < subDirList.size(); i++) {
ret.subDirs[i] = subDirList.get(i);
}
ret.baseDirPath = currentDir.getAbsolutePath(); return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs);
return ret;
} }
/** /**

View File

@ -59,7 +59,8 @@ class BlockPoolSlice {
private final String bpid; private final String bpid;
private final FsVolumeImpl volume; // volume to which this BlockPool belongs to private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
private final File currentDir; // StorageDirectory/current/bpid/current private final File currentDir; // StorageDirectory/current/bpid/current
private final LDir finalizedDir; // directory store Finalized replica // directory where finalized replicas are stored
private final File finalizedDir;
private final File rbwDir; // directory store RBW replica private final File rbwDir; // directory store RBW replica
private final File tmpDir; // directory store Temporary replica private final File tmpDir; // directory store Temporary replica
private static final String DU_CACHE_FILE = "dfsUsed"; private static final String DU_CACHE_FILE = "dfsUsed";
@ -82,8 +83,13 @@ class BlockPoolSlice {
this.bpid = bpid; this.bpid = bpid;
this.volume = volume; this.volume = volume;
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
final File finalizedDir = new File( this.finalizedDir = new File(
currentDir, DataStorage.STORAGE_DIR_FINALIZED); currentDir, DataStorage.STORAGE_DIR_FINALIZED);
if (!this.finalizedDir.exists()) {
if (!this.finalizedDir.mkdirs()) {
throw new IOException("Failed to mkdirs " + this.finalizedDir);
}
}
// Files that were being written when the datanode was last shutdown // Files that were being written when the datanode was last shutdown
// are now moved back to the data directory. It is possible that // are now moved back to the data directory. It is possible that
@ -101,10 +107,6 @@ class BlockPoolSlice {
if (rbwDir.exists() && !supportAppends) { if (rbwDir.exists() && !supportAppends) {
FileUtil.fullyDelete(rbwDir); FileUtil.fullyDelete(rbwDir);
} }
final int maxBlocksPerDir = conf.getInt(
DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
this.finalizedDir = new LDir(finalizedDir, maxBlocksPerDir);
if (!rbwDir.mkdirs()) { // create rbw directory if not exist if (!rbwDir.mkdirs()) { // create rbw directory if not exist
if (!rbwDir.isDirectory()) { if (!rbwDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + rbwDir.toString()); throw new IOException("Mkdirs failed to create " + rbwDir.toString());
@ -137,7 +139,7 @@ class BlockPoolSlice {
} }
File getFinalizedDir() { File getFinalizedDir() {
return finalizedDir.dir; return finalizedDir;
} }
File getRbwDir() { File getRbwDir() {
@ -245,25 +247,56 @@ class BlockPoolSlice {
} }
File addBlock(Block b, File f) throws IOException { File addBlock(Block b, File f) throws IOException {
File blockFile = finalizedDir.addBlock(b, f); File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
if (!blockDir.exists()) {
if (!blockDir.mkdirs()) {
throw new IOException("Failed to mkdirs " + blockDir);
}
}
File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp()); File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length()); dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
return blockFile; return blockFile;
} }
void checkDirs() throws DiskErrorException { void checkDirs() throws DiskErrorException {
finalizedDir.checkDirTree(); DiskChecker.checkDirs(finalizedDir);
DiskChecker.checkDir(tmpDir); DiskChecker.checkDir(tmpDir);
DiskChecker.checkDir(rbwDir); DiskChecker.checkDir(rbwDir);
} }
void getVolumeMap(ReplicaMap volumeMap) throws IOException { void getVolumeMap(ReplicaMap volumeMap) throws IOException {
// add finalized replicas // add finalized replicas
finalizedDir.getVolumeMap(bpid, volumeMap, volume); addToReplicasMap(volumeMap, finalizedDir, true);
// add rbw replicas // add rbw replicas
addToReplicasMap(volumeMap, rbwDir, false); addToReplicasMap(volumeMap, rbwDir, false);
} }
/**
* Recover an unlinked tmp file on datanode restart. If the original block
* does not exist, then the tmp file is renamed to be the
* original file name and the original name is returned; otherwise the tmp
* file is deleted and null is returned.
*/
File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException {
File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp);
if (blockFile.exists()) {
// If the original block file still exists, then no recovery is needed.
if (!unlinkedTmp.delete()) {
throw new IOException("Unable to cleanup unlinked tmp file " +
unlinkedTmp);
}
return null;
} else {
if (!unlinkedTmp.renameTo(blockFile)) {
throw new IOException("Unable to rename unlinked tmp file " +
unlinkedTmp);
}
return blockFile;
}
}
/** /**
* Add replicas under the given directory to the volume map * Add replicas under the given directory to the volume map
* @param volumeMap the replicas map * @param volumeMap the replicas map
@ -273,23 +306,34 @@ class BlockPoolSlice {
*/ */
void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
) throws IOException { ) throws IOException {
File blockFiles[] = FileUtil.listFiles(dir); File files[] = FileUtil.listFiles(dir);
for (File blockFile : blockFiles) { for (File file : files) {
if (!Block.isBlockFilename(blockFile)) if (file.isDirectory()) {
addToReplicasMap(volumeMap, file, isFinalized);
}
if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
file = recoverTempUnlinkedBlock(file);
if (file == null) { // the original block still exists, so we cover it
// in another iteration and can continue here
continue;
}
}
if (!Block.isBlockFilename(file))
continue; continue;
long genStamp = FsDatasetUtil.getGenerationStampFromFile( long genStamp = FsDatasetUtil.getGenerationStampFromFile(
blockFiles, blockFile); files, file);
long blockId = Block.filename2id(blockFile.getName()); long blockId = Block.filename2id(file.getName());
ReplicaInfo newReplica = null; ReplicaInfo newReplica = null;
if (isFinalized) { if (isFinalized) {
newReplica = new FinalizedReplica(blockId, newReplica = new FinalizedReplica(blockId,
blockFile.length(), genStamp, volume, blockFile.getParentFile()); file.length(), genStamp, volume, file.getParentFile());
} else { } else {
boolean loadRwr = true; boolean loadRwr = true;
File restartMeta = new File(blockFile.getParent() + File restartMeta = new File(file.getParent() +
File.pathSeparator + "." + blockFile.getName() + ".restart"); File.pathSeparator + "." + file.getName() + ".restart");
Scanner sc = null; Scanner sc = null;
try { try {
sc = new Scanner(restartMeta); sc = new Scanner(restartMeta);
@ -299,8 +343,8 @@ class BlockPoolSlice {
// We don't know the expected block length, so just use 0 // We don't know the expected block length, so just use 0
// and don't reserve any more space for writes. // and don't reserve any more space for writes.
newReplica = new ReplicaBeingWritten(blockId, newReplica = new ReplicaBeingWritten(blockId,
validateIntegrityAndSetLength(blockFile, genStamp), validateIntegrityAndSetLength(file, genStamp),
genStamp, volume, blockFile.getParentFile(), null, 0); genStamp, volume, file.getParentFile(), null, 0);
loadRwr = false; loadRwr = false;
} }
sc.close(); sc.close();
@ -309,7 +353,7 @@ class BlockPoolSlice {
restartMeta.getPath()); restartMeta.getPath());
} }
} catch (FileNotFoundException fnfe) { } catch (FileNotFoundException fnfe) {
// nothing to do here // nothing to do hereFile dir =
} finally { } finally {
if (sc != null) { if (sc != null) {
sc.close(); sc.close();
@ -318,15 +362,15 @@ class BlockPoolSlice {
// Restart meta doesn't exist or expired. // Restart meta doesn't exist or expired.
if (loadRwr) { if (loadRwr) {
newReplica = new ReplicaWaitingToBeRecovered(blockId, newReplica = new ReplicaWaitingToBeRecovered(blockId,
validateIntegrityAndSetLength(blockFile, genStamp), validateIntegrityAndSetLength(file, genStamp),
genStamp, volume, blockFile.getParentFile()); genStamp, volume, file.getParentFile());
} }
} }
ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica); ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
if (oldReplica != null) { if (oldReplica != null) {
FsDatasetImpl.LOG.warn("Two block files with the same block id exist " + FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
"on disk: " + oldReplica.getBlockFile() + " and " + blockFile ); "on disk: " + oldReplica.getBlockFile() + " and " + file );
} }
} }
} }
@ -413,10 +457,6 @@ class BlockPoolSlice {
} }
} }
void clearPath(File f) {
finalizedDir.clearPath(f);
}
@Override @Override
public String toString() { public String toString() {
return currentDir.getAbsolutePath(); return currentDir.getAbsolutePath();

View File

@ -1312,13 +1312,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
+ ". Parent not found for file " + f); + ". Parent not found for file " + f);
continue; continue;
} }
ReplicaState replicaState = info.getState();
if (replicaState == ReplicaState.FINALIZED ||
(replicaState == ReplicaState.RUR &&
((ReplicaUnderRecovery)info).getOriginalReplica().getState() ==
ReplicaState.FINALIZED)) {
v.clearPath(bpid, parent);
}
volumeMap.remove(bpid, invalidBlks[i]); volumeMap.remove(bpid, invalidBlks[i]);
} }

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@ -317,10 +318,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
bp.addToReplicasMap(volumeMap, dir, isFinalized); bp.addToReplicasMap(volumeMap, dir, isFinalized);
} }
void clearPath(String bpid, File f) throws IOException {
getBlockPoolSlice(bpid).clearPath(f);
}
@Override @Override
public String toString() { public String toString() {
return currentDir.getAbsolutePath(); return currentDir.getAbsolutePath();
@ -355,7 +352,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
File finalizedDir = new File(bpCurrentDir, File finalizedDir = new File(bpCurrentDir,
DataStorage.STORAGE_DIR_FINALIZED); DataStorage.STORAGE_DIR_FINALIZED);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW); File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) { if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive(
finalizedDir)) {
return false; return false;
} }
if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) { if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
@ -382,7 +380,8 @@ public class FsVolumeImpl implements FsVolumeSpi {
if (!rbwDir.delete()) { if (!rbwDir.delete()) {
throw new IOException("Failed to delete " + rbwDir); throw new IOException("Failed to delete " + rbwDir);
} }
if (!finalizedDir.delete()) { if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) ||
!FileUtil.fullyDelete(finalizedDir)) {
throw new IOException("Failed to delete " + finalizedDir); throw new IOException("Failed to delete " + finalizedDir);
} }
FileUtil.fullyDelete(tmpDir); FileUtil.fullyDelete(tmpDir);

View File

@ -1,228 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
/**
* A node type that can be built into a tree reflecting the
* hierarchy of replicas on the local disk.
*/
class LDir {
final File dir;
final int maxBlocksPerDir;
private int numBlocks = 0;
private LDir[] children = null;
private int lastChildIdx = 0;
LDir(File dir, int maxBlocksPerDir) throws IOException {
this.dir = dir;
this.maxBlocksPerDir = maxBlocksPerDir;
if (!dir.exists()) {
if (!dir.mkdirs()) {
throw new IOException("Failed to mkdirs " + dir);
}
} else {
File[] files = FileUtil.listFiles(dir);
List<LDir> dirList = new ArrayList<LDir>();
for (int idx = 0; idx < files.length; idx++) {
if (files[idx].isDirectory()) {
dirList.add(new LDir(files[idx], maxBlocksPerDir));
} else if (Block.isBlockFilename(files[idx])) {
numBlocks++;
}
}
if (dirList.size() > 0) {
children = dirList.toArray(new LDir[dirList.size()]);
}
}
}
File addBlock(Block b, File src) throws IOException {
//First try without creating subdirectories
File file = addBlock(b, src, false, false);
return (file != null) ? file : addBlock(b, src, true, true);
}
private File addBlock(Block b, File src, boolean createOk, boolean resetIdx
) throws IOException {
if (numBlocks < maxBlocksPerDir) {
final File dest = FsDatasetImpl.moveBlockFiles(b, src, dir);
numBlocks += 1;
return dest;
}
if (lastChildIdx < 0 && resetIdx) {
//reset so that all children will be checked
lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
}
if (lastChildIdx >= 0 && children != null) {
//Check if any child-tree has room for a block.
for (int i=0; i < children.length; i++) {
int idx = (lastChildIdx + i)%children.length;
File file = children[idx].addBlock(b, src, false, resetIdx);
if (file != null) {
lastChildIdx = idx;
return file;
}
}
lastChildIdx = -1;
}
if (!createOk) {
return null;
}
if (children == null || children.length == 0) {
children = new LDir[maxBlocksPerDir];
for (int idx = 0; idx < maxBlocksPerDir; idx++) {
final File sub = new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx);
children[idx] = new LDir(sub, maxBlocksPerDir);
}
}
//now pick a child randomly for creating a new set of subdirs.
lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
return children[ lastChildIdx ].addBlock(b, src, true, false);
}
void getVolumeMap(String bpid, ReplicaMap volumeMap, FsVolumeImpl volume
) throws IOException {
if (children != null) {
for (int i = 0; i < children.length; i++) {
children[i].getVolumeMap(bpid, volumeMap, volume);
}
}
recoverTempUnlinkedBlock();
volume.addToReplicasMap(bpid, volumeMap, dir, true);
}
/**
* Recover unlinked tmp files on datanode restart. If the original block
* does not exist, then the tmp file is renamed to be the
* original file name; otherwise the tmp file is deleted.
*/
private void recoverTempUnlinkedBlock() throws IOException {
File files[] = FileUtil.listFiles(dir);
for (File file : files) {
if (!FsDatasetUtil.isUnlinkTmpFile(file)) {
continue;
}
File blockFile = FsDatasetUtil.getOrigFile(file);
if (blockFile.exists()) {
// If the original block file still exists, then no recovery is needed.
if (!file.delete()) {
throw new IOException("Unable to cleanup unlinked tmp file " + file);
}
} else {
if (!file.renameTo(blockFile)) {
throw new IOException("Unable to cleanup detached file " + file);
}
}
}
}
/**
* check if a data diretory is healthy
* @throws DiskErrorException
*/
void checkDirTree() throws DiskErrorException {
DiskChecker.checkDir(dir);
if (children != null) {
for (int i = 0; i < children.length; i++) {
children[i].checkDirTree();
}
}
}
void clearPath(File f) {
String root = dir.getAbsolutePath();
String dir = f.getAbsolutePath();
if (dir.startsWith(root)) {
String[] dirNames = dir.substring(root.length()).
split(File.separator + DataStorage.BLOCK_SUBDIR_PREFIX);
if (clearPath(f, dirNames, 1))
return;
}
clearPath(f, null, -1);
}
/**
* dirNames is an array of string integers derived from
* usual directory structure data/subdirN/subdirXY/subdirM ...
* If dirName array is non-null, we only check the child at
* the children[dirNames[idx]]. This avoids iterating over
* children in common case. If directory structure changes
* in later versions, we need to revisit this.
*/
private boolean clearPath(File f, String[] dirNames, int idx) {
if ((dirNames == null || idx == dirNames.length) &&
dir.compareTo(f) == 0) {
numBlocks--;
return true;
}
if (dirNames != null) {
//guess the child index from the directory name
if (idx > (dirNames.length - 1) || children == null) {
return false;
}
int childIdx;
try {
childIdx = Integer.parseInt(dirNames[idx]);
} catch (NumberFormatException ignored) {
// layout changed? we could print a warning.
return false;
}
return (childIdx >= 0 && childIdx < children.length) ?
children[childIdx].clearPath(f, dirNames, idx+1) : false;
}
//guesses failed. back to blind iteration.
if (children != null) {
for(int i=0; i < children.length; i++) {
if (children[i].clearPath(f, null, -1)){
return true;
}
}
}
return false;
}
@Override
public String toString() {
return "FSDir{dir=" + dir + ", children="
+ (children == null ? null : Arrays.asList(children)) + "}";
}
}

View File

@ -2129,4 +2129,12 @@
</description> </description>
</property> </property>
<property>
<name>dfs.datanode.block.id.layout.upgrade.threads</name>
<value>12</value>
<description>The number of threads to use when creating hard links from
current to previous blocks during upgrade of a DataNode to block ID-based
block layout (see HDFS-6482 for details on the layout).</description>
</property>
</configuration> </configuration>

View File

@ -2501,8 +2501,8 @@ public class MiniDFSCluster {
* @return data file corresponding to the block * @return data file corresponding to the block
*/ */
public static File getBlockFile(File storageDir, ExtendedBlock blk) { public static File getBlockFile(File storageDir, ExtendedBlock blk) {
return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()), return new File(DatanodeUtil.idToBlockDir(getFinalizedDir(storageDir,
blk.getBlockName()); blk.getBlockPoolId()), blk.getBlockId()), blk.getBlockName());
} }
/** /**
@ -2512,10 +2512,32 @@ public class MiniDFSCluster {
* @return metadata file corresponding to the block * @return metadata file corresponding to the block
*/ */
public static File getBlockMetadataFile(File storageDir, ExtendedBlock blk) { public static File getBlockMetadataFile(File storageDir, ExtendedBlock blk) {
return new File(getFinalizedDir(storageDir, blk.getBlockPoolId()), return new File(DatanodeUtil.idToBlockDir(getFinalizedDir(storageDir,
blk.getBlockName() + "_" + blk.getGenerationStamp() + blk.getBlockPoolId()), blk.getBlockId()), blk.getBlockName() + "_" +
Block.METADATA_EXTENSION); blk.getGenerationStamp() + Block.METADATA_EXTENSION);
}
/**
* Return all block metadata files in given directory (recursive search)
*/
public static List<File> getAllBlockMetadataFiles(File storageDir) {
List<File> results = new ArrayList<File>();
File[] files = storageDir.listFiles();
if (files == null) {
return null;
}
for (File f : files) {
if (f.getName().startsWith("blk_") && f.getName().endsWith(
Block.METADATA_EXTENSION)) {
results.add(f);
} else if (f.isDirectory()) {
List<File> subdirResults = getAllBlockMetadataFiles(f);
if (subdirResults != null) {
results.addAll(subdirResults);
}
}
}
return results;
} }
/** /**

View File

@ -79,8 +79,8 @@ public class TestDFSFinalize {
File dnCurDirs[] = new File[dataNodeDirs.length]; File dnCurDirs[] = new File[dataNodeDirs.length];
for (int i = 0; i < dataNodeDirs.length; i++) { for (int i = 0; i < dataNodeDirs.length; i++) {
dnCurDirs[i] = new File(dataNodeDirs[i],"current"); dnCurDirs[i] = new File(dataNodeDirs[i],"current");
assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, dnCurDirs[i]), assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, dnCurDirs[i],
UpgradeUtilities.checksumMasterDataNodeContents()); false), UpgradeUtilities.checksumMasterDataNodeContents());
} }
for (int i = 0; i < nameNodeDirs.length; i++) { for (int i = 0; i < nameNodeDirs.length; i++) {
assertFalse(new File(nameNodeDirs[i],"previous").isDirectory()); assertFalse(new File(nameNodeDirs[i],"previous").isDirectory());
@ -96,7 +96,8 @@ public class TestDFSFinalize {
assertFalse(new File(bpRoot,"previous").isDirectory()); assertFalse(new File(bpRoot,"previous").isDirectory());
File bpCurFinalizeDir = new File(bpRoot,"current/"+DataStorage.STORAGE_DIR_FINALIZED); File bpCurFinalizeDir = new File(bpRoot,"current/"+DataStorage.STORAGE_DIR_FINALIZED);
assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, bpCurFinalizeDir), assertEquals(UpgradeUtilities.checksumContents(DATA_NODE,
bpCurFinalizeDir, true),
UpgradeUtilities.checksumMasterBlockPoolFinalizedContents()); UpgradeUtilities.checksumMasterBlockPoolFinalizedContents());
} }
} }

View File

@ -81,7 +81,7 @@ public class TestDFSRollback {
break; break;
case DATA_NODE: case DATA_NODE:
assertEquals( assertEquals(
UpgradeUtilities.checksumContents(nodeType, curDir), UpgradeUtilities.checksumContents(nodeType, curDir, false),
UpgradeUtilities.checksumMasterDataNodeContents()); UpgradeUtilities.checksumMasterDataNodeContents());
break; break;
} }

View File

@ -239,7 +239,7 @@ public class TestDFSStorageStateRecovery {
assertTrue(new File(baseDirs[i],"previous").isDirectory()); assertTrue(new File(baseDirs[i],"previous").isDirectory());
assertEquals( assertEquals(
UpgradeUtilities.checksumContents( UpgradeUtilities.checksumContents(
NAME_NODE, new File(baseDirs[i],"previous")), NAME_NODE, new File(baseDirs[i],"previous"), false),
UpgradeUtilities.checksumMasterNameNodeContents()); UpgradeUtilities.checksumMasterNameNodeContents());
} }
} }
@ -259,7 +259,8 @@ public class TestDFSStorageStateRecovery {
if (currentShouldExist) { if (currentShouldExist) {
for (int i = 0; i < baseDirs.length; i++) { for (int i = 0; i < baseDirs.length; i++) {
assertEquals( assertEquals(
UpgradeUtilities.checksumContents(DATA_NODE, new File(baseDirs[i],"current")), UpgradeUtilities.checksumContents(DATA_NODE,
new File(baseDirs[i],"current"), false),
UpgradeUtilities.checksumMasterDataNodeContents()); UpgradeUtilities.checksumMasterDataNodeContents());
} }
} }
@ -267,7 +268,8 @@ public class TestDFSStorageStateRecovery {
for (int i = 0; i < baseDirs.length; i++) { for (int i = 0; i < baseDirs.length; i++) {
assertTrue(new File(baseDirs[i],"previous").isDirectory()); assertTrue(new File(baseDirs[i],"previous").isDirectory());
assertEquals( assertEquals(
UpgradeUtilities.checksumContents(DATA_NODE, new File(baseDirs[i],"previous")), UpgradeUtilities.checksumContents(DATA_NODE,
new File(baseDirs[i],"previous"), false),
UpgradeUtilities.checksumMasterDataNodeContents()); UpgradeUtilities.checksumMasterDataNodeContents());
} }
} }
@ -290,8 +292,8 @@ public class TestDFSStorageStateRecovery {
if (currentShouldExist) { if (currentShouldExist) {
for (int i = 0; i < baseDirs.length; i++) { for (int i = 0; i < baseDirs.length; i++) {
File bpCurDir = new File(baseDirs[i], Storage.STORAGE_DIR_CURRENT); File bpCurDir = new File(baseDirs[i], Storage.STORAGE_DIR_CURRENT);
assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, bpCurDir), assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, bpCurDir,
UpgradeUtilities.checksumMasterBlockPoolContents()); false), UpgradeUtilities.checksumMasterBlockPoolContents());
} }
} }
if (previousShouldExist) { if (previousShouldExist) {
@ -299,8 +301,8 @@ public class TestDFSStorageStateRecovery {
File bpPrevDir = new File(baseDirs[i], Storage.STORAGE_DIR_PREVIOUS); File bpPrevDir = new File(baseDirs[i], Storage.STORAGE_DIR_PREVIOUS);
assertTrue(bpPrevDir.isDirectory()); assertTrue(bpPrevDir.isDirectory());
assertEquals( assertEquals(
UpgradeUtilities.checksumContents(DATA_NODE, bpPrevDir), UpgradeUtilities.checksumContents(DATA_NODE, bpPrevDir,
UpgradeUtilities.checksumMasterBlockPoolContents()); false), UpgradeUtilities.checksumMasterBlockPoolContents());
} }
} }
} }

View File

@ -100,7 +100,7 @@ public class TestDFSUpgrade {
File previous = new File(baseDir, "previous"); File previous = new File(baseDir, "previous");
assertExists(previous); assertExists(previous);
assertEquals(UpgradeUtilities.checksumContents(NAME_NODE, previous), assertEquals(UpgradeUtilities.checksumContents(NAME_NODE, previous, false),
UpgradeUtilities.checksumMasterNameNodeContents()); UpgradeUtilities.checksumMasterNameNodeContents());
} }
} }
@ -114,23 +114,25 @@ public class TestDFSUpgrade {
void checkDataNode(String[] baseDirs, String bpid) throws IOException { void checkDataNode(String[] baseDirs, String bpid) throws IOException {
for (int i = 0; i < baseDirs.length; i++) { for (int i = 0; i < baseDirs.length; i++) {
File current = new File(baseDirs[i], "current/" + bpid + "/current"); File current = new File(baseDirs[i], "current/" + bpid + "/current");
assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, current), assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, current, false),
UpgradeUtilities.checksumMasterDataNodeContents()); UpgradeUtilities.checksumMasterDataNodeContents());
// block files are placed under <sd>/current/<bpid>/current/finalized // block files are placed under <sd>/current/<bpid>/current/finalized
File currentFinalized = File currentFinalized =
MiniDFSCluster.getFinalizedDir(new File(baseDirs[i]), bpid); MiniDFSCluster.getFinalizedDir(new File(baseDirs[i]), bpid);
assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, currentFinalized), assertEquals(UpgradeUtilities.checksumContents(DATA_NODE,
currentFinalized, true),
UpgradeUtilities.checksumMasterBlockPoolFinalizedContents()); UpgradeUtilities.checksumMasterBlockPoolFinalizedContents());
File previous = new File(baseDirs[i], "current/" + bpid + "/previous"); File previous = new File(baseDirs[i], "current/" + bpid + "/previous");
assertTrue(previous.isDirectory()); assertTrue(previous.isDirectory());
assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, previous), assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, previous, false),
UpgradeUtilities.checksumMasterDataNodeContents()); UpgradeUtilities.checksumMasterDataNodeContents());
File previousFinalized = File previousFinalized =
new File(baseDirs[i], "current/" + bpid + "/previous"+"/finalized"); new File(baseDirs[i], "current/" + bpid + "/previous"+"/finalized");
assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, previousFinalized), assertEquals(UpgradeUtilities.checksumContents(DATA_NODE,
previousFinalized, true),
UpgradeUtilities.checksumMasterBlockPoolFinalizedContents()); UpgradeUtilities.checksumMasterBlockPoolFinalizedContents());
} }

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
@ -80,7 +81,7 @@ public class TestDFSUpgradeFromImage {
long checksum; long checksum;
} }
private static final Configuration upgradeConf; static final Configuration upgradeConf;
static { static {
upgradeConf = new HdfsConfiguration(); upgradeConf = new HdfsConfiguration();
@ -95,7 +96,7 @@ public class TestDFSUpgradeFromImage {
boolean printChecksum = false; boolean printChecksum = false;
private void unpackStorage(String tarFileName) void unpackStorage(String tarFileName, String referenceName)
throws IOException { throws IOException {
String tarFile = System.getProperty("test.cache.data", "build/test/cache") String tarFile = System.getProperty("test.cache.data", "build/test/cache")
+ "/" + tarFileName; + "/" + tarFileName;
@ -110,7 +111,7 @@ public class TestDFSUpgradeFromImage {
BufferedReader reader = new BufferedReader(new FileReader( BufferedReader reader = new BufferedReader(new FileReader(
System.getProperty("test.cache.data", "build/test/cache") System.getProperty("test.cache.data", "build/test/cache")
+ "/" + HADOOP_DFS_DIR_TXT)); + "/" + referenceName));
String line; String line;
while ( (line = reader.readLine()) != null ) { while ( (line = reader.readLine()) != null ) {
@ -285,7 +286,7 @@ public class TestDFSUpgradeFromImage {
*/ */
@Test @Test
public void testUpgradeFromRel22Image() throws IOException { public void testUpgradeFromRel22Image() throws IOException {
unpackStorage(HADOOP22_IMAGE); unpackStorage(HADOOP22_IMAGE, HADOOP_DFS_DIR_TXT);
upgradeAndVerify(new MiniDFSCluster.Builder(upgradeConf). upgradeAndVerify(new MiniDFSCluster.Builder(upgradeConf).
numDataNodes(4)); numDataNodes(4));
} }
@ -296,7 +297,7 @@ public class TestDFSUpgradeFromImage {
*/ */
@Test @Test
public void testUpgradeFromCorruptRel22Image() throws IOException { public void testUpgradeFromCorruptRel22Image() throws IOException {
unpackStorage(HADOOP22_IMAGE); unpackStorage(HADOOP22_IMAGE, HADOOP_DFS_DIR_TXT);
// Overwrite the md5 stored in the VERSION files // Overwrite the md5 stored in the VERSION files
File baseDir = new File(MiniDFSCluster.getBaseDirectory()); File baseDir = new File(MiniDFSCluster.getBaseDirectory());
@ -333,7 +334,7 @@ public class TestDFSUpgradeFromImage {
*/ */
@Test @Test
public void testUpgradeFromRel1ReservedImage() throws Exception { public void testUpgradeFromRel1ReservedImage() throws Exception {
unpackStorage(HADOOP1_RESERVED_IMAGE); unpackStorage(HADOOP1_RESERVED_IMAGE, HADOOP_DFS_DIR_TXT);
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
// Try it once without setting the upgrade flag to ensure it fails // Try it once without setting the upgrade flag to ensure it fails
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
@ -403,7 +404,7 @@ public class TestDFSUpgradeFromImage {
*/ */
@Test @Test
public void testUpgradeFromRel023ReservedImage() throws Exception { public void testUpgradeFromRel023ReservedImage() throws Exception {
unpackStorage(HADOOP023_RESERVED_IMAGE); unpackStorage(HADOOP023_RESERVED_IMAGE, HADOOP_DFS_DIR_TXT);
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
// Try it once without setting the upgrade flag to ensure it fails // Try it once without setting the upgrade flag to ensure it fails
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
@ -468,7 +469,7 @@ public class TestDFSUpgradeFromImage {
*/ */
@Test @Test
public void testUpgradeFromRel2ReservedImage() throws Exception { public void testUpgradeFromRel2ReservedImage() throws Exception {
unpackStorage(HADOOP2_RESERVED_IMAGE); unpackStorage(HADOOP2_RESERVED_IMAGE, HADOOP_DFS_DIR_TXT);
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
// Try it once without setting the upgrade flag to ensure it fails // Try it once without setting the upgrade flag to ensure it fails
final Configuration conf = new Configuration(); final Configuration conf = new Configuration();
@ -572,7 +573,7 @@ public class TestDFSUpgradeFromImage {
} while (dirList.hasMore()); } while (dirList.hasMore());
} }
private void upgradeAndVerify(MiniDFSCluster.Builder bld) void upgradeAndVerify(MiniDFSCluster.Builder bld)
throws IOException { throws IOException {
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
try { try {
@ -601,7 +602,7 @@ public class TestDFSUpgradeFromImage {
*/ */
@Test @Test
public void testUpgradeFromRel1BBWImage() throws IOException { public void testUpgradeFromRel1BBWImage() throws IOException {
unpackStorage(HADOOP1_BBW_IMAGE); unpackStorage(HADOOP1_BBW_IMAGE, HADOOP_DFS_DIR_TXT);
Configuration conf = new Configuration(upgradeConf); Configuration conf = new Configuration(upgradeConf);
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
System.getProperty("test.build.data") + File.separator + System.getProperty("test.build.data") + File.separator +

View File

@ -445,19 +445,14 @@ public class TestDatanodeBlockScanner {
@Test @Test
public void testReplicaInfoParsing() throws Exception { public void testReplicaInfoParsing() throws Exception {
testReplicaInfoParsingSingle(BASE_PATH, new int[0]); testReplicaInfoParsingSingle(BASE_PATH);
testReplicaInfoParsingSingle(BASE_PATH + "/subdir1", new int[]{1}); testReplicaInfoParsingSingle(BASE_PATH + "/subdir1");
testReplicaInfoParsingSingle(BASE_PATH + "/subdir43", new int[]{43}); testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir3");
testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir3", new int[]{1, 2, 3});
testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir43", new int[]{1, 2, 43});
testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir23/subdir3", new int[]{1, 23, 3});
testReplicaInfoParsingSingle(BASE_PATH + "/subdir13/subdir2/subdir3", new int[]{13, 2, 3});
} }
private static void testReplicaInfoParsingSingle(String subDirPath, int[] expectedSubDirs) { private static void testReplicaInfoParsingSingle(String subDirPath) {
File testFile = new File(subDirPath); File testFile = new File(subDirPath);
assertArrayEquals(expectedSubDirs, ReplicaInfo.parseSubDirs(testFile).subDirs); assertEquals(BASE_PATH, ReplicaInfo.parseBaseDir(testFile).baseDirPath);
assertEquals(BASE_PATH, ReplicaInfo.parseSubDirs(testFile).baseDirPath);
} }
@Test @Test

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
public class TestDatanodeLayoutUpgrade {
private static final String HADOOP_DATANODE_DIR_TXT =
"hadoop-datanode-dir.txt";
private static final String HADOOP24_DATANODE = "hadoop-24-datanode-dir.tgz";
@Test
// Upgrade from LDir-based layout to block ID-based layout -- change described
// in HDFS-6482
public void testUpgradeToIdBasedLayout() throws IOException {
TestDFSUpgradeFromImage upgrade = new TestDFSUpgradeFromImage();
upgrade.unpackStorage(HADOOP24_DATANODE, HADOOP_DATANODE_DIR_TXT);
Configuration conf = new Configuration(TestDFSUpgradeFromImage.upgradeConf);
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
System.getProperty("test.build.data") + File.separator +
"dfs" + File.separator + "data");
conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
System.getProperty("test.build.data") + File.separator +
"dfs" + File.separator + "name");
upgrade.upgradeAndVerify(new MiniDFSCluster.Builder(conf).numDataNodes(1)
.manageDataDfsDirs(false).manageNameDfsDirs(false));
}
}

View File

@ -27,6 +27,7 @@ import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.GenerationStamp;
@ -137,13 +139,15 @@ public class TestFileCorruption {
final String bpid = cluster.getNamesystem().getBlockPoolId(); final String bpid = cluster.getNamesystem().getBlockPoolId();
File storageDir = cluster.getInstanceStorageDir(0, 0); File storageDir = cluster.getInstanceStorageDir(0, 0);
File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid); File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
assertTrue("Data directory does not exist", dataDir.exists());
ExtendedBlock blk = getBlock(bpid, dataDir); ExtendedBlock blk = getBlock(bpid, dataDir);
if (blk == null) { if (blk == null) {
storageDir = cluster.getInstanceStorageDir(0, 1); storageDir = cluster.getInstanceStorageDir(0, 1);
dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid); dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
blk = getBlock(bpid, dataDir); blk = getBlock(bpid, dataDir);
} }
assertFalse(blk==null); assertFalse("Data directory does not contain any blocks or there was an "
+ "IO error", blk==null);
// start a third datanode // start a third datanode
cluster.startDataNodes(conf, 1, true, null, null); cluster.startDataNodes(conf, 1, true, null, null);
@ -174,33 +178,15 @@ public class TestFileCorruption {
} }
private ExtendedBlock getBlock(String bpid, File dataDir) { public static ExtendedBlock getBlock(String bpid, File dataDir) {
assertTrue("data directory does not exist", dataDir.exists()); List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(dataDir);
File[] blocks = dataDir.listFiles(); if (metadataFiles == null || metadataFiles.isEmpty()) {
assertTrue("Blocks do not exist in dataDir", (blocks != null) && (blocks.length > 0));
int idx = 0;
String blockFileName = null;
for (; idx < blocks.length; idx++) {
blockFileName = blocks[idx].getName();
if (blockFileName.startsWith("blk_") && !blockFileName.endsWith(".meta")) {
break;
}
}
if (blockFileName == null) {
return null; return null;
} }
long blockId = Long.parseLong(blockFileName.substring("blk_".length())); File metadataFile = metadataFiles.get(0);
long blockTimeStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP; File blockFile = Block.metaToBlockFile(metadataFile);
for (idx=0; idx < blocks.length; idx++) { return new ExtendedBlock(bpid, Block.getBlockId(blockFile.getName()),
String fileName = blocks[idx].getName(); blockFile.length(), Block.getGenerationStamp(metadataFile.getName()));
if (fileName.startsWith(blockFileName) && fileName.endsWith(".meta")) {
int startIndex = blockFileName.length()+1;
int endIndex = fileName.length() - ".meta".length();
blockTimeStamp = Long.parseLong(fileName.substring(startIndex, endIndex));
break;
}
}
return new ExtendedBlock(bpid, blockId, blocks[idx].length(), blockTimeStamp);
} }
} }

View File

@ -158,21 +158,23 @@ public class UpgradeUtilities {
FileUtil.fullyDelete(new File(datanodeStorage,"in_use.lock")); FileUtil.fullyDelete(new File(datanodeStorage,"in_use.lock"));
} }
namenodeStorageChecksum = checksumContents(NAME_NODE, namenodeStorageChecksum = checksumContents(NAME_NODE,
new File(namenodeStorage, "current")); new File(namenodeStorage, "current"), false);
File dnCurDir = new File(datanodeStorage, "current"); File dnCurDir = new File(datanodeStorage, "current");
datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir); datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir, false);
File bpCurDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir), File bpCurDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
"current"); "current");
blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir); blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir, false);
File bpCurFinalizeDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir), File bpCurFinalizeDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
"current/"+DataStorage.STORAGE_DIR_FINALIZED); "current/"+DataStorage.STORAGE_DIR_FINALIZED);
blockPoolFinalizedStorageChecksum = checksumContents(DATA_NODE, bpCurFinalizeDir); blockPoolFinalizedStorageChecksum = checksumContents(DATA_NODE,
bpCurFinalizeDir, true);
File bpCurRbwDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir), File bpCurRbwDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir),
"current/"+DataStorage.STORAGE_DIR_RBW); "current/"+DataStorage.STORAGE_DIR_RBW);
blockPoolRbwStorageChecksum = checksumContents(DATA_NODE, bpCurRbwDir); blockPoolRbwStorageChecksum = checksumContents(DATA_NODE, bpCurRbwDir,
false);
} }
// Private helper method that writes a file to the given file system. // Private helper method that writes a file to the given file system.
@ -266,28 +268,39 @@ public class UpgradeUtilities {
/** /**
* Compute the checksum of all the files in the specified directory. * Compute the checksum of all the files in the specified directory.
* The contents of subdirectories are not included. This method provides * This method provides an easy way to ensure equality between the contents
* an easy way to ensure equality between the contents of two directories. * of two directories.
* *
* @param nodeType if DATA_NODE then any file named "VERSION" is ignored. * @param nodeType if DATA_NODE then any file named "VERSION" is ignored.
* This is because this file file is changed every time * This is because this file file is changed every time
* the Datanode is started. * the Datanode is started.
* @param dir must be a directory. Subdirectories are ignored. * @param dir must be a directory
* @param recursive whether or not to consider subdirectories
* *
* @throws IllegalArgumentException if specified directory is not a directory * @throws IllegalArgumentException if specified directory is not a directory
* @throws IOException if an IOException occurs while reading the files * @throws IOException if an IOException occurs while reading the files
* @return the computed checksum value * @return the computed checksum value
*/ */
public static long checksumContents(NodeType nodeType, File dir) throws IOException { public static long checksumContents(NodeType nodeType, File dir,
boolean recursive) throws IOException {
CRC32 checksum = new CRC32();
checksumContentsHelper(nodeType, dir, checksum, recursive);
return checksum.getValue();
}
public static void checksumContentsHelper(NodeType nodeType, File dir,
CRC32 checksum, boolean recursive) throws IOException {
if (!dir.isDirectory()) { if (!dir.isDirectory()) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Given argument is not a directory:" + dir); "Given argument is not a directory:" + dir);
} }
File[] list = dir.listFiles(); File[] list = dir.listFiles();
Arrays.sort(list); Arrays.sort(list);
CRC32 checksum = new CRC32();
for (int i = 0; i < list.length; i++) { for (int i = 0; i < list.length; i++) {
if (!list[i].isFile()) { if (!list[i].isFile()) {
if (recursive) {
checksumContentsHelper(nodeType, list[i], checksum, recursive);
}
continue; continue;
} }
@ -312,7 +325,6 @@ public class UpgradeUtilities {
} }
} }
} }
return checksum.getValue();
} }
/** /**

View File

@ -25,6 +25,7 @@ import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -384,7 +385,7 @@ public class TestDataNodeVolumeFailure {
continue; continue;
} }
String [] res = metaFilesInDir(dir); List<File> res = MiniDFSCluster.getAllBlockMetadataFiles(dir);
if(res == null) { if(res == null) {
System.out.println("res is null for dir = " + dir + " i=" + i + " and j=" + j); System.out.println("res is null for dir = " + dir + " i=" + i + " and j=" + j);
continue; continue;
@ -392,7 +393,8 @@ public class TestDataNodeVolumeFailure {
//System.out.println("for dn" + i + "." + j + ": " + dir + "=" + res.length+ " files"); //System.out.println("for dn" + i + "." + j + ": " + dir + "=" + res.length+ " files");
//int ii = 0; //int ii = 0;
for(String s: res) { for(File f: res) {
String s = f.getName();
// cut off "blk_-" at the beginning and ".meta" at the end // cut off "blk_-" at the beginning and ".meta" at the end
assertNotNull("Block file name should not be null", s); assertNotNull("Block file name should not be null", s);
String bid = s.substring(s.indexOf("_")+1, s.lastIndexOf("_")); String bid = s.substring(s.indexOf("_")+1, s.lastIndexOf("_"));
@ -408,25 +410,9 @@ public class TestDataNodeVolumeFailure {
//System.out.println("dir1="+dir.getPath() + "blocks=" + res.length); //System.out.println("dir1="+dir.getPath() + "blocks=" + res.length);
//System.out.println("dir2="+dir2.getPath() + "blocks=" + res2.length); //System.out.println("dir2="+dir2.getPath() + "blocks=" + res2.length);
total += res.length; total += res.size();
} }
} }
return total; return total;
} }
/*
* count how many files *.meta are in the dir
*/
private String [] metaFilesInDir(File dir) {
String [] res = dir.list(
new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith("blk_") &&
name.endsWith(Block.METADATA_EXTENSION);
}
}
);
return res;
}
} }

View File

@ -103,9 +103,10 @@ public class TestDeleteBlockPool {
fs1.delete(new Path("/alpha"), true); fs1.delete(new Path("/alpha"), true);
// Wait till all blocks are deleted from the dn2 for bpid1. // Wait till all blocks are deleted from the dn2 for bpid1.
while ((MiniDFSCluster.getFinalizedDir(dn2StorageDir1, File finalDir1 = MiniDFSCluster.getFinalizedDir(dn2StorageDir1, bpid1);
bpid1).list().length != 0) || (MiniDFSCluster.getFinalizedDir( File finalDir2 = MiniDFSCluster.getFinalizedDir(dn2StorageDir1, bpid2);
dn2StorageDir2, bpid1).list().length != 0)) { while ((!DatanodeUtil.dirNoFilesRecursive(finalDir1)) ||
(!DatanodeUtil.dirNoFilesRecursive(finalDir2))) {
try { try {
Thread.sleep(3000); Thread.sleep(3000);
} catch (Exception ignored) { } catch (Exception ignored) {

View File

@ -41,6 +41,7 @@ import java.net.InetSocketAddress;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
@ -63,6 +64,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -750,15 +752,14 @@ public class TestFsck {
for (int j=0; j<=1; j++) { for (int j=0; j<=1; j++) {
File storageDir = cluster.getInstanceStorageDir(i, j); File storageDir = cluster.getInstanceStorageDir(i, j);
File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid); File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
File[] blocks = data_dir.listFiles(); List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(
if (blocks == null) data_dir);
if (metadataFiles == null)
continue; continue;
for (File metadataFile : metadataFiles) {
for (int idx = 0; idx < blocks.length; idx++) { File blockFile = Block.metaToBlockFile(metadataFile);
if (!blocks[idx].getName().startsWith("blk_")) { assertTrue("Cannot remove file.", blockFile.delete());
continue; assertTrue("Cannot remove file.", metadataFile.delete());
}
assertTrue("Cannot remove file.", blocks[idx].delete());
} }
} }
} }

View File

@ -25,6 +25,7 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Random; import java.util.Random;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -39,7 +40,11 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestFileCorruption;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.junit.Test; import org.junit.Test;
@ -87,15 +92,11 @@ public class TestListCorruptFileBlocks {
File storageDir = cluster.getInstanceStorageDir(0, 1); File storageDir = cluster.getInstanceStorageDir(0, 1);
File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid); File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
assertTrue("data directory does not exist", data_dir.exists()); assertTrue("data directory does not exist", data_dir.exists());
File[] blocks = data_dir.listFiles(); List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length > 0)); assertTrue("Data directory does not contain any blocks or there was an "
for (int idx = 0; idx < blocks.length; idx++) { + "IO error", metaFiles != null && !metaFiles.isEmpty());
if (blocks[idx].getName().startsWith("blk_") && File metaFile = metaFiles.get(0);
blocks[idx].getName().endsWith(".meta")) { RandomAccessFile file = new RandomAccessFile(metaFile, "rw");
//
// shorten .meta file
//
RandomAccessFile file = new RandomAccessFile(blocks[idx], "rw");
FileChannel channel = file.getChannel(); FileChannel channel = file.getChannel();
long position = channel.size() - 2; long position = channel.size() - 2;
int length = 2; int length = 2;
@ -103,7 +104,7 @@ public class TestListCorruptFileBlocks {
random.nextBytes(buffer); random.nextBytes(buffer);
channel.write(ByteBuffer.wrap(buffer), position); channel.write(ByteBuffer.wrap(buffer), position);
file.close(); file.close();
LOG.info("Deliberately corrupting file " + blocks[idx].getName() + LOG.info("Deliberately corrupting file " + metaFile.getName() +
" at offset " + position + " length " + length); " at offset " + position + " length " + length);
// read all files to trigger detection of corrupted replica // read all files to trigger detection of corrupted replica
@ -115,9 +116,6 @@ public class TestListCorruptFileBlocks {
assertTrue("Corrupted replicas not handled properly. Expecting BlockMissingException " + assertTrue("Corrupted replicas not handled properly. Expecting BlockMissingException " +
" but received IOException " + e, false); " but received IOException " + e, false);
} }
break;
}
}
// fetch bad file list from namenode. There should be one file. // fetch bad file list from namenode. There should be one file.
badFiles = namenode.getNamesystem().listCorruptFileBlocks("/", null); badFiles = namenode.getNamesystem().listCorruptFileBlocks("/", null);
@ -174,16 +172,11 @@ public class TestListCorruptFileBlocks {
File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, File data_dir = MiniDFSCluster.getFinalizedDir(storageDir,
cluster.getNamesystem().getBlockPoolId()); cluster.getNamesystem().getBlockPoolId());
assertTrue("data directory does not exist", data_dir.exists()); assertTrue("data directory does not exist", data_dir.exists());
File[] blocks = data_dir.listFiles(); List<File> metaFiles = MiniDFSCluster.getAllBlockMetadataFiles(data_dir);
assertTrue("Blocks do not exist in data-dir", (blocks != null) && assertTrue("Data directory does not contain any blocks or there was an "
(blocks.length > 0)); + "IO error", metaFiles != null && !metaFiles.isEmpty());
for (int idx = 0; idx < blocks.length; idx++) { File metaFile = metaFiles.get(0);
if (blocks[idx].getName().startsWith("blk_") && RandomAccessFile file = new RandomAccessFile(metaFile, "rw");
blocks[idx].getName().endsWith(".meta")) {
//
// shorten .meta file
//
RandomAccessFile file = new RandomAccessFile(blocks[idx], "rw");
FileChannel channel = file.getChannel(); FileChannel channel = file.getChannel();
long position = channel.size() - 2; long position = channel.size() - 2;
int length = 2; int length = 2;
@ -191,7 +184,7 @@ public class TestListCorruptFileBlocks {
random.nextBytes(buffer); random.nextBytes(buffer);
channel.write(ByteBuffer.wrap(buffer), position); channel.write(ByteBuffer.wrap(buffer), position);
file.close(); file.close();
LOG.info("Deliberately corrupting file " + blocks[idx].getName() + LOG.info("Deliberately corrupting file " + metaFile.getName() +
" at offset " + position + " length " + length); " at offset " + position + " length " + length);
// read all files to trigger detection of corrupted replica // read all files to trigger detection of corrupted replica
@ -204,9 +197,6 @@ public class TestListCorruptFileBlocks {
"Expecting BlockMissingException " + "Expecting BlockMissingException " +
" but received IOException " + e, false); " but received IOException " + e, false);
} }
break;
}
}
// fetch bad file list from namenode. There should be one file. // fetch bad file list from namenode. There should be one file.
badFiles = cluster.getNameNode().getNamesystem(). badFiles = cluster.getNameNode().getNamesystem().
@ -295,17 +285,18 @@ public class TestListCorruptFileBlocks {
for (int j = 0; j <= 1; j++) { for (int j = 0; j <= 1; j++) {
File storageDir = cluster.getInstanceStorageDir(i, j); File storageDir = cluster.getInstanceStorageDir(i, j);
File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid); File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
File[] blocks = data_dir.listFiles(); List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(
if (blocks == null) data_dir);
if (metadataFiles == null)
continue; continue;
// assertTrue("Blocks do not exist in data-dir", (blocks != null) && // assertTrue("Blocks do not exist in data-dir", (blocks != null) &&
// (blocks.length > 0)); // (blocks.length > 0));
for (int idx = 0; idx < blocks.length; idx++) { for (File metadataFile : metadataFiles) {
if (!blocks[idx].getName().startsWith("blk_")) { File blockFile = Block.metaToBlockFile(metadataFile);
continue; LOG.info("Deliberately removing file " + blockFile.getName());
} assertTrue("Cannot remove file.", blockFile.delete());
LOG.info("Deliberately removing file " + blocks[idx].getName()); LOG.info("Deliberately removing file " + metadataFile.getName());
assertTrue("Cannot remove file.", blocks[idx].delete()); assertTrue("Cannot remove file.", metadataFile.delete());
// break; // break;
} }
} }
@ -405,17 +396,18 @@ public class TestListCorruptFileBlocks {
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
File storageDir = cluster.getInstanceStorageDir(0, i); File storageDir = cluster.getInstanceStorageDir(0, i);
File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid); File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
File[] blocks = data_dir.listFiles(); List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(
if (blocks == null) data_dir);
if (metadataFiles == null)
continue; continue;
// assertTrue("Blocks do not exist in data-dir", (blocks != null) && // assertTrue("Blocks do not exist in data-dir", (blocks != null) &&
// (blocks.length > 0)); // (blocks.length > 0));
for (int idx = 0; idx < blocks.length; idx++) { for (File metadataFile : metadataFiles) {
if (!blocks[idx].getName().startsWith("blk_")) { File blockFile = Block.metaToBlockFile(metadataFile);
continue; LOG.info("Deliberately removing file " + blockFile.getName());
} assertTrue("Cannot remove file.", blockFile.delete());
LOG.info("Deliberately removing file " + blocks[idx].getName()); LOG.info("Deliberately removing file " + metadataFile.getName());
assertTrue("Cannot remove file.", blocks[idx].delete()); assertTrue("Cannot remove file.", metadataFile.delete());
// break; // break;
} }
} }
@ -482,15 +474,14 @@ public class TestListCorruptFileBlocks {
File storageDir = cluster.getInstanceStorageDir(i, j); File storageDir = cluster.getInstanceStorageDir(i, j);
File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid); File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
LOG.info("Removing files from " + data_dir); LOG.info("Removing files from " + data_dir);
File[] blocks = data_dir.listFiles(); List<File> metadataFiles = MiniDFSCluster.getAllBlockMetadataFiles(
if (blocks == null) data_dir);
if (metadataFiles == null)
continue; continue;
for (File metadataFile : metadataFiles) {
for (int idx = 0; idx < blocks.length; idx++) { File blockFile = Block.metaToBlockFile(metadataFile);
if (!blocks[idx].getName().startsWith("blk_")) { assertTrue("Cannot remove file.", blockFile.delete());
continue; assertTrue("Cannot remove file.", metadataFile.delete());
}
assertTrue("Cannot remove file.", blocks[idx].delete());
} }
} }
} }

View File

@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Similar to hadoop-dfs-dir.txt, except this is used for a datanode layout
# upgrade test.
# Uncomment the following line to produce checksum info for a new DFS image.
#printChecksums
/small 2976363016
overallCRC 4099869518