diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0c56ecf18a2..65cc8207c02 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -205,6 +205,9 @@ Release 2.0.1-alpha - UNRELEASED OPTIMIZATIONS + HDFS-2982. Startup performance suffers when there are many edit log + segments. (Colin Patrick McCabe via todd) + BUG FIXES HDFS-3385. The last block of INodeFileUnderConstruction is not diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java index 0baab0b141c..c8ec162479d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java @@ -79,12 +79,12 @@ class BookKeeperEditLogInputStream extends EditLogInputStream { } @Override - public long getFirstTxId() throws IOException { + public long getFirstTxId() { return firstTxId; } @Override - public long getLastTxId() throws IOException { + public long getLastTxId() { return lastTxId; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java index 238f090694a..6a3bfbd651e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperJournalManager.java @@ -37,6 +37,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; +import java.util.Collection; import java.util.Collections; import java.util.ArrayList; import java.util.List; @@ -313,8 +314,7 @@ public void finalizeLogSegment(long firstTxId, long lastTxId) } // TODO(HA): Handle inProgressOk - @Override - public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) + EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) throws IOException { for (EditLogLedgerMetadata l : getLedgerList()) { if (l.getFirstTxId() == fromTxnId) { @@ -328,12 +328,34 @@ public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) } } } - throw new IOException("No ledger for fromTxnId " + fromTxnId + " found."); + return null; + } + + @Override + public void selectInputStreams(Collection streams, + long fromTxId, boolean inProgressOk) { + // NOTE: could probably be rewritten more efficiently + while (true) { + EditLogInputStream elis; + try { + elis = getInputStream(fromTxId, inProgressOk); + } catch (IOException e) { + LOG.error(e); + return; + } + if (elis == null) { + return; + } + streams.add(elis); + if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) { + return; + } + fromTxId = elis.getLastTxId() + 1; + } } // TODO(HA): Handle inProgressOk - @Override - public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) + long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) throws IOException { long count = 0; long expectedStart = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java index 41f0292e548..a46f9cf0ed1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java @@ -34,6 +34,6 @@ public static FSEditLogOp getNoOpInstance() { public static long countTransactionsInStream(EditLogInputStream in) throws IOException { FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in); - return validation.getNumTransactions(); + return (validation.getEndTxId() - in.getFirstTxId()) + 1; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java index a9aa20d4d2b..51e2728cd31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java @@ -207,7 +207,7 @@ private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data) int logVersion = storage.getLayoutVersion(); backupInputStream.setBytes(data, logVersion); - long numTxnsAdvanced = logLoader.loadEditRecords(logVersion, + long numTxnsAdvanced = logLoader.loadEditRecords( backupInputStream, true, lastAppliedTxId + 1, null); if (numTxnsAdvanced != numTxns) { throw new IOException("Batch of txns starting at txnid " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java index ebf4f480f35..2f6fe8cbde2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; +import java.util.Collection; import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; @@ -60,19 +61,10 @@ public void purgeLogsOlderThan(long minTxIdToKeep) } @Override - public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) - throws IOException, CorruptionException { + public void selectInputStreams(Collection streams, + long fromTxnId, boolean inProgressOk) { // This JournalManager is never used for input. Therefore it cannot // return any transactions - return 0; - } - - @Override - public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) - throws IOException { - // This JournalManager is never used for input. Therefore it cannot - // return any transactions - throw new IOException("Unsupported operation"); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java index f91b7138223..b3c45ffdb9e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java @@ -129,12 +129,12 @@ void clear() throws IOException { } @Override - public long getFirstTxId() throws IOException { + public long getFirstTxId() { return HdfsConstants.INVALID_TXID; } @Override - public long getLastTxId() throws IOException { + public long getLastTxId() { return HdfsConstants.INVALID_TXID; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java index a76cc6c84b9..e6ddf5b920e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java @@ -24,10 +24,14 @@ import java.io.BufferedInputStream; import java.io.EOFException; import java.io.DataInputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.io.IOUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; /** * An implementation of the abstract class {@link EditLogInputStream}, which @@ -35,13 +39,21 @@ */ public class EditLogFileInputStream extends EditLogInputStream { private final File file; - private final FileInputStream fStream; - final private long firstTxId; - final private long lastTxId; - private final int logVersion; - private final FSEditLogOp.Reader reader; - private final FSEditLogLoader.PositionTrackingInputStream tracker; + private final long firstTxId; + private final long lastTxId; private final boolean isInProgress; + static private enum State { + UNINIT, + OPEN, + CLOSED + } + private State state = State.UNINIT; + private FileInputStream fStream = null; + private int logVersion = 0; + private FSEditLogOp.Reader reader = null; + private FSEditLogLoader.PositionTrackingInputStream tracker = null; + private DataInputStream dataIn = null; + static final Log LOG = LogFactory.getLog(EditLogInputStream.class); /** * Open an EditLogInputStream for the given file. @@ -68,34 +80,43 @@ public class EditLogFileInputStream extends EditLogInputStream { * header */ public EditLogFileInputStream(File name, long firstTxId, long lastTxId, - boolean isInProgress) - throws LogHeaderCorruptException, IOException { - file = name; - fStream = new FileInputStream(name); - - BufferedInputStream bin = new BufferedInputStream(fStream); - tracker = new FSEditLogLoader.PositionTrackingInputStream(bin); - DataInputStream in = new DataInputStream(tracker); - - try { - logVersion = readLogVersion(in); - } catch (EOFException eofe) { - throw new LogHeaderCorruptException("No header found in log"); - } - - reader = new FSEditLogOp.Reader(in, tracker, logVersion); + boolean isInProgress) { + this.file = name; this.firstTxId = firstTxId; this.lastTxId = lastTxId; this.isInProgress = isInProgress; } + private void init() throws LogHeaderCorruptException, IOException { + Preconditions.checkState(state == State.UNINIT); + BufferedInputStream bin = null; + try { + fStream = new FileInputStream(file); + bin = new BufferedInputStream(fStream); + tracker = new FSEditLogLoader.PositionTrackingInputStream(bin); + dataIn = new DataInputStream(tracker); + try { + logVersion = readLogVersion(dataIn); + } catch (EOFException eofe) { + throw new LogHeaderCorruptException("No header found in log"); + } + reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion); + state = State.OPEN; + } finally { + if (reader == null) { + IOUtils.cleanup(LOG, dataIn, tracker, bin, fStream); + state = State.CLOSED; + } + } + } + @Override - public long getFirstTxId() throws IOException { + public long getFirstTxId() { return firstTxId; } @Override - public long getLastTxId() throws IOException { + public long getLastTxId() { return lastTxId; } @@ -104,61 +125,95 @@ public String getName() { return file.getPath(); } - @Override - protected FSEditLogOp nextOp() throws IOException { - FSEditLogOp op = reader.readOp(false); - if ((op != null) && (op.hasTransactionId())) { - long txId = op.getTransactionId(); - if ((txId >= lastTxId) && - (lastTxId != HdfsConstants.INVALID_TXID)) { - // - // Sometimes, the NameNode crashes while it's writing to the - // edit log. In that case, you can end up with an unfinalized edit log - // which has some garbage at the end. - // JournalManager#recoverUnfinalizedSegments will finalize these - // unfinished edit logs, giving them a defined final transaction - // ID. Then they will be renamed, so that any subsequent - // readers will have this information. - // - // Since there may be garbage at the end of these "cleaned up" - // logs, we want to be sure to skip it here if we've read everything - // we were supposed to read out of the stream. - // So we force an EOF on all subsequent reads. - // - long skipAmt = file.length() - tracker.getPos(); - if (skipAmt > 0) { - FSImage.LOG.warn("skipping " + skipAmt + " bytes at the end " + + private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException { + FSEditLogOp op = null; + switch (state) { + case UNINIT: + try { + init(); + } catch (Throwable e) { + LOG.error("caught exception initializing " + this, e); + if (skipBrokenEdits) { + return null; + } + Throwables.propagateIfPossible(e, IOException.class); + } + Preconditions.checkState(state != State.UNINIT); + return nextOpImpl(skipBrokenEdits); + case OPEN: + op = reader.readOp(skipBrokenEdits); + if ((op != null) && (op.hasTransactionId())) { + long txId = op.getTransactionId(); + if ((txId >= lastTxId) && + (lastTxId != HdfsConstants.INVALID_TXID)) { + // + // Sometimes, the NameNode crashes while it's writing to the + // edit log. In that case, you can end up with an unfinalized edit log + // which has some garbage at the end. + // JournalManager#recoverUnfinalizedSegments will finalize these + // unfinished edit logs, giving them a defined final transaction + // ID. Then they will be renamed, so that any subsequent + // readers will have this information. + // + // Since there may be garbage at the end of these "cleaned up" + // logs, we want to be sure to skip it here if we've read everything + // we were supposed to read out of the stream. + // So we force an EOF on all subsequent reads. + // + long skipAmt = file.length() - tracker.getPos(); + if (skipAmt > 0) { + LOG.warn("skipping " + skipAmt + " bytes at the end " + "of edit log '" + getName() + "': reached txid " + txId + " out of " + lastTxId); - tracker.skip(skipAmt); + tracker.skip(skipAmt); + } } } + break; + case CLOSED: + break; // return null } return op; } - + + @Override + protected FSEditLogOp nextOp() throws IOException { + return nextOpImpl(false); + } + @Override protected FSEditLogOp nextValidOp() { try { - return reader.readOp(true); - } catch (IOException e) { + return nextOpImpl(true); + } catch (Throwable e) { + LOG.error("nextValidOp: got exception while reading " + this, e); return null; } } @Override public int getVersion() throws IOException { + if (state == State.UNINIT) { + init(); + } return logVersion; } @Override public long getPosition() { - return tracker.getPos(); + if (state == State.OPEN) { + return tracker.getPos(); + } else { + return 0; + } } @Override public void close() throws IOException { - fStream.close(); + if (state == State.OPEN) { + dataIn.close(); + } + state = State.CLOSED; } @Override @@ -181,12 +236,12 @@ static FSEditLogLoader.EditLogValidation validateEditLog(File file) throws IOExc EditLogFileInputStream in; try { in = new EditLogFileInputStream(file); - } catch (LogHeaderCorruptException corrupt) { + in.getVersion(); // causes us to read the header + } catch (LogHeaderCorruptException e) { // If the header is malformed or the wrong value, this indicates a corruption - FSImage.LOG.warn("Log at " + file + " has no valid header", - corrupt); + LOG.warn("Log file " + file + " has no valid header", e); return new FSEditLogLoader.EditLogValidation(0, - HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, true); + HdfsConstants.INVALID_TXID, true); } try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java index c2b42be2461..f9b84c9ba43 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java @@ -45,12 +45,12 @@ public abstract class EditLogInputStream implements Closeable { /** * @return the first transaction which will be found in this stream */ - public abstract long getFirstTxId() throws IOException; + public abstract long getFirstTxId(); /** * @return the last transaction which will be found in this stream */ - public abstract long getLastTxId() throws IOException; + public abstract long getLastTxId(); /** @@ -73,14 +73,14 @@ public FSEditLogOp readOp() throws IOException { } return nextOp(); } - + /** * Position the stream so that a valid operation can be read from it with * readOp(). * * This method can be used to skip over corrupted sections of edit logs. */ - public void resync() throws IOException { + public void resync() { if (cachedOp != null) { return; } @@ -109,7 +109,7 @@ protected FSEditLogOp nextValidOp() { // error recovery will want to override this. try { return nextOp(); - } catch (IOException e) { + } catch (Throwable e) { return null; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index c144906c5c8..31ce1175754 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -24,6 +24,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.List; import org.apache.commons.logging.Log; @@ -267,13 +268,14 @@ synchronized void openForWrite() throws IOException { long segmentTxId = getLastWrittenTxId() + 1; // Safety check: we should never start a segment if there are // newer txids readable. - EditLogInputStream s = journalSet.getInputStream(segmentTxId, true); - try { - Preconditions.checkState(s == null, - "Cannot start writing at txid %s when there is a stream " + - "available for read: %s", segmentTxId, s); - } finally { - IOUtils.closeStream(s); + List streams = new ArrayList(); + journalSet.selectInputStreams(streams, segmentTxId, true); + if (!streams.isEmpty()) { + String error = String.format("Cannot start writing at txid %s " + + "when there is a stream available for read: %s", + segmentTxId, streams.get(0)); + IOUtils.cleanup(LOG, streams.toArray(new EditLogInputStream[0])); + throw new IllegalStateException(error); } startLogSegmentAndWriteHeaderTxn(segmentTxId); @@ -1136,10 +1138,10 @@ synchronized void recoverUnclosedStreams() { // All journals have failed, it is handled in logSync. } } - - Collection selectInputStreams(long fromTxId, - long toAtLeastTxId) throws IOException { - return selectInputStreams(fromTxId, toAtLeastTxId, true); + + public Collection selectInputStreams( + long fromTxId, long toAtLeastTxId) throws IOException { + return selectInputStreams(fromTxId, toAtLeastTxId, null, true); } /** @@ -1149,25 +1151,71 @@ Collection selectInputStreams(long fromTxId, * @param toAtLeast the selected streams must contain this transaction * @param inProgessOk set to true if in-progress streams are OK */ - public synchronized Collection selectInputStreams(long fromTxId, - long toAtLeastTxId, boolean inProgressOk) throws IOException { + public synchronized Collection selectInputStreams( + long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery, + boolean inProgressOk) throws IOException { List streams = new ArrayList(); - EditLogInputStream stream = journalSet.getInputStream(fromTxId, inProgressOk); - while (stream != null) { - streams.add(stream); - // We're now looking for a higher range, so reset the fromTxId - fromTxId = stream.getLastTxId() + 1; - stream = journalSet.getInputStream(fromTxId, inProgressOk); + journalSet.selectInputStreams(streams, fromTxId, inProgressOk); + + try { + checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk); + } catch (IOException e) { + if (recovery != null) { + // If recovery mode is enabled, continue loading even if we know we + // can't load up to toAtLeastTxId. + LOG.error(e); + } else { + closeAllStreams(streams); + throw e; + } } - - if (fromTxId <= toAtLeastTxId) { - closeAllStreams(streams); - throw new IOException(String.format("Gap in transactions. Expected to " - + "be able to read up until at least txid %d but unable to find any " - + "edit logs containing txid %d", toAtLeastTxId, fromTxId)); + // This code will go away as soon as RedundantEditLogInputStream is + // introduced. (HDFS-3049) + try { + if (!streams.isEmpty()) { + streams.get(0).skipUntil(fromTxId); + } + } catch (IOException e) { + // We don't want to throw an exception from here, because that would make + // recovery impossible even if the user requested it. An exception will + // be thrown later, when we don't read the starting txid we expect. + LOG.error("error skipping until transaction " + fromTxId, e); } return streams; } + + /** + * Check for gaps in the edit log input stream list. + * Note: we're assuming that the list is sorted and that txid ranges don't + * overlap. This could be done better and with more generality with an + * interval tree. + */ + private void checkForGaps(List streams, long fromTxId, + long toAtLeastTxId, boolean inProgressOk) throws IOException { + Iterator iter = streams.iterator(); + long txId = fromTxId; + while (true) { + if (txId > toAtLeastTxId) return; + if (!iter.hasNext()) break; + EditLogInputStream elis = iter.next(); + if (elis.getFirstTxId() > txId) break; + long next = elis.getLastTxId(); + if (next == HdfsConstants.INVALID_TXID) { + if (!inProgressOk) { + throw new RuntimeException("inProgressOk = false, but " + + "selectInputStreams returned an in-progress edit " + + "log input stream (" + elis + ")"); + } + // We don't know where the in-progress stream ends. + // It could certainly go all the way up to toAtLeastTxId. + return; + } + txId = next + 1; + } + throw new IOException(String.format("Gap in transactions. Expected to " + + "be able to read up until at least txid %d but unable to find any " + + "edit logs containing txid %d", toAtLeastTxId, txId)); + } /** * Close all the streams in a collection diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 1f39a6a0bfd..e1b26bb810d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -85,12 +85,10 @@ public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) { */ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, MetaRecoveryContext recovery) throws IOException { - int logVersion = edits.getVersion(); - fsNamesys.writeLock(); try { long startTime = now(); - long numEdits = loadEditRecords(logVersion, edits, false, + long numEdits = loadEditRecords(edits, false, expectedStartingTxId, recovery); FSImage.LOG.info("Edits file " + edits.getName() + " of size " + edits.length() + " edits # " + numEdits @@ -102,7 +100,7 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, } } - long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit, + long loadEditRecords(EditLogInputStream in, boolean closeOnExit, long expectedStartingTxId, MetaRecoveryContext recovery) throws IOException { FSDirectory fsDir = fsNamesys.dir; @@ -141,7 +139,7 @@ long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit, } } catch (Throwable e) { // Handle a problem with our input - check203UpgradeFailure(logVersion, e); + check203UpgradeFailure(in.getVersion(), e); String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId); FSImage.LOG.error(errorMessage, e); @@ -158,7 +156,7 @@ long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit, } recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] = in.getPosition(); - if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) { + if (op.hasTransactionId()) { if (op.getTransactionId() > expectedTxId) { MetaRecoveryContext.editLogLoaderPrompt("There appears " + "to be a gap in the edit log. We expected txid " + @@ -175,7 +173,7 @@ long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit, } } try { - applyEditLogOp(op, fsDir, logVersion); + applyEditLogOp(op, fsDir, in.getVersion()); } catch (Throwable e) { LOG.error("Encountered exception on operation " + op, e); MetaRecoveryContext.editLogLoaderPrompt("Failed to " + @@ -192,7 +190,7 @@ long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit, expectedTxId = lastAppliedTxId = expectedStartingTxId; } // log progress - if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) { + if (op.hasTransactionId()) { long now = now(); if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) { int percent = Math.round((float)lastAppliedTxId / numTxns * 100); @@ -647,76 +645,57 @@ private void check203UpgradeFailure(int logVersion, Throwable e) } /** - * Return the number of valid transactions in the stream. If the stream is - * truncated during the header, returns a value indicating that there are - * 0 valid transactions. This reads through the stream but does not close - * it. + * Find the last valid transaction ID in the stream. + * If there are invalid or corrupt transactions in the middle of the stream, + * validateEditLog will skip over them. + * This reads through the stream but does not close it. + * * @throws IOException if the stream cannot be read due to an IO error (eg * if the log does not exist) */ static EditLogValidation validateEditLog(EditLogInputStream in) { long lastPos = 0; - long firstTxId = HdfsConstants.INVALID_TXID; long lastTxId = HdfsConstants.INVALID_TXID; long numValid = 0; - try { - FSEditLogOp op = null; - while (true) { - lastPos = in.getPosition(); + FSEditLogOp op = null; + while (true) { + lastPos = in.getPosition(); + try { if ((op = in.readOp()) == null) { break; } - if (firstTxId == HdfsConstants.INVALID_TXID) { - firstTxId = op.getTransactionId(); - } - if (lastTxId == HdfsConstants.INVALID_TXID - || op.getTransactionId() == lastTxId + 1) { - lastTxId = op.getTransactionId(); - } else { - FSImage.LOG.error("Out of order txid found. Found " + - op.getTransactionId() + ", expected " + (lastTxId + 1)); - break; - } - numValid++; + } catch (Throwable t) { + FSImage.LOG.warn("Caught exception after reading " + numValid + + " ops from " + in + " while determining its valid length." + + "Position was " + lastPos, t); + break; } - } catch (Throwable t) { - // Catch Throwable and not just IOE, since bad edits may generate - // NumberFormatExceptions, AssertionErrors, OutOfMemoryErrors, etc. - FSImage.LOG.debug("Caught exception after reading " + numValid + - " ops from " + in + " while determining its valid length.", t); + if (lastTxId == HdfsConstants.INVALID_TXID + || op.getTransactionId() > lastTxId) { + lastTxId = op.getTransactionId(); + } + numValid++; } - return new EditLogValidation(lastPos, firstTxId, lastTxId, false); + return new EditLogValidation(lastPos, lastTxId, false); } - + static class EditLogValidation { private final long validLength; - private final long startTxId; private final long endTxId; - private final boolean corruptionDetected; - - EditLogValidation(long validLength, long startTxId, long endTxId, - boolean corruptionDetected) { + private final boolean hasCorruptHeader; + + EditLogValidation(long validLength, long endTxId, + boolean hasCorruptHeader) { this.validLength = validLength; - this.startTxId = startTxId; this.endTxId = endTxId; - this.corruptionDetected = corruptionDetected; + this.hasCorruptHeader = hasCorruptHeader; } - + long getValidLength() { return validLength; } - - long getStartTxId() { return startTxId; } - + long getEndTxId() { return endTxId; } - - long getNumTransactions() { - if (endTxId == HdfsConstants.INVALID_TXID - || startTxId == HdfsConstants.INVALID_TXID) { - return 0; - } - return (endTxId - startTxId) + 1; - } - - boolean hasCorruptHeader() { return corruptionDetected; } + + boolean hasCorruptHeader() { return hasCorruptHeader; } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index 1b81f942f8e..c76a16f7483 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -559,7 +559,7 @@ void reloadFromImageFile(File file, FSNamesystem target) throws IOException { /** * Choose latest image from one of the directories, - * load it and merge with the edits from that directory. + * load it and merge with the edits. * * Saving and loading fsimage should never trigger symlink resolution. * The paths that are persisted do not have *intermediate* symlinks @@ -595,7 +595,7 @@ boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery) // OK to not be able to read all of edits right now. long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0; editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1, - toAtLeastTxId, false); + toAtLeastTxId, recovery, false); } else { editStreams = FSImagePreTransactionalStorageInspector .getEditLogStreams(storage); @@ -603,7 +603,10 @@ boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery) LOG.debug("Planning to load image :\n" + imageFile); for (EditLogInputStream l : editStreams) { - LOG.debug("\t Planning to load edit stream: " + l); + LOG.debug("Planning to load edit log stream: " + l); + } + if (!editStreams.iterator().hasNext()) { + LOG.info("No edit log streams selected."); } try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index 3767111c058..75caac647c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Comparator; import java.util.Collections; @@ -73,7 +74,7 @@ public FileJournalManager(StorageDirectory sd, NNStorage storage) { @Override public void close() throws IOException {} - + @Override synchronized public EditLogOutputStream startLogSegment(long txid) throws IOException { @@ -212,90 +213,46 @@ static List matchEditLogs(File[] filesInStorage) { } @Override - synchronized public EditLogInputStream getInputStream(long fromTxId, - boolean inProgressOk) throws IOException { - for (EditLogFile elf : getLogFiles(fromTxId)) { - if (elf.containsTxId(fromTxId)) { - if (!inProgressOk && elf.isInProgress()) { + synchronized public void selectInputStreams( + Collection streams, long fromTxId, + boolean inProgressOk) { + List elfs; + try { + elfs = matchEditLogs(sd.getCurrentDir()); + } catch (IOException e) { + LOG.error("error listing files in " + this + ". " + + "Skipping all edit logs in this directory.", e); + return; + } + LOG.debug(this + ": selecting input streams starting at " + fromTxId + + (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") + + "from among " + elfs.size() + " candidate file(s)"); + for (EditLogFile elf : elfs) { + if (elf.lastTxId < fromTxId) { + LOG.debug("passing over " + elf + " because it ends at " + + elf.lastTxId + ", but we only care about transactions " + + "as new as " + fromTxId); + continue; + } + if (elf.isInProgress()) { + if (!inProgressOk) { + LOG.debug("passing over " + elf + " because it is in progress " + + "and we are ignoring in-progress logs."); continue; } - if (elf.isInProgress()) { + try { elf.validateLog(); + } catch (IOException e) { + LOG.error("got IOException while trying to validate header of " + + elf + ". Skipping.", e); + continue; } - if (LOG.isTraceEnabled()) { - LOG.trace("Returning edit stream reading from " + elf); - } - EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(), + } + EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(), elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress()); - long transactionsToSkip = fromTxId - elf.getFirstTxId(); - if (transactionsToSkip > 0) { - LOG.info(String.format("Log begins at txid %d, but requested start " - + "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId, - transactionsToSkip)); - } - if (elfis.skipUntil(fromTxId) == false) { - throw new IOException("failed to advance input stream to txid " + - fromTxId); - } - return elfis; - } + LOG.debug("selecting edit log stream " + elf); + streams.add(elfis); } - - throw new IOException("Cannot find editlog file containing " + fromTxId); - } - - @Override - public long getNumberOfTransactions(long fromTxId, boolean inProgressOk) - throws IOException, CorruptionException { - long numTxns = 0L; - - for (EditLogFile elf : getLogFiles(fromTxId)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Counting " + elf); - } - if (elf.getFirstTxId() > fromTxId) { // there must be a gap - LOG.warn("Gap in transactions in " + sd.getRoot() + ". Gap is " - + fromTxId + " - " + (elf.getFirstTxId() - 1)); - break; - } else if (elf.containsTxId(fromTxId)) { - if (!inProgressOk && elf.isInProgress()) { - break; - } - - if (elf.isInProgress()) { - elf.validateLog(); - } - - if (elf.hasCorruptHeader()) { - break; - } - numTxns += elf.getLastTxId() + 1 - fromTxId; - fromTxId = elf.getLastTxId() + 1; - - if (elf.isInProgress()) { - break; - } - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Journal " + this + " has " + numTxns - + " txns from " + fromTxId); - } - - long max = findMaxTransaction(inProgressOk); - - // fromTxId should be greater than max, as it points to the next - // transaction we should expect to find. If it is less than or equal - // to max, it means that a transaction with txid == max has not been found - if (numTxns == 0 && fromTxId <= max) { - String error = String.format("Gap in transactions, max txnid is %d" - + ", 0 txns from %d", max, fromTxId); - LOG.error(error); - throw new CorruptionException(error); - } - - return numTxns; } @Override @@ -318,7 +275,7 @@ synchronized public void recoverUnfinalizedSegments() throws IOException { } continue; } - + elf.validateLog(); if (elf.hasCorruptHeader()) { @@ -326,19 +283,16 @@ synchronized public void recoverUnfinalizedSegments() throws IOException { throw new CorruptionException("In-progress edit log file is corrupt: " + elf); } - - // If the file has a valid header (isn't corrupt) but contains no - // transactions, we likely just crashed after opening the file and - // writing the header, but before syncing any transactions. Safe to - // delete the file. - if (elf.getNumTransactions() == 0) { - LOG.info("Deleting edit log file with zero transactions " + elf); - if (!elf.getFile().delete()) { - throw new IOException("Unable to delete " + elf.getFile()); - } + if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) { + // If the file has a valid header (isn't corrupt) but contains no + // transactions, we likely just crashed after opening the file and + // writing the header, but before syncing any transactions. Safe to + // delete the file. + LOG.info("Moving aside edit log file that seems to have zero " + + "transactions " + elf); + elf.moveAsideEmptyFile(); continue; } - finalizeLogSegment(elf.getFirstTxId(), elf.getLastTxId()); } } @@ -361,39 +315,6 @@ List getLogFiles(long fromTxId) throws IOException { return logFiles; } - /** - * Find the maximum transaction in the journal. - */ - private long findMaxTransaction(boolean inProgressOk) - throws IOException { - boolean considerSeenTxId = true; - long seenTxId = NNStorage.readTransactionIdFile(sd); - long maxSeenTransaction = 0; - for (EditLogFile elf : getLogFiles(0)) { - if (elf.isInProgress() && !inProgressOk) { - if (elf.getFirstTxId() != HdfsConstants.INVALID_TXID && - elf.getFirstTxId() <= seenTxId) { - // don't look at the seen_txid file if in-progress logs are not to be - // examined, and the value in seen_txid falls within the in-progress - // segment. - considerSeenTxId = false; - } - continue; - } - - if (elf.isInProgress()) { - maxSeenTransaction = Math.max(elf.getFirstTxId(), maxSeenTransaction); - elf.validateLog(); - } - maxSeenTransaction = Math.max(elf.getLastTxId(), maxSeenTransaction); - } - if (considerSeenTxId) { - return Math.max(maxSeenTransaction, seenTxId); - } else { - return maxSeenTransaction; - } - } - @Override public String toString() { return String.format("FileJournalManager(root=%s)", sd.getRoot()); @@ -406,7 +327,6 @@ static class EditLogFile { private File file; private final long firstTxId; private long lastTxId; - private long numTx = -1; private boolean hasCorruptHeader = false; private final boolean isInProgress; @@ -454,20 +374,15 @@ boolean containsTxId(long txId) { } /** - * Count the number of valid transactions in a log. + * Find out where the edit log ends. * This will update the lastTxId of the EditLogFile or * mark it as corrupt if it is. */ void validateLog() throws IOException { EditLogValidation val = EditLogFileInputStream.validateEditLog(file); - this.numTx = val.getNumTransactions(); this.lastTxId = val.getEndTxId(); this.hasCorruptHeader = val.hasCorruptHeader(); } - - long getNumTransactions() { - return numTx; - } boolean isInProgress() { return isInProgress; @@ -483,23 +398,31 @@ boolean hasCorruptHeader() { void moveAsideCorruptFile() throws IOException { assert hasCorruptHeader; - + renameSelf(".corrupt"); + } + + void moveAsideEmptyFile() throws IOException { + assert lastTxId == HdfsConstants.INVALID_TXID; + renameSelf(".empty"); + } + + private void renameSelf(String newSuffix) throws IOException { File src = file; - File dst = new File(src.getParent(), src.getName() + ".corrupt"); + File dst = new File(src.getParent(), src.getName() + newSuffix); boolean success = src.renameTo(dst); if (!success) { throw new IOException( - "Couldn't rename corrupt log " + src + " to " + dst); + "Couldn't rename log " + src + " to " + dst); } file = dst; } - + @Override public String toString() { return String.format("EditLogFile(file=%s,first=%019d,last=%019d," - +"inProgress=%b,hasCorruptHeader=%b,numTx=%d)", + +"inProgress=%b,hasCorruptHeader=%b)", file.toString(), firstTxId, lastTxId, - isInProgress(), hasCorruptHeader, numTx); + isInProgress(), hasCorruptHeader); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java index f9c622dc387..9efeca7f953 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java @@ -19,6 +19,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Collection; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -46,26 +47,17 @@ public interface JournalManager extends Closeable { void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException; /** - * Get the input stream starting with fromTxnId from this journal manager + * Get a list of edit log input streams. The list will start with the + * stream that contains fromTxnId, and continue until the end of the journal + * being managed. + * * @param fromTxnId the first transaction id we want to read * @param inProgressOk whether or not in-progress streams should be returned - * @return the stream starting with transaction fromTxnId - * @throws IOException if a stream cannot be found. - */ - EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) - throws IOException; - - /** - * Get the number of transaction contiguously available from fromTxnId. * - * @param fromTxnId Transaction id to count from - * @param inProgressOk whether or not in-progress streams should be counted - * @return The number of transactions available from fromTxnId - * @throws IOException if the journal cannot be read. - * @throws CorruptionException if there is a gap in the journal at fromTxnId. + * @return a list of streams */ - long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) - throws IOException, CorruptionException; + void selectInputStreams(Collection streams, + long fromTxnId, boolean inProgressOk); /** * Set the amount of memory that this stream should use to buffer edits @@ -92,7 +84,7 @@ void purgeLogsOlderThan(long minTxIdToKeep) * Close the journal manager, freeing any resources it may hold. */ void close() throws IOException; - + /** * Indicate that a journal is cannot be used to load a certain range of * edits. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java index d84d79dcb5a..3d2e23bf2d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java @@ -19,7 +19,10 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; import java.util.List; import java.util.SortedSet; @@ -31,11 +34,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ComparisonChain; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Multimaps; import com.google.common.collect.Sets; +import com.google.common.collect.TreeMultiset; /** * Manages a collection of Journals. None of the methods are synchronized, it is @@ -47,6 +52,17 @@ public class JournalSet implements JournalManager { static final Log LOG = LogFactory.getLog(FSEditLog.class); + static final public Comparator + EDIT_LOG_INPUT_STREAM_COMPARATOR = new Comparator() { + @Override + public int compare(EditLogInputStream a, EditLogInputStream b) { + return ComparisonChain.start(). + compare(a.getFirstTxId(), b.getFirstTxId()). + compare(b.getLastTxId(), a.getLastTxId()). + result(); + } + }; + /** * Container for a JournalManager paired with its currently * active stream. @@ -194,75 +210,57 @@ public void apply(JournalAndStream jas) throws IOException { }, "close journal"); } - /** - * Find the best editlog input stream to read from txid. - * If a journal throws an CorruptionException while reading from a txn id, - * it means that it has more transactions, but can't find any from fromTxId. - * If this is the case and no other journal has transactions, we should throw - * an exception as it means more transactions exist, we just can't load them. - * - * @param fromTxnId Transaction id to start from. - * @return A edit log input stream with tranactions fromTxId - * or null if no more exist + * In this function, we get a bunch of streams from all of our JournalManager + * objects. Then we add these to the collection one by one. + * + * @param streams The collection to add the streams to. It may or + * may not be sorted-- this is up to the caller. + * @param fromTxId The transaction ID to start looking for streams at + * @param inProgressOk Should we consider unfinalized streams? */ @Override - public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) - throws IOException { - JournalManager bestjm = null; - long bestjmNumTxns = 0; - CorruptionException corruption = null; - + public void selectInputStreams(Collection streams, + long fromTxId, boolean inProgressOk) { + final TreeMultiset allStreams = + TreeMultiset.create(EDIT_LOG_INPUT_STREAM_COMPARATOR); for (JournalAndStream jas : journals) { - if (jas.isDisabled()) continue; - - JournalManager candidate = jas.getManager(); - long candidateNumTxns = 0; - try { - candidateNumTxns = candidate.getNumberOfTransactions(fromTxnId, - inProgressOk); - } catch (CorruptionException ce) { - corruption = ce; - } catch (IOException ioe) { - LOG.warn("Unable to read input streams from JournalManager " + candidate, - ioe); - continue; // error reading disk, just skip - } - - if (candidateNumTxns > bestjmNumTxns) { - bestjm = candidate; - bestjmNumTxns = candidateNumTxns; - } - } - - if (bestjm == null) { - if (corruption != null) { - throw new IOException("No non-corrupt logs for txid " - + fromTxnId, corruption); - } else { - return null; - } - } - return bestjm.getInputStream(fromTxnId, inProgressOk); - } - - @Override - public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) - throws IOException { - long num = 0; - for (JournalAndStream jas: journals) { if (jas.isDisabled()) { LOG.info("Skipping jas " + jas + " since it's disabled"); continue; + } + jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk); + } + // We want to group together all the streams that start on the same start + // transaction ID. To do this, we maintain an accumulator (acc) of all + // the streams we've seen at a given start transaction ID. When we see a + // higher start transaction ID, we select a stream from the accumulator and + // clear it. Then we begin accumulating streams with the new, higher start + // transaction ID. + LinkedList acc = + new LinkedList(); + for (EditLogInputStream elis : allStreams) { + if (acc.isEmpty()) { + acc.add(elis); } else { - long newNum = jas.getManager().getNumberOfTransactions(fromTxnId, - inProgressOk); - if (newNum > num) { - num = newNum; + long accFirstTxId = acc.get(0).getFirstTxId(); + if (accFirstTxId == elis.getFirstTxId()) { + acc.add(elis); + } else if (accFirstTxId < elis.getFirstTxId()) { + streams.add(acc.get(0)); + acc.clear(); + acc.add(elis); + } else if (accFirstTxId > elis.getFirstTxId()) { + throw new RuntimeException("sorted set invariants violated! " + + "Got stream with first txid " + elis.getFirstTxId() + + ", but the last firstTxId was " + accFirstTxId); } } } - return num; + if (!acc.isEmpty()) { + streams.add(acc.get(0)); + acc.clear(); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java index da09fff6ea1..768e69604d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/BootstrapStandby.java @@ -225,7 +225,7 @@ private boolean checkLogsAvailableForRead(FSImage image, long imageTxId, try { Collection streams = image.getEditLog().selectInputStreams( - firstTxIdInLogs, curTxIdOnOtherNode, true); + firstTxIdInLogs, curTxIdOnOtherNode, null, true); for (EditLogInputStream stream : streams) { IOUtils.closeStream(stream); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index c11f1d760e6..3733ee0d00e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -201,7 +201,7 @@ private void doTailEdits() throws IOException, InterruptedException { } Collection streams; try { - streams = editLog.selectInputStreams(lastTxnId + 1, 0, false); + streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false); } catch (IOException ioe) { // This is acceptable. If we try to tail edits in the middle of an edits // log roll, i.e. the last one has been finalized but the new inprogress diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java index b07bad252ed..685f19cd28b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java @@ -248,7 +248,7 @@ public void testRollback() throws Exception { baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous"); deleteMatchingFiles(baseDirs, "edits.*"); startNameNodeShouldFail(StartupOption.ROLLBACK, - "No non-corrupt logs for txid "); + "Gap in transactions"); UpgradeUtilities.createEmptyDirs(nameNodeDirs); log("NameNode rollback with no image file", numDirs); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index 5e869ecb35d..5e77d730b6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -22,6 +22,7 @@ import java.net.URI; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.ArrayList; import java.util.Collections; @@ -739,8 +740,9 @@ private void doTestCrashRecoveryEmptyLog(boolean inBothDirs, throw ioe; } else { GenericTestUtils.assertExceptionContains( - "No non-corrupt logs for txid 3", - ioe); + "Gap in transactions. Expected to be able to read up until " + + "at least txid 3 but unable to find any edit logs containing " + + "txid 3", ioe); } } finally { cluster.shutdown(); @@ -769,12 +771,12 @@ public EditLogByteInputStream(byte[] data) throws IOException { } @Override - public long getFirstTxId() throws IOException { + public long getFirstTxId() { return HdfsConstants.INVALID_TXID; } @Override - public long getLastTxId() throws IOException { + public long getLastTxId() { return HdfsConstants.INVALID_TXID; } @@ -1103,9 +1105,9 @@ public void testAlternatingJournalFailure() throws IOException { for (EditLogInputStream edits : editStreams) { FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(edits); - long read = val.getNumTransactions(); + long read = (val.getEndTxId() - edits.getFirstTxId()) + 1; LOG.info("Loading edits " + edits + " read " + read); - assertEquals(startTxId, val.getStartTxId()); + assertEquals(startTxId, edits.getFirstTxId()); startTxId += read; totaltxnread += read; } @@ -1153,7 +1155,9 @@ public boolean accept(File dir, String name) { fail("Should have thrown exception"); } catch (IOException ioe) { GenericTestUtils.assertExceptionContains( - "No non-corrupt logs for txid " + startGapTxId, ioe); + "Gap in transactions. Expected to be able to read up until " + + "at least txid 40 but unable to find any edit logs containing " + + "txid 11", ioe); } } @@ -1227,4 +1231,55 @@ public void testFuzzSequences() throws IOException { validateNoCrash(garbage); } } + + /** + * Test creating a directory with lots and lots of edit log segments + */ + @Test + public void testManyEditLogSegments() throws IOException { + final int NUM_EDIT_LOG_ROLLS = 1000; + // start a cluster + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + FileSystem fileSys = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build(); + cluster.waitActive(); + fileSys = cluster.getFileSystem(); + final FSNamesystem namesystem = cluster.getNamesystem(); + FSImage fsimage = namesystem.getFSImage(); + final FSEditLog editLog = fsimage.getEditLog(); + for (int i = 0; i < NUM_EDIT_LOG_ROLLS; i++){ + editLog.logSetReplication("fakefile" + i, (short)(i % 3)); + assertExistsInStorageDirs( + cluster, NameNodeDirType.EDITS, + NNStorage.getInProgressEditsFileName((i * 3) + 1)); + editLog.logSync(); + editLog.rollEditLog(); + assertExistsInStorageDirs( + cluster, NameNodeDirType.EDITS, + NNStorage.getFinalizedEditsFileName((i * 3) + 1, (i * 3) + 3)); + } + editLog.close(); + } finally { + if(fileSys != null) fileSys.close(); + if(cluster != null) cluster.shutdown(); + } + + // How long does it take to read through all these edit logs? + long startTime = System.currentTimeMillis(); + try { + cluster = new MiniDFSCluster.Builder(conf). + numDataNodes(NUM_DATA_NODES).build(); + cluster.waitActive(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + long endTime = System.currentTimeMillis(); + double delta = ((float)(endTime - startTime)) / 1000.0; + LOG.info(String.format("loaded %d edit log segments in %.2f seconds", + NUM_EDIT_LOG_ROLLS, delta)); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java index ebcec964a6b..d39df4030d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java @@ -40,8 +40,6 @@ import org.mockito.Mockito; public class TestEditLogFileOutputStream { - - private final static long PREALLOCATION_LENGTH = (1024 * 1024) + 4; private final static int HEADER_LEN = 17; private static final File TEST_EDITS = new File(System.getProperty("test.build.data","/tmp"), @@ -51,21 +49,22 @@ public class TestEditLogFileOutputStream { public void deleteEditsFile() { TEST_EDITS.delete(); } - + @Test public void testPreallocation() throws IOException { Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) .build(); + final long START_TXID = 1; StorageDirectory sd = cluster.getNameNode().getFSImage() .getStorage().getStorageDir(0); - File editLog = NNStorage.getInProgressEditsFile(sd, 1); + File editLog = NNStorage.getInProgressEditsFile(sd, START_TXID); EditLogValidation validation = EditLogFileInputStream.validateEditLog(editLog); assertEquals("Edit log should contain a header as valid length", HEADER_LEN, validation.getValidLength()); - assertEquals(1, validation.getNumTransactions()); + assertEquals(validation.getEndTxId(), START_TXID); assertEquals("Edit log should have 1MB pre-allocated, plus 4 bytes " + "for the version number", EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length()); @@ -79,7 +78,7 @@ public void testPreallocation() throws IOException { assertTrue("Edit log should have more valid data after writing a txn " + "(was: " + oldLength + " now: " + validation.getValidLength() + ")", validation.getValidLength() > oldLength); - assertEquals(2, validation.getNumTransactions()); + assertEquals(1, validation.getEndTxId() - START_TXID); assertEquals("Edit log should be 1MB long, plus 4 bytes for the version number", EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index 4302534d153..a54df2ca818 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@ -30,6 +30,7 @@ import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import org.apache.commons.logging.impl.Log4JLogger; @@ -40,16 +41,23 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.io.IOUtils; import org.apache.log4j.Level; import org.junit.Test; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.io.Files; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.spy; + public class TestFSEditLogLoader { static { @@ -153,108 +161,6 @@ public void testReplicationAdjusted() throws IOException { } } - /** - * Test that the valid number of transactions can be counted from a file. - * @throws IOException - */ - @Test - public void testCountValidTransactions() throws IOException { - File testDir = new File(TEST_DIR, "testCountValidTransactions"); - File logFile = new File(testDir, - NNStorage.getInProgressEditsFileName(1)); - - // Create a log file, and return the offsets at which each - // transaction starts. - FSEditLog fsel = null; - final int NUM_TXNS = 30; - SortedMap offsetToTxId = Maps.newTreeMap(); - try { - fsel = FSImageTestUtil.createStandaloneEditLog(testDir); - fsel.openForWrite(); - assertTrue("should exist: " + logFile, logFile.exists()); - - for (int i = 0; i < NUM_TXNS; i++) { - long trueOffset = getNonTrailerLength(logFile); - long thisTxId = fsel.getLastWrittenTxId() + 1; - offsetToTxId.put(trueOffset, thisTxId); - System.err.println("txid " + thisTxId + " at offset " + trueOffset); - fsel.logDelete("path" + i, i); - fsel.logSync(); - } - } finally { - if (fsel != null) { - fsel.close(); - } - } - - // The file got renamed when the log was closed. - logFile = testDir.listFiles()[0]; - long validLength = getNonTrailerLength(logFile); - - // Make sure that uncorrupted log has the expected length and number - // of transactions. - EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile); - assertEquals(NUM_TXNS + 2, validation.getNumTransactions()); - assertEquals(validLength, validation.getValidLength()); - - // Back up the uncorrupted log - File logFileBak = new File(testDir, logFile.getName() + ".bak"); - Files.copy(logFile, logFileBak); - - // Corrupt the log file in various ways for each txn - for (Map.Entry entry : offsetToTxId.entrySet()) { - long txOffset = entry.getKey(); - long txid = entry.getValue(); - - // Restore backup, truncate the file exactly before the txn - Files.copy(logFileBak, logFile); - truncateFile(logFile, txOffset); - validation = EditLogFileInputStream.validateEditLog(logFile); - assertEquals("Failed when truncating to length " + txOffset, - txid - 1, validation.getNumTransactions()); - assertEquals(txOffset, validation.getValidLength()); - - // Restore backup, truncate the file with one byte in the txn, - // also isn't valid - Files.copy(logFileBak, logFile); - truncateFile(logFile, txOffset + 1); - validation = EditLogFileInputStream.validateEditLog(logFile); - assertEquals("Failed when truncating to length " + (txOffset + 1), - txid - 1, validation.getNumTransactions()); - assertEquals(txOffset, validation.getValidLength()); - - // Restore backup, corrupt the txn opcode - Files.copy(logFileBak, logFile); - corruptByteInFile(logFile, txOffset); - validation = EditLogFileInputStream.validateEditLog(logFile); - assertEquals("Failed when corrupting txn opcode at " + txOffset, - txid - 1, validation.getNumTransactions()); - assertEquals(txOffset, validation.getValidLength()); - - // Restore backup, corrupt a byte a few bytes into the txn - Files.copy(logFileBak, logFile); - corruptByteInFile(logFile, txOffset+5); - validation = EditLogFileInputStream.validateEditLog(logFile); - assertEquals("Failed when corrupting txn data at " + (txOffset+5), - txid - 1, validation.getNumTransactions()); - assertEquals(txOffset, validation.getValidLength()); - } - - // Corrupt the log at every offset to make sure that validation itself - // never throws an exception, and that the calculated lengths are monotonically - // increasing - long prevNumValid = 0; - for (long offset = 0; offset < validLength; offset++) { - Files.copy(logFileBak, logFile); - corruptByteInFile(logFile, offset); - EditLogValidation val = EditLogFileInputStream.validateEditLog(logFile); - assertTrue(String.format("%d should have been >= %d", - val.getNumTransactions(), prevNumValid), - val.getNumTransactions() >= prevNumValid); - prevNumValid = val.getNumTransactions(); - } - } - /** * Corrupt the byte at the given offset in the given file, * by subtracting 1 from it. @@ -318,7 +224,7 @@ private static long getNonTrailerLength(File f) throws IOException { fis.close(); } } - + @Test public void testStreamLimiter() throws IOException { final File LIMITER_TEST_FILE = new File(TEST_DIR, "limiter.test"); @@ -361,4 +267,75 @@ public void testStreamLimiter() throws IOException { tracker.close(); } } + + /** + * Create an unfinalized edit log for testing purposes + * + * @param testDir Directory to create the edit log in + * @param numTx Number of transactions to add to the new edit log + * @param offsetToTxId A map from transaction IDs to offsets in the + * edit log file. + * @return The new edit log file name. + * @throws IOException + */ + static private File prepareUnfinalizedTestEditLog(File testDir, int numTx, + SortedMap offsetToTxId) throws IOException { + File inProgressFile = new File(testDir, NNStorage.getInProgressEditsFileName(1)); + FSEditLog fsel = null, spyLog = null; + try { + fsel = FSImageTestUtil.createStandaloneEditLog(testDir); + spyLog = spy(fsel); + // Normally, the in-progress edit log would be finalized by + // FSEditLog#endCurrentLogSegment. For testing purposes, we + // disable that here. + doNothing().when(spyLog).endCurrentLogSegment(true); + spyLog.openForWrite(); + assertTrue("should exist: " + inProgressFile, inProgressFile.exists()); + + for (int i = 0; i < numTx; i++) { + long trueOffset = getNonTrailerLength(inProgressFile); + long thisTxId = spyLog.getLastWrittenTxId() + 1; + offsetToTxId.put(trueOffset, thisTxId); + System.err.println("txid " + thisTxId + " at offset " + trueOffset); + spyLog.logDelete("path" + i, i); + spyLog.logSync(); + } + } finally { + if (spyLog != null) { + spyLog.close(); + } else if (fsel != null) { + fsel.close(); + } + } + return inProgressFile; + } + + @Test + public void testValidateEditLogWithCorruptHeader() throws IOException { + File testDir = new File(TEST_DIR, "testValidateEditLogWithCorruptHeader"); + SortedMap offsetToTxId = Maps.newTreeMap(); + File logFile = prepareUnfinalizedTestEditLog(testDir, 2, offsetToTxId); + RandomAccessFile rwf = new RandomAccessFile(logFile, "rw"); + try { + rwf.seek(0); + rwf.writeLong(42); // corrupt header + } finally { + rwf.close(); + } + EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile); + assertTrue(validation.hasCorruptHeader()); + } + + @Test + public void testValidateEmptyEditLog() throws IOException { + File testDir = new File(TEST_DIR, "testValidateEmptyEditLog"); + SortedMap offsetToTxId = Maps.newTreeMap(); + File logFile = prepareUnfinalizedTestEditLog(testDir, 0, offsetToTxId); + // Truncate the file so that there is nothing except the header + truncateFile(logFile, 4); + EditLogValidation validation = + EditLogFileInputStream.validateEditLog(logFile); + assertTrue(!validation.hasCorruptHeader()); + assertEquals(HdfsConstants.INVALID_TXID, validation.getEndTxId()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java index 0ac194439d3..e972f599b96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java @@ -20,6 +20,7 @@ import static org.junit.Assert.*; import java.net.URI; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Iterator; @@ -29,10 +30,14 @@ import java.io.FilenameFilter; import java.io.IOException; import org.junit.Test; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits; import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.AbortSpec; @@ -40,10 +45,52 @@ import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_FAIL; import com.google.common.collect.ImmutableList; +import com.google.common.collect.TreeMultiset; import com.google.common.base.Joiner; public class TestFileJournalManager { + static final Log LOG = LogFactory.getLog(TestFileJournalManager.class); + /** + * Find out how many transactions we can read from a + * FileJournalManager, starting at a given transaction ID. + * + * @param jm The journal manager + * @param fromTxId Transaction ID to start at + * @param inProgressOk Should we consider edit logs that are not finalized? + * @return The number of transactions + * @throws IOException + */ + static long getNumberOfTransactions(FileJournalManager jm, long fromTxId, + boolean inProgressOk, boolean abortOnGap) throws IOException { + long numTransactions = 0, txId = fromTxId; + final TreeMultiset allStreams = + TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); + jm.selectInputStreams(allStreams, fromTxId, inProgressOk); + + try { + for (EditLogInputStream elis : allStreams) { + elis.skipUntil(txId); + while (true) { + FSEditLogOp op = elis.readOp(); + if (op == null) { + break; + } + if (abortOnGap && (op.getTransactionId() != txId)) { + LOG.info("getNumberOfTransactions: detected gap at txId " + + fromTxId); + return numTransactions; + } + txId = op.getTransactionId() + 1; + numTransactions++; + } + } + } finally { + IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0])); + } + return numTransactions; + } + /** * Test the normal operation of loading transactions from * file journal manager. 3 edits directories are setup without any @@ -61,7 +108,7 @@ public void testNormalOperation() throws IOException { long numJournals = 0; for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) { FileJournalManager jm = new FileJournalManager(sd, storage); - assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true)); + assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false)); numJournals++; } assertEquals(3, numJournals); @@ -82,7 +129,7 @@ public void testInprogressRecovery() throws IOException { FileJournalManager jm = new FileJournalManager(sd, storage); assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, - jm.getNumberOfTransactions(1, true)); + getNumberOfTransactions(jm, 1, true, false)); } /** @@ -104,16 +151,16 @@ public void testInprogressRecoveryMixed() throws IOException { Iterator dirs = storage.dirIterator(NameNodeDirType.EDITS); StorageDirectory sd = dirs.next(); FileJournalManager jm = new FileJournalManager(sd, storage); - assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true)); + assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false)); sd = dirs.next(); jm = new FileJournalManager(sd, storage); - assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1, - true)); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1, + true, false)); sd = dirs.next(); jm = new FileJournalManager(sd, storage); - assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true)); + assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false)); } /** @@ -137,18 +184,18 @@ public void testInprogressRecoveryAll() throws IOException { Iterator dirs = storage.dirIterator(NameNodeDirType.EDITS); StorageDirectory sd = dirs.next(); FileJournalManager jm = new FileJournalManager(sd, storage); - assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1, - true)); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1, + true, false)); sd = dirs.next(); jm = new FileJournalManager(sd, storage); - assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1, - true)); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1, + true, false)); sd = dirs.next(); jm = new FileJournalManager(sd, storage); - assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1, - true)); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1, + true, false)); } /** @@ -198,24 +245,15 @@ public void testReadFromStream() throws IOException { FileJournalManager jm = new FileJournalManager(sd, storage); long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL; - assertEquals(expectedTotalTxnCount, jm.getNumberOfTransactions(1, true)); + assertEquals(expectedTotalTxnCount, getNumberOfTransactions(jm, 1, + true, false)); long skippedTxns = (3*TXNS_PER_ROLL); // skip first 3 files long startingTxId = skippedTxns + 1; - long numTransactionsToLoad = jm.getNumberOfTransactions(startingTxId, true); - long numLoaded = 0; - while (numLoaded < numTransactionsToLoad) { - EditLogInputStream editIn = jm.getInputStream(startingTxId, true); - FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(editIn); - long count = val.getNumTransactions(); - - editIn.close(); - startingTxId += count; - numLoaded += count; - } - - assertEquals(expectedTotalTxnCount - skippedTxns, numLoaded); + long numLoadable = getNumberOfTransactions(jm, startingTxId, + true, false); + assertEquals(expectedTotalTxnCount - skippedTxns, numLoadable); } /** @@ -236,8 +274,8 @@ public void testAskForTransactionsMidfile() throws IOException { // 10 rolls, so 11 rolled files, 110 txids total. final int TOTAL_TXIDS = 10 * 11; for (int txid = 1; txid <= TOTAL_TXIDS; txid++) { - assertEquals((TOTAL_TXIDS - txid) + 1, jm.getNumberOfTransactions(txid, - true)); + assertEquals((TOTAL_TXIDS - txid) + 1, getNumberOfTransactions(jm, txid, + true, false)); } } @@ -269,19 +307,13 @@ public boolean accept(File dir, String name) { assertTrue(files[0].delete()); FileJournalManager jm = new FileJournalManager(sd, storage); - assertEquals(startGapTxId-1, jm.getNumberOfTransactions(1, true)); + assertEquals(startGapTxId-1, getNumberOfTransactions(jm, 1, true, true)); - try { - jm.getNumberOfTransactions(startGapTxId, true); - fail("Should have thrown an exception by now"); - } catch (IOException ioe) { - GenericTestUtils.assertExceptionContains( - "Gap in transactions, max txnid is 110, 0 txns from 31", ioe); - } + assertEquals(0, getNumberOfTransactions(jm, startGapTxId, true, true)); // rolled 10 times so there should be 11 files. assertEquals(11*TXNS_PER_ROLL - endGapTxId, - jm.getNumberOfTransactions(endGapTxId + 1, true)); + getNumberOfTransactions(jm, endGapTxId + 1, true, true)); } /** @@ -308,7 +340,7 @@ public boolean accept(File dir, String name) { FileJournalManager jm = new FileJournalManager(sd, storage); assertEquals(10*TXNS_PER_ROLL+1, - jm.getNumberOfTransactions(1, true)); + getNumberOfTransactions(jm, 1, true, false)); } @Test @@ -345,6 +377,33 @@ public void testMatchEditLogInvalidDirThrowsIOException() throws IOException { FileJournalManager.matchEditLogs(badDir); } + private static EditLogInputStream getJournalInputStream(JournalManager jm, + long txId, boolean inProgressOk) throws IOException { + final TreeMultiset allStreams = + TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR); + jm.selectInputStreams(allStreams, txId, inProgressOk); + try { + for (Iterator iter = allStreams.iterator(); + iter.hasNext();) { + EditLogInputStream elis = iter.next(); + if (elis.getFirstTxId() > txId) { + break; + } + if (elis.getLastTxId() < txId) { + iter.remove(); + elis.close(); + continue; + } + elis.skipUntil(txId); + iter.remove(); + return elis; + } + } finally { + IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0])); + } + return null; + } + /** * Make sure that we starting reading the correct op when we request a stream * with a txid in the middle of an edit log file. @@ -359,7 +418,7 @@ public void testReadFromMiddleOfEditLog() throws CorruptionException, FileJournalManager jm = new FileJournalManager(sd, storage); - EditLogInputStream elis = jm.getInputStream(5, true); + EditLogInputStream elis = getJournalInputStream(jm, 5, true); FSEditLogOp op = elis.readOp(); assertEquals("read unexpected op", op.getTransactionId(), 5); } @@ -381,9 +440,9 @@ public void testExcludeInProgressStreams() throws CorruptionException, FileJournalManager jm = new FileJournalManager(sd, storage); // If we exclude the in-progess stream, we should only have 100 tx. - assertEquals(100, jm.getNumberOfTransactions(1, false)); + assertEquals(100, getNumberOfTransactions(jm, 1, false, false)); - EditLogInputStream elis = jm.getInputStream(90, false); + EditLogInputStream elis = getJournalInputStream(jm, 90, false); FSEditLogOp lastReadOp = null; while ((lastReadOp = elis.readOp()) != null) { assertTrue(lastReadOp.getTransactionId() <= 100); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java index 51e49a92375..f21f65ee013 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestGenericJournalConf.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import static org.mockito.Mockito.mock; @@ -26,9 +24,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.io.Writable; import java.net.URI; +import java.util.Collection; import java.io.IOException; public class TestGenericJournalConf { @@ -144,15 +142,8 @@ public void finalizeLogSegment(long firstTxId, long lastTxId) } @Override - public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) - throws IOException { - return null; - } - - @Override - public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) - throws IOException { - return 0; + public void selectInputStreams(Collection streams, + long fromTxnId, boolean inProgressOk) { } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java index 5a86fbf2c59..608ee26e3fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java @@ -333,7 +333,7 @@ public String getName() { static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize) throws IOException { final String TEST_PATH = "/test/path/dir"; - final int NUM_TEST_MKDIRS = 10; + final String TEST_PATH2 = "/second/dir"; final boolean needRecovery = corruptor.needRecovery(finalize); // start a cluster @@ -357,9 +357,8 @@ static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize) fileSys = cluster.getFileSystem(); final FSNamesystem namesystem = cluster.getNamesystem(); FSImage fsimage = namesystem.getFSImage(); - for (int i = 0; i < NUM_TEST_MKDIRS; i++) { - fileSys.mkdirs(new Path(TEST_PATH)); - } + fileSys.mkdirs(new Path(TEST_PATH)); + fileSys.mkdirs(new Path(TEST_PATH2)); sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next(); } finally { if (cluster != null) { @@ -371,6 +370,7 @@ static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize) assertTrue("Should exist: " + editFile, editFile.exists()); // Corrupt the edit log + LOG.info("corrupting edit log file '" + editFile + "'"); corruptor.corrupt(editFile); // If needRecovery == true, make sure that we can't start the @@ -423,6 +423,7 @@ static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize) .format(false).build(); LOG.debug("successfully recovered the " + corruptor.getName() + " corrupted edit log"); + cluster.waitActive(); assertTrue(cluster.getFileSystem().exists(new Path(TEST_PATH))); } catch (IOException e) { fail("failed to recover. Error message: " + e.getMessage()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java index 97effd449a7..adcdc6bebd7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java @@ -23,6 +23,7 @@ import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.FSEditLog; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; +import org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.junit.After; @@ -278,7 +280,7 @@ private LimitedEditLogAnswer causeFailureOnEditLogRead() throws IOException { .getEditLog()); LimitedEditLogAnswer answer = new LimitedEditLogAnswer(); doAnswer(answer).when(spyEditLog).selectInputStreams( - anyLong(), anyLong(), anyBoolean()); + anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean()); nn1.getNamesystem().getEditLogTailer().setEditLog(spyEditLog); return answer;