HDFS-14317. Ensure checkpoints are created when in-progress edit log tailing is enabled with a period shorter than the log roll period. Contributed by Ekanth Sethuramalingam.
(cherry-picked from commit 1bc282e0b3
)
This commit is contained in:
parent
451844fee5
commit
6d076dd5e8
|
@ -327,7 +327,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final int DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT = 4;
|
public static final int DFS_NAMENODE_QUOTA_INIT_THREADS_DEFAULT = 4;
|
||||||
|
|
||||||
public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD = "dfs.namenode.edit.log.autoroll.multiplier.threshold";
|
public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD = "dfs.namenode.edit.log.autoroll.multiplier.threshold";
|
||||||
public static final float DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 2.0f;
|
public static final float
|
||||||
|
DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT = 0.5f;
|
||||||
public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS = "dfs.namenode.edit.log.autoroll.check.interval.ms";
|
public static final String DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS = "dfs.namenode.edit.log.autoroll.check.interval.ms";
|
||||||
public static final int DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT = 5*60*1000;
|
public static final int DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS_DEFAULT = 5*60*1000;
|
||||||
|
|
||||||
|
|
|
@ -563,7 +563,8 @@ public class FSEditLog implements LogsPurgeable {
|
||||||
/**
|
/**
|
||||||
* @return the first transaction ID in the current log segment
|
* @return the first transaction ID in the current log segment
|
||||||
*/
|
*/
|
||||||
synchronized long getCurSegmentTxId() {
|
@VisibleForTesting
|
||||||
|
public synchronized long getCurSegmentTxId() {
|
||||||
Preconditions.checkState(isSegmentOpen(),
|
Preconditions.checkState(isSegmentOpen(),
|
||||||
"Bad state: %s", state);
|
"Bad state: %s", state);
|
||||||
return curSegmentTxId;
|
return curSegmentTxId;
|
||||||
|
|
|
@ -110,6 +110,11 @@ public class EditLogTailer {
|
||||||
*/
|
*/
|
||||||
private long lastLoadTimeMs;
|
private long lastLoadTimeMs;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The last time we triggered a edit log roll on active namenode.
|
||||||
|
*/
|
||||||
|
private long lastRollTimeMs;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* How often the Standby should roll edit logs. Since the Standby only reads
|
* How often the Standby should roll edit logs. Since the Standby only reads
|
||||||
* from finalized log segments, the Standby will only be as up-to-date as how
|
* from finalized log segments, the Standby will only be as up-to-date as how
|
||||||
|
@ -140,7 +145,8 @@ public class EditLogTailer {
|
||||||
private int nnLoopCount = 0;
|
private int nnLoopCount = 0;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* maximum number of retries we should give each of the remote namenodes before giving up
|
* Maximum number of retries we should give each of the remote namenodes
|
||||||
|
* before giving up.
|
||||||
*/
|
*/
|
||||||
private int maxRetries;
|
private int maxRetries;
|
||||||
|
|
||||||
|
@ -162,6 +168,7 @@ public class EditLogTailer {
|
||||||
this.editLog = namesystem.getEditLog();
|
this.editLog = namesystem.getEditLog();
|
||||||
|
|
||||||
lastLoadTimeMs = monotonicNow();
|
lastLoadTimeMs = monotonicNow();
|
||||||
|
lastRollTimeMs = monotonicNow();
|
||||||
|
|
||||||
logRollPeriodMs = conf.getTimeDuration(
|
logRollPeriodMs = conf.getTimeDuration(
|
||||||
DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
|
DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
|
||||||
|
@ -350,7 +357,7 @@ public class EditLogTailer {
|
||||||
*/
|
*/
|
||||||
private boolean tooLongSinceLastLoad() {
|
private boolean tooLongSinceLastLoad() {
|
||||||
return logRollPeriodMs >= 0 &&
|
return logRollPeriodMs >= 0 &&
|
||||||
(monotonicNow() - lastLoadTimeMs) > logRollPeriodMs ;
|
(monotonicNow() - lastRollTimeMs) > logRollPeriodMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -378,6 +385,7 @@ public class EditLogTailer {
|
||||||
try {
|
try {
|
||||||
future = rollEditsRpcExecutor.submit(getNameNodeProxy());
|
future = rollEditsRpcExecutor.submit(getNameNodeProxy());
|
||||||
future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
|
future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
|
||||||
|
lastRollTimeMs = monotonicNow();
|
||||||
lastRollTriggerTxId = lastLoadedTxnId;
|
lastRollTriggerTxId = lastLoadedTxnId;
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
LOG.warn("Unable to trigger a roll of the active NN", e);
|
LOG.warn("Unable to trigger a roll of the active NN", e);
|
||||||
|
|
|
@ -2600,7 +2600,7 @@
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.namenode.edit.log.autoroll.multiplier.threshold</name>
|
<name>dfs.namenode.edit.log.autoroll.multiplier.threshold</name>
|
||||||
<value>2.0</value>
|
<value>0.5</value>
|
||||||
<description>
|
<description>
|
||||||
Determines when an active namenode will roll its own edit log.
|
Determines when an active namenode will roll its own edit log.
|
||||||
The actual threshold (in number of edits) is determined by multiplying
|
The actual threshold (in number of edits) is determined by multiplying
|
||||||
|
|
|
@ -338,9 +338,97 @@ public class TestEditLogTailer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStandbyTriggersLogRollsWhenTailInProgressEdits()
|
||||||
|
throws Exception {
|
||||||
|
// Time in seconds to wait for standby to catch up to edits from active
|
||||||
|
final int standbyCatchupWaitTime = 2;
|
||||||
|
// Time in seconds to wait before checking if edit logs are rolled while
|
||||||
|
// expecting no edit log roll
|
||||||
|
final int noLogRollWaitTime = 2;
|
||||||
|
// Time in seconds to wait before checking if edit logs are rolled while
|
||||||
|
// expecting edit log roll
|
||||||
|
final int logRollWaitTime = 3;
|
||||||
|
|
||||||
|
Configuration conf = getConf();
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
|
||||||
|
standbyCatchupWaitTime + noLogRollWaitTime + 1);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
|
||||||
|
|
||||||
|
MiniDFSCluster cluster = createMiniDFSCluster(conf, 2);
|
||||||
|
if (cluster == null) {
|
||||||
|
fail("failed to start mini cluster.");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
int activeIndex = new Random().nextBoolean() ? 1 : 0;
|
||||||
|
int standbyIndex = (activeIndex == 0) ? 1 : 0;
|
||||||
|
cluster.transitionToActive(activeIndex);
|
||||||
|
NameNode active = cluster.getNameNode(activeIndex);
|
||||||
|
NameNode standby = cluster.getNameNode(standbyIndex);
|
||||||
|
|
||||||
|
long origTxId = active.getNamesystem().getFSImage().getEditLog()
|
||||||
|
.getCurSegmentTxId();
|
||||||
|
for (int i = 0; i < DIRS_TO_MAKE / 2; i++) {
|
||||||
|
NameNodeAdapter.mkdirs(active, getDirPath(i),
|
||||||
|
new PermissionStatus("test", "test",
|
||||||
|
new FsPermission((short)00755)), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
long activeTxId = active.getNamesystem().getFSImage().getEditLog()
|
||||||
|
.getLastWrittenTxId();
|
||||||
|
waitForStandbyToCatchUpWithInProgressEdits(standby, activeTxId,
|
||||||
|
standbyCatchupWaitTime);
|
||||||
|
|
||||||
|
for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) {
|
||||||
|
NameNodeAdapter.mkdirs(active, getDirPath(i),
|
||||||
|
new PermissionStatus("test", "test",
|
||||||
|
new FsPermission((short)00755)), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean exceptionThrown = false;
|
||||||
|
try {
|
||||||
|
checkForLogRoll(active, origTxId, noLogRollWaitTime);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
exceptionThrown = true;
|
||||||
|
}
|
||||||
|
assertTrue(exceptionThrown);
|
||||||
|
|
||||||
|
checkForLogRoll(active, origTxId, logRollWaitTime);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void waitForStandbyToCatchUpWithInProgressEdits(
|
||||||
|
final NameNode standby, final long activeTxId,
|
||||||
|
int maxWaitSec) throws Exception {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
long standbyTxId = standby.getNamesystem().getFSImage()
|
||||||
|
.getLastAppliedTxId();
|
||||||
|
return (standbyTxId >= activeTxId);
|
||||||
|
}
|
||||||
|
}, 100, maxWaitSec * 1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void checkForLogRoll(final NameNode active,
|
||||||
|
final long origTxId, int maxWaitSec) throws Exception {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
long curSegmentTxId = active.getNamesystem().getFSImage().getEditLog()
|
||||||
|
.getCurSegmentTxId();
|
||||||
|
return (origTxId != curSegmentTxId);
|
||||||
|
}
|
||||||
|
}, 100, maxWaitSec * 1000);
|
||||||
|
}
|
||||||
|
|
||||||
private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
|
private static MiniDFSCluster createMiniDFSCluster(Configuration conf,
|
||||||
int nnCount) throws IOException {
|
int nnCount) throws IOException {
|
||||||
int basePort = 10060 + new Random().nextInt(100) * 2;
|
int basePort = 10060 + new Random().nextInt(1000) * 2;
|
||||||
|
|
||||||
// By passing in basePort, name node will have IPC port set,
|
// By passing in basePort, name node will have IPC port set,
|
||||||
// which is needed for enabling roll log.
|
// which is needed for enabling roll log.
|
||||||
|
|
|
@ -255,10 +255,10 @@ public class TestFailureToReadEdits {
|
||||||
|
|
||||||
// Once the standby catches up, it should notice that it needs to
|
// Once the standby catches up, it should notice that it needs to
|
||||||
// do a checkpoint and save one to its local directories.
|
// do a checkpoint and save one to its local directories.
|
||||||
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 3));
|
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 5));
|
||||||
|
|
||||||
// It should also upload it back to the active.
|
// It should also upload it back to the active.
|
||||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3));
|
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 5));
|
||||||
|
|
||||||
causeFailureOnEditLogRead();
|
causeFailureOnEditLogRead();
|
||||||
|
|
||||||
|
@ -273,15 +273,15 @@ public class TestFailureToReadEdits {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5 because we should get OP_START_LOG_SEGMENT and one successful OP_MKDIR
|
// 5 because we should get OP_START_LOG_SEGMENT and one successful OP_MKDIR
|
||||||
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 3, 5));
|
HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(0, 5, 7));
|
||||||
|
|
||||||
// It should also upload it back to the active.
|
// It should also upload it back to the active.
|
||||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3, 5));
|
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 5, 7));
|
||||||
|
|
||||||
// Restart the active NN
|
// Restart the active NN
|
||||||
cluster.restartNameNode(0);
|
cluster.restartNameNode(0);
|
||||||
|
|
||||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3, 5));
|
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 5, 7));
|
||||||
|
|
||||||
FileSystem fs0 = null;
|
FileSystem fs0 = null;
|
||||||
try {
|
try {
|
||||||
|
@ -310,7 +310,7 @@ public class TestFailureToReadEdits {
|
||||||
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
|
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
|
||||||
|
|
||||||
// It should also upload it back to the active.
|
// It should also upload it back to the active.
|
||||||
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 3));
|
HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(0, 5));
|
||||||
|
|
||||||
causeFailureOnEditLogRead();
|
causeFailureOnEditLogRead();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue