diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0095f4511de..1fa8ac4eaf7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -82,6 +82,9 @@ Release 0.23.1 - UNRELEASED for a client on the same node as the block file. (Andrew Purtell, Suresh Srinivas and Jitendra Nath Pandey via szetszwo) + HDFS-2018. Move all journal stream management code into one place. + (Ivan Kelly via jitendra) + BUG FIXES HDFS-2541. For a sufficiently large value of blocks, the DN Scanner diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java index fae4f8d80f7..547c9765ba4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java @@ -18,18 +18,21 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; +import java.util.Collection; import java.util.Iterator; +import java.util.List; +import java.util.zip.Checksum; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageState; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogLoadPlan; -import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; /** * Extension of FSImage for the backup node. @@ -257,11 +260,18 @@ public class BackupImage extends FSImage { new FSImageTransactionalStorageInspector(); storage.inspectStorageDirs(inspector); - LogLoadPlan logLoadPlan = inspector.createLogLoadPlan(lastAppliedTxId, - target - 1); - - logLoadPlan.doRecovery(); - loadEdits(logLoadPlan.getEditsFiles()); + + editLog.recoverUnclosedStreams(); + Iterable editStreamsAll + = editLog.selectInputStreams(lastAppliedTxId, target - 1); + // remove inprogress + List editStreams = Lists.newArrayList(); + for (EditLogInputStream s : editStreamsAll) { + if (s.getFirstTxId() != editLog.getCurSegmentTxId()) { + editStreams.add(s); + } + } + loadEdits(editStreams); } // now, need to load the in-progress file @@ -271,7 +281,24 @@ public class BackupImage extends FSImage { return false; // drop lock and try again to load local logs } - EditLogInputStream stream = getEditLog().getInProgressFileInputStream(); + EditLogInputStream stream = null; + Collection editStreams + = getEditLog().selectInputStreams( + getEditLog().getCurSegmentTxId(), + getEditLog().getCurSegmentTxId()); + + for (EditLogInputStream s : editStreams) { + if (s.getFirstTxId() == getEditLog().getCurSegmentTxId()) { + stream = s; + } + break; + } + if (stream == null) { + LOG.warn("Unable to find stream starting with " + editLog.getCurSegmentTxId() + + ". This indicates that there is an error in synchronization in BackupImage"); + return false; + } + try { long remainingTxns = getEditLog().getLastWrittenTxId() - lastAppliedTxId; @@ -285,7 +312,7 @@ public class BackupImage extends FSImage { "expected to load " + remainingTxns + " but loaded " + numLoaded + " from " + stream; } finally { - IOUtils.closeStream(stream); + FSEditLog.closeAllStreams(editStreams); } LOG.info("Successfully synced BackupNode with NameNode at txnid " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java index 7dd2e2338fa..8eb8984bced 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java @@ -57,12 +57,31 @@ class BackupJournalManager implements JournalManager { throws IOException { } + @Override + public long getNumberOfTransactions(long fromTxnId) + throws IOException, CorruptionException { + // This JournalManager is never used for input. Therefore it cannot + // return any transactions + return 0; + } + + @Override + public EditLogInputStream getInputStream(long fromTxnId) throws IOException { + // This JournalManager is never used for input. Therefore it cannot + // return any transactions + throw new IOException("Unsupported operation"); + } + + @Override + public void recoverUnfinalizedSegments() throws IOException { + } + public boolean matchesRegistration(NamenodeRegistration bnReg) { return bnReg.getAddress().equals(this.bnReg.getAddress()); } @Override - public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) { - return null; + public String toString() { + return "BackupJournalManager"; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java index 2c861ac217e..76b295769bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java @@ -275,16 +275,17 @@ class Checkpointer extends Daemon { FSImage dstImage) throws IOException { NNStorage dstStorage = dstImage.getStorage(); - List editsFiles = Lists.newArrayList(); + List editsStreams = Lists.newArrayList(); for (RemoteEditLog log : manifest.getLogs()) { File f = dstStorage.findFinalizedEditsFile( log.getStartTxId(), log.getEndTxId()); if (log.getStartTxId() > dstImage.getLastAppliedTxId()) { - editsFiles.add(f); - } + editsStreams.add(new EditLogFileInputStream(f, log.getStartTxId(), + log.getEndTxId())); + } } LOG.info("Checkpointer about to load edits from " + - editsFiles.size() + " file(s)."); - dstImage.loadEdits(editsFiles); + editsStreams.size() + " stream(s)."); + dstImage.loadEdits(editsStreams); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java index 8921bc0c554..974697d9271 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java @@ -21,6 +21,7 @@ import java.io.DataInputStream; import java.io.ByteArrayInputStream; import java.io.IOException; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import com.google.common.base.Preconditions; /** @@ -122,4 +123,14 @@ class EditLogBackupInputStream extends EditLogInputStream { reader = null; this.version = 0; } + + @Override + public long getFirstTxId() throws IOException { + return HdfsConstants.INVALID_TXID; + } + + @Override + public long getLastTxId() throws IOException { + return HdfsConstants.INVALID_TXID; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java index 2e0404ed397..8685ca59345 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java @@ -27,6 +27,7 @@ import java.io.DataInputStream; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import com.google.common.annotations.VisibleForTesting; @@ -37,12 +38,15 @@ import com.google.common.annotations.VisibleForTesting; class EditLogFileInputStream extends EditLogInputStream { private final File file; private final FileInputStream fStream; + final private long firstTxId; + final private long lastTxId; private final int logVersion; private final FSEditLogOp.Reader reader; private final FSEditLogLoader.PositionTrackingInputStream tracker; /** * Open an EditLogInputStream for the given file. + * The file is pretransactional, so has no txids * @param name filename to open * @throws LogHeaderCorruptException if the header is either missing or * appears to be corrupt/truncated @@ -51,6 +55,21 @@ class EditLogFileInputStream extends EditLogInputStream { */ EditLogFileInputStream(File name) throws LogHeaderCorruptException, IOException { + this(name, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID); + } + + /** + * Open an EditLogInputStream for the given file. + * @param name filename to open + * @param firstTxId first transaction found in file + * @param lastTxId last transaction id found in file + * @throws LogHeaderCorruptException if the header is either missing or + * appears to be corrupt/truncated + * @throws IOException if an actual IO error occurs while reading the + * header + */ + EditLogFileInputStream(File name, long firstTxId, long lastTxId) + throws LogHeaderCorruptException, IOException { file = name; fStream = new FileInputStream(name); @@ -65,6 +84,18 @@ class EditLogFileInputStream extends EditLogInputStream { } reader = new FSEditLogOp.Reader(in, logVersion); + this.firstTxId = firstTxId; + this.lastTxId = lastTxId; + } + + @Override + public long getFirstTxId() throws IOException { + return firstTxId; + } + + @Override + public long getLastTxId() throws IOException { + return lastTxId; } @Override // JournalStream @@ -116,7 +147,8 @@ class EditLogFileInputStream extends EditLogInputStream { // If it's missing its header, this is equivalent to no transactions FSImage.LOG.warn("Log at " + file + " has no valid header", corrupt); - return new FSEditLogLoader.EditLogValidation(0, 0); + return new FSEditLogLoader.EditLogValidation(0, HdfsConstants.INVALID_TXID, + HdfsConstants.INVALID_TXID); } try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java index 52a3dd4c203..c6f850542f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java @@ -28,6 +28,17 @@ import java.io.IOException; * into the #{@link EditLogOutputStream}. */ abstract class EditLogInputStream implements JournalStream, Closeable { + /** + * @return the first transaction which will be found in this stream + */ + public abstract long getFirstTxId() throws IOException; + + /** + * @return the last transaction which will be found in this stream + */ + public abstract long getLastTxId() throws IOException; + + /** * Close the stream. * @throws IOException if an error occurred while closing diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 2108dedb522..7f9c218ee59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -35,11 +36,13 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import static org.apache.hadoop.hdfs.server.common.Util.now; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; +import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.io.IOUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -1068,6 +1071,112 @@ public class FSEditLog { } } + /** + * Find the best editlog input stream to read from txid. In this case + * best means the editlog which has the largest continuous range of + * transactions starting from the transaction id, fromTxId. + * + * If a journal throws an CorruptionException while reading from a txn id, + * it means that it has more transactions, but can't find any from fromTxId. + * If this is the case and no other journal has transactions, we should throw + * an exception as it means more transactions exist, we just can't load them. + * + * @param fromTxId Transaction id to start from. + * @return a edit log input stream with tranactions fromTxId + * or null if no more exist + */ + private EditLogInputStream selectStream(long fromTxId) + throws IOException { + JournalManager bestjm = null; + long bestjmNumTxns = 0; + CorruptionException corruption = null; + + for (JournalAndStream jas : journals) { + JournalManager candidate = jas.getManager(); + long candidateNumTxns = 0; + try { + candidateNumTxns = candidate.getNumberOfTransactions(fromTxId); + } catch (CorruptionException ce) { + corruption = ce; + } catch (IOException ioe) { + LOG.warn("Error reading number of transactions from " + candidate); + continue; // error reading disk, just skip + } + + if (candidateNumTxns > bestjmNumTxns) { + bestjm = candidate; + bestjmNumTxns = candidateNumTxns; + } + } + + + if (bestjm == null) { + /** + * If all candidates either threw a CorruptionException or + * found 0 transactions, then a gap exists. + */ + if (corruption != null) { + throw new IOException("Gap exists in logs from " + + fromTxId, corruption); + } else { + return null; + } + } + + return bestjm.getInputStream(fromTxId); + } + + /** + * Run recovery on all journals to recover any unclosed segments + */ + void recoverUnclosedStreams() { + mapJournalsAndReportErrors(new JournalClosure() { + @Override + public void apply(JournalAndStream jas) throws IOException { + jas.manager.recoverUnfinalizedSegments(); + } + }, "recovering unclosed streams"); + } + + /** + * Select a list of input streams to load. + * @param fromTxId first transaction in the selected streams + * @param toAtLeast the selected streams must contain this transaction + */ + Collection selectInputStreams(long fromTxId, long toAtLeastTxId) + throws IOException { + List streams = Lists.newArrayList(); + + boolean gapFound = false; + EditLogInputStream stream = selectStream(fromTxId); + while (stream != null) { + fromTxId = stream.getLastTxId() + 1; + streams.add(stream); + try { + stream = selectStream(fromTxId); + } catch (IOException ioe) { + gapFound = true; + break; + } + } + if (fromTxId <= toAtLeastTxId || gapFound) { + closeAllStreams(streams); + throw new IOException("No non-corrupt logs for txid " + + fromTxId); + } + return streams; + } + + /** + * Close all the streams in a collection + * @param streams The list of streams to close + */ + static void closeAllStreams(Iterable streams) { + for (EditLogInputStream s : streams) { + IOUtils.closeStream(s); + } + } + /** * Container for a JournalManager paired with its currently * active stream. @@ -1137,30 +1246,5 @@ public class FSEditLog { JournalManager getManager() { return manager; } - - private EditLogInputStream getInProgressInputStream() throws IOException { - return manager.getInProgressInputStream(segmentStartsAtTxId); - } - } - - /** - * @return an EditLogInputStream that reads from the same log that - * the edit log is currently writing. This is used from the BackupNode - * during edits synchronization. - * @throws IOException if no valid logs are available. - */ - synchronized EditLogInputStream getInProgressFileInputStream() - throws IOException { - for (JournalAndStream jas : journals) { - if (!jas.isActive()) continue; - try { - EditLogInputStream in = jas.getInProgressInputStream(); - if (in != null) return in; - } catch (IOException ioe) { - LOG.warn("Unable to get the in-progress input stream from " + jas, - ioe); - } - } - throw new IOException("No in-progress stream provided edits"); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 51865c82de1..991fd08c842 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -446,24 +446,6 @@ public class FSEditLogLoader { } } - static EditLogValidation validateEditLog(File file) throws IOException { - EditLogFileInputStream in; - try { - in = new EditLogFileInputStream(file); - } catch (LogHeaderCorruptException corrupt) { - // If it's missing its header, this is equivalent to no transactions - FSImage.LOG.warn("Log at " + file + " has no valid header", - corrupt); - return new EditLogValidation(0, 0); - } - - try { - return validateEditLog(in); - } finally { - IOUtils.closeStream(in); - } - } - /** * Return the number of valid transactions in the stream. If the stream is * truncated during the header, returns a value indicating that there are @@ -473,12 +455,26 @@ public class FSEditLogLoader { * if the log does not exist) */ static EditLogValidation validateEditLog(EditLogInputStream in) { - long numValid = 0; long lastPos = 0; + long firstTxId = HdfsConstants.INVALID_TXID; + long lastTxId = HdfsConstants.INVALID_TXID; + long numValid = 0; try { + FSEditLogOp op = null; while (true) { lastPos = in.getPosition(); - if (in.readOp() == null) { + if ((op = in.readOp()) == null) { + break; + } + if (firstTxId == HdfsConstants.INVALID_TXID) { + firstTxId = op.txid; + } + if (lastTxId == HdfsConstants.INVALID_TXID + || op.txid == lastTxId + 1) { + lastTxId = op.txid; + } else { + FSImage.LOG.error("Out of order txid found. Found " + op.txid + + ", expected " + (lastTxId + 1)); break; } numValid++; @@ -489,16 +485,33 @@ public class FSEditLogLoader { FSImage.LOG.debug("Caught exception after reading " + numValid + " ops from " + in + " while determining its valid length.", t); } - return new EditLogValidation(lastPos, numValid); + return new EditLogValidation(lastPos, firstTxId, lastTxId); } static class EditLogValidation { - long validLength; - long numTransactions; - - EditLogValidation(long validLength, long numTransactions) { + private long validLength; + private long startTxId; + private long endTxId; + + EditLogValidation(long validLength, + long startTxId, long endTxId) { this.validLength = validLength; - this.numTransactions = numTransactions; + this.startTxId = startTxId; + this.endTxId = endTxId; + } + + long getValidLength() { return validLength; } + + long getStartTxId() { return startTxId; } + + long getEndTxId() { return endTxId; } + + long getNumTransactions() { + if (endTxId == HdfsConstants.INVALID_TXID + || startTxId == HdfsConstants.INVALID_TXID) { + return 0; + } + return (endTxId - startTxId) + 1; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index 50f87e4fc5f..4032a149824 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -46,7 +46,7 @@ import org.apache.hadoop.hdfs.server.common.Util; import static org.apache.hadoop.hdfs.server.common.Util.now; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; -import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan; + import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; @@ -584,32 +584,38 @@ public class FSImage implements Closeable { FSImageStorageInspector inspector = storage.readAndInspectDirs(); isUpgradeFinalized = inspector.isUpgradeFinalized(); - + + FSImageStorageInspector.FSImageFile imageFile + = inspector.getLatestImage(); boolean needToSave = inspector.needToSave(); + + Iterable editStreams = null; + + editLog.recoverUnclosedStreams(); + + if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, + getLayoutVersion())) { + editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1, + inspector.getMaxSeenTxId()); + } else { + editStreams = FSImagePreTransactionalStorageInspector + .getEditLogStreams(storage); + } + + LOG.debug("Planning to load image :\n" + imageFile); + for (EditLogInputStream l : editStreams) { + LOG.debug("\t Planning to load edit stream: " + l); + } - // Plan our load. This will throw if it's impossible to load from the - // data that's available. - LoadPlan loadPlan = inspector.createLoadPlan(); - LOG.debug("Planning to load image using following plan:\n" + loadPlan); - - - // Recover from previous interrupted checkpoint, if any - needToSave |= loadPlan.doRecovery(); - - // - // Load in bits - // - StorageDirectory sdForProperties = - loadPlan.getStorageDirectoryForProperties(); - storage.readProperties(sdForProperties); - File imageFile = loadPlan.getImageFile(); - try { + StorageDirectory sdForProperties = imageFile.sd; + storage.readProperties(sdForProperties); + if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, getLayoutVersion())) { // For txid-based layout, we should have a .md5 file // next to the image file - loadFSImage(imageFile); + loadFSImage(imageFile.getFile()); } else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM, getLayoutVersion())) { // In 0.22, we have the checksum stored in the VERSION file. @@ -621,17 +627,19 @@ public class FSImage implements Closeable { NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY + " not set for storage directory " + sdForProperties.getRoot()); } - loadFSImage(imageFile, new MD5Hash(md5)); + loadFSImage(imageFile.getFile(), new MD5Hash(md5)); } else { // We don't have any record of the md5sum - loadFSImage(imageFile, null); + loadFSImage(imageFile.getFile(), null); } } catch (IOException ioe) { - throw new IOException("Failed to load image from " + loadPlan.getImageFile(), ioe); + FSEditLog.closeAllStreams(editStreams); + throw new IOException("Failed to load image from " + imageFile, ioe); } - long numLoaded = loadEdits(loadPlan.getEditsFiles()); - needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile, numLoaded); + long numLoaded = loadEdits(editStreams); + needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(), + numLoaded); // update the txid for the edit log editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1); @@ -663,22 +671,25 @@ public class FSImage implements Closeable { * Load the specified list of edit files into the image. * @return the number of transactions loaded */ - protected long loadEdits(List editLogs) throws IOException { - LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editLogs)); + protected long loadEdits(Iterable editStreams) throws IOException { + LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams)); long startingTxId = getLastAppliedTxId() + 1; - - FSEditLogLoader loader = new FSEditLogLoader(namesystem); int numLoaded = 0; - // Load latest edits - for (File edits : editLogs) { - LOG.debug("Reading " + edits + " expecting start txid #" + startingTxId); - EditLogFileInputStream editIn = new EditLogFileInputStream(edits); - int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId); - startingTxId += thisNumLoaded; - numLoaded += thisNumLoaded; - lastAppliedTxId += thisNumLoaded; - editIn.close(); + + try { + FSEditLogLoader loader = new FSEditLogLoader(namesystem); + + // Load latest edits + for (EditLogInputStream editIn : editStreams) { + LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId); + int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId); + startingTxId += thisNumLoaded; + numLoaded += thisNumLoaded; + lastAppliedTxId += thisNumLoaded; + } + } finally { + FSEditLog.closeAllStreams(editStreams); } // update the counts diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java index cec2eeff2d8..91076ef5f65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImagePreTransactionalStorageInspector.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; @@ -55,6 +56,7 @@ class FSImagePreTransactionalStorageInspector extends FSImageStorageInspector { private boolean hasOutOfDateStorageDirs = false; /* Flag set false if there are any "previous" directories found */ private boolean isUpgradeFinalized = true; + private boolean needToSaveAfterRecovery = false; // Track the name and edits dir with the latest times private long latestNameCheckpointTime = Long.MIN_VALUE; @@ -139,15 +141,15 @@ class FSImagePreTransactionalStorageInspector extends FSImageStorageInspector { boolean isUpgradeFinalized() { return isUpgradeFinalized; } - + @Override - LoadPlan createLoadPlan() throws IOException { + FSImageFile getLatestImage() throws IOException { // We should have at least one image and one edits dirs if (latestNameSD == null) throw new IOException("Image file is not found in " + imageDirs); if (latestEditsSD == null) throw new IOException("Edits file is not found in " + editsDirs); - + // Make sure we are loading image and edits from same checkpoint if (latestNameCheckpointTime > latestEditsCheckpointTime && latestNameSD != latestEditsSD @@ -168,92 +170,70 @@ class FSImagePreTransactionalStorageInspector extends FSImageStorageInspector { "image checkpoint time = " + latestNameCheckpointTime + "edits checkpoint time = " + latestEditsCheckpointTime); } + + needToSaveAfterRecovery = doRecovery(); - return new PreTransactionalLoadPlan(); + return new FSImageFile(latestNameSD, + NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE), + HdfsConstants.INVALID_TXID); } - + @Override boolean needToSave() { return hasOutOfDateStorageDirs || checkpointTimes.size() != 1 || - latestNameCheckpointTime > latestEditsCheckpointTime; - + latestNameCheckpointTime > latestEditsCheckpointTime || + needToSaveAfterRecovery; } - private class PreTransactionalLoadPlan extends LoadPlan { - - @Override - boolean doRecovery() throws IOException { - LOG.debug( + boolean doRecovery() throws IOException { + LOG.debug( "Performing recovery in "+ latestNameSD + " and " + latestEditsSD); - boolean needToSave = false; - File curFile = - NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE); - File ckptFile = - NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW); - - // - // If we were in the midst of a checkpoint - // - if (ckptFile.exists()) { - needToSave = true; - if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW) - .exists()) { - // - // checkpointing migth have uploaded a new - // merged image, but we discard it here because we are - // not sure whether the entire merged image was uploaded - // before the namenode crashed. - // - if (!ckptFile.delete()) { - throw new IOException("Unable to delete " + ckptFile); - } - } else { - // - // checkpointing was in progress when the namenode - // shutdown. The fsimage.ckpt was created and the edits.new - // file was moved to edits. We complete that checkpoint by - // moving fsimage.new to fsimage. There is no need to - // update the fstime file here. renameTo fails on Windows - // if the destination file already exists. - // + boolean needToSave = false; + File curFile = + NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE); + File ckptFile = + NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW); + + // + // If we were in the midst of a checkpoint + // + if (ckptFile.exists()) { + needToSave = true; + if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW) + .exists()) { + // + // checkpointing migth have uploaded a new + // merged image, but we discard it here because we are + // not sure whether the entire merged image was uploaded + // before the namenode crashed. + // + if (!ckptFile.delete()) { + throw new IOException("Unable to delete " + ckptFile); + } + } else { + // + // checkpointing was in progress when the namenode + // shutdown. The fsimage.ckpt was created and the edits.new + // file was moved to edits. We complete that checkpoint by + // moving fsimage.new to fsimage. There is no need to + // update the fstime file here. renameTo fails on Windows + // if the destination file already exists. + // + if (!ckptFile.renameTo(curFile)) { + if (!curFile.delete()) + LOG.warn("Unable to delete dir " + curFile + " before rename"); if (!ckptFile.renameTo(curFile)) { - if (!curFile.delete()) - LOG.warn("Unable to delete dir " + curFile + " before rename"); - if (!ckptFile.renameTo(curFile)) { - throw new IOException("Unable to rename " + ckptFile + - " to " + curFile); - } + throw new IOException("Unable to rename " + ckptFile + + " to " + curFile); } } } - return needToSave; } - - @Override - File getImageFile() { - return NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE); - } - - @Override - List getEditsFiles() { - if (latestNameCheckpointTime > latestEditsCheckpointTime) { - // the image is already current, discard edits - LOG.debug( - "Name checkpoint time is newer than edits, not loading edits."); - return Collections.emptyList(); - } - - return getEditsInStorageDir(latestEditsSD); - } - - @Override - StorageDirectory getStorageDirectoryForProperties() { - return latestNameSD; - } + return needToSave; } - + /** * @return a list with the paths to EDITS and EDITS_NEW (if it exists) * in a given storage directory. @@ -269,4 +249,33 @@ class FSImagePreTransactionalStorageInspector extends FSImageStorageInspector { } return files; } + + private List getLatestEditsFiles() { + if (latestNameCheckpointTime > latestEditsCheckpointTime) { + // the image is already current, discard edits + LOG.debug( + "Name checkpoint time is newer than edits, not loading edits."); + return Collections.emptyList(); + } + + return getEditsInStorageDir(latestEditsSD); + } + + @Override + long getMaxSeenTxId() { + return 0L; + } + + static Iterable getEditLogStreams(NNStorage storage) + throws IOException { + FSImagePreTransactionalStorageInspector inspector + = new FSImagePreTransactionalStorageInspector(); + storage.inspectStorageDirs(inspector); + + List editStreams = new ArrayList(); + for (File f : inspector.getLatestEditsFiles()) { + editStreams.add(new EditLogFileInputStream(f)); + } + return editStreams; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java index 65bfa0ac556..a7c2949f293 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageStorageInspector.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.util.List; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; @@ -43,60 +44,22 @@ abstract class FSImageStorageInspector { abstract boolean isUpgradeFinalized(); /** - * Create a plan to load the image from the set of inspected storage directories. + * Get the image files which should be loaded into the filesystem. * @throws IOException if not enough files are available (eg no image found in any directory) */ - abstract LoadPlan createLoadPlan() throws IOException; - + abstract FSImageFile getLatestImage() throws IOException; + + /** + * Get the minimum tx id which should be loaded with this set of images. + */ + abstract long getMaxSeenTxId(); + /** * @return true if the directories are in such a state that the image should be re-saved * following the load */ abstract boolean needToSave(); - /** - * A plan to load the namespace from disk, providing the locations from which to load - * the image and a set of edits files. - */ - abstract static class LoadPlan { - /** - * Execute atomic move sequence in the chosen storage directories, - * in order to recover from an interrupted checkpoint. - * @return true if some recovery action was taken - */ - abstract boolean doRecovery() throws IOException; - - /** - * @return the file from which to load the image data - */ - abstract File getImageFile(); - - /** - * @return a list of flies containing edits to replay - */ - abstract List getEditsFiles(); - - /** - * @return the storage directory containing the VERSION file that should be - * loaded. - */ - abstract StorageDirectory getStorageDirectoryForProperties(); - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("Will load image file: ").append(getImageFile()).append("\n"); - sb.append("Will load edits files:").append("\n"); - for (File f : getEditsFiles()) { - sb.append(" ").append(f).append("\n"); - } - sb.append("Will load metadata from: ") - .append(getStorageDirectoryForProperties()) - .append("\n"); - return sb.toString(); - } - } - /** * Record of an image that has been located and had its filename parsed. */ @@ -106,7 +69,8 @@ abstract class FSImageStorageInspector { private final File file; FSImageFile(StorageDirectory sd, File file, long txId) { - assert txId >= 0 : "Invalid txid on " + file +": " + txId; + assert txId >= 0 || txId == HdfsConstants.INVALID_TXID + : "Invalid txid on " + file +": " + txId; this.sd = sd; this.txId = txId; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java index 303b8e60c31..1a523737e5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile; -import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -52,9 +51,7 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector { private boolean isUpgradeFinalized = true; List foundImages = new ArrayList(); - List foundEditLogs = new ArrayList(); - SortedMap logGroups = new TreeMap(); - long maxSeenTxId = 0; + private long maxSeenTxId = 0; private static final Pattern IMAGE_REGEX = Pattern.compile( NameNodeFile.IMAGE.getName() + "_(\\d+)"); @@ -68,6 +65,8 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector { return; } + maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd)); + File currentDir = sd.getCurrentDir(); File filesInStorage[]; try { @@ -110,34 +109,10 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector { LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe); } - List editLogs - = FileJournalManager.matchEditLogs(filesInStorage); - if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) { - for (EditLogFile log : editLogs) { - addEditLog(log); - } - } else if (!editLogs.isEmpty()){ - LOG.warn("Found the following edit log file(s) in " + sd + - " even though it was not configured to store edits:\n" + - " " + Joiner.on("\n ").join(editLogs)); - - } - // set finalized flag isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists(); } - private void addEditLog(EditLogFile foundEditLog) { - foundEditLogs.add(foundEditLog); - LogGroup group = logGroups.get(foundEditLog.getFirstTxId()); - if (group == null) { - group = new LogGroup(foundEditLog.getFirstTxId()); - logGroups.put(foundEditLog.getFirstTxId(), group); - } - group.add(foundEditLog); - } - - @Override public boolean isUpgradeFinalized() { return isUpgradeFinalized; @@ -148,9 +123,13 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector { * If there are multiple storage directories which contain equal images * the storage directory that was inspected first will be preferred. * - * Returns null if no images were found. + * @throws FileNotFoundException if not images are found. */ - FSImageFile getLatestImage() { + FSImageFile getLatestImage() throws IOException { + if (foundImages.isEmpty()) { + throw new FileNotFoundException("No valid image files found"); + } + FSImageFile ret = null; for (FSImageFile img : foundImages) { if (ret == null || img.txId > ret.txId) { @@ -164,349 +143,13 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector { return ImmutableList.copyOf(foundImages); } - public List getEditLogFiles() { - return ImmutableList.copyOf(foundEditLogs); - } - - @Override - public LoadPlan createLoadPlan() throws IOException { - if (foundImages.isEmpty()) { - throw new FileNotFoundException("No valid image files found"); - } - - FSImageFile recoveryImage = getLatestImage(); - LogLoadPlan logPlan = createLogLoadPlan(recoveryImage.txId, Long.MAX_VALUE); - - return new TransactionalLoadPlan(recoveryImage, - logPlan); - } - - /** - * Plan which logs to load in order to bring the namespace up-to-date. - * Transactions will be considered in the range (sinceTxId, maxTxId] - * - * @param sinceTxId the highest txid that is already loaded - * (eg from the image checkpoint) - * @param maxStartTxId ignore any log files that start after this txid - */ - LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException { - long expectedTxId = sinceTxId + 1; - - List recoveryLogs = new ArrayList(); - - SortedMap tailGroups = logGroups.tailMap(expectedTxId); - if (logGroups.size() > tailGroups.size()) { - LOG.debug("Excluded " + (logGroups.size() - tailGroups.size()) + - " groups of logs because they start with a txid less than image " + - "txid " + sinceTxId); - } - - SortedMap usefulGroups; - if (maxStartTxId > sinceTxId) { - usefulGroups = tailGroups.headMap(maxStartTxId); - } else { - usefulGroups = new TreeMap(); - } - - if (usefulGroups.size() > tailGroups.size()) { - LOG.debug("Excluded " + (tailGroups.size() - usefulGroups.size()) + - " groups of logs because they start with a txid higher than max " + - "txid " + sinceTxId); - } - - - for (Map.Entry entry : usefulGroups.entrySet()) { - long logStartTxId = entry.getKey(); - LogGroup logGroup = entry.getValue(); - - logGroup.planRecovery(); - - if (expectedTxId != HdfsConstants.INVALID_TXID && logStartTxId != expectedTxId) { - throw new IOException("Expected next log group would start at txid " + - expectedTxId + " but starts at txid " + logStartTxId); - } - - // We can pick any of the non-corrupt logs here - recoveryLogs.add(logGroup.getBestNonCorruptLog()); - - // If this log group was finalized, we know to expect the next - // log group to start at the following txid (ie no gaps) - if (logGroup.hasKnownLastTxId()) { - expectedTxId = logGroup.getLastTxId() + 1; - } else { - // the log group was in-progress so we don't know what ID - // the next group should start from. - expectedTxId = HdfsConstants.INVALID_TXID; - } - } - - long lastLogGroupStartTxId = usefulGroups.isEmpty() ? - 0 : usefulGroups.lastKey(); - if (maxSeenTxId > sinceTxId && - maxSeenTxId > lastLogGroupStartTxId) { - String msg = "At least one storage directory indicated it has seen a " + - "log segment starting at txid " + maxSeenTxId; - if (usefulGroups.isEmpty()) { - msg += " but there are no logs to load."; - } else { - msg += " but the most recent log file found starts with txid " + - lastLogGroupStartTxId; - } - throw new IOException(msg); - } - - return new LogLoadPlan(recoveryLogs, - Lists.newArrayList(usefulGroups.values())); - - } - @Override public boolean needToSave() { return needToSave; } - - /** - * A group of logs that all start at the same txid. - * - * Handles determining which logs are corrupt and which should be considered - * candidates for loading. - */ - static class LogGroup { - long startTxId; - List logs = new ArrayList();; - private Set endTxIds = new TreeSet(); - private boolean hasInProgress = false; - private boolean hasFinalized = false; - - LogGroup(long startTxId) { - this.startTxId = startTxId; - } - - EditLogFile getBestNonCorruptLog() { - // First look for non-corrupt finalized logs - for (EditLogFile log : logs) { - if (!log.isCorrupt() && !log.isInProgress()) { - return log; - } - } - // Then look for non-corrupt in-progress logs - for (EditLogFile log : logs) { - if (!log.isCorrupt()) { - return log; - } - } - // We should never get here, because we don't get to the planning stage - // without calling planRecovery first, and if we've called planRecovery, - // we would have already thrown if there were no non-corrupt logs! - throw new IllegalStateException( - "No non-corrupt logs for txid " + startTxId); - } - - /** - * @return true if we can determine the last txid in this log group. - */ - boolean hasKnownLastTxId() { - for (EditLogFile log : logs) { - if (!log.isInProgress()) { - return true; - } - } - return false; - } - - /** - * @return the last txid included in the logs in this group - * @throws IllegalStateException if it is unknown - - * {@see #hasKnownLastTxId()} - */ - long getLastTxId() { - for (EditLogFile log : logs) { - if (!log.isInProgress()) { - return log.getLastTxId(); - } - } - throw new IllegalStateException("LogGroup only has in-progress logs"); - } - - - void add(EditLogFile log) { - assert log.getFirstTxId() == startTxId; - logs.add(log); - - if (log.isInProgress()) { - hasInProgress = true; - } else { - hasFinalized = true; - endTxIds.add(log.getLastTxId()); - } - } - - void planRecovery() throws IOException { - assert hasInProgress || hasFinalized; - - checkConsistentEndTxIds(); - - if (hasFinalized && hasInProgress) { - planMixedLogRecovery(); - } else if (!hasFinalized && hasInProgress) { - planAllInProgressRecovery(); - } else if (hasFinalized && !hasInProgress) { - LOG.debug("No recovery necessary for logs starting at txid " + - startTxId); - } - } - - /** - * Recovery case for when some logs in the group were in-progress, and - * others were finalized. This happens when one of the storage - * directories fails. - * - * The in-progress logs in this case should be considered corrupt. - */ - private void planMixedLogRecovery() throws IOException { - for (EditLogFile log : logs) { - if (log.isInProgress()) { - LOG.warn("Log at " + log.getFile() + " is in progress, but " + - "other logs starting at the same txid " + startTxId + - " are finalized. Moving aside."); - log.markCorrupt(); - } - } - } - - /** - * Recovery case for when all of the logs in the group were in progress. - * This happens if the NN completely crashes and restarts. In this case - * we check the non-zero lengths of each log file, and any logs that are - * less than the max of these lengths are considered corrupt. - */ - private void planAllInProgressRecovery() throws IOException { - // We only have in-progress logs. We need to figure out which logs have - // the latest data to reccover them - LOG.warn("Logs beginning at txid " + startTxId + " were are all " + - "in-progress (probably truncated due to a previous NameNode " + - "crash)"); - if (logs.size() == 1) { - // Only one log, it's our only choice! - EditLogFile log = logs.get(0); - if (log.validateLog().numTransactions == 0) { - // If it has no transactions, we should consider it corrupt just - // to be conservative. - // See comment below for similar case - LOG.warn("Marking log at " + log.getFile() + " as corrupt since " + - "it has no transactions in it."); - log.markCorrupt(); - } - return; - } - - long maxValidTxnCount = Long.MIN_VALUE; - for (EditLogFile log : logs) { - long validTxnCount = log.validateLog().numTransactions; - LOG.warn(" Log " + log.getFile() + - " valid txns=" + validTxnCount + - " valid len=" + log.validateLog().validLength); - maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount); - } - - for (EditLogFile log : logs) { - long txns = log.validateLog().numTransactions; - if (txns < maxValidTxnCount) { - LOG.warn("Marking log at " + log.getFile() + " as corrupt since " + - "it is has only " + txns + " valid txns whereas another " + - "log has " + maxValidTxnCount); - log.markCorrupt(); - } else if (txns == 0) { - // this can happen if the NN crashes right after rolling a log - // but before the START_LOG_SEGMENT txn is written. Since the log - // is empty, we can just move it aside to its corrupt name. - LOG.warn("Marking log at " + log.getFile() + " as corrupt since " + - "it has no transactions in it."); - log.markCorrupt(); - } - } - } - - /** - * Check for the case when we have multiple finalized logs and they have - * different ending transaction IDs. This violates an invariant that all - * log directories should roll together. We should abort in this case. - */ - private void checkConsistentEndTxIds() throws IOException { - if (hasFinalized && endTxIds.size() > 1) { - throw new IOException("More than one ending txid was found " + - "for logs starting at txid " + startTxId + ". " + - "Found: " + StringUtils.join(endTxIds, ',')); - } - } - - void recover() throws IOException { - for (EditLogFile log : logs) { - if (log.isCorrupt()) { - log.moveAsideCorruptFile(); - } else if (log.isInProgress()) { - log.finalizeLog(); - } - } - } - } - - static class TransactionalLoadPlan extends LoadPlan { - final FSImageFile image; - final LogLoadPlan logPlan; - - public TransactionalLoadPlan(FSImageFile image, - LogLoadPlan logPlan) { - super(); - this.image = image; - this.logPlan = logPlan; - } - - @Override - boolean doRecovery() throws IOException { - logPlan.doRecovery(); - return false; - } - - @Override - File getImageFile() { - return image.getFile(); - } - - @Override - List getEditsFiles() { - return logPlan.getEditsFiles(); - } - - @Override - StorageDirectory getStorageDirectoryForProperties() { - return image.sd; - } - } - - static class LogLoadPlan { - final List editLogs; - final List logGroupsToRecover; - - LogLoadPlan(List editLogs, - List logGroupsToRecover) { - this.editLogs = editLogs; - this.logGroupsToRecover = logGroupsToRecover; - } - - public void doRecovery() throws IOException { - for (LogGroup g : logGroupsToRecover) { - g.recover(); - } - } - - public List getEditsFiles() { - List ret = new ArrayList(); - for (EditLogFile log : editLogs) { - ret.add(log.getFile()); - } - return ret; - } + @Override + long getMaxSeenTxId() { + return maxSeenTxId; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index 991d7f5c5dd..6e4c17161ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -23,11 +23,14 @@ import org.apache.commons.logging.LogFactory; import java.io.File; import java.io.IOException; import java.util.List; +import java.util.HashMap; import java.util.Comparator; +import java.util.Collections; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger; import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; @@ -57,6 +60,9 @@ class FileJournalManager implements JournalManager { private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile( NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)"); + private File currentInProgress = null; + private long maxSeenTransaction = 0L; + @VisibleForTesting StoragePurger purger = new NNStorageRetentionManager.DeletionStoragePurger(); @@ -66,19 +72,20 @@ class FileJournalManager implements JournalManager { } @Override - public EditLogOutputStream startLogSegment(long txid) throws IOException { - File newInProgress = NNStorage.getInProgressEditsFile(sd, txid); - EditLogOutputStream stm = new EditLogFileOutputStream(newInProgress, + synchronized public EditLogOutputStream startLogSegment(long txid) + throws IOException { + currentInProgress = NNStorage.getInProgressEditsFile(sd, txid); + EditLogOutputStream stm = new EditLogFileOutputStream(currentInProgress, outputBufferCapacity); stm.create(); return stm; } @Override - public void finalizeLogSegment(long firstTxId, long lastTxId) + synchronized public void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException { - File inprogressFile = NNStorage.getInProgressEditsFile( - sd, firstTxId); + File inprogressFile = NNStorage.getInProgressEditsFile(sd, firstTxId); + File dstFile = NNStorage.getFinalizedEditsFile( sd, firstTxId, lastTxId); LOG.debug("Finalizing edits file " + inprogressFile + " -> " + dstFile); @@ -89,6 +96,9 @@ class FileJournalManager implements JournalManager { if (!inprogressFile.renameTo(dstFile)) { throw new IOException("Unable to finalize edits file " + inprogressFile); } + if (inprogressFile.equals(currentInProgress)) { + currentInProgress = null; + } } @VisibleForTesting @@ -97,12 +107,7 @@ class FileJournalManager implements JournalManager { } @Override - public String toString() { - return "FileJournalManager for storage directory " + sd; - } - - @Override - public void setOutputBufferCapacity(int size) { + synchronized public void setOutputBufferCapacity(int size) { this.outputBufferCapacity = size; } @@ -120,13 +125,6 @@ class FileJournalManager implements JournalManager { } } - @Override - public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) - throws IOException { - File f = NNStorage.getInProgressEditsFile(sd, segmentStartsAtTxId); - return new EditLogFileInputStream(f); - } - /** * Find all editlog segments starting at or above the given txid. * @param fromTxId the txnid which to start looking @@ -178,17 +176,156 @@ class FileJournalManager implements JournalManager { try { long startTxId = Long.valueOf(inProgressEditsMatch.group(1)); ret.add( - new EditLogFile(f, startTxId, EditLogFile.UNKNOWN_END)); + new EditLogFile(f, startTxId, startTxId, true)); } catch (NumberFormatException nfe) { LOG.error("In-progress edits file " + f + " has improperly " + "formatted transaction ID"); // skip - } + } } } return ret; } + @Override + synchronized public EditLogInputStream getInputStream(long fromTxId) + throws IOException { + for (EditLogFile elf : getLogFiles(fromTxId)) { + if (elf.getFirstTxId() == fromTxId) { + if (elf.isInProgress()) { + elf.validateLog(); + } + if (LOG.isTraceEnabled()) { + LOG.trace("Returning edit stream reading from " + elf); + } + return new EditLogFileInputStream(elf.getFile(), + elf.getFirstTxId(), elf.getLastTxId()); + } + } + + throw new IOException("Cannot find editlog file with " + fromTxId + + " as first first txid"); + } + + @Override + public long getNumberOfTransactions(long fromTxId) + throws IOException, CorruptionException { + long numTxns = 0L; + + for (EditLogFile elf : getLogFiles(fromTxId)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Counting " + elf); + } + if (elf.getFirstTxId() > fromTxId) { // there must be a gap + LOG.warn("Gap in transactions in " + sd.getRoot() + ". Gap is " + + fromTxId + " - " + (elf.getFirstTxId() - 1)); + break; + } else if (fromTxId == elf.getFirstTxId()) { + if (elf.isInProgress()) { + elf.validateLog(); + } + + if (elf.isCorrupt()) { + break; + } + fromTxId = elf.getLastTxId() + 1; + numTxns += fromTxId - elf.getFirstTxId(); + + if (elf.isInProgress()) { + break; + } + } // else skip + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Journal " + this + " has " + numTxns + + " txns from " + fromTxId); + } + + long max = findMaxTransaction(); + // fromTxId should be greater than max, as it points to the next + // transaction we should expect to find. If it is less than or equal + // to max, it means that a transaction with txid == max has not been found + if (numTxns == 0 && fromTxId <= max) { + String error = String.format("Gap in transactions, max txnid is %d" + + ", 0 txns from %d", max, fromTxId); + LOG.error(error); + throw new CorruptionException(error); + } + + return numTxns; + } + + @Override + synchronized public void recoverUnfinalizedSegments() throws IOException { + File currentDir = sd.getCurrentDir(); + List allLogFiles = matchEditLogs(currentDir.listFiles()); + + // make sure journal is aware of max seen transaction before moving corrupt + // files aside + findMaxTransaction(); + + for (EditLogFile elf : allLogFiles) { + if (elf.getFile().equals(currentInProgress)) { + continue; + } + if (elf.isInProgress()) { + elf.validateLog(); + + if (elf.isCorrupt()) { + elf.moveAsideCorruptFile(); + continue; + } + finalizeLogSegment(elf.getFirstTxId(), elf.getLastTxId()); + } + } + } + + private List getLogFiles(long fromTxId) throws IOException { + File currentDir = sd.getCurrentDir(); + List allLogFiles = matchEditLogs(currentDir.listFiles()); + List logFiles = Lists.newArrayList(); + + for (EditLogFile elf : allLogFiles) { + if (fromTxId > elf.getFirstTxId() + && fromTxId <= elf.getLastTxId()) { + throw new IOException("Asked for fromTxId " + fromTxId + + " which is in middle of file " + elf.file); + } + if (fromTxId <= elf.getFirstTxId()) { + logFiles.add(elf); + } + } + + Collections.sort(logFiles, EditLogFile.COMPARE_BY_START_TXID); + + return logFiles; + } + + /** + * Find the maximum transaction in the journal. + * This gets stored in a member variable, as corrupt edit logs + * will be moved aside, but we still need to remember their first + * tranaction id in the case that it was the maximum transaction in + * the journal. + */ + private long findMaxTransaction() + throws IOException { + for (EditLogFile elf : getLogFiles(0)) { + if (elf.isInProgress()) { + maxSeenTransaction = Math.max(elf.getFirstTxId(), maxSeenTransaction); + elf.validateLog(); + } + maxSeenTransaction = Math.max(elf.getLastTxId(), maxSeenTransaction); + } + return maxSeenTransaction; + } + + @Override + public String toString() { + return String.format("FileJournalManager(root=%s)", sd.getRoot()); + } + /** * Record of an edit log that has been located and had its filename parsed. */ @@ -196,12 +333,10 @@ class FileJournalManager implements JournalManager { private File file; private final long firstTxId; private long lastTxId; - - private EditLogValidation cachedValidation = null; + private boolean isCorrupt = false; - - static final long UNKNOWN_END = -1; - + private final boolean isInProgress; + final static Comparator COMPARE_BY_START_TXID = new Comparator() { public int compare(EditLogFile a, EditLogFile b) { @@ -214,30 +349,24 @@ class FileJournalManager implements JournalManager { EditLogFile(File file, long firstTxId, long lastTxId) { - assert lastTxId == UNKNOWN_END || lastTxId >= firstTxId; - assert firstTxId > 0; + this(file, firstTxId, lastTxId, false); + assert (lastTxId != HdfsConstants.INVALID_TXID) + && (lastTxId >= firstTxId); + } + + EditLogFile(File file, long firstTxId, + long lastTxId, boolean isInProgress) { + assert (lastTxId == HdfsConstants.INVALID_TXID && isInProgress) + || (lastTxId != HdfsConstants.INVALID_TXID && lastTxId >= firstTxId); + assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID); assert file != null; this.firstTxId = firstTxId; this.lastTxId = lastTxId; this.file = file; + this.isInProgress = isInProgress; } - public void finalizeLog() throws IOException { - long numTransactions = validateLog().numTransactions; - long lastTxId = firstTxId + numTransactions - 1; - File dst = new File(file.getParentFile(), - NNStorage.getFinalizedEditsFileName(firstTxId, lastTxId)); - LOG.info("Finalizing edits log " + file + " by renaming to " - + dst.getName()); - if (!file.renameTo(dst)) { - throw new IOException("Couldn't finalize log " + - file + " to " + dst); - } - this.lastTxId = lastTxId; - file = dst; - } - long getFirstTxId() { return firstTxId; } @@ -246,15 +375,22 @@ class FileJournalManager implements JournalManager { return lastTxId; } - EditLogValidation validateLog() throws IOException { - if (cachedValidation == null) { - cachedValidation = EditLogFileInputStream.validateEditLog(file); + /** + * Count the number of valid transactions in a log. + * This will update the lastTxId of the EditLogFile or + * mark it as corrupt if it is. + */ + void validateLog() throws IOException { + EditLogValidation val = EditLogFileInputStream.validateEditLog(file); + if (val.getNumTransactions() == 0) { + markCorrupt(); + } else { + this.lastTxId = val.getEndTxId(); } - return cachedValidation; } boolean isInProgress() { - return (lastTxId == UNKNOWN_END); + return isInProgress; } File getFile() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java index f2138ff6940..64c4ce7fd2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java @@ -39,6 +39,25 @@ interface JournalManager { */ void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException; + /** + * Get the input stream starting with fromTxnId from this journal manager + * @param fromTxnId the first transaction id we want to read + * @return the stream starting with transaction fromTxnId + * @throws IOException if a stream cannot be found. + */ + EditLogInputStream getInputStream(long fromTxnId) throws IOException; + + /** + * Get the number of transaction contiguously available from fromTxnId. + * + * @param fromTxnId Transaction id to count from + * @return The number of transactions available from fromTxnId + * @throws IOException if the journal cannot be read. + * @throws CorruptionException if there is a gap in the journal at fromTxnId. + */ + long getNumberOfTransactions(long fromTxnId) + throws IOException, CorruptionException; + /** * Set the amount of memory that this stream should use to buffer edits */ @@ -57,10 +76,21 @@ interface JournalManager { throws IOException; /** - * @return an EditLogInputStream that reads from the same log that - * the edit log is currently writing. May return null if this journal - * manager does not support this operation. - */ - EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) - throws IOException; + * Recover segments which have not been finalized. + */ + void recoverUnfinalizedSegments() throws IOException; + + /** + * Indicate that a journal is cannot be used to load a certain range of + * edits. + * This exception occurs in the case of a gap in the transactions, or a + * corrupt edit file. + */ + public static class CorruptionException extends IOException { + static final long serialVersionUID = -4687802717006172702L; + + public CorruptionException(String reason) { + super(reason); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java index 5e14b96a0c0..b07bad252ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRollback.java @@ -248,7 +248,7 @@ public class TestDFSRollback extends TestCase { baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous"); deleteMatchingFiles(baseDirs, "edits.*"); startNameNodeShouldFail(StartupOption.ROLLBACK, - "but there are no logs to load"); + "No non-corrupt logs for txid "); UpgradeUtilities.createEmptyDirs(nameNodeDirs); log("NameNode rollback with no image file", numDirs); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java index ce9b224f22c..5cb26a3e5d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java @@ -392,12 +392,9 @@ public abstract class FSImageTestUtil { */ public static EditLogFile findLatestEditsLog(StorageDirectory sd) throws IOException { - FSImageTransactionalStorageInspector inspector = - new FSImageTransactionalStorageInspector(); - inspector.inspectDirectory(sd); - - List foundEditLogs = Lists.newArrayList( - inspector.getEditLogFiles()); + File currentDir = sd.getCurrentDir(); + List foundEditLogs + = Lists.newArrayList(FileJournalManager.matchEditLogs(currentDir.listFiles())); return Collections.max(foundEditLogs, EditLogFile.COMPARE_BY_START_TXID); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java index 3454f7f0af3..fbbcfc72f85 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckPointForSecurityTokens.java @@ -84,8 +84,10 @@ public class TestCheckPointForSecurityTokens { for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) { EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd); assertTrue(log.isInProgress()); + log.validateLog(); + long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1; assertEquals("In-progress log " + log + " should have 5 transactions", - 5, log.validateLog().numTransactions); + 5, numTransactions);; } // Saving image in safe mode should succeed @@ -99,8 +101,10 @@ public class TestCheckPointForSecurityTokens { for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) { EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd); assertTrue(log.isInProgress()); + log.validateLog(); + long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1; assertEquals("In-progress log " + log + " should only have START txn", - 1, log.validateLog().numTransactions); + 1, numTransactions); } // restart cluster diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index 156f8415bae..123810c9dc4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -23,6 +23,9 @@ import java.net.URI; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -37,6 +40,7 @@ import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.*; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -54,6 +58,7 @@ import org.aspectj.util.FileUtil; import org.mockito.Mockito; import org.junit.Test; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import static org.apache.hadoop.test.MetricsAsserts.*; @@ -76,7 +81,7 @@ public class TestEditLog extends TestCase { static final int NUM_TRANSACTIONS = 100; static final int NUM_THREADS = 100; - private static final File TEST_DIR = new File( + static final File TEST_DIR = new File( System.getProperty("test.build.data","build/test/data")); /** An edits log with 3 edits from 0.20 - the result of @@ -623,13 +628,23 @@ public class TestEditLog extends TestCase { } public void testCrashRecoveryEmptyLogOneDir() throws Exception { - doTestCrashRecoveryEmptyLog(false); + doTestCrashRecoveryEmptyLog(false, true); } public void testCrashRecoveryEmptyLogBothDirs() throws Exception { - doTestCrashRecoveryEmptyLog(true); + doTestCrashRecoveryEmptyLog(true, true); + } + + public void testCrashRecoveryEmptyLogOneDirNoUpdateSeenTxId() + throws Exception { + doTestCrashRecoveryEmptyLog(false, false); } + public void testCrashRecoveryEmptyLogBothDirsNoUpdateSeenTxId() + throws Exception { + doTestCrashRecoveryEmptyLog(true, false); + } + /** * Test that the NN handles the corruption properly * after it crashes just after creating an edit log @@ -642,8 +657,14 @@ public class TestEditLog extends TestCase { * will only be in one of the directories. In both cases, the * NN should fail to start up, because it's aware that txid 3 * was reached, but unable to find a non-corrupt log starting there. + * @param updateTransactionIdFile if true update the seen_txid file. + * If false, the it will not be updated. This will simulate a case + * where the NN crashed between creating the new segment and updating + * seen_txid. */ - private void doTestCrashRecoveryEmptyLog(boolean inBothDirs) throws Exception { + private void doTestCrashRecoveryEmptyLog(boolean inBothDirs, + boolean updateTransactionIdFile) + throws Exception { // start a cluster Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = null; @@ -661,6 +682,14 @@ public class TestEditLog extends TestCase { // Make a truncated edits_3_inprogress File log = new File(currentDir, NNStorage.getInProgressEditsFileName(3)); + NNStorage storage = new NNStorage(conf, + Collections.emptyList(), + Lists.newArrayList(uri)); + if (updateTransactionIdFile) { + storage.writeTransactionIdFileToStorage(3); + } + storage.close(); + new EditLogFileOutputStream(log, 1024).create(); if (!inBothDirs) { break; @@ -671,9 +700,9 @@ public class TestEditLog extends TestCase { cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(NUM_DATA_NODES).format(false).build(); fail("Did not fail to start with all-corrupt logs"); - } catch (IllegalStateException ise) { + } catch (IOException ioe) { GenericTestUtils.assertExceptionContains( - "No non-corrupt logs for txid 3", ise); + "No non-corrupt logs for txid 3", ioe); } cluster.shutdown(); } @@ -698,7 +727,17 @@ public class TestEditLog extends TestCase { reader = new FSEditLogOp.Reader(in, version); } + + @Override + public long getFirstTxId() throws IOException { + return HdfsConstants.INVALID_TXID; + } + @Override + public long getLastTxId() throws IOException { + return HdfsConstants.INVALID_TXID; + } + @Override public long length() throws IOException { return len; @@ -848,6 +887,168 @@ public class TestEditLog extends TestCase { Mockito.doReturn(sds).when(storage).dirIterable(NameNodeDirType.EDITS); return storage; } - - + + /** + * Specification for a failure during #setupEdits + */ + static class AbortSpec { + final int roll; + final int logindex; + + /** + * Construct the failure specification. + * @param roll number to fail after. e.g. 1 to fail after the first roll + * @param loginfo index of journal to fail. + */ + AbortSpec(int roll, int logindex) { + this.roll = roll; + this.logindex = logindex; + } + } + + final static int TXNS_PER_ROLL = 10; + final static int TXNS_PER_FAIL = 2; + + /** + * Set up directories for tests. + * + * Each rolled file is 10 txns long. + * A failed file is 2 txns long. + * + * @param editUris directories to create edit logs in + * @param numrolls number of times to roll the edit log during setup + * @param abortAtRolls Specifications for when to fail, see AbortSpec + */ + public static NNStorage setupEdits(List editUris, int numrolls, + AbortSpec... abortAtRolls) + throws IOException { + List aborts = new ArrayList(Arrays.asList(abortAtRolls)); + NNStorage storage = new NNStorage(new Configuration(), + Collections.emptyList(), + editUris); + storage.format("test-cluster-id"); + FSEditLog editlog = new FSEditLog(storage); + // open the edit log and add two transactions + // logGenerationStamp is used, simply because it doesn't + // require complex arguments. + editlog.open(); + for (int i = 2; i < TXNS_PER_ROLL; i++) { + editlog.logGenerationStamp((long)0); + } + editlog.logSync(); + + // Go into edit log rolling loop. + // On each roll, the abortAtRolls abort specs are + // checked to see if an abort is required. If so the + // the specified journal is aborted. It will be brought + // back into rotation automatically by rollEditLog + for (int i = 0; i < numrolls; i++) { + editlog.rollEditLog(); + + editlog.logGenerationStamp((long)i); + editlog.logSync(); + + while (aborts.size() > 0 + && aborts.get(0).roll == (i+1)) { + AbortSpec spec = aborts.remove(0); + editlog.getJournals().get(spec.logindex).abort(); + } + + for (int j = 3; j < TXNS_PER_ROLL; j++) { + editlog.logGenerationStamp((long)i); + } + editlog.logSync(); + } + editlog.close(); + + FSImageTestUtil.logStorageContents(LOG, storage); + return storage; + } + + /** + * Test loading an editlog which has had both its storage fail + * on alternating rolls. Two edit log directories are created. + * The first on fails on odd rolls, the second on even. Test + * that we are able to load the entire editlog regardless. + */ + @Test + public void testAlternatingJournalFailure() throws IOException { + File f1 = new File(TEST_DIR + "/alternatingjournaltest0"); + File f2 = new File(TEST_DIR + "/alternatingjournaltest1"); + + List editUris = ImmutableList.of(f1.toURI(), f2.toURI()); + + NNStorage storage = setupEdits(editUris, 10, + new AbortSpec(1, 0), + new AbortSpec(2, 1), + new AbortSpec(3, 0), + new AbortSpec(4, 1), + new AbortSpec(5, 0), + new AbortSpec(6, 1), + new AbortSpec(7, 0), + new AbortSpec(8, 1), + new AbortSpec(9, 0), + new AbortSpec(10, 1)); + long totaltxnread = 0; + FSEditLog editlog = new FSEditLog(storage); + long startTxId = 1; + Iterable editStreams = editlog.selectInputStreams(startTxId, + TXNS_PER_ROLL*11); + + for (EditLogInputStream edits : editStreams) { + FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(edits); + long read = val.getNumTransactions(); + LOG.info("Loading edits " + edits + " read " + read); + assertEquals(startTxId, val.getStartTxId()); + startTxId += read; + totaltxnread += read; + } + + editlog.close(); + storage.close(); + assertEquals(TXNS_PER_ROLL*11, totaltxnread); + } + + /** + * Test loading an editlog with gaps. A single editlog directory + * is set up. On of the edit log files is deleted. This should + * fail when selecting the input streams as it will not be able + * to select enough streams to load up to 4*TXNS_PER_ROLL. + * There should be 4*TXNS_PER_ROLL transactions as we rolled 3 + * times. + */ + @Test + public void testLoadingWithGaps() throws IOException { + File f1 = new File(TEST_DIR + "/gaptest0"); + List editUris = ImmutableList.of(f1.toURI()); + + NNStorage storage = setupEdits(editUris, 3); + + final long startGapTxId = 1*TXNS_PER_ROLL + 1; + final long endGapTxId = 2*TXNS_PER_ROLL; + + File[] files = new File(f1, "current").listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + if (name.startsWith(NNStorage.getFinalizedEditsFileName(startGapTxId, + endGapTxId))) { + return true; + } + return false; + } + }); + assertEquals(1, files.length); + assertTrue(files[0].delete()); + + FSEditLog editlog = new FSEditLog(storage); + long startTxId = 1; + try { + Iterable editStreams + = editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL); + + fail("Should have thrown exception"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains( + "No non-corrupt logs for txid " + startGapTxId, ioe); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java index a673c5f3b37..1228bef6044 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java @@ -63,8 +63,8 @@ public class TestEditLogFileOutputStream { EditLogValidation validation = EditLogFileInputStream.validateEditLog(editLog); assertEquals("Edit log should contain a header as valid length", - HEADER_LEN, validation.validLength); - assertEquals(1, validation.numTransactions); + HEADER_LEN, validation.getValidLength()); + assertEquals(1, validation.getNumTransactions()); assertEquals("Edit log should have 1MB of bytes allocated", 1024*1024, editLog.length()); @@ -72,12 +72,12 @@ public class TestEditLogFileOutputStream { cluster.getFileSystem().mkdirs(new Path("/tmp"), new FsPermission((short)777)); - long oldLength = validation.validLength; + long oldLength = validation.getValidLength(); validation = EditLogFileInputStream.validateEditLog(editLog); assertTrue("Edit log should have more valid data after writing a txn " + - "(was: " + oldLength + " now: " + validation.validLength + ")", - validation.validLength > oldLength); - assertEquals(2, validation.numTransactions); + "(was: " + oldLength + " now: " + validation.getValidLength() + ")", + validation.getValidLength() > oldLength); + assertEquals(2, validation.getNumTransactions()); assertEquals("Edit log should be 1MB long", 1024 * 1024, editLog.length()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index a0cd69eacc8..3c7e47981a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@ -186,8 +186,8 @@ public class TestFSEditLogLoader { // Make sure that uncorrupted log has the expected length and number // of transactions. EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile); - assertEquals(NUM_TXNS + 2, validation.numTransactions); - assertEquals(validLength, validation.validLength); + assertEquals(NUM_TXNS + 2, validation.getNumTransactions()); + assertEquals(validLength, validation.getValidLength()); // Back up the uncorrupted log File logFileBak = new File(testDir, logFile.getName() + ".bak"); @@ -203,8 +203,8 @@ public class TestFSEditLogLoader { truncateFile(logFile, txOffset); validation = EditLogFileInputStream.validateEditLog(logFile); assertEquals("Failed when truncating to length " + txOffset, - txid - 1, validation.numTransactions); - assertEquals(txOffset, validation.validLength); + txid - 1, validation.getNumTransactions()); + assertEquals(txOffset, validation.getValidLength()); // Restore backup, truncate the file with one byte in the txn, // also isn't valid @@ -212,24 +212,24 @@ public class TestFSEditLogLoader { truncateFile(logFile, txOffset + 1); validation = EditLogFileInputStream.validateEditLog(logFile); assertEquals("Failed when truncating to length " + (txOffset + 1), - txid - 1, validation.numTransactions); - assertEquals(txOffset, validation.validLength); + txid - 1, validation.getNumTransactions()); + assertEquals(txOffset, validation.getValidLength()); // Restore backup, corrupt the txn opcode Files.copy(logFileBak, logFile); corruptByteInFile(logFile, txOffset); validation = EditLogFileInputStream.validateEditLog(logFile); assertEquals("Failed when corrupting txn opcode at " + txOffset, - txid - 1, validation.numTransactions); - assertEquals(txOffset, validation.validLength); + txid - 1, validation.getNumTransactions()); + assertEquals(txOffset, validation.getValidLength()); // Restore backup, corrupt a byte a few bytes into the txn Files.copy(logFileBak, logFile); corruptByteInFile(logFile, txOffset+5); validation = EditLogFileInputStream.validateEditLog(logFile); assertEquals("Failed when corrupting txn data at " + (txOffset+5), - txid - 1, validation.numTransactions); - assertEquals(txOffset, validation.validLength); + txid - 1, validation.getNumTransactions()); + assertEquals(txOffset, validation.getValidLength()); } // Corrupt the log at every offset to make sure that validation itself @@ -240,8 +240,8 @@ public class TestFSEditLogLoader { Files.copy(logFileBak, logFile); corruptByteInFile(logFile, offset); EditLogValidation val = EditLogFileInputStream.validateEditLog(logFile); - assertTrue(val.numTransactions >= prevNumValid); - prevNumValid = val.numTransactions; + assertTrue(val.getNumTransactions() >= prevNumValid); + prevNumValid = val.getNumTransactions(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java index 113dcbc3393..649c4152871 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImageStorageInspector.java @@ -36,9 +36,6 @@ import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getImageFileName; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.TransactionalLoadPlan; -import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogGroup; -import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan; import org.junit.Test; import org.mockito.Mockito; @@ -63,335 +60,14 @@ public class TestFSImageStorageInspector { "/foo/current/" + getInProgressEditsFileName(457)); inspector.inspectDirectory(mockDir); - mockLogValidation(inspector, - "/foo/current/" + getInProgressEditsFileName(457), 10); - - assertEquals(2, inspector.foundEditLogs.size()); assertEquals(2, inspector.foundImages.size()); - assertTrue(inspector.foundEditLogs.get(1).isInProgress()); - + FSImageFile latestImage = inspector.getLatestImage(); assertEquals(456, latestImage.txId); assertSame(mockDir, latestImage.sd); assertTrue(inspector.isUpgradeFinalized()); - LoadPlan plan = inspector.createLoadPlan(); - LOG.info("Plan: " + plan); - assertEquals(new File("/foo/current/"+getImageFileName(456)), - plan.getImageFile()); - assertArrayEquals(new File[] { - new File("/foo/current/" + getInProgressEditsFileName(457)) }, - plan.getEditsFiles().toArray(new File[0])); - } - - /** - * Test that we check for gaps in txids when devising a load plan. - */ - @Test - public void testPlanWithGaps() throws IOException { - FSImageTransactionalStorageInspector inspector = - new FSImageTransactionalStorageInspector(); - - StorageDirectory mockDir = FSImageTestUtil.mockStorageDirectory( - NameNodeDirType.IMAGE_AND_EDITS, - false, - "/foo/current/" + getImageFileName(123), - "/foo/current/" + getImageFileName(456), - "/foo/current/" + getFinalizedEditsFileName(457,900), - "/foo/current/" + getFinalizedEditsFileName(901,950), - "/foo/current/" + getFinalizedEditsFileName(952,1000)); // <-- missing edit 951! - - inspector.inspectDirectory(mockDir); - try { - inspector.createLoadPlan(); - fail("Didn't throw IOE trying to load with gaps in edits"); - } catch (IOException ioe) { - assertTrue(ioe.getMessage().contains( - "would start at txid 951 but starts at txid 952")); - } - } - - /** - * Test the case where an in-progress log comes in the middle of a sequence - * of logs - */ - @Test - public void testPlanWithInProgressInMiddle() throws IOException { - FSImageTransactionalStorageInspector inspector = - new FSImageTransactionalStorageInspector(); - - StorageDirectory mockDir = FSImageTestUtil.mockStorageDirectory( - NameNodeDirType.IMAGE_AND_EDITS, - false, - "/foo/current/" + getImageFileName(123), - "/foo/current/" + getImageFileName(456), - "/foo/current/" + getFinalizedEditsFileName(457,900), - "/foo/current/" + getInProgressEditsFileName(901), // <-- inprogress in middle - "/foo/current/" + getFinalizedEditsFileName(952,1000)); - - inspector.inspectDirectory(mockDir); - mockLogValidation(inspector, - "/foo/current/" + getInProgressEditsFileName(901), 51); - - LoadPlan plan = inspector.createLoadPlan(); - LOG.info("Plan: " + plan); - - assertEquals(new File("/foo/current/" + getImageFileName(456)), - plan.getImageFile()); - assertArrayEquals(new File[] { - new File("/foo/current/" + getFinalizedEditsFileName(457,900)), - new File("/foo/current/" + getInProgressEditsFileName(901)), - new File("/foo/current/" + getFinalizedEditsFileName(952,1000)) }, - plan.getEditsFiles().toArray(new File[0])); - - } - - - /** - * Test case for the usual case where no recovery of a log group is necessary - * (i.e all logs have the same start and end txids and finalized) - */ - @Test - public void testLogGroupRecoveryNoop() throws IOException { - FSImageTransactionalStorageInspector inspector = - new FSImageTransactionalStorageInspector(); - - inspector.inspectDirectory( - mockDirectoryWithEditLogs("/foo1/current/" - + getFinalizedEditsFileName(123,456))); - inspector.inspectDirectory( - mockDirectoryWithEditLogs("/foo2/current/" - + getFinalizedEditsFileName(123,456))); - inspector.inspectDirectory( - mockDirectoryWithEditLogs("/foo3/current/" - + getFinalizedEditsFileName(123,456))); - LogGroup lg = inspector.logGroups.get(123L); - assertEquals(3, lg.logs.size()); - - lg.planRecovery(); - - assertFalse(lg.logs.get(0).isCorrupt()); - assertFalse(lg.logs.get(1).isCorrupt()); - assertFalse(lg.logs.get(2).isCorrupt()); - } - - /** - * Test case where we have some in-progress and some finalized logs - * for a given txid. - */ - @Test - public void testLogGroupRecoveryMixed() throws IOException { - FSImageTransactionalStorageInspector inspector = - new FSImageTransactionalStorageInspector(); - - inspector.inspectDirectory( - mockDirectoryWithEditLogs("/foo1/current/" - + getFinalizedEditsFileName(123,456))); - inspector.inspectDirectory( - mockDirectoryWithEditLogs("/foo2/current/" - + getFinalizedEditsFileName(123,456))); - inspector.inspectDirectory( - mockDirectoryWithEditLogs("/foo3/current/" - + getInProgressEditsFileName(123))); - inspector.inspectDirectory(FSImageTestUtil.mockStorageDirectory( - NameNodeDirType.IMAGE, - false, - "/foo4/current/" + getImageFileName(122))); - - LogGroup lg = inspector.logGroups.get(123L); - assertEquals(3, lg.logs.size()); - EditLogFile inProgressLog = lg.logs.get(2); - assertTrue(inProgressLog.isInProgress()); - - LoadPlan plan = inspector.createLoadPlan(); - - // Check that it was marked corrupt. - assertFalse(lg.logs.get(0).isCorrupt()); - assertFalse(lg.logs.get(1).isCorrupt()); - assertTrue(lg.logs.get(2).isCorrupt()); - - - // Calling recover should move it aside - inProgressLog = spy(inProgressLog); - Mockito.doNothing().when(inProgressLog).moveAsideCorruptFile(); - lg.logs.set(2, inProgressLog); - - plan.doRecovery(); - - Mockito.verify(inProgressLog).moveAsideCorruptFile(); - } - - /** - * Test case where we have finalized logs with different end txids - */ - @Test - public void testLogGroupRecoveryInconsistentEndTxIds() throws IOException { - FSImageTransactionalStorageInspector inspector = - new FSImageTransactionalStorageInspector(); - inspector.inspectDirectory( - mockDirectoryWithEditLogs("/foo1/current/" - + getFinalizedEditsFileName(123,456))); - inspector.inspectDirectory( - mockDirectoryWithEditLogs("/foo2/current/" - + getFinalizedEditsFileName(123,678))); - - LogGroup lg = inspector.logGroups.get(123L); - assertEquals(2, lg.logs.size()); - - try { - lg.planRecovery(); - fail("Didn't throw IOE on inconsistent end txids"); - } catch (IOException ioe) { - assertTrue(ioe.getMessage().contains("More than one ending txid")); - } - } - - /** - * Test case where we have only in-progress logs and need to synchronize - * based on valid length. - */ - @Test - public void testLogGroupRecoveryInProgress() throws IOException { - String paths[] = new String[] { - "/foo1/current/" + getInProgressEditsFileName(123), - "/foo2/current/" + getInProgressEditsFileName(123), - "/foo3/current/" + getInProgressEditsFileName(123) - }; - FSImageTransactionalStorageInspector inspector = - new FSImageTransactionalStorageInspector(); - inspector.inspectDirectory(mockDirectoryWithEditLogs(paths[0])); - inspector.inspectDirectory(mockDirectoryWithEditLogs(paths[1])); - inspector.inspectDirectory(mockDirectoryWithEditLogs(paths[2])); - - // Inject spies to return the valid counts we would like to see - mockLogValidation(inspector, paths[0], 2000); - mockLogValidation(inspector, paths[1], 2000); - mockLogValidation(inspector, paths[2], 1000); - - LogGroup lg = inspector.logGroups.get(123L); - assertEquals(3, lg.logs.size()); - - lg.planRecovery(); - - // Check that the short one was marked corrupt - assertFalse(lg.logs.get(0).isCorrupt()); - assertFalse(lg.logs.get(1).isCorrupt()); - assertTrue(lg.logs.get(2).isCorrupt()); - - // Calling recover should move it aside - EditLogFile badLog = lg.logs.get(2); - Mockito.doNothing().when(badLog).moveAsideCorruptFile(); - Mockito.doNothing().when(lg.logs.get(0)).finalizeLog(); - Mockito.doNothing().when(lg.logs.get(1)).finalizeLog(); - - lg.recover(); - - Mockito.verify(badLog).moveAsideCorruptFile(); - Mockito.verify(lg.logs.get(0)).finalizeLog(); - Mockito.verify(lg.logs.get(1)).finalizeLog(); - } - - /** - * Mock out the log at the given path to return a specified number - * of transactions upon validation. - */ - private void mockLogValidation( - FSImageTransactionalStorageInspector inspector, - String path, int numValidTransactions) throws IOException { - - for (LogGroup lg : inspector.logGroups.values()) { - List logs = lg.logs; - for (int i = 0; i < logs.size(); i++) { - EditLogFile log = logs.get(i); - if (log.getFile().getPath().equals(path)) { - // mock out its validation - EditLogFile spyLog = spy(log); - doReturn(new FSEditLogLoader.EditLogValidation(-1, numValidTransactions)) - .when(spyLog).validateLog(); - logs.set(i, spyLog); - return; - } - } - } - fail("No log found to mock out at " + path); - } - - /** - * Test when edits and image are in separate directories. - */ - @Test - public void testCurrentSplitEditsAndImage() throws IOException { - FSImageTransactionalStorageInspector inspector = - new FSImageTransactionalStorageInspector(); - - StorageDirectory mockImageDir = FSImageTestUtil.mockStorageDirectory( - NameNodeDirType.IMAGE, - false, - "/foo/current/" + getImageFileName(123)); - StorageDirectory mockImageDir2 = FSImageTestUtil.mockStorageDirectory( - NameNodeDirType.IMAGE, - false, - "/foo2/current/" + getImageFileName(456)); - StorageDirectory mockEditsDir = FSImageTestUtil.mockStorageDirectory( - NameNodeDirType.EDITS, - false, - "/foo3/current/" + getFinalizedEditsFileName(123, 456), - "/foo3/current/" + getInProgressEditsFileName(457)); - - inspector.inspectDirectory(mockImageDir); - inspector.inspectDirectory(mockEditsDir); - inspector.inspectDirectory(mockImageDir2); - - mockLogValidation(inspector, - "/foo3/current/" + getInProgressEditsFileName(457), 2); - - assertEquals(2, inspector.foundEditLogs.size()); - assertEquals(2, inspector.foundImages.size()); - assertTrue(inspector.foundEditLogs.get(1).isInProgress()); - assertTrue(inspector.isUpgradeFinalized()); - - // Check plan - TransactionalLoadPlan plan = - (TransactionalLoadPlan)inspector.createLoadPlan(); - FSImageFile pickedImage = plan.image; - assertEquals(456, pickedImage.txId); - assertSame(mockImageDir2, pickedImage.sd); - assertEquals(new File("/foo2/current/" + getImageFileName(456)), - plan.getImageFile()); - assertArrayEquals(new File[] { - new File("/foo3/current/" + getInProgressEditsFileName(457)) - }, plan.getEditsFiles().toArray(new File[0])); - } - - /** - * Test case where an in-progress log is in an earlier name directory - * than a finalized log. Previously, getEditLogManifest wouldn't - * see this log. - */ - @Test - public void testLogManifestInProgressComesFirst() throws IOException { - FSImageTransactionalStorageInspector inspector = - new FSImageTransactionalStorageInspector(); - inspector.inspectDirectory( - mockDirectoryWithEditLogs("/foo1/current/" - + getFinalizedEditsFileName(2622,2623), - "/foo1/current/" - + getFinalizedEditsFileName(2624,2625), - "/foo1/current/" - + getInProgressEditsFileName(2626))); - inspector.inspectDirectory( - mockDirectoryWithEditLogs("/foo2/current/" - + getFinalizedEditsFileName(2622,2623), - "/foo2/current/" - + getFinalizedEditsFileName(2624,2625), - "/foo2/current/" - + getFinalizedEditsFileName(2626,2627), - "/foo2/current/" - + getFinalizedEditsFileName(2628,2629))); - } - - static StorageDirectory mockDirectoryWithEditLogs(String... fileNames) { - return FSImageTestUtil.mockStorageDirectory(NameNodeDirType.EDITS, false, fileNames); + latestImage.getFile()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java index 748caf4d4cb..d2f9781bed9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java @@ -19,17 +19,277 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.junit.Assert.*; -import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; +import java.util.Iterator; +import java.io.RandomAccessFile; +import java.io.File; +import java.io.FilenameFilter; +import java.io.BufferedInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.security.SecurityUtil; +import org.junit.Test; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.test.GenericTestUtils; -import org.junit.Test; +import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits; +import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.AbortSpec; +import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_ROLL; +import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_FAIL; +import com.google.common.collect.ImmutableList; import com.google.common.base.Joiner; +import java.util.zip.CheckedInputStream; +import java.util.zip.Checksum; + public class TestFileJournalManager { + /** + * Test the normal operation of loading transactions from + * file journal manager. 3 edits directories are setup without any + * failures. Test that we read in the expected number of transactions. + */ + @Test + public void testNormalOperation() throws IOException { + File f1 = new File(TestEditLog.TEST_DIR + "/normtest0"); + File f2 = new File(TestEditLog.TEST_DIR + "/normtest1"); + File f3 = new File(TestEditLog.TEST_DIR + "/normtest2"); + + List editUris = ImmutableList.of(f1.toURI(), f2.toURI(), f3.toURI()); + NNStorage storage = setupEdits(editUris, 5); + + long numJournals = 0; + for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) { + FileJournalManager jm = new FileJournalManager(sd); + assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1)); + numJournals++; + } + assertEquals(3, numJournals); + } + + /** + * Test that inprogress files are handled correct. Set up a single + * edits directory. Fail on after the last roll. Then verify that the + * logs have the expected number of transactions. + */ + @Test + public void testInprogressRecovery() throws IOException { + File f = new File(TestEditLog.TEST_DIR + "/filejournaltest0"); + // abort after the 5th roll + NNStorage storage = setupEdits(Collections.singletonList(f.toURI()), + 5, new AbortSpec(5, 0)); + StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); + + FileJournalManager jm = new FileJournalManager(sd); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, + jm.getNumberOfTransactions(1)); + } + + /** + * Test a mixture of inprogress files and finalised. Set up 3 edits + * directories and fail the second on the last roll. Verify that reading + * the transactions, reads from the finalised directories. + */ + @Test + public void testInprogressRecoveryMixed() throws IOException { + File f1 = new File(TestEditLog.TEST_DIR + "/mixtest0"); + File f2 = new File(TestEditLog.TEST_DIR + "/mixtest1"); + File f3 = new File(TestEditLog.TEST_DIR + "/mixtest2"); + + List editUris = ImmutableList.of(f1.toURI(), f2.toURI(), f3.toURI()); + + // abort after the 5th roll + NNStorage storage = setupEdits(editUris, + 5, new AbortSpec(5, 1)); + Iterator dirs = storage.dirIterator(NameNodeDirType.EDITS); + StorageDirectory sd = dirs.next(); + FileJournalManager jm = new FileJournalManager(sd); + assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1)); + + sd = dirs.next(); + jm = new FileJournalManager(sd); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1)); + + sd = dirs.next(); + jm = new FileJournalManager(sd); + assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1)); + } + + /** + * Test that FileJournalManager behaves correctly despite inprogress + * files in all its edit log directories. Set up 3 directories and fail + * all on the last roll. Verify that the correct number of transaction + * are then loaded. + */ + @Test + public void testInprogressRecoveryAll() throws IOException { + File f1 = new File(TestEditLog.TEST_DIR + "/failalltest0"); + File f2 = new File(TestEditLog.TEST_DIR + "/failalltest1"); + File f3 = new File(TestEditLog.TEST_DIR + "/failalltest2"); + + List editUris = ImmutableList.of(f1.toURI(), f2.toURI(), f3.toURI()); + // abort after the 5th roll + NNStorage storage = setupEdits(editUris, 5, + new AbortSpec(5, 0), + new AbortSpec(5, 1), + new AbortSpec(5, 2)); + Iterator dirs = storage.dirIterator(NameNodeDirType.EDITS); + StorageDirectory sd = dirs.next(); + FileJournalManager jm = new FileJournalManager(sd); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1)); + + sd = dirs.next(); + jm = new FileJournalManager(sd); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1)); + + sd = dirs.next(); + jm = new FileJournalManager(sd); + assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1)); + } + + /** + * Corrupt an edit log file after the start segment transaction + */ + private void corruptAfterStartSegment(File f) throws IOException { + RandomAccessFile raf = new RandomAccessFile(f, "rw"); + raf.seek(0x16); // skip version and first tranaction and a bit of next transaction + for (int i = 0; i < 1000; i++) { + raf.writeInt(0xdeadbeef); + } + raf.close(); + } + + /** + * Test that we can read from a stream created by FileJournalManager. + * Create a single edits directory, failing it on the final roll. + * Then try loading from the point of the 3rd roll. Verify that we read + * the correct number of transactions from this point. + */ + @Test + public void testReadFromStream() throws IOException { + File f = new File(TestEditLog.TEST_DIR + "/filejournaltest1"); + // abort after 10th roll + NNStorage storage = setupEdits(Collections.singletonList(f.toURI()), + 10, new AbortSpec(10, 0)); + StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); + + FileJournalManager jm = new FileJournalManager(sd); + long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL; + assertEquals(expectedTotalTxnCount, jm.getNumberOfTransactions(1)); + + long skippedTxns = (3*TXNS_PER_ROLL); // skip first 3 files + long startingTxId = skippedTxns + 1; + + long numTransactionsToLoad = jm.getNumberOfTransactions(startingTxId); + long numLoaded = 0; + while (numLoaded < numTransactionsToLoad) { + EditLogInputStream editIn = jm.getInputStream(startingTxId); + FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(editIn); + long count = val.getNumTransactions(); + + editIn.close(); + startingTxId += count; + numLoaded += count; + } + + assertEquals(expectedTotalTxnCount - skippedTxns, numLoaded); + } + + /** + * Try to make a request with a start transaction id which doesn't + * match the start ID of some log segment. + * This should fail as edit logs must currently be treated as indevisable + * units. + */ + @Test(expected=IOException.class) + public void testAskForTransactionsMidfile() throws IOException { + File f = new File(TestEditLog.TEST_DIR + "/filejournaltest2"); + NNStorage storage = setupEdits(Collections.singletonList(f.toURI()), + 10); + StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); + + FileJournalManager jm = new FileJournalManager(sd); + jm.getNumberOfTransactions(2); + } + + /** + * Test that we receive the correct number of transactions when we count + * the number of transactions around gaps. + * Set up a single edits directory, with no failures. Delete the 4th logfile. + * Test that getNumberOfTransactions returns the correct number of + * transactions before this gap and after this gap. Also verify that if you + * try to count on the gap that an exception is thrown. + */ + @Test + public void testManyLogsWithGaps() throws IOException { + File f = new File(TestEditLog.TEST_DIR + "/filejournaltest3"); + NNStorage storage = setupEdits(Collections.singletonList(f.toURI()), 10); + StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); + + final long startGapTxId = 3*TXNS_PER_ROLL + 1; + final long endGapTxId = 4*TXNS_PER_ROLL; + File[] files = new File(f, "current").listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + if (name.startsWith(NNStorage.getFinalizedEditsFileName(startGapTxId, endGapTxId))) { + return true; + } + return false; + } + }); + assertEquals(1, files.length); + assertTrue(files[0].delete()); + + FileJournalManager jm = new FileJournalManager(sd); + assertEquals(startGapTxId-1, jm.getNumberOfTransactions(1)); + + try { + jm.getNumberOfTransactions(startGapTxId); + fail("Should have thrown an exception by now"); + } catch (IOException ioe) { + assertTrue(true); + } + + // rolled 10 times so there should be 11 files. + assertEquals(11*TXNS_PER_ROLL - endGapTxId, + jm.getNumberOfTransactions(endGapTxId+1)); + } + + /** + * Test that we can load an edits directory with a corrupt inprogress file. + * The corrupt inprogress file should be moved to the side. + */ + @Test + public void testManyLogsWithCorruptInprogress() throws IOException { + File f = new File(TestEditLog.TEST_DIR + "/filejournaltest5"); + NNStorage storage = setupEdits(Collections.singletonList(f.toURI()), 10, new AbortSpec(10, 0)); + StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); + + File[] files = new File(f, "current").listFiles(new FilenameFilter() { + public boolean accept(File dir, String name) { + if (name.startsWith("edits_inprogress")) { + return true; + } + return false; + } + }); + assertEquals(files.length, 1); + + corruptAfterStartSegment(files[0]); + + FileJournalManager jm = new FileJournalManager(sd); + assertEquals(10*TXNS_PER_ROLL+1, + jm.getNumberOfTransactions(1)); + } + @Test public void testGetRemoteEditLog() throws IOException { StorageDirectory sd = FSImageTestUtil.mockStorageDirectory( @@ -58,5 +318,4 @@ public class TestFileJournalManager { FileJournalManager fjm, long firstTxId) throws IOException { return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId)); } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameEditsConfigs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameEditsConfigs.java index 31a77778bf0..b024bab1d79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameEditsConfigs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameEditsConfigs.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import junit.framework.TestCase; import java.io.*; import java.util.Random; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -80,10 +81,12 @@ public class TestNameEditsConfigs extends TestCase { assertTrue("Expect no images in " + dir, ins.foundImages.isEmpty()); } + List editlogs + = FileJournalManager.matchEditLogs(new File(dir, "current").listFiles()); if (shouldHaveEdits) { - assertTrue("Expect edits in " + dir, ins.foundEditLogs.size() > 0); + assertTrue("Expect edits in " + dir, editlogs.size() > 0); } else { - assertTrue("Expect no edits in " + dir, ins.foundEditLogs.isEmpty()); + assertTrue("Expect no edits in " + dir, editlogs.isEmpty()); } }