HDFS-3731. Release upgrade must handle blocks being written from 1.0. Contributed by Colin Patrick McCabe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1377548 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2012-08-27 01:07:04 +00:00
parent 804c45867a
commit 0e5d96e3ee
5 changed files with 132 additions and 35 deletions

View File

@ -510,6 +510,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3683. Edit log replay progress indicator shows >100% complete. (Plamen
Jeliazkov via atm)
HDFS-3731. Release upgrade must handle blocks being written from 1.0.
(Colin Patrick McCabe via eli)
BREAKDOWN OF HDFS-3042 SUBTASKS
HDFS-2185. HDFS portion of ZK-based FailoverController (todd)

View File

@ -85,6 +85,12 @@ public abstract class Storage extends StorageInfo {
public static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";
public static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";
/**
* The blocksBeingWritten directory which was used in some 1.x and earlier
* releases.
*/
public static final String STORAGE_1_BBW = "blocksBeingWritten";
public enum StorageState {
NON_EXISTENT,
NOT_FORMATTED,

View File

@ -451,6 +451,8 @@ public class DataStorage extends Storage {
File curDir = sd.getCurrentDir();
File prevDir = sd.getPreviousDir();
File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);
assert curDir.exists() : "Data node current directory must exist.";
// Cleanup directory "detach"
cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
@ -471,7 +473,7 @@ public class DataStorage extends Storage {
BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(),
nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
bpStorage.format(curDir, nsInfo);
linkAllBlocks(tmpDir, new File(curBpDir, STORAGE_DIR_CURRENT));
linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT));
// 4. Write version file under <SD>/current
layoutVersion = HdfsConstants.LAYOUT_VERSION;
@ -578,15 +580,21 @@ public class DataStorage extends Storage {
+ "; cur CTime = " + this.getCTime());
assert sd.getCurrentDir().exists() : "Current directory must exist.";
final File tmpDir = sd.getFinalizedTmp();//finalized.tmp directory
final File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);
// 1. rename previous to finalized.tmp
rename(prevDir, tmpDir);
// 2. delete finalized.tmp dir in a separate thread
// Also delete the blocksBeingWritten from HDFS 1.x and earlier, if
// it exists.
new Daemon(new Runnable() {
@Override
public void run() {
try {
deleteDir(tmpDir);
if (bbwDir.exists()) {
deleteDir(bbwDir);
}
} catch(IOException ex) {
LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
}
@ -620,11 +628,16 @@ public class DataStorage extends Storage {
/**
* Hardlink all finalized and RBW blocks in fromDir to toDir
* @param fromDir directory where the snapshot is stored
* @param toDir the current data directory
* @throws IOException if error occurs during hardlink
*
* @param fromDir The directory where the 'from' snapshot is stored
* @param fromBbwDir In HDFS 1.x, the directory where blocks
* that are under construction are stored.
* @param toDir The current data directory
*
* @throws IOException If error occurs during hardlink
*/
private void linkAllBlocks(File fromDir, File toDir) throws IOException {
private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir)
throws IOException {
HardLink hardLink = new HardLink();
// do the link
int diskLayoutVersion = this.getLayoutVersion();
@ -632,13 +645,23 @@ public class DataStorage extends Storage {
// hardlink finalized blocks in tmpDir/finalized
linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED),
new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
// hardlink rbw blocks in tmpDir/finalized
// hardlink rbw blocks in tmpDir/rbw
linkBlocks(new File(fromDir, STORAGE_DIR_RBW),
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
} else { // pre-RBW version
// hardlink finalized blocks in tmpDir
linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
diskLayoutVersion, hardLink);
if (fromBbwDir.exists()) {
/*
* We need to put the 'blocksBeingWritten' from HDFS 1.x into the rbw
* directory. It's a little messy, because the blocksBeingWriten was
* NOT underneath the 'current' directory in those releases. See
* HDFS-3731 for details.
*/
linkBlocks(fromBbwDir,
new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
}
}
LOG.info( hardLink.linkStats.report() );
}

View File

@ -39,7 +39,9 @@ import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.util.StringUtils;
@ -49,8 +51,9 @@ import org.junit.Test;
* This tests data transfer protocol handling in the Datanode. It sends
* various forms of wrong data and verifies that Datanode handles it well.
*
* This test uses the following two file from src/test/.../dfs directory :
* 1) hadoop-version-dfs-dir.tgz : contains DFS directories.
* This test uses the following items from src/test/.../dfs directory :
* 1) hadoop-22-dfs-dir.tgz and other tarred pre-upgrade NN / DN
* directory images
* 2) hadoop-dfs-dir.txt : checksums that are compared in this test.
* Please read hadoop-dfs-dir.txt for more information.
*/
@ -62,14 +65,23 @@ public class TestDFSUpgradeFromImage {
new File(MiniDFSCluster.getBaseDirectory());
private static final String HADOOP_DFS_DIR_TXT = "hadoop-dfs-dir.txt";
private static final String HADOOP22_IMAGE = "hadoop-22-dfs-dir.tgz";
public int numDataNodes = 4;
private static final String HADOOP1_BBW_IMAGE = "hadoop1-bbw.tgz";
private static class ReferenceFileInfo {
String path;
long checksum;
}
private static final Configuration upgradeConf;
static {
upgradeConf = new HdfsConfiguration();
upgradeConf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); // block scanning off
if (System.getProperty("test.build.data") == null) { // to allow test to be run outside of Maven
System.setProperty("test.build.data", "build/test/data");
}
}
LinkedList<ReferenceFileInfo> refList = new LinkedList<ReferenceFileInfo>();
Iterator<ReferenceFileInfo> refIter;
@ -137,11 +149,33 @@ public class TestDFSUpgradeFromImage {
}
}
CRC32 overallChecksum = new CRC32();
/**
* Try to open a file for reading several times.
*
* If we fail because lease recovery hasn't completed, retry the open.
*/
private static FSInputStream dfsOpenFileWithRetries(DistributedFileSystem dfs,
String pathName) throws IOException {
IOException exc = null;
for (int tries = 0; tries < 10; tries++) {
try {
return dfs.dfs.open(pathName);
} catch (IOException e) {
exc = e;
}
if (!exc.getMessage().contains("Cannot obtain " +
"block length for LocatedBlock")) {
throw exc;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
}
throw exc;
}
private void verifyDir(DistributedFileSystem dfs, Path dir)
throws IOException {
private void verifyDir(DistributedFileSystem dfs, Path dir,
CRC32 overallChecksum) throws IOException {
FileStatus[] fileArr = dfs.listStatus(dir);
TreeMap<Path, Boolean> fileMap = new TreeMap<Path, Boolean>();
@ -157,11 +191,11 @@ public class TestDFSUpgradeFromImage {
overallChecksum.update(pathName.getBytes());
if ( isDir ) {
verifyDir(dfs, path);
verifyDir(dfs, path, overallChecksum);
} else {
// this is not a directory. Checksum the file data.
CRC32 fileCRC = new CRC32();
FSInputStream in = dfs.dfs.open(pathName);
FSInputStream in = dfsOpenFileWithRetries(dfs, pathName);
byte[] buf = new byte[4096];
int nRead = 0;
while ( (nRead = in.read(buf, 0, buf.length)) > 0 ) {
@ -175,7 +209,8 @@ public class TestDFSUpgradeFromImage {
private void verifyFileSystem(DistributedFileSystem dfs) throws IOException {
verifyDir(dfs, new Path("/"));
CRC32 overallChecksum = new CRC32();
verifyDir(dfs, new Path("/"), overallChecksum);
verifyChecksum("overallCRC", overallChecksum.getValue());
@ -237,7 +272,8 @@ public class TestDFSUpgradeFromImage {
@Test
public void testUpgradeFromRel22Image() throws IOException {
unpackStorage(HADOOP22_IMAGE);
upgradeAndVerify();
upgradeAndVerify(new MiniDFSCluster.Builder(upgradeConf).
numDataNodes(4));
}
/**
@ -259,7 +295,8 @@ public class TestDFSUpgradeFromImage {
// Upgrade should now fail
try {
upgradeAndVerify();
upgradeAndVerify(new MiniDFSCluster.Builder(upgradeConf).
numDataNodes(4));
fail("Upgrade did not fail with bad MD5");
} catch (IOException ioe) {
String msg = StringUtils.stringifyException(ioe);
@ -268,21 +305,34 @@ public class TestDFSUpgradeFromImage {
}
}
}
private void upgradeAndVerify() throws IOException {
static void recoverAllLeases(DFSClient dfs,
Path path) throws IOException {
String pathStr = path.toString();
HdfsFileStatus status = dfs.getFileInfo(pathStr);
if (!status.isDir()) {
dfs.recoverLease(pathStr);
return;
}
byte prev[] = HdfsFileStatus.EMPTY_NAME;
DirectoryListing dirList;
do {
dirList = dfs.listPaths(pathStr, prev);
HdfsFileStatus files[] = dirList.getPartialListing();
for (HdfsFileStatus f : files) {
recoverAllLeases(dfs, f.getFullPath(path));
}
prev = dirList.getLastName();
} while (dirList.hasMore());
}
private void upgradeAndVerify(MiniDFSCluster.Builder bld)
throws IOException {
MiniDFSCluster cluster = null;
try {
Configuration conf = new HdfsConfiguration();
if (System.getProperty("test.build.data") == null) { // to allow test to be run outside of Ant
System.setProperty("test.build.data", "build/test/data");
}
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); // block scanning off
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDataNodes)
.format(false)
.startupOption(StartupOption.UPGRADE)
.clusterId("testClusterId")
.build();
bld.format(false).startupOption(StartupOption.UPGRADE)
.clusterId("testClusterId");
cluster = bld.build();
cluster.waitActive();
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
DFSClient dfsClient = dfs.dfs;
@ -293,12 +343,27 @@ public class TestDFSUpgradeFromImage {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
}
recoverAllLeases(dfsClient, new Path("/"));
verifyFileSystem(dfs);
} finally {
if (cluster != null) { cluster.shutdown(); }
}
}
/**
* Test upgrade from a 1.x image with some blocksBeingWritten
*/
@Test
public void testUpgradeFromRel1BBWImage() throws IOException {
unpackStorage(HADOOP1_BBW_IMAGE);
Configuration conf = new Configuration(upgradeConf);
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
System.getProperty("test.build.data") + File.separator +
"dfs" + File.separator +
"data" + File.separator +
"data1");
upgradeAndVerify(new MiniDFSCluster.Builder(conf).
numDataNodes(1).enableManagedDfsDirsRedundancy(false).
manageDataDfsDirs(false));
}
}