HDFS-14392. Backport HDFS-9787 to branch-2. Contributed by Chao Sun.
This commit is contained in:
parent
99e41293ee
commit
812e6d5c81
|
@ -65,6 +65,7 @@ public class StandbyCheckpointer {
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final FSNamesystem namesystem;
|
private final FSNamesystem namesystem;
|
||||||
private long lastCheckpointTime;
|
private long lastCheckpointTime;
|
||||||
|
private long lastUploadTime;
|
||||||
private final CheckpointerThread thread;
|
private final CheckpointerThread thread;
|
||||||
private final ThreadFactory uploadThreadFactory;
|
private final ThreadFactory uploadThreadFactory;
|
||||||
private List<URL> activeNNAddresses;
|
private List<URL> activeNNAddresses;
|
||||||
|
@ -257,6 +258,7 @@ public class StandbyCheckpointer {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
lastUploadTime = monotonicNow();
|
||||||
|
|
||||||
// we are primary if we successfully updated the ANN
|
// we are primary if we successfully updated the ANN
|
||||||
this.isPrimaryCheckPointer = success;
|
this.isPrimaryCheckPointer = success;
|
||||||
|
@ -367,6 +369,7 @@ public class StandbyCheckpointer {
|
||||||
// Reset checkpoint time so that we don't always checkpoint
|
// Reset checkpoint time so that we don't always checkpoint
|
||||||
// on startup.
|
// on startup.
|
||||||
lastCheckpointTime = monotonicNow();
|
lastCheckpointTime = monotonicNow();
|
||||||
|
lastUploadTime = monotonicNow();
|
||||||
while (shouldRun) {
|
while (shouldRun) {
|
||||||
boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
|
boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
|
||||||
if (!needRollbackCheckpoint) {
|
if (!needRollbackCheckpoint) {
|
||||||
|
@ -419,7 +422,9 @@ public class StandbyCheckpointer {
|
||||||
|
|
||||||
// on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
|
// on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
|
||||||
// rollback request, are the checkpointer, are outside the quiet period.
|
// rollback request, are the checkpointer, are outside the quiet period.
|
||||||
boolean sendRequest = isPrimaryCheckPointer || secsSinceLast >= checkpointConf.getQuietPeriod();
|
final long secsSinceLastUpload = (now - lastUploadTime) / 1000;
|
||||||
|
boolean sendRequest = isPrimaryCheckPointer
|
||||||
|
|| secsSinceLastUpload >= checkpointConf.getQuietPeriod();
|
||||||
doCheckpoint(sendRequest);
|
doCheckpoint(sendRequest);
|
||||||
|
|
||||||
// reset needRollbackCheckpoint to false only when we finish a ckpt
|
// reset needRollbackCheckpoint to false only when we finish a ckpt
|
||||||
|
|
|
@ -527,6 +527,48 @@ public class TestStandbyCheckpoints {
|
||||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
|
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for the case standby NNs can upload FSImage to ANN after
|
||||||
|
* become non-primary standby NN. HDFS-9787
|
||||||
|
*/
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testNonPrimarySBNUploadFSImage() throws Exception {
|
||||||
|
// Shutdown all standby NNs.
|
||||||
|
for (int i = 1; i < NUM_NNS; i++) {
|
||||||
|
cluster.shutdownNameNode(i);
|
||||||
|
|
||||||
|
// Checkpoint as fast as we can, in a tight loop.
|
||||||
|
cluster.getConfiguration(i).setInt(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
doEdits(0, 10);
|
||||||
|
cluster.transitionToStandby(0);
|
||||||
|
|
||||||
|
// Standby NNs do checkpoint without active NN available.
|
||||||
|
for (int i = 1; i < NUM_NNS; i++) {
|
||||||
|
cluster.restartNameNode(i, false);
|
||||||
|
}
|
||||||
|
cluster.waitClusterUp();
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_NNS; i++) {
|
||||||
|
// Once the standby catches up, it should do a checkpoint
|
||||||
|
// and save to local directories.
|
||||||
|
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
|
||||||
|
}
|
||||||
|
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
|
||||||
|
// Wait for 2 seconds to expire last upload time.
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
doEdits(11, 20);
|
||||||
|
nns[0].getRpcServer().rollEditLog();
|
||||||
|
|
||||||
|
// One of standby NNs should also upload it back to the active.
|
||||||
|
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(23));
|
||||||
|
}
|
||||||
|
|
||||||
private void doEdits(int start, int stop) throws IOException {
|
private void doEdits(int start, int stop) throws IOException {
|
||||||
for (int i = start; i < stop; i++) {
|
for (int i = start; i < stop; i++) {
|
||||||
Path p = new Path("/test" + i);
|
Path p = new Path("/test" + i);
|
||||||
|
|
Loading…
Reference in New Issue