diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ca6d2bfc0c0..0a89cd40f36 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -199,6 +199,8 @@ Release 2.0.0 - UNRELEASED HDFS-3102. Add CLI tool to initialize the shared-edits dir. (atm) + HDFS-3004. Implement Recovery Mode. (Colin Patrick McCabe via eli) + IMPROVEMENTS HDFS-2018. Move all journal stream management code into one place. diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index c84f57d350e..31a38c7aff5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -264,4 +264,10 @@ + + + + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java index 636471a450f..9d070d9637f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java @@ -94,8 +94,8 @@ public int getVersion() throws IOException { } @Override - public FSEditLogOp readOp() throws IOException { - return reader.readOp(); + protected FSEditLogOp nextOp() throws IOException { + return reader.readOp(false); } @Override @@ -123,12 +123,6 @@ public String getName() { lh.toString(), firstTxId, lastTxId); } - @Override - public JournalType getType() { - assert (false); - return null; - } - // TODO(HA): Test this. 
@Override public boolean isInProgress() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java index 6557b96e18a..41f0292e548 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogTestUtil.java @@ -18,13 +18,17 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache; /** * Utilities for testing edit logs */ public class FSEditLogTestUtil { + private static OpInstanceCache cache = new OpInstanceCache(); + public static FSEditLogOp getNoOpInstance() { - return FSEditLogOp.LogSegmentOp.getInstance(FSEditLogOpCodes.OP_END_LOG_SEGMENT); + return FSEditLogOp.LogSegmentOp.getInstance(cache, + FSEditLogOpCodes.OP_END_LOG_SEGMENT); } public static long countTransactionsInStream(EditLogInputStream in) @@ -32,4 +36,4 @@ public static long countTransactionsInStream(EditLogInputStream in) FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in); return validation.getNumTransactions(); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/hdfs_user_guide.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/hdfs_user_guide.xml index 976800e0350..6c0a846b81c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/hdfs_user_guide.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/docs/src/documentation/content/xdocs/hdfs_user_guide.xml @@ -537,7 +537,32 @@ For command usage, see fetchdt command.

-  Upgrade and Rollback
+
+  Recovery Mode
+    Typically, you will configure multiple metadata storage locations.
+    Then, if one storage location is corrupt, you can read the
+    metadata from one of the other storage locations.
+
+    However, what can you do if the only storage locations available are
+    corrupt? In this case, there is a special NameNode startup mode called
+    Recovery mode that may allow you to recover most of your data.
+
+    You can start the NameNode in recovery mode like so:
+      namenode -recover
+
+    When in recovery mode, the NameNode will interactively prompt you at
+    the command line about possible courses of action you can take to
+    recover your data.
+
+    If you don't want to be prompted, you can give the
+    -force option. This option will force
+    recovery mode to always select the first choice. Normally, this
+    will be the most reasonable choice.
+
+    Because Recovery mode can cause you to lose data, you should always
+    back up your edit log and fsimage before using it.
+
+  Upgrade and Rollback

When Hadoop is upgraded on an existing cluster, as with any software upgrade, it is possible there are new bugs or diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java index 710fa4df35e..00275c5917c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext; /************************************ * Some handy internal HDFS constants @@ -54,13 +55,18 @@ static public enum StartupOption{ FINALIZE("-finalize"), IMPORT ("-importCheckpoint"), BOOTSTRAPSTANDBY("-bootstrapStandby"), - INITIALIZESHAREDEDITS("-initializeSharedEdits"); + INITIALIZESHAREDEDITS("-initializeSharedEdits"), + RECOVER ("-recover"), + FORCE("-force"); private String name = null; // Used only with format and upgrade options private String clusterId = null; + // Used only with recovery option + private int force = 0; + private StartupOption(String arg) {this.name = arg;} public String getName() {return name;} public NamenodeRole toNodeRole() { @@ -77,10 +83,24 @@ public NamenodeRole toNodeRole() { public void setClusterId(String cid) { clusterId = cid; } - + public String getClusterId() { return clusterId; } + + public MetaRecoveryContext createRecoveryContext() { + if (!name.equals(RECOVER.name)) + return null; + return new MetaRecoveryContext(force); + } + + public void setForce(int force) { + this.force = force; + } + + public int getForce() { + return this.force; + } } // Timeouts for communicating with DataNode for streaming writes/reads diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java index 3bf5d66640a..85f0245928c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java @@ -213,19 +213,21 @@ private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data) LOG.debug("data:" + StringUtils.byteToHexString(data)); } - FSEditLogLoader logLoader = new FSEditLogLoader(namesystem); + FSEditLogLoader logLoader = + new FSEditLogLoader(namesystem, lastAppliedTxId); int logVersion = storage.getLayoutVersion(); backupInputStream.setBytes(data, logVersion); - long numLoaded = logLoader.loadEditRecords(logVersion, backupInputStream, - true, lastAppliedTxId + 1); - if (numLoaded != numTxns) { + long numTxnsAdvanced = logLoader.loadEditRecords(logVersion, + backupInputStream, true, lastAppliedTxId + 1, null); + if (numTxnsAdvanced != numTxns) { throw new IOException("Batch of txns starting at txnid " + firstTxId + " was supposed to contain " + numTxns + - " transactions but only was able to apply " + numLoaded); + " transactions, but we were only able to advance by " + + numTxnsAdvanced); } - lastAppliedTxId += numTxns; - + lastAppliedTxId = logLoader.getLastAppliedTxId(); + namesystem.dir.updateCountForINodeWithQuota(); // inefficient! 
} finally { backupInputStream.clear(); @@ -275,7 +277,7 @@ private boolean tryConvergeJournalSpool() throws IOException { editStreams.add(s); } } - loadEdits(editStreams, namesystem); + loadEdits(editStreams, namesystem, null); } // now, need to load the in-progress file @@ -309,12 +311,11 @@ private boolean tryConvergeJournalSpool() throws IOException { LOG.info("Going to finish converging with remaining " + remainingTxns + " txns from in-progress stream " + stream); - FSEditLogLoader loader = new FSEditLogLoader(namesystem); - long numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1); - lastAppliedTxId += numLoaded; - assert numLoaded == remainingTxns : - "expected to load " + remainingTxns + " but loaded " + - numLoaded + " from " + stream; + FSEditLogLoader loader = + new FSEditLogLoader(namesystem, lastAppliedTxId); + loader.loadFSEdits(stream, lastAppliedTxId + 1, null); + lastAppliedTxId = loader.getLastAppliedTxId(); + assert lastAppliedTxId == getEditLog().getLastWrittenTxId(); } finally { FSEditLog.closeAllStreams(editStreams); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java index 6ae931fd44f..fcdea9c8315 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java @@ -292,6 +292,6 @@ static void rollForwardByApplyingLogs( } LOG.info("Checkpointer about to load edits from " + editsStreams.size() + " stream(s)."); - dstImage.loadEdits(editsStreams, dstNamesystem); + dstImage.loadEdits(editsStreams, dstNamesystem, null); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java index a0fb8fe6291..1f514cdfc8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java @@ -70,21 +70,25 @@ int length() { reader = null; } - @Override // JournalStream + @Override public String getName() { return address; } - @Override // JournalStream - public JournalType getType() { - return JournalType.BACKUP; + @Override + protected FSEditLogOp nextOp() throws IOException { + Preconditions.checkState(reader != null, + "Must call setBytes() before readOp()"); + return reader.readOp(false); } @Override - public FSEditLogOp readOp() throws IOException { - Preconditions.checkState(reader != null, - "Must call setBytes() before readOp()"); - return reader.readOp(); + protected FSEditLogOp nextValidOp() { + try { + return reader.readOp(true); + } catch (IOException e) { + throw new RuntimeException("got unexpected IOException " + e, e); + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java index 49741861f87..0b00187c662 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java @@ -89,24 +89,6 @@ public EditLogFileInputStream(File name, long firstTxId, long lastTxId, this.isInProgress = isInProgress; } - /** - * Skip over a number of transactions. Subsequent calls to - * {@link EditLogFileInputStream#readOp()} will begin after these skipped - * transactions. If more transactions are requested to be skipped than remain - * in the edit log, all edit log ops in the log will be skipped and subsequent - * calls to {@link EditLogInputStream#readOp} will return null. - * - * @param transactionsToSkip number of transactions to skip over. - * @throws IOException if there's an error while reading an operation - */ - public void skipTransactions(long transactionsToSkip) throws IOException { - assert firstTxId != HdfsConstants.INVALID_TXID && - lastTxId != HdfsConstants.INVALID_TXID; - for (long i = 0; i < transactionsToSkip; i++) { - reader.readOp(); - } - } - @Override public long getFirstTxId() throws IOException { return firstTxId; @@ -117,19 +99,23 @@ public long getLastTxId() throws IOException { return lastTxId; } - @Override // JournalStream + @Override public String getName() { return file.getPath(); } - @Override // JournalStream - public JournalType getType() { - return JournalType.FILE; - } - @Override - public FSEditLogOp readOp() throws IOException { - return reader.readOp(); + protected FSEditLogOp nextOp() throws IOException { + return reader.readOp(false); + } + + @Override + protected FSEditLogOp nextValidOp() { + try { + return reader.readOp(true); + } catch (IOException e) { + return null; + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java index 7a7f8d8743a..c2b42be2461 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java @@ -34,7 +34,14 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public abstract class EditLogInputStream implements JournalStream, Closeable { +public abstract class EditLogInputStream implements Closeable { + private FSEditLogOp cachedOp = null; + + /** + * @return the name of the EditLogInputStream + */ + public abstract String getName(); + /** * @return the first transaction which will be found in this stream */ @@ -57,8 +64,81 @@ public abstract class EditLogInputStream implements JournalStream, Closeable { * @return an operation from the stream or null if at end of stream * @throws IOException if there is an error reading from the stream */ - public abstract FSEditLogOp readOp() throws IOException; + public FSEditLogOp readOp() throws IOException { + FSEditLogOp ret; + if (cachedOp != null) { + ret = cachedOp; + cachedOp = null; + return ret; + } + return nextOp(); + } + /** + * Position the stream so that a valid operation can be read from it with + * readOp(). + * + * This method can be used to skip over corrupted sections of edit logs. + */ + public void resync() throws IOException { + if (cachedOp != null) { + return; + } + cachedOp = nextValidOp(); + } + + /** + * Get the next operation from the stream storage. 
+ * + * @return an operation from the stream or null if at end of stream + * @throws IOException if there is an error reading from the stream + */ + protected abstract FSEditLogOp nextOp() throws IOException; + + /** + * Get the next valid operation from the stream storage. + * + * This is exactly like nextOp, except that we attempt to skip over damaged + * parts of the edit log + * + * @return an operation from the stream or null if at end of stream + */ + protected FSEditLogOp nextValidOp() { + // This is a trivial implementation which just assumes that any errors mean + // that there is nothing more of value in the log. Subclasses that support + // error recovery will want to override this. + try { + return nextOp(); + } catch (IOException e) { + return null; + } + } + + /** + * Skip edit log operations up to a given transaction ID, or until the + * end of the edit log is reached. + * + * After this function returns, the next call to readOp will return either + * end-of-file (null) or a transaction with a txid equal to or higher than + * the one we asked for. + * + * @param txid The transaction ID to read up until. + * @return Returns true if we found a transaction ID greater than + * or equal to 'txid' in the log. + */ + public boolean skipUntil(long txid) throws IOException { + while (true) { + FSEditLogOp op = readOp(); + if (op == null) { + return false; + } + if (op.getTransactionId() >= txid) { + cachedOp = op; + return true; + } + } + } + /** * Get the layout version of the data in the stream. * @return the layout version of the ops in the stream. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java index d0fc1568015..f2cbcb30f60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; +import java.io.Closeable; import static org.apache.hadoop.hdfs.server.common.Util.now; @@ -30,7 +31,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Evolving -public abstract class EditLogOutputStream { +public abstract class EditLogOutputStream implements Closeable { // these are statistics counters private long numSync; // number of sync(s) to disk private long totalTimeSync; // total time to sync diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 9753b7f4906..7f6435e778b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -127,6 +127,14 @@ private enum State { private Configuration conf; private List editsDirs; + + private ThreadLocal cache = + new ThreadLocal() { + @Override + protected OpInstanceCache initialValue() { + return new OpInstanceCache(); + } + }; /** * The edit directories that are shared between primary and secondary. @@ -596,7 +604,7 @@ private void printStatistics(boolean force) { * Records the block locations of the last block. 
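For reviewers, a minimal sketch of how the new EditLogInputStream contract above (readOp(), resync(), skipUntil(), and the nextOp()/nextValidOp() pair) is intended to be driven. EditLogInputStream and FSEditLogOp are the classes changed in this patch; the helper class and method below are hypothetical, and they assume a transaction-id-based log and a nextValidOp() implementation that actually advances past damaged bytes, as the file and backup streams above do.

    import java.io.IOException;

    // Hypothetical caller, assumed to live in the same package as EditLogInputStream.
    class EditStreamReplaySketch {
      static long applyFrom(EditLogInputStream in, long firstTxId) throws IOException {
        long applied = 0;
        if (!in.skipUntil(firstTxId)) {
          return 0;                  // stream ended before reaching firstTxId
        }
        while (true) {
          FSEditLogOp op;
          try {
            op = in.readOp();        // returns the op cached by skipUntil()/resync() first
          } catch (IOException e) {
            in.resync();             // reposition at the next decodable op and retry
            continue;
          }
          if (op == null) {
            break;                   // end of stream
          }
          // ... apply op to the namespace here ...
          applied++;
        }
        return applied;
      }
    }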
*/ public void logOpenFile(String path, INodeFileUnderConstruction newNode) { - AddOp op = AddOp.getInstance() + AddOp op = AddOp.getInstance(cache.get()) .setPath(path) .setReplication(newNode.getReplication()) .setModificationTime(newNode.getModificationTime()) @@ -614,7 +622,7 @@ public void logOpenFile(String path, INodeFileUnderConstruction newNode) { * Add close lease record to edit log. */ public void logCloseFile(String path, INodeFile newNode) { - CloseOp op = CloseOp.getInstance() + CloseOp op = CloseOp.getInstance(cache.get()) .setPath(path) .setReplication(newNode.getReplication()) .setModificationTime(newNode.getModificationTime()) @@ -627,7 +635,7 @@ public void logCloseFile(String path, INodeFile newNode) { } public void logUpdateBlocks(String path, INodeFileUnderConstruction file) { - UpdateBlocksOp op = UpdateBlocksOp.getInstance() + UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get()) .setPath(path) .setBlocks(file.getBlocks()); logEdit(op); @@ -637,7 +645,7 @@ public void logUpdateBlocks(String path, INodeFileUnderConstruction file) { * Add create directory record to edit log */ public void logMkDir(String path, INode newNode) { - MkdirOp op = MkdirOp.getInstance() + MkdirOp op = MkdirOp.getInstance(cache.get()) .setPath(path) .setTimestamp(newNode.getModificationTime()) .setPermissionStatus(newNode.getPermissionStatus()); @@ -649,7 +657,7 @@ public void logMkDir(String path, INode newNode) { * TODO: use String parameters until just before writing to disk */ void logRename(String src, String dst, long timestamp) { - RenameOldOp op = RenameOldOp.getInstance() + RenameOldOp op = RenameOldOp.getInstance(cache.get()) .setSource(src) .setDestination(dst) .setTimestamp(timestamp); @@ -660,7 +668,7 @@ void logRename(String src, String dst, long timestamp) { * Add rename record to edit log */ void logRename(String src, String dst, long timestamp, Options.Rename... options) { - RenameOp op = RenameOp.getInstance() + RenameOp op = RenameOp.getInstance(cache.get()) .setSource(src) .setDestination(dst) .setTimestamp(timestamp) @@ -672,7 +680,7 @@ void logRename(String src, String dst, long timestamp, Options.Rename... 
options * Add set replication record to edit log */ void logSetReplication(String src, short replication) { - SetReplicationOp op = SetReplicationOp.getInstance() + SetReplicationOp op = SetReplicationOp.getInstance(cache.get()) .setPath(src) .setReplication(replication); logEdit(op); @@ -684,7 +692,7 @@ void logSetReplication(String src, short replication) { * @param quota the directory size limit */ void logSetQuota(String src, long nsQuota, long dsQuota) { - SetQuotaOp op = SetQuotaOp.getInstance() + SetQuotaOp op = SetQuotaOp.getInstance(cache.get()) .setSource(src) .setNSQuota(nsQuota) .setDSQuota(dsQuota); @@ -693,7 +701,7 @@ void logSetQuota(String src, long nsQuota, long dsQuota) { /** Add set permissions record to edit log */ void logSetPermissions(String src, FsPermission permissions) { - SetPermissionsOp op = SetPermissionsOp.getInstance() + SetPermissionsOp op = SetPermissionsOp.getInstance(cache.get()) .setSource(src) .setPermissions(permissions); logEdit(op); @@ -701,7 +709,7 @@ void logSetPermissions(String src, FsPermission permissions) { /** Add set owner record to edit log */ void logSetOwner(String src, String username, String groupname) { - SetOwnerOp op = SetOwnerOp.getInstance() + SetOwnerOp op = SetOwnerOp.getInstance(cache.get()) .setSource(src) .setUser(username) .setGroup(groupname); @@ -712,7 +720,7 @@ void logSetOwner(String src, String username, String groupname) { * concat(trg,src..) log */ void logConcat(String trg, String [] srcs, long timestamp) { - ConcatDeleteOp op = ConcatDeleteOp.getInstance() + ConcatDeleteOp op = ConcatDeleteOp.getInstance(cache.get()) .setTarget(trg) .setSources(srcs) .setTimestamp(timestamp); @@ -723,7 +731,7 @@ void logConcat(String trg, String [] srcs, long timestamp) { * Add delete file record to edit log */ void logDelete(String src, long timestamp) { - DeleteOp op = DeleteOp.getInstance() + DeleteOp op = DeleteOp.getInstance(cache.get()) .setPath(src) .setTimestamp(timestamp); logEdit(op); @@ -733,7 +741,7 @@ void logDelete(String src, long timestamp) { * Add generation stamp record to edit log */ void logGenerationStamp(long genstamp) { - SetGenstampOp op = SetGenstampOp.getInstance() + SetGenstampOp op = SetGenstampOp.getInstance(cache.get()) .setGenerationStamp(genstamp); logEdit(op); } @@ -742,7 +750,7 @@ void logGenerationStamp(long genstamp) { * Add access time record to edit log */ void logTimes(String src, long mtime, long atime) { - TimesOp op = TimesOp.getInstance() + TimesOp op = TimesOp.getInstance(cache.get()) .setPath(src) .setModificationTime(mtime) .setAccessTime(atime); @@ -754,7 +762,7 @@ void logTimes(String src, long mtime, long atime) { */ void logSymlink(String path, String value, long mtime, long atime, INodeSymlink node) { - SymlinkOp op = SymlinkOp.getInstance() + SymlinkOp op = SymlinkOp.getInstance(cache.get()) .setPath(path) .setValue(value) .setModificationTime(mtime) @@ -770,7 +778,7 @@ void logSymlink(String path, String value, long mtime, */ void logGetDelegationToken(DelegationTokenIdentifier id, long expiryTime) { - GetDelegationTokenOp op = GetDelegationTokenOp.getInstance() + GetDelegationTokenOp op = GetDelegationTokenOp.getInstance(cache.get()) .setDelegationTokenIdentifier(id) .setExpiryTime(expiryTime); logEdit(op); @@ -778,26 +786,26 @@ void logGetDelegationToken(DelegationTokenIdentifier id, void logRenewDelegationToken(DelegationTokenIdentifier id, long expiryTime) { - RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance() + RenewDelegationTokenOp op = 
RenewDelegationTokenOp.getInstance(cache.get()) .setDelegationTokenIdentifier(id) .setExpiryTime(expiryTime); logEdit(op); } void logCancelDelegationToken(DelegationTokenIdentifier id) { - CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance() + CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance(cache.get()) .setDelegationTokenIdentifier(id); logEdit(op); } void logUpdateMasterKey(DelegationKey key) { - UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance() + UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance(cache.get()) .setDelegationKey(key); logEdit(op); } void logReassignLease(String leaseHolder, String src, String newHolder) { - ReassignLeaseOp op = ReassignLeaseOp.getInstance() + ReassignLeaseOp op = ReassignLeaseOp.getInstance(cache.get()) .setLeaseHolder(leaseHolder) .setPath(src) .setNewHolder(newHolder); @@ -896,7 +904,7 @@ synchronized void startLogSegment(final long segmentTxId, state = State.IN_SEGMENT; if (writeHeaderTxn) { - logEdit(LogSegmentOp.getInstance( + logEdit(LogSegmentOp.getInstance(cache.get(), FSEditLogOpCodes.OP_START_LOG_SEGMENT)); logSync(); } @@ -912,7 +920,7 @@ synchronized void endCurrentLogSegment(boolean writeEndTxn) { "Bad state: %s", state); if (writeEndTxn) { - logEdit(LogSegmentOp.getInstance( + logEdit(LogSegmentOp.getInstance(cache.get(), FSEditLogOpCodes.OP_END_LOG_SEGMENT)); logSync(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index ad8ddc06287..8f2b107e798 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -71,9 +71,11 @@ public class FSEditLogLoader { static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName()); static long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec private final FSNamesystem fsNamesys; - - public FSEditLogLoader(FSNamesystem fsNamesys) { + private long lastAppliedTxId; + + public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) { this.fsNamesys = fsNamesys; + this.lastAppliedTxId = lastAppliedTxId; } /** @@ -81,32 +83,29 @@ public FSEditLogLoader(FSNamesystem fsNamesys) { * This is where we apply edits that we've been writing to disk all * along. 
*/ - long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId) - throws IOException { - long numEdits = 0; + long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, + MetaRecoveryContext recovery) throws IOException { int logVersion = edits.getVersion(); fsNamesys.writeLock(); try { long startTime = now(); - numEdits = loadEditRecords(logVersion, edits, false, - expectedStartingTxId); + long numEdits = loadEditRecords(logVersion, edits, false, + expectedStartingTxId, recovery); FSImage.LOG.info("Edits file " + edits.getName() + " of size " + edits.length() + " edits # " + numEdits + " loaded in " + (now()-startTime)/1000 + " seconds."); + return numEdits; } finally { edits.close(); fsNamesys.writeUnlock(); } - - return numEdits; } long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit, - long expectedStartingTxId) - throws IOException, EditLogInputException { + long expectedStartingTxId, MetaRecoveryContext recovery) + throws IOException { FSDirectory fsDir = fsNamesys.dir; - long numEdits = 0; EnumMap> opCounts = new EnumMap>(FSEditLogOpCodes.class); @@ -120,72 +119,99 @@ long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit, long recentOpcodeOffsets[] = new long[4]; Arrays.fill(recentOpcodeOffsets, -1); - - long txId = expectedStartingTxId - 1; + + long expectedTxId = expectedStartingTxId; + long numEdits = 0; long lastTxId = in.getLastTxId(); long numTxns = (lastTxId - expectedStartingTxId) + 1; - long lastLogTime = now(); if (LOG.isDebugEnabled()) { LOG.debug("edit log length: " + in.length() + ", start txid: " + expectedStartingTxId + ", last txid: " + lastTxId); } - try { - try { - while (true) { + while (true) { + try { FSEditLogOp op; try { - if ((op = in.readOp()) == null) { + op = in.readOp(); + if (op == null) { break; } - } catch (IOException ioe) { - long badTxId = txId + 1; // because txId hasn't been incremented yet - String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, badTxId); + } catch (Throwable e) { + // Handle a problem with our input + check203UpgradeFailure(logVersion, e); + String errorMessage = + formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId); FSImage.LOG.error(errorMessage); - throw new EditLogInputException(errorMessage, - ioe, numEdits); + if (recovery == null) { + // We will only try to skip over problematic opcodes when in + // recovery mode. + throw new EditLogInputException(errorMessage, e, numEdits); + } + MetaRecoveryContext.editLogLoaderPrompt( + "We failed to read txId " + expectedTxId, + recovery, "skipping the bad section in the log"); + in.resync(); + continue; } recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] = in.getPosition(); if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) { - long expectedTxId = txId + 1; - txId = op.txid; - if (txId != expectedTxId) { - throw new IOException("Expected transaction ID " + - expectedTxId + " but got " + txId); + if (op.getTransactionId() > expectedTxId) { + MetaRecoveryContext.editLogLoaderPrompt("There appears " + + "to be a gap in the edit log. We expected txid " + + expectedTxId + ", but got txid " + + op.getTransactionId() + ".", recovery, "ignoring missing " + + " transaction IDs"); + } else if (op.getTransactionId() < expectedTxId) { + MetaRecoveryContext.editLogLoaderPrompt("There appears " + + "to be an out-of-order edit in the edit log. 
We " + + "expected txid " + expectedTxId + ", but got txid " + + op.getTransactionId() + ".", recovery, + "skipping the out-of-order edit"); + continue; } } - - incrOpCount(op.opCode, opCounts); try { applyEditLogOp(op, fsDir, logVersion); - } catch (Throwable t) { - // Catch Throwable because in the case of a truly corrupt edits log, any - // sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.) - String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, txId); - FSImage.LOG.error(errorMessage); - throw new IOException(errorMessage, t); + } catch (Throwable e) { + LOG.error("Encountered exception on operation " + op, e); + MetaRecoveryContext.editLogLoaderPrompt("Failed to " + + "apply edit log operation " + op + ": error " + + e.getMessage(), recovery, "applying edits"); + } + // Now that the operation has been successfully decoded and + // applied, update our bookkeeping. + incrOpCount(op.opCode, opCounts); + if (op.hasTransactionId()) { + lastAppliedTxId = op.getTransactionId(); + expectedTxId = lastAppliedTxId + 1; + } else { + expectedTxId = lastAppliedTxId = expectedStartingTxId; } - // log progress - if (now() - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) { - int percent = Math.round((float) txId / numTxns * 100); - LOG.info("replaying edit log: " + txId + "/" + numTxns - + " transactions completed. (" + percent + "%)"); - lastLogTime = now(); + if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) { + long now = now(); + if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) { + int percent = Math.round((float)lastAppliedTxId / numTxns * 100); + LOG.info("replaying edit log: " + lastAppliedTxId + "/" + numTxns + + " transactions completed. (" + percent + "%)"); + lastLogTime = now; + } } - numEdits++; + } catch (MetaRecoveryContext.RequestStopException e) { + MetaRecoveryContext.LOG.warn("Stopped reading edit log at " + + in.getPosition() + "/" + in.length()); + break; } - } catch (IOException ex) { - check203UpgradeFailure(logVersion, ex); - } finally { - if(closeOnExit) - in.close(); } } finally { + if(closeOnExit) { + in.close(); + } fsDir.writeUnlock(); fsNamesys.writeUnlock(); @@ -472,7 +498,7 @@ private static String formatEditLogReplayError(EditLogInputStream in, long recentOpcodeOffsets[], long txid) { StringBuilder sb = new StringBuilder(); sb.append("Error replaying edit log at offset " + in.getPosition()); - sb.append(" on transaction ID ").append(txid); + sb.append(". Expected transaction ID was ").append(txid); if (recentOpcodeOffsets[0] != -1) { Arrays.sort(recentOpcodeOffsets); sb.append("\nRecent opcode offsets:"); @@ -519,7 +545,7 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op, if (oldBlock.getBlockId() != newBlock.getBlockId() || (oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() && !(isGenStampUpdate && isLastBlock))) { - throw new IOException("Mismatched block IDs or generation stamps, " + + throw new IOException("Mismatched block IDs or generation stamps, " + "attempting to replace block " + oldBlock + " with " + newBlock + " as block # " + i + "/" + newBlocks.length + " of " + path); @@ -605,7 +631,7 @@ private void incrOpCount(FSEditLogOpCodes opCode, * Throw appropriate exception during upgrade from 203, when editlog loading * could fail due to opcode conflicts. 
*/ - private void check203UpgradeFailure(int logVersion, IOException ex) + private void check203UpgradeFailure(int logVersion, Throwable e) throws IOException { // 0.20.203 version version has conflicting opcodes with the later releases. // The editlog must be emptied by restarting the namenode, before proceeding @@ -616,9 +642,7 @@ private void check203UpgradeFailure(int logVersion, IOException ex) + logVersion + " from release 0.20.203. Please go back to the old " + " release and restart the namenode. This empties the editlog " + " and saves the namespace. Resume the upgrade after this step."; - throw new IOException(msg, ex); - } else { - throw ex; + throw new IOException(msg, e); } } @@ -643,14 +667,14 @@ static EditLogValidation validateEditLog(EditLogInputStream in) { break; } if (firstTxId == HdfsConstants.INVALID_TXID) { - firstTxId = op.txid; + firstTxId = op.getTransactionId(); } if (lastTxId == HdfsConstants.INVALID_TXID - || op.txid == lastTxId + 1) { - lastTxId = op.txid; + || op.getTransactionId() == lastTxId + 1) { + lastTxId = op.getTransactionId(); } else { - FSImage.LOG.error("Out of order txid found. Found " + op.txid - + ", expected " + (lastTxId + 1)); + FSImage.LOG.error("Out of order txid found. Found " + + op.getTransactionId() + ", expected " + (lastTxId + 1)); break; } numValid++; @@ -743,4 +767,7 @@ public long getPos() { } } + public long getLastAppliedTxId() { + return lastAppliedTxId; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java index 92ac743c381..a96aa3fc6ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java @@ -33,6 +33,8 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature; import org.apache.hadoop.util.PureJavaCrc32; @@ -54,6 +56,8 @@ import org.xml.sax.SAXException; import org.xml.sax.helpers.AttributesImpl; +import com.google.common.base.Preconditions; + import java.io.DataInput; import java.io.DataOutput; import java.io.DataInputStream; @@ -74,42 +78,44 @@ public abstract class FSEditLogOp { @SuppressWarnings("deprecation") - private static ThreadLocal> opInstances = - new ThreadLocal>() { - @Override - protected EnumMap initialValue() { - EnumMap instances - = new EnumMap(FSEditLogOpCodes.class); - instances.put(OP_ADD, new AddOp()); - instances.put(OP_CLOSE, new CloseOp()); - instances.put(OP_SET_REPLICATION, new SetReplicationOp()); - instances.put(OP_CONCAT_DELETE, new ConcatDeleteOp()); - instances.put(OP_RENAME_OLD, new RenameOldOp()); - instances.put(OP_DELETE, new DeleteOp()); - instances.put(OP_MKDIR, new MkdirOp()); - instances.put(OP_SET_GENSTAMP, new SetGenstampOp()); - instances.put(OP_SET_PERMISSIONS, new SetPermissionsOp()); - instances.put(OP_SET_OWNER, new SetOwnerOp()); - instances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp()); - instances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp()); - instances.put(OP_SET_QUOTA, new SetQuotaOp()); - instances.put(OP_TIMES, new TimesOp()); 
- instances.put(OP_SYMLINK, new SymlinkOp()); - instances.put(OP_RENAME, new RenameOp()); - instances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp()); - instances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp()); - instances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp()); - instances.put(OP_CANCEL_DELEGATION_TOKEN, - new CancelDelegationTokenOp()); - instances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp()); - instances.put(OP_START_LOG_SEGMENT, - new LogSegmentOp(OP_START_LOG_SEGMENT)); - instances.put(OP_END_LOG_SEGMENT, - new LogSegmentOp(OP_END_LOG_SEGMENT)); - instances.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp()); - return instances; - } - }; + final public static class OpInstanceCache { + private EnumMap inst = + new EnumMap(FSEditLogOpCodes.class); + + public OpInstanceCache() { + inst.put(OP_ADD, new AddOp()); + inst.put(OP_CLOSE, new CloseOp()); + inst.put(OP_SET_REPLICATION, new SetReplicationOp()); + inst.put(OP_CONCAT_DELETE, new ConcatDeleteOp()); + inst.put(OP_RENAME_OLD, new RenameOldOp()); + inst.put(OP_DELETE, new DeleteOp()); + inst.put(OP_MKDIR, new MkdirOp()); + inst.put(OP_SET_GENSTAMP, new SetGenstampOp()); + inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp()); + inst.put(OP_SET_OWNER, new SetOwnerOp()); + inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp()); + inst.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp()); + inst.put(OP_SET_QUOTA, new SetQuotaOp()); + inst.put(OP_TIMES, new TimesOp()); + inst.put(OP_SYMLINK, new SymlinkOp()); + inst.put(OP_RENAME, new RenameOp()); + inst.put(OP_REASSIGN_LEASE, new ReassignLeaseOp()); + inst.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp()); + inst.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp()); + inst.put(OP_CANCEL_DELEGATION_TOKEN, + new CancelDelegationTokenOp()); + inst.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp()); + inst.put(OP_START_LOG_SEGMENT, + new LogSegmentOp(OP_START_LOG_SEGMENT)); + inst.put(OP_END_LOG_SEGMENT, + new LogSegmentOp(OP_END_LOG_SEGMENT)); + inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp()); + } + + public FSEditLogOp get(FSEditLogOpCodes opcode) { + return inst.get(opcode); + } + } /** * Constructor for an EditLog Op. EditLog ops cannot be constructed @@ -117,13 +123,22 @@ protected EnumMap initialValue() { */ private FSEditLogOp(FSEditLogOpCodes opCode) { this.opCode = opCode; - this.txid = 0; + this.txid = HdfsConstants.INVALID_TXID; } public long getTransactionId() { + Preconditions.checkState(txid != HdfsConstants.INVALID_TXID); return txid; } + public String getTransactionIdStr() { + return (txid == HdfsConstants.INVALID_TXID) ? 
"(none)" : "" + txid; + } + + public boolean hasTransactionId() { + return (txid != HdfsConstants.INVALID_TXID); + } + public void setTransactionId(long txid) { this.txid = txid; } @@ -373,8 +388,8 @@ private AddOp() { super(OP_ADD); } - static AddOp getInstance() { - return (AddOp)opInstances.get().get(OP_ADD); + static AddOp getInstance(OpInstanceCache cache) { + return (AddOp)cache.get(OP_ADD); } public boolean shouldCompleteLastBlock() { @@ -395,8 +410,8 @@ private CloseOp() { super(OP_CLOSE); } - static CloseOp getInstance() { - return (CloseOp)opInstances.get().get(OP_CLOSE); + static CloseOp getInstance(OpInstanceCache cache) { + return (CloseOp)cache.get(OP_CLOSE); } public boolean shouldCompleteLastBlock() { @@ -420,9 +435,8 @@ private UpdateBlocksOp() { super(OP_UPDATE_BLOCKS); } - static UpdateBlocksOp getInstance() { - return (UpdateBlocksOp)opInstances.get() - .get(OP_UPDATE_BLOCKS); + static UpdateBlocksOp getInstance(OpInstanceCache cache) { + return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS); } @@ -500,9 +514,8 @@ private SetReplicationOp() { super(OP_SET_REPLICATION); } - static SetReplicationOp getInstance() { - return (SetReplicationOp)opInstances.get() - .get(OP_SET_REPLICATION); + static SetReplicationOp getInstance(OpInstanceCache cache) { + return (SetReplicationOp)cache.get(OP_SET_REPLICATION); } SetReplicationOp setPath(String path) { @@ -571,9 +584,8 @@ private ConcatDeleteOp() { super(OP_CONCAT_DELETE); } - static ConcatDeleteOp getInstance() { - return (ConcatDeleteOp)opInstances.get() - .get(OP_CONCAT_DELETE); + static ConcatDeleteOp getInstance(OpInstanceCache cache) { + return (ConcatDeleteOp)cache.get(OP_CONCAT_DELETE); } ConcatDeleteOp setTarget(String trg) { @@ -697,9 +709,8 @@ private RenameOldOp() { super(OP_RENAME_OLD); } - static RenameOldOp getInstance() { - return (RenameOldOp)opInstances.get() - .get(OP_RENAME_OLD); + static RenameOldOp getInstance(OpInstanceCache cache) { + return (RenameOldOp)cache.get(OP_RENAME_OLD); } RenameOldOp setSource(String src) { @@ -790,9 +801,8 @@ private DeleteOp() { super(OP_DELETE); } - static DeleteOp getInstance() { - return (DeleteOp)opInstances.get() - .get(OP_DELETE); + static DeleteOp getInstance(OpInstanceCache cache) { + return (DeleteOp)cache.get(OP_DELETE); } DeleteOp setPath(String path) { @@ -872,9 +882,8 @@ private MkdirOp() { super(OP_MKDIR); } - static MkdirOp getInstance() { - return (MkdirOp)opInstances.get() - .get(OP_MKDIR); + static MkdirOp getInstance(OpInstanceCache cache) { + return (MkdirOp)cache.get(OP_MKDIR); } MkdirOp setPath(String path) { @@ -977,9 +986,8 @@ private SetGenstampOp() { super(OP_SET_GENSTAMP); } - static SetGenstampOp getInstance() { - return (SetGenstampOp)opInstances.get() - .get(OP_SET_GENSTAMP); + static SetGenstampOp getInstance(OpInstanceCache cache) { + return (SetGenstampOp)cache.get(OP_SET_GENSTAMP); } SetGenstampOp setGenerationStamp(long genStamp) { @@ -1031,9 +1039,8 @@ private SetPermissionsOp() { super(OP_SET_PERMISSIONS); } - static SetPermissionsOp getInstance() { - return (SetPermissionsOp)opInstances.get() - .get(OP_SET_PERMISSIONS); + static SetPermissionsOp getInstance(OpInstanceCache cache) { + return (SetPermissionsOp)cache.get(OP_SET_PERMISSIONS); } SetPermissionsOp setSource(String src) { @@ -1098,9 +1105,8 @@ private SetOwnerOp() { super(OP_SET_OWNER); } - static SetOwnerOp getInstance() { - return (SetOwnerOp)opInstances.get() - .get(OP_SET_OWNER); + static SetOwnerOp getInstance(OpInstanceCache cache) { + return 
(SetOwnerOp)cache.get(OP_SET_OWNER); } SetOwnerOp setSource(String src) { @@ -1179,9 +1185,8 @@ private SetNSQuotaOp() { super(OP_SET_NS_QUOTA); } - static SetNSQuotaOp getInstance() { - return (SetNSQuotaOp)opInstances.get() - .get(OP_SET_NS_QUOTA); + static SetNSQuotaOp getInstance(OpInstanceCache cache) { + return (SetNSQuotaOp)cache.get(OP_SET_NS_QUOTA); } @Override @@ -1232,9 +1237,8 @@ private ClearNSQuotaOp() { super(OP_CLEAR_NS_QUOTA); } - static ClearNSQuotaOp getInstance() { - return (ClearNSQuotaOp)opInstances.get() - .get(OP_CLEAR_NS_QUOTA); + static ClearNSQuotaOp getInstance(OpInstanceCache cache) { + return (ClearNSQuotaOp)cache.get(OP_CLEAR_NS_QUOTA); } @Override @@ -1281,9 +1285,8 @@ private SetQuotaOp() { super(OP_SET_QUOTA); } - static SetQuotaOp getInstance() { - return (SetQuotaOp)opInstances.get() - .get(OP_SET_QUOTA); + static SetQuotaOp getInstance(OpInstanceCache cache) { + return (SetQuotaOp)cache.get(OP_SET_QUOTA); } SetQuotaOp setSource(String src) { @@ -1360,9 +1363,8 @@ private TimesOp() { super(OP_TIMES); } - static TimesOp getInstance() { - return (TimesOp)opInstances.get() - .get(OP_TIMES); + static TimesOp getInstance(OpInstanceCache cache) { + return (TimesOp)cache.get(OP_TIMES); } TimesOp setPath(String path) { @@ -1458,9 +1460,8 @@ private SymlinkOp() { super(OP_SYMLINK); } - static SymlinkOp getInstance() { - return (SymlinkOp)opInstances.get() - .get(OP_SYMLINK); + static SymlinkOp getInstance(OpInstanceCache cache) { + return (SymlinkOp)cache.get(OP_SYMLINK); } SymlinkOp setPath(String path) { @@ -1579,9 +1580,8 @@ private RenameOp() { super(OP_RENAME); } - static RenameOp getInstance() { - return (RenameOp)opInstances.get() - .get(OP_RENAME); + static RenameOp getInstance(OpInstanceCache cache) { + return (RenameOp)cache.get(OP_RENAME); } RenameOp setSource(String src) { @@ -1723,9 +1723,8 @@ private ReassignLeaseOp() { super(OP_REASSIGN_LEASE); } - static ReassignLeaseOp getInstance() { - return (ReassignLeaseOp)opInstances.get() - .get(OP_REASSIGN_LEASE); + static ReassignLeaseOp getInstance(OpInstanceCache cache) { + return (ReassignLeaseOp)cache.get(OP_REASSIGN_LEASE); } ReassignLeaseOp setLeaseHolder(String leaseHolder) { @@ -1798,9 +1797,8 @@ private GetDelegationTokenOp() { super(OP_GET_DELEGATION_TOKEN); } - static GetDelegationTokenOp getInstance() { - return (GetDelegationTokenOp)opInstances.get() - .get(OP_GET_DELEGATION_TOKEN); + static GetDelegationTokenOp getInstance(OpInstanceCache cache) { + return (GetDelegationTokenOp)cache.get(OP_GET_DELEGATION_TOKEN); } GetDelegationTokenOp setDelegationTokenIdentifier( @@ -1870,9 +1868,8 @@ private RenewDelegationTokenOp() { super(OP_RENEW_DELEGATION_TOKEN); } - static RenewDelegationTokenOp getInstance() { - return (RenewDelegationTokenOp)opInstances.get() - .get(OP_RENEW_DELEGATION_TOKEN); + static RenewDelegationTokenOp getInstance(OpInstanceCache cache) { + return (RenewDelegationTokenOp)cache.get(OP_RENEW_DELEGATION_TOKEN); } RenewDelegationTokenOp setDelegationTokenIdentifier( @@ -1941,9 +1938,8 @@ private CancelDelegationTokenOp() { super(OP_CANCEL_DELEGATION_TOKEN); } - static CancelDelegationTokenOp getInstance() { - return (CancelDelegationTokenOp)opInstances.get() - .get(OP_CANCEL_DELEGATION_TOKEN); + static CancelDelegationTokenOp getInstance(OpInstanceCache cache) { + return (CancelDelegationTokenOp)cache.get(OP_CANCEL_DELEGATION_TOKEN); } CancelDelegationTokenOp setDelegationTokenIdentifier( @@ -1996,9 +1992,8 @@ private UpdateMasterKeyOp() { super(OP_UPDATE_MASTER_KEY); } - static 
UpdateMasterKeyOp getInstance() { - return (UpdateMasterKeyOp)opInstances.get() - .get(OP_UPDATE_MASTER_KEY); + static UpdateMasterKeyOp getInstance(OpInstanceCache cache) { + return (UpdateMasterKeyOp)cache.get(OP_UPDATE_MASTER_KEY); } UpdateMasterKeyOp setDelegationKey(DelegationKey key) { @@ -2050,8 +2045,9 @@ private LogSegmentOp(FSEditLogOpCodes code) { code == OP_END_LOG_SEGMENT : "Bad op: " + code; } - static LogSegmentOp getInstance(FSEditLogOpCodes code) { - return (LogSegmentOp)opInstances.get().get(code); + static LogSegmentOp getInstance(OpInstanceCache cache, + FSEditLogOpCodes code) { + return (LogSegmentOp)cache.get(code); } public void readFields(DataInputStream in, int logVersion) @@ -2091,8 +2087,8 @@ private InvalidOp() { super(OP_INVALID); } - static InvalidOp getInstance() { - return (InvalidOp)opInstances.get().get(OP_INVALID); + static InvalidOp getInstance(OpInstanceCache cache) { + return (InvalidOp)cache.get(OP_INVALID); } @Override @@ -2207,6 +2203,7 @@ public static class Reader { private final DataInputStream in; private final int logVersion; private final Checksum checksum; + private final OpInstanceCache cache; /** * Construct the reader @@ -2228,6 +2225,7 @@ public Reader(DataInputStream in, int logVersion) { } else { this.in = in; } + this.cache = new OpInstanceCache(); } /** @@ -2236,16 +2234,42 @@ public Reader(DataInputStream in, int logVersion) { * Note that the objects returned from this method may be re-used by future * calls to the same method. * + * @param skipBrokenEdits If true, attempt to skip over damaged parts of + * the input stream, rather than throwing an IOException * @return the operation read from the stream, or null at the end of the file * @throws IOException on error. */ - public FSEditLogOp readOp() throws IOException { + public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException { + FSEditLogOp op = null; + while (true) { + try { + in.mark(in.available()); + try { + op = decodeOp(); + } finally { + // If we encountered an exception or an end-of-file condition, + // do not advance the input stream. 
+ if (op == null) { + in.reset(); + } + } + return op; + } catch (IOException e) { + if (!skipBrokenEdits) { + throw e; + } + if (in.skip(1) < 1) { + return null; + } + } + } + } + + private FSEditLogOp decodeOp() throws IOException { if (checksum != null) { checksum.reset(); } - in.mark(1); - byte opCodeByte; try { opCodeByte = in.readByte(); @@ -2255,12 +2279,10 @@ public FSEditLogOp readOp() throws IOException { } FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte); - if (opCode == OP_INVALID) { - in.reset(); // reset back to end of file if somebody reads it again + if (opCode == OP_INVALID) return null; - } - FSEditLogOp op = opInstances.get().get(opCode); + FSEditLogOp op = cache.get(opCode); if (op == null) { throw new IOException("Read invalid opcode " + opCode); } @@ -2268,6 +2290,8 @@ public FSEditLogOp readOp() throws IOException { if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) { // Read the txid op.setTransactionId(in.readLong()); + } else { + op.setTransactionId(HdfsConstants.INVALID_TXID); } op.readFields(in, logVersion); @@ -2426,8 +2450,4 @@ public static PermissionStatus permissionStatusFromXml(Stanza st) short mode = Short.valueOf(st.getValue("MODE")); return new PermissionStatus(username, groupname, new FsPermission(mode)); } - - public static FSEditLogOp getOpInstance(FSEditLogOpCodes opCode) { - return opInstances.get().get(opCode); - } -} + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index 46e61a599fd..a9bf5c70667 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -158,8 +158,8 @@ void format(FSNamesystem fsn, String clusterId) throws IOException { * @throws IOException * @return true if the image needs to be saved or false otherwise */ - boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target) - throws IOException { + boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target, + MetaRecoveryContext recovery) throws IOException { assert startOpt != StartupOption.FORMAT : "NameNode formatting should be performed before reading the image"; @@ -244,7 +244,7 @@ boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target) // just load the image } - return loadFSImage(target); + return loadFSImage(target, recovery); } /** @@ -304,7 +304,7 @@ private void doUpgrade(FSNamesystem target) throws IOException { if(storage.getDistributedUpgradeState()) { // only distributed upgrade need to continue // don't do version upgrade - this.loadFSImage(target); + this.loadFSImage(target, null); storage.initializeDistributedUpgrade(); return; } @@ -319,7 +319,7 @@ private void doUpgrade(FSNamesystem target) throws IOException { } // load the latest image - this.loadFSImage(target); + this.loadFSImage(target, null); // Do upgrade for each directory long oldCTime = storage.getCTime(); @@ -505,7 +505,7 @@ void doImportCheckpoint(FSNamesystem target) throws IOException { target.dir.fsImage = ckptImage; // load from the checkpoint dirs try { - ckptImage.recoverTransitionRead(StartupOption.REGULAR, target); + ckptImage.recoverTransitionRead(StartupOption.REGULAR, target, null); } finally { ckptImage.close(); } @@ -550,7 +550,7 @@ void reloadFromImageFile(File file, FSNamesystem target) throws 
IOException { target.dir.reset(); LOG.debug("Reloading namespace from " + file); - loadFSImage(file, target); + loadFSImage(file, target, null); } /** @@ -568,7 +568,8 @@ void reloadFromImageFile(File file, FSNamesystem target) throws IOException { * @return whether the image should be saved * @throws IOException */ - boolean loadFSImage(FSNamesystem target) throws IOException { + boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery) + throws IOException { FSImageStorageInspector inspector = storage.readAndInspectDirs(); isUpgradeFinalized = inspector.isUpgradeFinalized(); @@ -583,7 +584,6 @@ boolean loadFSImage(FSNamesystem target) throws IOException { // We only want to recover streams if we're going into Active mode. editLog.recoverUnclosedStreams(); } - if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, getLayoutVersion())) { // If we're open for write, we're either non-HA or we're the active NN, so @@ -610,7 +610,7 @@ boolean loadFSImage(FSNamesystem target) throws IOException { getLayoutVersion())) { // For txid-based layout, we should have a .md5 file // next to the image file - loadFSImage(imageFile.getFile(), target); + loadFSImage(imageFile.getFile(), target, recovery); } else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM, getLayoutVersion())) { // In 0.22, we have the checksum stored in the VERSION file. @@ -622,22 +622,19 @@ boolean loadFSImage(FSNamesystem target) throws IOException { NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY + " not set for storage directory " + sdForProperties.getRoot()); } - loadFSImage(imageFile.getFile(), new MD5Hash(md5), target); + loadFSImage(imageFile.getFile(), new MD5Hash(md5), target, recovery); } else { // We don't have any record of the md5sum - loadFSImage(imageFile.getFile(), null, target); + loadFSImage(imageFile.getFile(), null, target, recovery); } } catch (IOException ioe) { FSEditLog.closeAllStreams(editStreams); throw new IOException("Failed to load image from " + imageFile, ioe); } - - long numLoaded = loadEdits(editStreams, target); + long txnsAdvanced = loadEdits(editStreams, target, recovery); needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(), - numLoaded); - - // update the txid for the edit log - editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1); + txnsAdvanced); + editLog.setNextTxId(lastAppliedTxId + 1); return needToSave; } @@ -664,33 +661,29 @@ private boolean needsResaveBasedOnStaleCheckpoint( /** * Load the specified list of edit files into the image. 
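A small, hypothetical caller illustrating the changed FSImage.loadEdits() bookkeeping above: the return value is now the number of transaction ids advanced rather than the number of operations applied, and in recovery mode the two can differ because corrupt ops may be skipped while lastAppliedTxId is still bumped to each stream's last txid. Only loadEdits() and MetaRecoveryContext come from the patch; the wrapper itself is illustrative.

    import java.io.IOException;

    // Hypothetical wrapper, illustration only.
    class EditReplaySketch {
      static long replayEdits(FSImage image, FSNamesystem target,
          Iterable<EditLogInputStream> streams,
          MetaRecoveryContext recovery) throws IOException {
        // With recovery == null this behaves like the old strict path and fails on a
        // bad op; with a non-null recovery context, bad sections may be skipped while
        // lastAppliedTxId is still advanced to each stream's last txid.
        return image.loadEdits(streams, target, recovery);
      }
    }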
- * @return the number of transactions loaded */ public long loadEdits(Iterable editStreams, - FSNamesystem target) throws IOException, EditLogInputException { + FSNamesystem target, MetaRecoveryContext recovery) throws IOException { LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams)); - - long startingTxId = getLastAppliedTxId() + 1; - long numLoaded = 0; - + + long prevLastAppliedTxId = lastAppliedTxId; try { - FSEditLogLoader loader = new FSEditLogLoader(target); + FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId); // Load latest edits for (EditLogInputStream editIn : editStreams) { - LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId); - long thisNumLoaded = 0; + LOG.info("Reading " + editIn + " expecting start txid #" + + (lastAppliedTxId + 1)); try { - thisNumLoaded = loader.loadFSEdits(editIn, startingTxId); - } catch (EditLogInputException elie) { - thisNumLoaded = elie.getNumEditsLoaded(); - throw elie; + loader.loadFSEdits(editIn, lastAppliedTxId + 1, recovery); } finally { // Update lastAppliedTxId even in case of error, since some ops may // have been successfully applied before the error. - lastAppliedTxId = startingTxId + thisNumLoaded - 1; - startingTxId += thisNumLoaded; - numLoaded += thisNumLoaded; + lastAppliedTxId = loader.getLastAppliedTxId(); + } + // If we are in recovery mode, we may have skipped over some txids. + if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID) { + lastAppliedTxId = editIn.getLastTxId(); } } } finally { @@ -698,8 +691,7 @@ public long loadEdits(Iterable editStreams, // update the counts target.dir.updateCountForINodeWithQuota(); } - - return numLoaded; + return lastAppliedTxId - prevLastAppliedTxId; } @@ -707,14 +699,14 @@ public long loadEdits(Iterable editStreams, * Load the image namespace from the given image file, verifying * it against the MD5 sum stored in its associated .md5 file. */ - private void loadFSImage(File imageFile, FSNamesystem target) - throws IOException { + private void loadFSImage(File imageFile, FSNamesystem target, + MetaRecoveryContext recovery) throws IOException { MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile); if (expectedMD5 == null) { throw new IOException("No MD5 file found corresponding to image file " + imageFile); } - loadFSImage(imageFile, expectedMD5, target); + loadFSImage(imageFile, expectedMD5, target, recovery); } /** @@ -722,7 +714,7 @@ private void loadFSImage(File imageFile, FSNamesystem target) * filenames and blocks. 
*/ private void loadFSImage(File curFile, MD5Hash expectedMd5, - FSNamesystem target) throws IOException { + FSNamesystem target, MetaRecoveryContext recovery) throws IOException { FSImageFormat.Loader loader = new FSImageFormat.Loader( conf, target); loader.load(curFile); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java index dbf1860a85e..c3d35b13630 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageTransactionalStorageInspector.java @@ -56,7 +56,14 @@ public void inspectDirectory(StorageDirectory sd) throws IOException { return; } - maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd)); + // Check for a seen_txid file, which marks a minimum transaction ID that + // must be included in our load plan. + try { + maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd)); + } catch (IOException ioe) { + LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe); + return; + } File currentDir = sd.getCurrentDir(); File filesInStorage[]; @@ -91,15 +98,6 @@ public void inspectDirectory(StorageDirectory sd) throws IOException { } } - - // Check for a seen_txid file, which marks a minimum transaction ID that - // must be included in our load plan. - try { - maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd)); - } catch (IOException ioe) { - LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe); - } - // set finalized flag isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index a5a8ca0d322..1363c6cda05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -380,9 +380,12 @@ public static FSNamesystem loadFromDisk(Configuration conf, FSImage fsImage = new FSImage(conf, namespaceDirs, namespaceEditsDirs); FSNamesystem namesystem = new FSNamesystem(conf, fsImage); + StartupOption startOpt = NameNode.getStartupOption(conf); + if (startOpt == StartupOption.RECOVER) { + namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + } long loadStart = now(); - StartupOption startOpt = NameNode.getStartupOption(conf); String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf); namesystem.loadFSImage(startOpt, fsImage, HAUtil.isHAEnabled(conf, nameserviceId)); @@ -491,7 +494,8 @@ void loadFSImage(StartupOption startOpt, FSImage fsImage, boolean haEnabled) writeLock(); try { // We shouldn't be calling saveNamespace if we've come up in standby state. 
- if (fsImage.recoverTransitionRead(startOpt, this) && !haEnabled) { + MetaRecoveryContext recovery = startOpt.createRecoveryContext(); + if (fsImage.recoverTransitionRead(startOpt, this, recovery) && !haEnabled) { fsImage.saveNamespace(this); } // This will start a new log segment and write to the seen_txid file, so diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index 603dd000909..c2281700478 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -232,7 +232,10 @@ synchronized public EditLogInputStream getInputStream(long fromTxId, LOG.info(String.format("Log begins at txid %d, but requested start " + "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId, transactionsToSkip)); - elfis.skipTransactions(transactionsToSkip); + } + if (elfis.skipUntil(fromTxId) == false) { + throw new IOException("failed to advance input stream to txid " + + fromTxId); } return elfis; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalStream.java deleted file mode 100644 index d786476470d..00000000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalStream.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode; - -/** - * A generic interface for journal input and output streams. - */ -interface JournalStream { - /** - * Type of the underlying persistent storage type the stream is based upon. - *

- * <ul>
- * <li>{@link JournalType#FILE} - streams edits into a local file, see
- * {@link FSEditLog.EditLogFileOutputStream} and
- * {@link FSEditLog.EditLogFileInputStream}</li>
- * <li>{@link JournalType#BACKUP} - streams edits to a backup node, see
- * {@link EditLogBackupOutputStream} and {@link EditLogBackupInputStream}</li>
- * </ul>
- */ - static enum JournalType { - FILE, - BACKUP; - boolean isOfType(JournalType other) { - return other == null || this == other; - } - }; - - /** - * Get this stream name. - * - * @return name of the stream - */ - String getName(); - - /** - * Get the type of the stream. - * Determines the underlying persistent storage type. - * @see JournalType - * @return type - */ - JournalType getType(); -} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MetaRecoveryContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MetaRecoveryContext.java new file mode 100644 index 00000000000..b4bd119eb58 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MetaRecoveryContext.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** Context data for an ongoing NameNode metadata recovery process. */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class MetaRecoveryContext { + public static final Log LOG = LogFactory.getLog(MetaRecoveryContext.class.getName()); + public final static int FORCE_NONE = 0; + public final static int FORCE_FIRST_CHOICE = 1; + public final static int FORCE_ALL = 2; + private int force; + + /** Exception thrown when the user has requested processing to stop. */ + static public class RequestStopException extends IOException { + private static final long serialVersionUID = 1L; + public RequestStopException(String msg) { + super(msg); + } + } + + public MetaRecoveryContext(int force) { + this.force = force; + } + + /** + * Display a prompt to the user and get his or her choice. + * + * @param prompt The prompt to display + * @param default First choice (will be taken if autoChooseDefault is + * true) + * @param choices Other choies + * + * @return The choice that was taken + * @throws IOException + */ + public String ask(String prompt, String firstChoice, String... 
choices) + throws IOException { + while (true) { + LOG.info(prompt); + if (force > FORCE_NONE) { + LOG.info("automatically choosing " + firstChoice); + return firstChoice; + } + StringBuilder responseBuilder = new StringBuilder(); + while (true) { + int c = System.in.read(); + if (c == -1 || c == '\r' || c == '\n') { + break; + } + responseBuilder.append((char)c); + } + String response = responseBuilder.toString(); + if (response.equalsIgnoreCase(firstChoice)) + return firstChoice; + for (String c : choices) { + if (response.equalsIgnoreCase(c)) { + return c; + } + } + LOG.error("I'm sorry, I cannot understand your response.\n"); + } + } + + public static void editLogLoaderPrompt(String prompt, + MetaRecoveryContext recovery, String contStr) + throws IOException, RequestStopException + { + if (recovery == null) { + throw new IOException(prompt); + } + LOG.error(prompt); + String answer = recovery.ask("\nEnter 'c' to continue, " + contStr + "\n" + + "Enter 's' to stop reading the edit log here, abandoning any later " + + "edits\n" + + "Enter 'q' to quit without saving\n" + + "Enter 'a' to always select the first choice in the future " + + "without prompting. " + + "(c/s/q/a)\n", "c", "s", "q", "a"); + if (answer.equals("c")) { + LOG.info("Continuing."); + return; + } else if (answer.equals("s")) { + throw new RequestStopException("user requested stop"); + } else if (answer.equals("q")) { + recovery.quit(); + } else { + recovery.setForce(FORCE_FIRST_CHOICE); + return; + } + } + + /** Log a message and quit */ + public void quit() { + LOG.error("Exiting on user request."); + System.exit(0); + } + + public int getForce() { + return this.force; + } + + public void setForce(int force) { + this.force = force; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java index 6fae88a9652..d5061b7be8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java @@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.UpgradeManager; import org.apache.hadoop.hdfs.server.common.Util; -import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.util.AtomicFileOutputStream; @@ -299,8 +298,7 @@ synchronized void setStorageDirectories(Collection fsNameDirs, NameNodeDirType.IMAGE; // Add to the list of storage directories, only if the // URI is of type file:// - if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase()) - == 0){ + if(dirName.getScheme().compareTo("file") == 0) { this.addStorageDir(new StorageDirectory(new File(dirName.getPath()), dirType, !sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared. 
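
The MetaRecoveryContext class added above drives every interactive decision made during recovery. Below is a minimal sketch of how its prompt API behaves, using only the public members visible in this patch (the constructor, the FORCE_* constants, and editLogLoaderPrompt); the surrounding RecoveryPromptSketch class is hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext;

    public class RecoveryPromptSketch {
      public static void main(String[] args) throws IOException {
        // With a force level above FORCE_NONE, ask() only logs the prompt and
        // returns the first choice ("c" == continue), so no console input is read.
        MetaRecoveryContext recovery =
            new MetaRecoveryContext(MetaRecoveryContext.FORCE_FIRST_CHOICE);
        MetaRecoveryContext.editLogLoaderPrompt(
            "Error replaying edit log at offset 123", recovery,
            "skipping the bad section");

        // With no recovery context (a normal, non-recovery startup), the same
        // call simply rethrows the prompt text as an IOException, so loading
        // fails fast on the first bad transaction.
        try {
          MetaRecoveryContext.editLogLoaderPrompt(
              "Error replaying edit log at offset 123", null,
              "skipping the bad section");
        } catch (IOException e) {
          System.out.println("non-recovery startup fails fast: " + e.getMessage());
        }
      }
    }

This is the split the edit-log loading path relies on: callers that pass a null MetaRecoveryContext keep the existing fail-fast behavior, while recovery mode can continue past, stop at, or quit on each error.
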
@@ -312,8 +310,7 @@ synchronized void setStorageDirectories(Collection fsNameDirs, checkSchemeConsistency(dirName); // Add to the list of storage directories, only if the // URI is of type file:// - if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase()) - == 0) + if(dirName.getScheme().compareTo("file") == 0) this.addStorageDir(new StorageDirectory(new File(dirName.getPath()), NameNodeDirType.EDITS, !sharedEditsDirs.contains(dirName))); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 9fb644e8722..66558606289 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -514,6 +514,8 @@ private void stopHttpServer() { *
*   <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
*   <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster
* upgrade and create a snapshot of the current file system state</li>
+ *   <li>{@link StartupOption#RECOVERY RECOVERY} - recover name node
+ * metadata</li>
*   <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the
* cluster back to the previous state</li>
  • {@link StartupOption#FINALIZE FINALIZE} - finalize @@ -832,7 +834,10 @@ private static void printUsage() { StartupOption.FINALIZE.getName() + "] | [" + StartupOption.IMPORT.getName() + "] | [" + StartupOption.BOOTSTRAPSTANDBY.getName() + "] | [" + - StartupOption.INITIALIZESHAREDEDITS.getName() + "]"); + StartupOption.INITIALIZESHAREDEDITS.getName() + "] | [" + + StartupOption.BOOTSTRAPSTANDBY.getName() + "] | [" + + StartupOption.RECOVER.getName() + " [ " + + StartupOption.FORCE.getName() + " ] ]"); } private static StartupOption parseArguments(String args[]) { @@ -876,6 +881,21 @@ private static StartupOption parseArguments(String args[]) { } else if (StartupOption.INITIALIZESHAREDEDITS.getName().equalsIgnoreCase(cmd)) { startOpt = StartupOption.INITIALIZESHAREDEDITS; return startOpt; + } else if (StartupOption.RECOVER.getName().equalsIgnoreCase(cmd)) { + if (startOpt != StartupOption.REGULAR) { + throw new RuntimeException("Can't combine -recover with " + + "other startup options."); + } + startOpt = StartupOption.RECOVER; + while (++i < argsLen) { + if (args[i].equalsIgnoreCase( + StartupOption.FORCE.getName())) { + startOpt.setForce(MetaRecoveryContext.FORCE_FIRST_CHOICE); + } else { + throw new RuntimeException("Error parsing recovery options: " + + "can't understand option \"" + args[i] + "\""); + } + } } else { return null; } @@ -892,6 +912,39 @@ static StartupOption getStartupOption(Configuration conf) { StartupOption.REGULAR.toString())); } + private static void doRecovery(StartupOption startOpt, Configuration conf) + throws IOException { + if (startOpt.getForce() < MetaRecoveryContext.FORCE_ALL) { + if (!confirmPrompt("You have selected Metadata Recovery mode. " + + "This mode is intended to recover lost metadata on a corrupt " + + "filesystem. Metadata recovery mode often permanently deletes " + + "data from your HDFS filesystem. Please back up your edit log " + + "and fsimage before trying this!\n\n" + + "Are you ready to proceed? (Y/N)\n")) { + System.err.println("Recovery aborted at user request.\n"); + return; + } + } + MetaRecoveryContext.LOG.info("starting recovery..."); + UserGroupInformation.setConfiguration(conf); + NameNode.initMetrics(conf, startOpt.toNodeRole()); + FSNamesystem fsn = null; + try { + fsn = FSNamesystem.loadFromDisk(conf); + fsn.saveNamespace(); + MetaRecoveryContext.LOG.info("RECOVERY COMPLETE"); + } catch (IOException e) { + MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e); + throw e; + } catch (RuntimeException e) { + MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e); + throw e; + } finally { + if (fsn != null) + fsn.close(); + } + } + /** * Print out a prompt to the user, and return true if the user * responds with "Y" or "yes". 
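
For reference, a hedged sketch of how the new startup option is exercised end to end. NameNode.createNameNode() and the "-recover" flag are taken from the hunks above; the exact spelling of the FORCE flag is not visible in this hunk, so it is not passed here, and the wrapper class is illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.namenode.NameNode;

    public class RunNameNodeRecovery {
      public static void main(String[] args) throws Exception {
        // parseArguments() maps "-recover" to StartupOption.RECOVER, and
        // createNameNode() then calls doRecovery() instead of constructing a
        // NameNode, returning null once recovery finishes.
        Configuration conf = new HdfsConfiguration();

        // Because the force level stays below FORCE_ALL, doRecovery() first
        // asks "Are you ready to proceed? (Y/N)" on stdin before it loads the
        // namesystem and calls saveNamespace().
        NameNode nn = NameNode.createNameNode(new String[] { "-recover" }, conf);
        assert nn == null : "recovery mode does not return a NameNode instance";
      }
    }

The MiniDFSCluster changes below take the same route in tests: when the startup option is RECOVER, the cluster neither keeps the NameNode nor starts DataNodes, since createNameNode() returns null.
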
@@ -973,6 +1026,10 @@ public static NameNode createNameNode(String argv[], Configuration conf) DefaultMetricsSystem.initialize(role.toString().replace(" ", "")); return new BackupNode(conf, role); } + case RECOVER: { + NameNode.doRecovery(startOpt, conf); + return null; + } default: DefaultMetricsSystem.initialize("NameNode"); return new NameNode(conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index 0b3a1f93f65..c11f1d760e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -219,7 +219,7 @@ private void doTailEdits() throws IOException, InterruptedException { // disk are ignored. long editsLoaded = 0; try { - editsLoaded = image.loadEdits(streams, namesystem); + editsLoaded = image.loadEdits(streams, namesystem, null); } catch (EditLogInputException elie) { editsLoaded = elie.getNumEditsLoaded(); throw elie; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsXmlLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsXmlLoader.java index 9e28c908ed3..009db6a4776 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsXmlLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/OfflineEditsXmlLoader.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache; import org.apache.hadoop.hdfs.util.XMLUtils.Stanza; import org.xml.sax.Attributes; @@ -54,6 +55,7 @@ class OfflineEditsXmlLoader private FSEditLogOpCodes opCode; private StringBuffer cbuf; private long nextTxId; + private final OpInstanceCache opCache = new OpInstanceCache(); static enum ParseState { EXPECT_EDITS_TAG, @@ -207,7 +209,7 @@ public void endElement (String uri, String name, String qName) { throw new InvalidXmlException("expected "); } state = ParseState.EXPECT_RECORD; - FSEditLogOp op = FSEditLogOp.getOpInstance(opCode); + FSEditLogOp op = opCache.get(opCode); opCode = null; try { op.decodeXml(stanza); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 7d0bf444d50..2f1d992005d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -581,6 +581,10 @@ private void initMiniDFSCluster( } } + if (operation == StartupOption.RECOVER) { + return; + } + // Start the DataNodes startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks, hosts, simulatedCapacities, setupHostsFile); @@ -781,6 +785,9 @@ private void createNameNode(int nnIndex, Configuration conf, operation == StartupOption.REGULAR) ? 
new String[] {} : new String[] {operation.getName()}; NameNode nn = NameNode.createNameNode(args, conf); + if (operation == StartupOption.RECOVER) { + return; + } // After the NN has started, set back the bound ports into // the conf @@ -956,6 +963,9 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes, long[] simulatedCapacities, boolean setupHostsFile, boolean checkDataNodeAddrConfig) throws IOException { + if (operation == StartupOption.RECOVER) { + return; + } conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1"); int curDatanodesNum = dataNodes.size(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index fb3bc9b4c45..05df7fe9835 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -179,8 +179,8 @@ public void testPreTxidEditLogWithEdits() throws Exception { } private long testLoad(byte[] data, FSNamesystem namesys) throws IOException { - FSEditLogLoader loader = new FSEditLogLoader(namesys); - return loader.loadFSEdits(new EditLogByteInputStream(data), 1); + FSEditLogLoader loader = new FSEditLogLoader(namesys, 0); + return loader.loadFSEdits(new EditLogByteInputStream(data), 1, null); } /** @@ -315,7 +315,7 @@ private void testEditLog(int initialSize) throws IOException { // for (Iterator it = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) { - FSEditLogLoader loader = new FSEditLogLoader(namesystem); + FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0); File editFile = NNStorage.getFinalizedEditsFile(it.next(), 3, 3 + expectedTxns - 1); @@ -323,7 +323,7 @@ private void testEditLog(int initialSize) throws IOException { System.out.println("Verifying file: " + editFile); long numEdits = loader.loadFSEdits( - new EditLogFileInputStream(editFile), 3); + new EditLogFileInputStream(editFile), 3, null); int numLeases = namesystem.leaseManager.countLease(); System.out.println("Number of outstanding leases " + numLeases); assertEquals(0, numLeases); @@ -774,8 +774,8 @@ public long getPosition() { } @Override - public FSEditLogOp readOp() throws IOException { - return reader.readOp(); + protected FSEditLogOp nextOp() throws IOException { + return reader.readOp(false); } @Override @@ -788,16 +788,11 @@ public void close() throws IOException { input.close(); } - @Override // JournalStream + @Override public String getName() { return "AnonEditLogByteInputStream"; } - @Override // JournalStream - public JournalType getType() { - return JournalType.FILE; - } - @Override public boolean isInProgress() { return true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java index da66b45da2a..a17b54f6bfe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java @@ -236,9 +236,9 @@ private long verifyEditLogs(FSNamesystem namesystem, FSImage fsimage, File editFile = new File(sd.getCurrentDir(), logFileName); System.out.println("Verifying file: " + editFile); - 
FSEditLogLoader loader = new FSEditLogLoader(namesystem); + FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId); long numEditsThisLog = loader.loadFSEdits(new EditLogFileInputStream(editFile), - startTxId); + startTxId, null); System.out.println("Number of edits: " + numEditsThisLog); assertTrue(numEdits == -1 || numEditsThisLog == numEdits); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index 98605e1f4e1..1917ddeb9a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@ -92,8 +92,8 @@ public void testDisplayRecentEditLogOpCodes() throws IOException { rwf.close(); StringBuilder bld = new StringBuilder(); - bld.append("^Error replaying edit log at offset \\d+"); - bld.append(" on transaction ID \\d+\n"); + bld.append("^Error replaying edit log at offset \\d+. "); + bld.append("Expected transaction ID was \\d+\n"); bld.append("Recent opcode offsets: (\\d+\\s*){4}$"); try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java new file mode 100644 index 00000000000..69680967897 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java @@ -0,0 +1,305 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache; +import org.apache.hadoop.hdfs.server.namenode.FSImage; +import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp; +import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.StringUtils; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * This tests data recovery mode for the NameNode. + */ +public class TestNameNodeRecovery { + private static final Log LOG = LogFactory.getLog(TestNameNodeRecovery.class); + private static StartupOption recoverStartOpt = StartupOption.RECOVER; + + static { + recoverStartOpt.setForce(MetaRecoveryContext.FORCE_ALL); + } + + static void runEditLogTest(EditLogTestSetup elts) throws IOException { + final String TEST_LOG_NAME = "test_edit_log"; + final OpInstanceCache cache = new OpInstanceCache(); + + EditLogFileOutputStream elfos = null; + File file = null; + EditLogFileInputStream elfis = null; + try { + file = new File(TEST_LOG_NAME); + elfos = new EditLogFileOutputStream(file, 0); + elfos.create(); + + elts.addTransactionsToLog(elfos, cache); + elfos.setReadyToFlush(); + elfos.flushAndSync(); + elfos.close(); + elfos = null; + file = new File(TEST_LOG_NAME); + elfis = new EditLogFileInputStream(file); + + // reading through normally will get you an exception + Set validTxIds = elts.getValidTxIds(); + FSEditLogOp op = null; + long prevTxId = 0; + try { + while (true) { + op = elfis.nextOp(); + if (op == null) { + break; + } + LOG.debug("read txid " + op.txid); + if (!validTxIds.contains(op.getTransactionId())) { + fail("read txid " + op.getTransactionId() + + ", which we did not expect to find."); + } + validTxIds.remove(op.getTransactionId()); + prevTxId = op.getTransactionId(); + } + if (elts.getLastValidTxId() != -1) { + fail("failed to throw IoException as expected"); + } + } catch (IOException e) { + if (elts.getLastValidTxId() == -1) { + fail("expected all transactions to be valid, but got exception " + + "on txid " + prevTxId); + } else { + assertEquals(prevTxId, elts.getLastValidTxId()); + } + } + + if (elts.getLastValidTxId() != -1) { + // let's skip over the bad transaction + op = null; + prevTxId = 0; + try { + while (true) { + op = elfis.nextValidOp(); + if (op == null) { + break; + } + prevTxId = op.getTransactionId(); + assertTrue(validTxIds.remove(op.getTransactionId())); + } + } catch (Throwable e) { + fail("caught IOException while trying to skip over bad " + + "transaction. message was " + e.getMessage() + + "\nstack trace\n" + StringUtils.stringifyException(e)); + } + } + // We should have read every valid transaction. 
+ assertTrue(validTxIds.isEmpty()); + } finally { + IOUtils.cleanup(LOG, elfos, elfis); + } + } + + private interface EditLogTestSetup { + /** + * Set up the edit log. + */ + abstract public void addTransactionsToLog(EditLogOutputStream elos, + OpInstanceCache cache) throws IOException; + + /** + * Get the transaction ID right before the transaction which causes the + * normal edit log loading process to bail out-- or -1 if the first + * transaction should be bad. + */ + abstract public long getLastValidTxId(); + + /** + * Get the transaction IDs which should exist and be valid in this + * edit log. + **/ + abstract public Set getValidTxIds(); + } + + private class EltsTestEmptyLog implements EditLogTestSetup { + public void addTransactionsToLog(EditLogOutputStream elos, + OpInstanceCache cache) throws IOException { + // do nothing + } + + public long getLastValidTxId() { + return -1; + } + + public Set getValidTxIds() { + return new HashSet(); + } + } + + /** Test an empty edit log */ + @Test(timeout=180000) + public void testEmptyLog() throws IOException { + runEditLogTest(new EltsTestEmptyLog()); + } + + private class EltsTestGarbageInEditLog implements EditLogTestSetup { + final private long BAD_TXID = 4; + final private long MAX_TXID = 10; + + public void addTransactionsToLog(EditLogOutputStream elos, + OpInstanceCache cache) throws IOException { + for (long txid = 1; txid <= MAX_TXID; txid++) { + if (txid == BAD_TXID) { + byte garbage[] = { 0x1, 0x2, 0x3 }; + elos.writeRaw(garbage, 0, garbage.length); + } + else { + DeleteOp op; + op = DeleteOp.getInstance(cache); + op.setTransactionId(txid); + op.setPath("/foo." + txid); + op.setTimestamp(txid); + elos.write(op); + } + } + } + + public long getLastValidTxId() { + return BAD_TXID - 1; + } + + public Set getValidTxIds() { + return Sets.newHashSet(1L , 2L, 3L, 5L, 6L, 7L, 8L, 9L, 10L); + } + } + + /** Test that we can successfully recover from a situation where there is + * garbage in the middle of the edit log file output stream. */ + @Test(timeout=180000) + public void testSkipEdit() throws IOException { + runEditLogTest(new EltsTestGarbageInEditLog()); + } + + /** Test that we can successfully recover from a situation where the last + * entry in the edit log has been truncated. 
*/ + @Test(timeout=180000) + public void testRecoverTruncatedEditLog() throws IOException { + final String TEST_PATH = "/test/path/dir"; + final int NUM_TEST_MKDIRS = 10; + + // start a cluster + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + FileSystem fileSys = null; + StorageDirectory sd = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) + .build(); + cluster.waitActive(); + fileSys = cluster.getFileSystem(); + final FSNamesystem namesystem = cluster.getNamesystem(); + FSImage fsimage = namesystem.getFSImage(); + for (int i = 0; i < NUM_TEST_MKDIRS; i++) { + fileSys.mkdirs(new Path(TEST_PATH)); + } + sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next(); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + + File editFile = FSImageTestUtil.findLatestEditsLog(sd).getFile(); + assertTrue("Should exist: " + editFile, editFile.exists()); + + // Corrupt the last edit + long fileLen = editFile.length(); + RandomAccessFile rwf = new RandomAccessFile(editFile, "rw"); + rwf.setLength(fileLen - 1); + rwf.close(); + + // Make sure that we can't start the cluster normally before recovery + cluster = null; + try { + LOG.debug("trying to start normally (this should fail)..."); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) + .format(false).build(); + cluster.waitActive(); + cluster.shutdown(); + fail("expected the truncated edit log to prevent normal startup"); + } catch (IOException e) { + // success + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + + // Perform recovery + cluster = null; + try { + LOG.debug("running recovery..."); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) + .format(false).startupOption(recoverStartOpt).build(); + } catch (IOException e) { + fail("caught IOException while trying to recover. " + + "message was " + e.getMessage() + + "\nstack trace\n" + StringUtils.stringifyException(e)); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + + // Make sure that we can start the cluster normally after recovery + cluster = null; + try { + LOG.debug("starting cluster normally after recovery..."); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) + .format(false).build(); + LOG.debug("testRecoverTruncatedEditLog: successfully recovered the " + + "truncated edit log"); + assertTrue(cluster.getFileSystem().exists(new Path(TEST_PATH))); + } catch (IOException e) { + fail("failed to recover. 
Error message: " + e.getMessage()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java index 596df8d76b1..216fb54b002 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java @@ -143,9 +143,9 @@ public void testEditLog() throws IOException { File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 1 + expectedTransactions - 1); System.out.println("Verifying file: " + editFile); - FSEditLogLoader loader = new FSEditLogLoader(namesystem); + FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0); long numEdits = loader.loadFSEdits( - new EditLogFileInputStream(editFile), 1); + new EditLogFileInputStream(editFile), 1, null); assertEquals("Verification for " + editFile, expectedTransactions, numEdits); } } finally {
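
Finally, a simplified model (plain Java, not the HDFS API) of the transaction-id accounting that the patched FSImage.loadEdits() performs earlier in this patch: it now returns how far lastAppliedTxId advanced rather than a raw op count, and it trusts each stream's declared last txid so that transactions skipped in recovery mode do not leave the next txid pointing into a gap. The class, method, and constant below are illustrative stand-ins:

    /** Illustrative model of the loadEdits() txid bookkeeping; not HDFS code. */
    public class LoadEditsModel {
      // Stand-in for HdfsConstants.INVALID_TXID.
      private static final long INVALID_TXID = -12345;
      private long lastAppliedTxId;

      long loadEdits(long[] lastTxIdAppliedByLoader, long[] streamDeclaredLastTxId) {
        long prevLastAppliedTxId = lastAppliedTxId;
        for (int i = 0; i < lastTxIdAppliedByLoader.length; i++) {
          // The loader reports the highest txid it actually applied, even if
          // it hit an error partway through the stream.
          lastAppliedTxId = lastTxIdAppliedByLoader[i];
          // In recovery mode some txids may have been skipped over, so when
          // the stream knows its own last txid, jump lastAppliedTxId to it.
          if (streamDeclaredLastTxId[i] != INVALID_TXID) {
            lastAppliedTxId = streamDeclaredLastTxId[i];
          }
        }
        // The caller then does editLog.setNextTxId(lastAppliedTxId + 1): a log
        // covering txids 1..10 with a corrupt, skipped txid 4 still leaves the
        // next txid at 11 instead of reusing an id from the gap.
        return lastAppliedTxId - prevLastAppliedTxId;
      }
    }
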