HDFS-3731. 2.0 release upgrade must handle blocks being written from 1.0. Contributed by Colin Patrick McCabe
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1377137 13f79535-47bb-0310-9956-ffa450edef68
commit a11042365f (parent 92cb6b093c)
CHANGES.txt:

@@ -673,6 +673,9 @@ Branch-2 ( Unreleased changes )
     HDFS-3715. Fix TestFileCreation#testFileCreationNamenodeRestart.
     (Andrew Whang via eli)
 
+    HDFS-3731. 2.0 release upgrade must handle blocks being written from 1.0.
+    (Colin Patrick McCabe via eli)
+
   BREAKDOWN OF HDFS-3042 SUBTASKS
 
     HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
Storage.java:

@@ -86,6 +86,12 @@ public abstract class Storage extends StorageInfo {
   public static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";
   public static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";
 
+  /**
+   * The blocksBeingWritten directory which was used in some 1.x and earlier
+   * releases.
+   */
+  public static final String STORAGE_1_BBW = "blocksBeingWritten";
+
   public enum StorageState {
     NON_EXISTENT,
     NOT_FORMATTED,
DataStorage.java:

@@ -451,6 +451,8 @@ public class DataStorage extends Storage {
 
     File curDir = sd.getCurrentDir();
     File prevDir = sd.getPreviousDir();
+    File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);
+
     assert curDir.exists() : "Data node current directory must exist.";
     // Cleanup directory "detach"
     cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
@@ -471,7 +473,7 @@ public class DataStorage extends Storage {
     BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(),
         nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
     bpStorage.format(curDir, nsInfo);
-    linkAllBlocks(tmpDir, new File(curBpDir, STORAGE_DIR_CURRENT));
+    linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT));
 
     // 4. Write version file under <SD>/current
     layoutVersion = HdfsConstants.LAYOUT_VERSION;
@@ -578,15 +580,21 @@ public class DataStorage extends Storage {
         + "; cur CTime = " + this.getCTime());
     assert sd.getCurrentDir().exists() : "Current directory must exist.";
     final File tmpDir = sd.getFinalizedTmp();//finalized.tmp directory
+    final File bbwDir = new File(sd.getRoot(), Storage.STORAGE_1_BBW);
     // 1. rename previous to finalized.tmp
     rename(prevDir, tmpDir);
 
     // 2. delete finalized.tmp dir in a separate thread
+    // Also delete the blocksBeingWritten from HDFS 1.x and earlier, if
+    // it exists.
     new Daemon(new Runnable() {
         @Override
         public void run() {
           try {
             deleteDir(tmpDir);
+            if (bbwDir.exists()) {
+              deleteDir(bbwDir);
+            }
           } catch(IOException ex) {
             LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
           }
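Context for the hunk above: finalizeUpgrade renames previous to finalized.tmp and then deletes it on a background daemon thread, and with this change the legacy blocksBeingWritten directory is removed in the same pass. A rough standalone sketch of that cleanup pattern follows; the recursive deleteDir helper, the plain Thread, and the method names are illustrative stand-ins for DataStorage.deleteDir and Hadoop's Daemon wrapper, not the actual implementation.

import java.io.File;
import java.io.IOException;

public class AsyncFinalizeCleanupSketch {
  /** Recursively delete a directory tree; fail loudly if anything survives. */
  static void deleteDir(File dir) throws IOException {
    File[] children = dir.listFiles();
    if (children != null) {
      for (File child : children) {
        deleteDir(child);
      }
    }
    if (dir.exists() && !dir.delete()) {
      throw new IOException("could not delete " + dir);
    }
  }

  /** Delete finalized.tmp and, if present, the 1.x blocksBeingWritten dir. */
  static Thread cleanupInBackground(final File tmpDir, final File bbwDir) {
    Thread t = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          deleteDir(tmpDir);
          if (bbwDir.exists()) {
            deleteDir(bbwDir);
          }
        } catch (IOException e) {
          System.err.println("finalize cleanup failed: " + e);
        }
      }
    });
    t.setDaemon(true); // mirrors the intent of Hadoop's Daemon wrapper
    t.start();
    return t;
  }
}

Deleting on a daemon thread keeps finalize fast even when the old block directories are very large; the caller does not wait for the delete to finish.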
@@ -620,11 +628,16 @@ public class DataStorage extends Storage {
 
   /**
    * Hardlink all finalized and RBW blocks in fromDir to toDir
-   * @param fromDir directory where the snapshot is stored
-   * @param toDir the current data directory
-   * @throws IOException if error occurs during hardlink
+   *
+   * @param fromDir      The directory where the 'from' snapshot is stored
+   * @param fromBbwDir   In HDFS 1.x, the directory where blocks
+   *                     that are under construction are stored.
+   * @param toDir        The current data directory
+   *
+   * @throws IOException If error occurs during hardlink
    */
-  private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+  private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir)
+      throws IOException {
     HardLink hardLink = new HardLink();
     // do the link
     int diskLayoutVersion = this.getLayoutVersion();
@@ -632,13 +645,23 @@ public class DataStorage extends Storage {
       // hardlink finalized blocks in tmpDir/finalized
       linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED),
           new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
-      // hardlink rbw blocks in tmpDir/finalized
+      // hardlink rbw blocks in tmpDir/rbw
      linkBlocks(new File(fromDir, STORAGE_DIR_RBW),
           new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
     } else { // pre-RBW version
       // hardlink finalized blocks in tmpDir
       linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED),
           diskLayoutVersion, hardLink);
+      if (fromBbwDir.exists()) {
+        /*
+         * We need to put the 'blocksBeingWritten' from HDFS 1.x into the rbw
+         * directory.  It's a little messy, because the blocksBeingWritten was
+         * NOT underneath the 'current' directory in those releases.  See
+         * HDFS-3731 for details.
+         */
+        linkBlocks(fromBbwDir,
+            new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink);
+      }
     }
     LOG.info( hardLink.linkStats.report() );
   }
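The pre-RBW branch just above is the heart of HDFS-3731: a 1.x datanode keeps replicas that are still being written in a top-level blocksBeingWritten directory rather than under current/, so the upgrade hardlinks that directory's contents into the new block pool's rbw directory, where the 2.0 datanode will recover them as replicas being written. A minimal sketch of that single step, using java.nio.file hard links instead of Hadoop's HardLink helper; the directory layout in main is hypothetical.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class BbwToRbwLinkSketch {
  /**
   * Hardlink every file in the legacy blocksBeingWritten directory into the
   * block pool's rbw directory. Hard links copy no data, so the old names
   * can be removed later without touching the block contents.
   */
  static void linkBbwIntoRbw(Path bbwDir, Path rbwDir) throws IOException {
    if (!Files.isDirectory(bbwDir)) {
      return; // nothing to migrate
    }
    Files.createDirectories(rbwDir);
    try (DirectoryStream<Path> blocks = Files.newDirectoryStream(bbwDir)) {
      for (Path block : blocks) {
        // createLink(link, existing): both names now refer to the same inode
        Files.createLink(rbwDir.resolve(block.getFileName()), block);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical paths; the real upgrade derives these from the storage root.
    linkBbwIntoRbw(Paths.get("/data/dn1/blocksBeingWritten"),
        Paths.get("/data/dn1/current/BP-1/current/rbw"));
  }
}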
TestDFSUpgradeFromImage.java:

@@ -39,7 +39,9 @@ import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
 import org.apache.hadoop.util.StringUtils;
@@ -49,8 +51,9 @@ import org.junit.Test;
  * This tests data transfer protocol handling in the Datanode. It sends
  * various forms of wrong data and verifies that Datanode handles it well.
  *
- * This test uses the following two file from src/test/.../dfs directory :
- *   1) hadoop-version-dfs-dir.tgz : contains DFS directories.
+ * This test uses the following items from src/test/.../dfs directory :
+ *   1) hadoop-22-dfs-dir.tgz and other tarred pre-upgrade NN / DN
+ *      directory images
  *   2) hadoop-dfs-dir.txt : checksums that are compared in this test.
  * Please read hadoop-dfs-dir.txt for more information.
  */
@@ -62,14 +65,23 @@ public class TestDFSUpgradeFromImage {
       new File(MiniDFSCluster.getBaseDirectory());
   private static final String HADOOP_DFS_DIR_TXT = "hadoop-dfs-dir.txt";
   private static final String HADOOP22_IMAGE = "hadoop-22-dfs-dir.tgz";
-
-  public int numDataNodes = 4;
-
+  private static final String HADOOP1_BBW_IMAGE = "hadoop1-bbw.tgz";
+
   private static class ReferenceFileInfo {
     String path;
     long checksum;
   }
+
+  private static final Configuration upgradeConf;
+
+  static {
+    upgradeConf = new HdfsConfiguration();
+    upgradeConf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); // block scanning off
+    if (System.getProperty("test.build.data") == null) { // to allow test to be run outside of Maven
+      System.setProperty("test.build.data", "build/test/data");
+    }
+  }
 
   LinkedList<ReferenceFileInfo> refList = new LinkedList<ReferenceFileInfo>();
   Iterator<ReferenceFileInfo> refIter;
 
@@ -137,11 +149,33 @@ public class TestDFSUpgradeFromImage {
     }
   }
 
-  CRC32 overallChecksum = new CRC32();
+  /**
+   * Try to open a file for reading several times.
+   *
+   * If we fail because lease recovery hasn't completed, retry the open.
+   */
+  private static FSInputStream dfsOpenFileWithRetries(DistributedFileSystem dfs,
+      String pathName) throws IOException {
+    IOException exc = null;
+    for (int tries = 0; tries < 10; tries++) {
+      try {
+        return dfs.dfs.open(pathName);
+      } catch (IOException e) {
+        exc = e;
+      }
+      if (!exc.getMessage().contains("Cannot obtain " +
+          "block length for LocatedBlock")) {
+        throw exc;
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ignored) {}
+    }
+    throw exc;
+  }
 
-  private void verifyDir(DistributedFileSystem dfs, Path dir)
-      throws IOException {
-
+  private void verifyDir(DistributedFileSystem dfs, Path dir,
+      CRC32 overallChecksum) throws IOException {
     FileStatus[] fileArr = dfs.listStatus(dir);
     TreeMap<Path, Boolean> fileMap = new TreeMap<Path, Boolean>();
 
@@ -157,11 +191,11 @@ public class TestDFSUpgradeFromImage {
       overallChecksum.update(pathName.getBytes());
 
       if ( isDir ) {
-        verifyDir(dfs, path);
+        verifyDir(dfs, path, overallChecksum);
       } else {
         // this is not a directory. Checksum the file data.
         CRC32 fileCRC = new CRC32();
-        FSInputStream in = dfs.dfs.open(pathName);
+        FSInputStream in = dfsOpenFileWithRetries(dfs, pathName);
         byte[] buf = new byte[4096];
         int nRead = 0;
         while ( (nRead = in.read(buf, 0, buf.length)) > 0 ) {
@@ -175,7 +209,8 @@ public class TestDFSUpgradeFromImage {
 
   private void verifyFileSystem(DistributedFileSystem dfs) throws IOException {
-
-    verifyDir(dfs, new Path("/"));
+    CRC32 overallChecksum = new CRC32();
+    verifyDir(dfs, new Path("/"), overallChecksum);
 
     verifyChecksum("overallCRC", overallChecksum.getValue());
 
@@ -237,7 +272,8 @@ public class TestDFSUpgradeFromImage {
   @Test
   public void testUpgradeFromRel22Image() throws IOException {
     unpackStorage(HADOOP22_IMAGE);
-    upgradeAndVerify();
+    upgradeAndVerify(new MiniDFSCluster.Builder(upgradeConf).
+        numDataNodes(4));
   }
 
   /**
@@ -259,7 +295,8 @@ public class TestDFSUpgradeFromImage {
 
     // Upgrade should now fail
     try {
-      upgradeAndVerify();
+      upgradeAndVerify(new MiniDFSCluster.Builder(upgradeConf).
+          numDataNodes(4));
       fail("Upgrade did not fail with bad MD5");
     } catch (IOException ioe) {
       String msg = StringUtils.stringifyException(ioe);
@@ -268,21 +305,34 @@ public class TestDFSUpgradeFromImage {
       }
     }
   }
-
-  private void upgradeAndVerify() throws IOException {
+
+  static void recoverAllLeases(DFSClient dfs,
+      Path path) throws IOException {
+    String pathStr = path.toString();
+    HdfsFileStatus status = dfs.getFileInfo(pathStr);
+    if (!status.isDir()) {
+      dfs.recoverLease(pathStr);
+      return;
+    }
+    byte prev[] = HdfsFileStatus.EMPTY_NAME;
+    DirectoryListing dirList;
+    do {
+      dirList = dfs.listPaths(pathStr, prev);
+      HdfsFileStatus files[] = dirList.getPartialListing();
+      for (HdfsFileStatus f : files) {
+        recoverAllLeases(dfs, f.getFullPath(path));
+      }
+      prev = dirList.getLastName();
+    } while (dirList.hasMore());
+  }
+
+  private void upgradeAndVerify(MiniDFSCluster.Builder bld)
+      throws IOException {
     MiniDFSCluster cluster = null;
     try {
-      Configuration conf = new HdfsConfiguration();
-      if (System.getProperty("test.build.data") == null) { // to allow test to be run outside of Ant
-        System.setProperty("test.build.data", "build/test/data");
-      }
-      conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); // block scanning off
-      cluster = new MiniDFSCluster.Builder(conf)
-                                  .numDataNodes(numDataNodes)
-                                  .format(false)
-                                  .startupOption(StartupOption.UPGRADE)
-                                  .clusterId("testClusterId")
-                                  .build();
+      bld.format(false).startupOption(StartupOption.UPGRADE)
+        .clusterId("testClusterId");
+      cluster = bld.build();
       cluster.waitActive();
       DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
       DFSClient dfsClient = dfs.dfs;
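recoverAllLeases above walks the namespace with listPaths and asks the NameNode to recover the lease on every file, which is what lets the test read back files that were mid-write when the 1.x image was captured. A small sketch of the same idea for a single file, assuming the public DistributedFileSystem#recoverLease(Path) call, which returns true once the file is closed; the polling interval and timeout are arbitrary choices for illustration.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class LeaseRecoverySketch {
  /**
   * Ask the NameNode to start lease recovery on one file and poll until the
   * file is closed, so its length is known and it can be read safely.
   */
  static boolean recoverLeaseAndWait(DistributedFileSystem dfs, Path file,
      long timeoutMs) throws Exception {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (dfs.recoverLease(file)) {
        return true; // lease recovered, file is closed
      }
      Thread.sleep(1000); // recovery is asynchronous; poll and retry
    }
    return false;
  }
}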
@@ -293,12 +343,27 @@ public class TestDFSUpgradeFromImage {
           Thread.sleep(1000);
         } catch (InterruptedException ignored) {}
       }
-
+      recoverAllLeases(dfsClient, new Path("/"));
       verifyFileSystem(dfs);
     } finally {
       if (cluster != null) { cluster.shutdown(); }
     }
   }
+
+  /**
+   * Test upgrade from a 1.x image with some blocksBeingWritten
+   */
+  @Test
+  public void testUpgradeFromRel1BBWImage() throws IOException {
+    unpackStorage(HADOOP1_BBW_IMAGE);
+    Configuration conf = new Configuration(upgradeConf);
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
+        System.getProperty("test.build.data") + File.separator +
+        "dfs" + File.separator +
+        "data" + File.separator +
+        "data1");
+    upgradeAndVerify(new MiniDFSCluster.Builder(conf).
+        numDataNodes(1).enableManagedDfsDirsRedundancy(false).
+        manageDataDfsDirs(false));
+  }
 }
Binary file not shown.