HDFS-13544. Improve logging for JournalNode in federated cluster.

(cherry picked from commit 6beb25ab7e)
This commit is contained in:
Hanisha Koneru 2018-05-14 10:12:08 -07:00
parent 6e731eb20d
commit 4f55941390
1 changed files with 64 additions and 51 deletions

View File

@ -208,11 +208,12 @@ public class Journal implements Closeable {
while (!files.isEmpty()) { while (!files.isEmpty()) {
EditLogFile latestLog = files.remove(files.size() - 1); EditLogFile latestLog = files.remove(files.size() - 1);
latestLog.scanLog(Long.MAX_VALUE, false); latestLog.scanLog(Long.MAX_VALUE, false);
LOG.info("Latest log is " + latestLog); LOG.info("Latest log is " + latestLog + " ; journal id: " + journalId);
if (latestLog.getLastTxId() == HdfsServerConstants.INVALID_TXID) { if (latestLog.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
// the log contains no transactions // the log contains no transactions
LOG.warn("Latest log " + latestLog + " has no transactions. " + LOG.warn("Latest log " + latestLog + " has no transactions. " +
"moving it aside and looking for previous log"); "moving it aside and looking for previous log"
+ " ; journal id: " + journalId);
latestLog.moveAsideEmptyFile(); latestLog.moveAsideEmptyFile();
} else { } else {
return latestLog; return latestLog;
@ -230,7 +231,7 @@ public class Journal implements Closeable {
Preconditions.checkState(nsInfo.getNamespaceID() != 0, Preconditions.checkState(nsInfo.getNamespaceID() != 0,
"can't format with uninitialized namespace info: %s", "can't format with uninitialized namespace info: %s",
nsInfo); nsInfo);
LOG.info("Formatting " + this + " with namespace info: " + LOG.info("Formatting journal id : " + journalId + " with namespace info: " +
nsInfo); nsInfo);
storage.format(nsInfo); storage.format(nsInfo);
refreshCachedData(); refreshCachedData();
@ -323,7 +324,7 @@ public class Journal implements Closeable {
// any other that we've promised. // any other that we've promised.
if (epoch <= getLastPromisedEpoch()) { if (epoch <= getLastPromisedEpoch()) {
throw new IOException("Proposed epoch " + epoch + " <= last promise " + throw new IOException("Proposed epoch " + epoch + " <= last promise " +
getLastPromisedEpoch()); getLastPromisedEpoch() + " ; journal id: " + journalId);
} }
updateLastPromisedEpoch(epoch); updateLastPromisedEpoch(epoch);
@ -343,7 +344,8 @@ public class Journal implements Closeable {
private void updateLastPromisedEpoch(long newEpoch) throws IOException { private void updateLastPromisedEpoch(long newEpoch) throws IOException {
LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() + LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() +
" to " + newEpoch + " for client " + Server.getRemoteIp()); " to " + newEpoch + " for client " + Server.getRemoteIp() +
" ; journal id: " + journalId);
lastPromisedEpoch.set(newEpoch); lastPromisedEpoch.set(newEpoch);
// Since we have a new writer, reset the IPC serial - it will start // Since we have a new writer, reset the IPC serial - it will start
@ -378,7 +380,7 @@ public class Journal implements Closeable {
} }
checkSync(curSegment != null, checkSync(curSegment != null,
"Can't write, no segment open"); "Can't write, no segment open" + " ; journal id: " + journalId);
if (curSegmentTxId != segmentTxId) { if (curSegmentTxId != segmentTxId) {
// Sanity check: it is possible that the writer will fail IPCs // Sanity check: it is possible that the writer will fail IPCs
@ -389,17 +391,20 @@ public class Journal implements Closeable {
// and throw an exception. // and throw an exception.
JournalOutOfSyncException e = new JournalOutOfSyncException( JournalOutOfSyncException e = new JournalOutOfSyncException(
"Writer out of sync: it thinks it is writing segment " + segmentTxId "Writer out of sync: it thinks it is writing segment " + segmentTxId
+ " but current segment is " + curSegmentTxId); + " but current segment is " + curSegmentTxId
+ " ; journal id: " + journalId);
abortCurSegment(); abortCurSegment();
throw e; throw e;
} }
checkSync(nextTxId == firstTxnId, checkSync(nextTxId == firstTxnId,
"Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId); "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId
+ " ; journal id: " + journalId);
long lastTxnId = firstTxnId + numTxns - 1; long lastTxnId = firstTxnId + numTxns - 1;
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId); LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId +
" ; journal id: " + journalId);
} }
// If the edit has already been marked as committed, we know // If the edit has already been marked as committed, we know
@ -423,7 +428,7 @@ public class Journal implements Closeable {
if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) { if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) {
LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId + LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId +
" took " + milliSeconds + "ms"); " took " + milliSeconds + "ms" + " ; journal id: " + journalId);
} }
if (isLagging) { if (isLagging) {
@ -455,7 +460,7 @@ public class Journal implements Closeable {
if (reqInfo.getEpoch() < lastPromisedEpoch.get()) { if (reqInfo.getEpoch() < lastPromisedEpoch.get()) {
throw new IOException("IPC's epoch " + reqInfo.getEpoch() + throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
" is less than the last promised epoch " + " is less than the last promised epoch " +
lastPromisedEpoch.get()); lastPromisedEpoch.get() + " ; journal id: " + journalId);
} else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) { } else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) {
// A newer client has arrived. Fence any previous writers by updating // A newer client has arrived. Fence any previous writers by updating
// the promise. // the promise.
@ -465,16 +470,16 @@ public class Journal implements Closeable {
// Ensure that the IPCs are arriving in-order as expected. // Ensure that the IPCs are arriving in-order as expected.
checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial, checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial,
"IPC serial %s from client %s was not higher than prior highest " + "IPC serial %s from client %s was not higher than prior highest " +
"IPC serial %s", reqInfo.getIpcSerialNumber(), "IPC serial %s ; journal id: %s", reqInfo.getIpcSerialNumber(),
Server.getRemoteIp(), Server.getRemoteIp(), currentEpochIpcSerial, journalId);
currentEpochIpcSerial);
currentEpochIpcSerial = reqInfo.getIpcSerialNumber(); currentEpochIpcSerial = reqInfo.getIpcSerialNumber();
if (reqInfo.hasCommittedTxId()) { if (reqInfo.hasCommittedTxId()) {
Preconditions.checkArgument( Preconditions.checkArgument(
reqInfo.getCommittedTxId() >= committedTxnId.get(), reqInfo.getCommittedTxId() >= committedTxnId.get(),
"Client trying to move committed txid backward from " + "Client trying to move committed txid backward from " +
committedTxnId.get() + " to " + reqInfo.getCommittedTxId()); committedTxnId.get() + " to " + reqInfo.getCommittedTxId() +
" ; journal id: " + journalId);
committedTxnId.set(reqInfo.getCommittedTxId()); committedTxnId.set(reqInfo.getCommittedTxId());
} }
@ -486,7 +491,7 @@ public class Journal implements Closeable {
if (reqInfo.getEpoch() != lastWriterEpoch.get()) { if (reqInfo.getEpoch() != lastWriterEpoch.get()) {
throw new IOException("IPC's epoch " + reqInfo.getEpoch() + throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
" is not the current writer epoch " + " is not the current writer epoch " +
lastWriterEpoch.get()); lastWriterEpoch.get() + " ; journal id: " + journalId);
} }
} }
@ -497,7 +502,8 @@ public class Journal implements Closeable {
private void checkFormatted() throws JournalNotFormattedException { private void checkFormatted() throws JournalNotFormattedException {
if (!isFormatted()) { if (!isFormatted()) {
throw new JournalNotFormattedException("Journal " + throw new JournalNotFormattedException("Journal " +
storage.getSingularStorageDir() + " not formatted"); storage.getSingularStorageDir() + " not formatted" +
" ; journal id: " + journalId);
} }
} }
@ -542,7 +548,8 @@ public class Journal implements Closeable {
if (curSegment != null) { if (curSegment != null) {
LOG.warn("Client is requesting a new log segment " + txid + LOG.warn("Client is requesting a new log segment " + txid +
" though we are already writing " + curSegment + ". " + " though we are already writing " + curSegment + ". " +
"Aborting the current segment in order to begin the new one."); "Aborting the current segment in order to begin the new one." +
" ; journal id: " + journalId);
// The writer may have lost a connection to us and is now // The writer may have lost a connection to us and is now
// re-connecting after the connection came back. // re-connecting after the connection came back.
// We should abort our own old segment. // We should abort our own old segment.
@ -556,7 +563,7 @@ public class Journal implements Closeable {
if (existing != null) { if (existing != null) {
if (!existing.isInProgress()) { if (!existing.isInProgress()) {
throw new IllegalStateException("Already have a finalized segment " + throw new IllegalStateException("Already have a finalized segment " +
existing + " beginning at " + txid); existing + " beginning at " + txid + " ; journal id: " + journalId);
} }
// If it's in-progress, it should only contain one transaction, // If it's in-progress, it should only contain one transaction,
@ -565,7 +572,8 @@ public class Journal implements Closeable {
existing.scanLog(Long.MAX_VALUE, false); existing.scanLog(Long.MAX_VALUE, false);
if (existing.getLastTxId() != existing.getFirstTxId()) { if (existing.getLastTxId() != existing.getFirstTxId()) {
throw new IllegalStateException("The log file " + throw new IllegalStateException("The log file " +
existing + " seems to contain valid transactions"); existing + " seems to contain valid transactions" +
" ; journal id: " + journalId);
} }
} }
@ -573,7 +581,7 @@ public class Journal implements Closeable {
if (curLastWriterEpoch != reqInfo.getEpoch()) { if (curLastWriterEpoch != reqInfo.getEpoch()) {
LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch + LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch +
" to " + reqInfo.getEpoch() + " for client " + " to " + reqInfo.getEpoch() + " for client " +
Server.getRemoteIp()); Server.getRemoteIp() + " ; journal id: " + journalId);
lastWriterEpoch.set(reqInfo.getEpoch()); lastWriterEpoch.set(reqInfo.getEpoch());
} }
@ -608,8 +616,8 @@ public class Journal implements Closeable {
checkSync(nextTxId == endTxId + 1, checkSync(nextTxId == endTxId + 1,
"Trying to finalize in-progress log segment %s to end at " + "Trying to finalize in-progress log segment %s to end at " +
"txid %s but only written up to txid %s", "txid %s but only written up to txid %s ; journal id: %s",
startTxId, endTxId, nextTxId - 1); startTxId, endTxId, nextTxId - 1, journalId);
// No need to validate the edit log if the client is finalizing // No need to validate the edit log if the client is finalizing
// the log segment that it was just writing to. // the log segment that it was just writing to.
needsValidation = false; needsValidation = false;
@ -618,25 +626,27 @@ public class Journal implements Closeable {
FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId); FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId);
if (elf == null) { if (elf == null) {
throw new JournalOutOfSyncException("No log file to finalize at " + throw new JournalOutOfSyncException("No log file to finalize at " +
"transaction ID " + startTxId); "transaction ID " + startTxId + " ; journal id: " + journalId);
} }
if (elf.isInProgress()) { if (elf.isInProgress()) {
if (needsValidation) { if (needsValidation) {
LOG.info("Validating log segment " + elf.getFile() + " about to be " + LOG.info("Validating log segment " + elf.getFile() + " about to be " +
"finalized"); "finalized ; journal id: " + journalId);
elf.scanLog(Long.MAX_VALUE, false); elf.scanLog(Long.MAX_VALUE, false);
checkSync(elf.getLastTxId() == endTxId, checkSync(elf.getLastTxId() == endTxId,
"Trying to finalize in-progress log segment %s to end at " + "Trying to finalize in-progress log segment %s to end at " +
"txid %s but log %s on disk only contains up to txid %s", "txid %s but log %s on disk only contains up to txid %s " +
startTxId, endTxId, elf.getFile(), elf.getLastTxId()); "; journal id: %s",
startTxId, endTxId, elf.getFile(), elf.getLastTxId(), journalId);
} }
fjm.finalizeLogSegment(startTxId, endTxId); fjm.finalizeLogSegment(startTxId, endTxId);
} else { } else {
Preconditions.checkArgument(endTxId == elf.getLastTxId(), Preconditions.checkArgument(endTxId == elf.getLastTxId(),
"Trying to re-finalize already finalized log " + "Trying to re-finalize already finalized log " +
elf + " with different endTxId " + endTxId); elf + " with different endTxId " + endTxId +
" ; journal id: " + journalId);
} }
// Once logs are finalized, a different length will never be decided. // Once logs are finalized, a different length will never be decided.
@ -667,7 +677,8 @@ public class Journal implements Closeable {
File paxosFile = storage.getPaxosFile(segmentTxId); File paxosFile = storage.getPaxosFile(segmentTxId);
if (paxosFile.exists()) { if (paxosFile.exists()) {
if (!paxosFile.delete()) { if (!paxosFile.delete()) {
throw new IOException("Unable to delete paxos file " + paxosFile); throw new IOException("Unable to delete paxos file " + paxosFile +
" ; journal id: " + journalId);
} }
} }
} }
@ -717,7 +728,7 @@ public class Journal implements Closeable {
} }
if (elf.getLastTxId() == HdfsServerConstants.INVALID_TXID) { if (elf.getLastTxId() == HdfsServerConstants.INVALID_TXID) {
LOG.info("Edit log file " + elf + " appears to be empty. " + LOG.info("Edit log file " + elf + " appears to be empty. " +
"Moving it aside..."); "Moving it aside..." + " ; journal id: " + journalId);
elf.moveAsideEmptyFile(); elf.moveAsideEmptyFile();
return null; return null;
} }
@ -727,7 +738,7 @@ public class Journal implements Closeable {
.setIsInProgress(elf.isInProgress()) .setIsInProgress(elf.isInProgress())
.build(); .build();
LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " + LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " +
TextFormat.shortDebugString(ret)); TextFormat.shortDebugString(ret) + " ; journal id: " + journalId);
return ret; return ret;
} }
@ -771,7 +782,7 @@ public class Journal implements Closeable {
PrepareRecoveryResponseProto resp = builder.build(); PrepareRecoveryResponseProto resp = builder.build();
LOG.info("Prepared recovery for segment " + segmentTxId + ": " + LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
TextFormat.shortDebugString(resp)); TextFormat.shortDebugString(resp) + " ; journal id: " + journalId);
return resp; return resp;
} }
@ -792,8 +803,8 @@ public class Journal implements Closeable {
// at least one transaction. // at least one transaction.
Preconditions.checkArgument(segment.getEndTxId() > 0 && Preconditions.checkArgument(segment.getEndTxId() > 0 &&
segment.getEndTxId() >= segmentTxId, segment.getEndTxId() >= segmentTxId,
"bad recovery state for segment %s: %s", "bad recovery state for segment %s: %s ; journal id: %s",
segmentTxId, TextFormat.shortDebugString(segment)); segmentTxId, TextFormat.shortDebugString(segment), journalId);
PersistedRecoveryPaxosData oldData = getPersistedPaxosData(segmentTxId); PersistedRecoveryPaxosData oldData = getPersistedPaxosData(segmentTxId);
PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData.newBuilder() PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData.newBuilder()
@ -806,8 +817,9 @@ public class Journal implements Closeable {
// checkRequest() call above should filter non-increasing epoch numbers. // checkRequest() call above should filter non-increasing epoch numbers.
if (oldData != null) { if (oldData != null) {
alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(), alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
"Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n", "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: " +
oldData, newData); "%s\nJournalId: %s\n",
oldData, newData, journalId);
} }
File syncedFile = null; File syncedFile = null;
@ -817,7 +829,7 @@ public class Journal implements Closeable {
currentSegment.getEndTxId() != segment.getEndTxId()) { currentSegment.getEndTxId() != segment.getEndTxId()) {
if (currentSegment == null) { if (currentSegment == null) {
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) + LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
": no current segment in place"); ": no current segment in place ; journal id: " + journalId);
// Update the highest txid for lag metrics // Update the highest txid for lag metrics
updateHighestWrittenTxId(Math.max(segment.getEndTxId(), updateHighestWrittenTxId(Math.max(segment.getEndTxId(),
@ -825,7 +837,7 @@ public class Journal implements Closeable {
} else { } else {
LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) + LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
": old segment " + TextFormat.shortDebugString(currentSegment) + ": old segment " + TextFormat.shortDebugString(currentSegment) +
" is not the right length"); " is not the right length ; journal id: " + journalId);
// Paranoid sanity check: if the new log is shorter than the log we // Paranoid sanity check: if the new log is shorter than the log we
// currently have, we should not end up discarding any transactions // currently have, we should not end up discarding any transactions
@ -838,14 +850,15 @@ public class Journal implements Closeable {
" with new segment " + " with new segment " +
TextFormat.shortDebugString(segment) + TextFormat.shortDebugString(segment) +
": would discard already-committed txn " + ": would discard already-committed txn " +
committedTxnId.get()); committedTxnId.get() +
" ; journal id: " + journalId);
} }
// Another paranoid check: we should not be asked to synchronize a log // Another paranoid check: we should not be asked to synchronize a log
// on top of a finalized segment. // on top of a finalized segment.
alwaysAssert(currentSegment.getIsInProgress(), alwaysAssert(currentSegment.getIsInProgress(),
"Should never be asked to synchronize a different log on top of an " + "Should never be asked to synchronize a different log on top of " +
"already-finalized segment"); "an already-finalized segment ; journal id: " + journalId);
// If we're shortening the log, update our highest txid // If we're shortening the log, update our highest txid
// used for lag metrics. // used for lag metrics.
@ -858,7 +871,7 @@ public class Journal implements Closeable {
} else { } else {
LOG.info("Skipping download of log " + LOG.info("Skipping download of log " +
TextFormat.shortDebugString(segment) + TextFormat.shortDebugString(segment) +
": already have up-to-date logs"); ": already have up-to-date logs ; journal id: " + journalId);
} }
// This is one of the few places in the protocol where we have a single // This is one of the few places in the protocol where we have a single
@ -890,12 +903,12 @@ public class Journal implements Closeable {
} }
LOG.info("Accepted recovery for segment " + segmentTxId + ": " + LOG.info("Accepted recovery for segment " + segmentTxId + ": " +
TextFormat.shortDebugString(newData)); TextFormat.shortDebugString(newData) + " ; journal id: " + journalId);
} }
private LongRange txnRange(SegmentStateProto seg) { private LongRange txnRange(SegmentStateProto seg) {
Preconditions.checkArgument(seg.hasEndTxId(), Preconditions.checkArgument(seg.hasEndTxId(),
"invalid segment: %s", seg); "invalid segment: %s ; journal id: %s", seg, journalId);
return new LongRange(seg.getStartTxId(), seg.getEndTxId()); return new LongRange(seg.getStartTxId(), seg.getEndTxId());
} }
@ -970,7 +983,7 @@ public class Journal implements Closeable {
if (tmp.exists()) { if (tmp.exists()) {
File dst = storage.getInProgressEditLog(segmentId); File dst = storage.getInProgressEditLog(segmentId);
LOG.info("Rolling forward previously half-completed synchronization: " + LOG.info("Rolling forward previously half-completed synchronization: " +
tmp + " -> " + dst); tmp + " -> " + dst + " ; journal id: " + journalId);
FileUtil.replaceFile(tmp, dst); FileUtil.replaceFile(tmp, dst);
} }
} }
@ -991,8 +1004,8 @@ public class Journal implements Closeable {
PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in); PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in);
Preconditions.checkState(ret != null && Preconditions.checkState(ret != null &&
ret.getSegmentState().getStartTxId() == segmentTxId, ret.getSegmentState().getStartTxId() == segmentTxId,
"Bad persisted data for segment %s: %s", "Bad persisted data for segment %s: %s ; journal id: %s",
segmentTxId, ret); segmentTxId, ret, journalId);
return ret; return ret;
} finally { } finally {
IOUtils.closeStream(in); IOUtils.closeStream(in);
@ -1041,7 +1054,7 @@ public class Journal implements Closeable {
storage.cTime = sInfo.cTime; storage.cTime = sInfo.cTime;
int oldLV = storage.getLayoutVersion(); int oldLV = storage.getLayoutVersion();
storage.layoutVersion = sInfo.layoutVersion; storage.layoutVersion = sInfo.layoutVersion;
LOG.info("Starting upgrade of edits directory: " LOG.info("Starting upgrade of edits directory: " + storage.getRoot()
+ ".\n old LV = " + oldLV + ".\n old LV = " + oldLV
+ "; old CTime = " + oldCTime + "; old CTime = " + oldCTime
+ ".\n new LV = " + storage.getLayoutVersion() + ".\n new LV = " + storage.getLayoutVersion()
@ -1112,7 +1125,7 @@ public class Journal implements Closeable {
if (endTxId <= committedTxnId.get()) { if (endTxId <= committedTxnId.get()) {
if (!finalFile.getParentFile().exists()) { if (!finalFile.getParentFile().exists()) {
LOG.error(finalFile.getParentFile() + " doesn't exist. Aborting tmp " + LOG.error(finalFile.getParentFile() + " doesn't exist. Aborting tmp " +
"segment move to current directory"); "segment move to current directory ; journal id: " + journalId);
return false; return false;
} }
Files.move(tmpFile.toPath(), finalFile.toPath(), Files.move(tmpFile.toPath(), finalFile.toPath(),
@ -1122,13 +1135,13 @@ public class Journal implements Closeable {
} else { } else {
success = false; success = false;
LOG.warn("Unable to move edits file from " + tmpFile + " to " + LOG.warn("Unable to move edits file from " + tmpFile + " to " +
finalFile); finalFile + " ; journal id: " + journalId);
} }
} else { } else {
success = false; success = false;
LOG.error("The endTxId of the temporary file is not less than the " + LOG.error("The endTxId of the temporary file is not less than the " +
"last committed transaction id. Aborting move to final file" + "last committed transaction id. Aborting move to final file" +
finalFile); finalFile + " ; journal id: " + journalId);
} }
return success; return success;