diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index 795e65876f2..e48312daf47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -85,3 +85,5 @@ HDFS-2716. Configuration needs to allow different dfs.http.addresses for each HA HDFS-2720. Fix MiniDFSCluster HA support to work properly on Windows. (Uma Maheswara Rao G via todd) HDFS-2291. Allow the StandbyNode to make checkpoints in an HA setup. (todd) + +HDFS-2709. Appropriately handle error conditions in EditLogTailer (atm via todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java index 4e28d83a528..ece013fa55a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java @@ -217,7 +217,7 @@ public class BackupImage extends FSImage { int logVersion = storage.getLayoutVersion(); backupInputStream.setBytes(data, logVersion); - int numLoaded = logLoader.loadEditRecords(logVersion, backupInputStream, + long numLoaded = logLoader.loadEditRecords(logVersion, backupInputStream, true, lastAppliedTxId + 1); if (numLoaded != numTxns) { throw new IOException("Batch of txns starting at txnid " + @@ -310,7 +310,7 @@ public class BackupImage extends FSImage { + " txns from in-progress stream " + stream); FSEditLogLoader loader = new FSEditLogLoader(namesystem); - int numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1); + long numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1); lastAppliedTxId += numLoaded; assert numLoaded == remainingTxns : "expected to load " + remainingTxns + " but loaded " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java index 3857db236c6..a27fa9490e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java @@ -68,7 +68,8 @@ class EditLogFileInputStream extends EditLogInputStream { * header */ EditLogFileInputStream(File name, long firstTxId, long lastTxId, - boolean isInProgress) throws LogHeaderCorruptException, IOException { + boolean isInProgress) + throws LogHeaderCorruptException, IOException { file = name; fStream = new FileInputStream(name); @@ -88,6 +89,24 @@ class EditLogFileInputStream extends EditLogInputStream { this.isInProgress = isInProgress; } + /** + * Skip over a number of transactions. Subsequent calls to + * {@link EditLogFileInputStream#readOp()} will begin after these skipped + * transactions. If more transactions are requested to be skipped than remain + * in the edit log, all edit log ops in the log will be skipped and subsequent + * calls to {@link EditLogInputStream#readOp} will return null. + * + * @param transactionsToSkip number of transactions to skip over. + * @throws IOException if there's an error while reading an operation + */ + public void skipTransactions(long transactionsToSkip) throws IOException { + assert firstTxId != HdfsConstants.INVALID_TXID && + lastTxId != HdfsConstants.INVALID_TXID; + for (long i = 0; i < transactionsToSkip; i++) { + reader.readOp(); + } + } + @Override public long getFirstTxId() throws IOException { return firstTxId; @@ -179,14 +198,13 @@ class EditLogFileInputStream extends EditLogInputStream { throw new LogHeaderCorruptException( "Reached EOF when reading log header"); } - if (logVersion < HdfsConstants.LAYOUT_VERSION) { // future version + if (logVersion < HdfsConstants.LAYOUT_VERSION || // future version + logVersion > Storage.LAST_UPGRADABLE_LAYOUT_VERSION) { // unsupported throw new LogHeaderCorruptException( "Unexpected version of the file system log file: " + logVersion + ". Current version = " + HdfsConstants.LAYOUT_VERSION + "."); } - assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION : - "Unsupported version " + logVersion; return logVersion; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputException.java new file mode 100644 index 00000000000..56edf8cb22c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputException.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Thrown when there's a failure to read an edit log op from disk when loading + * edits. + */ +@InterfaceAudience.Private +public class EditLogInputException extends IOException { + + private static final long serialVersionUID = 1L; + + private final long numEditsLoaded; + + public EditLogInputException(String message, Throwable cause, + long numEditsLoaded) { + super(message, cause); + this.numEditsLoaded = numEditsLoaded; + } + + public long getNumEditsLoaded() { + return numEditsLoaded; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 6e9ea8e2875..e1394e630bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.util.Holder; +import org.apache.hadoop.io.IOUtils; import com.google.common.base.Joiner; @@ -76,52 +77,41 @@ public class FSEditLogLoader { * This is where we apply edits that we've been writing to disk all * along. */ - int loadFSEdits(EditLogInputStream edits, long expectedStartingTxId) - throws IOException { - long startTime = now(); + long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId) + throws IOException { + long numEdits = 0; + int logVersion = edits.getVersion(); + fsNamesys.writeLock(); try { - int numEdits = loadFSEdits(edits, true, expectedStartingTxId); + long startTime = now(); + numEdits = loadEditRecords(logVersion, edits, false, + expectedStartingTxId); FSImage.LOG.info("Edits file " + edits.getName() + " of size " + edits.length() + " edits # " + numEdits + " loaded in " + (now()-startTime)/1000 + " seconds."); - return numEdits; - } finally { - fsNamesys.writeUnlock(); - } - } - - private int loadFSEdits(EditLogInputStream edits, boolean closeOnExit, - long expectedStartingTxId) - throws IOException { - int numEdits = 0; - int logVersion = edits.getVersion(); - - try { - numEdits = loadEditRecords(logVersion, edits, false, - expectedStartingTxId); } finally { fsNamesys.setBlockTotal(); + // Delay the notification of genstamp updates until after // setBlockTotal() above. Otherwise, we will mark blocks // as "safe" before they've been incorporated in the expected // totalBlocks and threshold for SafeMode -- triggering an // assertion failure and/or exiting safemode too early! fsNamesys.notifyGenStampUpdate(maxGenStamp); - if(closeOnExit) { - edits.close(); - } + + edits.close(); + fsNamesys.writeUnlock(); } return numEdits; } - @SuppressWarnings("deprecation") - int loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit, + long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit, long expectedStartingTxId) - throws IOException { + throws IOException, EditLogInputException { FSDirectory fsDir = fsNamesys.dir; - int numEdits = 0; + long numEdits = 0; EnumMap> opCounts = new EnumMap>(FSEditLogOpCodes.class); @@ -136,9 +126,19 @@ public class FSEditLogLoader { long txId = expectedStartingTxId - 1; try { - FSEditLogOp op; - while ((op = in.readOp()) != null) { - recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] = + while (true) { + FSEditLogOp op; + try { + if ((op = in.readOp()) == null) { + break; + } + } catch (IOException ioe) { + String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets); + FSImage.LOG.error(errorMessage); + throw new EditLogInputException(errorMessage, + ioe, numEdits); + } + recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] = in.getPosition(); if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) { long thisTxId = op.txid; @@ -149,279 +149,291 @@ public class FSEditLogLoader { txId = thisTxId; } - numEdits++; incrOpCount(op.opCode, opCounts); - switch (op.opCode) { - case OP_ADD: { - AddCloseOp addCloseOp = (AddCloseOp)op; - - // See if the file already exists (persistBlocks call) - INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path); - if (oldFile == null) { // this is OP_ADD on a new file - // versions > 0 support per file replication - // get name and replication - final short replication = fsNamesys.getBlockManager( - ).adjustReplication(addCloseOp.replication); - PermissionStatus permissions = fsNamesys.getUpgradePermission(); - if (addCloseOp.permissions != null) { - permissions = addCloseOp.permissions; - } - long blockSize = addCloseOp.blockSize; - - if (FSNamesystem.LOG.isDebugEnabled()) { - FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path + - " numblocks : " + addCloseOp.blocks.length + - " clientHolder " + addCloseOp.clientName + - " clientMachine " + addCloseOp.clientMachine); - } - - // Older versions of HDFS does not store the block size in inode. - // If the file has more than one block, use the size of the - // first block as the blocksize. Otherwise use the default - // block size. - if (-8 <= logVersion && blockSize == 0) { - if (addCloseOp.blocks.length > 1) { - blockSize = addCloseOp.blocks[0].getNumBytes(); - } else { - long first = ((addCloseOp.blocks.length == 1)? - addCloseOp.blocks[0].getNumBytes(): 0); - blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first); - } - } - - // TODO: We should do away with this add-then-replace dance. - - // add to the file tree - INodeFile node = (INodeFile)fsDir.unprotectedAddFile( - addCloseOp.path, permissions, - replication, addCloseOp.mtime, - addCloseOp.atime, blockSize); - - fsNamesys.prepareFileForWrite(addCloseOp.path, node, - addCloseOp.clientName, addCloseOp.clientMachine, null); - } else { // This is OP_ADD on an existing file - if (!oldFile.isUnderConstruction()) { - // This is a call to append() on an already-closed file. - fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile, - addCloseOp.clientName, addCloseOp.clientMachine, null); - oldFile = getINodeFile(fsDir, addCloseOp.path); - } - - updateBlocks(fsDir, addCloseOp, oldFile); - } - break; - } - case OP_CLOSE: { - AddCloseOp addCloseOp = (AddCloseOp)op; - - INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path); - if (oldFile == null) { - throw new IOException("Operation trying to close non-existent file " + - addCloseOp.path); - } - - // Update in-memory data structures - updateBlocks(fsDir, addCloseOp, oldFile); - - // Now close the file - INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile; - // TODO: we could use removeLease(holder, path) here, but OP_CLOSE - // doesn't seem to serialize the holder... unclear why! - fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path); - INodeFile newFile = ucFile.convertToInodeFile(); - fsDir.replaceNode(addCloseOp.path, ucFile, newFile); - break; - } - case OP_SET_REPLICATION: { - SetReplicationOp setReplicationOp = (SetReplicationOp)op; - short replication = fsNamesys.getBlockManager().adjustReplication( - setReplicationOp.replication); - fsDir.unprotectedSetReplication(setReplicationOp.path, - replication, null); - break; - } - case OP_CONCAT_DELETE: { - ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op; - fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs, - concatDeleteOp.timestamp); - break; - } - case OP_RENAME_OLD: { - RenameOldOp renameOp = (RenameOldOp)op; - HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false); - fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst, - renameOp.timestamp); - fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo); - break; - } - case OP_DELETE: { - DeleteOp deleteOp = (DeleteOp)op; - fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp); - break; - } - case OP_MKDIR: { - MkdirOp mkdirOp = (MkdirOp)op; - PermissionStatus permissions = fsNamesys.getUpgradePermission(); - if (mkdirOp.permissions != null) { - permissions = mkdirOp.permissions; - } - - fsDir.unprotectedMkdir(mkdirOp.path, permissions, - mkdirOp.timestamp); - break; - } - case OP_SET_GENSTAMP: { - SetGenstampOp setGenstampOp = (SetGenstampOp)op; - fsNamesys.setGenerationStamp(setGenstampOp.genStamp); - break; - } - case OP_SET_PERMISSIONS: { - SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op; - fsDir.unprotectedSetPermission(setPermissionsOp.src, - setPermissionsOp.permissions); - break; - } - case OP_SET_OWNER: { - SetOwnerOp setOwnerOp = (SetOwnerOp)op; - fsDir.unprotectedSetOwner(setOwnerOp.src, setOwnerOp.username, - setOwnerOp.groupname); - break; - } - case OP_SET_NS_QUOTA: { - SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op; - fsDir.unprotectedSetQuota(setNSQuotaOp.src, - setNSQuotaOp.nsQuota, - HdfsConstants.QUOTA_DONT_SET); - break; - } - case OP_CLEAR_NS_QUOTA: { - ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op; - fsDir.unprotectedSetQuota(clearNSQuotaOp.src, - HdfsConstants.QUOTA_RESET, - HdfsConstants.QUOTA_DONT_SET); - break; - } - - case OP_SET_QUOTA: - SetQuotaOp setQuotaOp = (SetQuotaOp)op; - fsDir.unprotectedSetQuota(setQuotaOp.src, - setQuotaOp.nsQuota, - setQuotaOp.dsQuota); - break; - - case OP_TIMES: { - TimesOp timesOp = (TimesOp)op; - - fsDir.unprotectedSetTimes(timesOp.path, - timesOp.mtime, - timesOp.atime, true); - break; - } - case OP_SYMLINK: { - SymlinkOp symlinkOp = (SymlinkOp)op; - fsDir.unprotectedSymlink(symlinkOp.path, symlinkOp.value, - symlinkOp.mtime, symlinkOp.atime, - symlinkOp.permissionStatus); - break; - } - case OP_RENAME: { - RenameOp renameOp = (RenameOp)op; - - HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false); - fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst, - renameOp.timestamp, renameOp.options); - fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo); - break; - } - case OP_GET_DELEGATION_TOKEN: { - GetDelegationTokenOp getDelegationTokenOp - = (GetDelegationTokenOp)op; - - fsNamesys.getDelegationTokenSecretManager() - .addPersistedDelegationToken(getDelegationTokenOp.token, - getDelegationTokenOp.expiryTime); - break; - } - case OP_RENEW_DELEGATION_TOKEN: { - RenewDelegationTokenOp renewDelegationTokenOp - = (RenewDelegationTokenOp)op; - fsNamesys.getDelegationTokenSecretManager() - .updatePersistedTokenRenewal(renewDelegationTokenOp.token, - renewDelegationTokenOp.expiryTime); - break; - } - case OP_CANCEL_DELEGATION_TOKEN: { - CancelDelegationTokenOp cancelDelegationTokenOp - = (CancelDelegationTokenOp)op; - fsNamesys.getDelegationTokenSecretManager() - .updatePersistedTokenCancellation( - cancelDelegationTokenOp.token); - break; - } - case OP_UPDATE_MASTER_KEY: { - UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op; - fsNamesys.getDelegationTokenSecretManager() - .updatePersistedMasterKey(updateMasterKeyOp.key); - break; - } - case OP_REASSIGN_LEASE: { - ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp)op; - - Lease lease = fsNamesys.leaseManager.getLease( - reassignLeaseOp.leaseHolder); - INodeFileUnderConstruction pendingFile = - (INodeFileUnderConstruction) fsDir.getFileINode( - reassignLeaseOp.path); - fsNamesys.reassignLeaseInternal(lease, - reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile); - break; - } - case OP_START_LOG_SEGMENT: - case OP_END_LOG_SEGMENT: { - // no data in here currently. - break; - } - case OP_DATANODE_ADD: - case OP_DATANODE_REMOVE: - break; - default: - throw new IOException("Invalid operation read " + op.opCode); + try { + applyEditLogOp(op, fsDir, logVersion); + } catch (Throwable t) { + // Catch Throwable because in the case of a truly corrupt edits log, any + // sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.) + String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets); + FSImage.LOG.error(errorMessage); + throw new IOException(errorMessage, t); } + numEdits++; } - } catch (IOException ex) { check203UpgradeFailure(logVersion, ex); } finally { if(closeOnExit) in.close(); } - } catch (Throwable t) { - // Catch Throwable because in the case of a truly corrupt edits log, any - // sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.) - StringBuilder sb = new StringBuilder(); - sb.append("Error replaying edit log at offset " + in.getPosition()); - if (recentOpcodeOffsets[0] != -1) { - Arrays.sort(recentOpcodeOffsets); - sb.append("\nRecent opcode offsets:"); - for (long offset : recentOpcodeOffsets) { - if (offset != -1) { - sb.append(' ').append(offset); - } - } - } - String errorMessage = sb.toString(); - FSImage.LOG.error(errorMessage); - throw new IOException(errorMessage, t); } finally { fsDir.writeUnlock(); fsNamesys.writeUnlock(); - } - if (FSImage.LOG.isDebugEnabled()) { - dumpOpCounts(opCounts); + if (FSImage.LOG.isDebugEnabled()) { + dumpOpCounts(opCounts); + } } return numEdits; } + @SuppressWarnings("deprecation") + private void applyEditLogOp(FSEditLogOp op, FSDirectory fsDir, + int logVersion) throws IOException { + switch (op.opCode) { + case OP_ADD: { + AddCloseOp addCloseOp = (AddCloseOp)op; + + // See if the file already exists (persistBlocks call) + INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path); + if (oldFile == null) { // this is OP_ADD on a new file + // versions > 0 support per file replication + // get name and replication + final short replication = fsNamesys.getBlockManager( + ).adjustReplication(addCloseOp.replication); + PermissionStatus permissions = fsNamesys.getUpgradePermission(); + if (addCloseOp.permissions != null) { + permissions = addCloseOp.permissions; + } + long blockSize = addCloseOp.blockSize; + + if (FSNamesystem.LOG.isDebugEnabled()) { + FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path + + " numblocks : " + addCloseOp.blocks.length + + " clientHolder " + addCloseOp.clientName + + " clientMachine " + addCloseOp.clientMachine); + } + + // Older versions of HDFS does not store the block size in inode. + // If the file has more than one block, use the size of the + // first block as the blocksize. Otherwise use the default + // block size. + if (-8 <= logVersion && blockSize == 0) { + if (addCloseOp.blocks.length > 1) { + blockSize = addCloseOp.blocks[0].getNumBytes(); + } else { + long first = ((addCloseOp.blocks.length == 1)? + addCloseOp.blocks[0].getNumBytes(): 0); + blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first); + } + } + + // TODO: We should do away with this add-then-replace dance. + + // add to the file tree + INodeFile node = (INodeFile)fsDir.unprotectedAddFile( + addCloseOp.path, permissions, + replication, addCloseOp.mtime, + addCloseOp.atime, blockSize); + + fsNamesys.prepareFileForWrite(addCloseOp.path, node, + addCloseOp.clientName, addCloseOp.clientMachine, null); + } else { // This is OP_ADD on an existing file + if (!oldFile.isUnderConstruction()) { + // This is a call to append() on an already-closed file. + fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile, + addCloseOp.clientName, addCloseOp.clientMachine, null); + oldFile = getINodeFile(fsDir, addCloseOp.path); + } + + updateBlocks(fsDir, addCloseOp, oldFile); + } + break; + } + case OP_CLOSE: { + AddCloseOp addCloseOp = (AddCloseOp)op; + + INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path); + if (oldFile == null) { + throw new IOException("Operation trying to close non-existent file " + + addCloseOp.path); + } + + // Update in-memory data structures + updateBlocks(fsDir, addCloseOp, oldFile); + + // Now close the file + INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile; + // TODO: we could use removeLease(holder, path) here, but OP_CLOSE + // doesn't seem to serialize the holder... unclear why! + fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path); + INodeFile newFile = ucFile.convertToInodeFile(); + fsDir.replaceNode(addCloseOp.path, ucFile, newFile); + break; + } + case OP_SET_REPLICATION: { + SetReplicationOp setReplicationOp = (SetReplicationOp)op; + short replication = fsNamesys.getBlockManager().adjustReplication( + setReplicationOp.replication); + fsDir.unprotectedSetReplication(setReplicationOp.path, + replication, null); + break; + } + case OP_CONCAT_DELETE: { + ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op; + fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs, + concatDeleteOp.timestamp); + break; + } + case OP_RENAME_OLD: { + RenameOldOp renameOp = (RenameOldOp)op; + HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false); + fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst, + renameOp.timestamp); + fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo); + break; + } + case OP_DELETE: { + DeleteOp deleteOp = (DeleteOp)op; + fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp); + break; + } + case OP_MKDIR: { + MkdirOp mkdirOp = (MkdirOp)op; + PermissionStatus permissions = fsNamesys.getUpgradePermission(); + if (mkdirOp.permissions != null) { + permissions = mkdirOp.permissions; + } + + fsDir.unprotectedMkdir(mkdirOp.path, permissions, + mkdirOp.timestamp); + break; + } + case OP_SET_GENSTAMP: { + SetGenstampOp setGenstampOp = (SetGenstampOp)op; + fsNamesys.setGenerationStamp(setGenstampOp.genStamp); + break; + } + case OP_SET_PERMISSIONS: { + SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op; + fsDir.unprotectedSetPermission(setPermissionsOp.src, + setPermissionsOp.permissions); + break; + } + case OP_SET_OWNER: { + SetOwnerOp setOwnerOp = (SetOwnerOp)op; + fsDir.unprotectedSetOwner(setOwnerOp.src, setOwnerOp.username, + setOwnerOp.groupname); + break; + } + case OP_SET_NS_QUOTA: { + SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op; + fsDir.unprotectedSetQuota(setNSQuotaOp.src, + setNSQuotaOp.nsQuota, + HdfsConstants.QUOTA_DONT_SET); + break; + } + case OP_CLEAR_NS_QUOTA: { + ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op; + fsDir.unprotectedSetQuota(clearNSQuotaOp.src, + HdfsConstants.QUOTA_RESET, + HdfsConstants.QUOTA_DONT_SET); + break; + } + + case OP_SET_QUOTA: + SetQuotaOp setQuotaOp = (SetQuotaOp)op; + fsDir.unprotectedSetQuota(setQuotaOp.src, + setQuotaOp.nsQuota, + setQuotaOp.dsQuota); + break; + + case OP_TIMES: { + TimesOp timesOp = (TimesOp)op; + + fsDir.unprotectedSetTimes(timesOp.path, + timesOp.mtime, + timesOp.atime, true); + break; + } + case OP_SYMLINK: { + SymlinkOp symlinkOp = (SymlinkOp)op; + fsDir.unprotectedSymlink(symlinkOp.path, symlinkOp.value, + symlinkOp.mtime, symlinkOp.atime, + symlinkOp.permissionStatus); + break; + } + case OP_RENAME: { + RenameOp renameOp = (RenameOp)op; + + HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false); + fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst, + renameOp.timestamp, renameOp.options); + fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo); + break; + } + case OP_GET_DELEGATION_TOKEN: { + GetDelegationTokenOp getDelegationTokenOp + = (GetDelegationTokenOp)op; + + fsNamesys.getDelegationTokenSecretManager() + .addPersistedDelegationToken(getDelegationTokenOp.token, + getDelegationTokenOp.expiryTime); + break; + } + case OP_RENEW_DELEGATION_TOKEN: { + RenewDelegationTokenOp renewDelegationTokenOp + = (RenewDelegationTokenOp)op; + fsNamesys.getDelegationTokenSecretManager() + .updatePersistedTokenRenewal(renewDelegationTokenOp.token, + renewDelegationTokenOp.expiryTime); + break; + } + case OP_CANCEL_DELEGATION_TOKEN: { + CancelDelegationTokenOp cancelDelegationTokenOp + = (CancelDelegationTokenOp)op; + fsNamesys.getDelegationTokenSecretManager() + .updatePersistedTokenCancellation( + cancelDelegationTokenOp.token); + break; + } + case OP_UPDATE_MASTER_KEY: { + UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op; + fsNamesys.getDelegationTokenSecretManager() + .updatePersistedMasterKey(updateMasterKeyOp.key); + break; + } + case OP_REASSIGN_LEASE: { + ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp)op; + + Lease lease = fsNamesys.leaseManager.getLease( + reassignLeaseOp.leaseHolder); + INodeFileUnderConstruction pendingFile = + (INodeFileUnderConstruction) fsDir.getFileINode( + reassignLeaseOp.path); + fsNamesys.reassignLeaseInternal(lease, + reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile); + break; + } + case OP_START_LOG_SEGMENT: + case OP_END_LOG_SEGMENT: { + // no data in here currently. + break; + } + case OP_DATANODE_ADD: + case OP_DATANODE_REMOVE: + break; + default: + throw new IOException("Invalid operation read " + op.opCode); + } + } + + private static String formatEditLogReplayError(EditLogInputStream in, + long recentOpcodeOffsets[]) { + StringBuilder sb = new StringBuilder(); + sb.append("Error replaying edit log at offset " + in.getPosition()); + if (recentOpcodeOffsets[0] != -1) { + Arrays.sort(recentOpcodeOffsets); + sb.append("\nRecent opcode offsets:"); + for (long offset : recentOpcodeOffsets) { + if (offset != -1) { + sb.append(' ').append(offset); + } + } + } + return sb.toString(); + } + private static INodeFile getINodeFile(FSDirectory fsDir, String path) throws IOException { INode inode = fsDir.getINode(path); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index b92a37eae8c..8eb4dede34c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -665,11 +665,11 @@ public class FSImage implements Closeable { * @return the number of transactions loaded */ public long loadEdits(Iterable editStreams, - FSNamesystem target) throws IOException { + FSNamesystem target) throws IOException, EditLogInputException { LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams)); long startingTxId = getLastAppliedTxId() + 1; - int numLoaded = 0; + long numLoaded = 0; try { FSEditLogLoader loader = new FSEditLogLoader(target); @@ -677,20 +677,28 @@ public class FSImage implements Closeable { // Load latest edits for (EditLogInputStream editIn : editStreams) { LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId); - int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId); - lastAppliedTxId = startingTxId + thisNumLoaded - 1; - startingTxId += thisNumLoaded; - numLoaded += thisNumLoaded; + long thisNumLoaded = 0; + try { + thisNumLoaded = loader.loadFSEdits(editIn, startingTxId); + } catch (EditLogInputException elie) { + thisNumLoaded = elie.getNumEditsLoaded(); + throw elie; + } finally { + // Update lastAppliedTxId even in case of error, since some ops may + // have been successfully applied before the error. + lastAppliedTxId = startingTxId + thisNumLoaded - 1; + startingTxId += thisNumLoaded; + numLoaded += thisNumLoaded; + } } } finally { - // TODO(HA): Should this happen when called by the tailer? FSEditLog.closeAllStreams(editStreams); + // update the counts + // TODO(HA): this may be very slow -- we probably want to + // update them as we go for HA. + target.dir.updateCountForINodeWithQuota(); } - - // update the counts - // TODO(HA): this may be very slow -- we probably want to - // update them as we go for HA. - target.dir.updateCountForINodeWithQuota(); + return numLoaded; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index 182d5f763d3..06b8eff3fa9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -147,6 +147,7 @@ class FileJournalManager implements JournalManager { ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId)); } else if ((firstTxId > elf.getFirstTxId()) && (firstTxId <= elf.getLastTxId())) { + // Note that this behavior is different from getLogFiles below. throw new IllegalStateException("Asked for firstTxId " + firstTxId + " which is in the middle of file " + elf.file); } @@ -194,20 +195,21 @@ class FileJournalManager implements JournalManager { synchronized public EditLogInputStream getInputStream(long fromTxId) throws IOException { for (EditLogFile elf : getLogFiles(fromTxId)) { - if (elf.getFirstTxId() == fromTxId) { + if (elf.containsTxId(fromTxId)) { if (elf.isInProgress()) { elf.validateLog(); } if (LOG.isTraceEnabled()) { LOG.trace("Returning edit stream reading from " + elf); } - return new EditLogFileInputStream(elf.getFile(), + EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(), elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress()); + elfis.skipTransactions(fromTxId - elf.getFirstTxId()); + return elfis; } } - throw new IOException("Cannot find editlog file with " + fromTxId - + " as first first txid"); + throw new IOException("Cannot find editlog file containing " + fromTxId); } @Override @@ -223,7 +225,7 @@ class FileJournalManager implements JournalManager { LOG.warn("Gap in transactions in " + sd.getRoot() + ". Gap is " + fromTxId + " - " + (elf.getFirstTxId() - 1)); break; - } else if (fromTxId == elf.getFirstTxId()) { + } else if (elf.containsTxId(fromTxId)) { if (elf.isInProgress()) { elf.validateLog(); } @@ -231,22 +233,12 @@ class FileJournalManager implements JournalManager { if (elf.isCorrupt()) { break; } + numTxns += elf.getLastTxId() + 1 - fromTxId; fromTxId = elf.getLastTxId() + 1; - numTxns += fromTxId - elf.getFirstTxId(); if (elf.isInProgress()) { break; } - } else if (elf.getFirstTxId() < fromTxId && - elf.getLastTxId() >= fromTxId) { - // Middle of a log segment - this should never happen - // since getLogFiles checks for it. But we should be - // paranoid about this case since it might result in - // overlapping txid ranges, etc, if we had a bug. - IOException ioe = new IOException("txid " + fromTxId + - " falls in the middle of file " + elf); - LOG.error("Broken invariant in edit log file management", ioe); - throw ioe; } } @@ -302,12 +294,8 @@ class FileJournalManager implements JournalManager { List logFiles = Lists.newArrayList(); for (EditLogFile elf : allLogFiles) { - if (fromTxId > elf.getFirstTxId() - && fromTxId <= elf.getLastTxId()) { - throw new IllegalStateException("Asked for fromTxId " + fromTxId - + " which is in middle of file " + elf.file); - } - if (fromTxId <= elf.getFirstTxId()) { + if (fromTxId <= elf.getFirstTxId() || + elf.containsTxId(fromTxId)) { logFiles.add(elf); } } @@ -389,6 +377,10 @@ class FileJournalManager implements JournalManager { long getLastTxId() { return lastTxId; } + + boolean containsTxId(long txId) { + return firstTxId <= txId && txId <= lastTxId; + } /** * Count the number of valid transactions in a log. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index 53e96a73a32..097332b1404 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.namenode.EditLogInputException; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.FSEditLog; import org.apache.hadoop.hdfs.server.namenode.FSImage; @@ -46,9 +47,9 @@ public class EditLogTailer { private final EditLogTailerThread tailerThread; private final FSNamesystem namesystem; - private final FSEditLog editLog; + private FSEditLog editLog; - private volatile Throwable lastError = null; + private volatile Runtime runtime = Runtime.getRuntime(); public EditLogTailer(FSNamesystem namesystem) { this.tailerThread = new EditLogTailerThread(); @@ -82,8 +83,18 @@ public class EditLogTailer { } @VisibleForTesting - public Throwable getLastError() { - return lastError; + FSEditLog getEditLog() { + return editLog; + } + + @VisibleForTesting + void setEditLog(FSEditLog editLog) { + this.editLog = editLog; + } + + @VisibleForTesting + synchronized void setRuntime(Runtime runtime) { + this.runtime = runtime; } public void catchupDuringFailover() throws IOException { @@ -111,13 +122,31 @@ public class EditLogTailer { if (LOG.isDebugEnabled()) { LOG.debug("lastTxnId: " + lastTxnId); } - Collection streams = editLog - .selectInputStreams(lastTxnId + 1, 0, false); + Collection streams; + try { + streams = editLog.selectInputStreams(lastTxnId + 1, 0, false); + } catch (IOException ioe) { + // This is acceptable. If we try to tail edits in the middle of an edits + // log roll, i.e. the last one has been finalized but the new inprogress + // edits file hasn't been started yet. + LOG.warn("Edits tailer failed to find any streams. Will try again " + + "later.", ioe); + return; + } if (LOG.isDebugEnabled()) { LOG.debug("edit streams to load from: " + streams.size()); } - long editsLoaded = image.loadEdits(streams, namesystem); + // Once we have streams to load, errors encountered are legitimate cause + // for concern, so we don't catch them here. Simple errors reading from + // disk are ignored. + long editsLoaded = 0; + try { + editsLoaded = image.loadEdits(streams, namesystem); + } catch (EditLogInputException elie) { + LOG.warn("Error while reading edits from disk. Will try again.", elie); + editsLoaded = elie.getNumEditsLoaded(); + } if (LOG.isDebugEnabled()) { LOG.debug("editsLoaded: " + editsLoaded); } @@ -150,22 +179,14 @@ public class EditLogTailer { public void run() { while (shouldRun) { try { - try { - doTailEdits(); - } catch (IOException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException)e.getCause(); - } else if (e.getCause() instanceof Error) { - throw (Error)e.getCause(); - } - - // Will try again - LOG.info("Got error, will try again.", e); - } + doTailEdits(); + } catch (InterruptedException ie) { + // interrupter should have already set shouldRun to false + continue; } catch (Throwable t) { - // TODO(HA): What should we do in this case? Shutdown the standby NN? - LOG.error("Edit log tailer received throwable", t); - lastError = t; + LOG.error("Error encountered while tailing edits. Shutting down " + + "standby NN.", t); + runtime.exit(1); } try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index 8223e7c60c1..800cb542c60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.ipc.Server; @@ -156,4 +157,12 @@ public class NameNodeAdapter { nn1.getNamesystem().dir.fsImage = spy; return spy; } + + public static String getMkdirOpPath(FSEditLogOp op) { + if (op.opCode == FSEditLogOpCodes.OP_MKDIR) { + return ((MkdirOp) op).path; + } else { + return null; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index fe756b2c992..f36b5d20516 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -147,7 +147,7 @@ public class TestEditLog extends TestCase { public void testPreTxIdEditLogNoEdits() throws Exception { FSNamesystem namesys = Mockito.mock(FSNamesystem.class); namesys.dir = Mockito.mock(FSDirectory.class); - int numEdits = testLoad( + long numEdits = testLoad( StringUtils.hexStringToByte("ffffffed"), // just version number namesys); assertEquals(0, numEdits); @@ -166,7 +166,7 @@ public class TestEditLog extends TestCase { cluster.waitActive(); final FSNamesystem namesystem = cluster.getNamesystem(); - int numEdits = testLoad(HADOOP20_SOME_EDITS, namesystem); + long numEdits = testLoad(HADOOP20_SOME_EDITS, namesystem); assertEquals(3, numEdits); // Sanity check the edit HdfsFileStatus fileInfo = namesystem.getFileInfo("/myfile", false); @@ -177,7 +177,7 @@ public class TestEditLog extends TestCase { } } - private int testLoad(byte[] data, FSNamesystem namesys) throws IOException { + private long testLoad(byte[] data, FSNamesystem namesys) throws IOException { FSEditLogLoader loader = new FSEditLogLoader(namesys); return loader.loadFSEdits(new EditLogByteInputStream(data), 1); } @@ -315,7 +315,7 @@ public class TestEditLog extends TestCase { assertTrue("Expect " + editFile + " exists", editFile.exists()); System.out.println("Verifying file: " + editFile); - int numEdits = loader.loadFSEdits( + long numEdits = loader.loadFSEdits( new EditLogFileInputStream(editFile), 3); int numLeases = namesystem.leaseManager.countLease(); System.out.println("Number of outstanding leases " + numLeases); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java index a855f8ddc65..da66b45da2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java @@ -237,7 +237,7 @@ public class TestEditLogRace { System.out.println("Verifying file: " + editFile); FSEditLogLoader loader = new FSEditLogLoader(namesystem); - int numEditsThisLog = loader.loadFSEdits(new EditLogFileInputStream(editFile), + long numEditsThisLog = loader.loadFSEdits(new EditLogFileInputStream(editFile), startTxId); System.out.println("Number of edits: " + numEditsThisLog); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java index 0321dff4e16..275c3fa38ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileJournalManager.java @@ -30,6 +30,7 @@ import java.io.FilenameFilter; import java.io.IOException; import org.junit.Test; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.test.GenericTestUtils; import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits; @@ -194,12 +195,12 @@ public class TestFileJournalManager { } /** - * Try to make a request with a start transaction id which doesn't - * match the start ID of some log segment. - * This should fail as edit logs must currently be treated as indevisable - * units. + * Make requests with starting transaction ids which don't match the beginning + * txid of some log segments. + * + * This should succeed. */ - @Test(expected=IllegalStateException.class) + @Test public void testAskForTransactionsMidfile() throws IOException { File f = new File(TestEditLog.TEST_DIR + "/filejournaltest2"); NNStorage storage = setupEdits(Collections.singletonList(f.toURI()), @@ -207,7 +208,12 @@ public class TestFileJournalManager { StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); FileJournalManager jm = new FileJournalManager(sd); - jm.getNumberOfTransactions(2); + + // 10 rolls, so 11 rolled files, 110 txids total. + final int TOTAL_TXIDS = 10 * 11; + for (int txid = 1; txid <= TOTAL_TXIDS; txid++) { + assertEquals((TOTAL_TXIDS - txid) + 1, jm.getNumberOfTransactions(txid)); + } } /** @@ -303,6 +309,25 @@ public class TestFileJournalManager { "", getLogsAsString(fjm, 9999)); } + /** + * Make sure that we starting reading the correct op when we request a stream + * with a txid in the middle of an edit log file. + */ + @Test + public void testReadFromMiddleOfEditLog() throws CorruptionException, + IOException { + File f = new File(TestEditLog.TEST_DIR + "/filejournaltest2"); + NNStorage storage = setupEdits(Collections.singletonList(f.toURI()), + 10); + StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next(); + + FileJournalManager jm = new FileJournalManager(sd); + + EditLogInputStream elis = jm.getInputStream(5); + FSEditLogOp op = elis.readOp(); + assertEquals("read unexpected op", op.getTransactionId(), 5); + } + private static String getLogsAsString( FileJournalManager fjm, long firstTxId) throws IOException { return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java index d4fd72d3b06..c0012be5baa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSecurityTokenEditLog.java @@ -141,7 +141,7 @@ public class TestSecurityTokenEditLog extends TestCase { System.out.println("Verifying file: " + editFile); FSEditLogLoader loader = new FSEditLogLoader(namesystem); - int numEdits = loader.loadFSEdits( + long numEdits = loader.loadFSEdits( new EditLogFileInputStream(editFile), 1); assertEquals("Verification for " + editFile, expectedTransactions, numEdits); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java index 876a632bc5f..5f7170dd0d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java @@ -101,9 +101,21 @@ public class TestEditLogTailer { private static String getDirPath(int suffix) { return DIR_PREFIX + suffix; } - + + /** + * Trigger an edits log roll on the active and then wait for the standby to + * catch up to all the edits done by the active. This method will check + * repeatedly for up to NN_LAG_TIMEOUT milliseconds, and then fail throwing + * {@link CouldNotCatchUpException}. + * + * @param active active NN + * @param standby standby NN which should catch up to active + * @throws IOException if an error occurs rolling the edit log + * @throws CouldNotCatchUpException if the standby doesn't catch up to the + * active in NN_LAG_TIMEOUT milliseconds + */ static void waitForStandbyToCatchUp(NameNode active, - NameNode standby) throws InterruptedException, IOException { + NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException { long activeTxId = active.getNamesystem().getFSImage().getEditLog() .getLastWrittenTxId(); @@ -119,8 +131,15 @@ public class TestEditLogTailer { } Thread.sleep(SLEEP_TIME); } - Assert.fail("Standby did not catch up to txid " + activeTxId + - " (currently at " + + throw new CouldNotCatchUpException("Standby did not catch up to txid " + + activeTxId + " (currently at " + standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")"); } + + public static class CouldNotCatchUpException extends IOException { + + public CouldNotCatchUpException(String message) { + super(message); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java new file mode 100644 index 00000000000..b1105517548 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestFailureToReadEdits.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collection; +import java.util.LinkedList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.hdfs.HAUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.TestDFSClientFailover; +import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; +import org.apache.hadoop.hdfs.server.namenode.FSEditLog; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.server.namenode.ha.TestEditLogTailer.CouldNotCatchUpException; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestFailureToReadEdits { + private static final String TEST_DIR1 = "/test1"; + private static final String TEST_DIR2 = "/test2"; + private static final String TEST_DIR3 = "/test3"; + + /** + * Test that the standby NN won't double-replay earlier edits if it encounters + * a failure to read a later edit. + */ + @Test + public void testFailuretoReadEdits() throws IOException, + ServiceFailedException, URISyntaxException, InterruptedException { + Configuration conf = new Configuration(); + HAUtil.setAllowStandbyReads(conf, true); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .build(); + + try { + cluster.waitActive(); + cluster.transitionToActive(0); + + Runtime mockRuntime = mock(Runtime.class); + + NameNode nn1 = cluster.getNameNode(0); + NameNode nn2 = cluster.getNameNode(1); + nn2.getNamesystem().getEditLogTailer().setSleepTime(250); + nn2.getNamesystem().getEditLogTailer().interrupt(); + nn2.getNamesystem().getEditLogTailer().setRuntime(mockRuntime); + + FileSystem fs = TestDFSClientFailover.configureFailoverFs(cluster, conf); + fs.mkdirs(new Path(TEST_DIR1)); + TestEditLogTailer.waitForStandbyToCatchUp(nn1, nn2); + + // If these two ops are applied twice, the first op will throw an + // exception the second time its replayed. + fs.setOwner(new Path(TEST_DIR1), "foo", "bar"); + fs.delete(new Path(TEST_DIR1), true); + + // This op should get applied just fine. + fs.mkdirs(new Path(TEST_DIR2)); + + // This is the op the mocking will cause to fail to be read. + fs.mkdirs(new Path(TEST_DIR3)); + + FSEditLog spyEditLog = spy(nn2.getNamesystem().getEditLogTailer() + .getEditLog()); + LimitedEditLogAnswer answer = new LimitedEditLogAnswer(); + doAnswer(answer).when(spyEditLog).selectInputStreams( + anyLong(), anyLong(), anyBoolean()); + nn2.getNamesystem().getEditLogTailer().setEditLog(spyEditLog); + + try { + TestEditLogTailer.waitForStandbyToCatchUp(nn1, nn2); + fail("Standby fully caught up, but should not have been able to"); + } catch (CouldNotCatchUpException e) { + verify(mockRuntime, times(0)).exit(anyInt()); + } + + // Null because it was deleted. + assertNull(NameNodeAdapter.getFileInfo(nn2, + TEST_DIR1, false)); + // Should have been successfully created. + assertTrue(NameNodeAdapter.getFileInfo(nn2, + TEST_DIR2, false).isDir()); + // Null because it hasn't been created yet. + assertNull(NameNodeAdapter.getFileInfo(nn2, + TEST_DIR3, false)); + + // Now let the standby read ALL the edits. + answer.setThrowExceptionOnRead(false); + TestEditLogTailer.waitForStandbyToCatchUp(nn1, nn2); + + // Null because it was deleted. + assertNull(NameNodeAdapter.getFileInfo(nn2, + TEST_DIR1, false)); + // Should have been successfully created. + assertTrue(NameNodeAdapter.getFileInfo(nn2, + TEST_DIR2, false).isDir()); + // Should now have been successfully created. + assertTrue(NameNodeAdapter.getFileInfo(nn2, + TEST_DIR3, false).isDir()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private static class LimitedEditLogAnswer + implements Answer> { + + private boolean throwExceptionOnRead = true; + + @SuppressWarnings("unchecked") + @Override + public Collection answer(InvocationOnMock invocation) + throws Throwable { + Collection streams = (Collection) + invocation.callRealMethod(); + + if (!throwExceptionOnRead) { + return streams; + } else { + Collection ret = new LinkedList(); + for (EditLogInputStream stream : streams) { + EditLogInputStream spyStream = spy(stream); + doAnswer(new Answer() { + + @Override + public FSEditLogOp answer(InvocationOnMock invocation) + throws Throwable { + FSEditLogOp op = (FSEditLogOp) invocation.callRealMethod(); + if (throwExceptionOnRead && + TEST_DIR3.equals(NameNodeAdapter.getMkdirOpPath(op))) { + throw new IOException("failed to read op creating " + TEST_DIR3); + } else { + return op; + } + } + + }).when(spyStream).readOp(); + ret.add(spyStream); + } + return ret; + } + } + + public void setThrowExceptionOnRead(boolean throwExceptionOnRead) { + this.throwExceptionOnRead = throwExceptionOnRead; + } + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java index b69d7c6db4c..d168bc8cbce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHASafeMode.java @@ -17,7 +17,11 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; @@ -48,6 +52,7 @@ public class TestHASafeMode { private NameNode nn1; private FileSystem fs; private MiniDFSCluster cluster; + private Runtime mockRuntime = mock(Runtime.class); @Before public void setupCluster() throws Exception { @@ -64,6 +69,8 @@ public class TestHASafeMode { nn0 = cluster.getNameNode(0); nn1 = cluster.getNameNode(1); fs = TestDFSClientFailover.configureFailoverFs(cluster, conf); + + nn0.getNamesystem().getEditLogTailer().setRuntime(mockRuntime); cluster.transitionToActive(0); } @@ -71,7 +78,7 @@ public class TestHASafeMode { @After public void shutdownCluster() throws IOException { if (cluster != null) { - assertNull(nn1.getNamesystem().getEditLogTailer().getLastError()); + verify(mockRuntime, times(0)).exit(anyInt()); cluster.shutdown(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java index 298bdffa2c7..a9d09ca7a7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyIsHot.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import static org.junit.Assert.*; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.concurrent.TimeoutException; @@ -75,12 +79,15 @@ public class TestStandbyIsHot { .nnTopology(MiniDFSNNTopology.simpleHATopology()) .numDataNodes(3) .build(); + Runtime mockRuntime = mock(Runtime.class); try { cluster.waitActive(); cluster.transitionToActive(0); NameNode nn1 = cluster.getNameNode(0); NameNode nn2 = cluster.getNameNode(1); + + nn2.getNamesystem().getEditLogTailer().setRuntime(mockRuntime); nn2.getNamesystem().getEditLogTailer().setSleepTime(250); nn2.getNamesystem().getEditLogTailer().interrupt(); @@ -121,6 +128,7 @@ public class TestStandbyIsHot { waitForBlockLocations(cluster, nn2, TEST_FILE, 3); } finally { + verify(mockRuntime, times(0)).exit(anyInt()); cluster.shutdown(); } }