HDFS-2709. Appropriately handle error conditions in EditLogTailer. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1228390 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-01-06 20:44:05 +00:00
parent d680080da0
commit 9a07ba8945
17 changed files with 722 additions and 366 deletions

View File

@ -85,3 +85,5 @@ HDFS-2716. Configuration needs to allow different dfs.http.addresses for each HA
HDFS-2720. Fix MiniDFSCluster HA support to work properly on Windows. (Uma Maheswara Rao G via todd)
HDFS-2291. Allow the StandbyNode to make checkpoints in an HA setup. (todd)
HDFS-2709. Appropriately handle error conditions in EditLogTailer (atm via todd)

View File

@ -217,7 +217,7 @@ private synchronized void applyEdits(long firstTxId, int numTxns, byte[] data)
int logVersion = storage.getLayoutVersion();
backupInputStream.setBytes(data, logVersion);
int numLoaded = logLoader.loadEditRecords(logVersion, backupInputStream,
long numLoaded = logLoader.loadEditRecords(logVersion, backupInputStream,
true, lastAppliedTxId + 1);
if (numLoaded != numTxns) {
throw new IOException("Batch of txns starting at txnid " +
@ -310,7 +310,7 @@ private boolean tryConvergeJournalSpool() throws IOException {
+ " txns from in-progress stream " + stream);
FSEditLogLoader loader = new FSEditLogLoader(namesystem);
int numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1);
long numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1);
lastAppliedTxId += numLoaded;
assert numLoaded == remainingTxns :
"expected to load " + remainingTxns + " but loaded " +

View File

@ -68,7 +68,8 @@ class EditLogFileInputStream extends EditLogInputStream {
* header
*/
EditLogFileInputStream(File name, long firstTxId, long lastTxId,
boolean isInProgress) throws LogHeaderCorruptException, IOException {
boolean isInProgress)
throws LogHeaderCorruptException, IOException {
file = name;
fStream = new FileInputStream(name);
@ -88,6 +89,24 @@ class EditLogFileInputStream extends EditLogInputStream {
this.isInProgress = isInProgress;
}
/**
 * Advance the reader past the given number of transactions, so that the
 * next call to {@link EditLogFileInputStream#readOp()} returns the first
 * op after the skipped ones. If more transactions are requested than the
 * log contains, every remaining op is consumed and subsequent calls to
 * {@link EditLogInputStream#readOp} will return null.
 *
 * @param transactionsToSkip number of transactions to discard
 * @throws IOException if an op cannot be read while skipping
 */
public void skipTransactions(long transactionsToSkip) throws IOException {
  // Skipping is only meaningful on a log whose txid bounds are known.
  assert firstTxId != HdfsConstants.INVALID_TXID &&
      lastTxId != HdfsConstants.INVALID_TXID;
  long remaining = transactionsToSkip;
  while (remaining-- > 0) {
    reader.readOp();
  }
}
@Override
public long getFirstTxId() throws IOException {
return firstTxId;
@ -179,14 +198,13 @@ static int readLogVersion(DataInputStream in)
throw new LogHeaderCorruptException(
"Reached EOF when reading log header");
}
if (logVersion < HdfsConstants.LAYOUT_VERSION) { // future version
if (logVersion < HdfsConstants.LAYOUT_VERSION || // future version
logVersion > Storage.LAST_UPGRADABLE_LAYOUT_VERSION) { // unsupported
throw new LogHeaderCorruptException(
"Unexpected version of the file system log file: "
+ logVersion + ". Current version = "
+ HdfsConstants.LAYOUT_VERSION + ".");
}
assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
"Unsupported version " + logVersion;
return logVersion;
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
 * Thrown when there's a failure to read an edit log op from disk when loading
 * edits.
 */
@InterfaceAudience.Private
public class EditLogInputException extends IOException {
private static final long serialVersionUID = 1L;
// Count of edits that had already been applied successfully before the
// read failure occurred; lets callers advance their applied-txid state.
private final long numEditsLoaded;
/**
 * @param message description of the read failure
 * @param cause underlying exception raised while reading the op
 * @param numEditsLoaded number of edits successfully loaded before the error
 */
public EditLogInputException(String message, Throwable cause,
long numEditsLoaded) {
super(message, cause);
this.numEditsLoaded = numEditsLoaded;
}
/** @return the number of edits successfully loaded before the failure. */
public long getNumEditsLoaded() {
return numEditsLoaded;
}
}

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.util.Holder;
import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Joiner;
@ -76,52 +77,41 @@ public FSEditLogLoader(FSNamesystem fsNamesys) {
* This is where we apply edits that we've been writing to disk all
* along.
*/
int loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
throws IOException {
long startTime = now();
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
throws IOException {
long numEdits = 0;
int logVersion = edits.getVersion();
fsNamesys.writeLock();
try {
int numEdits = loadFSEdits(edits, true, expectedStartingTxId);
long startTime = now();
numEdits = loadEditRecords(logVersion, edits, false,
expectedStartingTxId);
FSImage.LOG.info("Edits file " + edits.getName()
+ " of size " + edits.length() + " edits # " + numEdits
+ " loaded in " + (now()-startTime)/1000 + " seconds.");
return numEdits;
} finally {
fsNamesys.writeUnlock();
}
}
private int loadFSEdits(EditLogInputStream edits, boolean closeOnExit,
long expectedStartingTxId)
throws IOException {
int numEdits = 0;
int logVersion = edits.getVersion();
try {
numEdits = loadEditRecords(logVersion, edits, false,
expectedStartingTxId);
} finally {
fsNamesys.setBlockTotal();
// Delay the notification of genstamp updates until after
// setBlockTotal() above. Otherwise, we will mark blocks
// as "safe" before they've been incorporated in the expected
// totalBlocks and threshold for SafeMode -- triggering an
// assertion failure and/or exiting safemode too early!
fsNamesys.notifyGenStampUpdate(maxGenStamp);
if(closeOnExit) {
edits.close();
}
edits.close();
fsNamesys.writeUnlock();
}
return numEdits;
}
@SuppressWarnings("deprecation")
int loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
long expectedStartingTxId)
throws IOException {
throws IOException, EditLogInputException {
FSDirectory fsDir = fsNamesys.dir;
int numEdits = 0;
long numEdits = 0;
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
@ -136,9 +126,19 @@ int loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
long txId = expectedStartingTxId - 1;
try {
FSEditLogOp op;
while ((op = in.readOp()) != null) {
recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] =
while (true) {
FSEditLogOp op;
try {
if ((op = in.readOp()) == null) {
break;
}
} catch (IOException ioe) {
String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets);
FSImage.LOG.error(errorMessage);
throw new EditLogInputException(errorMessage,
ioe, numEdits);
}
recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
in.getPosition();
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
long thisTxId = op.txid;
@ -149,279 +149,291 @@ int loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
txId = thisTxId;
}
numEdits++;
incrOpCount(op.opCode, opCounts);
switch (op.opCode) {
case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op;
// See if the file already exists (persistBlocks call)
INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
if (oldFile == null) { // this is OP_ADD on a new file
// versions > 0 support per file replication
// get name and replication
final short replication = fsNamesys.getBlockManager(
).adjustReplication(addCloseOp.replication);
PermissionStatus permissions = fsNamesys.getUpgradePermission();
if (addCloseOp.permissions != null) {
permissions = addCloseOp.permissions;
}
long blockSize = addCloseOp.blockSize;
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
" numblocks : " + addCloseOp.blocks.length +
" clientHolder " + addCloseOp.clientName +
" clientMachine " + addCloseOp.clientMachine);
}
// Older versions of HDFS does not store the block size in inode.
// If the file has more than one block, use the size of the
// first block as the blocksize. Otherwise use the default
// block size.
if (-8 <= logVersion && blockSize == 0) {
if (addCloseOp.blocks.length > 1) {
blockSize = addCloseOp.blocks[0].getNumBytes();
} else {
long first = ((addCloseOp.blocks.length == 1)?
addCloseOp.blocks[0].getNumBytes(): 0);
blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
}
}
// TODO: We should do away with this add-then-replace dance.
// add to the file tree
INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
addCloseOp.path, permissions,
replication, addCloseOp.mtime,
addCloseOp.atime, blockSize);
fsNamesys.prepareFileForWrite(addCloseOp.path, node,
addCloseOp.clientName, addCloseOp.clientMachine, null);
} else { // This is OP_ADD on an existing file
if (!oldFile.isUnderConstruction()) {
// This is a call to append() on an already-closed file.
fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
addCloseOp.clientName, addCloseOp.clientMachine, null);
oldFile = getINodeFile(fsDir, addCloseOp.path);
}
updateBlocks(fsDir, addCloseOp, oldFile);
}
break;
}
case OP_CLOSE: {
AddCloseOp addCloseOp = (AddCloseOp)op;
INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
if (oldFile == null) {
throw new IOException("Operation trying to close non-existent file " +
addCloseOp.path);
}
// Update in-memory data structures
updateBlocks(fsDir, addCloseOp, oldFile);
// Now close the file
INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
// TODO: we could use removeLease(holder, path) here, but OP_CLOSE
// doesn't seem to serialize the holder... unclear why!
fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
INodeFile newFile = ucFile.convertToInodeFile();
fsDir.replaceNode(addCloseOp.path, ucFile, newFile);
break;
}
case OP_SET_REPLICATION: {
SetReplicationOp setReplicationOp = (SetReplicationOp)op;
short replication = fsNamesys.getBlockManager().adjustReplication(
setReplicationOp.replication);
fsDir.unprotectedSetReplication(setReplicationOp.path,
replication, null);
break;
}
case OP_CONCAT_DELETE: {
ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs,
concatDeleteOp.timestamp);
break;
}
case OP_RENAME_OLD: {
RenameOldOp renameOp = (RenameOldOp)op;
HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
renameOp.timestamp);
fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
break;
}
case OP_DELETE: {
DeleteOp deleteOp = (DeleteOp)op;
fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
break;
}
case OP_MKDIR: {
MkdirOp mkdirOp = (MkdirOp)op;
PermissionStatus permissions = fsNamesys.getUpgradePermission();
if (mkdirOp.permissions != null) {
permissions = mkdirOp.permissions;
}
fsDir.unprotectedMkdir(mkdirOp.path, permissions,
mkdirOp.timestamp);
break;
}
case OP_SET_GENSTAMP: {
SetGenstampOp setGenstampOp = (SetGenstampOp)op;
fsNamesys.setGenerationStamp(setGenstampOp.genStamp);
break;
}
case OP_SET_PERMISSIONS: {
SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
fsDir.unprotectedSetPermission(setPermissionsOp.src,
setPermissionsOp.permissions);
break;
}
case OP_SET_OWNER: {
SetOwnerOp setOwnerOp = (SetOwnerOp)op;
fsDir.unprotectedSetOwner(setOwnerOp.src, setOwnerOp.username,
setOwnerOp.groupname);
break;
}
case OP_SET_NS_QUOTA: {
SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
fsDir.unprotectedSetQuota(setNSQuotaOp.src,
setNSQuotaOp.nsQuota,
HdfsConstants.QUOTA_DONT_SET);
break;
}
case OP_CLEAR_NS_QUOTA: {
ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
fsDir.unprotectedSetQuota(clearNSQuotaOp.src,
HdfsConstants.QUOTA_RESET,
HdfsConstants.QUOTA_DONT_SET);
break;
}
case OP_SET_QUOTA:
SetQuotaOp setQuotaOp = (SetQuotaOp)op;
fsDir.unprotectedSetQuota(setQuotaOp.src,
setQuotaOp.nsQuota,
setQuotaOp.dsQuota);
break;
case OP_TIMES: {
TimesOp timesOp = (TimesOp)op;
fsDir.unprotectedSetTimes(timesOp.path,
timesOp.mtime,
timesOp.atime, true);
break;
}
case OP_SYMLINK: {
SymlinkOp symlinkOp = (SymlinkOp)op;
fsDir.unprotectedSymlink(symlinkOp.path, symlinkOp.value,
symlinkOp.mtime, symlinkOp.atime,
symlinkOp.permissionStatus);
break;
}
case OP_RENAME: {
RenameOp renameOp = (RenameOp)op;
HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
renameOp.timestamp, renameOp.options);
fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
break;
}
case OP_GET_DELEGATION_TOKEN: {
GetDelegationTokenOp getDelegationTokenOp
= (GetDelegationTokenOp)op;
fsNamesys.getDelegationTokenSecretManager()
.addPersistedDelegationToken(getDelegationTokenOp.token,
getDelegationTokenOp.expiryTime);
break;
}
case OP_RENEW_DELEGATION_TOKEN: {
RenewDelegationTokenOp renewDelegationTokenOp
= (RenewDelegationTokenOp)op;
fsNamesys.getDelegationTokenSecretManager()
.updatePersistedTokenRenewal(renewDelegationTokenOp.token,
renewDelegationTokenOp.expiryTime);
break;
}
case OP_CANCEL_DELEGATION_TOKEN: {
CancelDelegationTokenOp cancelDelegationTokenOp
= (CancelDelegationTokenOp)op;
fsNamesys.getDelegationTokenSecretManager()
.updatePersistedTokenCancellation(
cancelDelegationTokenOp.token);
break;
}
case OP_UPDATE_MASTER_KEY: {
UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op;
fsNamesys.getDelegationTokenSecretManager()
.updatePersistedMasterKey(updateMasterKeyOp.key);
break;
}
case OP_REASSIGN_LEASE: {
ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp)op;
Lease lease = fsNamesys.leaseManager.getLease(
reassignLeaseOp.leaseHolder);
INodeFileUnderConstruction pendingFile =
(INodeFileUnderConstruction) fsDir.getFileINode(
reassignLeaseOp.path);
fsNamesys.reassignLeaseInternal(lease,
reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
break;
}
case OP_START_LOG_SEGMENT:
case OP_END_LOG_SEGMENT: {
// no data in here currently.
break;
}
case OP_DATANODE_ADD:
case OP_DATANODE_REMOVE:
break;
default:
throw new IOException("Invalid operation read " + op.opCode);
try {
applyEditLogOp(op, fsDir, logVersion);
} catch (Throwable t) {
// Catch Throwable because in the case of a truly corrupt edits log, any
// sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.)
String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets);
FSImage.LOG.error(errorMessage);
throw new IOException(errorMessage, t);
}
numEdits++;
}
} catch (IOException ex) {
check203UpgradeFailure(logVersion, ex);
} finally {
if(closeOnExit)
in.close();
}
} catch (Throwable t) {
// Catch Throwable because in the case of a truly corrupt edits log, any
// sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.)
StringBuilder sb = new StringBuilder();
sb.append("Error replaying edit log at offset " + in.getPosition());
if (recentOpcodeOffsets[0] != -1) {
Arrays.sort(recentOpcodeOffsets);
sb.append("\nRecent opcode offsets:");
for (long offset : recentOpcodeOffsets) {
if (offset != -1) {
sb.append(' ').append(offset);
}
}
}
String errorMessage = sb.toString();
FSImage.LOG.error(errorMessage);
throw new IOException(errorMessage, t);
} finally {
fsDir.writeUnlock();
fsNamesys.writeUnlock();
}
if (FSImage.LOG.isDebugEnabled()) {
dumpOpCounts(opCounts);
if (FSImage.LOG.isDebugEnabled()) {
dumpOpCounts(opCounts);
}
}
return numEdits;
}
/**
 * Apply a single edit-log operation to the in-memory namespace state
 * (directory tree, lease manager, delegation-token manager, etc.).
 * Dispatches on {@code op.opCode}; unknown opcodes are an error.
 *
 * @param op the deserialized edit-log operation to replay
 * @param fsDir the namespace directory to mutate
 * @param logVersion layout version of the log being replayed (some ops
 *        are interpreted differently for very old versions)
 * @throws IOException if the op is invalid or cannot be applied
 */
@SuppressWarnings("deprecation")
private void applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
int logVersion) throws IOException {
switch (op.opCode) {
case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op;
// See if the file already exists (persistBlocks call)
INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
if (oldFile == null) { // this is OP_ADD on a new file
// versions > 0 support per file replication
// get name and replication
final short replication = fsNamesys.getBlockManager(
).adjustReplication(addCloseOp.replication);
PermissionStatus permissions = fsNamesys.getUpgradePermission();
if (addCloseOp.permissions != null) {
permissions = addCloseOp.permissions;
}
long blockSize = addCloseOp.blockSize;
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
" numblocks : " + addCloseOp.blocks.length +
" clientHolder " + addCloseOp.clientName +
" clientMachine " + addCloseOp.clientMachine);
}
// Older versions of HDFS do not store the block size in inode.
// If the file has more than one block, use the size of the
// first block as the blocksize. Otherwise use the default
// block size.
if (-8 <= logVersion && blockSize == 0) {
if (addCloseOp.blocks.length > 1) {
blockSize = addCloseOp.blocks[0].getNumBytes();
} else {
long first = ((addCloseOp.blocks.length == 1)?
addCloseOp.blocks[0].getNumBytes(): 0);
blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
}
}
// TODO: We should do away with this add-then-replace dance.
// add to the file tree
INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
addCloseOp.path, permissions,
replication, addCloseOp.mtime,
addCloseOp.atime, blockSize);
fsNamesys.prepareFileForWrite(addCloseOp.path, node,
addCloseOp.clientName, addCloseOp.clientMachine, null);
} else { // This is OP_ADD on an existing file
if (!oldFile.isUnderConstruction()) {
// This is a call to append() on an already-closed file.
fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
addCloseOp.clientName, addCloseOp.clientMachine, null);
// Re-fetch: prepareFileForWrite replaced the inode for this path.
oldFile = getINodeFile(fsDir, addCloseOp.path);
}
updateBlocks(fsDir, addCloseOp, oldFile);
}
break;
}
case OP_CLOSE: {
AddCloseOp addCloseOp = (AddCloseOp)op;
INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
if (oldFile == null) {
throw new IOException("Operation trying to close non-existent file " +
addCloseOp.path);
}
// Update in-memory data structures
updateBlocks(fsDir, addCloseOp, oldFile);
// Now close the file
INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
// TODO: we could use removeLease(holder, path) here, but OP_CLOSE
// doesn't seem to serialize the holder... unclear why!
fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
INodeFile newFile = ucFile.convertToInodeFile();
fsDir.replaceNode(addCloseOp.path, ucFile, newFile);
break;
}
case OP_SET_REPLICATION: {
SetReplicationOp setReplicationOp = (SetReplicationOp)op;
// Clamp requested replication to the configured min/max bounds.
short replication = fsNamesys.getBlockManager().adjustReplication(
setReplicationOp.replication);
fsDir.unprotectedSetReplication(setReplicationOp.path,
replication, null);
break;
}
case OP_CONCAT_DELETE: {
ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs,
concatDeleteOp.timestamp);
break;
}
case OP_RENAME_OLD: {
RenameOldOp renameOp = (RenameOldOp)op;
// Capture destination status before the rename for lease fixup below.
HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
renameOp.timestamp);
fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
break;
}
case OP_DELETE: {
DeleteOp deleteOp = (DeleteOp)op;
fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
break;
}
case OP_MKDIR: {
MkdirOp mkdirOp = (MkdirOp)op;
// Fall back to the upgrade-time default permissions when the op
// doesn't carry any (pre-permission layout versions).
PermissionStatus permissions = fsNamesys.getUpgradePermission();
if (mkdirOp.permissions != null) {
permissions = mkdirOp.permissions;
}
fsDir.unprotectedMkdir(mkdirOp.path, permissions,
mkdirOp.timestamp);
break;
}
case OP_SET_GENSTAMP: {
SetGenstampOp setGenstampOp = (SetGenstampOp)op;
fsNamesys.setGenerationStamp(setGenstampOp.genStamp);
break;
}
case OP_SET_PERMISSIONS: {
SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
fsDir.unprotectedSetPermission(setPermissionsOp.src,
setPermissionsOp.permissions);
break;
}
case OP_SET_OWNER: {
SetOwnerOp setOwnerOp = (SetOwnerOp)op;
fsDir.unprotectedSetOwner(setOwnerOp.src, setOwnerOp.username,
setOwnerOp.groupname);
break;
}
case OP_SET_NS_QUOTA: {
SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
fsDir.unprotectedSetQuota(setNSQuotaOp.src,
setNSQuotaOp.nsQuota,
HdfsConstants.QUOTA_DONT_SET);
break;
}
case OP_CLEAR_NS_QUOTA: {
ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
fsDir.unprotectedSetQuota(clearNSQuotaOp.src,
HdfsConstants.QUOTA_RESET,
HdfsConstants.QUOTA_DONT_SET);
break;
}
case OP_SET_QUOTA:
SetQuotaOp setQuotaOp = (SetQuotaOp)op;
fsDir.unprotectedSetQuota(setQuotaOp.src,
setQuotaOp.nsQuota,
setQuotaOp.dsQuota);
break;
case OP_TIMES: {
TimesOp timesOp = (TimesOp)op;
fsDir.unprotectedSetTimes(timesOp.path,
timesOp.mtime,
timesOp.atime, true);
break;
}
case OP_SYMLINK: {
SymlinkOp symlinkOp = (SymlinkOp)op;
fsDir.unprotectedSymlink(symlinkOp.path, symlinkOp.value,
symlinkOp.mtime, symlinkOp.atime,
symlinkOp.permissionStatus);
break;
}
case OP_RENAME: {
RenameOp renameOp = (RenameOp)op;
// Capture destination status before the rename for lease fixup below.
HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
renameOp.timestamp, renameOp.options);
fsNamesys.unprotectedChangeLease(renameOp.src, renameOp.dst, dinfo);
break;
}
case OP_GET_DELEGATION_TOKEN: {
GetDelegationTokenOp getDelegationTokenOp
= (GetDelegationTokenOp)op;
fsNamesys.getDelegationTokenSecretManager()
.addPersistedDelegationToken(getDelegationTokenOp.token,
getDelegationTokenOp.expiryTime);
break;
}
case OP_RENEW_DELEGATION_TOKEN: {
RenewDelegationTokenOp renewDelegationTokenOp
= (RenewDelegationTokenOp)op;
fsNamesys.getDelegationTokenSecretManager()
.updatePersistedTokenRenewal(renewDelegationTokenOp.token,
renewDelegationTokenOp.expiryTime);
break;
}
case OP_CANCEL_DELEGATION_TOKEN: {
CancelDelegationTokenOp cancelDelegationTokenOp
= (CancelDelegationTokenOp)op;
fsNamesys.getDelegationTokenSecretManager()
.updatePersistedTokenCancellation(
cancelDelegationTokenOp.token);
break;
}
case OP_UPDATE_MASTER_KEY: {
UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op;
fsNamesys.getDelegationTokenSecretManager()
.updatePersistedMasterKey(updateMasterKeyOp.key);
break;
}
case OP_REASSIGN_LEASE: {
ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp)op;
Lease lease = fsNamesys.leaseManager.getLease(
reassignLeaseOp.leaseHolder);
INodeFileUnderConstruction pendingFile =
(INodeFileUnderConstruction) fsDir.getFileINode(
reassignLeaseOp.path);
fsNamesys.reassignLeaseInternal(lease,
reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
break;
}
case OP_START_LOG_SEGMENT:
case OP_END_LOG_SEGMENT: {
// no data in here currently.
break;
}
case OP_DATANODE_ADD:
case OP_DATANODE_REMOVE:
// Obsolete op types retained only for log compatibility; no-op.
break;
default:
throw new IOException("Invalid operation read " + op.opCode);
}
}
/**
 * Build a diagnostic message for an edit-log replay failure, reporting the
 * stream position at which it occurred and, when any have been recorded,
 * the offsets of recently applied opcodes.
 * NOTE: sorts {@code recentOpcodeOffsets} in place as a side effect.
 *
 * @param in the stream being replayed when the error occurred
 * @param recentOpcodeOffsets ring buffer of recent opcode offsets, with
 *        unused slots holding -1
 * @return the formatted error message
 */
private static String formatEditLogReplayError(EditLogInputStream in,
long recentOpcodeOffsets[]) {
  StringBuilder sb = new StringBuilder();
  sb.append("Error replaying edit log at offset " + in.getPosition());
  // Slot 0 still being -1 means no opcode offsets were ever recorded.
  boolean haveOffsets = recentOpcodeOffsets[0] != -1;
  if (haveOffsets) {
    Arrays.sort(recentOpcodeOffsets);
    sb.append("\nRecent opcode offsets:");
    for (int i = 0; i < recentOpcodeOffsets.length; i++) {
      long offset = recentOpcodeOffsets[i];
      if (offset != -1) {
        sb.append(' ').append(offset);
      }
    }
  }
  return sb.toString();
}
private static INodeFile getINodeFile(FSDirectory fsDir, String path)
throws IOException {
INode inode = fsDir.getINode(path);

View File

@ -665,11 +665,11 @@ private boolean needsResaveBasedOnStaleCheckpoint(
* @return the number of transactions loaded
*/
public long loadEdits(Iterable<EditLogInputStream> editStreams,
FSNamesystem target) throws IOException {
FSNamesystem target) throws IOException, EditLogInputException {
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
long startingTxId = getLastAppliedTxId() + 1;
int numLoaded = 0;
long numLoaded = 0;
try {
FSEditLogLoader loader = new FSEditLogLoader(target);
@ -677,20 +677,28 @@ public long loadEdits(Iterable<EditLogInputStream> editStreams,
// Load latest edits
for (EditLogInputStream editIn : editStreams) {
LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId);
int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
lastAppliedTxId = startingTxId + thisNumLoaded - 1;
startingTxId += thisNumLoaded;
numLoaded += thisNumLoaded;
long thisNumLoaded = 0;
try {
thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
} catch (EditLogInputException elie) {
thisNumLoaded = elie.getNumEditsLoaded();
throw elie;
} finally {
// Update lastAppliedTxId even in case of error, since some ops may
// have been successfully applied before the error.
lastAppliedTxId = startingTxId + thisNumLoaded - 1;
startingTxId += thisNumLoaded;
numLoaded += thisNumLoaded;
}
}
} finally {
// TODO(HA): Should this happen when called by the tailer?
FSEditLog.closeAllStreams(editStreams);
// update the counts
// TODO(HA): this may be very slow -- we probably want to
// update them as we go for HA.
target.dir.updateCountForINodeWithQuota();
}
// update the counts
// TODO(HA): this may be very slow -- we probably want to
// update them as we go for HA.
target.dir.updateCountForINodeWithQuota();
return numLoaded;
}

View File

@ -147,6 +147,7 @@ List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
} else if ((firstTxId > elf.getFirstTxId()) &&
(firstTxId <= elf.getLastTxId())) {
// Note that this behavior is different from getLogFiles below.
throw new IllegalStateException("Asked for firstTxId " + firstTxId
+ " which is in the middle of file " + elf.file);
}
@ -194,20 +195,21 @@ static List<EditLogFile> matchEditLogs(File[] filesInStorage) {
synchronized public EditLogInputStream getInputStream(long fromTxId)
throws IOException {
for (EditLogFile elf : getLogFiles(fromTxId)) {
if (elf.getFirstTxId() == fromTxId) {
if (elf.containsTxId(fromTxId)) {
if (elf.isInProgress()) {
elf.validateLog();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Returning edit stream reading from " + elf);
}
return new EditLogFileInputStream(elf.getFile(),
EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(),
elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress());
elfis.skipTransactions(fromTxId - elf.getFirstTxId());
return elfis;
}
}
throw new IOException("Cannot find editlog file with " + fromTxId
+ " as first first txid");
throw new IOException("Cannot find editlog file containing " + fromTxId);
}
@Override
@ -223,7 +225,7 @@ public long getNumberOfTransactions(long fromTxId)
LOG.warn("Gap in transactions in " + sd.getRoot() + ". Gap is "
+ fromTxId + " - " + (elf.getFirstTxId() - 1));
break;
} else if (fromTxId == elf.getFirstTxId()) {
} else if (elf.containsTxId(fromTxId)) {
if (elf.isInProgress()) {
elf.validateLog();
}
@ -231,22 +233,12 @@ public long getNumberOfTransactions(long fromTxId)
if (elf.isCorrupt()) {
break;
}
numTxns += elf.getLastTxId() + 1 - fromTxId;
fromTxId = elf.getLastTxId() + 1;
numTxns += fromTxId - elf.getFirstTxId();
if (elf.isInProgress()) {
break;
}
} else if (elf.getFirstTxId() < fromTxId &&
elf.getLastTxId() >= fromTxId) {
// Middle of a log segment - this should never happen
// since getLogFiles checks for it. But we should be
// paranoid about this case since it might result in
// overlapping txid ranges, etc, if we had a bug.
IOException ioe = new IOException("txid " + fromTxId +
" falls in the middle of file " + elf);
LOG.error("Broken invariant in edit log file management", ioe);
throw ioe;
}
}
@ -302,12 +294,8 @@ private List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
List<EditLogFile> logFiles = Lists.newArrayList();
for (EditLogFile elf : allLogFiles) {
if (fromTxId > elf.getFirstTxId()
&& fromTxId <= elf.getLastTxId()) {
throw new IllegalStateException("Asked for fromTxId " + fromTxId
+ " which is in middle of file " + elf.file);
}
if (fromTxId <= elf.getFirstTxId()) {
if (fromTxId <= elf.getFirstTxId() ||
elf.containsTxId(fromTxId)) {
logFiles.add(elf);
}
}
@ -389,6 +377,10 @@ long getFirstTxId() {
long getLastTxId() {
return lastTxId;
}
/** @return true iff txId lies within this segment's [firstTxId, lastTxId] range. */
boolean containsTxId(long txId) {
  return !(txId < firstTxId || txId > lastTxId);
}
/**
* Count the number of valid transactions in a log.

View File

@ -25,6 +25,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputException;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
@ -46,9 +47,9 @@ public class EditLogTailer {
private final EditLogTailerThread tailerThread;
private final FSNamesystem namesystem;
private final FSEditLog editLog;
private FSEditLog editLog;
private volatile Throwable lastError = null;
private volatile Runtime runtime = Runtime.getRuntime();
public EditLogTailer(FSNamesystem namesystem) {
this.tailerThread = new EditLogTailerThread();
@ -82,8 +83,18 @@ public void interrupt() {
}
@VisibleForTesting
public Throwable getLastError() {
return lastError;
FSEditLog getEditLog() {
return editLog;
}
@VisibleForTesting
void setEditLog(FSEditLog editLog) {
this.editLog = editLog;
}
@VisibleForTesting
synchronized void setRuntime(Runtime runtime) {
this.runtime = runtime;
}
public void catchupDuringFailover() throws IOException {
@ -111,13 +122,31 @@ private void doTailEdits() throws IOException, InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("lastTxnId: " + lastTxnId);
}
Collection<EditLogInputStream> streams = editLog
.selectInputStreams(lastTxnId + 1, 0, false);
Collection<EditLogInputStream> streams;
try {
streams = editLog.selectInputStreams(lastTxnId + 1, 0, false);
} catch (IOException ioe) {
// This is acceptable: it can happen if we try to tail edits in the middle
// of an edit log roll, i.e. the previous segment has been finalized but
// the new in-progress edits file hasn't been started yet.
LOG.warn("Edits tailer failed to find any streams. Will try again " +
"later.", ioe);
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("edit streams to load from: " + streams.size());
}
long editsLoaded = image.loadEdits(streams, namesystem);
// Once we have streams to load, errors encountered are legitimate cause
// for concern, so we don't catch them here. Simple errors reading from
// disk are ignored.
long editsLoaded = 0;
try {
editsLoaded = image.loadEdits(streams, namesystem);
} catch (EditLogInputException elie) {
LOG.warn("Error while reading edits from disk. Will try again.", elie);
editsLoaded = elie.getNumEditsLoaded();
}
if (LOG.isDebugEnabled()) {
LOG.debug("editsLoaded: " + editsLoaded);
}
@ -150,22 +179,14 @@ private void setSleepTime(long sleepTime) {
public void run() {
while (shouldRun) {
try {
try {
doTailEdits();
} catch (IOException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException)e.getCause();
} else if (e.getCause() instanceof Error) {
throw (Error)e.getCause();
}
// Will try again
LOG.info("Got error, will try again.", e);
}
doTailEdits();
} catch (InterruptedException ie) {
// interrupter should have already set shouldRun to false
continue;
} catch (Throwable t) {
// TODO(HA): What should we do in this case? Shutdown the standby NN?
LOG.error("Edit log tailer received throwable", t);
lastError = t;
LOG.error("Error encountered while tailing edits. Shutting down " +
"standby NN.", t);
runtime.exit(1);
}
try {

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.ipc.Server;
@ -156,4 +157,12 @@ public static FSImage spyOnFsImage(NameNode nn1) {
nn1.getNamesystem().dir.fsImage = spy;
return spy;
}
/**
 * Returns the target path of the given edit-log op if it is an OP_MKDIR,
 * or null otherwise.
 *
 * @param op the op to inspect; may be null (e.g. the result of reading past
 *           the end of an edit stream), in which case null is returned
 *           instead of throwing a NullPointerException
 * @return the mkdir target path, or null if op is null or not a mkdir op
 */
public static String getMkdirOpPath(FSEditLogOp op) {
  if (op != null && op.opCode == FSEditLogOpCodes.OP_MKDIR) {
    return ((MkdirOp) op).path;
  } else {
    return null;
  }
}
}

View File

@ -147,7 +147,7 @@ public void run() {
public void testPreTxIdEditLogNoEdits() throws Exception {
FSNamesystem namesys = Mockito.mock(FSNamesystem.class);
namesys.dir = Mockito.mock(FSDirectory.class);
int numEdits = testLoad(
long numEdits = testLoad(
StringUtils.hexStringToByte("ffffffed"), // just version number
namesys);
assertEquals(0, numEdits);
@ -166,7 +166,7 @@ public void testPreTxidEditLogWithEdits() throws Exception {
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
int numEdits = testLoad(HADOOP20_SOME_EDITS, namesystem);
long numEdits = testLoad(HADOOP20_SOME_EDITS, namesystem);
assertEquals(3, numEdits);
// Sanity check the edit
HdfsFileStatus fileInfo = namesystem.getFileInfo("/myfile", false);
@ -177,7 +177,7 @@ public void testPreTxidEditLogWithEdits() throws Exception {
}
}
private int testLoad(byte[] data, FSNamesystem namesys) throws IOException {
private long testLoad(byte[] data, FSNamesystem namesys) throws IOException {
FSEditLogLoader loader = new FSEditLogLoader(namesys);
return loader.loadFSEdits(new EditLogByteInputStream(data), 1);
}
@ -315,7 +315,7 @@ private void testEditLog(int initialSize) throws IOException {
assertTrue("Expect " + editFile + " exists", editFile.exists());
System.out.println("Verifying file: " + editFile);
int numEdits = loader.loadFSEdits(
long numEdits = loader.loadFSEdits(
new EditLogFileInputStream(editFile), 3);
int numLeases = namesystem.leaseManager.countLease();
System.out.println("Number of outstanding leases " + numLeases);

View File

@ -237,7 +237,7 @@ private long verifyEditLogs(FSNamesystem namesystem, FSImage fsimage,
System.out.println("Verifying file: " + editFile);
FSEditLogLoader loader = new FSEditLogLoader(namesystem);
int numEditsThisLog = loader.loadFSEdits(new EditLogFileInputStream(editFile),
long numEditsThisLog = loader.loadFSEdits(new EditLogFileInputStream(editFile),
startTxId);
System.out.println("Number of edits: " + numEditsThisLog);

View File

@ -30,6 +30,7 @@
import java.io.IOException;
import org.junit.Test;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
@ -194,12 +195,12 @@ public void testReadFromStream() throws IOException {
}
/**
* Try to make a request with a start transaction id which doesn't
* match the start ID of some log segment.
* This should fail as edit logs must currently be treated as indivisible
* units.
* Make requests with starting transaction ids which don't match the beginning
* txid of some log segments.
*
* This should succeed.
*/
@Test(expected=IllegalStateException.class)
@Test
public void testAskForTransactionsMidfile() throws IOException {
File f = new File(TestEditLog.TEST_DIR + "/filejournaltest2");
NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()),
@ -207,7 +208,12 @@ public void testAskForTransactionsMidfile() throws IOException {
StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
FileJournalManager jm = new FileJournalManager(sd);
jm.getNumberOfTransactions(2);
// 10 rolls, so 11 rolled files, 110 txids total.
final int TOTAL_TXIDS = 10 * 11;
for (int txid = 1; txid <= TOTAL_TXIDS; txid++) {
assertEquals((TOTAL_TXIDS - txid) + 1, jm.getNumberOfTransactions(txid));
}
}
/**
@ -303,6 +309,25 @@ public void testGetRemoteEditLog() throws IOException {
"", getLogsAsString(fjm, 9999));
}
/**
 * Make sure that we start reading the correct op when we request a stream
 * with a txid in the middle of an edit log file.
 */
@Test
public void testReadFromMiddleOfEditLog() throws CorruptionException,
    IOException {
  File f = new File(TestEditLog.TEST_DIR + "/filejournaltest2");
  NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()),
      10);
  StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
  FileJournalManager jm = new FileJournalManager(sd);
  // Ask for a stream starting mid-segment; the first op read back should
  // be exactly the requested txid.
  EditLogInputStream elis = jm.getInputStream(5);
  FSEditLogOp op = elis.readOp();
  // JUnit's assertEquals takes (message, expected, actual) — the expected
  // value (5) must come before the actual value, or failure messages are
  // reversed.
  assertEquals("read unexpected op", 5, op.getTransactionId());
}
private static String getLogsAsString(
FileJournalManager fjm, long firstTxId) throws IOException {
return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId));

View File

@ -141,7 +141,7 @@ public void testEditLog() throws IOException {
System.out.println("Verifying file: " + editFile);
FSEditLogLoader loader = new FSEditLogLoader(namesystem);
int numEdits = loader.loadFSEdits(
long numEdits = loader.loadFSEdits(
new EditLogFileInputStream(editFile), 1);
assertEquals("Verification for " + editFile, expectedTransactions, numEdits);
}

View File

@ -101,9 +101,21 @@ public void testTailer() throws IOException, InterruptedException,
/**
 * Returns the test directory path formed by appending the numeric suffix
 * to DIR_PREFIX.
 */
private static String getDirPath(int suffix) {
  return DIR_PREFIX + suffix;
}
/**
* Trigger an edits log roll on the active and then wait for the standby to
* catch up to all the edits done by the active. This method will check
* repeatedly for up to NN_LAG_TIMEOUT milliseconds, and then fail by throwing
* {@link CouldNotCatchUpException}.
*
* @param active active NN
* @param standby standby NN which should catch up to active
* @throws IOException if an error occurs rolling the edit log
* @throws CouldNotCatchUpException if the standby doesn't catch up to the
* active in NN_LAG_TIMEOUT milliseconds
*/
static void waitForStandbyToCatchUp(NameNode active,
NameNode standby) throws InterruptedException, IOException {
NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException {
long activeTxId = active.getNamesystem().getFSImage().getEditLog()
.getLastWrittenTxId();
@ -119,8 +131,15 @@ static void waitForStandbyToCatchUp(NameNode active,
}
Thread.sleep(SLEEP_TIME);
}
Assert.fail("Standby did not catch up to txid " + activeTxId +
" (currently at " +
throw new CouldNotCatchUpException("Standby did not catch up to txid " +
activeTxId + " (currently at " +
standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")");
}
/**
 * Thrown by waitForStandbyToCatchUp when the standby NN fails to catch up
 * to the active NN's last written txid within the timeout.
 */
public static class CouldNotCatchUpException extends IOException {
  // IOException is Serializable; declare an explicit version id rather than
  // relying on the compiler-generated default.
  private static final long serialVersionUID = 1L;

  public CouldNotCatchUpException(String message) {
    super(message);
  }
}
}

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.TestDFSClientFailover;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.TestEditLogTailer.CouldNotCatchUpException;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
 * Tests the standby NN's handling of a failure to read edits from the
 * shared edit log, using a spied FSEditLog that injects a read error on a
 * specific op.
 */
public class TestFailureToReadEdits {
  private static final String TEST_DIR1 = "/test1";
  private static final String TEST_DIR2 = "/test2";
  private static final String TEST_DIR3 = "/test3";
  /**
   * Test that the standby NN won't double-replay earlier edits if it encounters
   * a failure to read a later edit.
   */
  @Test
  public void testFailuretoReadEdits() throws IOException,
      ServiceFailedException, URISyntaxException, InterruptedException {
    Configuration conf = new Configuration();
    HAUtil.setAllowStandbyReads(conf, true);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .nnTopology(MiniDFSNNTopology.simpleHATopology())
      .numDataNodes(0)
      .build();
    try {
      cluster.waitActive();
      cluster.transitionToActive(0);
      Runtime mockRuntime = mock(Runtime.class);
      NameNode nn1 = cluster.getNameNode(0);
      NameNode nn2 = cluster.getNameNode(1);
      // Speed up tailing, and install a mock Runtime so that a fatal tailer
      // error calls exit() on the mock instead of killing the test JVM
      // (verified below via verify(mockRuntime, times(0)).exit(...)).
      nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
      nn2.getNamesystem().getEditLogTailer().interrupt();
      nn2.getNamesystem().getEditLogTailer().setRuntime(mockRuntime);
      FileSystem fs = TestDFSClientFailover.configureFailoverFs(cluster, conf);
      fs.mkdirs(new Path(TEST_DIR1));
      TestEditLogTailer.waitForStandbyToCatchUp(nn1, nn2);
      // If these two ops are applied twice, the first op will throw an
      // exception the second time it's replayed.
      fs.setOwner(new Path(TEST_DIR1), "foo", "bar");
      fs.delete(new Path(TEST_DIR1), true);
      // This op should get applied just fine.
      fs.mkdirs(new Path(TEST_DIR2));
      // This is the op the mocking will cause to fail to be read.
      fs.mkdirs(new Path(TEST_DIR3));
      // Spy on the standby's edit log so that selectInputStreams hands back
      // streams whose readOp() fails on the TEST_DIR3 mkdir op.
      FSEditLog spyEditLog = spy(nn2.getNamesystem().getEditLogTailer()
          .getEditLog());
      LimitedEditLogAnswer answer = new LimitedEditLogAnswer();
      doAnswer(answer).when(spyEditLog).selectInputStreams(
          anyLong(), anyLong(), anyBoolean());
      nn2.getNamesystem().getEditLogTailer().setEditLog(spyEditLog);
      try {
        TestEditLogTailer.waitForStandbyToCatchUp(nn1, nn2);
        fail("Standby fully caught up, but should not have been able to");
      } catch (CouldNotCatchUpException e) {
        // The injected read error must not have caused the standby to abort.
        verify(mockRuntime, times(0)).exit(anyInt());
      }
      // Null because it was deleted.
      assertNull(NameNodeAdapter.getFileInfo(nn2,
          TEST_DIR1, false));
      // Should have been successfully created.
      assertTrue(NameNodeAdapter.getFileInfo(nn2,
          TEST_DIR2, false).isDir());
      // Null because it hasn't been created yet.
      assertNull(NameNodeAdapter.getFileInfo(nn2,
          TEST_DIR3, false));
      // Now let the standby read ALL the edits.
      answer.setThrowExceptionOnRead(false);
      TestEditLogTailer.waitForStandbyToCatchUp(nn1, nn2);
      // Null because it was deleted.
      assertNull(NameNodeAdapter.getFileInfo(nn2,
          TEST_DIR1, false));
      // Should have been successfully created.
      assertTrue(NameNodeAdapter.getFileInfo(nn2,
          TEST_DIR2, false).isDir());
      // Should now have been successfully created.
      assertTrue(NameNodeAdapter.getFileInfo(nn2,
          TEST_DIR3, false).isDir());
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }
  /**
   * Mockito answer for FSEditLog#selectInputStreams that, while enabled,
   * wraps each returned stream in a spy whose readOp() throws an IOException
   * upon reading the op that creates TEST_DIR3.
   */
  private static class LimitedEditLogAnswer
      implements Answer<Collection<EditLogInputStream>> {
    // While true, reading the TEST_DIR3 mkdir op fails; flipped to false to
    // let the standby finish catching up.
    private boolean throwExceptionOnRead = true;
    @SuppressWarnings("unchecked")
    @Override
    public Collection<EditLogInputStream> answer(InvocationOnMock invocation)
        throws Throwable {
      Collection<EditLogInputStream> streams = (Collection<EditLogInputStream>)
          invocation.callRealMethod();
      if (!throwExceptionOnRead) {
        return streams;
      } else {
        Collection<EditLogInputStream> ret = new LinkedList<EditLogInputStream>();
        for (EditLogInputStream stream : streams) {
          EditLogInputStream spyStream = spy(stream);
          doAnswer(new Answer<FSEditLogOp>() {
            @Override
            public FSEditLogOp answer(InvocationOnMock invocation)
                throws Throwable {
              FSEditLogOp op = (FSEditLogOp) invocation.callRealMethod();
              if (throwExceptionOnRead &&
                  TEST_DIR3.equals(NameNodeAdapter.getMkdirOpPath(op))) {
                throw new IOException("failed to read op creating " + TEST_DIR3);
              } else {
                return op;
              }
            }
          }).when(spyStream).readOp();
          ret.add(spyStream);
        }
        return ret;
      }
    }
    public void setThrowExceptionOnRead(boolean throwExceptionOnRead) {
      // NOTE(review): not volatile/synchronized — presumably relies on the
      // test's polling/timing for visibility; confirm if flakiness appears.
      this.throwExceptionOnRead = throwExceptionOnRead;
    }
  }
}

View File

@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
@ -48,6 +52,7 @@ public class TestHASafeMode {
private NameNode nn1;
private FileSystem fs;
private MiniDFSCluster cluster;
private Runtime mockRuntime = mock(Runtime.class);
@Before
public void setupCluster() throws Exception {
@ -64,6 +69,8 @@ public void setupCluster() throws Exception {
nn0 = cluster.getNameNode(0);
nn1 = cluster.getNameNode(1);
fs = TestDFSClientFailover.configureFailoverFs(cluster, conf);
nn0.getNamesystem().getEditLogTailer().setRuntime(mockRuntime);
cluster.transitionToActive(0);
}
@ -71,7 +78,7 @@ public void setupCluster() throws Exception {
@After
public void shutdownCluster() throws IOException {
if (cluster != null) {
assertNull(nn1.getNamesystem().getEditLogTailer().getLastError());
verify(mockRuntime, times(0)).exit(anyInt());
cluster.shutdown();
}
}

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@ -75,12 +79,15 @@ public void testStandbyIsHot() throws Exception {
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(3)
.build();
Runtime mockRuntime = mock(Runtime.class);
try {
cluster.waitActive();
cluster.transitionToActive(0);
NameNode nn1 = cluster.getNameNode(0);
NameNode nn2 = cluster.getNameNode(1);
nn2.getNamesystem().getEditLogTailer().setRuntime(mockRuntime);
nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
nn2.getNamesystem().getEditLogTailer().interrupt();
@ -121,6 +128,7 @@ public void testStandbyIsHot() throws Exception {
waitForBlockLocations(cluster, nn2, TEST_FILE, 3);
} finally {
verify(mockRuntime, times(0)).exit(anyInt());
cluster.shutdown();
}
}