HDFS-6019. Standby NN might not checkpoint when processing the rolling upgrade marker. Contributed by Haohui Mai.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-5535@1572182 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c8182ea764
commit
dedcc09e71
|
@ -105,3 +105,6 @@ HDFS-5535 subtasks:
|
|||
Arpit Agarwal)
|
||||
|
||||
HDFS-6020. Fix the five findbugs warnings. (kihwal)
|
||||
|
||||
HDFS-6019. Standby NN might not checkpoint when processing the rolling
|
||||
upgrade marker. (Haohui Mai via jing9)
|
||||
|
|
|
@ -406,6 +406,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
private final DatanodeStatistics datanodeStatistics;
|
||||
|
||||
private RollingUpgradeInfo rollingUpgradeInfo = null;
|
||||
/**
|
||||
* A flag that indicates whether the checkpointer should checkpoint a rollback
|
||||
* fsimage. The edit log tailer sets this flag. The checkpointer will create a
|
||||
* rollback fsimage if the flag is true, and then change the flag to false.
|
||||
*/
|
||||
private volatile boolean needRollbackFsImage;
|
||||
|
||||
// Block pool ID used by this namenode
|
||||
private String blockPoolId;
|
||||
|
@ -1149,6 +1155,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
* OP_ROLLING_UPGRADE_START.
|
||||
*/
|
||||
void triggerRollbackCheckpoint() {
|
||||
setNeedRollbackFsImage(true);
|
||||
if (standbyCheckpointer != null) {
|
||||
standbyCheckpointer.triggerRollbackCheckpoint();
|
||||
}
|
||||
|
@ -7232,6 +7239,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
return rollingUpgradeInfo;
|
||||
}
|
||||
|
||||
/**
 * Reports the current value of the {@code needRollbackFsImage} flag.
 *
 * @return the value of the {@code needRollbackFsImage} field
 */
public boolean isNeedRollbackFsImage() {
|
||||
return needRollbackFsImage;
|
||||
}
|
||||
|
||||
/**
 * Sets the {@code needRollbackFsImage} flag.
 *
 * @param needRollbackFsImage the new value to store in the field of the
 *          same name
 */
public void setNeedRollbackFsImage(boolean needRollbackFsImage) {
|
||||
this.needRollbackFsImage = needRollbackFsImage;
|
||||
}
|
||||
|
||||
/** Is rolling upgrade in progress? */
|
||||
public boolean isRollingUpgrade() {
|
||||
return rollingUpgradeInfo != null;
|
||||
|
|
|
@ -143,7 +143,6 @@ public class StandbyCheckpointer {
|
|||
}
|
||||
|
||||
/**
 * Marks the checkpointer thread as needing a rollback checkpoint and then
 * interrupts that thread.
 */
public void triggerRollbackCheckpoint() {
|
||||
thread.setNeedRollbackCheckpoint(true);
|
||||
// NOTE(review): presumably the interrupt cuts short the thread's sleep
// between checkpoints so the request is handled promptly -- confirm
// against CheckpointerThread.run().
thread.interrupt();
|
||||
}
|
||||
|
||||
|
@ -242,9 +241,6 @@ public class StandbyCheckpointer {
|
|||
private class CheckpointerThread extends Thread {
|
||||
private volatile boolean shouldRun = true;
|
||||
private volatile long preventCheckpointsUntil = 0;
|
||||
// Indicate that a rollback checkpoint is required immediately. It will be
|
||||
// reset to false after the checkpoint is done
|
||||
private volatile boolean needRollbackCheckpoint = false;
|
||||
|
||||
private CheckpointerThread() {
|
||||
super("Standby State Checkpointer");
|
||||
|
@ -254,10 +250,6 @@ public class StandbyCheckpointer {
|
|||
this.shouldRun = shouldRun;
|
||||
}
|
||||
|
||||
/**
 * Sets the thread's {@code needRollbackCheckpoint} flag.
 *
 * @param need the new value to store in {@code needRollbackCheckpoint}
 */
private void setNeedRollbackCheckpoint(boolean need) {
|
||||
this.needRollbackCheckpoint = need;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
// We have to make sure we're logged in as far as JAAS
|
||||
|
@ -292,6 +284,7 @@ public class StandbyCheckpointer {
|
|||
// on startup.
|
||||
lastCheckpointTime = now();
|
||||
while (shouldRun) {
|
||||
boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
|
||||
if (!needRollbackCheckpoint) {
|
||||
try {
|
||||
Thread.sleep(checkPeriod);
|
||||
|
@ -344,7 +337,7 @@ public class StandbyCheckpointer {
|
|||
if (needRollbackCheckpoint
|
||||
&& namesystem.getFSImage().hasRollbackFSImage()) {
|
||||
namesystem.setCreatedRollbackImages(true);
|
||||
needRollbackCheckpoint = false;
|
||||
namesystem.setNeedRollbackFsImage(false);
|
||||
}
|
||||
lastCheckpointTime = now;
|
||||
}
|
||||
|
|
|
@ -363,7 +363,8 @@ public class TestRollingUpgrade {
|
|||
dfs.mkdirs(foo);
|
||||
|
||||
// start rolling upgrade
|
||||
RollingUpgradeInfo info = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
RollingUpgradeInfo info = dfs
|
||||
.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
Assert.assertTrue(info.isStarted());
|
||||
dfs.mkdirs(bar);
|
||||
dfs.close();
|
||||
|
@ -407,7 +408,8 @@ public class TestRollingUpgrade {
|
|||
FSImage fsimage = dfsCluster.getNamesystem(0).getFSImage();
|
||||
|
||||
// start rolling upgrade
|
||||
RollingUpgradeInfo info = dfs.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
RollingUpgradeInfo info = dfs
|
||||
.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
Assert.assertTrue(info.isStarted());
|
||||
dfs.mkdirs(bar);
|
||||
|
||||
|
@ -429,6 +431,42 @@ public class TestRollingUpgrade {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * Starts a rolling upgrade through NN 0 while NN 1 is shut down, checks that
 * a QUERY reports no rollback images yet, then restarts NN 1 and checks that
 * NN 0's FSImage reports a rollback fsimage.
 *
 * @throws Exception if cluster setup or any rolling upgrade call fails
 */
@Test (timeout = 300000)
|
||||
public void testQuery() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
MiniQJMHACluster cluster = null;
|
||||
try {
|
||||
cluster = new MiniQJMHACluster.Builder(conf).build();
|
||||
MiniDFSCluster dfsCluster = cluster.getDfsCluster();
|
||||
dfsCluster.waitActive();
|
||||
|
||||
// Make NN 0 the active namenode and talk to it for the rest of the test.
dfsCluster.transitionToActive(0);
|
||||
DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
|
||||
|
||||
// NN 1 is taken down before the upgrade is prepared.
// NOTE(review): presumably so that no standby can checkpoint the rollback
// image until it is restarted below -- confirm.
dfsCluster.shutdownNameNode(1);
|
||||
|
||||
// start rolling upgrade
|
||||
RollingUpgradeInfo info = dfs
|
||||
.rollingUpgrade(RollingUpgradeAction.PREPARE);
|
||||
Assert.assertTrue(info.isStarted());
|
||||
|
||||
// While NN 1 is down, QUERY must report that no rollback images exist.
info = dfs.rollingUpgrade(RollingUpgradeAction.QUERY);
|
||||
Assert.assertFalse(info.createdRollbackImages());
|
||||
|
||||
dfsCluster.restartNameNode(1);
|
||||
|
||||
// NOTE(review): queryForPreparation's body is not visible here; presumably
// it polls until the rollback images are reported as created -- confirm.
queryForPreparation(dfs);
|
||||
|
||||
// The NN should have a copy of the fsimage in case of rollbacks.
|
||||
Assert.assertTrue(dfsCluster.getNamesystem(0).getFSImage()
|
||||
.hasRollbackFSImage());
|
||||
} finally {
|
||||
// Always tear the mini cluster down, even when an assertion fails.
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void queryForPreparation(DistributedFileSystem dfs) throws IOException,
|
||||
InterruptedException {
|
||||
RollingUpgradeInfo info;
|
||||
|
|
Loading…
Reference in New Issue