HDFS-12978. Fine-grained locking while consuming journal stream. Contributed by Konstantin Shvachko.
This commit is contained in:
parent
ad11e40557
commit
442dd87dcd
|
@ -138,7 +138,7 @@ public class FSEditLogLoader {
|
|||
|
||||
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
|
||||
throws IOException {
|
||||
return loadFSEdits(edits, expectedStartingTxId, null, null);
|
||||
return loadFSEdits(edits, expectedStartingTxId, Long.MAX_VALUE, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -147,6 +147,7 @@ public class FSEditLogLoader {
|
|||
* along.
|
||||
*/
|
||||
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
|
||||
long maxTxnsToRead,
|
||||
StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {
|
||||
StartupProgress prog = NameNode.getStartupProgress();
|
||||
Step step = createStartupProgressStep(edits);
|
||||
|
@ -154,9 +155,10 @@ public class FSEditLogLoader {
|
|||
fsNamesys.writeLock();
|
||||
try {
|
||||
long startTime = monotonicNow();
|
||||
FSImage.LOG.info("Start loading edits file " + edits.getName());
|
||||
FSImage.LOG.info("Start loading edits file " + edits.getName()
|
||||
+ " maxTxnsToRead = " + maxTxnsToRead);
|
||||
long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
|
||||
startOpt, recovery);
|
||||
maxTxnsToRead, startOpt, recovery);
|
||||
FSImage.LOG.info("Edits file " + edits.getName()
|
||||
+ " of size " + edits.length() + " edits # " + numEdits
|
||||
+ " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
|
||||
|
@ -171,8 +173,13 @@ public class FSEditLogLoader {
|
|||
long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
|
||||
long expectedStartingTxId, StartupOption startOpt,
|
||||
MetaRecoveryContext recovery) throws IOException {
|
||||
FSDirectory fsDir = fsNamesys.dir;
|
||||
return loadEditRecords(in, closeOnExit, expectedStartingTxId,
|
||||
Long.MAX_VALUE, startOpt, recovery);
|
||||
}
|
||||
|
||||
long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
|
||||
long expectedStartingTxId, long maxTxnsToRead, StartupOption startOpt,
|
||||
MetaRecoveryContext recovery) throws IOException {
|
||||
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
|
||||
new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
|
||||
|
||||
|
@ -181,6 +188,7 @@ public class FSEditLogLoader {
|
|||
}
|
||||
|
||||
fsNamesys.writeLock();
|
||||
FSDirectory fsDir = fsNamesys.dir;
|
||||
fsDir.writeLock();
|
||||
|
||||
long recentOpcodeOffsets[] = new long[4];
|
||||
|
@ -285,6 +293,9 @@ public class FSEditLogLoader {
|
|||
}
|
||||
numEdits++;
|
||||
totalEdits++;
|
||||
if(numEdits >= maxTxnsToRead) {
|
||||
break;
|
||||
}
|
||||
} catch (RollingUpgradeOp.RollbackException e) {
|
||||
LOG.info("Stopped at OP_START_ROLLING_UPGRADE for rollback.");
|
||||
break;
|
||||
|
@ -308,7 +319,11 @@ public class FSEditLogLoader {
|
|||
|
||||
if (FSImage.LOG.isDebugEnabled()) {
|
||||
dumpOpCounts(opCounts);
|
||||
FSImage.LOG.debug("maxTxnsToRead = " + maxTxnsToRead
|
||||
+ " actual edits read = " + numEdits);
|
||||
}
|
||||
assert numEdits <= maxTxnsToRead || numEdits == 1 :
|
||||
"should read at least one txn, but not more than the configured max";
|
||||
}
|
||||
return numEdits;
|
||||
}
|
||||
|
|
|
@ -742,7 +742,8 @@ public class FSImage implements Closeable {
|
|||
prog.endPhase(Phase.LOADING_FSIMAGE);
|
||||
|
||||
if (!rollingRollback) {
|
||||
long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
|
||||
long txnsAdvanced = loadEdits(editStreams, target, Long.MAX_VALUE,
|
||||
startOpt, recovery);
|
||||
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
|
||||
txnsAdvanced);
|
||||
} else {
|
||||
|
@ -866,11 +867,12 @@ public class FSImage implements Closeable {
|
|||
*/
|
||||
public long loadEdits(Iterable<EditLogInputStream> editStreams,
|
||||
FSNamesystem target) throws IOException {
|
||||
return loadEdits(editStreams, target, null, null);
|
||||
return loadEdits(editStreams, target, Long.MAX_VALUE, null, null);
|
||||
}
|
||||
|
||||
private long loadEdits(Iterable<EditLogInputStream> editStreams,
|
||||
FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery)
|
||||
public long loadEdits(Iterable<EditLogInputStream> editStreams,
|
||||
FSNamesystem target, long maxTxnsToRead,
|
||||
StartupOption startOpt, MetaRecoveryContext recovery)
|
||||
throws IOException {
|
||||
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
|
||||
StartupProgress prog = NameNode.getStartupProgress();
|
||||
|
@ -885,14 +887,16 @@ public class FSImage implements Closeable {
|
|||
LOG.info("Reading " + editIn + " expecting start txid #" +
|
||||
(lastAppliedTxId + 1));
|
||||
try {
|
||||
loader.loadFSEdits(editIn, lastAppliedTxId + 1, startOpt, recovery);
|
||||
loader.loadFSEdits(editIn, lastAppliedTxId + 1, maxTxnsToRead,
|
||||
startOpt, recovery);
|
||||
} finally {
|
||||
// Update lastAppliedTxId even in case of error, since some ops may
|
||||
// have been successfully applied before the error.
|
||||
lastAppliedTxId = loader.getLastAppliedTxId();
|
||||
}
|
||||
// If we are in recovery mode, we may have skipped over some txids.
|
||||
if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID) {
|
||||
if (editIn.getLastTxId() != HdfsServerConstants.INVALID_TXID
|
||||
&& recovery != null) {
|
||||
lastAppliedTxId = editIn.getLastTxId();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,7 +73,19 @@ import com.google.common.base.Preconditions;
|
|||
@InterfaceStability.Evolving
|
||||
public class EditLogTailer {
|
||||
public static final Log LOG = LogFactory.getLog(EditLogTailer.class);
|
||||
|
||||
|
||||
/**
|
||||
* StandbyNode will hold namesystem lock to apply at most this many journal
|
||||
* transactions.
|
||||
* It will then release the lock and re-acquire it to load more transactions.
|
||||
* By default the write lock is held for the entire journal segment.
|
||||
* Fine-grained locking allows read requests to get through.
|
||||
*/
|
||||
public static final String DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY =
|
||||
"dfs.ha.tail-edits.max-txns-per-lock";
|
||||
public static final long DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_DEFAULT =
|
||||
Long.MAX_VALUE;
|
||||
|
||||
private final EditLogTailerThread tailerThread;
|
||||
|
||||
private final Configuration conf;
|
||||
|
@ -138,6 +150,12 @@ public class EditLogTailer {
|
|||
*/
|
||||
private final boolean inProgressOk;
|
||||
|
||||
/**
|
||||
* Release the namesystem lock after loading this many transactions.
|
||||
* Then re-acquire the lock to load more edits.
|
||||
*/
|
||||
private final long maxTxnsPerLock;
|
||||
|
||||
public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
|
||||
this.tailerThread = new EditLogTailerThread();
|
||||
this.conf = conf;
|
||||
|
@ -198,6 +216,10 @@ public class EditLogTailer {
|
|||
DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY,
|
||||
DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_DEFAULT);
|
||||
|
||||
this.maxTxnsPerLock = conf.getLong(
|
||||
DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY,
|
||||
DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_DEFAULT);
|
||||
|
||||
nnCount = nns.size();
|
||||
// setup the iterator to endlessly loop the nns
|
||||
this.nnLookup = Iterators.cycle(nns);
|
||||
|
@ -290,7 +312,8 @@ public class EditLogTailer {
|
|||
// disk are ignored.
|
||||
long editsLoaded = 0;
|
||||
try {
|
||||
editsLoaded = image.loadEdits(streams, namesystem);
|
||||
editsLoaded = image.loadEdits(
|
||||
streams, namesystem, maxTxnsPerLock, null, null);
|
||||
} catch (EditLogInputException elie) {
|
||||
editsLoaded = elie.getNumEditsLoaded();
|
||||
throw elie;
|
||||
|
|
|
@ -98,8 +98,9 @@ public class TestEditLogTailer {
|
|||
public void testTailer() throws IOException, InterruptedException,
|
||||
ServiceFailedException {
|
||||
Configuration conf = getConf();
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 0);
|
||||
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100);
|
||||
conf.setLong(EditLogTailer.DFS_HA_TAILEDITS_MAX_TXNS_PER_LOCK_KEY, 3);
|
||||
|
||||
HAUtil.setAllowStandbyReads(conf, true);
|
||||
|
||||
|
@ -121,7 +122,10 @@ public class TestEditLogTailer {
|
|||
}
|
||||
|
||||
HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
|
||||
|
||||
assertEquals("Inconsistent number of applied txns on Standby",
|
||||
nn1.getNamesystem().getEditLog().getLastWrittenTxId(),
|
||||
nn2.getNamesystem().getFSImage().getLastAppliedTxId() + 1);
|
||||
|
||||
for (int i = 0; i < DIRS_TO_MAKE / 2; i++) {
|
||||
assertTrue(NameNodeAdapter.getFileInfo(nn2,
|
||||
getDirPath(i), false, false, false).isDirectory());
|
||||
|
@ -134,7 +138,10 @@ public class TestEditLogTailer {
|
|||
}
|
||||
|
||||
HATestUtil.waitForStandbyToCatchUp(nn1, nn2);
|
||||
|
||||
assertEquals("Inconsistent number of applied txns on Standby",
|
||||
nn1.getNamesystem().getEditLog().getLastWrittenTxId(),
|
||||
nn2.getNamesystem().getFSImage().getLastAppliedTxId() + 1);
|
||||
|
||||
for (int i = DIRS_TO_MAKE / 2; i < DIRS_TO_MAKE; i++) {
|
||||
assertTrue(NameNodeAdapter.getFileInfo(nn2,
|
||||
getDirPath(i), false, false, false).isDirectory());
|
||||
|
|
Loading…
Reference in New Issue