HDFS-6340: Merging r1593436 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1593437 13f79535-47bb-0310-9956-ffa450edef68
Author: Arpit Agarwal
Date:   2014-05-08 22:21:10 +00:00
parent 3019b2c7f0
commit 92fdc7113f
3 changed files with 67 additions and 14 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -245,6 +245,8 @@ Release 2.4.1 - UNRELEASED
     HDFS-2882. DN continues to start up, even if block pool fails to initialize
     (vinayakumarb)
 
+    HDFS-6340. DN can't finalize upgrade. (Rahul Singhal via Arpit Agarwal)
+
 Release 2.4.0 - 2014-04-07
 
   INCOMPATIBLE CHANGES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -1019,16 +1019,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
           + "from " + nodeReg + ", reports.length=" + reports.length);
     }
     final BlockManager bm = namesystem.getBlockManager();
-    boolean hasStaleStorages = true;
+    boolean noStaleStorages = false;
     for(StorageBlockReport r : reports) {
       final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
-      hasStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
+      noStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
       metrics.incrStorageBlockReportOps();
     }
 
     if (nn.getFSImage().isUpgradeFinalized() &&
         !nn.isStandbyState() &&
-        !hasStaleStorages) {
+        noStaleStorages) {
       return new FinalizeCommand(poolId);
     }
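
Note on the change above: the old code initialized hasStaleStorages to true and withheld the FinalizeCommand until the flag was cleared, while the patch renames it to noStaleStorages, starts it at false, and sends the command only when the last processed report indicates no stale storages. A minimal, self-contained sketch of the resulting decision logic (illustrative names only, not the Hadoop API; ReportProcessor stands in for BlockManager.processReport(), whose matching return-value change is not shown in this excerpt):

    import java.util.List;

    class FinalizeDecisionSketch {
      // Stand-in for BlockManager.processReport(); assumed (per the renamed
      // variable in the patch) to return true when the node is left with no
      // stale storages after processing this report.
      interface ReportProcessor {
        boolean process(String storageReport);
      }

      // Mirrors the patched blockReport() branch: the flag starts at false
      // and is overwritten by each processed report, so the FinalizeCommand
      // is sent only when the final report confirms no storages are stale.
      static boolean shouldSendFinalizeCommand(List<String> reports,
          ReportProcessor bm, boolean upgradeFinalized, boolean standby) {
        boolean noStaleStorages = false;
        for (String r : reports) {
          noStaleStorages = bm.process(r);
        }
        return upgradeFinalized && !standby && noStaleStorages;
      }
    }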

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSFinalize.java

@@ -30,6 +30,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.junit.After;
 import org.junit.Test;
@@ -61,11 +63,9 @@ public class TestDFSFinalize {
    * Verify that the current directory exists and that the previous directory
    * does not exist. Verify that current hasn't been modified by comparing
    * the checksum of all it's containing files with their original checksum.
-   * Note that we do not check that previous is removed on the DataNode
-   * because its removal is asynchronous therefore we have no reliable
-   * way to know when it will happen.
    */
-  static void checkResult(String[] nameNodeDirs, String[] dataNodeDirs) throws Exception {
+  static void checkResult(String[] nameNodeDirs, String[] dataNodeDirs,
+      String bpid) throws Exception {
     List<File> dirs = Lists.newArrayList();
     for (int i = 0; i < nameNodeDirs.length; i++) {
       File curDir = new File(nameNodeDirs[i], "current");
@@ -76,15 +76,30 @@ public class TestDFSFinalize {
     FSImageTestUtil.assertParallelFilesAreIdentical(
         dirs, Collections.<String>emptySet());
 
+    File dnCurDirs[] = new File[dataNodeDirs.length];
     for (int i = 0; i < dataNodeDirs.length; i++) {
-      assertEquals(
-          UpgradeUtilities.checksumContents(
-              DATA_NODE, new File(dataNodeDirs[i],"current")),
-          UpgradeUtilities.checksumMasterDataNodeContents());
+      dnCurDirs[i] = new File(dataNodeDirs[i],"current");
+      assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, dnCurDirs[i]),
+          UpgradeUtilities.checksumMasterDataNodeContents());
     }
     for (int i = 0; i < nameNodeDirs.length; i++) {
       assertFalse(new File(nameNodeDirs[i],"previous").isDirectory());
     }
+
+    if (bpid == null) {
+      for (int i = 0; i < dataNodeDirs.length; i++) {
+        assertFalse(new File(dataNodeDirs[i],"previous").isDirectory());
+      }
+    } else {
+      for (int i = 0; i < dataNodeDirs.length; i++) {
+        File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, dnCurDirs[i]);
+        assertFalse(new File(bpRoot,"previous").isDirectory());
+
+        File bpCurFinalizeDir = new File(bpRoot,"current/"+DataStorage.STORAGE_DIR_FINALIZED);
+        assertEquals(UpgradeUtilities.checksumContents(DATA_NODE, bpCurFinalizeDir),
+            UpgradeUtilities.checksumMasterBlockPoolFinalizedContents());
+      }
+    }
   }
 
   /**
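
The reworked checkResult above is keyed on the new bpid parameter: null verifies a DataNode-wide finalize (each data directory's previous must be gone), while a non-null block pool id verifies a per-block-pool finalize (the pool's previous directory under current must be gone and its current/finalized contents must still match the pristine checksums). A hypothetical call site, reusing the field names from this diff:

    // Hypothetical usage inside the test, names taken from the diff above.
    checkResult(nameNodeDirs, dataNodeDirs, null);  // DataNode-level finalize
    checkResult(nameNodeDirs, dataNodeDirs, bpid);  // single block pool finalize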
@@ -106,7 +121,7 @@ public class TestDFSFinalize {
     String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
     String[] dataNodeDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
 
-    log("Finalize with existing previous dir", numDirs);
+    log("Finalize NN & DN with existing previous dir", numDirs);
     UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
     UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
     UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
@@ -118,11 +133,47 @@ public class TestDFSFinalize {
       .startupOption(StartupOption.REGULAR)
       .build();
     cluster.finalizeCluster(conf);
-    checkResult(nameNodeDirs, dataNodeDirs);
+    cluster.triggerBlockReports();
+    // 1 second should be enough for asynchronous DN finalize
+    Thread.sleep(1000);
+    checkResult(nameNodeDirs, dataNodeDirs, null);
 
-    log("Finalize without existing previous dir", numDirs);
+    log("Finalize NN & DN without existing previous dir", numDirs);
     cluster.finalizeCluster(conf);
-    checkResult(nameNodeDirs, dataNodeDirs);
+    cluster.triggerBlockReports();
+    // 1 second should be enough for asynchronous DN finalize
+    Thread.sleep(1000);
+    checkResult(nameNodeDirs, dataNodeDirs, null);
+    cluster.shutdown();
+    UpgradeUtilities.createEmptyDirs(nameNodeDirs);
+    UpgradeUtilities.createEmptyDirs(dataNodeDirs);
+
+    log("Finalize NN & BP with existing previous dir", numDirs);
+    String bpid = UpgradeUtilities.getCurrentBlockPoolID(cluster);
+    UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+    UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
+    UpgradeUtilities.createDataNodeStorageDirs(dataNodeDirs, "current");
+    UpgradeUtilities.createBlockPoolStorageDirs(dataNodeDirs, "current", bpid);
+    UpgradeUtilities.createBlockPoolStorageDirs(dataNodeDirs, "previous", bpid);
+    cluster = new MiniDFSCluster.Builder(conf)
+      .format(false)
+      .manageDataDfsDirs(false)
+      .manageNameDfsDirs(false)
+      .startupOption(StartupOption.REGULAR)
+      .build();
+    cluster.finalizeCluster(conf);
+    cluster.triggerBlockReports();
+    // 1 second should be enough for asynchronous BP finalize
+    Thread.sleep(1000);
+    checkResult(nameNodeDirs, dataNodeDirs, bpid);
+
+    log("Finalize NN & BP without existing previous dir", numDirs);
+    cluster.finalizeCluster(conf);
+    cluster.triggerBlockReports();
+    // 1 second should be enough for asynchronous BP finalize
+    Thread.sleep(1000);
+    checkResult(nameNodeDirs, dataNodeDirs, bpid);
+
     cluster.shutdown();
     UpgradeUtilities.createEmptyDirs(nameNodeDirs);
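
Each finalize scenario in this test repeats the same synchronization recipe, because the finalize work that the FinalizeCommand triggers on the DataNode (or block pool) runs asynchronously; the fixed one-second sleep is the patch's own heuristic for letting it complete. Condensed, using only names that appear in the diff:

    cluster.finalizeCluster(conf);  // NameNode marks the upgrade finalized
    cluster.triggerBlockReports();  // reports prompt the FinalizeCommand reply
    Thread.sleep(1000);             // allow the asynchronous finalize to run
    checkResult(nameNodeDirs, dataNodeDirs, bpid);  // "previous" must now be gone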