HDFS-2018. Move all journal stream management code into one place. Contributed by Ivan Kelly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1165826 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jitendra Nath Pandey 2011-09-06 20:27:38 +00:00
parent e20f135174
commit bdc3720d5b
24 changed files with 1143 additions and 1015 deletions

View File

@ -13,6 +13,9 @@ Trunk (unreleased changes)
HdfsConstants. (Harsh J Chouraria via atm)
HDFS-2197. Refactor RPC call implementations out of NameNode class (todd)
HDFS-2018. Move all journal stream management code into one place.
(Ivan Kelly via jitendra)
BUG FIXES
HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)

View File

@ -20,7 +20,9 @@
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.zip.Checksum;
import org.apache.hadoop.classification.InterfaceAudience;
@ -29,11 +31,10 @@
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Storage.StorageState;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogLoadPlan;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
* Extension of FSImage for the backup node.
@ -261,11 +262,18 @@ private boolean tryConvergeJournalSpool() throws IOException {
new FSImageTransactionalStorageInspector();
storage.inspectStorageDirs(inspector);
LogLoadPlan logLoadPlan = inspector.createLogLoadPlan(lastAppliedTxId,
target - 1);
logLoadPlan.doRecovery();
loadEdits(logLoadPlan.getEditsFiles());
editLog.recoverUnclosedStreams();
Iterable<EditLogInputStream> editStreamsAll
= editLog.selectInputStreams(lastAppliedTxId, target - 1);
// remove inprogress
List<EditLogInputStream> editStreams = Lists.newArrayList();
for (EditLogInputStream s : editStreamsAll) {
if (s.getFirstTxId() != editLog.getCurSegmentTxId()) {
editStreams.add(s);
}
}
loadEdits(editStreams);
}
// now, need to load the in-progress file
@ -275,7 +283,24 @@ private boolean tryConvergeJournalSpool() throws IOException {
return false; // drop lock and try again to load local logs
}
EditLogInputStream stream = getEditLog().getInProgressFileInputStream();
EditLogInputStream stream = null;
Collection<EditLogInputStream> editStreams
= getEditLog().selectInputStreams(
getEditLog().getCurSegmentTxId(),
getEditLog().getCurSegmentTxId());
// Pick the stream that starts at the current segment's first txid.
// The break belongs inside the match: placed after the if, it would end
// the scan after the first element whether or not that element matched,
// so a matching stream later in the collection could never be found.
for (EditLogInputStream s : editStreams) {
  if (s.getFirstTxId() == getEditLog().getCurSegmentTxId()) {
    stream = s;
    break;
  }
}
if (stream == null) {
LOG.warn("Unable to find stream starting with " + editLog.getCurSegmentTxId()
+ ". This indicates that there is an error in synchronization in BackupImage");
return false;
}
try {
long remainingTxns = getEditLog().getLastWrittenTxId() - lastAppliedTxId;
@ -289,7 +314,7 @@ private boolean tryConvergeJournalSpool() throws IOException {
"expected to load " + remainingTxns + " but loaded " +
numLoaded + " from " + stream;
} finally {
IOUtils.closeStream(stream);
FSEditLog.closeAllStreams(editStreams);
}
LOG.info("Successfully synced BackupNode with NameNode at txnid " +

View File

@ -58,12 +58,31 @@ public void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException {
}
@Override
public long getNumberOfTransactions(long fromTxnId)
throws IOException, CorruptionException {
// This JournalManager is never used for input. Therefore it cannot
// return any transactions
return 0;
}
@Override
public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
// This JournalManager is never used for input. Therefore it cannot
// return any transactions
throw new IOException("Unsupported operation");
}
@Override
public void recoverUnfinalizedSegments() throws IOException {
}
public boolean matchesRegistration(NamenodeRegistration bnReg) {
return bnReg.getAddress().equals(this.bnReg.getAddress());
}
@Override
public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId) {
return null;
public String toString() {
return "BackupJournalManager";
}
}

View File

@ -275,16 +275,17 @@ static void rollForwardByApplyingLogs(
FSImage dstImage) throws IOException {
NNStorage dstStorage = dstImage.getStorage();
List<File> editsFiles = Lists.newArrayList();
List<EditLogInputStream> editsStreams = Lists.newArrayList();
for (RemoteEditLog log : manifest.getLogs()) {
File f = dstStorage.findFinalizedEditsFile(
log.getStartTxId(), log.getEndTxId());
if (log.getStartTxId() > dstImage.getLastAppliedTxId()) {
editsFiles.add(f);
}
editsStreams.add(new EditLogFileInputStream(f, log.getStartTxId(),
log.getEndTxId()));
}
}
LOG.info("Checkpointer about to load edits from " +
editsFiles.size() + " file(s).");
dstImage.loadEdits(editsFiles);
editsStreams.size() + " stream(s).");
dstImage.loadEdits(editsStreams);
}
}

View File

@ -21,6 +21,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import com.google.common.base.Preconditions;
/**
@ -122,4 +123,14 @@ void clear() throws IOException {
reader = null;
this.version = 0;
}
@Override
public long getFirstTxId() throws IOException {
return HdfsConstants.INVALID_TXID;
}
@Override
public long getLastTxId() throws IOException {
return HdfsConstants.INVALID_TXID;
}
}

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import com.google.common.annotations.VisibleForTesting;
@ -38,12 +39,15 @@
class EditLogFileInputStream extends EditLogInputStream {
private final File file;
private final FileInputStream fStream;
final private long firstTxId;
final private long lastTxId;
private final int logVersion;
private final FSEditLogOp.Reader reader;
private final FSEditLogLoader.PositionTrackingInputStream tracker;
/**
* Open an EditLogInputStream for the given file.
* The file is pretransactional, so has no txids
* @param name filename to open
* @throws LogHeaderCorruptException if the header is either missing or
* appears to be corrupt/truncated
@ -52,6 +56,21 @@ class EditLogFileInputStream extends EditLogInputStream {
*/
EditLogFileInputStream(File name)
throws LogHeaderCorruptException, IOException {
this(name, HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID);
}
/**
* Open an EditLogInputStream for the given file.
* @param name filename to open
* @param firstTxId first transaction found in file
* @param lastTxId last transaction id found in file
* @throws LogHeaderCorruptException if the header is either missing or
* appears to be corrupt/truncated
* @throws IOException if an actual IO error occurs while reading the
* header
*/
EditLogFileInputStream(File name, long firstTxId, long lastTxId)
throws LogHeaderCorruptException, IOException {
file = name;
fStream = new FileInputStream(name);
@ -66,6 +85,18 @@ class EditLogFileInputStream extends EditLogInputStream {
}
reader = new FSEditLogOp.Reader(in, logVersion);
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
}
@Override
public long getFirstTxId() throws IOException {
return firstTxId;
}
@Override
public long getLastTxId() throws IOException {
return lastTxId;
}
@Override // JournalStream
@ -117,7 +148,8 @@ static FSEditLogLoader.EditLogValidation validateEditLog(File file) throws IOExc
// If it's missing its header, this is equivalent to no transactions
FSImage.LOG.warn("Log at " + file + " has no valid header",
corrupt);
return new FSEditLogLoader.EditLogValidation(0, 0);
return new FSEditLogLoader.EditLogValidation(0, HdfsConstants.INVALID_TXID,
HdfsConstants.INVALID_TXID);
}
try {

View File

@ -28,6 +28,17 @@
* into the #{@link EditLogOutputStream}.
*/
abstract class EditLogInputStream implements JournalStream, Closeable {
/**
* @return the first transaction which will be found in this stream
*/
public abstract long getFirstTxId() throws IOException;
/**
* @return the last transaction which will be found in this stream
*/
public abstract long getLastTxId() throws IOException;
/**
* Close the stream.
* @throws IOException if an error occurred while closing

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@ -35,11 +36,13 @@
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.io.IOUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@ -1068,6 +1071,112 @@ private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals)
}
}
/**
 * Find the best edit log input stream to read from fromTxId. Here "best"
 * means the journal which reports the largest number of transactions
 * starting from the transaction id fromTxId.
 *
 * If a journal throws a CorruptionException while reading from a txn id,
 * it means that it has more transactions, but can't find any from fromTxId.
 * If this is the case and no other journal has transactions, we should throw
 * an exception as it means more transactions exist, we just can't load them.
 *
 * @param fromTxId Transaction id to start from.
 * @return an edit log input stream with transactions from fromTxId,
 *         or null if no more exist
 * @throws IOException if no journal offers transactions from fromTxId and
 *         at least one journal reported corruption (a gap in the logs)
 */
private EditLogInputStream selectStream(long fromTxId)
    throws IOException {
  JournalManager bestjm = null;
  long bestjmNumTxns = 0;
  CorruptionException corruption = null;

  for (JournalAndStream jas : journals) {
    JournalManager candidate = jas.getManager();
    long candidateNumTxns = 0;
    try {
      candidateNumTxns = candidate.getNumberOfTransactions(fromTxId);
    } catch (CorruptionException ce) {
      // Remember the corruption; it only matters if nobody else can serve
      // transactions from fromTxId.
      corruption = ce;
    } catch (IOException ioe) {
      // Keep the cause in the log so the failing journal can be diagnosed.
      LOG.warn("Error reading number of transactions from " + candidate,
          ioe);
      continue; // error reading disk, just skip
    }

    if (candidateNumTxns > bestjmNumTxns) {
      bestjm = candidate;
      bestjmNumTxns = candidateNumTxns;
    }
  }

  if (bestjm == null) {
    // All candidates either threw a CorruptionException or found 0
    // transactions. With a recorded corruption this is a gap; otherwise
    // there is simply nothing more to read.
    if (corruption != null) {
      throw new IOException("Gap exists in logs from "
          + fromTxId, corruption);
    } else {
      return null;
    }
  }

  return bestjm.getInputStream(fromTxId);
}
/**
 * Ask every journal's manager to recover its unfinalized segments.
 * The per-journal work is handed to mapJournalsAndReportErrors together
 * with the status string "recovering unclosed streams".
 */
void recoverUnclosedStreams() {
  JournalClosure recoverSegments = new JournalClosure() {
    @Override
    public void apply(JournalAndStream jas) throws IOException {
      jas.getManager().recoverUnfinalizedSegments();
    }
  };
  mapJournalsAndReportErrors(recoverSegments, "recovering unclosed streams");
}
/**
 * Select a list of input streams to load.
 *
 * Streams are chosen one after another with selectStream, each new stream
 * starting at the transaction following the last txid of the previous one.
 *
 * @param fromTxId first transaction in the selected streams
 * @param toAtLeastTxId the selected streams must contain this transaction
 * @return the selected streams, in the order they must be loaded
 * @throws IOException if a gap is found or the selected streams end before
 *         toAtLeastTxId; every stream selected so far is closed before the
 *         exception propagates
 */
Collection<EditLogInputStream> selectInputStreams(long fromTxId, long toAtLeastTxId)
    throws IOException {
  List<EditLogInputStream> streams = Lists.newArrayList();
  IOException gap = null;
  boolean success = false;

  try {
    EditLogInputStream stream = selectStream(fromTxId);
    while (stream != null) {
      // Track the stream before querying it, so that it is closed by the
      // finally block if getLastTxId() fails.
      streams.add(stream);
      fromTxId = stream.getLastTxId() + 1;
      try {
        stream = selectStream(fromTxId);
      } catch (IOException ioe) {
        // Keep the failure so it can be reported as the cause below.
        gap = ioe;
        break;
      }
    }

    if (fromTxId <= toAtLeastTxId || gap != null) {
      throw new IOException("No non-corrupt logs for txid "
          + fromTxId, gap);
    }
    success = true;
    return streams;
  } finally {
    if (!success) {
      closeAllStreams(streams);
    }
  }
}
/**
 * Close every stream in the given collection by passing each one to
 * IOUtils.closeStream.
 * @param streams The streams to close
 */
static void closeAllStreams(Iterable<EditLogInputStream> streams) {
  Iterator<EditLogInputStream> remaining = streams.iterator();
  while (remaining.hasNext()) {
    IOUtils.closeStream(remaining.next());
  }
}
/**
* Container for a JournalManager paired with its currently
* active stream.
@ -1137,30 +1246,5 @@ void setCurrentStreamForTests(EditLogOutputStream stream) {
JournalManager getManager() {
return manager;
}
private EditLogInputStream getInProgressInputStream() throws IOException {
return manager.getInProgressInputStream(segmentStartsAtTxId);
}
}
/**
* @return an EditLogInputStream that reads from the same log that
* the edit log is currently writing. This is used from the BackupNode
* during edits synchronization.
* @throws IOException if no valid logs are available.
*/
synchronized EditLogInputStream getInProgressFileInputStream()
throws IOException {
for (JournalAndStream jas : journals) {
if (!jas.isActive()) continue;
try {
EditLogInputStream in = jas.getInProgressInputStream();
if (in != null) return in;
} catch (IOException ioe) {
LOG.warn("Unable to get the in-progress input stream from " + jas,
ioe);
}
}
throw new IOException("No in-progress stream provided edits");
}
}

View File

@ -446,24 +446,6 @@ private void check203UpgradeFailure(int logVersion, IOException ex)
}
}
static EditLogValidation validateEditLog(File file) throws IOException {
EditLogFileInputStream in;
try {
in = new EditLogFileInputStream(file);
} catch (LogHeaderCorruptException corrupt) {
// If it's missing its header, this is equivalent to no transactions
FSImage.LOG.warn("Log at " + file + " has no valid header",
corrupt);
return new EditLogValidation(0, 0);
}
try {
return validateEditLog(in);
} finally {
IOUtils.closeStream(in);
}
}
/**
* Return the number of valid transactions in the stream. If the stream is
* truncated during the header, returns a value indicating that there are
@ -473,12 +455,26 @@ static EditLogValidation validateEditLog(File file) throws IOException {
* if the log does not exist)
*/
static EditLogValidation validateEditLog(EditLogInputStream in) {
long numValid = 0;
long lastPos = 0;
long firstTxId = HdfsConstants.INVALID_TXID;
long lastTxId = HdfsConstants.INVALID_TXID;
long numValid = 0;
try {
FSEditLogOp op = null;
while (true) {
lastPos = in.getPosition();
if (in.readOp() == null) {
if ((op = in.readOp()) == null) {
break;
}
if (firstTxId == HdfsConstants.INVALID_TXID) {
firstTxId = op.txid;
}
if (lastTxId == HdfsConstants.INVALID_TXID
|| op.txid == lastTxId + 1) {
lastTxId = op.txid;
} else {
FSImage.LOG.error("Out of order txid found. Found " + op.txid
+ ", expected " + (lastTxId + 1));
break;
}
numValid++;
@ -489,16 +485,33 @@ static EditLogValidation validateEditLog(EditLogInputStream in) {
FSImage.LOG.debug("Caught exception after reading " + numValid +
" ops from " + in + " while determining its valid length.", t);
}
return new EditLogValidation(lastPos, numValid);
return new EditLogValidation(lastPos, firstTxId, lastTxId);
}
static class EditLogValidation {
long validLength;
long numTransactions;
EditLogValidation(long validLength, long numTransactions) {
private long validLength;
private long startTxId;
private long endTxId;
EditLogValidation(long validLength,
long startTxId, long endTxId) {
this.validLength = validLength;
this.numTransactions = numTransactions;
this.startTxId = startTxId;
this.endTxId = endTxId;
}
long getValidLength() { return validLength; }
long getStartTxId() { return startTxId; }
long getEndTxId() { return endTxId; }
long getNumTransactions() {
if (endTxId == HdfsConstants.INVALID_TXID
|| startTxId == HdfsConstants.INVALID_TXID) {
return 0;
}
return (endTxId - startTxId) + 1;
}
}

View File

@ -46,7 +46,7 @@
import static org.apache.hadoop.hdfs.server.common.Util.now;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@ -584,32 +584,38 @@ boolean loadFSImage() throws IOException {
FSImageStorageInspector inspector = storage.readAndInspectDirs();
isUpgradeFinalized = inspector.isUpgradeFinalized();
FSImageStorageInspector.FSImageFile imageFile
= inspector.getLatestImage();
boolean needToSave = inspector.needToSave();
Iterable<EditLogInputStream> editStreams = null;
editLog.recoverUnclosedStreams();
if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
getLayoutVersion())) {
editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
inspector.getMaxSeenTxId());
} else {
editStreams = FSImagePreTransactionalStorageInspector
.getEditLogStreams(storage);
}
LOG.debug("Planning to load image :\n" + imageFile);
for (EditLogInputStream l : editStreams) {
LOG.debug("\t Planning to load edit stream: " + l);
}
// Plan our load. This will throw if it's impossible to load from the
// data that's available.
LoadPlan loadPlan = inspector.createLoadPlan();
LOG.debug("Planning to load image using following plan:\n" + loadPlan);
// Recover from previous interrupted checkpoint, if any
needToSave |= loadPlan.doRecovery();
//
// Load in bits
//
StorageDirectory sdForProperties =
loadPlan.getStorageDirectoryForProperties();
storage.readProperties(sdForProperties);
File imageFile = loadPlan.getImageFile();
try {
StorageDirectory sdForProperties = imageFile.sd;
storage.readProperties(sdForProperties);
if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
getLayoutVersion())) {
// For txid-based layout, we should have a .md5 file
// next to the image file
loadFSImage(imageFile);
loadFSImage(imageFile.getFile());
} else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
getLayoutVersion())) {
// In 0.22, we have the checksum stored in the VERSION file.
@ -621,17 +627,19 @@ boolean loadFSImage() throws IOException {
NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
" not set for storage directory " + sdForProperties.getRoot());
}
loadFSImage(imageFile, new MD5Hash(md5));
loadFSImage(imageFile.getFile(), new MD5Hash(md5));
} else {
// We don't have any record of the md5sum
loadFSImage(imageFile, null);
loadFSImage(imageFile.getFile(), null);
}
} catch (IOException ioe) {
throw new IOException("Failed to load image from " + loadPlan.getImageFile(), ioe);
FSEditLog.closeAllStreams(editStreams);
throw new IOException("Failed to load image from " + imageFile, ioe);
}
long numLoaded = loadEdits(loadPlan.getEditsFiles());
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile, numLoaded);
long numLoaded = loadEdits(editStreams);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
numLoaded);
// update the txid for the edit log
editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
@ -663,22 +671,25 @@ private boolean needsResaveBasedOnStaleCheckpoint(
* Load the specified list of edit files into the image.
* @return the number of transactions loaded
*/
protected long loadEdits(List<File> editLogs) throws IOException {
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editLogs));
protected long loadEdits(Iterable<EditLogInputStream> editStreams) throws IOException {
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
long startingTxId = getLastAppliedTxId() + 1;
FSEditLogLoader loader = new FSEditLogLoader(namesystem);
int numLoaded = 0;
// Load latest edits
for (File edits : editLogs) {
LOG.debug("Reading " + edits + " expecting start txid #" + startingTxId);
EditLogFileInputStream editIn = new EditLogFileInputStream(edits);
int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
startingTxId += thisNumLoaded;
numLoaded += thisNumLoaded;
lastAppliedTxId += thisNumLoaded;
editIn.close();
try {
FSEditLogLoader loader = new FSEditLogLoader(namesystem);
// Load latest edits
for (EditLogInputStream editIn : editStreams) {
LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId);
int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
startingTxId += thisNumLoaded;
numLoaded += thisNumLoaded;
lastAppliedTxId += thisNumLoaded;
}
} finally {
FSEditLog.closeAllStreams(editStreams);
}
// update the counts

View File

@ -32,6 +32,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@ -55,6 +56,7 @@ class FSImagePreTransactionalStorageInspector extends FSImageStorageInspector {
private boolean hasOutOfDateStorageDirs = false;
/* Flag set false if there are any "previous" directories found */
private boolean isUpgradeFinalized = true;
private boolean needToSaveAfterRecovery = false;
// Track the name and edits dir with the latest times
private long latestNameCheckpointTime = Long.MIN_VALUE;
@ -139,15 +141,15 @@ static long readCheckpointTime(StorageDirectory sd) throws IOException {
boolean isUpgradeFinalized() {
return isUpgradeFinalized;
}
@Override
LoadPlan createLoadPlan() throws IOException {
FSImageFile getLatestImage() throws IOException {
// We should have at least one image and one edits dirs
if (latestNameSD == null)
throw new IOException("Image file is not found in " + imageDirs);
if (latestEditsSD == null)
throw new IOException("Edits file is not found in " + editsDirs);
// Make sure we are loading image and edits from same checkpoint
if (latestNameCheckpointTime > latestEditsCheckpointTime
&& latestNameSD != latestEditsSD
@ -168,92 +170,70 @@ LoadPlan createLoadPlan() throws IOException {
"image checkpoint time = " + latestNameCheckpointTime +
"edits checkpoint time = " + latestEditsCheckpointTime);
}
needToSaveAfterRecovery = doRecovery();
return new PreTransactionalLoadPlan();
return new FSImageFile(latestNameSD,
NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE),
HdfsConstants.INVALID_TXID);
}
@Override
boolean needToSave() {
return hasOutOfDateStorageDirs ||
checkpointTimes.size() != 1 ||
latestNameCheckpointTime > latestEditsCheckpointTime;
latestNameCheckpointTime > latestEditsCheckpointTime ||
needToSaveAfterRecovery;
}
private class PreTransactionalLoadPlan extends LoadPlan {
@Override
boolean doRecovery() throws IOException {
LOG.debug(
boolean doRecovery() throws IOException {
LOG.debug(
"Performing recovery in "+ latestNameSD + " and " + latestEditsSD);
boolean needToSave = false;
File curFile =
NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
File ckptFile =
NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW);
//
// If we were in the midst of a checkpoint
//
if (ckptFile.exists()) {
needToSave = true;
if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW)
.exists()) {
//
// checkpointing might have uploaded a new
// merged image, but we discard it here because we are
// not sure whether the entire merged image was uploaded
// before the namenode crashed.
//
if (!ckptFile.delete()) {
throw new IOException("Unable to delete " + ckptFile);
}
} else {
//
// checkpointing was in progress when the namenode
// shutdown. The fsimage.ckpt was created and the edits.new
// file was moved to edits. We complete that checkpoint by
// moving fsimage.new to fsimage. There is no need to
// update the fstime file here. renameTo fails on Windows
// if the destination file already exists.
//
boolean needToSave = false;
File curFile =
NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
File ckptFile =
NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE_NEW);
//
// If we were in the midst of a checkpoint
//
if (ckptFile.exists()) {
needToSave = true;
if (NNStorage.getStorageFile(latestEditsSD, NameNodeFile.EDITS_NEW)
.exists()) {
//
// checkpointing might have uploaded a new
// merged image, but we discard it here because we are
// not sure whether the entire merged image was uploaded
// before the namenode crashed.
//
if (!ckptFile.delete()) {
throw new IOException("Unable to delete " + ckptFile);
}
} else {
//
// checkpointing was in progress when the namenode
// shutdown. The fsimage.ckpt was created and the edits.new
// file was moved to edits. We complete that checkpoint by
// moving fsimage.new to fsimage. There is no need to
// update the fstime file here. renameTo fails on Windows
// if the destination file already exists.
//
if (!ckptFile.renameTo(curFile)) {
if (!curFile.delete())
LOG.warn("Unable to delete dir " + curFile + " before rename");
if (!ckptFile.renameTo(curFile)) {
if (!curFile.delete())
LOG.warn("Unable to delete dir " + curFile + " before rename");
if (!ckptFile.renameTo(curFile)) {
throw new IOException("Unable to rename " + ckptFile +
" to " + curFile);
}
throw new IOException("Unable to rename " + ckptFile +
" to " + curFile);
}
}
}
return needToSave;
}
@Override
File getImageFile() {
return NNStorage.getStorageFile(latestNameSD, NameNodeFile.IMAGE);
}
@Override
List<File> getEditsFiles() {
if (latestNameCheckpointTime > latestEditsCheckpointTime) {
// the image is already current, discard edits
LOG.debug(
"Name checkpoint time is newer than edits, not loading edits.");
return Collections.<File>emptyList();
}
return getEditsInStorageDir(latestEditsSD);
}
@Override
StorageDirectory getStorageDirectoryForProperties() {
return latestNameSD;
}
return needToSave;
}
/**
* @return a list with the paths to EDITS and EDITS_NEW (if it exists)
* in a given storage directory.
@ -269,4 +249,33 @@ static List<File> getEditsInStorageDir(StorageDirectory sd) {
}
return files;
}
/**
 * @return the edits files found in the latest edits storage directory, or
 * an empty list when the latest image checkpoint time is newer than the
 * latest edits checkpoint time (the image is already current).
 */
private List<File> getLatestEditsFiles() {
  boolean editsStillNeeded =
      latestNameCheckpointTime <= latestEditsCheckpointTime;
  if (editsStillNeeded) {
    return getEditsInStorageDir(latestEditsSD);
  }
  // the image is already current, discard edits
  LOG.debug(
      "Name checkpoint time is newer than edits, not loading edits.");
  return Collections.<File>emptyList();
}
@Override
long getMaxSeenTxId() {
return 0L;
}
/**
 * Inspect the given storage with a fresh pre-transactional inspector and
 * open an input stream on each of the latest edits files it reports.
 *
 * @param storage the storage whose directories are inspected
 * @return one open stream per edits file, in the order the inspector
 *         returned the files
 * @throws IOException if inspection fails or an edits file cannot be
 *         opened; streams opened before the failure are closed first
 */
static Iterable<EditLogInputStream> getEditLogStreams(NNStorage storage)
    throws IOException {
  FSImagePreTransactionalStorageInspector inspector
      = new FSImagePreTransactionalStorageInspector();
  storage.inspectStorageDirs(inspector);

  List<EditLogInputStream> editStreams = new ArrayList<EditLogInputStream>();
  boolean success = false;
  try {
    for (File f : inspector.getLatestEditsFiles()) {
      editStreams.add(new EditLogFileInputStream(f));
    }
    success = true;
  } finally {
    if (!success) {
      // Opening a later file failed: release the streams already opened
      // so their file handles are not leaked.
      for (EditLogInputStream opened : editStreams) {
        try {
          opened.close();
        } catch (IOException e) {
          LOG.warn("Failed to close " + opened, e);
        }
      }
    }
  }
  return editStreams;
}
}

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@ -43,60 +44,22 @@ abstract class FSImageStorageInspector {
abstract boolean isUpgradeFinalized();
/**
* Create a plan to load the image from the set of inspected storage directories.
* Get the image files which should be loaded into the filesystem.
* @throws IOException if not enough files are available (eg no image found in any directory)
*/
abstract LoadPlan createLoadPlan() throws IOException;
abstract FSImageFile getLatestImage() throws IOException;
/**
* Get the highest transaction id seen in the inspected storage directories.
* Edits must be loaded at least up to this txid with this set of images.
*/
abstract long getMaxSeenTxId();
/**
* @return true if the directories are in such a state that the image should be re-saved
* following the load
*/
abstract boolean needToSave();
/**
* A plan to load the namespace from disk, providing the locations from which to load
* the image and a set of edits files.
*/
abstract static class LoadPlan {
/**
* Execute atomic move sequence in the chosen storage directories,
* in order to recover from an interrupted checkpoint.
* @return true if some recovery action was taken
*/
abstract boolean doRecovery() throws IOException;
/**
* @return the file from which to load the image data
*/
abstract File getImageFile();
/**
* @return a list of flies containing edits to replay
*/
abstract List<File> getEditsFiles();
/**
* @return the storage directory containing the VERSION file that should be
* loaded.
*/
abstract StorageDirectory getStorageDirectoryForProperties();
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Will load image file: ").append(getImageFile()).append("\n");
sb.append("Will load edits files:").append("\n");
for (File f : getEditsFiles()) {
sb.append(" ").append(f).append("\n");
}
sb.append("Will load metadata from: ")
.append(getStorageDirectoryForProperties())
.append("\n");
return sb.toString();
}
}
/**
* Record of an image that has been located and had its filename parsed.
*/
@ -106,7 +69,8 @@ static class FSImageFile {
private final File file;
FSImageFile(StorageDirectory sd, File file, long txId) {
assert txId >= 0 : "Invalid txid on " + file +": " + txId;
assert txId >= 0 || txId == HdfsConstants.INVALID_TXID
: "Invalid txid on " + file +": " + txId;
this.sd = sd;
this.txId = txId;

View File

@ -39,7 +39,6 @@
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@ -55,9 +54,7 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
private boolean isUpgradeFinalized = true;
List<FSImageFile> foundImages = new ArrayList<FSImageFile>();
List<EditLogFile> foundEditLogs = new ArrayList<EditLogFile>();
SortedMap<Long, LogGroup> logGroups = new TreeMap<Long, LogGroup>();
long maxSeenTxId = 0;
private long maxSeenTxId = 0;
private static final Pattern IMAGE_REGEX = Pattern.compile(
NameNodeFile.IMAGE.getName() + "_(\\d+)");
@ -71,6 +68,8 @@ public void inspectDirectory(StorageDirectory sd) throws IOException {
return;
}
maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
File currentDir = sd.getCurrentDir();
File filesInStorage[];
try {
@ -113,34 +112,10 @@ public void inspectDirectory(StorageDirectory sd) throws IOException {
LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
}
List<EditLogFile> editLogs
= FileJournalManager.matchEditLogs(filesInStorage);
if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
for (EditLogFile log : editLogs) {
addEditLog(log);
}
} else if (!editLogs.isEmpty()){
LOG.warn("Found the following edit log file(s) in " + sd +
" even though it was not configured to store edits:\n" +
" " + Joiner.on("\n ").join(editLogs));
}
// set finalized flag
isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
}
private void addEditLog(EditLogFile foundEditLog) {
foundEditLogs.add(foundEditLog);
LogGroup group = logGroups.get(foundEditLog.getFirstTxId());
if (group == null) {
group = new LogGroup(foundEditLog.getFirstTxId());
logGroups.put(foundEditLog.getFirstTxId(), group);
}
group.add(foundEditLog);
}
@Override
public boolean isUpgradeFinalized() {
return isUpgradeFinalized;
@ -151,9 +126,13 @@ public boolean isUpgradeFinalized() {
* If there are multiple storage directories which contain equal images
* the storage directory that was inspected first will be preferred.
*
* Returns null if no images were found.
* @throws FileNotFoundException if no images are found.
*/
FSImageFile getLatestImage() {
FSImageFile getLatestImage() throws IOException {
if (foundImages.isEmpty()) {
throw new FileNotFoundException("No valid image files found");
}
FSImageFile ret = null;
for (FSImageFile img : foundImages) {
if (ret == null || img.txId > ret.txId) {
@ -167,349 +146,13 @@ public List<FSImageFile> getFoundImages() {
return ImmutableList.copyOf(foundImages);
}
/**
 * @return an immutable copy of the edit log files recorded so far
 */
public List<EditLogFile> getEditLogFiles() {
  final List<EditLogFile> snapshot = ImmutableList.copyOf(foundEditLogs);
  return snapshot;
}
/**
 * Builds a load plan from the newest image found plus every log needed to
 * replay the transactions that follow it.
 *
 * @throws FileNotFoundException if no image files were found
 * @throws IOException if the log plan cannot be created
 */
@Override
public LoadPlan createLoadPlan() throws IOException {
  if (foundImages.isEmpty()) {
    throw new FileNotFoundException("No valid image files found");
  }

  final FSImageFile newestImage = getLatestImage();
  return new TransactionalLoadPlan(
      newestImage,
      createLogLoadPlan(newestImage.txId, Long.MAX_VALUE));
}
/**
 * Plan which logs to load in order to bring the namespace up-to-date.
 *
 * Log groups are keyed by their first txid. Only groups whose first txid is
 * greater than {@code sinceTxId} and strictly less than {@code maxStartTxId}
 * are considered; the selected groups must be contiguous (each finalized
 * group must be followed by a group starting at its last txid + 1).
 *
 * @param sinceTxId the highest txid that is already loaded
 *                  (eg from the image checkpoint)
 * @param maxStartTxId ignore any log files that start at or after this txid
 * @return the logs to load and the log groups that need recovery
 * @throws IOException if there is a gap between log groups, if recovery
 *         planning of a group fails, or if a storage directory has seen a
 *         txid beyond the start of the last selected log group
 */
LogLoadPlan createLogLoadPlan(long sinceTxId, long maxStartTxId) throws IOException {
  long expectedTxId = sinceTxId + 1;

  List<EditLogFile> recoveryLogs = new ArrayList<EditLogFile>();

  SortedMap<Long, LogGroup> tailGroups = logGroups.tailMap(expectedTxId);
  if (logGroups.size() > tailGroups.size()) {
    LOG.debug("Excluded " + (logGroups.size() - tailGroups.size()) +
        " groups of logs because they start with a txid less than image " +
        "txid " + sinceTxId);
  }

  SortedMap<Long, LogGroup> usefulGroups;
  if (maxStartTxId > sinceTxId) {
    usefulGroups = tailGroups.headMap(maxStartTxId);
  } else {
    usefulGroups = new TreeMap<Long, LogGroup>();
  }

  // usefulGroups is a subset of tailGroups, so groups were dropped exactly
  // when tailGroups is the larger of the two.
  if (tailGroups.size() > usefulGroups.size()) {
    LOG.debug("Excluded " + (tailGroups.size() - usefulGroups.size()) +
        " groups of logs because they start with a txid higher than max " +
        "txid " + maxStartTxId);
  }

  for (Map.Entry<Long, LogGroup> entry : usefulGroups.entrySet()) {
    long logStartTxId = entry.getKey();
    LogGroup logGroup = entry.getValue();

    logGroup.planRecovery();

    if (expectedTxId != HdfsConstants.INVALID_TXID && logStartTxId != expectedTxId) {
      throw new IOException("Expected next log group would start at txid " +
          expectedTxId + " but starts at txid " + logStartTxId);
    }

    // We can pick any of the non-corrupt logs here
    recoveryLogs.add(logGroup.getBestNonCorruptLog());

    // If this log group was finalized, we know to expect the next
    // log group to start at the following txid (ie no gaps)
    if (logGroup.hasKnownLastTxId()) {
      expectedTxId = logGroup.getLastTxId() + 1;
    } else {
      // the log group was in-progress so we don't know what ID
      // the next group should start from.
      expectedTxId = HdfsConstants.INVALID_TXID;
    }
  }

  long lastLogGroupStartTxId = usefulGroups.isEmpty() ?
      0 : usefulGroups.lastKey();
  // A storage directory recorded a txid newer than anything we are about
  // to load: refuse to continue rather than silently lose edits.
  if (maxSeenTxId > sinceTxId &&
      maxSeenTxId > lastLogGroupStartTxId) {
    String msg = "At least one storage directory indicated it has seen a " +
        "log segment starting at txid " + maxSeenTxId;
    if (usefulGroups.isEmpty()) {
      msg += " but there are no logs to load.";
    } else {
      msg += " but the most recent log file found starts with txid " +
          lastLogGroupStartTxId;
    }
    throw new IOException(msg);
  }

  return new LogLoadPlan(recoveryLogs,
      Lists.newArrayList(usefulGroups.values()));
}
/**
 * @return the current value of the {@code needToSave} flag
 */
@Override
public boolean needToSave() {
  return this.needToSave;
}
/**
* A group of logs that all start at the same txid.
*
* Handles determining which logs are corrupt and which should be considered
* candidates for loading.
*/
static class LogGroup {
// First txid shared by every log in this group.
long startTxId;
// All logs added to this group, in insertion order.
List<EditLogFile> logs = new ArrayList<EditLogFile>();;
// Distinct last txids of the finalized logs in this group.
private Set<Long> endTxIds = new TreeSet<Long>();
// Set once any in-progress log has been added.
private boolean hasInProgress = false;
// Set once any finalized log has been added.
private boolean hasFinalized = false;
/**
* @param startTxId the first txid that every log in this group starts at
*/
LogGroup(long startTxId) {
this.startTxId = startTxId;
}
/**
* @return the first non-corrupt finalized log, or failing that the first
* non-corrupt in-progress log
* @throws IllegalStateException if every log in the group is marked corrupt
*/
EditLogFile getBestNonCorruptLog() {
// First look for non-corrupt finalized logs
for (EditLogFile log : logs) {
if (!log.isCorrupt() && !log.isInProgress()) {
return log;
}
}
// Then look for non-corrupt in-progress logs
for (EditLogFile log : logs) {
if (!log.isCorrupt()) {
return log;
}
}
// We should never get here, because we don't get to the planning stage
// without calling planRecovery first, and if we've called planRecovery,
// we would have already thrown if there were no non-corrupt logs!
// NOTE(review): planRecovery as written marks logs corrupt but does not
// itself throw when every log ends up corrupt (e.g. a single empty
// in-progress log), so this exception is reachable in that case.
throw new IllegalStateException(
"No non-corrupt logs for txid " + startTxId);
}
/**
* @return true if we can determine the last txid in this log group.
*/
boolean hasKnownLastTxId() {
for (EditLogFile log : logs) {
if (!log.isInProgress()) {
return true;
}
}
return false;
}
/**
* @return the last txid included in the logs in this group
* @throws IllegalStateException if it is unknown -
* see {@link #hasKnownLastTxId()}
*/
long getLastTxId() {
for (EditLogFile log : logs) {
if (!log.isInProgress()) {
return log.getLastTxId();
}
}
throw new IllegalStateException("LogGroup only has in-progress logs");
}
/**
* Adds a log to this group and records whether it is in-progress or
* finalized; the last txid of a finalized log is remembered so that
* inconsistent endings can be detected later.
*
* @param log a log whose first txid equals this group's start txid
*/
void add(EditLogFile log) {
assert log.getFirstTxId() == startTxId;
logs.add(log);
if (log.isInProgress()) {
hasInProgress = true;
} else {
hasFinalized = true;
endTxIds.add(log.getLastTxId());
}
}
/**
* Decides which logs of this group are corrupt. Nothing is renamed or
* moved here; that happens in {@link #recover()}.
*
* @throws IOException if finalized logs disagree on their last txid, or
* if validating an in-progress log fails
*/
void planRecovery() throws IOException {
assert hasInProgress || hasFinalized;
checkConsistentEndTxIds();
if (hasFinalized && hasInProgress) {
planMixedLogRecovery();
} else if (!hasFinalized && hasInProgress) {
planAllInProgressRecovery();
} else if (hasFinalized && !hasInProgress) {
LOG.debug("No recovery necessary for logs starting at txid " +
startTxId);
}
}
/**
* Recovery case for when some logs in the group were in-progress, and
* others were finalized. This happens when one of the storage
* directories fails.
*
* The in-progress logs in this case should be considered corrupt.
*/
private void planMixedLogRecovery() throws IOException {
for (EditLogFile log : logs) {
if (log.isInProgress()) {
LOG.warn("Log at " + log.getFile() + " is in progress, but " +
"other logs starting at the same txid " + startTxId +
" are finalized. Moving aside.");
log.markCorrupt();
}
}
}
/**
* Recovery case for when all of the logs in the group were in progress.
* This happens if the NN completely crashes and restarts. In this case
* we count the valid transactions in each log file, and any logs that have
* fewer than the maximum of these counts are considered corrupt. Logs with
* no valid transactions at all are also considered corrupt.
*/
private void planAllInProgressRecovery() throws IOException {
// We only have in-progress logs. We need to figure out which logs have
// the latest data to recover them
LOG.warn("Logs beginning at txid " + startTxId + " were are all " +
"in-progress (probably truncated due to a previous NameNode " +
"crash)");
if (logs.size() == 1) {
// Only one log, it's our only choice!
EditLogFile log = logs.get(0);
if (log.validateLog().numTransactions == 0) {
// If it has no transactions, we should consider it corrupt just
// to be conservative.
// See comment below for similar case
LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
"it has no transactions in it.");
log.markCorrupt();
}
return;
}
// First pass: find the largest number of valid txns in any log.
long maxValidTxnCount = Long.MIN_VALUE;
for (EditLogFile log : logs) {
long validTxnCount = log.validateLog().numTransactions;
LOG.warn(" Log " + log.getFile() +
" valid txns=" + validTxnCount +
" valid len=" + log.validateLog().validLength);
maxValidTxnCount = Math.max(maxValidTxnCount, validTxnCount);
}
// Second pass: anything shorter than the longest log, or empty, is corrupt.
for (EditLogFile log : logs) {
long txns = log.validateLog().numTransactions;
if (txns < maxValidTxnCount) {
LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
"it is has only " + txns + " valid txns whereas another " +
"log has " + maxValidTxnCount);
log.markCorrupt();
} else if (txns == 0) {
// this can happen if the NN crashes right after rolling a log
// but before the START_LOG_SEGMENT txn is written. Since the log
// is empty, we can just move it aside to its corrupt name.
LOG.warn("Marking log at " + log.getFile() + " as corrupt since " +
"it has no transactions in it.");
log.markCorrupt();
}
}
}
/**
* Check for the case when we have multiple finalized logs and they have
* different ending transaction IDs. This violates an invariant that all
* log directories should roll together. We should abort in this case.
*/
private void checkConsistentEndTxIds() throws IOException {
if (hasFinalized && endTxIds.size() > 1) {
throw new IOException("More than one ending txid was found " +
"for logs starting at txid " + startTxId + ". " +
"Found: " + StringUtils.join(endTxIds, ','));
}
}
/**
* Applies the plan: logs marked corrupt are moved aside, and the remaining
* in-progress logs are finalized. Finalized, non-corrupt logs are left
* untouched.
*/
void recover() throws IOException {
for (EditLogFile log : logs) {
if (log.isCorrupt()) {
log.moveAsideCorruptFile();
} else if (log.isInProgress()) {
log.finalizeLog();
}
}
}
}
/**
 * Load plan that pairs one image file with the plan for the edit logs to
 * replay on top of it.
 */
static class TransactionalLoadPlan extends LoadPlan {
  final FSImageFile image;
  final LogLoadPlan logPlan;

  /**
   * @param image the image file to load
   * @param logPlan the plan describing which edit logs to load and recover
   */
  public TransactionalLoadPlan(FSImageFile image,
                               LogLoadPlan logPlan) {
    this.image = image;
    this.logPlan = logPlan;
  }

  /** @return the file backing the selected image */
  @Override
  File getImageFile() {
    return image.getFile();
  }

  /** @return the edit log files chosen by the log plan */
  @Override
  List<File> getEditsFiles() {
    return logPlan.getEditsFiles();
  }

  /** @return the storage directory that holds the selected image */
  @Override
  StorageDirectory getStorageDirectoryForProperties() {
    return image.sd;
  }

  /**
   * Runs recovery on the log plan.
   *
   * @return always {@code false}
   */
  @Override
  boolean doRecovery() throws IOException {
    logPlan.doRecovery();
    return false;
  }
}
static class LogLoadPlan {
final List<EditLogFile> editLogs;
final List<LogGroup> logGroupsToRecover;
/**
 * @param editLogs the edit logs to load
 * @param logGroupsToRecover the log groups on which recovery is run
 */
LogLoadPlan(List<EditLogFile> editLogs,
            List<LogGroup> logGroupsToRecover) {
  this.logGroupsToRecover = logGroupsToRecover;
  this.editLogs = editLogs;
}
/**
 * Runs recovery on each log group of this plan, in list order.
 *
 * @throws IOException if recovering any group fails
 */
public void doRecovery() throws IOException {
  for (int i = 0; i < logGroupsToRecover.size(); i++) {
    logGroupsToRecover.get(i).recover();
  }
}
/**
 * @return a new list holding the file of each planned edit log, in plan order
 */
public List<File> getEditsFiles() {
  final List<File> files = new ArrayList<File>();
  for (int i = 0; i < editLogs.size(); i++) {
    files.add(editLogs.get(i).getFile());
  }
  return files;
}
/**
 * @return the current value of {@code maxSeenTxId}
 */
@Override
long getMaxSeenTxId() {
  return this.maxSeenTxId;
}
}

View File

@ -23,11 +23,14 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.HashMap;
import java.util.Comparator;
import java.util.Collections;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
@ -57,6 +60,9 @@ class FileJournalManager implements JournalManager {
private static final Pattern EDITS_INPROGRESS_REGEX = Pattern.compile(
NameNodeFile.EDITS_INPROGRESS.getName() + "_(\\d+)");
private File currentInProgress = null;
private long maxSeenTransaction = 0L;
@VisibleForTesting
StoragePurger purger
= new NNStorageRetentionManager.DeletionStoragePurger();
@ -66,19 +72,20 @@ public FileJournalManager(StorageDirectory sd) {
}
@Override
public EditLogOutputStream startLogSegment(long txid) throws IOException {
File newInProgress = NNStorage.getInProgressEditsFile(sd, txid);
EditLogOutputStream stm = new EditLogFileOutputStream(newInProgress,
synchronized public EditLogOutputStream startLogSegment(long txid)
throws IOException {
currentInProgress = NNStorage.getInProgressEditsFile(sd, txid);
EditLogOutputStream stm = new EditLogFileOutputStream(currentInProgress,
outputBufferCapacity);
stm.create();
return stm;
}
@Override
public void finalizeLogSegment(long firstTxId, long lastTxId)
synchronized public void finalizeLogSegment(long firstTxId, long lastTxId)
throws IOException {
File inprogressFile = NNStorage.getInProgressEditsFile(
sd, firstTxId);
File inprogressFile = NNStorage.getInProgressEditsFile(sd, firstTxId);
File dstFile = NNStorage.getFinalizedEditsFile(
sd, firstTxId, lastTxId);
LOG.debug("Finalizing edits file " + inprogressFile + " -> " + dstFile);
@ -89,6 +96,9 @@ public void finalizeLogSegment(long firstTxId, long lastTxId)
if (!inprogressFile.renameTo(dstFile)) {
throw new IOException("Unable to finalize edits file " + inprogressFile);
}
if (inprogressFile.equals(currentInProgress)) {
currentInProgress = null;
}
}
@VisibleForTesting
@ -97,12 +107,7 @@ public StorageDirectory getStorageDirectory() {
}
@Override
public String toString() {
return "FileJournalManager for storage directory " + sd;
}
@Override
public void setOutputBufferCapacity(int size) {
synchronized public void setOutputBufferCapacity(int size) {
this.outputBufferCapacity = size;
}
@ -120,13 +125,6 @@ public void purgeLogsOlderThan(long minTxIdToKeep)
}
}
/**
 * Opens a stream over the in-progress edits file of this storage directory
 * for the segment starting at the given txid.
 *
 * @param segmentStartsAtTxId first txid of the in-progress segment
 */
@Override
public EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
    throws IOException {
  return new EditLogFileInputStream(
      NNStorage.getInProgressEditsFile(sd, segmentStartsAtTxId));
}
/**
* Find all editlog segments starting at or above the given txid.
* @param fromTxId the txnid which to start looking
@ -178,17 +176,156 @@ static List<EditLogFile> matchEditLogs(File[] filesInStorage) {
try {
long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
ret.add(
new EditLogFile(f, startTxId, EditLogFile.UNKNOWN_END));
new EditLogFile(f, startTxId, startTxId, true));
} catch (NumberFormatException nfe) {
LOG.error("In-progress edits file " + f + " has improperly " +
"formatted transaction ID");
// skip
}
}
}
}
return ret;
}
/**
 * Opens the edit log segment whose first transaction id is exactly
 * {@code fromTxId}.
 *
 * An in-progress segment is validated first, so that the last txid handed
 * to the stream is the one found by validation.
 *
 * @param fromTxId the first transaction id of the wanted segment
 * @return a stream reading from the matching segment
 * @throws IOException if no segment starts at {@code fromTxId}, or if the
 *         log files cannot be listed or validated
 */
@Override
synchronized public EditLogInputStream getInputStream(long fromTxId)
    throws IOException {
  for (EditLogFile elf : getLogFiles(fromTxId)) {
    if (elf.getFirstTxId() == fromTxId) {
      if (elf.isInProgress()) {
        elf.validateLog();
      }
      if (LOG.isTraceEnabled()) {
        LOG.trace("Returning edit stream reading from " + elf);
      }
      return new EditLogFileInputStream(elf.getFile(),
          elf.getFirstTxId(), elf.getLastTxId());
    }
  }

  // Message previously read "as first first txid".
  throw new IOException("Cannot find editlog file with " + fromTxId
      + " as first txid");
}
/**
 * Counts the transactions contiguously available in this journal starting
 * at {@code fromTxId}.
 *
 * Counting stops at the first gap, at the first corrupt segment, or after
 * an in-progress segment.
 *
 * @param fromTxId the transaction id to start counting from
 * @return the number of contiguous transactions available from
 *         {@code fromTxId}
 * @throws CorruptionException if nothing could be counted although the
 *         journal has seen a transaction id at or beyond {@code fromTxId}
 * @throws IOException if the log files cannot be listed or validated
 */
@Override
public long getNumberOfTransactions(long fromTxId)
    throws IOException, CorruptionException {
  long numTxns = 0L;
  // First txid the next segment is expected to start at. Tracked separately
  // from fromTxId so that the requested start can still be reported below.
  long nextTxId = fromTxId;

  for (EditLogFile elf : getLogFiles(fromTxId)) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Counting " + elf);
    }
    if (elf.getFirstTxId() > nextTxId) { // there must be a gap
      LOG.warn("Gap in transactions in " + sd.getRoot() + ". Gap is "
          + nextTxId + " - " + (elf.getFirstTxId() - 1));
      break;
    } else if (nextTxId == elf.getFirstTxId()) {
      if (elf.isInProgress()) {
        elf.validateLog();
      }

      if (elf.isCorrupt()) {
        break;
      }
      nextTxId = elf.getLastTxId() + 1;
      numTxns += nextTxId - elf.getFirstTxId();

      if (elf.isInProgress()) {
        break;
      }
    } // else skip
  }

  if (LOG.isDebugEnabled()) {
    // Report the requested start, not the advanced cursor.
    LOG.debug("Journal " + this + " has " + numTxns
        + " txns from " + fromTxId);
  }

  long max = findMaxTransaction();
  // nextTxId should be greater than max, as it points to the next
  // transaction we should expect to find. If it is less than or equal
  // to max, it means that a transaction with txid == max has not been found
  if (numTxns == 0 && nextTxId <= max) {
    String error = String.format("Gap in transactions, max txnid is %d"
        + ", 0 txns from %d", max, nextTxId);
    LOG.error(error);
    throw new CorruptionException(error);
  }

  return numTxns;
}
/**
 * Finalizes every in-progress segment in the current directory other than
 * the one currently being written. A segment that validation marks corrupt
 * is moved aside instead of being finalized.
 */
@Override
synchronized public void recoverUnfinalizedSegments() throws IOException {
  File currentDir = sd.getCurrentDir();
  List<EditLogFile> candidates = matchEditLogs(currentDir.listFiles());

  // make sure journal is aware of max seen transaction before moving corrupt
  // files aside
  findMaxTransaction();

  for (EditLogFile segment : candidates) {
    if (segment.getFile().equals(currentInProgress)
        || !segment.isInProgress()) {
      continue;
    }

    segment.validateLog();
    if (segment.isCorrupt()) {
      segment.moveAsideCorruptFile();
    } else {
      finalizeLogSegment(segment.getFirstTxId(), segment.getLastTxId());
    }
  }
}
/**
 * Lists the edit log files in the current directory that start at or after
 * {@code fromTxId}, sorted by first txid.
 *
 * @param fromTxId lowest first-txid a returned file may have
 * @throws IOException if {@code fromTxId} lies strictly after the first txid
 *         of some file but not after that file's last txid
 */
private List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
  File currentDir = sd.getCurrentDir();
  List<EditLogFile> found = matchEditLogs(currentDir.listFiles());
  List<EditLogFile> selected = Lists.newArrayList();

  for (EditLogFile candidate : found) {
    if (fromTxId <= candidate.getFirstTxId()) {
      selected.add(candidate);
    } else if (fromTxId <= candidate.getLastTxId()) {
      throw new IOException("Asked for fromTxId " + fromTxId
          + " which is in middle of file " + candidate.file);
    }
  }

  Collections.sort(selected, EditLogFile.COMPARE_BY_START_TXID);
  return selected;
}
/**
 * Find the maximum transaction in the journal.
 * The result is kept in a member variable, as corrupt edit logs
 * will be moved aside, but we still need to remember their first
 * transaction id in the case that it was the maximum transaction in
 * the journal.
 *
 * @return the highest transaction id seen so far
 */
private long findMaxTransaction()
    throws IOException {
  for (EditLogFile segment : getLogFiles(0)) {
    if (segment.isInProgress()) {
      // Record the start txid before validating the segment.
      if (segment.getFirstTxId() > maxSeenTransaction) {
        maxSeenTransaction = segment.getFirstTxId();
      }
      segment.validateLog();
    }
    if (segment.getLastTxId() > maxSeenTransaction) {
      maxSeenTransaction = segment.getLastTxId();
    }
  }
  return maxSeenTransaction;
}
/**
 * @return a description naming the root of the managed storage directory
 */
@Override
public String toString() {
  return "FileJournalManager(root=" + sd.getRoot() + ")";
}
/**
* Record of an edit log that has been located and had its filename parsed.
*/
@ -196,12 +333,10 @@ static class EditLogFile {
private File file;
private final long firstTxId;
private long lastTxId;
private EditLogValidation cachedValidation = null;
private boolean isCorrupt = false;
static final long UNKNOWN_END = -1;
private final boolean isInProgress;
final static Comparator<EditLogFile> COMPARE_BY_START_TXID
= new Comparator<EditLogFile>() {
public int compare(EditLogFile a, EditLogFile b) {
@ -214,30 +349,24 @@ public int compare(EditLogFile a, EditLogFile b) {
EditLogFile(File file,
long firstTxId, long lastTxId) {
assert lastTxId == UNKNOWN_END || lastTxId >= firstTxId;
assert firstTxId > 0;
this(file, firstTxId, lastTxId, false);
assert (lastTxId != HdfsConstants.INVALID_TXID)
&& (lastTxId >= firstTxId);
}
EditLogFile(File file, long firstTxId,
long lastTxId, boolean isInProgress) {
assert (lastTxId == HdfsConstants.INVALID_TXID && isInProgress)
|| (lastTxId != HdfsConstants.INVALID_TXID && lastTxId >= firstTxId);
assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID);
assert file != null;
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.file = file;
this.isInProgress = isInProgress;
}
/**
 * Renames this log to its finalized name, deriving the last txid from the
 * number of valid transactions, and then points this record at the renamed
 * file.
 *
 * @throws IOException if validation fails or the rename does not succeed
 */
public void finalizeLog() throws IOException {
  final long validTxns = validateLog().numTransactions;
  final long finalTxId = firstTxId + validTxns - 1;
  final File finalized = new File(file.getParentFile(),
      NNStorage.getFinalizedEditsFileName(firstTxId, finalTxId));

  LOG.info("Finalizing edits log " + file + " by renaming to "
      + finalized.getName());

  boolean renamed = file.renameTo(finalized);
  if (!renamed) {
    throw new IOException("Couldn't finalize log " +
        file + " to " + finalized);
  }

  this.lastTxId = finalTxId;
  this.file = finalized;
}
/**
 * @return the first transaction id of this edit log file
 */
long getFirstTxId() {
  return this.firstTxId;
}
@ -246,15 +375,22 @@ long getLastTxId() {
return lastTxId;
}
EditLogValidation validateLog() throws IOException {
if (cachedValidation == null) {
cachedValidation = EditLogFileInputStream.validateEditLog(file);
/**
* Count the number of valid transactions in a log.
* This will update the lastTxId of the EditLogFile or
* mark it as corrupt if it is.
*/
void validateLog() throws IOException {
EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
if (val.getNumTransactions() == 0) {
markCorrupt();
} else {
this.lastTxId = val.getEndTxId();
}
return cachedValidation;
}
boolean isInProgress() {
return (lastTxId == UNKNOWN_END);
return isInProgress;
}
File getFile() {

View File

@ -41,6 +41,25 @@ interface JournalManager {
*/
void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
/**
* Get the input stream starting with fromTxnId from this journal manager
* @param fromTxnId the first transaction id we want to read
* @return the stream starting with transaction fromTxnId
* @throws IOException if a stream cannot be found.
*/
EditLogInputStream getInputStream(long fromTxnId) throws IOException;
/**
* Get the number of transactions contiguously available from fromTxnId.
*
* @param fromTxnId Transaction id to count from
* @return The number of transactions available from fromTxnId
* @throws IOException if the journal cannot be read.
* @throws CorruptionException if there is a gap in the journal at fromTxnId.
*/
long getNumberOfTransactions(long fromTxnId)
throws IOException, CorruptionException;
/**
* Set the amount of memory that this stream should use to buffer edits
*/
@ -59,10 +78,21 @@ void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException;
/**
* @return an EditLogInputStream that reads from the same log that
* the edit log is currently writing. May return null if this journal
* manager does not support this operation.
*/
EditLogInputStream getInProgressInputStream(long segmentStartsAtTxId)
throws IOException;
* Recover segments which have not been finalized.
*/
void recoverUnfinalizedSegments() throws IOException;
/**
 * Indicates that a journal cannot be used to load a certain range of
 * edits.
 * This exception occurs in the case of a gap in the transactions, or a
 * corrupt edit file.
 */
public static class CorruptionException extends IOException {
  static final long serialVersionUID = -4687802717006172702L;

  /**
   * @param reason description of the gap or corruption that was detected
   */
  public CorruptionException(final String reason) {
    super(reason);
  }
}
}

View File

@ -248,7 +248,7 @@ public void testRollback() throws Exception {
baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
deleteMatchingFiles(baseDirs, "edits.*");
startNameNodeShouldFail(StartupOption.ROLLBACK,
"but there are no logs to load");
"No non-corrupt logs for txid ");
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode rollback with no image file", numDirs);

View File

@ -353,12 +353,9 @@ static List<File> getNameNodeCurrentDirs(MiniDFSCluster cluster) {
*/
public static EditLogFile findLatestEditsLog(StorageDirectory sd)
throws IOException {
FSImageTransactionalStorageInspector inspector =
new FSImageTransactionalStorageInspector();
inspector.inspectDirectory(sd);
List<EditLogFile> foundEditLogs = Lists.newArrayList(
inspector.getEditLogFiles());
File currentDir = sd.getCurrentDir();
List<EditLogFile> foundEditLogs
= Lists.newArrayList(FileJournalManager.matchEditLogs(currentDir.listFiles()));
return Collections.max(foundEditLogs, EditLogFile.COMPARE_BY_START_TXID);
}

View File

@ -84,8 +84,10 @@ public void testSaveNamespace() throws IOException {
for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
assertTrue(log.isInProgress());
log.validateLog();
long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
assertEquals("In-progress log " + log + " should have 5 transactions",
5, log.validateLog().numTransactions);
5, numTransactions);;
}
// Saving image in safe mode should succeed
@ -99,8 +101,10 @@ public void testSaveNamespace() throws IOException {
for (StorageDirectory sd : nn.getFSImage().getStorage().dirIterable(null)) {
EditLogFile log = FSImageTestUtil.findLatestEditsLog(sd);
assertTrue(log.isInProgress());
log.validateLog();
long numTransactions = (log.getLastTxId() - log.getFirstTxId()) + 1;
assertEquals("In-progress log " + log + " should only have START txn",
1, log.validateLog().numTransactions);
1, numTransactions);
}
// restart cluster

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -80,7 +81,7 @@ public class TestEditLog extends TestCase {
static final int NUM_TRANSACTIONS = 100;
static final int NUM_THREADS = 100;
private static final File TEST_DIR = new File(
static final File TEST_DIR = new File(
System.getProperty("test.build.data","build/test/data"));
/** An edits log with 3 edits from 0.20 - the result of
@ -627,13 +628,23 @@ private void testCrashRecovery(int numTransactions) throws Exception {
}
public void testCrashRecoveryEmptyLogOneDir() throws Exception {
doTestCrashRecoveryEmptyLog(false);
doTestCrashRecoveryEmptyLog(false, true);
}
public void testCrashRecoveryEmptyLogBothDirs() throws Exception {
doTestCrashRecoveryEmptyLog(true);
doTestCrashRecoveryEmptyLog(true, true);
}
/**
 * Empty in-progress log in a single directory, without updating seen_txid.
 */
public void testCrashRecoveryEmptyLogOneDirNoUpdateSeenTxId()
    throws Exception {
  final boolean inBothDirs = false;
  final boolean updateSeenTxId = false;
  doTestCrashRecoveryEmptyLog(inBothDirs, updateSeenTxId);
}
/**
 * Empty in-progress log in both directories, without updating seen_txid.
 */
public void testCrashRecoveryEmptyLogBothDirsNoUpdateSeenTxId()
    throws Exception {
  final boolean inBothDirs = true;
  final boolean updateSeenTxId = false;
  doTestCrashRecoveryEmptyLog(inBothDirs, updateSeenTxId);
}
/**
* Test that the NN handles the corruption properly
* after it crashes just after creating an edit log
@ -646,8 +657,14 @@ public void testCrashRecoveryEmptyLogBothDirs() throws Exception {
* will only be in one of the directories. In both cases, the
* NN should fail to start up, because it's aware that txid 3
* was reached, but unable to find a non-corrupt log starting there.
* @param updateTransactionIdFile if true update the seen_txid file.
* If false, it will not be updated. This will simulate a case
* where the NN crashed between creating the new segment and updating
* seen_txid.
*/
private void doTestCrashRecoveryEmptyLog(boolean inBothDirs) throws Exception {
private void doTestCrashRecoveryEmptyLog(boolean inBothDirs,
boolean updateTransactionIdFile)
throws Exception {
// start a cluster
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
@ -665,6 +682,14 @@ private void doTestCrashRecoveryEmptyLog(boolean inBothDirs) throws Exception {
// Make a truncated edits_3_inprogress
File log = new File(currentDir,
NNStorage.getInProgressEditsFileName(3));
NNStorage storage = new NNStorage(conf,
Collections.<URI>emptyList(),
Lists.newArrayList(uri));
if (updateTransactionIdFile) {
storage.writeTransactionIdFileToStorage(3);
}
storage.close();
new EditLogFileOutputStream(log, 1024).create();
if (!inBothDirs) {
break;
@ -675,9 +700,9 @@ private void doTestCrashRecoveryEmptyLog(boolean inBothDirs) throws Exception {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_DATA_NODES).format(false).build();
fail("Did not fail to start with all-corrupt logs");
} catch (IllegalStateException ise) {
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"No non-corrupt logs for txid 3", ise);
"No non-corrupt logs for txid 3", ioe);
}
cluster.shutdown();
}
@ -702,7 +727,17 @@ public EditLogByteInputStream(byte[] data) throws IOException {
reader = new FSEditLogOp.Reader(in, version);
}
/**
* @return always {@link HdfsConstants#INVALID_TXID}; this test stream does
* not report a first transaction id
*/
@Override
public long getFirstTxId() throws IOException {
return HdfsConstants.INVALID_TXID;
}
/**
* @return always {@link HdfsConstants#INVALID_TXID}; this test stream does
* not report a last transaction id
*/
@Override
public long getLastTxId() throws IOException {
return HdfsConstants.INVALID_TXID;
}
@Override
public long length() throws IOException {
return len;
@ -852,6 +887,168 @@ private NNStorage mockStorageWithEdits(String... editsDirSpecs) {
Mockito.doReturn(sds).when(storage).dirIterable(NameNodeDirType.EDITS);
return storage;
}
/**
 * Describes one journal failure to inject while {@code setupEdits} runs.
 */
static class AbortSpec {
  final int roll;
  final int logindex;

  /**
   * Construct the failure specification.
   *
   * @param roll number of the roll to fail after, e.g. 1 to fail after the
   *             first roll
   * @param logindex index of the journal to fail
   */
  AbortSpec(int roll, int logindex) {
    this.logindex = logindex;
    this.roll = roll;
  }
}
final static int TXNS_PER_ROLL = 10;
final static int TXNS_PER_FAIL = 2;
/**
* Set up directories for tests.
*
* Formats a fresh storage over the given edits directories, opens an edit
* log on it, writes transactions, rolls the log numrolls times (aborting
* journals as requested), closes the log and returns the storage.
*
* Each rolled file is 10 txns long.
* A failed file is 2 txns long.
*
* @param editUris directories to create edit logs in
* @param numrolls number of times to roll the edit log during setup
* @param abortAtRolls Specifications for when to fail, see AbortSpec.
* Only the head of the list is examined on each roll, so the
* specs must be given in ascending order of roll to be applied.
* @return the storage that was formatted and written to
*/
public static NNStorage setupEdits(List<URI> editUris, int numrolls,
AbortSpec... abortAtRolls)
throws IOException {
// Mutable copy, so that specs can be removed as they are applied.
List<AbortSpec> aborts = new ArrayList<AbortSpec>(Arrays.asList(abortAtRolls));
NNStorage storage = new NNStorage(new Configuration(),
Collections.<URI>emptyList(),
editUris);
storage.format("test-cluster-id");
FSEditLog editlog = new FSEditLog(storage);
// open the edit log and add two transactions
// logGenerationStamp is used, simply because it doesn't
// require complex arguments.
// NOTE(review): the loop below starts at 2, presumably because the
// segment start and end markers account for two of the TXNS_PER_ROLL
// transactions - confirm against FSEditLog.
editlog.open();
for (int i = 2; i < TXNS_PER_ROLL; i++) {
editlog.logGenerationStamp((long)0);
}
editlog.logSync();
// Go into edit log rolling loop.
// On each roll, the abortAtRolls abort specs are
// checked to see if an abort is required. If so the
// specified journal is aborted. It will be brought
// back into rotation automatically by rollEditLog
for (int i = 0; i < numrolls; i++) {
editlog.rollEditLog();
// One transaction is written and synced before any abort is applied.
editlog.logGenerationStamp((long)i);
editlog.logSync();
// Apply every spec at the head of the list that targets this roll
// (rolls are numbered from 1).
while (aborts.size() > 0
&& aborts.get(0).roll == (i+1)) {
AbortSpec spec = aborts.remove(0);
editlog.getJournals().get(spec.logindex).abort();
}
// Write the remaining transactions of this segment.
for (int j = 3; j < TXNS_PER_ROLL; j++) {
editlog.logGenerationStamp((long)i);
}
editlog.logSync();
}
editlog.close();
FSImageTestUtil.logStorageContents(LOG, storage);
return storage;
}
/**
 * Test loading an editlog whose two storage directories fail on
 * alternating rolls. Two edit log directories are created.
 * The first one fails on odd rolls, the second on even rolls. Test
 * that we are able to load the entire editlog regardless.
 */
@Test
public void testAlternatingJournalFailure() throws IOException {
  File f1 = new File(TEST_DIR + "/alternatingjournaltest0");
  File f2 = new File(TEST_DIR + "/alternatingjournaltest1");
  List<URI> editUris = ImmutableList.of(f1.toURI(), f2.toURI());

  // Roll r (1..10) aborts journal 0 when r is odd and journal 1 when even.
  AbortSpec[] alternating = new AbortSpec[10];
  for (int roll = 1; roll <= alternating.length; roll++) {
    alternating[roll - 1] = new AbortSpec(roll, (roll - 1) % 2);
  }
  NNStorage storage = setupEdits(editUris, 10, alternating);

  FSEditLog editlog = new FSEditLog(storage);
  long nextExpectedTxId = 1;
  long totalRead = 0;
  Iterable<EditLogInputStream> streams =
      editlog.selectInputStreams(nextExpectedTxId, TXNS_PER_ROLL*11);

  for (EditLogInputStream stream : streams) {
    FSEditLogLoader.EditLogValidation validation =
        FSEditLogLoader.validateEditLog(stream);
    long read = validation.getNumTransactions();
    LOG.info("Loading edits " + stream + " read " + read);
    // Each stream must begin where the previous one ended.
    assertEquals(nextExpectedTxId, validation.getStartTxId());
    nextExpectedTxId += read;
    totalRead += read;
  }

  editlog.close();
  storage.close();
  assertEquals(TXNS_PER_ROLL*11, totalRead);
}
/**
 * Test loading an editlog with gaps. A single editlog directory
 * is set up. One of the edit log files is deleted. This should
 * fail when selecting the input streams as it will not be able
 * to select enough streams to load up to 4*TXNS_PER_ROLL.
 * There should be 4*TXNS_PER_ROLL transactions as we rolled 3
 * times.
 */
@Test
public void testLoadingWithGaps() throws IOException {
  File f1 = new File(TEST_DIR + "/gaptest0");
  List<URI> editUris = ImmutableList.of(f1.toURI());
  NNStorage storage = setupEdits(editUris, 3);

  final long startGapTxId = 1*TXNS_PER_ROLL + 1;
  final long endGapTxId = 2*TXNS_PER_ROLL;

  // Locate the finalized segment covering the range to remove.
  File[] victims = new File(f1, "current").listFiles(new FilenameFilter() {
      public boolean accept(File dir, String name) {
        return name.startsWith(
            NNStorage.getFinalizedEditsFileName(startGapTxId, endGapTxId));
      }
    });
  assertEquals(1, victims.length);
  assertTrue(victims[0].delete());

  FSEditLog editlog = new FSEditLog(storage);
  long startTxId = 1;
  try {
    editlog.selectInputStreams(startTxId, 4*TXNS_PER_ROLL);
    fail("Should have thrown exception");
  } catch (IOException ioe) {
    GenericTestUtils.assertExceptionContains(
        "No non-corrupt logs for txid " + startGapTxId, ioe);
  }
}
}

View File

@ -63,8 +63,8 @@ public void testPreallocation() throws IOException {
EditLogValidation validation = EditLogFileInputStream.validateEditLog(editLog);
assertEquals("Edit log should contain a header as valid length",
HEADER_LEN, validation.validLength);
assertEquals(1, validation.numTransactions);
HEADER_LEN, validation.getValidLength());
assertEquals(1, validation.getNumTransactions());
assertEquals("Edit log should have 1MB of bytes allocated",
1024*1024, editLog.length());
@ -72,12 +72,12 @@ public void testPreallocation() throws IOException {
cluster.getFileSystem().mkdirs(new Path("/tmp"),
new FsPermission((short)777));
long oldLength = validation.validLength;
long oldLength = validation.getValidLength();
validation = EditLogFileInputStream.validateEditLog(editLog);
assertTrue("Edit log should have more valid data after writing a txn " +
"(was: " + oldLength + " now: " + validation.validLength + ")",
validation.validLength > oldLength);
assertEquals(2, validation.numTransactions);
"(was: " + oldLength + " now: " + validation.getValidLength() + ")",
validation.getValidLength() > oldLength);
assertEquals(2, validation.getNumTransactions());
assertEquals("Edit log should be 1MB long",
1024 * 1024, editLog.length());

View File

@ -187,8 +187,8 @@ public void testCountValidTransactions() throws IOException {
// Make sure that uncorrupted log has the expected length and number
// of transactions.
EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals(NUM_TXNS + 2, validation.numTransactions);
assertEquals(validLength, validation.validLength);
assertEquals(NUM_TXNS + 2, validation.getNumTransactions());
assertEquals(validLength, validation.getValidLength());
// Back up the uncorrupted log
File logFileBak = new File(testDir, logFile.getName() + ".bak");
@ -204,8 +204,8 @@ public void testCountValidTransactions() throws IOException {
truncateFile(logFile, txOffset);
validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals("Failed when truncating to length " + txOffset,
txid - 1, validation.numTransactions);
assertEquals(txOffset, validation.validLength);
txid - 1, validation.getNumTransactions());
assertEquals(txOffset, validation.getValidLength());
// Restore backup, truncate the file with one byte in the txn,
// also isn't valid
@ -213,24 +213,24 @@ public void testCountValidTransactions() throws IOException {
truncateFile(logFile, txOffset + 1);
validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals("Failed when truncating to length " + (txOffset + 1),
txid - 1, validation.numTransactions);
assertEquals(txOffset, validation.validLength);
txid - 1, validation.getNumTransactions());
assertEquals(txOffset, validation.getValidLength());
// Restore backup, corrupt the txn opcode
Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, txOffset);
validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals("Failed when corrupting txn opcode at " + txOffset,
txid - 1, validation.numTransactions);
assertEquals(txOffset, validation.validLength);
txid - 1, validation.getNumTransactions());
assertEquals(txOffset, validation.getValidLength());
// Restore backup, corrupt a byte a few bytes into the txn
Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, txOffset+5);
validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals("Failed when corrupting txn data at " + (txOffset+5),
txid - 1, validation.numTransactions);
assertEquals(txOffset, validation.validLength);
txid - 1, validation.getNumTransactions());
assertEquals(txOffset, validation.getValidLength());
}
// Corrupt the log at every offset to make sure that validation itself
@ -241,8 +241,8 @@ public void testCountValidTransactions() throws IOException {
Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, offset);
EditLogValidation val = EditLogFileInputStream.validateEditLog(logFile);
assertTrue(val.numTransactions >= prevNumValid);
prevNumValid = val.numTransactions;
assertTrue(val.getNumTransactions() >= prevNumValid);
prevNumValid = val.getNumTransactions();
}
}

View File

@ -36,9 +36,6 @@
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.FSImageFile;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.TransactionalLoadPlan;
import org.apache.hadoop.hdfs.server.namenode.FSImageTransactionalStorageInspector.LogGroup;
import org.apache.hadoop.hdfs.server.namenode.FSImageStorageInspector.LoadPlan;
import org.junit.Test;
import org.mockito.Mockito;
@ -63,335 +60,14 @@ public void testCurrentStorageInspector() throws IOException {
"/foo/current/" + getInProgressEditsFileName(457));
inspector.inspectDirectory(mockDir);
mockLogValidation(inspector,
"/foo/current/" + getInProgressEditsFileName(457), 10);
assertEquals(2, inspector.foundEditLogs.size());
assertEquals(2, inspector.foundImages.size());
assertTrue(inspector.foundEditLogs.get(1).isInProgress());
FSImageFile latestImage = inspector.getLatestImage();
assertEquals(456, latestImage.txId);
assertSame(mockDir, latestImage.sd);
assertTrue(inspector.isUpgradeFinalized());
LoadPlan plan = inspector.createLoadPlan();
LOG.info("Plan: " + plan);
assertEquals(new File("/foo/current/"+getImageFileName(456)),
plan.getImageFile());
assertArrayEquals(new File[] {
new File("/foo/current/" + getInProgressEditsFileName(457)) },
plan.getEditsFiles().toArray(new File[0]));
}
/**
 * Verifies that devising a load plan is rejected when the finalized edit
 * logs leave a hole in the txid sequence. The mocked directory holds logs
 * for 457-900, 901-950 and 952-1000, so txid 951 is not covered.
 */
@Test
public void testPlanWithGaps() throws IOException {
  FSImageTransactionalStorageInspector gapInspector =
      new FSImageTransactionalStorageInspector();

  StorageDirectory dirWithGap = FSImageTestUtil.mockStorageDirectory(
      NameNodeDirType.IMAGE_AND_EDITS,
      false,
      "/foo/current/" + getImageFileName(123),
      "/foo/current/" + getImageFileName(456),
      "/foo/current/" + getFinalizedEditsFileName(457,900),
      "/foo/current/" + getFinalizedEditsFileName(901,950),
      // no log covers txid 951
      "/foo/current/" + getFinalizedEditsFileName(952,1000));
  gapInspector.inspectDirectory(dirWithGap);

  final String expectedComplaint =
      "would start at txid 951 but starts at txid 952";
  try {
    gapInspector.createLoadPlan();
    fail("Didn't throw IOE trying to load with gaps in edits");
  } catch (IOException gapFailure) {
    // The message must name both the expected and the actual start txid.
    assertTrue(gapFailure.getMessage().contains(expectedComplaint));
  }
}
/**
 * Verifies the load plan when an in-progress log sits between two
 * finalized logs. The in-progress log starting at txid 901 is stubbed to
 * validate as 51 transactions, and the plan is expected to list all three
 * edit files in order after the newest image (txid 456).
 */
@Test
public void testPlanWithInProgressInMiddle() throws IOException {
  FSImageTransactionalStorageInspector storageInspector =
      new FSImageTransactionalStorageInspector();

  final String headLogPath =
      "/foo/current/" + getFinalizedEditsFileName(457,900);
  final String inProgressLogPath =
      "/foo/current/" + getInProgressEditsFileName(901);
  final String tailLogPath =
      "/foo/current/" + getFinalizedEditsFileName(952,1000);

  StorageDirectory storageDir = FSImageTestUtil.mockStorageDirectory(
      NameNodeDirType.IMAGE_AND_EDITS,
      false,
      "/foo/current/" + getImageFileName(123),
      "/foo/current/" + getImageFileName(456),
      headLogPath,
      inProgressLogPath, // the in-progress log in the middle of the sequence
      tailLogPath);
  storageInspector.inspectDirectory(storageDir);
  mockLogValidation(storageInspector, inProgressLogPath, 51);

  LoadPlan loadPlan = storageInspector.createLoadPlan();
  LOG.info("Plan: " + loadPlan);

  File expectedImage = new File("/foo/current/" + getImageFileName(456));
  assertEquals(expectedImage, loadPlan.getImageFile());

  File[] expectedEdits = new File[] {
      new File(headLogPath),
      new File(inProgressLogPath),
      new File(tailLogPath) };
  File[] plannedEdits = loadPlan.getEditsFiles().toArray(new File[0]);
  assertArrayEquals(expectedEdits, plannedEdits);
}
/**
 * Covers the common case in which a log group needs no recovery: three
 * directories each hold the same finalized log (txids 123-456), so after
 * planning recovery none of the three logs may be flagged corrupt.
 */
@Test
public void testLogGroupRecoveryNoop() throws IOException {
  FSImageTransactionalStorageInspector storageInspector =
      new FSImageTransactionalStorageInspector();

  String[] currentDirs = { "/foo1/current/", "/foo2/current/", "/foo3/current/" };
  for (String currentDir : currentDirs) {
    storageInspector.inspectDirectory(
        mockDirectoryWithEditLogs(
            currentDir + getFinalizedEditsFileName(123,456)));
  }

  // All three logs start at txid 123 and therefore land in one group.
  LogGroup group = storageInspector.logGroups.get(123L);
  assertEquals(3, group.logs.size());

  group.planRecovery();
  for (int idx = 0; idx < 3; idx++) {
    assertFalse(group.logs.get(idx).isCorrupt());
  }
}
/**
* Test case where we have some in-progress and some finalized logs
* for a given txid.
*
* Two directories hold a finalized log for txids 123-456 and a third holds
* an in-progress log starting at 123. A fourth, image-only directory
* supplies an image at txid 122 before the load plan is created. The test
* expects the in-progress log to be flagged corrupt by planning and to be
* moved aside when the plan's recovery runs.
*/
@Test
public void testLogGroupRecoveryMixed() throws IOException {
FSImageTransactionalStorageInspector inspector =
new FSImageTransactionalStorageInspector();
inspector.inspectDirectory(
mockDirectoryWithEditLogs("/foo1/current/"
+ getFinalizedEditsFileName(123,456)));
inspector.inspectDirectory(
mockDirectoryWithEditLogs("/foo2/current/"
+ getFinalizedEditsFileName(123,456)));
inspector.inspectDirectory(
mockDirectoryWithEditLogs("/foo3/current/"
+ getInProgressEditsFileName(123)));
// Image-only directory; presumably needed so createLoadPlan() has an
// image to start from -- TODO confirm against the inspector.
inspector.inspectDirectory(FSImageTestUtil.mockStorageDirectory(
NameNodeDirType.IMAGE,
false,
"/foo4/current/" + getImageFileName(122)));
LogGroup lg = inspector.logGroups.get(123L);
assertEquals(3, lg.logs.size());
// The in-progress log was inspected last, so it is the third entry.
EditLogFile inProgressLog = lg.logs.get(2);
assertTrue(inProgressLog.isInProgress());
LoadPlan plan = inspector.createLoadPlan();
// Check that it was marked corrupt.
assertFalse(lg.logs.get(0).isCorrupt());
assertFalse(lg.logs.get(1).isCorrupt());
assertTrue(lg.logs.get(2).isCorrupt());
// Calling recover should move it aside
// Swap a spy into the group *after* planning so the move-aside call is
// stubbed out (no real file exists) and can be verified below.
inProgressLog = spy(inProgressLog);
Mockito.doNothing().when(inProgressLog).moveAsideCorruptFile();
lg.logs.set(2, inProgressLog);
plan.doRecovery();
Mockito.verify(inProgressLog).moveAsideCorruptFile();
}
/**
 * Verifies that recovery planning is rejected when two finalized logs
 * share a start txid (123) but disagree on the end txid (456 vs. 678).
 */
@Test
public void testLogGroupRecoveryInconsistentEndTxIds() throws IOException {
  FSImageTransactionalStorageInspector storageInspector =
      new FSImageTransactionalStorageInspector();

  StorageDirectory endsAt456 = mockDirectoryWithEditLogs(
      "/foo1/current/" + getFinalizedEditsFileName(123,456));
  storageInspector.inspectDirectory(endsAt456);

  StorageDirectory endsAt678 = mockDirectoryWithEditLogs(
      "/foo2/current/" + getFinalizedEditsFileName(123,678));
  storageInspector.inspectDirectory(endsAt678);

  LogGroup group = storageInspector.logGroups.get(123L);
  assertEquals(2, group.logs.size());

  try {
    group.planRecovery();
    fail("Didn't throw IOE on inconsistent end txids");
  } catch (IOException inconsistent) {
    String message = inconsistent.getMessage();
    assertTrue(message.contains("More than one ending txid"));
  }
}
/**
* Test case where we have only in-progress logs and need to synchronize
* based on valid length.
*
* Three directories each hold an in-progress log starting at txid 123.
* Validation is stubbed so two logs report 2000 transactions and the third
* reports 1000. The test expects the shorter log to be flagged corrupt and
* moved aside, and the two longer logs to be finalized, on recovery.
*/
@Test
public void testLogGroupRecoveryInProgress() throws IOException {
String paths[] = new String[] {
"/foo1/current/" + getInProgressEditsFileName(123),
"/foo2/current/" + getInProgressEditsFileName(123),
"/foo3/current/" + getInProgressEditsFileName(123)
};
FSImageTransactionalStorageInspector inspector =
new FSImageTransactionalStorageInspector();
inspector.inspectDirectory(mockDirectoryWithEditLogs(paths[0]));
inspector.inspectDirectory(mockDirectoryWithEditLogs(paths[1]));
inspector.inspectDirectory(mockDirectoryWithEditLogs(paths[2]));
// Inject spies to return the valid counts we would like to see
// (mockLogValidation replaces each matching log in its group with a spy,
// so the entries fetched from lg.logs below are already spies).
mockLogValidation(inspector, paths[0], 2000);
mockLogValidation(inspector, paths[1], 2000);
mockLogValidation(inspector, paths[2], 1000);
LogGroup lg = inspector.logGroups.get(123L);
assertEquals(3, lg.logs.size());
lg.planRecovery();
// Check that the short one was marked corrupt
assertFalse(lg.logs.get(0).isCorrupt());
assertFalse(lg.logs.get(1).isCorrupt());
assertTrue(lg.logs.get(2).isCorrupt());
// Calling recover should move it aside
EditLogFile badLog = lg.logs.get(2);
// Stub out the file-system operations before recovering; the stubs must
// be installed before lg.recover() so the calls are no-ops that can
// still be verified afterwards.
Mockito.doNothing().when(badLog).moveAsideCorruptFile();
Mockito.doNothing().when(lg.logs.get(0)).finalizeLog();
Mockito.doNothing().when(lg.logs.get(1)).finalizeLog();
lg.recover();
Mockito.verify(badLog).moveAsideCorruptFile();
Mockito.verify(lg.logs.get(0)).finalizeLog();
Mockito.verify(lg.logs.get(1)).finalizeLog();
}
/**
 * Replaces the edit log stored at {@code path} with a Mockito spy whose
 * {@code validateLog()} reports {@code numValidTransactions} transactions
 * (with a valid length of -1). Only the first match across all of the
 * inspector's log groups is replaced; the test fails if no log has that
 * path.
 *
 * @param inspector inspector whose log groups are searched
 * @param path exact file path of the log to stub
 * @param numValidTransactions transaction count the stubbed validation returns
 */
private void mockLogValidation(
    FSImageTransactionalStorageInspector inspector,
    String path, int numValidTransactions) throws IOException {
  for (LogGroup group : inspector.logGroups.values()) {
    List<EditLogFile> candidates = group.logs;
    for (int idx = 0; idx < candidates.size(); idx++) {
      EditLogFile candidate = candidates.get(idx);
      if (!candidate.getFile().getPath().equals(path)) {
        continue;
      }
      // Found it: swap in a spy with validation stubbed out.
      EditLogFile stubbedLog = spy(candidate);
      doReturn(new FSEditLogLoader.EditLogValidation(-1, numValidTransactions))
          .when(stubbedLog).validateLog();
      candidates.set(idx, stubbedLog);
      return;
    }
  }
  fail("No log found to mock out at " + path);
}
/**
 * Verifies inspection and load planning when images and edits live in
 * separate directories: two image-only directories (txids 123 and 456)
 * and one edits-only directory with a finalized log (123-456) plus an
 * in-progress log starting at 457, stubbed to validate as 2 transactions.
 * The plan must pick the newer image and only the in-progress log.
 */
@Test
public void testCurrentSplitEditsAndImage() throws IOException {
  FSImageTransactionalStorageInspector storageInspector =
      new FSImageTransactionalStorageInspector();

  StorageDirectory olderImageDir = FSImageTestUtil.mockStorageDirectory(
      NameNodeDirType.IMAGE,
      false,
      "/foo/current/" + getImageFileName(123));
  StorageDirectory newerImageDir = FSImageTestUtil.mockStorageDirectory(
      NameNodeDirType.IMAGE,
      false,
      "/foo2/current/" + getImageFileName(456));

  final String inProgressEditsPath =
      "/foo3/current/" + getInProgressEditsFileName(457);
  StorageDirectory editsOnlyDir = FSImageTestUtil.mockStorageDirectory(
      NameNodeDirType.EDITS,
      false,
      "/foo3/current/" + getFinalizedEditsFileName(123, 456),
      inProgressEditsPath);

  // The newer image directory is deliberately inspected last.
  storageInspector.inspectDirectory(olderImageDir);
  storageInspector.inspectDirectory(editsOnlyDir);
  storageInspector.inspectDirectory(newerImageDir);
  mockLogValidation(storageInspector, inProgressEditsPath, 2);

  assertEquals(2, storageInspector.foundEditLogs.size());
  assertEquals(2, storageInspector.foundImages.size());
  assertTrue(storageInspector.foundEditLogs.get(1).isInProgress());
  assertTrue(storageInspector.isUpgradeFinalized());

  // Check plan
  TransactionalLoadPlan loadPlan =
      (TransactionalLoadPlan)storageInspector.createLoadPlan();
  FSImageFile chosenImage = loadPlan.image;
  assertEquals(456, chosenImage.txId);
  assertSame(newerImageDir, chosenImage.sd);

  File expectedImageFile =
      new File("/foo2/current/" + getImageFileName(456));
  assertEquals(expectedImageFile, loadPlan.getImageFile());

  File[] expectedEdits = new File[] { new File(inProgressEditsPath) };
  assertArrayEquals(expectedEdits,
      loadPlan.getEditsFiles().toArray(new File[0]));
}
/**
 * Scenario in which an in-progress log lives in an earlier name directory
 * than a finalized log covering the same start txid. Per the original
 * author's note, getEditLogManifest previously failed to see such a log.
 *
 * NOTE(review): as written, this method only feeds the two mocked
 * directories to the inspector and makes no assertions, so it can only
 * fail if inspection itself throws.
 */
@Test
public void testLogManifestInProgressComesFirst() throws IOException {
  FSImageTransactionalStorageInspector storageInspector =
      new FSImageTransactionalStorageInspector();

  // First directory: txid 2626 is still in progress.
  StorageDirectory dirWithInProgress = mockDirectoryWithEditLogs(
      "/foo1/current/" + getFinalizedEditsFileName(2622,2623),
      "/foo1/current/" + getFinalizedEditsFileName(2624,2625),
      "/foo1/current/" + getInProgressEditsFileName(2626));
  storageInspector.inspectDirectory(dirWithInProgress);

  // Second directory: the same range is finalized and continues further.
  StorageDirectory dirAllFinalized = mockDirectoryWithEditLogs(
      "/foo2/current/" + getFinalizedEditsFileName(2622,2623),
      "/foo2/current/" + getFinalizedEditsFileName(2624,2625),
      "/foo2/current/" + getFinalizedEditsFileName(2626,2627),
      "/foo2/current/" + getFinalizedEditsFileName(2628,2629));
  storageInspector.inspectDirectory(dirAllFinalized);
}
static StorageDirectory mockDirectoryWithEditLogs(String... fileNames) {
return FSImageTestUtil.mockStorageDirectory(NameNodeDirType.EDITS, false, fileNames);
latestImage.getFile());
}
}

View File

@ -19,17 +19,277 @@
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import java.io.RandomAccessFile;
import java.io.File;
import java.io.FilenameFilter;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.SecurityUtil;
import org.junit.Test;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.AbortSpec;
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_ROLL;
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_FAIL;
import com.google.common.collect.ImmutableList;
import com.google.common.base.Joiner;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
public class TestFileJournalManager {
/**
 * Exercises the failure-free path of loading transactions through
 * FileJournalManager. Three edits directories are set up with 5 rolls and
 * no aborts; every directory's journal must report 6*TXNS_PER_ROLL
 * transactions from txid 1, and exactly three journals must be seen.
 */
@Test
public void testNormalOperation() throws IOException {
  List<URI> editUris = ImmutableList.of(
      new File(TestEditLog.TEST_DIR + "/normtest0").toURI(),
      new File(TestEditLog.TEST_DIR + "/normtest1").toURI(),
      new File(TestEditLog.TEST_DIR + "/normtest2").toURI());
  NNStorage storage = setupEdits(editUris, 5);

  long journalsSeen = 0;
  for (StorageDirectory editsDir : storage.dirIterable(NameNodeDirType.EDITS)) {
    FileJournalManager journal = new FileJournalManager(editsDir);
    long txnCount = journal.getNumberOfTransactions(1);
    assertEquals(6*TXNS_PER_ROLL, txnCount);
    journalsSeen++;
  }
  assertEquals(3, journalsSeen);
}
/**
 * Checks that an in-progress file is handled correctly. A single edits
 * directory is set up and aborted after the 5th roll; the journal must
 * then report 5*TXNS_PER_ROLL + TXNS_PER_FAIL transactions from txid 1.
 */
@Test
public void testInprogressRecovery() throws IOException {
  File editsRoot = new File(TestEditLog.TEST_DIR + "/filejournaltest0");
  List<URI> singleUri = Collections.<URI>singletonList(editsRoot.toURI());

  // abort after the 5th roll
  AbortSpec abortAfterFifthRoll = new AbortSpec(5, 0);
  NNStorage storage = setupEdits(singleUri, 5, abortAfterFifthRoll);

  StorageDirectory editsDir =
      storage.dirIterator(NameNodeDirType.EDITS).next();
  FileJournalManager journal = new FileJournalManager(editsDir);

  long txnCount = journal.getNumberOfTransactions(1);
  assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, txnCount);
}
/**
 * Mixes finalized and in-progress files. Three edits directories are set
 * up and the second one (index 1) is aborted after the 5th roll. The first
 * and third journals must report 6*TXNS_PER_ROLL transactions, while the
 * aborted one reports 5*TXNS_PER_ROLL + TXNS_PER_FAIL.
 */
@Test
public void testInprogressRecoveryMixed() throws IOException {
  List<URI> editUris = ImmutableList.of(
      new File(TestEditLog.TEST_DIR + "/mixtest0").toURI(),
      new File(TestEditLog.TEST_DIR + "/mixtest1").toURI(),
      new File(TestEditLog.TEST_DIR + "/mixtest2").toURI());

  // abort after the 5th roll
  NNStorage storage = setupEdits(editUris, 5, new AbortSpec(5, 1));

  // Expected counts, in the order the edits directories are iterated.
  long[] expectedCounts = {
      6*TXNS_PER_ROLL,
      5*TXNS_PER_ROLL + TXNS_PER_FAIL,
      6*TXNS_PER_ROLL };

  Iterator<StorageDirectory> editsDirs =
      storage.dirIterator(NameNodeDirType.EDITS);
  for (long expected : expectedCounts) {
    FileJournalManager journal = new FileJournalManager(editsDirs.next());
    assertEquals(expected, journal.getNumberOfTransactions(1));
  }
}
/**
 * Checks FileJournalManager when every edits directory ends in an
 * in-progress file. Three directories are set up and all three are aborted
 * after the 5th roll; each journal must then report
 * 5*TXNS_PER_ROLL + TXNS_PER_FAIL transactions from txid 1.
 */
@Test
public void testInprogressRecoveryAll() throws IOException {
  List<URI> editUris = ImmutableList.of(
      new File(TestEditLog.TEST_DIR + "/failalltest0").toURI(),
      new File(TestEditLog.TEST_DIR + "/failalltest1").toURI(),
      new File(TestEditLog.TEST_DIR + "/failalltest2").toURI());

  // abort after the 5th roll
  NNStorage storage = setupEdits(editUris, 5,
      new AbortSpec(5, 0),
      new AbortSpec(5, 1),
      new AbortSpec(5, 2));

  Iterator<StorageDirectory> editsDirs =
      storage.dirIterator(NameNodeDirType.EDITS);
  for (int visited = 0; visited < 3; visited++) {
    FileJournalManager journal = new FileJournalManager(editsDirs.next());
    assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL,
        journal.getNumberOfTransactions(1));
  }
}
/**
 * Corrupts an edit log file in place, after its start-segment transaction.
 * Seeks to byte offset 0x16 and overwrites the following 4000 bytes
 * (1000 ints) with the pattern 0xdeadbeef.
 *
 * @param f the edit log file to damage; opened in read-write mode
 * @throws IOException if the file cannot be opened, positioned or written
 */
private void corruptAfterStartSegment(File f) throws IOException {
  RandomAccessFile raf = new RandomAccessFile(f, "rw");
  try {
    // skip version and first transaction and a bit of next transaction
    raf.seek(0x16);
    for (int i = 0; i < 1000; i++) {
      raf.writeInt(0xdeadbeef);
    }
  } finally {
    // Release the file handle even if the seek or a write fails.
    raf.close();
  }
}
/**
 * Test that we can read from a stream created by FileJournalManager.
 * Create a single edits directory, aborting it after the 10th roll.
 * Then read stream by stream starting just past the first three log
 * segments, and verify that the number of transactions read equals the
 * total minus the skipped ones.
 */
@Test
public void testReadFromStream() throws IOException {
  File f = new File(TestEditLog.TEST_DIR + "/filejournaltest1");
  // abort after 10th roll
  NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()),
      10, new AbortSpec(10, 0));
  StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();
  FileJournalManager jm = new FileJournalManager(sd);

  long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL;
  assertEquals(expectedTotalTxnCount, jm.getNumberOfTransactions(1));

  long skippedTxns = (3*TXNS_PER_ROLL); // skip first 3 files
  long startingTxId = skippedTxns + 1;
  long numTransactionsToLoad = jm.getNumberOfTransactions(startingTxId);
  long numLoaded = 0;
  while (numLoaded < numTransactionsToLoad) {
    EditLogInputStream editIn = jm.getInputStream(startingTxId);
    long count;
    try {
      FSEditLogLoader.EditLogValidation val =
          FSEditLogLoader.validateEditLog(editIn);
      count = val.getNumTransactions();
    } finally {
      // Always release the stream, even when validation throws.
      editIn.close();
    }
    // Without progress the loop would request the same txid forever.
    assertTrue("No transactions read from stream starting at txid "
        + startingTxId, count > 0);
    startingTxId += count;
    numLoaded += count;
  }
  assertEquals(expectedTotalTxnCount - skippedTxns, numLoaded);
}
/**
 * Requests a transaction count starting at a txid (2) that is not the
 * first txid of any log segment. The request is expected to fail with an
 * IOException, because edit logs must currently be treated as indivisible
 * units.
 */
@Test(expected=IOException.class)
public void testAskForTransactionsMidfile() throws IOException {
  File editsRoot = new File(TestEditLog.TEST_DIR + "/filejournaltest2");
  List<URI> singleUri = Collections.<URI>singletonList(editsRoot.toURI());
  NNStorage storage = setupEdits(singleUri, 10);

  StorageDirectory editsDir =
      storage.dirIterator(NameNodeDirType.EDITS).next();
  FileJournalManager journal = new FileJournalManager(editsDir);

  // txid 2 falls inside the first segment rather than at its start
  journal.getNumberOfTransactions(2);
}
/**
 * Test that we receive the correct number of transactions when we count
 * the number of transactions around gaps.
 * Set up a single edits directory, with no failures. Delete the 4th logfile.
 * Test that getNumberOfTransactions returns the correct number of
 * transactions before this gap and after this gap. Also verify that if you
 * try to count on the gap that an exception is thrown.
 */
@Test
public void testManyLogsWithGaps() throws IOException {
  File f = new File(TestEditLog.TEST_DIR + "/filejournaltest3");
  NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()), 10);
  StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();

  final long startGapTxId = 3*TXNS_PER_ROLL + 1;
  final long endGapTxId = 4*TXNS_PER_ROLL;
  File currentDir = new File(f, "current");
  File[] files = currentDir.listFiles(new FilenameFilter() {
      @Override
      public boolean accept(File dir, String name) {
        return name.startsWith(
            NNStorage.getFinalizedEditsFileName(startGapTxId, endGapTxId));
      }
    });
  // listFiles() returns null when the directory cannot be listed; fail
  // with a clear message instead of a NullPointerException.
  assertNotNull("Could not list files in " + currentDir, files);
  assertEquals(1, files.length);
  assertTrue(files[0].delete());

  FileJournalManager jm = new FileJournalManager(sd);
  assertEquals(startGapTxId-1, jm.getNumberOfTransactions(1));

  try {
    jm.getNumberOfTransactions(startGapTxId);
    fail("Should have thrown an exception by now");
  } catch (IOException expected) {
    // expected: counting from the first txid of the deleted segment fails
  }

  // rolled 10 times so there should be 11 files.
  assertEquals(11*TXNS_PER_ROLL - endGapTxId,
      jm.getNumberOfTransactions(endGapTxId+1));
}
/**
 * Test that we can load an edits directory with a corrupt inprogress file.
 * A single edits directory is set up and aborted after the 10th roll, the
 * one in-progress file is corrupted after its start-segment transaction,
 * and the journal must then report 10*TXNS_PER_ROLL+1 transactions.
 *
 * NOTE(review): the original description says the corrupt inprogress file
 * should be moved to the side; this test only asserts the transaction
 * count, not the move.
 */
@Test
public void testManyLogsWithCorruptInprogress() throws IOException {
  File f = new File(TestEditLog.TEST_DIR + "/filejournaltest5");
  NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()), 10, new AbortSpec(10, 0));
  StorageDirectory sd = storage.dirIterator(NameNodeDirType.EDITS).next();

  File currentDir = new File(f, "current");
  File[] files = currentDir.listFiles(new FilenameFilter() {
      @Override
      public boolean accept(File dir, String name) {
        return name.startsWith("edits_inprogress");
      }
    });
  // listFiles() returns null when the directory cannot be listed; fail
  // with a clear message instead of a NullPointerException.
  assertNotNull("Could not list files in " + currentDir, files);
  // expected value first, so a failure message reads correctly
  assertEquals(1, files.length);

  corruptAfterStartSegment(files[0]);

  FileJournalManager jm = new FileJournalManager(sd);
  assertEquals(10*TXNS_PER_ROLL+1,
      jm.getNumberOfTransactions(1));
}
@Test
public void testGetRemoteEditLog() throws IOException {
StorageDirectory sd = FSImageTestUtil.mockStorageDirectory(
@ -58,5 +318,4 @@ private static String getLogsAsString(
FileJournalManager fjm, long firstTxId) throws IOException {
return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId));
}
}

View File

@ -20,6 +20,7 @@
import junit.framework.TestCase;
import java.io.*;
import java.util.Random;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -80,10 +81,12 @@ void checkImageAndEditsFilesExistence(File dir,
assertTrue("Expect no images in " + dir, ins.foundImages.isEmpty());
}
List<FileJournalManager.EditLogFile> editlogs
= FileJournalManager.matchEditLogs(new File(dir, "current").listFiles());
if (shouldHaveEdits) {
assertTrue("Expect edits in " + dir, ins.foundEditLogs.size() > 0);
assertTrue("Expect edits in " + dir, editlogs.size() > 0);
} else {
assertTrue("Expect no edits in " + dir, ins.foundEditLogs.isEmpty());
assertTrue("Expect no edits in " + dir, editlogs.isEmpty());
}
}