diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index 408ce76d115..452664a9478 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -208,11 +208,12 @@ public class Journal implements Closeable { while (!files.isEmpty()) { EditLogFile latestLog = files.remove(files.size() - 1); latestLog.scanLog(Long.MAX_VALUE, false); - LOG.info("Latest log is " + latestLog); + LOG.info("Latest log is " + latestLog + " ; journal id: " + journalId); if (latestLog.getLastTxId() == HdfsServerConstants.INVALID_TXID) { // the log contains no transactions LOG.warn("Latest log " + latestLog + " has no transactions. " + - "moving it aside and looking for previous log"); + "moving it aside and looking for previous log" + + " ; journal id: " + journalId); latestLog.moveAsideEmptyFile(); } else { return latestLog; @@ -230,7 +231,7 @@ public class Journal implements Closeable { Preconditions.checkState(nsInfo.getNamespaceID() != 0, "can't format with uninitialized namespace info: %s", nsInfo); - LOG.info("Formatting " + this + " with namespace info: " + + LOG.info("Formatting journal id : " + journalId + " with namespace info: " + nsInfo); storage.format(nsInfo); refreshCachedData(); @@ -323,7 +324,7 @@ public class Journal implements Closeable { // any other that we've promised. if (epoch <= getLastPromisedEpoch()) { throw new IOException("Proposed epoch " + epoch + " <= last promise " + - getLastPromisedEpoch()); + getLastPromisedEpoch() + " ; journal id: " + journalId); } updateLastPromisedEpoch(epoch); @@ -343,7 +344,8 @@ public class Journal implements Closeable { private void updateLastPromisedEpoch(long newEpoch) throws IOException { LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() + - " to " + newEpoch + " for client " + Server.getRemoteIp()); + " to " + newEpoch + " for client " + Server.getRemoteIp() + + " ; journal id: " + journalId); lastPromisedEpoch.set(newEpoch); // Since we have a new writer, reset the IPC serial - it will start @@ -378,7 +380,7 @@ public class Journal implements Closeable { } checkSync(curSegment != null, - "Can't write, no segment open"); + "Can't write, no segment open" + " ; journal id: " + journalId); if (curSegmentTxId != segmentTxId) { // Sanity check: it is possible that the writer will fail IPCs @@ -389,17 +391,20 @@ public class Journal implements Closeable { // and throw an exception. JournalOutOfSyncException e = new JournalOutOfSyncException( "Writer out of sync: it thinks it is writing segment " + segmentTxId - + " but current segment is " + curSegmentTxId); + + " but current segment is " + curSegmentTxId + + " ; journal id: " + journalId); abortCurSegment(); throw e; } checkSync(nextTxId == firstTxnId, - "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId); + "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId + + " ; journal id: " + journalId); long lastTxnId = firstTxnId + numTxns - 1; if (LOG.isTraceEnabled()) { - LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId); + LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId + + " ; journal id: " + journalId); } // If the edit has already been marked as committed, we know @@ -423,7 +428,7 @@ public class Journal implements Closeable { if (milliSeconds > WARN_SYNC_MILLIS_THRESHOLD) { LOG.warn("Sync of transaction range " + firstTxnId + "-" + lastTxnId + - " took " + milliSeconds + "ms"); + " took " + milliSeconds + "ms" + " ; journal id: " + journalId); } if (isLagging) { @@ -455,7 +460,7 @@ public class Journal implements Closeable { if (reqInfo.getEpoch() < lastPromisedEpoch.get()) { throw new IOException("IPC's epoch " + reqInfo.getEpoch() + " is less than the last promised epoch " + - lastPromisedEpoch.get()); + lastPromisedEpoch.get() + " ; journal id: " + journalId); } else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) { // A newer client has arrived. Fence any previous writers by updating // the promise. @@ -465,16 +470,16 @@ public class Journal implements Closeable { // Ensure that the IPCs are arriving in-order as expected. checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial, "IPC serial %s from client %s was not higher than prior highest " + - "IPC serial %s", reqInfo.getIpcSerialNumber(), - Server.getRemoteIp(), - currentEpochIpcSerial); + "IPC serial %s ; journal id: %s", reqInfo.getIpcSerialNumber(), + Server.getRemoteIp(), currentEpochIpcSerial, journalId); currentEpochIpcSerial = reqInfo.getIpcSerialNumber(); if (reqInfo.hasCommittedTxId()) { Preconditions.checkArgument( reqInfo.getCommittedTxId() >= committedTxnId.get(), "Client trying to move committed txid backward from " + - committedTxnId.get() + " to " + reqInfo.getCommittedTxId()); + committedTxnId.get() + " to " + reqInfo.getCommittedTxId() + + " ; journal id: " + journalId); committedTxnId.set(reqInfo.getCommittedTxId()); } @@ -486,7 +491,7 @@ public class Journal implements Closeable { if (reqInfo.getEpoch() != lastWriterEpoch.get()) { throw new IOException("IPC's epoch " + reqInfo.getEpoch() + " is not the current writer epoch " + - lastWriterEpoch.get()); + lastWriterEpoch.get() + " ; journal id: " + journalId); } } @@ -497,7 +502,8 @@ public class Journal implements Closeable { private void checkFormatted() throws JournalNotFormattedException { if (!isFormatted()) { throw new JournalNotFormattedException("Journal " + - storage.getSingularStorageDir() + " not formatted"); + storage.getSingularStorageDir() + " not formatted" + + " ; journal id: " + journalId); } } @@ -542,7 +548,8 @@ public class Journal implements Closeable { if (curSegment != null) { LOG.warn("Client is requesting a new log segment " + txid + " though we are already writing " + curSegment + ". " + - "Aborting the current segment in order to begin the new one."); + "Aborting the current segment in order to begin the new one." + + " ; journal id: " + journalId); // The writer may have lost a connection to us and is now // re-connecting after the connection came back. // We should abort our own old segment. @@ -556,7 +563,7 @@ public class Journal implements Closeable { if (existing != null) { if (!existing.isInProgress()) { throw new IllegalStateException("Already have a finalized segment " + - existing + " beginning at " + txid); + existing + " beginning at " + txid + " ; journal id: " + journalId); } // If it's in-progress, it should only contain one transaction, @@ -565,7 +572,8 @@ public class Journal implements Closeable { existing.scanLog(Long.MAX_VALUE, false); if (existing.getLastTxId() != existing.getFirstTxId()) { throw new IllegalStateException("The log file " + - existing + " seems to contain valid transactions"); + existing + " seems to contain valid transactions" + + " ; journal id: " + journalId); } } @@ -573,7 +581,7 @@ public class Journal implements Closeable { if (curLastWriterEpoch != reqInfo.getEpoch()) { LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch + " to " + reqInfo.getEpoch() + " for client " + - Server.getRemoteIp()); + Server.getRemoteIp() + " ; journal id: " + journalId); lastWriterEpoch.set(reqInfo.getEpoch()); } @@ -608,8 +616,8 @@ public class Journal implements Closeable { checkSync(nextTxId == endTxId + 1, "Trying to finalize in-progress log segment %s to end at " + - "txid %s but only written up to txid %s", - startTxId, endTxId, nextTxId - 1); + "txid %s but only written up to txid %s ; journal id: %s", + startTxId, endTxId, nextTxId - 1, journalId); // No need to validate the edit log if the client is finalizing // the log segment that it was just writing to. needsValidation = false; @@ -618,25 +626,27 @@ public class Journal implements Closeable { FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId); if (elf == null) { throw new JournalOutOfSyncException("No log file to finalize at " + - "transaction ID " + startTxId); + "transaction ID " + startTxId + " ; journal id: " + journalId); } if (elf.isInProgress()) { if (needsValidation) { LOG.info("Validating log segment " + elf.getFile() + " about to be " + - "finalized"); + "finalized ; journal id: " + journalId); elf.scanLog(Long.MAX_VALUE, false); checkSync(elf.getLastTxId() == endTxId, "Trying to finalize in-progress log segment %s to end at " + - "txid %s but log %s on disk only contains up to txid %s", - startTxId, endTxId, elf.getFile(), elf.getLastTxId()); + "txid %s but log %s on disk only contains up to txid %s " + + "; journal id: %s", + startTxId, endTxId, elf.getFile(), elf.getLastTxId(), journalId); } fjm.finalizeLogSegment(startTxId, endTxId); } else { Preconditions.checkArgument(endTxId == elf.getLastTxId(), "Trying to re-finalize already finalized log " + - elf + " with different endTxId " + endTxId); + elf + " with different endTxId " + endTxId + + " ; journal id: " + journalId); } // Once logs are finalized, a different length will never be decided. @@ -667,7 +677,8 @@ public class Journal implements Closeable { File paxosFile = storage.getPaxosFile(segmentTxId); if (paxosFile.exists()) { if (!paxosFile.delete()) { - throw new IOException("Unable to delete paxos file " + paxosFile); + throw new IOException("Unable to delete paxos file " + paxosFile + + " ; journal id: " + journalId); } } } @@ -717,7 +728,7 @@ public class Journal implements Closeable { } if (elf.getLastTxId() == HdfsServerConstants.INVALID_TXID) { LOG.info("Edit log file " + elf + " appears to be empty. " + - "Moving it aside..."); + "Moving it aside..." + " ; journal id: " + journalId); elf.moveAsideEmptyFile(); return null; } @@ -727,7 +738,7 @@ public class Journal implements Closeable { .setIsInProgress(elf.isInProgress()) .build(); LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " + - TextFormat.shortDebugString(ret)); + TextFormat.shortDebugString(ret) + " ; journal id: " + journalId); return ret; } @@ -771,7 +782,7 @@ public class Journal implements Closeable { PrepareRecoveryResponseProto resp = builder.build(); LOG.info("Prepared recovery for segment " + segmentTxId + ": " + - TextFormat.shortDebugString(resp)); + TextFormat.shortDebugString(resp) + " ; journal id: " + journalId); return resp; } @@ -792,8 +803,8 @@ public class Journal implements Closeable { // at least one transaction. Preconditions.checkArgument(segment.getEndTxId() > 0 && segment.getEndTxId() >= segmentTxId, - "bad recovery state for segment %s: %s", - segmentTxId, TextFormat.shortDebugString(segment)); + "bad recovery state for segment %s: %s ; journal id: %s", + segmentTxId, TextFormat.shortDebugString(segment), journalId); PersistedRecoveryPaxosData oldData = getPersistedPaxosData(segmentTxId); PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData.newBuilder() @@ -806,8 +817,9 @@ public class Journal implements Closeable { // checkRequest() call above should filter non-increasing epoch numbers. if (oldData != null) { alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(), - "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n", - oldData, newData); + "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: " + + "%s\nJournalId: %s\n", + oldData, newData, journalId); } File syncedFile = null; @@ -817,7 +829,7 @@ public class Journal implements Closeable { currentSegment.getEndTxId() != segment.getEndTxId()) { if (currentSegment == null) { LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) + - ": no current segment in place"); + ": no current segment in place ; journal id: " + journalId); // Update the highest txid for lag metrics updateHighestWrittenTxId(Math.max(segment.getEndTxId(), @@ -825,7 +837,7 @@ public class Journal implements Closeable { } else { LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) + ": old segment " + TextFormat.shortDebugString(currentSegment) + - " is not the right length"); + " is not the right length ; journal id: " + journalId); // Paranoid sanity check: if the new log is shorter than the log we // currently have, we should not end up discarding any transactions @@ -838,14 +850,15 @@ public class Journal implements Closeable { " with new segment " + TextFormat.shortDebugString(segment) + ": would discard already-committed txn " + - committedTxnId.get()); + committedTxnId.get() + + " ; journal id: " + journalId); } // Another paranoid check: we should not be asked to synchronize a log // on top of a finalized segment. alwaysAssert(currentSegment.getIsInProgress(), - "Should never be asked to synchronize a different log on top of an " + - "already-finalized segment"); + "Should never be asked to synchronize a different log on top of " + + "an already-finalized segment ; journal id: " + journalId); // If we're shortening the log, update our highest txid // used for lag metrics. @@ -858,7 +871,7 @@ public class Journal implements Closeable { } else { LOG.info("Skipping download of log " + TextFormat.shortDebugString(segment) + - ": already have up-to-date logs"); + ": already have up-to-date logs ; journal id: " + journalId); } // This is one of the few places in the protocol where we have a single @@ -890,12 +903,12 @@ public class Journal implements Closeable { } LOG.info("Accepted recovery for segment " + segmentTxId + ": " + - TextFormat.shortDebugString(newData)); + TextFormat.shortDebugString(newData) + " ; journal id: " + journalId); } private LongRange txnRange(SegmentStateProto seg) { Preconditions.checkArgument(seg.hasEndTxId(), - "invalid segment: %s", seg); + "invalid segment: %s ; journal id: %s", seg, journalId); return new LongRange(seg.getStartTxId(), seg.getEndTxId()); } @@ -970,7 +983,7 @@ public class Journal implements Closeable { if (tmp.exists()) { File dst = storage.getInProgressEditLog(segmentId); LOG.info("Rolling forward previously half-completed synchronization: " + - tmp + " -> " + dst); + tmp + " -> " + dst + " ; journal id: " + journalId); FileUtil.replaceFile(tmp, dst); } } @@ -991,8 +1004,8 @@ public class Journal implements Closeable { PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in); Preconditions.checkState(ret != null && ret.getSegmentState().getStartTxId() == segmentTxId, - "Bad persisted data for segment %s: %s", - segmentTxId, ret); + "Bad persisted data for segment %s: %s ; journal id: %s", + segmentTxId, ret, journalId); return ret; } finally { IOUtils.closeStream(in); @@ -1041,7 +1054,7 @@ public class Journal implements Closeable { storage.cTime = sInfo.cTime; int oldLV = storage.getLayoutVersion(); storage.layoutVersion = sInfo.layoutVersion; - LOG.info("Starting upgrade of edits directory: " + LOG.info("Starting upgrade of edits directory: " + storage.getRoot() + ".\n old LV = " + oldLV + "; old CTime = " + oldCTime + ".\n new LV = " + storage.getLayoutVersion() @@ -1112,7 +1125,7 @@ public class Journal implements Closeable { if (endTxId <= committedTxnId.get()) { if (!finalFile.getParentFile().exists()) { LOG.error(finalFile.getParentFile() + " doesn't exist. Aborting tmp " + - "segment move to current directory"); + "segment move to current directory ; journal id: " + journalId); return false; } Files.move(tmpFile.toPath(), finalFile.toPath(), @@ -1122,13 +1135,13 @@ public class Journal implements Closeable { } else { success = false; LOG.warn("Unable to move edits file from " + tmpFile + " to " + - finalFile); + finalFile + " ; journal id: " + journalId); } } else { success = false; LOG.error("The endTxId of the temporary file is not less than the " + "last committed transaction id. Aborting move to final file" + - finalFile); + finalFile + " ; journal id: " + journalId); } return success;