HDFS-12978. Fine-grained locking while consuming journal stream. Contributed by Konstantin Shvachko.

This commit is contained in:
Konstantin V Shvachko 2018-05-31 14:56:32 -07:00
parent 950dea86f4
commit ebe5853a45
4 changed files with 64 additions and 15 deletions

View File

@ -138,7 +138,7 @@ public class FSEditLogLoader {
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId) long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
throws IOException { throws IOException {
return loadFSEdits(edits, expectedStartingTxId, null, null); return loadFSEdits(edits, expectedStartingTxId, Long.MAX_VALUE, null, null);
} }
/** /**
@ -147,6 +147,7 @@ public class FSEditLogLoader {
* along. * along.
*/ */
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
long maxTxnsToRead,
StartupOption startOpt, MetaRecoveryContext recovery) throws IOException { StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {
StartupProgress prog = NameNode.getStartupProgress(); StartupProgress prog = NameNode.getStartupProgress();
Step step = createStartupProgressStep(edits); Step step = createStartupProgressStep(edits);
@ -154,9 +155,10 @@ public class FSEditLogLoader {
fsNamesys.writeLock(); fsNamesys.writeLock();
try { try {
long startTime = monotonicNow(); long startTime = monotonicNow();
FSImage.LOG.info("Start loading edits file " + edits.getName()); FSImage.LOG.info("Start loading edits file " + edits.getName()
+ " maxTxnsToRead = " + maxTxnsToRead);
long numEdits = loadEditRecords(edits, false, expectedStartingTxId, long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
startOpt, recovery); maxTxnsToRead, startOpt, recovery);
FSImage.LOG.info("Edits file " + edits.getName() FSImage.LOG.info("Edits file " + edits.getName()
+ " of size " + edits.length() + " edits # " + numEdits + " of size " + edits.length() + " edits # " + numEdits
+ " loaded in " + (monotonicNow()-startTime)/1000 + " seconds"); + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
@ -171,8 +173,13 @@ public class FSEditLogLoader {
long loadEditRecords(EditLogInputStream in, boolean closeOnExit, long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
long expectedStartingTxId, StartupOption startOpt, long expectedStartingTxId, StartupOption startOpt,
MetaRecoveryContext recovery) throws IOException { MetaRecoveryContext recovery) throws IOException {
FSDirectory fsDir = fsNamesys.dir; return loadEditRecords(in, closeOnExit, expectedStartingTxId,
Long.MAX_VALUE, startOpt, recovery);
}
long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
long expectedStartingTxId, long maxTxnsToRead, StartupOption startOpt,
MetaRecoveryContext recovery) throws IOException {
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts = EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class); new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
@ -181,6 +188,7 @@ public class FSEditLogLoader {
} }
fsNamesys.writeLock(); fsNamesys.writeLock();
FSDirectory fsDir = fsNamesys.dir;
fsDir.writeLock(); fsDir.writeLock();
long recentOpcodeOffsets[] = new long[4]; long recentOpcodeOffsets[] = new long[4];
@ -285,6 +293,9 @@ public class FSEditLogLoader {
} }
numEdits++; numEdits++;
totalEdits++; totalEdits++;
if(numEdits >= maxTxnsToRead) {
break;
}
} catch (RollingUpgradeOp.RollbackException e) { } catch (RollingUpgradeOp.RollbackException e) {
LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback."); LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback.");
break; break;
@ -308,7 +319,11 @@ public class FSEditLogLoader {
if (FSImage.LOG.isDebugEnabled()) { if (FSImage.LOG.isDebugEnabled()) {
dumpOpCounts(opCounts); dumpOpCounts(opCounts);
FSImage.LOG.debug("maxTxnsToRead = " + maxTxnsToRead
+ " actual edits read = " + numEdits);
} }
assert numEdits <= maxTxnsToRead || numEdits == 1 :
"should read at least one txn, but not more than the configured max";
} }
return numEdits; return numEdits;
} }

View File

@ -742,7 +742,8 @@ public class FSImage implements Closeable {
prog.endPhase(Phase.LOADING_FSIMAGE); prog.endPhase(Phase.LOADING_FSIMAGE);
if (!rollingRollback) { if (!rollingRollback) {
long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery); long txnsAdvanced = loadEdits(editStreams, target, Long.MAX_VALUE,
startOpt, recovery);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(), needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
txnsAdvanced); txnsAdvanced);
} else { } else {
@ -866,11 +867,12 @@ public class FSImage implements Closeable {
*/ */
public long loadEdits(Iterable<EditLogInputStream> editStreams, public long loadEdits(Iterable<EditLogInputStream> editStreams,
FSNamesystem target) throws IOException { FSNamesystem target) throws IOException {
return loadEdits(editStreams, target, null, null); return loadEdits(editStreams, target, Long.MAX_VALUE, null, null);
} }
private long loadEdits(Iterable<EditLogInputStream> editStreams, public long loadEdits(Iterable<EditLogInputStream> editStreams,
FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery) FSNamesystem target, long maxTxnsToRead,
StartupOption startOpt, MetaRecoveryContext recovery)
throws IOException { throws IOException {
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams)); LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
StartupProgress prog = NameNode.getStartupProgress(); StartupProgress prog = NameNode.getStartupProgress();
@ -885,14 +887,16 @@ public class FSImage implements Closeable {
LOG.info("Reading " + editIn + " expecting start txid #" + LOG.info("Reading " + editIn + " expecting start txid #" +
(lastAppliedTxId + 1)); (lastAppliedTxId + 1));
try { try {
loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery); loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
startOpt, recovery);
} finally { } finally {
// Update lastAppliedTxId even in case of error, since some ops may // Update lastAppliedTxId even in case of error, since some ops may
// have been successfully applied before the error. // have been successfully applied before the error.
lastAppliedTxId = loader.getLastAppliedTxId(); lastAppliedTxId = loader.getLastAppliedTxId();
} }
// If we are in recovery mode, we may have skipped over some txids. // If we are in recovery mode, we may have skipped over some txids.
if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID) { if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID
&& recovery != null) {
lastAppliedTxId = editIn.getLastTxId(); lastAppliedTxId = editIn.getLastTxId();
} }
} }

View File

@ -74,6 +74,18 @@ import com.google.common.base.Preconditions;
public class EditLogTailer { public class EditLogTailer {
public static final Log LOG = LogFactory.getLog(EditLogTailer.class); public static final Log LOG = LogFactory.getLog(EditLogTailer.class);
/**
* StandbyNode will hold namesystem lock to apply at most this many journal
* transactions.
* It will then release the lock and re-acquire it to load more transactions.
* By default the write lock is held for the entire journal segment.
* Fine-grained locking allows read requests to get through.
*/
public static final String DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY =
"dfs.ha.tail-edits.max-txns-per-lock";
public static final long DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_DEFAULT =
Long.MAX_VALUE;
private final EditLogTailerThread tailerThread; private final EditLogTailerThread tailerThread;
private final Configuration conf; private final Configuration conf;
@ -138,6 +150,12 @@ public class EditLogTailer {
*/ */
private final boolean inProgressOk; private final boolean inProgressOk;
/**
* Release the namesystem lock after loading this many transactions.
* Then re-acquire the lock to load more edits.
*/
private final long maxTxnsPerLock;
public EditLogTailer(FSNamesystem namesystem, Configuration conf) { public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
this.tailerThread = new EditLogTailerThread(); this.tailerThread = new EditLogTailerThread();
this.conf = conf; this.conf = conf;
@ -198,6 +216,10 @@ public class EditLogTailer {
DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT); DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
this.maxTxnsPerLock = conf.getLong(
DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY,
DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_DEFAULT);
nnCount = nns.size(); nnCount = nns.size();
// setup the iterator to endlessly loop the nns // setup the iterator to endlessly loop the nns
this.nnLookup = Iterators.cycle(nns); this.nnLookup = Iterators.cycle(nns);
@ -290,7 +312,8 @@ public class EditLogTailer {
// disk are ignored. // disk are ignored.
long editsLoaded = 0; long editsLoaded = 0;
try { try {
editsLoaded = image.loadEdits(streams, namesystem); editsLoaded = image.loadEdits(
streams, namesystem, maxTxnsPerLock, null, null);
} catch (EditLogInputException elie) { } catch (EditLogInputException elie) {
editsLoaded = elie.getNumEditsLoaded(); editsLoaded = elie.getNumEditsLoaded();
throw elie; throw elie;

View File

@ -98,8 +98,9 @@ public class TestEditLogTailer {
public void testTailer() throws IOException, InterruptedException, public void testTailer() throws IOException, InterruptedException,
ServiceFailedException { ServiceFailedException {
Configuration conf = getConf(); Configuration conf = getConf();
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 0);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100); conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100);
conf.setLong(EditLogTailer.DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY, 3);
HAUtil.setAllowStandbyReads(conf, true); HAUtil.setAllowStandbyReads(conf, true);
@ -121,6 +122,9 @@ public class TestEditLogTailer {
} }
HATestUtil.waitForStandbyToCatchUp(nn1, nn2); HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
assertEquals("Inconsistent number of applied txns on Standby",
nn1.getNamesystem().getEditLog().getLastWrittenTxId(),
nn2.getNamesystem().getFSImage().getLastAppliedTxId() + 1);
for (int i = 0; i < DIRS_TO_MAKE / 2; i++) { for (int i = 0; i < DIRS_TO_MAKE / 2; i++) {
assertTrue(NameNodeAdapter.getFileInfo(nn2, assertTrue(NameNodeAdapter.getFileInfo(nn2,
@ -134,6 +138,9 @@ public class TestEditLogTailer {
} }
HATestUtil.waitForStandbyToCatchUp(nn1, nn2); HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
assertEquals("Inconsistent number of applied txns on Standby",
nn1.getNamesystem().getEditLog().getLastWrittenTxId(),
nn2.getNamesystem().getFSImage().getLastAppliedTxId() + 1);
for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) { for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) {
assertTrue(NameNodeAdapter.getFileInfo(nn2, assertTrue(NameNodeAdapter.getFileInfo(nn2,