HDFS-2982. Startup performance suffers when there are many edit log segments. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1342042 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-05-23 20:42:48 +00:00
parent 432c4a6ffc
commit 74dfa8f1f2
25 changed files with 658 additions and 559 deletions

View File

@ -205,6 +205,9 @@ Release 2.0.1-alpha - UNRELEASED
OPTIMIZATIONS OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log
segments. (Colin Patrick McCabe via todd)
BUG FIXES BUG FIXES
HDFS-3385. The last block of INodeFileUnderConstruction is not HDFS-3385. The last block of INodeFileUnderConstruction is not

View File

@ -79,12 +79,12 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
} }
@Override @Override
public long getFirstTxId() throws IOException { public long getFirstTxId() {
return firstTxId; return firstTxId;
} }
@Override @Override
public long getLastTxId() throws IOException { public long getLastTxId() {
return lastTxId; return lastTxId;
} }

View File

@ -37,6 +37,7 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Ids;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -313,8 +314,7 @@ public class BookKeeperJournalManager implements JournalManager {
} }
// TODO(HA): Handle inProgressOk // TODO(HA): Handle inProgressOk
@Override EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
throws IOException { throws IOException {
for (EditLogLedgerMetadata l : getLedgerList()) { for (EditLogLedgerMetadata l : getLedgerList()) {
if (l.getFirstTxId() == fromTxnId) { if (l.getFirstTxId() == fromTxnId) {
@ -328,12 +328,34 @@ public class BookKeeperJournalManager implements JournalManager {
} }
} }
} }
throw new IOException("No ledger for fromTxnId " + fromTxnId + " found."); return null;
}
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) {
// NOTE: could probably be rewritten more efficiently
while (true) {
EditLogInputStream elis;
try {
elis = getInputStream(fromTxId, inProgressOk);
} catch (IOException e) {
LOG.error(e);
return;
}
if (elis == null) {
return;
}
streams.add(elis);
if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
return;
}
fromTxId = elis.getLastTxId() + 1;
}
} }
// TODO(HA): Handle inProgressOk // TODO(HA): Handle inProgressOk
@Override long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
throws IOException { throws IOException {
long count = 0; long count = 0;
long expectedStart = 0; long expectedStart = 0;

View File

@ -34,6 +34,6 @@ public class FSEditLogTestUtil {
public static long countTransactionsInStream(EditLogInputStream in) public static long countTransactionsInStream(EditLogInputStream in)
throws IOException { throws IOException {
FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in); FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
return validation.getNumTransactions(); return (validation.getEndTxId() - in.getFirstTxId()) + 1;
} }
} }

View File

@ -207,7 +207,7 @@ public class BackupImage extends FSImage {
int logVersion = storage.getLayoutVersion(); int logVersion = storage.getLayoutVersion();
backupInputStream.setBytes(data, logVersion); backupInputStream.setBytes(data, logVersion);
long numTxnsAdvanced = logLoader.loadEditRecords(logVersion, long numTxnsAdvanced = logLoader.loadEditRecords(
backupInputStream, true, lastAppliedTxId + 1, null); backupInputStream, true, lastAppliedTxId + 1, null);
if (numTxnsAdvanced != numTxns) { if (numTxnsAdvanced != numTxns) {
throw new IOException("Batch of txns starting at txnid " + throw new IOException("Batch of txns starting at txnid " +

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo; import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@ -60,19 +61,10 @@ class BackupJournalManager implements JournalManager {
} }
@Override @Override
public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) public void selectInputStreams(Collection<EditLogInputStream> streams,
throws IOException, CorruptionException { long fromTxnId, boolean inProgressOk) {
// This JournalManager is never used for input. Therefore it cannot // This JournalManager is never used for input. Therefore it cannot
// return any transactions // return any transactions
return 0;
}
@Override
public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
throws IOException {
// This JournalManager is never used for input. Therefore it cannot
// return any transactions
throw new IOException("Unsupported operation");
} }
@Override @Override

View File

@ -129,12 +129,12 @@ class EditLogBackupInputStream extends EditLogInputStream {
} }
@Override @Override
public long getFirstTxId() throws IOException { public long getFirstTxId() {
return HdfsConstants.INVALID_TXID; return HdfsConstants.INVALID_TXID;
} }
@Override @Override
public long getLastTxId() throws IOException { public long getLastTxId() {
return HdfsConstants.INVALID_TXID; return HdfsConstants.INVALID_TXID;
} }

View File

@ -24,10 +24,14 @@ import java.io.IOException;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.DataInputStream; import java.io.DataInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
/** /**
* An implementation of the abstract class {@link EditLogInputStream}, which * An implementation of the abstract class {@link EditLogInputStream}, which
@ -35,13 +39,21 @@ import com.google.common.annotations.VisibleForTesting;
*/ */
public class EditLogFileInputStream extends EditLogInputStream { public class EditLogFileInputStream extends EditLogInputStream {
private final File file; private final File file;
private final FileInputStream fStream; private final long firstTxId;
final private long firstTxId; private final long lastTxId;
final private long lastTxId;
private final int logVersion;
private final FSEditLogOp.Reader reader;
private final FSEditLogLoader.PositionTrackingInputStream tracker;
private final boolean isInProgress; private final boolean isInProgress;
static private enum State {
UNINIT,
OPEN,
CLOSED
}
private State state = State.UNINIT;
private FileInputStream fStream = null;
private int logVersion = 0;
private FSEditLogOp.Reader reader = null;
private FSEditLogLoader.PositionTrackingInputStream tracker = null;
private DataInputStream dataIn = null;
static final Log LOG = LogFactory.getLog(EditLogInputStream.class);
/** /**
* Open an EditLogInputStream for the given file. * Open an EditLogInputStream for the given file.
@ -68,34 +80,43 @@ public class EditLogFileInputStream extends EditLogInputStream {
* header * header
*/ */
public EditLogFileInputStream(File name, long firstTxId, long lastTxId, public EditLogFileInputStream(File name, long firstTxId, long lastTxId,
boolean isInProgress) boolean isInProgress) {
throws LogHeaderCorruptException, IOException { this.file = name;
file = name;
fStream = new FileInputStream(name);
BufferedInputStream bin = new BufferedInputStream(fStream);
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
DataInputStream in = new DataInputStream(tracker);
try {
logVersion = readLogVersion(in);
} catch (EOFException eofe) {
throw new LogHeaderCorruptException("No header found in log");
}
reader = new FSEditLogOp.Reader(in, tracker, logVersion);
this.firstTxId = firstTxId; this.firstTxId = firstTxId;
this.lastTxId = lastTxId; this.lastTxId = lastTxId;
this.isInProgress = isInProgress; this.isInProgress = isInProgress;
} }
private void init() throws LogHeaderCorruptException, IOException {
Preconditions.checkState(state == State.UNINIT);
BufferedInputStream bin = null;
try {
fStream = new FileInputStream(file);
bin = new BufferedInputStream(fStream);
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
dataIn = new DataInputStream(tracker);
try {
logVersion = readLogVersion(dataIn);
} catch (EOFException eofe) {
throw new LogHeaderCorruptException("No header found in log");
}
reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
state = State.OPEN;
} finally {
if (reader == null) {
IOUtils.cleanup(LOG, dataIn, tracker, bin, fStream);
state = State.CLOSED;
}
}
}
@Override @Override
public long getFirstTxId() throws IOException { public long getFirstTxId() {
return firstTxId; return firstTxId;
} }
@Override @Override
public long getLastTxId() throws IOException { public long getLastTxId() {
return lastTxId; return lastTxId;
} }
@ -104,9 +125,23 @@ public class EditLogFileInputStream extends EditLogInputStream {
return file.getPath(); return file.getPath();
} }
@Override private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
protected FSEditLogOp nextOp() throws IOException { FSEditLogOp op = null;
FSEditLogOp op = reader.readOp(false); switch (state) {
case UNINIT:
try {
init();
} catch (Throwable e) {
LOG.error("caught exception initializing " + this, e);
if (skipBrokenEdits) {
return null;
}
Throwables.propagateIfPossible(e, IOException.class);
}
Preconditions.checkState(state != State.UNINIT);
return nextOpImpl(skipBrokenEdits);
case OPEN:
op = reader.readOp(skipBrokenEdits);
if ((op != null) && (op.hasTransactionId())) { if ((op != null) && (op.hasTransactionId())) {
long txId = op.getTransactionId(); long txId = op.getTransactionId();
if ((txId >= lastTxId) && if ((txId >= lastTxId) &&
@ -127,38 +162,58 @@ public class EditLogFileInputStream extends EditLogInputStream {
// //
long skipAmt = file.length() - tracker.getPos(); long skipAmt = file.length() - tracker.getPos();
if (skipAmt > 0) { if (skipAmt > 0) {
FSImage.LOG.warn("skipping " + skipAmt + " bytes at the end " + LOG.warn("skipping " + skipAmt + " bytes at the end " +
"of edit log '" + getName() + "': reached txid " + txId + "of edit log '" + getName() + "': reached txid " + txId +
" out of " + lastTxId); " out of " + lastTxId);
tracker.skip(skipAmt); tracker.skip(skipAmt);
} }
} }
} }
break;
case CLOSED:
break; // return null
}
return op; return op;
} }
@Override
protected FSEditLogOp nextOp() throws IOException {
return nextOpImpl(false);
}
@Override @Override
protected FSEditLogOp nextValidOp() { protected FSEditLogOp nextValidOp() {
try { try {
return reader.readOp(true); return nextOpImpl(true);
} catch (IOException e) { } catch (Throwable e) {
LOG.error("nextValidOp: got exception while reading " + this, e);
return null; return null;
} }
} }
@Override @Override
public int getVersion() throws IOException { public int getVersion() throws IOException {
if (state == State.UNINIT) {
init();
}
return logVersion; return logVersion;
} }
@Override @Override
public long getPosition() { public long getPosition() {
if (state == State.OPEN) {
return tracker.getPos(); return tracker.getPos();
} else {
return 0;
}
} }
@Override @Override
public void close() throws IOException { public void close() throws IOException {
fStream.close(); if (state == State.OPEN) {
dataIn.close();
}
state = State.CLOSED;
} }
@Override @Override
@ -181,12 +236,12 @@ public class EditLogFileInputStream extends EditLogInputStream {
EditLogFileInputStream in; EditLogFileInputStream in;
try { try {
in = new EditLogFileInputStream(file); in = new EditLogFileInputStream(file);
} catch (LogHeaderCorruptException corrupt) { in.getVersion(); // causes us to read the header
} catch (LogHeaderCorruptException e) {
// If the header is malformed or the wrong value, this indicates a corruption // If the header is malformed or the wrong value, this indicates a corruption
FSImage.LOG.warn("Log at " + file + " has no valid header", LOG.warn("Log file " + file + " has no valid header", e);
corrupt);
return new FSEditLogLoader.EditLogValidation(0, return new FSEditLogLoader.EditLogValidation(0,
HdfsConstants.INVALID_TXID, HdfsConstants.INVALID_TXID, true); HdfsConstants.INVALID_TXID, true);
} }
try { try {

View File

@ -45,12 +45,12 @@ public abstract class EditLogInputStream implements Closeable {
/** /**
* @return the first transaction which will be found in this stream * @return the first transaction which will be found in this stream
*/ */
public abstract long getFirstTxId() throws IOException; public abstract long getFirstTxId();
/** /**
* @return the last transaction which will be found in this stream * @return the last transaction which will be found in this stream
*/ */
public abstract long getLastTxId() throws IOException; public abstract long getLastTxId();
/** /**
@ -80,7 +80,7 @@ public abstract class EditLogInputStream implements Closeable {
* *
* This method can be used to skip over corrupted sections of edit logs. * This method can be used to skip over corrupted sections of edit logs.
*/ */
public void resync() throws IOException { public void resync() {
if (cachedOp != null) { if (cachedOp != null) {
return; return;
} }
@ -109,7 +109,7 @@ public abstract class EditLogInputStream implements Closeable {
// error recovery will want to override this. // error recovery will want to override this.
try { try {
return nextOp(); return nextOp();
} catch (IOException e) { } catch (Throwable e) {
return null; return null;
} }
} }

View File

@ -24,6 +24,7 @@ import java.lang.reflect.Constructor;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -267,13 +268,14 @@ public class FSEditLog {
long segmentTxId = getLastWrittenTxId() + 1; long segmentTxId = getLastWrittenTxId() + 1;
// Safety check: we should never start a segment if there are // Safety check: we should never start a segment if there are
// newer txids readable. // newer txids readable.
EditLogInputStream s = journalSet.getInputStream(segmentTxId, true); List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
try { journalSet.selectInputStreams(streams, segmentTxId, true);
Preconditions.checkState(s == null, if (!streams.isEmpty()) {
"Cannot start writing at txid %s when there is a stream " + String error = String.format("Cannot start writing at txid %s " +
"available for read: %s", segmentTxId, s); "when there is a stream available for read: %s",
} finally { segmentTxId, streams.get(0));
IOUtils.closeStream(s); IOUtils.cleanup(LOG, streams.toArray(new EditLogInputStream[0]));
throw new IllegalStateException(error);
} }
startLogSegmentAndWriteHeaderTxn(segmentTxId); startLogSegmentAndWriteHeaderTxn(segmentTxId);
@ -1137,9 +1139,9 @@ public class FSEditLog {
} }
} }
Collection<EditLogInputStream> selectInputStreams(long fromTxId, public Collection<EditLogInputStream> selectInputStreams(
long toAtLeastTxId) throws IOException { long fromTxId, long toAtLeastTxId) throws IOException {
return selectInputStreams(fromTxId, toAtLeastTxId, true); return selectInputStreams(fromTxId, toAtLeastTxId, null, true);
} }
/** /**
@ -1149,26 +1151,72 @@ public class FSEditLog {
* @param toAtLeast the selected streams must contain this transaction * @param toAtLeast the selected streams must contain this transaction
* @param inProgessOk set to true if in-progress streams are OK * @param inProgessOk set to true if in-progress streams are OK
*/ */
public synchronized Collection<EditLogInputStream> selectInputStreams(long fromTxId, public synchronized Collection<EditLogInputStream> selectInputStreams(
long toAtLeastTxId, boolean inProgressOk) throws IOException { long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
boolean inProgressOk) throws IOException {
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
EditLogInputStream stream = journalSet.getInputStream(fromTxId, inProgressOk); journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
while (stream != null) {
streams.add(stream);
// We're now looking for a higher range, so reset the fromTxId
fromTxId = stream.getLastTxId() + 1;
stream = journalSet.getInputStream(fromTxId, inProgressOk);
}
if (fromTxId <= toAtLeastTxId) { try {
checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
} catch (IOException e) {
if (recovery != null) {
// If recovery mode is enabled, continue loading even if we know we
// can't load up to toAtLeastTxId.
LOG.error(e);
} else {
closeAllStreams(streams); closeAllStreams(streams);
throw new IOException(String.format("Gap in transactions. Expected to " throw e;
+ "be able to read up until at least txid %d but unable to find any " }
+ "edit logs containing txid %d", toAtLeastTxId, fromTxId)); }
// This code will go away as soon as RedundantEditLogInputStream is
// introduced. (HDFS-3049)
try {
if (!streams.isEmpty()) {
streams.get(0).skipUntil(fromTxId);
}
} catch (IOException e) {
// We don't want to throw an exception from here, because that would make
// recovery impossible even if the user requested it. An exception will
// be thrown later, when we don't read the starting txid we expect.
LOG.error("error skipping until transaction " + fromTxId, e);
} }
return streams; return streams;
} }
/**
* Check for gaps in the edit log input stream list.
* Note: we're assuming that the list is sorted and that txid ranges don't
* overlap. This could be done better and with more generality with an
* interval tree.
*/
private void checkForGaps(List<EditLogInputStream> streams, long fromTxId,
long toAtLeastTxId, boolean inProgressOk) throws IOException {
Iterator<EditLogInputStream> iter = streams.iterator();
long txId = fromTxId;
while (true) {
if (txId > toAtLeastTxId) return;
if (!iter.hasNext()) break;
EditLogInputStream elis = iter.next();
if (elis.getFirstTxId() > txId) break;
long next = elis.getLastTxId();
if (next == HdfsConstants.INVALID_TXID) {
if (!inProgressOk) {
throw new RuntimeException("inProgressOk = false, but " +
"selectInputStreams returned an in-progress edit " +
"log input stream (" + elis + ")");
}
// We don't know where the in-progress stream ends.
// It could certainly go all the way up to toAtLeastTxId.
return;
}
txId = next + 1;
}
throw new IOException(String.format("Gap in transactions. Expected to "
+ "be able to read up until at least txid %d but unable to find any "
+ "edit logs containing txid %d", toAtLeastTxId, txId));
}
/** /**
* Close all the streams in a collection * Close all the streams in a collection
* @param streams The list of streams to close * @param streams The list of streams to close

View File

@ -85,12 +85,10 @@ public class FSEditLogLoader {
*/ */
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId, long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
MetaRecoveryContext recovery) throws IOException { MetaRecoveryContext recovery) throws IOException {
int logVersion = edits.getVersion();
fsNamesys.writeLock(); fsNamesys.writeLock();
try { try {
long startTime = now(); long startTime = now();
long numEdits = loadEditRecords(logVersion, edits, false, long numEdits = loadEditRecords(edits, false,
expectedStartingTxId, recovery); expectedStartingTxId, recovery);
FSImage.LOG.info("Edits file " + edits.getName() FSImage.LOG.info("Edits file " + edits.getName()
+ " of size " + edits.length() + " edits # " + numEdits + " of size " + edits.length() + " edits # " + numEdits
@ -102,7 +100,7 @@ public class FSEditLogLoader {
} }
} }
long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit, long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
long expectedStartingTxId, MetaRecoveryContext recovery) long expectedStartingTxId, MetaRecoveryContext recovery)
throws IOException { throws IOException {
FSDirectory fsDir = fsNamesys.dir; FSDirectory fsDir = fsNamesys.dir;
@ -141,7 +139,7 @@ public class FSEditLogLoader {
} }
} catch (Throwable e) { } catch (Throwable e) {
// Handle a problem with our input // Handle a problem with our input
check203UpgradeFailure(logVersion, e); check203UpgradeFailure(in.getVersion(), e);
String errorMessage = String errorMessage =
formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId); formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
FSImage.LOG.error(errorMessage, e); FSImage.LOG.error(errorMessage, e);
@ -158,7 +156,7 @@ public class FSEditLogLoader {
} }
recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] = recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
in.getPosition(); in.getPosition();
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) { if (op.hasTransactionId()) {
if (op.getTransactionId() > expectedTxId) { if (op.getTransactionId() > expectedTxId) {
MetaRecoveryContext.editLogLoaderPrompt("There appears " + MetaRecoveryContext.editLogLoaderPrompt("There appears " +
"to be a gap in the edit log. We expected txid " + "to be a gap in the edit log. We expected txid " +
@ -175,7 +173,7 @@ public class FSEditLogLoader {
} }
} }
try { try {
applyEditLogOp(op, fsDir, logVersion); applyEditLogOp(op, fsDir, in.getVersion());
} catch (Throwable e) { } catch (Throwable e) {
LOG.error("Encountered exception on operation " + op, e); LOG.error("Encountered exception on operation " + op, e);
MetaRecoveryContext.editLogLoaderPrompt("Failed to " + MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
@ -192,7 +190,7 @@ public class FSEditLogLoader {
expectedTxId = lastAppliedTxId = expectedStartingTxId; expectedTxId = lastAppliedTxId = expectedStartingTxId;
} }
// log progress // log progress
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) { if (op.hasTransactionId()) {
long now = now(); long now = now();
if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) { if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
int percent = Math.round((float)lastAppliedTxId / numTxns * 100); int percent = Math.round((float)lastAppliedTxId / numTxns * 100);
@ -647,76 +645,57 @@ public class FSEditLogLoader {
} }
/** /**
* Return the number of valid transactions in the stream. If the stream is * Find the last valid transaction ID in the stream.
* truncated during the header, returns a value indicating that there are * If there are invalid or corrupt transactions in the middle of the stream,
* 0 valid transactions. This reads through the stream but does not close * validateEditLog will skip over them.
* it. * This reads through the stream but does not close it.
*
* @throws IOException if the stream cannot be read due to an IO error (eg * @throws IOException if the stream cannot be read due to an IO error (eg
* if the log does not exist) * if the log does not exist)
*/ */
static EditLogValidation validateEditLog(EditLogInputStream in) { static EditLogValidation validateEditLog(EditLogInputStream in) {
long lastPos = 0; long lastPos = 0;
long firstTxId = HdfsConstants.INVALID_TXID;
long lastTxId = HdfsConstants.INVALID_TXID; long lastTxId = HdfsConstants.INVALID_TXID;
long numValid = 0; long numValid = 0;
try {
FSEditLogOp op = null; FSEditLogOp op = null;
while (true) { while (true) {
lastPos = in.getPosition(); lastPos = in.getPosition();
try {
if ((op = in.readOp()) == null) { if ((op = in.readOp()) == null) {
break; break;
} }
if (firstTxId == HdfsConstants.INVALID_TXID) { } catch (Throwable t) {
firstTxId = op.getTransactionId(); FSImage.LOG.warn("Caught exception after reading " + numValid +
" ops from " + in + " while determining its valid length." +
"Position was " + lastPos, t);
break;
} }
if (lastTxId == HdfsConstants.INVALID_TXID if (lastTxId == HdfsConstants.INVALID_TXID
|| op.getTransactionId() == lastTxId + 1) { || op.getTransactionId() > lastTxId) {
lastTxId = op.getTransactionId(); lastTxId = op.getTransactionId();
} else {
FSImage.LOG.error("Out of order txid found. Found " +
op.getTransactionId() + ", expected " + (lastTxId + 1));
break;
} }
numValid++; numValid++;
} }
} catch (Throwable t) { return new EditLogValidation(lastPos, lastTxId, false);
// Catch Throwable and not just IOE, since bad edits may generate
// NumberFormatExceptions, AssertionErrors, OutOfMemoryErrors, etc.
FSImage.LOG.debug("Caught exception after reading " + numValid +
" ops from " + in + " while determining its valid length.", t);
}
return new EditLogValidation(lastPos, firstTxId, lastTxId, false);
} }
static class EditLogValidation { static class EditLogValidation {
private final long validLength; private final long validLength;
private final long startTxId;
private final long endTxId; private final long endTxId;
private final boolean corruptionDetected; private final boolean hasCorruptHeader;
EditLogValidation(long validLength, long startTxId, long endTxId, EditLogValidation(long validLength, long endTxId,
boolean corruptionDetected) { boolean hasCorruptHeader) {
this.validLength = validLength; this.validLength = validLength;
this.startTxId = startTxId;
this.endTxId = endTxId; this.endTxId = endTxId;
this.corruptionDetected = corruptionDetected; this.hasCorruptHeader = hasCorruptHeader;
} }
long getValidLength() { return validLength; } long getValidLength() { return validLength; }
long getStartTxId() { return startTxId; }
long getEndTxId() { return endTxId; } long getEndTxId() { return endTxId; }
long getNumTransactions() { boolean hasCorruptHeader() { return hasCorruptHeader; }
if (endTxId == HdfsConstants.INVALID_TXID
|| startTxId == HdfsConstants.INVALID_TXID) {
return 0;
}
return (endTxId - startTxId) + 1;
}
boolean hasCorruptHeader() { return corruptionDetected; }
} }
/** /**

View File

@ -559,7 +559,7 @@ public class FSImage implements Closeable {
/** /**
* Choose latest image from one of the directories, * Choose latest image from one of the directories,
* load it and merge with the edits from that directory. * load it and merge with the edits.
* *
* Saving and loading fsimage should never trigger symlink resolution. * Saving and loading fsimage should never trigger symlink resolution.
* The paths that are persisted do not have *intermediate* symlinks * The paths that are persisted do not have *intermediate* symlinks
@ -595,7 +595,7 @@ public class FSImage implements Closeable {
// OK to not be able to read all of edits right now. // OK to not be able to read all of edits right now.
long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0; long toAtLeastTxId = editLog.isOpenForWrite() ? inspector.getMaxSeenTxId() : 0;
editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1, editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
toAtLeastTxId, false); toAtLeastTxId, recovery, false);
} else { } else {
editStreams = FSImagePreTransactionalStorageInspector editStreams = FSImagePreTransactionalStorageInspector
.getEditLogStreams(storage); .getEditLogStreams(storage);
@ -603,7 +603,10 @@ public class FSImage implements Closeable {
LOG.debug("Planning to load image :\n" + imageFile); LOG.debug("Planning to load image :\n" + imageFile);
for (EditLogInputStream l : editStreams) { for (EditLogInputStream l : editStreams) {
LOG.debug("\t Planning to load edit stream: " + l); LOG.debug("Planning to load edit log stream: " + l);
}
if (!editStreams.iterator().hasNext()) {
LOG.info("No edit log streams selected.");
} }
try { try {

View File

@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Comparator; import java.util.Comparator;
import java.util.Collections; import java.util.Collections;
@ -212,90 +213,46 @@ class FileJournalManager implements JournalManager {
} }
@Override @Override
synchronized public EditLogInputStream getInputStream(long fromTxId, synchronized public void selectInputStreams(
boolean inProgressOk) throws IOException { Collection<EditLogInputStream> streams, long fromTxId,
for (EditLogFile elf : getLogFiles(fromTxId)) { boolean inProgressOk) {
if (elf.containsTxId(fromTxId)) { List<EditLogFile> elfs;
if (!inProgressOk && elf.isInProgress()) { try {
elfs = matchEditLogs(sd.getCurrentDir());
} catch (IOException e) {
LOG.error("error listing files in " + this + ". " +
"Skipping all edit logs in this directory.", e);
return;
}
LOG.debug(this + ": selecting input streams starting at " + fromTxId +
(inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
"from among " + elfs.size() + " candidate file(s)");
for (EditLogFile elf : elfs) {
if (elf.lastTxId < fromTxId) {
LOG.debug("passing over " + elf + " because it ends at " +
elf.lastTxId + ", but we only care about transactions " +
"as new as " + fromTxId);
continue; continue;
} }
if (elf.isInProgress()) { if (elf.isInProgress()) {
elf.validateLog(); if (!inProgressOk) {
LOG.debug("passing over " + elf + " because it is in progress " +
"and we are ignoring in-progress logs.");
continue;
}
try {
elf.validateLog();
} catch (IOException e) {
LOG.error("got IOException while trying to validate header of " +
elf + ". Skipping.", e);
continue;
} }
if (LOG.isTraceEnabled()) {
LOG.trace("Returning edit stream reading from " + elf);
} }
EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(), EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(),
elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress()); elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress());
long transactionsToSkip = fromTxId - elf.getFirstTxId(); LOG.debug("selecting edit log stream " + elf);
if (transactionsToSkip > 0) { streams.add(elfis);
LOG.info(String.format("Log begins at txid %d, but requested start "
+ "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId,
transactionsToSkip));
} }
if (elfis.skipUntil(fromTxId) == false) {
throw new IOException("failed to advance input stream to txid " +
fromTxId);
}
return elfis;
}
}
throw new IOException("Cannot find editlog file containing " + fromTxId);
}
@Override
public long getNumberOfTransactions(long fromTxId, boolean inProgressOk)
throws IOException, CorruptionException {
long numTxns = 0L;
for (EditLogFile elf : getLogFiles(fromTxId)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Counting " + elf);
}
if (elf.getFirstTxId() > fromTxId) { // there must be a gap
LOG.warn("Gap in transactions in " + sd.getRoot() + ". Gap is "
+ fromTxId + " - " + (elf.getFirstTxId() - 1));
break;
} else if (elf.containsTxId(fromTxId)) {
if (!inProgressOk && elf.isInProgress()) {
break;
}
if (elf.isInProgress()) {
elf.validateLog();
}
if (elf.hasCorruptHeader()) {
break;
}
numTxns += elf.getLastTxId() + 1 - fromTxId;
fromTxId = elf.getLastTxId() + 1;
if (elf.isInProgress()) {
break;
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Journal " + this + " has " + numTxns
+ " txns from " + fromTxId);
}
long max = findMaxTransaction(inProgressOk);
// fromTxId should be greater than max, as it points to the next
// transaction we should expect to find. If it is less than or equal
// to max, it means that a transaction with txid == max has not been found
if (numTxns == 0 && fromTxId <= max) {
String error = String.format("Gap in transactions, max txnid is %d"
+ ", 0 txns from %d", max, fromTxId);
LOG.error(error);
throw new CorruptionException(error);
}
return numTxns;
} }
@Override @Override
@ -326,19 +283,16 @@ class FileJournalManager implements JournalManager {
throw new CorruptionException("In-progress edit log file is corrupt: " throw new CorruptionException("In-progress edit log file is corrupt: "
+ elf); + elf);
} }
if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) {
// If the file has a valid header (isn't corrupt) but contains no // If the file has a valid header (isn't corrupt) but contains no
// transactions, we likely just crashed after opening the file and // transactions, we likely just crashed after opening the file and
// writing the header, but before syncing any transactions. Safe to // writing the header, but before syncing any transactions. Safe to
// delete the file. // delete the file.
if (elf.getNumTransactions() == 0) { LOG.info("Moving aside edit log file that seems to have zero " +
LOG.info("Deleting edit log file with zero transactions " + elf); "transactions " + elf);
if (!elf.getFile().delete()) { elf.moveAsideEmptyFile();
throw new IOException("Unable to delete " + elf.getFile());
}
continue; continue;
} }
finalizeLogSegment(elf.getFirstTxId(), elf.getLastTxId()); finalizeLogSegment(elf.getFirstTxId(), elf.getLastTxId());
} }
} }
@ -361,39 +315,6 @@ class FileJournalManager implements JournalManager {
return logFiles; return logFiles;
} }
/**
* Find the maximum transaction in the journal.
*/
private long findMaxTransaction(boolean inProgressOk)
throws IOException {
boolean considerSeenTxId = true;
long seenTxId = NNStorage.readTransactionIdFile(sd);
long maxSeenTransaction = 0;
for (EditLogFile elf : getLogFiles(0)) {
if (elf.isInProgress() && !inProgressOk) {
if (elf.getFirstTxId() != HdfsConstants.INVALID_TXID &&
elf.getFirstTxId() <= seenTxId) {
// don't look at the seen_txid file if in-progress logs are not to be
// examined, and the value in seen_txid falls within the in-progress
// segment.
considerSeenTxId = false;
}
continue;
}
if (elf.isInProgress()) {
maxSeenTransaction = Math.max(elf.getFirstTxId(), maxSeenTransaction);
elf.validateLog();
}
maxSeenTransaction = Math.max(elf.getLastTxId(), maxSeenTransaction);
}
if (considerSeenTxId) {
return Math.max(maxSeenTransaction, seenTxId);
} else {
return maxSeenTransaction;
}
}
@Override @Override
public String toString() { public String toString() {
return String.format("FileJournalManager(root=%s)", sd.getRoot()); return String.format("FileJournalManager(root=%s)", sd.getRoot());
@ -406,7 +327,6 @@ class FileJournalManager implements JournalManager {
private File file; private File file;
private final long firstTxId; private final long firstTxId;
private long lastTxId; private long lastTxId;
private long numTx = -1;
private boolean hasCorruptHeader = false; private boolean hasCorruptHeader = false;
private final boolean isInProgress; private final boolean isInProgress;
@ -454,21 +374,16 @@ class FileJournalManager implements JournalManager {
} }
/** /**
* Count the number of valid transactions in a log. * Find out where the edit log ends.
* This will update the lastTxId of the EditLogFile or * This will update the lastTxId of the EditLogFile or
* mark it as corrupt if it is. * mark it as corrupt if it is.
*/ */
void validateLog() throws IOException { void validateLog() throws IOException {
EditLogValidation val = EditLogFileInputStream.validateEditLog(file); EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
this.numTx = val.getNumTransactions();
this.lastTxId = val.getEndTxId(); this.lastTxId = val.getEndTxId();
this.hasCorruptHeader = val.hasCorruptHeader(); this.hasCorruptHeader = val.hasCorruptHeader();
} }
long getNumTransactions() {
return numTx;
}
boolean isInProgress() { boolean isInProgress() {
return isInProgress; return isInProgress;
} }
@ -483,13 +398,21 @@ class FileJournalManager implements JournalManager {
void moveAsideCorruptFile() throws IOException { void moveAsideCorruptFile() throws IOException {
assert hasCorruptHeader; assert hasCorruptHeader;
renameSelf(".corrupt");
}
void moveAsideEmptyFile() throws IOException {
assert lastTxId == HdfsConstants.INVALID_TXID;
renameSelf(".empty");
}
private void renameSelf(String newSuffix) throws IOException {
File src = file; File src = file;
File dst = new File(src.getParent(), src.getName() + ".corrupt"); File dst = new File(src.getParent(), src.getName() + newSuffix);
boolean success = src.renameTo(dst); boolean success = src.renameTo(dst);
if (!success) { if (!success) {
throw new IOException( throw new IOException(
"Couldn't rename corrupt log " + src + " to " + dst); "Couldn't rename log " + src + " to " + dst);
} }
file = dst; file = dst;
} }
@ -497,9 +420,9 @@ class FileJournalManager implements JournalManager {
@Override @Override
public String toString() { public String toString() {
return String.format("EditLogFile(file=%s,first=%019d,last=%019d," return String.format("EditLogFile(file=%s,first=%019d,last=%019d,"
+"inProgress=%b,hasCorruptHeader=%b,numTx=%d)", +"inProgress=%b,hasCorruptHeader=%b)",
file.toString(), firstTxId, lastTxId, file.toString(), firstTxId, lastTxId,
isInProgress(), hasCorruptHeader, numTx); isInProgress(), hasCorruptHeader);
} }
} }
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -46,26 +47,17 @@ public interface JournalManager extends Closeable {
void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException; void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
/** /**
* Get the input stream starting with fromTxnId from this journal manager * Get a list of edit log input streams. The list will start with the
* stream that contains fromTxnId, and continue until the end of the journal
* being managed.
*
* @param fromTxnId the first transaction id we want to read * @param fromTxnId the first transaction id we want to read
* @param inProgressOk whether or not in-progress streams should be returned * @param inProgressOk whether or not in-progress streams should be returned
* @return the stream starting with transaction fromTxnId
* @throws IOException if a stream cannot be found.
*/
EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk)
throws IOException;
/**
* Get the number of transaction contiguously available from fromTxnId.
* *
* @param fromTxnId Transaction id to count from * @return a list of streams
* @param inProgressOk whether or not in-progress streams should be counted
* @return The number of transactions available from fromTxnId
* @throws IOException if the journal cannot be read.
* @throws CorruptionException if there is a gap in the journal at fromTxnId.
*/ */
long getNumberOfTransactions(long fromTxnId, boolean inProgressOk) void selectInputStreams(Collection<EditLogInputStream> streams,
throws IOException, CorruptionException; long fromTxnId, boolean inProgressOk);
/** /**
* Set the amount of memory that this stream should use to buffer edits * Set the amount of memory that this stream should use to buffer edits

View File

@ -19,7 +19,10 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.SortedSet; import java.util.SortedSet;
@ -31,11 +34,13 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps; import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultiset;
/** /**
* Manages a collection of Journals. None of the methods are synchronized, it is * Manages a collection of Journals. None of the methods are synchronized, it is
@ -47,6 +52,17 @@ public class JournalSet implements JournalManager {
static final Log LOG = LogFactory.getLog(FSEditLog.class); static final Log LOG = LogFactory.getLog(FSEditLog.class);
static final public Comparator<EditLogInputStream>
EDIT_LOG_INPUT_STREAM_COMPARATOR = new Comparator<EditLogInputStream>() {
@Override
public int compare(EditLogInputStream a, EditLogInputStream b) {
return ComparisonChain.start().
compare(a.getFirstTxId(), b.getFirstTxId()).
compare(b.getLastTxId(), a.getLastTxId()).
result();
}
};
/** /**
* Container for a JournalManager paired with its currently * Container for a JournalManager paired with its currently
* active stream. * active stream.
@ -194,75 +210,57 @@ public class JournalSet implements JournalManager {
}, "close journal"); }, "close journal");
} }
/** /**
* Find the best editlog input stream to read from txid. * In this function, we get a bunch of streams from all of our JournalManager
* If a journal throws an CorruptionException while reading from a txn id, * objects. Then we add these to the collection one by one.
* it means that it has more transactions, but can't find any from fromTxId.
* If this is the case and no other journal has transactions, we should throw
* an exception as it means more transactions exist, we just can't load them.
* *
* @param fromTxnId Transaction id to start from. * @param streams The collection to add the streams to. It may or
* @return A edit log input stream with tranactions fromTxId * may not be sorted-- this is up to the caller.
* or null if no more exist * @param fromTxId The transaction ID to start looking for streams at
* @param inProgressOk Should we consider unfinalized streams?
*/ */
@Override @Override
public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) public void selectInputStreams(Collection<EditLogInputStream> streams,
throws IOException { long fromTxId, boolean inProgressOk) {
JournalManager bestjm = null; final TreeMultiset<EditLogInputStream> allStreams =
long bestjmNumTxns = 0; TreeMultiset.create(EDIT_LOG_INPUT_STREAM_COMPARATOR);
CorruptionException corruption = null;
for (JournalAndStream jas : journals) { for (JournalAndStream jas : journals) {
if (jas.isDisabled()) continue;
JournalManager candidate = jas.getManager();
long candidateNumTxns = 0;
try {
candidateNumTxns = candidate.getNumberOfTransactions(fromTxnId,
inProgressOk);
} catch (CorruptionException ce) {
corruption = ce;
} catch (IOException ioe) {
LOG.warn("Unable to read input streams from JournalManager " + candidate,
ioe);
continue; // error reading disk, just skip
}
if (candidateNumTxns > bestjmNumTxns) {
bestjm = candidate;
bestjmNumTxns = candidateNumTxns;
}
}
if (bestjm == null) {
if (corruption != null) {
throw new IOException("No non-corrupt logs for txid "
+ fromTxnId, corruption);
} else {
return null;
}
}
return bestjm.getInputStream(fromTxnId, inProgressOk);
}
@Override
public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
throws IOException {
long num = 0;
for (JournalAndStream jas: journals) {
if (jas.isDisabled()) { if (jas.isDisabled()) {
LOG.info("Skipping jas " + jas + " since it's disabled"); LOG.info("Skipping jas " + jas + " since it's disabled");
continue; continue;
}
jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
}
// We want to group together all the streams that start on the same start
// transaction ID. To do this, we maintain an accumulator (acc) of all
// the streams we've seen at a given start transaction ID. When we see a
// higher start transaction ID, we select a stream from the accumulator and
// clear it. Then we begin accumulating streams with the new, higher start
// transaction ID.
LinkedList<EditLogInputStream> acc =
new LinkedList<EditLogInputStream>();
for (EditLogInputStream elis : allStreams) {
if (acc.isEmpty()) {
acc.add(elis);
} else { } else {
long newNum = jas.getManager().getNumberOfTransactions(fromTxnId, long accFirstTxId = acc.get(0).getFirstTxId();
inProgressOk); if (accFirstTxId == elis.getFirstTxId()) {
if (newNum > num) { acc.add(elis);
num = newNum; } else if (accFirstTxId < elis.getFirstTxId()) {
streams.add(acc.get(0));
acc.clear();
acc.add(elis);
} else if (accFirstTxId > elis.getFirstTxId()) {
throw new RuntimeException("sorted set invariants violated! " +
"Got stream with first txid " + elis.getFirstTxId() +
", but the last firstTxId was " + accFirstTxId);
} }
} }
} }
return num; if (!acc.isEmpty()) {
streams.add(acc.get(0));
acc.clear();
}
} }
/** /**

View File

@ -225,7 +225,7 @@ public class BootstrapStandby implements Tool, Configurable {
try { try {
Collection<EditLogInputStream> streams = Collection<EditLogInputStream> streams =
image.getEditLog().selectInputStreams( image.getEditLog().selectInputStreams(
firstTxIdInLogs, curTxIdOnOtherNode, true); firstTxIdInLogs, curTxIdOnOtherNode, null, true);
for (EditLogInputStream stream : streams) { for (EditLogInputStream stream : streams) {
IOUtils.closeStream(stream); IOUtils.closeStream(stream);
} }

View File

@ -201,7 +201,7 @@ public class EditLogTailer {
} }
Collection<EditLogInputStream> streams; Collection<EditLogInputStream> streams;
try { try {
streams = editLog.selectInputStreams(lastTxnId + 1, 0, false); streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);
} catch (IOException ioe) { } catch (IOException ioe) {
// This is acceptable. If we try to tail edits in the middle of an edits // This is acceptable. If we try to tail edits in the middle of an edits
// log roll, i.e. the last one has been finalized but the new inprogress // log roll, i.e. the last one has been finalized but the new inprogress

View File

@ -248,7 +248,7 @@ public class TestDFSRollback extends TestCase {
baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous"); baseDirs = UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "previous");
deleteMatchingFiles(baseDirs, "edits.*"); deleteMatchingFiles(baseDirs, "edits.*");
startNameNodeShouldFail(StartupOption.ROLLBACK, startNameNodeShouldFail(StartupOption.ROLLBACK,
"No non-corrupt logs for txid "); "Gap in transactions");
UpgradeUtilities.createEmptyDirs(nameNodeDirs); UpgradeUtilities.createEmptyDirs(nameNodeDirs);
log("NameNode rollback with no image file", numDirs); log("NameNode rollback with no image file", numDirs);

View File

@ -22,6 +22,7 @@ import java.io.*;
import java.net.URI; import java.net.URI;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -739,8 +740,9 @@ public class TestEditLog extends TestCase {
throw ioe; throw ioe;
} else { } else {
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(
"No non-corrupt logs for txid 3", "Gap in transactions. Expected to be able to read up until " +
ioe); "at least txid 3 but unable to find any edit logs containing " +
"txid 3", ioe);
} }
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
@ -769,12 +771,12 @@ public class TestEditLog extends TestCase {
} }
@Override @Override
public long getFirstTxId() throws IOException { public long getFirstTxId() {
return HdfsConstants.INVALID_TXID; return HdfsConstants.INVALID_TXID;
} }
@Override @Override
public long getLastTxId() throws IOException { public long getLastTxId() {
return HdfsConstants.INVALID_TXID; return HdfsConstants.INVALID_TXID;
} }
@ -1103,9 +1105,9 @@ public class TestEditLog extends TestCase {
for (EditLogInputStream edits : editStreams) { for (EditLogInputStream edits : editStreams) {
FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(edits); FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(edits);
long read = val.getNumTransactions(); long read = (val.getEndTxId() - edits.getFirstTxId()) + 1;
LOG.info("Loading edits " + edits + " read " + read); LOG.info("Loading edits " + edits + " read " + read);
assertEquals(startTxId, val.getStartTxId()); assertEquals(startTxId, edits.getFirstTxId());
startTxId += read; startTxId += read;
totaltxnread += read; totaltxnread += read;
} }
@ -1153,7 +1155,9 @@ public class TestEditLog extends TestCase {
fail("Should have thrown exception"); fail("Should have thrown exception");
} catch (IOException ioe) { } catch (IOException ioe) {
GenericTestUtils.assertExceptionContains( GenericTestUtils.assertExceptionContains(
"No non-corrupt logs for txid " + startGapTxId, ioe); "Gap in transactions. Expected to be able to read up until " +
"at least txid 40 but unable to find any edit logs containing " +
"txid 11", ioe);
} }
} }
@ -1227,4 +1231,55 @@ public class TestEditLog extends TestCase {
validateNoCrash(garbage); validateNoCrash(garbage);
} }
} }
/**
* Test creating a directory with lots and lots of edit log segments
*/
@Test
public void testManyEditLogSegments() throws IOException {
final int NUM_EDIT_LOG_ROLLS = 1000;
// start a cluster
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build();
cluster.waitActive();
fileSys = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();
FSImage fsimage = namesystem.getFSImage();
final FSEditLog editLog = fsimage.getEditLog();
for (int i = 0; i < NUM_EDIT_LOG_ROLLS; i++){
editLog.logSetReplication("fakefile" + i, (short)(i % 3));
assertExistsInStorageDirs(
cluster, NameNodeDirType.EDITS,
NNStorage.getInProgressEditsFileName((i * 3) + 1));
editLog.logSync();
editLog.rollEditLog();
assertExistsInStorageDirs(
cluster, NameNodeDirType.EDITS,
NNStorage.getFinalizedEditsFileName((i * 3) + 1, (i * 3) + 3));
}
editLog.close();
} finally {
if(fileSys != null) fileSys.close();
if(cluster != null) cluster.shutdown();
}
// How long does it take to read through all these edit logs?
long startTime = System.currentTimeMillis();
try {
cluster = new MiniDFSCluster.Builder(conf).
numDataNodes(NUM_DATA_NODES).build();
cluster.waitActive();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
long endTime = System.currentTimeMillis();
double delta = ((float)(endTime - startTime)) / 1000.0;
LOG.info(String.format("loaded %d edit log segments in %.2f seconds",
NUM_EDIT_LOG_ROLLS, delta));
}
} }

View File

@ -40,8 +40,6 @@ import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
public class TestEditLogFileOutputStream { public class TestEditLogFileOutputStream {
private final static long PREALLOCATION_LENGTH = (1024 * 1024) + 4;
private final static int HEADER_LEN = 17; private final static int HEADER_LEN = 17;
private static final File TEST_EDITS = private static final File TEST_EDITS =
new File(System.getProperty("test.build.data","/tmp"), new File(System.getProperty("test.build.data","/tmp"),
@ -58,14 +56,15 @@ public class TestEditLogFileOutputStream {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.build(); .build();
final long START_TXID = 1;
StorageDirectory sd = cluster.getNameNode().getFSImage() StorageDirectory sd = cluster.getNameNode().getFSImage()
.getStorage().getStorageDir(0); .getStorage().getStorageDir(0);
File editLog = NNStorage.getInProgressEditsFile(sd, 1); File editLog = NNStorage.getInProgressEditsFile(sd, START_TXID);
EditLogValidation validation = EditLogFileInputStream.validateEditLog(editLog); EditLogValidation validation = EditLogFileInputStream.validateEditLog(editLog);
assertEquals("Edit log should contain a header as valid length", assertEquals("Edit log should contain a header as valid length",
HEADER_LEN, validation.getValidLength()); HEADER_LEN, validation.getValidLength());
assertEquals(1, validation.getNumTransactions()); assertEquals(validation.getEndTxId(), START_TXID);
assertEquals("Edit log should have 1MB pre-allocated, plus 4 bytes " + assertEquals("Edit log should have 1MB pre-allocated, plus 4 bytes " +
"for the version number", "for the version number",
EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length()); EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length());
@ -79,7 +78,7 @@ public class TestEditLogFileOutputStream {
assertTrue("Edit log should have more valid data after writing a txn " + assertTrue("Edit log should have more valid data after writing a txn " +
"(was: " + oldLength + " now: " + validation.getValidLength() + ")", "(was: " + oldLength + " now: " + validation.getValidLength() + ")",
validation.getValidLength() > oldLength); validation.getValidLength() > oldLength);
assertEquals(2, validation.getNumTransactions()); assertEquals(1, validation.getEndTxId() - START_TXID);
assertEquals("Edit log should be 1MB long, plus 4 bytes for the version number", assertEquals("Edit log should be 1MB long, plus 4 bytes for the version number",
EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length()); EditLogFileOutputStream.PREALLOCATION_LENGTH + 4, editLog.length());

View File

@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
@ -40,16 +41,23 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files; import com.google.common.io.Files;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
public class TestFSEditLogLoader { public class TestFSEditLogLoader {
static { static {
@ -153,108 +161,6 @@ public class TestFSEditLogLoader {
} }
} }
/**
* Test that the valid number of transactions can be counted from a file.
* @throws IOException
*/
@Test
public void testCountValidTransactions() throws IOException {
File testDir = new File(TEST_DIR, "testCountValidTransactions");
File logFile = new File(testDir,
NNStorage.getInProgressEditsFileName(1));
// Create a log file, and return the offsets at which each
// transaction starts.
FSEditLog fsel = null;
final int NUM_TXNS = 30;
SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
try {
fsel = FSImageTestUtil.createStandaloneEditLog(testDir);
fsel.openForWrite();
assertTrue("should exist: " + logFile, logFile.exists());
for (int i = 0; i < NUM_TXNS; i++) {
long trueOffset = getNonTrailerLength(logFile);
long thisTxId = fsel.getLastWrittenTxId() + 1;
offsetToTxId.put(trueOffset, thisTxId);
System.err.println("txid " + thisTxId + " at offset " + trueOffset);
fsel.logDelete("path" + i, i);
fsel.logSync();
}
} finally {
if (fsel != null) {
fsel.close();
}
}
// The file got renamed when the log was closed.
logFile = testDir.listFiles()[0];
long validLength = getNonTrailerLength(logFile);
// Make sure that uncorrupted log has the expected length and number
// of transactions.
EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals(NUM_TXNS + 2, validation.getNumTransactions());
assertEquals(validLength, validation.getValidLength());
// Back up the uncorrupted log
File logFileBak = new File(testDir, logFile.getName() + ".bak");
Files.copy(logFile, logFileBak);
// Corrupt the log file in various ways for each txn
for (Map.Entry<Long, Long> entry : offsetToTxId.entrySet()) {
long txOffset = entry.getKey();
long txid = entry.getValue();
// Restore backup, truncate the file exactly before the txn
Files.copy(logFileBak, logFile);
truncateFile(logFile, txOffset);
validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals("Failed when truncating to length " + txOffset,
txid - 1, validation.getNumTransactions());
assertEquals(txOffset, validation.getValidLength());
// Restore backup, truncate the file with one byte in the txn,
// also isn't valid
Files.copy(logFileBak, logFile);
truncateFile(logFile, txOffset + 1);
validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals("Failed when truncating to length " + (txOffset + 1),
txid - 1, validation.getNumTransactions());
assertEquals(txOffset, validation.getValidLength());
// Restore backup, corrupt the txn opcode
Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, txOffset);
validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals("Failed when corrupting txn opcode at " + txOffset,
txid - 1, validation.getNumTransactions());
assertEquals(txOffset, validation.getValidLength());
// Restore backup, corrupt a byte a few bytes into the txn
Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, txOffset+5);
validation = EditLogFileInputStream.validateEditLog(logFile);
assertEquals("Failed when corrupting txn data at " + (txOffset+5),
txid - 1, validation.getNumTransactions());
assertEquals(txOffset, validation.getValidLength());
}
// Corrupt the log at every offset to make sure that validation itself
// never throws an exception, and that the calculated lengths are monotonically
// increasing
long prevNumValid = 0;
for (long offset = 0; offset < validLength; offset++) {
Files.copy(logFileBak, logFile);
corruptByteInFile(logFile, offset);
EditLogValidation val = EditLogFileInputStream.validateEditLog(logFile);
assertTrue(String.format("%d should have been >= %d",
val.getNumTransactions(), prevNumValid),
val.getNumTransactions() >= prevNumValid);
prevNumValid = val.getNumTransactions();
}
}
/** /**
* Corrupt the byte at the given offset in the given file, * Corrupt the byte at the given offset in the given file,
* by subtracting 1 from it. * by subtracting 1 from it.
@ -361,4 +267,75 @@ public class TestFSEditLogLoader {
tracker.close(); tracker.close();
} }
} }
/**
* Create an unfinalized edit log for testing purposes
*
* @param testDir Directory to create the edit log in
* @param numTx Number of transactions to add to the new edit log
* @param offsetToTxId A map from transaction IDs to offsets in the
* edit log file.
* @return The new edit log file name.
* @throws IOException
*/
static private File prepareUnfinalizedTestEditLog(File testDir, int numTx,
SortedMap<Long, Long> offsetToTxId) throws IOException {
File inProgressFile = new File(testDir, NNStorage.getInProgressEditsFileName(1));
FSEditLog fsel = null, spyLog = null;
try {
fsel = FSImageTestUtil.createStandaloneEditLog(testDir);
spyLog = spy(fsel);
// Normally, the in-progress edit log would be finalized by
// FSEditLog#endCurrentLogSegment. For testing purposes, we
// disable that here.
doNothing().when(spyLog).endCurrentLogSegment(true);
spyLog.openForWrite();
assertTrue("should exist: " + inProgressFile, inProgressFile.exists());
for (int i = 0; i < numTx; i++) {
long trueOffset = getNonTrailerLength(inProgressFile);
long thisTxId = spyLog.getLastWrittenTxId() + 1;
offsetToTxId.put(trueOffset, thisTxId);
System.err.println("txid " + thisTxId + " at offset " + trueOffset);
spyLog.logDelete("path" + i, i);
spyLog.logSync();
}
} finally {
if (spyLog != null) {
spyLog.close();
} else if (fsel != null) {
fsel.close();
}
}
return inProgressFile;
}
@Test
public void testValidateEditLogWithCorruptHeader() throws IOException {
File testDir = new File(TEST_DIR, "testValidateEditLogWithCorruptHeader");
SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
File logFile = prepareUnfinalizedTestEditLog(testDir, 2, offsetToTxId);
RandomAccessFile rwf = new RandomAccessFile(logFile, "rw");
try {
rwf.seek(0);
rwf.writeLong(42); // corrupt header
} finally {
rwf.close();
}
EditLogValidation validation = EditLogFileInputStream.validateEditLog(logFile);
assertTrue(validation.hasCorruptHeader());
}
@Test
public void testValidateEmptyEditLog() throws IOException {
File testDir = new File(TEST_DIR, "testValidateEmptyEditLog");
SortedMap<Long, Long> offsetToTxId = Maps.newTreeMap();
File logFile = prepareUnfinalizedTestEditLog(testDir, 0, offsetToTxId);
// Truncate the file so that there is nothing except the header
truncateFile(logFile, 4);
EditLogValidation validation =
EditLogFileInputStream.validateEditLog(logFile);
assertTrue(!validation.hasCorruptHeader());
assertEquals(HdfsConstants.INVALID_TXID, validation.getEndTxId());
}
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Iterator; import java.util.Iterator;
@ -29,10 +30,14 @@ import java.io.File;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import org.junit.Test; import org.junit.Test;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException; import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits; import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.setupEdits;
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.AbortSpec; import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.AbortSpec;
@ -40,9 +45,51 @@ import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_ROLL;
import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_FAIL; import static org.apache.hadoop.hdfs.server.namenode.TestEditLog.TXNS_PER_FAIL;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.TreeMultiset;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
public class TestFileJournalManager { public class TestFileJournalManager {
static final Log LOG = LogFactory.getLog(TestFileJournalManager.class);
/**
* Find out how many transactions we can read from a
* FileJournalManager, starting at a given transaction ID.
*
* @param jm The journal manager
* @param fromTxId Transaction ID to start at
* @param inProgressOk Should we consider edit logs that are not finalized?
* @return The number of transactions
* @throws IOException
*/
static long getNumberOfTransactions(FileJournalManager jm, long fromTxId,
boolean inProgressOk, boolean abortOnGap) throws IOException {
long numTransactions = 0, txId = fromTxId;
final TreeMultiset<EditLogInputStream> allStreams =
TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
jm.selectInputStreams(allStreams, fromTxId, inProgressOk);
try {
for (EditLogInputStream elis : allStreams) {
elis.skipUntil(txId);
while (true) {
FSEditLogOp op = elis.readOp();
if (op == null) {
break;
}
if (abortOnGap && (op.getTransactionId() != txId)) {
LOG.info("getNumberOfTransactions: detected gap at txId " +
fromTxId);
return numTransactions;
}
txId = op.getTransactionId() + 1;
numTransactions++;
}
}
} finally {
IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
}
return numTransactions;
}
/** /**
* Test the normal operation of loading transactions from * Test the normal operation of loading transactions from
@ -61,7 +108,7 @@ public class TestFileJournalManager {
long numJournals = 0; long numJournals = 0;
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) { for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(sd, storage);
assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true)); assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false));
numJournals++; numJournals++;
} }
assertEquals(3, numJournals); assertEquals(3, numJournals);
@ -82,7 +129,7 @@ public class TestFileJournalManager {
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(sd, storage);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL,
jm.getNumberOfTransactions(1, true)); getNumberOfTransactions(jm, 1, true, false));
} }
/** /**
@ -104,16 +151,16 @@ public class TestFileJournalManager {
Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS); Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
StorageDirectory sd = dirs.next(); StorageDirectory sd = dirs.next();
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(sd, storage);
assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true)); assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false));
sd = dirs.next(); sd = dirs.next();
jm = new FileJournalManager(sd, storage); jm = new FileJournalManager(sd, storage);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1, assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1,
true)); true, false));
sd = dirs.next(); sd = dirs.next();
jm = new FileJournalManager(sd, storage); jm = new FileJournalManager(sd, storage);
assertEquals(6*TXNS_PER_ROLL, jm.getNumberOfTransactions(1, true)); assertEquals(6*TXNS_PER_ROLL, getNumberOfTransactions(jm, 1, true, false));
} }
/** /**
@ -137,18 +184,18 @@ public class TestFileJournalManager {
Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS); Iterator<StorageDirectory> dirs = storage.dirIterator(NameNodeDirType.EDITS);
StorageDirectory sd = dirs.next(); StorageDirectory sd = dirs.next();
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(sd, storage);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1, assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1,
true)); true, false));
sd = dirs.next(); sd = dirs.next();
jm = new FileJournalManager(sd, storage); jm = new FileJournalManager(sd, storage);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1, assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1,
true)); true, false));
sd = dirs.next(); sd = dirs.next();
jm = new FileJournalManager(sd, storage); jm = new FileJournalManager(sd, storage);
assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, jm.getNumberOfTransactions(1, assertEquals(5*TXNS_PER_ROLL + TXNS_PER_FAIL, getNumberOfTransactions(jm, 1,
true)); true, false));
} }
/** /**
@ -198,24 +245,15 @@ public class TestFileJournalManager {
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(sd, storage);
long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL; long expectedTotalTxnCount = TXNS_PER_ROLL*10 + TXNS_PER_FAIL;
assertEquals(expectedTotalTxnCount, jm.getNumberOfTransactions(1, true)); assertEquals(expectedTotalTxnCount, getNumberOfTransactions(jm, 1,
true, false));
long skippedTxns = (3*TXNS_PER_ROLL); // skip first 3 files long skippedTxns = (3*TXNS_PER_ROLL); // skip first 3 files
long startingTxId = skippedTxns + 1; long startingTxId = skippedTxns + 1;
long numTransactionsToLoad = jm.getNumberOfTransactions(startingTxId, true); long numLoadable = getNumberOfTransactions(jm, startingTxId,
long numLoaded = 0; true, false);
while (numLoaded < numTransactionsToLoad) { assertEquals(expectedTotalTxnCount - skippedTxns, numLoadable);
EditLogInputStream editIn = jm.getInputStream(startingTxId, true);
FSEditLogLoader.EditLogValidation val = FSEditLogLoader.validateEditLog(editIn);
long count = val.getNumTransactions();
editIn.close();
startingTxId += count;
numLoaded += count;
}
assertEquals(expectedTotalTxnCount - skippedTxns, numLoaded);
} }
/** /**
@ -236,8 +274,8 @@ public class TestFileJournalManager {
// 10 rolls, so 11 rolled files, 110 txids total. // 10 rolls, so 11 rolled files, 110 txids total.
final int TOTAL_TXIDS = 10 * 11; final int TOTAL_TXIDS = 10 * 11;
for (int txid = 1; txid <= TOTAL_TXIDS; txid++) { for (int txid = 1; txid <= TOTAL_TXIDS; txid++) {
assertEquals((TOTAL_TXIDS - txid) + 1, jm.getNumberOfTransactions(txid, assertEquals((TOTAL_TXIDS - txid) + 1, getNumberOfTransactions(jm, txid,
true)); true, false));
} }
} }
@ -269,19 +307,13 @@ public class TestFileJournalManager {
assertTrue(files[0].delete()); assertTrue(files[0].delete());
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(sd, storage);
assertEquals(startGapTxId-1, jm.getNumberOfTransactions(1, true)); assertEquals(startGapTxId-1, getNumberOfTransactions(jm, 1, true, true));
try { assertEquals(0, getNumberOfTransactions(jm, startGapTxId, true, true));
jm.getNumberOfTransactions(startGapTxId, true);
fail("Should have thrown an exception by now");
} catch (IOException ioe) {
GenericTestUtils.assertExceptionContains(
"Gap in transactions, max txnid is 110, 0 txns from 31", ioe);
}
// rolled 10 times so there should be 11 files. // rolled 10 times so there should be 11 files.
assertEquals(11*TXNS_PER_ROLL - endGapTxId, assertEquals(11*TXNS_PER_ROLL - endGapTxId,
jm.getNumberOfTransactions(endGapTxId + 1, true)); getNumberOfTransactions(jm, endGapTxId + 1, true, true));
} }
/** /**
@ -308,7 +340,7 @@ public class TestFileJournalManager {
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(sd, storage);
assertEquals(10*TXNS_PER_ROLL+1, assertEquals(10*TXNS_PER_ROLL+1,
jm.getNumberOfTransactions(1, true)); getNumberOfTransactions(jm, 1, true, false));
} }
@Test @Test
@ -345,6 +377,33 @@ public class TestFileJournalManager {
FileJournalManager.matchEditLogs(badDir); FileJournalManager.matchEditLogs(badDir);
} }
private static EditLogInputStream getJournalInputStream(JournalManager jm,
long txId, boolean inProgressOk) throws IOException {
final TreeMultiset<EditLogInputStream> allStreams =
TreeMultiset.create(JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
jm.selectInputStreams(allStreams, txId, inProgressOk);
try {
for (Iterator<EditLogInputStream> iter = allStreams.iterator();
iter.hasNext();) {
EditLogInputStream elis = iter.next();
if (elis.getFirstTxId() > txId) {
break;
}
if (elis.getLastTxId() < txId) {
iter.remove();
elis.close();
continue;
}
elis.skipUntil(txId);
iter.remove();
return elis;
}
} finally {
IOUtils.cleanup(LOG, allStreams.toArray(new EditLogInputStream[0]));
}
return null;
}
/** /**
* Make sure that we starting reading the correct op when we request a stream * Make sure that we starting reading the correct op when we request a stream
* with a txid in the middle of an edit log file. * with a txid in the middle of an edit log file.
@ -359,7 +418,7 @@ public class TestFileJournalManager {
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(sd, storage);
EditLogInputStream elis = jm.getInputStream(5, true); EditLogInputStream elis = getJournalInputStream(jm, 5, true);
FSEditLogOp op = elis.readOp(); FSEditLogOp op = elis.readOp();
assertEquals("read unexpected op", op.getTransactionId(), 5); assertEquals("read unexpected op", op.getTransactionId(), 5);
} }
@ -381,9 +440,9 @@ public class TestFileJournalManager {
FileJournalManager jm = new FileJournalManager(sd, storage); FileJournalManager jm = new FileJournalManager(sd, storage);
// If we exclude the in-progess stream, we should only have 100 tx. // If we exclude the in-progess stream, we should only have 100 tx.
assertEquals(100, jm.getNumberOfTransactions(1, false)); assertEquals(100, getNumberOfTransactions(jm, 1, false, false));
EditLogInputStream elis = jm.getInputStream(90, false); EditLogInputStream elis = getJournalInputStream(jm, 90, false);
FSEditLogOp lastReadOp = null; FSEditLogOp lastReadOp = null;
while ((lastReadOp = elis.readOp()) != null) { while ((lastReadOp = elis.readOp()) != null) {
assertTrue(lastReadOp.getTransactionId() <= 100); assertTrue(lastReadOp.getTransactionId() <= 100);

View File

@ -17,8 +17,6 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -26,9 +24,9 @@ import static org.junit.Assert.*;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.Writable;
import java.net.URI; import java.net.URI;
import java.util.Collection;
import java.io.IOException; import java.io.IOException;
public class TestGenericJournalConf { public class TestGenericJournalConf {
@ -144,15 +142,8 @@ public class TestGenericJournalConf {
} }
@Override @Override
public EditLogInputStream getInputStream(long fromTxnId, boolean inProgressOk) public void selectInputStreams(Collection<EditLogInputStream> streams,
throws IOException { long fromTxnId, boolean inProgressOk) {
return null;
}
@Override
public long getNumberOfTransactions(long fromTxnId, boolean inProgressOk)
throws IOException {
return 0;
} }
@Override @Override

View File

@ -333,7 +333,7 @@ public class TestNameNodeRecovery {
static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize) static void testNameNodeRecoveryImpl(Corruptor corruptor, boolean finalize)
throws IOException { throws IOException {
final String TEST_PATH = "/test/path/dir"; final String TEST_PATH = "/test/path/dir";
final int NUM_TEST_MKDIRS = 10; final String TEST_PATH2 = "/second/dir";
final boolean needRecovery = corruptor.needRecovery(finalize); final boolean needRecovery = corruptor.needRecovery(finalize);
// start a cluster // start a cluster
@ -357,9 +357,8 @@ public class TestNameNodeRecovery {
fileSys = cluster.getFileSystem(); fileSys = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem(); final FSNamesystem namesystem = cluster.getNamesystem();
FSImage fsimage = namesystem.getFSImage(); FSImage fsimage = namesystem.getFSImage();
for (int i = 0; i < NUM_TEST_MKDIRS; i++) {
fileSys.mkdirs(new Path(TEST_PATH)); fileSys.mkdirs(new Path(TEST_PATH));
} fileSys.mkdirs(new Path(TEST_PATH2));
sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next(); sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next();
} finally { } finally {
if (cluster != null) { if (cluster != null) {
@ -371,6 +370,7 @@ public class TestNameNodeRecovery {
assertTrue("Should exist: " + editFile, editFile.exists()); assertTrue("Should exist: " + editFile, editFile.exists());
// Corrupt the edit log // Corrupt the edit log
LOG.info("corrupting edit log file '" + editFile + "'");
corruptor.corrupt(editFile); corruptor.corrupt(editFile);
// If needRecovery == true, make sure that we can't start the // If needRecovery == true, make sure that we can't start the
@ -423,6 +423,7 @@ public class TestNameNodeRecovery {
.format(false).build(); .format(false).build();
LOG.debug("successfully recovered the " + corruptor.getName() + LOG.debug("successfully recovered the " + corruptor.getName() +
" corrupted edit log"); " corrupted edit log");
cluster.waitActive();
assertTrue(cluster.getFileSystem().exists(new Path(TEST_PATH))); assertTrue(cluster.getFileSystem().exists(new Path(TEST_PATH)));
} catch (IOException e) { } catch (IOException e) {
fail("failed to recover. Error message: " + e.getMessage()); fail("failed to recover. Error message: " + e.getMessage());

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.server.namenode.EditLogInputException;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog; import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.junit.After; import org.junit.After;
@ -278,7 +280,7 @@ public class TestFailureToReadEdits {
.getEditLog()); .getEditLog());
LimitedEditLogAnswer answer = new LimitedEditLogAnswer(); LimitedEditLogAnswer answer = new LimitedEditLogAnswer();
doAnswer(answer).when(spyEditLog).selectInputStreams( doAnswer(answer).when(spyEditLog).selectInputStreams(
anyLong(), anyLong(), anyBoolean()); anyLong(), anyLong(), (MetaRecoveryContext)anyObject(), anyBoolean());
nn1.getNamesystem().getEditLogTailer().setEditLog(spyEditLog); nn1.getNamesystem().getEditLogTailer().setEditLog(spyEditLog);
return answer; return answer;