HDFS-3004. Implement Recovery Mode. Contributed by Colin Patrick McCabe
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1311394 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3ced5ea066
commit
706394d039
|
@ -199,6 +199,8 @@ Release 2.0.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-3102. Add CLI tool to initialize the shared-edits dir. (atm)
|
HDFS-3102. Add CLI tool to initialize the shared-edits dir. (atm)
|
||||||
|
|
||||||
|
HDFS-3004. Implement Recovery Mode. (Colin Patrick McCabe via eli)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HDFS-2018. Move all journal stream management code into one place.
|
HDFS-2018. Move all journal stream management code into one place.
|
||||||
|
|
|
@ -264,4 +264,10 @@
|
||||||
<Method name="doRefreshNamenodes" />
|
<Method name="doRefreshNamenodes" />
|
||||||
<Bug category="PERFORMANCE" />
|
<Bug category="PERFORMANCE" />
|
||||||
</Match>
|
</Match>
|
||||||
|
<!-- Don't complain about System.exit() being called from quit() -->
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext" />
|
||||||
|
<Method name="quit" />
|
||||||
|
<Bug pattern="DM_EXIT" />
|
||||||
|
</Match>
|
||||||
</FindBugsFilter>
|
</FindBugsFilter>
|
||||||
|
|
|
@ -94,8 +94,8 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FSEditLogOp readOp() throws IOException {
|
protected FSEditLogOp nextOp() throws IOException {
|
||||||
return reader.readOp();
|
return reader.readOp(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -123,12 +123,6 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
|
||||||
lh.toString(), firstTxId, lastTxId);
|
lh.toString(), firstTxId, lastTxId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public JournalType getType() {
|
|
||||||
assert (false);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(HA): Test this.
|
// TODO(HA): Test this.
|
||||||
@Override
|
@Override
|
||||||
public boolean isInProgress() {
|
public boolean isInProgress() {
|
||||||
|
|
|
@ -18,13 +18,17 @@
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Utilities for testing edit logs
|
* Utilities for testing edit logs
|
||||||
*/
|
*/
|
||||||
public class FSEditLogTestUtil {
|
public class FSEditLogTestUtil {
|
||||||
|
private static OpInstanceCache cache = new OpInstanceCache();
|
||||||
|
|
||||||
public static FSEditLogOp getNoOpInstance() {
|
public static FSEditLogOp getNoOpInstance() {
|
||||||
return FSEditLogOp.LogSegmentOp.getInstance(FSEditLogOpCodes.OP_END_LOG_SEGMENT);
|
return FSEditLogOp.LogSegmentOp.getInstance(cache,
|
||||||
|
FSEditLogOpCodes.OP_END_LOG_SEGMENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static long countTransactionsInStream(EditLogInputStream in)
|
public static long countTransactionsInStream(EditLogInputStream in)
|
||||||
|
@ -32,4 +36,4 @@ public class FSEditLogTestUtil {
|
||||||
FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
|
FSEditLogLoader.EditLogValidation validation = FSEditLogLoader.validateEditLog(in);
|
||||||
return validation.getNumTransactions();
|
return validation.getNumTransactions();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -537,7 +537,32 @@
|
||||||
For command usage, see <a href="http://hadoop.apache.org/common/docs/current/commands_manual.html#fetchdt"><code>fetchdt</code> command</a>.
|
For command usage, see <a href="http://hadoop.apache.org/common/docs/current/commands_manual.html#fetchdt"><code>fetchdt</code> command</a>.
|
||||||
</p>
|
</p>
|
||||||
|
|
||||||
</section><section> <title> Upgrade and Rollback </title>
|
</section>
|
||||||
|
<section> <title>Recovery Mode</title>
|
||||||
|
<p>Typically, you will configure multiple metadata storage locations.
|
||||||
|
Then, if one storage location is corrupt, you can read the
|
||||||
|
metadata from one of the other storage locations.</p>
|
||||||
|
|
||||||
|
<p>However, what can you do if the only storage locations available are
|
||||||
|
corrupt? In this case, there is a special NameNode startup mode called
|
||||||
|
Recovery mode that may allow you to recover most of your data.</p>
|
||||||
|
|
||||||
|
<p>You can start the NameNode in recovery mode like so:
|
||||||
|
<code>namenode -recover</code></p>
|
||||||
|
|
||||||
|
<p>When in recovery mode, the NameNode will interactively prompt you at
|
||||||
|
the command line about possible courses of action you can take to
|
||||||
|
recover your data.</p>
|
||||||
|
|
||||||
|
<p>If you don't want to be prompted, you can give the
|
||||||
|
<code>-force</code> option. This option will force
|
||||||
|
recovery mode to always select the first choice. Normally, this
|
||||||
|
will be the most reasonable choice.</p>
|
||||||
|
|
||||||
|
<p>Because Recovery mode can cause you to lose data, you should always
|
||||||
|
back up your edit log and fsimage before using it.</p>
|
||||||
|
</section>
|
||||||
|
<section> <title> Upgrade and Rollback </title>
|
||||||
<p>
|
<p>
|
||||||
When Hadoop is upgraded on an existing cluster, as with any
|
When Hadoop is upgraded on an existing cluster, as with any
|
||||||
software upgrade, it is possible there are new bugs or
|
software upgrade, it is possible there are new bugs or
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.DataOutput;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext;
|
||||||
|
|
||||||
/************************************
|
/************************************
|
||||||
* Some handy internal HDFS constants
|
* Some handy internal HDFS constants
|
||||||
|
@ -54,13 +55,18 @@ public final class HdfsServerConstants {
|
||||||
FINALIZE("-finalize"),
|
FINALIZE("-finalize"),
|
||||||
IMPORT ("-importCheckpoint"),
|
IMPORT ("-importCheckpoint"),
|
||||||
BOOTSTRAPSTANDBY("-bootstrapStandby"),
|
BOOTSTRAPSTANDBY("-bootstrapStandby"),
|
||||||
INITIALIZESHAREDEDITS("-initializeSharedEdits");
|
INITIALIZESHAREDEDITS("-initializeSharedEdits"),
|
||||||
|
RECOVER ("-recover"),
|
||||||
|
FORCE("-force");
|
||||||
|
|
||||||
private String name = null;
|
private String name = null;
|
||||||
|
|
||||||
// Used only with format and upgrade options
|
// Used only with format and upgrade options
|
||||||
private String clusterId = null;
|
private String clusterId = null;
|
||||||
|
|
||||||
|
// Used only with recovery option
|
||||||
|
private int force = 0;
|
||||||
|
|
||||||
private StartupOption(String arg) {this.name = arg;}
|
private StartupOption(String arg) {this.name = arg;}
|
||||||
public String getName() {return name;}
|
public String getName() {return name;}
|
||||||
public NamenodeRole toNodeRole() {
|
public NamenodeRole toNodeRole() {
|
||||||
|
@ -77,10 +83,24 @@ public final class HdfsServerConstants {
|
||||||
public void setClusterId(String cid) {
|
public void setClusterId(String cid) {
|
||||||
clusterId = cid;
|
clusterId = cid;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getClusterId() {
|
public String getClusterId() {
|
||||||
return clusterId;
|
return clusterId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MetaRecoveryContext createRecoveryContext() {
|
||||||
|
if (!name.equals(RECOVER.name))
|
||||||
|
return null;
|
||||||
|
return new MetaRecoveryContext(force);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setForce(int force) {
|
||||||
|
this.force = force;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getForce() {
|
||||||
|
return this.force;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Timeouts for communicating with DataNode for streaming writes/reads
|
// Timeouts for communicating with DataNode for streaming writes/reads
|
||||||
|
|
|
@ -213,19 +213,21 @@ public class BackupImage extends FSImage {
|
||||||
LOG.debug("data:" + StringUtils.byteToHexString(data));
|
LOG.debug("data:" + StringUtils.byteToHexString(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
FSEditLogLoader logLoader = new FSEditLogLoader(namesystem);
|
FSEditLogLoader logLoader =
|
||||||
|
new FSEditLogLoader(namesystem, lastAppliedTxId);
|
||||||
int logVersion = storage.getLayoutVersion();
|
int logVersion = storage.getLayoutVersion();
|
||||||
backupInputStream.setBytes(data, logVersion);
|
backupInputStream.setBytes(data, logVersion);
|
||||||
|
|
||||||
long numLoaded = logLoader.loadEditRecords(logVersion, backupInputStream,
|
long numTxnsAdvanced = logLoader.loadEditRecords(logVersion,
|
||||||
true, lastAppliedTxId + 1);
|
backupInputStream, true, lastAppliedTxId + 1, null);
|
||||||
if (numLoaded != numTxns) {
|
if (numTxnsAdvanced != numTxns) {
|
||||||
throw new IOException("Batch of txns starting at txnid " +
|
throw new IOException("Batch of txns starting at txnid " +
|
||||||
firstTxId + " was supposed to contain " + numTxns +
|
firstTxId + " was supposed to contain " + numTxns +
|
||||||
" transactions but only was able to apply " + numLoaded);
|
" transactions, but we were only able to advance by " +
|
||||||
|
numTxnsAdvanced);
|
||||||
}
|
}
|
||||||
lastAppliedTxId += numTxns;
|
lastAppliedTxId = logLoader.getLastAppliedTxId();
|
||||||
|
|
||||||
namesystem.dir.updateCountForINodeWithQuota(); // inefficient!
|
namesystem.dir.updateCountForINodeWithQuota(); // inefficient!
|
||||||
} finally {
|
} finally {
|
||||||
backupInputStream.clear();
|
backupInputStream.clear();
|
||||||
|
@ -275,7 +277,7 @@ public class BackupImage extends FSImage {
|
||||||
editStreams.add(s);
|
editStreams.add(s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
loadEdits(editStreams, namesystem);
|
loadEdits(editStreams, namesystem, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
// now, need to load the in-progress file
|
// now, need to load the in-progress file
|
||||||
|
@ -309,12 +311,11 @@ public class BackupImage extends FSImage {
|
||||||
LOG.info("Going to finish converging with remaining " + remainingTxns
|
LOG.info("Going to finish converging with remaining " + remainingTxns
|
||||||
+ " txns from in-progress stream " + stream);
|
+ " txns from in-progress stream " + stream);
|
||||||
|
|
||||||
FSEditLogLoader loader = new FSEditLogLoader(namesystem);
|
FSEditLogLoader loader =
|
||||||
long numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1);
|
new FSEditLogLoader(namesystem, lastAppliedTxId);
|
||||||
lastAppliedTxId += numLoaded;
|
loader.loadFSEdits(stream, lastAppliedTxId + 1, null);
|
||||||
assert numLoaded == remainingTxns :
|
lastAppliedTxId = loader.getLastAppliedTxId();
|
||||||
"expected to load " + remainingTxns + " but loaded " +
|
assert lastAppliedTxId == getEditLog().getLastWrittenTxId();
|
||||||
numLoaded + " from " + stream;
|
|
||||||
} finally {
|
} finally {
|
||||||
FSEditLog.closeAllStreams(editStreams);
|
FSEditLog.closeAllStreams(editStreams);
|
||||||
}
|
}
|
||||||
|
|
|
@ -292,6 +292,6 @@ class Checkpointer extends Daemon {
|
||||||
}
|
}
|
||||||
LOG.info("Checkpointer about to load edits from " +
|
LOG.info("Checkpointer about to load edits from " +
|
||||||
editsStreams.size() + " stream(s).");
|
editsStreams.size() + " stream(s).");
|
||||||
dstImage.loadEdits(editsStreams, dstNamesystem);
|
dstImage.loadEdits(editsStreams, dstNamesystem, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,21 +70,25 @@ class EditLogBackupInputStream extends EditLogInputStream {
|
||||||
reader = null;
|
reader = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // JournalStream
|
@Override
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return address;
|
return address;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // JournalStream
|
@Override
|
||||||
public JournalType getType() {
|
protected FSEditLogOp nextOp() throws IOException {
|
||||||
return JournalType.BACKUP;
|
Preconditions.checkState(reader != null,
|
||||||
|
"Must call setBytes() before readOp()");
|
||||||
|
return reader.readOp(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FSEditLogOp readOp() throws IOException {
|
protected FSEditLogOp nextValidOp() {
|
||||||
Preconditions.checkState(reader != null,
|
try {
|
||||||
"Must call setBytes() before readOp()");
|
return reader.readOp(true);
|
||||||
return reader.readOp();
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException("got unexpected IOException " + e, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -89,24 +89,6 @@ public class EditLogFileInputStream extends EditLogInputStream {
|
||||||
this.isInProgress = isInProgress;
|
this.isInProgress = isInProgress;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Skip over a number of transactions. Subsequent calls to
|
|
||||||
* {@link EditLogFileInputStream#readOp()} will begin after these skipped
|
|
||||||
* transactions. If more transactions are requested to be skipped than remain
|
|
||||||
* in the edit log, all edit log ops in the log will be skipped and subsequent
|
|
||||||
* calls to {@link EditLogInputStream#readOp} will return null.
|
|
||||||
*
|
|
||||||
* @param transactionsToSkip number of transactions to skip over.
|
|
||||||
* @throws IOException if there's an error while reading an operation
|
|
||||||
*/
|
|
||||||
public void skipTransactions(long transactionsToSkip) throws IOException {
|
|
||||||
assert firstTxId != HdfsConstants.INVALID_TXID &&
|
|
||||||
lastTxId != HdfsConstants.INVALID_TXID;
|
|
||||||
for (long i = 0; i < transactionsToSkip; i++) {
|
|
||||||
reader.readOp();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getFirstTxId() throws IOException {
|
public long getFirstTxId() throws IOException {
|
||||||
return firstTxId;
|
return firstTxId;
|
||||||
|
@ -117,19 +99,23 @@ public class EditLogFileInputStream extends EditLogInputStream {
|
||||||
return lastTxId;
|
return lastTxId;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // JournalStream
|
@Override
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return file.getPath();
|
return file.getPath();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // JournalStream
|
|
||||||
public JournalType getType() {
|
|
||||||
return JournalType.FILE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FSEditLogOp readOp() throws IOException {
|
protected FSEditLogOp nextOp() throws IOException {
|
||||||
return reader.readOp();
|
return reader.readOp(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected FSEditLogOp nextValidOp() {
|
||||||
|
try {
|
||||||
|
return reader.readOp(true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -34,7 +34,14 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public abstract class EditLogInputStream implements JournalStream, Closeable {
|
public abstract class EditLogInputStream implements Closeable {
|
||||||
|
private FSEditLogOp cachedOp = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the name of the EditLogInputStream
|
||||||
|
*/
|
||||||
|
public abstract String getName();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the first transaction which will be found in this stream
|
* @return the first transaction which will be found in this stream
|
||||||
*/
|
*/
|
||||||
|
@ -57,8 +64,81 @@ public abstract class EditLogInputStream implements JournalStream, Closeable {
|
||||||
* @return an operation from the stream or null if at end of stream
|
* @return an operation from the stream or null if at end of stream
|
||||||
* @throws IOException if there is an error reading from the stream
|
* @throws IOException if there is an error reading from the stream
|
||||||
*/
|
*/
|
||||||
public abstract FSEditLogOp readOp() throws IOException;
|
public FSEditLogOp readOp() throws IOException {
|
||||||
|
FSEditLogOp ret;
|
||||||
|
if (cachedOp != null) {
|
||||||
|
ret = cachedOp;
|
||||||
|
cachedOp = null;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
return nextOp();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Position the stream so that a valid operation can be read from it with
|
||||||
|
* readOp().
|
||||||
|
*
|
||||||
|
* This method can be used to skip over corrupted sections of edit logs.
|
||||||
|
*/
|
||||||
|
public void resync() throws IOException {
|
||||||
|
if (cachedOp != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
cachedOp = nextValidOp();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the next operation from the stream storage.
|
||||||
|
*
|
||||||
|
* @return an operation from the stream or null if at end of stream
|
||||||
|
* @throws IOException if there is an error reading from the stream
|
||||||
|
*/
|
||||||
|
protected abstract FSEditLogOp nextOp() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the next valid operation from the stream storage.
|
||||||
|
*
|
||||||
|
* This is exactly like nextOp, except that we attempt to skip over damaged
|
||||||
|
* parts of the edit log
|
||||||
|
*
|
||||||
|
* @return an operation from the stream or null if at end of stream
|
||||||
|
*/
|
||||||
|
protected FSEditLogOp nextValidOp() {
|
||||||
|
// This is a trivial implementation which just assumes that any errors mean
|
||||||
|
// that there is nothing more of value in the log. Subclasses that support
|
||||||
|
// error recovery will want to override this.
|
||||||
|
try {
|
||||||
|
return nextOp();
|
||||||
|
} catch (IOException e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Skip edit log operations up to a given transaction ID, or until the
|
||||||
|
* end of the edit log is reached.
|
||||||
|
*
|
||||||
|
* After this function returns, the next call to readOp will return either
|
||||||
|
* end-of-file (null) or a transaction with a txid equal to or higher than
|
||||||
|
* the one we asked for.
|
||||||
|
*
|
||||||
|
* @param txid The transaction ID to read up until.
|
||||||
|
* @return Returns true if we found a transaction ID greater than
|
||||||
|
* or equal to 'txid' in the log.
|
||||||
|
*/
|
||||||
|
public boolean skipUntil(long txid) throws IOException {
|
||||||
|
while (true) {
|
||||||
|
FSEditLogOp op = readOp();
|
||||||
|
if (op == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (op.getTransactionId() >= txid) {
|
||||||
|
cachedOp = op;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the layout version of the data in the stream.
|
* Get the layout version of the data in the stream.
|
||||||
* @return the layout version of the ops in the stream.
|
* @return the layout version of the ops in the stream.
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.Closeable;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.common.Util.now;
|
import static org.apache.hadoop.hdfs.server.common.Util.now;
|
||||||
|
|
||||||
|
@ -30,7 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public abstract class EditLogOutputStream {
|
public abstract class EditLogOutputStream implements Closeable {
|
||||||
// these are statistics counters
|
// these are statistics counters
|
||||||
private long numSync; // number of sync(s) to disk
|
private long numSync; // number of sync(s) to disk
|
||||||
private long totalTimeSync; // total time to sync
|
private long totalTimeSync; // total time to sync
|
||||||
|
|
|
@ -127,6 +127,14 @@ public class FSEditLog {
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
|
||||||
private List<URI> editsDirs;
|
private List<URI> editsDirs;
|
||||||
|
|
||||||
|
private ThreadLocal<OpInstanceCache> cache =
|
||||||
|
new ThreadLocal<OpInstanceCache>() {
|
||||||
|
@Override
|
||||||
|
protected OpInstanceCache initialValue() {
|
||||||
|
return new OpInstanceCache();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The edit directories that are shared between primary and secondary.
|
* The edit directories that are shared between primary and secondary.
|
||||||
|
@ -596,7 +604,7 @@ public class FSEditLog {
|
||||||
* Records the block locations of the last block.
|
* Records the block locations of the last block.
|
||||||
*/
|
*/
|
||||||
public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
|
public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
|
||||||
AddOp op = AddOp.getInstance()
|
AddOp op = AddOp.getInstance(cache.get())
|
||||||
.setPath(path)
|
.setPath(path)
|
||||||
.setReplication(newNode.getReplication())
|
.setReplication(newNode.getReplication())
|
||||||
.setModificationTime(newNode.getModificationTime())
|
.setModificationTime(newNode.getModificationTime())
|
||||||
|
@ -614,7 +622,7 @@ public class FSEditLog {
|
||||||
* Add close lease record to edit log.
|
* Add close lease record to edit log.
|
||||||
*/
|
*/
|
||||||
public void logCloseFile(String path, INodeFile newNode) {
|
public void logCloseFile(String path, INodeFile newNode) {
|
||||||
CloseOp op = CloseOp.getInstance()
|
CloseOp op = CloseOp.getInstance(cache.get())
|
||||||
.setPath(path)
|
.setPath(path)
|
||||||
.setReplication(newNode.getReplication())
|
.setReplication(newNode.getReplication())
|
||||||
.setModificationTime(newNode.getModificationTime())
|
.setModificationTime(newNode.getModificationTime())
|
||||||
|
@ -627,7 +635,7 @@ public class FSEditLog {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void logUpdateBlocks(String path, INodeFileUnderConstruction file) {
|
public void logUpdateBlocks(String path, INodeFileUnderConstruction file) {
|
||||||
UpdateBlocksOp op = UpdateBlocksOp.getInstance()
|
UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
|
||||||
.setPath(path)
|
.setPath(path)
|
||||||
.setBlocks(file.getBlocks());
|
.setBlocks(file.getBlocks());
|
||||||
logEdit(op);
|
logEdit(op);
|
||||||
|
@ -637,7 +645,7 @@ public class FSEditLog {
|
||||||
* Add create directory record to edit log
|
* Add create directory record to edit log
|
||||||
*/
|
*/
|
||||||
public void logMkDir(String path, INode newNode) {
|
public void logMkDir(String path, INode newNode) {
|
||||||
MkdirOp op = MkdirOp.getInstance()
|
MkdirOp op = MkdirOp.getInstance(cache.get())
|
||||||
.setPath(path)
|
.setPath(path)
|
||||||
.setTimestamp(newNode.getModificationTime())
|
.setTimestamp(newNode.getModificationTime())
|
||||||
.setPermissionStatus(newNode.getPermissionStatus());
|
.setPermissionStatus(newNode.getPermissionStatus());
|
||||||
|
@ -649,7 +657,7 @@ public class FSEditLog {
|
||||||
* TODO: use String parameters until just before writing to disk
|
* TODO: use String parameters until just before writing to disk
|
||||||
*/
|
*/
|
||||||
void logRename(String src, String dst, long timestamp) {
|
void logRename(String src, String dst, long timestamp) {
|
||||||
RenameOldOp op = RenameOldOp.getInstance()
|
RenameOldOp op = RenameOldOp.getInstance(cache.get())
|
||||||
.setSource(src)
|
.setSource(src)
|
||||||
.setDestination(dst)
|
.setDestination(dst)
|
||||||
.setTimestamp(timestamp);
|
.setTimestamp(timestamp);
|
||||||
|
@ -660,7 +668,7 @@ public class FSEditLog {
|
||||||
* Add rename record to edit log
|
* Add rename record to edit log
|
||||||
*/
|
*/
|
||||||
void logRename(String src, String dst, long timestamp, Options.Rename... options) {
|
void logRename(String src, String dst, long timestamp, Options.Rename... options) {
|
||||||
RenameOp op = RenameOp.getInstance()
|
RenameOp op = RenameOp.getInstance(cache.get())
|
||||||
.setSource(src)
|
.setSource(src)
|
||||||
.setDestination(dst)
|
.setDestination(dst)
|
||||||
.setTimestamp(timestamp)
|
.setTimestamp(timestamp)
|
||||||
|
@ -672,7 +680,7 @@ public class FSEditLog {
|
||||||
* Add set replication record to edit log
|
* Add set replication record to edit log
|
||||||
*/
|
*/
|
||||||
void logSetReplication(String src, short replication) {
|
void logSetReplication(String src, short replication) {
|
||||||
SetReplicationOp op = SetReplicationOp.getInstance()
|
SetReplicationOp op = SetReplicationOp.getInstance(cache.get())
|
||||||
.setPath(src)
|
.setPath(src)
|
||||||
.setReplication(replication);
|
.setReplication(replication);
|
||||||
logEdit(op);
|
logEdit(op);
|
||||||
|
@ -684,7 +692,7 @@ public class FSEditLog {
|
||||||
* @param quota the directory size limit
|
* @param quota the directory size limit
|
||||||
*/
|
*/
|
||||||
void logSetQuota(String src, long nsQuota, long dsQuota) {
|
void logSetQuota(String src, long nsQuota, long dsQuota) {
|
||||||
SetQuotaOp op = SetQuotaOp.getInstance()
|
SetQuotaOp op = SetQuotaOp.getInstance(cache.get())
|
||||||
.setSource(src)
|
.setSource(src)
|
||||||
.setNSQuota(nsQuota)
|
.setNSQuota(nsQuota)
|
||||||
.setDSQuota(dsQuota);
|
.setDSQuota(dsQuota);
|
||||||
|
@ -693,7 +701,7 @@ public class FSEditLog {
|
||||||
|
|
||||||
/** Add set permissions record to edit log */
|
/** Add set permissions record to edit log */
|
||||||
void logSetPermissions(String src, FsPermission permissions) {
|
void logSetPermissions(String src, FsPermission permissions) {
|
||||||
SetPermissionsOp op = SetPermissionsOp.getInstance()
|
SetPermissionsOp op = SetPermissionsOp.getInstance(cache.get())
|
||||||
.setSource(src)
|
.setSource(src)
|
||||||
.setPermissions(permissions);
|
.setPermissions(permissions);
|
||||||
logEdit(op);
|
logEdit(op);
|
||||||
|
@ -701,7 +709,7 @@ public class FSEditLog {
|
||||||
|
|
||||||
/** Add set owner record to edit log */
|
/** Add set owner record to edit log */
|
||||||
void logSetOwner(String src, String username, String groupname) {
|
void logSetOwner(String src, String username, String groupname) {
|
||||||
SetOwnerOp op = SetOwnerOp.getInstance()
|
SetOwnerOp op = SetOwnerOp.getInstance(cache.get())
|
||||||
.setSource(src)
|
.setSource(src)
|
||||||
.setUser(username)
|
.setUser(username)
|
||||||
.setGroup(groupname);
|
.setGroup(groupname);
|
||||||
|
@ -712,7 +720,7 @@ public class FSEditLog {
|
||||||
* concat(trg,src..) log
|
* concat(trg,src..) log
|
||||||
*/
|
*/
|
||||||
void logConcat(String trg, String [] srcs, long timestamp) {
|
void logConcat(String trg, String [] srcs, long timestamp) {
|
||||||
ConcatDeleteOp op = ConcatDeleteOp.getInstance()
|
ConcatDeleteOp op = ConcatDeleteOp.getInstance(cache.get())
|
||||||
.setTarget(trg)
|
.setTarget(trg)
|
||||||
.setSources(srcs)
|
.setSources(srcs)
|
||||||
.setTimestamp(timestamp);
|
.setTimestamp(timestamp);
|
||||||
|
@ -723,7 +731,7 @@ public class FSEditLog {
|
||||||
* Add delete file record to edit log
|
* Add delete file record to edit log
|
||||||
*/
|
*/
|
||||||
void logDelete(String src, long timestamp) {
|
void logDelete(String src, long timestamp) {
|
||||||
DeleteOp op = DeleteOp.getInstance()
|
DeleteOp op = DeleteOp.getInstance(cache.get())
|
||||||
.setPath(src)
|
.setPath(src)
|
||||||
.setTimestamp(timestamp);
|
.setTimestamp(timestamp);
|
||||||
logEdit(op);
|
logEdit(op);
|
||||||
|
@ -733,7 +741,7 @@ public class FSEditLog {
|
||||||
* Add generation stamp record to edit log
|
* Add generation stamp record to edit log
|
||||||
*/
|
*/
|
||||||
void logGenerationStamp(long genstamp) {
|
void logGenerationStamp(long genstamp) {
|
||||||
SetGenstampOp op = SetGenstampOp.getInstance()
|
SetGenstampOp op = SetGenstampOp.getInstance(cache.get())
|
||||||
.setGenerationStamp(genstamp);
|
.setGenerationStamp(genstamp);
|
||||||
logEdit(op);
|
logEdit(op);
|
||||||
}
|
}
|
||||||
|
@ -742,7 +750,7 @@ public class FSEditLog {
|
||||||
* Add access time record to edit log
|
* Add access time record to edit log
|
||||||
*/
|
*/
|
||||||
void logTimes(String src, long mtime, long atime) {
|
void logTimes(String src, long mtime, long atime) {
|
||||||
TimesOp op = TimesOp.getInstance()
|
TimesOp op = TimesOp.getInstance(cache.get())
|
||||||
.setPath(src)
|
.setPath(src)
|
||||||
.setModificationTime(mtime)
|
.setModificationTime(mtime)
|
||||||
.setAccessTime(atime);
|
.setAccessTime(atime);
|
||||||
|
@ -754,7 +762,7 @@ public class FSEditLog {
|
||||||
*/
|
*/
|
||||||
void logSymlink(String path, String value, long mtime,
|
void logSymlink(String path, String value, long mtime,
|
||||||
long atime, INodeSymlink node) {
|
long atime, INodeSymlink node) {
|
||||||
SymlinkOp op = SymlinkOp.getInstance()
|
SymlinkOp op = SymlinkOp.getInstance(cache.get())
|
||||||
.setPath(path)
|
.setPath(path)
|
||||||
.setValue(value)
|
.setValue(value)
|
||||||
.setModificationTime(mtime)
|
.setModificationTime(mtime)
|
||||||
|
@ -770,7 +778,7 @@ public class FSEditLog {
|
||||||
*/
|
*/
|
||||||
void logGetDelegationToken(DelegationTokenIdentifier id,
|
void logGetDelegationToken(DelegationTokenIdentifier id,
|
||||||
long expiryTime) {
|
long expiryTime) {
|
||||||
GetDelegationTokenOp op = GetDelegationTokenOp.getInstance()
|
GetDelegationTokenOp op = GetDelegationTokenOp.getInstance(cache.get())
|
||||||
.setDelegationTokenIdentifier(id)
|
.setDelegationTokenIdentifier(id)
|
||||||
.setExpiryTime(expiryTime);
|
.setExpiryTime(expiryTime);
|
||||||
logEdit(op);
|
logEdit(op);
|
||||||
|
@ -778,26 +786,26 @@ public class FSEditLog {
|
||||||
|
|
||||||
void logRenewDelegationToken(DelegationTokenIdentifier id,
|
void logRenewDelegationToken(DelegationTokenIdentifier id,
|
||||||
long expiryTime) {
|
long expiryTime) {
|
||||||
RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance()
|
RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance(cache.get())
|
||||||
.setDelegationTokenIdentifier(id)
|
.setDelegationTokenIdentifier(id)
|
||||||
.setExpiryTime(expiryTime);
|
.setExpiryTime(expiryTime);
|
||||||
logEdit(op);
|
logEdit(op);
|
||||||
}
|
}
|
||||||
|
|
||||||
void logCancelDelegationToken(DelegationTokenIdentifier id) {
|
void logCancelDelegationToken(DelegationTokenIdentifier id) {
|
||||||
CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance()
|
CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance(cache.get())
|
||||||
.setDelegationTokenIdentifier(id);
|
.setDelegationTokenIdentifier(id);
|
||||||
logEdit(op);
|
logEdit(op);
|
||||||
}
|
}
|
||||||
|
|
||||||
void logUpdateMasterKey(DelegationKey key) {
|
void logUpdateMasterKey(DelegationKey key) {
|
||||||
UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance()
|
UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance(cache.get())
|
||||||
.setDelegationKey(key);
|
.setDelegationKey(key);
|
||||||
logEdit(op);
|
logEdit(op);
|
||||||
}
|
}
|
||||||
|
|
||||||
void logReassignLease(String leaseHolder, String src, String newHolder) {
|
void logReassignLease(String leaseHolder, String src, String newHolder) {
|
||||||
ReassignLeaseOp op = ReassignLeaseOp.getInstance()
|
ReassignLeaseOp op = ReassignLeaseOp.getInstance(cache.get())
|
||||||
.setLeaseHolder(leaseHolder)
|
.setLeaseHolder(leaseHolder)
|
||||||
.setPath(src)
|
.setPath(src)
|
||||||
.setNewHolder(newHolder);
|
.setNewHolder(newHolder);
|
||||||
|
@ -896,7 +904,7 @@ public class FSEditLog {
|
||||||
state = State.IN_SEGMENT;
|
state = State.IN_SEGMENT;
|
||||||
|
|
||||||
if (writeHeaderTxn) {
|
if (writeHeaderTxn) {
|
||||||
logEdit(LogSegmentOp.getInstance(
|
logEdit(LogSegmentOp.getInstance(cache.get(),
|
||||||
FSEditLogOpCodes.OP_START_LOG_SEGMENT));
|
FSEditLogOpCodes.OP_START_LOG_SEGMENT));
|
||||||
logSync();
|
logSync();
|
||||||
}
|
}
|
||||||
|
@ -912,7 +920,7 @@ public class FSEditLog {
|
||||||
"Bad state: %s", state);
|
"Bad state: %s", state);
|
||||||
|
|
||||||
if (writeEndTxn) {
|
if (writeEndTxn) {
|
||||||
logEdit(LogSegmentOp.getInstance(
|
logEdit(LogSegmentOp.getInstance(cache.get(),
|
||||||
FSEditLogOpCodes.OP_END_LOG_SEGMENT));
|
FSEditLogOpCodes.OP_END_LOG_SEGMENT));
|
||||||
logSync();
|
logSync();
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,9 +71,11 @@ public class FSEditLogLoader {
|
||||||
static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
|
static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
|
||||||
static long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
|
static long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
|
||||||
private final FSNamesystem fsNamesys;
|
private final FSNamesystem fsNamesys;
|
||||||
|
private long lastAppliedTxId;
|
||||||
public FSEditLogLoader(FSNamesystem fsNamesys) {
|
|
||||||
|
public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
|
||||||
this.fsNamesys = fsNamesys;
|
this.fsNamesys = fsNamesys;
|
||||||
|
this.lastAppliedTxId = lastAppliedTxId;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -81,32 +83,29 @@ public class FSEditLogLoader {
|
||||||
* This is where we apply edits that we've been writing to disk all
|
* This is where we apply edits that we've been writing to disk all
|
||||||
* along.
|
* along.
|
||||||
*/
|
*/
|
||||||
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId)
|
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
|
||||||
throws IOException {
|
MetaRecoveryContext recovery) throws IOException {
|
||||||
long numEdits = 0;
|
|
||||||
int logVersion = edits.getVersion();
|
int logVersion = edits.getVersion();
|
||||||
|
|
||||||
fsNamesys.writeLock();
|
fsNamesys.writeLock();
|
||||||
try {
|
try {
|
||||||
long startTime = now();
|
long startTime = now();
|
||||||
numEdits = loadEditRecords(logVersion, edits, false,
|
long numEdits = loadEditRecords(logVersion, edits, false,
|
||||||
expectedStartingTxId);
|
expectedStartingTxId, recovery);
|
||||||
FSImage.LOG.info("Edits file " + edits.getName()
|
FSImage.LOG.info("Edits file " + edits.getName()
|
||||||
+ " of size " + edits.length() + " edits # " + numEdits
|
+ " of size " + edits.length() + " edits # " + numEdits
|
||||||
+ " loaded in " + (now()-startTime)/1000 + " seconds.");
|
+ " loaded in " + (now()-startTime)/1000 + " seconds.");
|
||||||
|
return numEdits;
|
||||||
} finally {
|
} finally {
|
||||||
edits.close();
|
edits.close();
|
||||||
fsNamesys.writeUnlock();
|
fsNamesys.writeUnlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
return numEdits;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
|
long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
|
||||||
long expectedStartingTxId)
|
long expectedStartingTxId, MetaRecoveryContext recovery)
|
||||||
throws IOException, EditLogInputException {
|
throws IOException {
|
||||||
FSDirectory fsDir = fsNamesys.dir;
|
FSDirectory fsDir = fsNamesys.dir;
|
||||||
long numEdits = 0;
|
|
||||||
|
|
||||||
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
|
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
|
||||||
new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
|
new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
|
||||||
|
@ -120,72 +119,99 @@ public class FSEditLogLoader {
|
||||||
|
|
||||||
long recentOpcodeOffsets[] = new long[4];
|
long recentOpcodeOffsets[] = new long[4];
|
||||||
Arrays.fill(recentOpcodeOffsets, -1);
|
Arrays.fill(recentOpcodeOffsets, -1);
|
||||||
|
|
||||||
long txId = expectedStartingTxId - 1;
|
long expectedTxId = expectedStartingTxId;
|
||||||
|
long numEdits = 0;
|
||||||
long lastTxId = in.getLastTxId();
|
long lastTxId = in.getLastTxId();
|
||||||
long numTxns = (lastTxId - expectedStartingTxId) + 1;
|
long numTxns = (lastTxId - expectedStartingTxId) + 1;
|
||||||
|
|
||||||
long lastLogTime = now();
|
long lastLogTime = now();
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("edit log length: " + in.length() + ", start txid: "
|
LOG.debug("edit log length: " + in.length() + ", start txid: "
|
||||||
+ expectedStartingTxId + ", last txid: " + lastTxId);
|
+ expectedStartingTxId + ", last txid: " + lastTxId);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
try {
|
while (true) {
|
||||||
while (true) {
|
try {
|
||||||
FSEditLogOp op;
|
FSEditLogOp op;
|
||||||
try {
|
try {
|
||||||
if ((op = in.readOp()) == null) {
|
op = in.readOp();
|
||||||
|
if (op == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (Throwable e) {
|
||||||
long badTxId = txId + 1; // because txId hasn't been incremented yet
|
// Handle a problem with our input
|
||||||
String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, badTxId);
|
check203UpgradeFailure(logVersion, e);
|
||||||
|
String errorMessage =
|
||||||
|
formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
|
||||||
FSImage.LOG.error(errorMessage);
|
FSImage.LOG.error(errorMessage);
|
||||||
throw new EditLogInputException(errorMessage,
|
if (recovery == null) {
|
||||||
ioe, numEdits);
|
// We will only try to skip over problematic opcodes when in
|
||||||
|
// recovery mode.
|
||||||
|
throw new EditLogInputException(errorMessage, e, numEdits);
|
||||||
|
}
|
||||||
|
MetaRecoveryContext.editLogLoaderPrompt(
|
||||||
|
"We failed to read txId " + expectedTxId,
|
||||||
|
recovery, "skipping the bad section in the log");
|
||||||
|
in.resync();
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
|
recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
|
||||||
in.getPosition();
|
in.getPosition();
|
||||||
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
|
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
|
||||||
long expectedTxId = txId + 1;
|
if (op.getTransactionId() > expectedTxId) {
|
||||||
txId = op.txid;
|
MetaRecoveryContext.editLogLoaderPrompt("There appears " +
|
||||||
if (txId != expectedTxId) {
|
"to be a gap in the edit log. We expected txid " +
|
||||||
throw new IOException("Expected transaction ID " +
|
expectedTxId + ", but got txid " +
|
||||||
expectedTxId + " but got " + txId);
|
op.getTransactionId() + ".", recovery, "ignoring missing " +
|
||||||
|
" transaction IDs");
|
||||||
|
} else if (op.getTransactionId() < expectedTxId) {
|
||||||
|
MetaRecoveryContext.editLogLoaderPrompt("There appears " +
|
||||||
|
"to be an out-of-order edit in the edit log. We " +
|
||||||
|
"expected txid " + expectedTxId + ", but got txid " +
|
||||||
|
op.getTransactionId() + ".", recovery,
|
||||||
|
"skipping the out-of-order edit");
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
incrOpCount(op.opCode, opCounts);
|
|
||||||
try {
|
try {
|
||||||
applyEditLogOp(op, fsDir, logVersion);
|
applyEditLogOp(op, fsDir, logVersion);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable e) {
|
||||||
// Catch Throwable because in the case of a truly corrupt edits log, any
|
LOG.error("Encountered exception on operation " + op, e);
|
||||||
// sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.)
|
MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
|
||||||
String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, txId);
|
"apply edit log operation " + op + ": error " +
|
||||||
FSImage.LOG.error(errorMessage);
|
e.getMessage(), recovery, "applying edits");
|
||||||
throw new IOException(errorMessage, t);
|
}
|
||||||
|
// Now that the operation has been successfully decoded and
|
||||||
|
// applied, update our bookkeeping.
|
||||||
|
incrOpCount(op.opCode, opCounts);
|
||||||
|
if (op.hasTransactionId()) {
|
||||||
|
lastAppliedTxId = op.getTransactionId();
|
||||||
|
expectedTxId = lastAppliedTxId + 1;
|
||||||
|
} else {
|
||||||
|
expectedTxId = lastAppliedTxId = expectedStartingTxId;
|
||||||
}
|
}
|
||||||
|
|
||||||
// log progress
|
// log progress
|
||||||
if (now() - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
|
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
|
||||||
int percent = Math.round((float) txId / numTxns * 100);
|
long now = now();
|
||||||
LOG.info("replaying edit log: " + txId + "/" + numTxns
|
if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
|
||||||
+ " transactions completed. (" + percent + "%)");
|
int percent = Math.round((float)lastAppliedTxId / numTxns * 100);
|
||||||
lastLogTime = now();
|
LOG.info("replaying edit log: " + lastAppliedTxId + "/" + numTxns
|
||||||
|
+ " transactions completed. (" + percent + "%)");
|
||||||
|
lastLogTime = now;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
numEdits++;
|
numEdits++;
|
||||||
|
} catch (MetaRecoveryContext.RequestStopException e) {
|
||||||
|
MetaRecoveryContext.LOG.warn("Stopped reading edit log at " +
|
||||||
|
in.getPosition() + "/" + in.length());
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
} catch (IOException ex) {
|
|
||||||
check203UpgradeFailure(logVersion, ex);
|
|
||||||
} finally {
|
|
||||||
if(closeOnExit)
|
|
||||||
in.close();
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if(closeOnExit) {
|
||||||
|
in.close();
|
||||||
|
}
|
||||||
fsDir.writeUnlock();
|
fsDir.writeUnlock();
|
||||||
fsNamesys.writeUnlock();
|
fsNamesys.writeUnlock();
|
||||||
|
|
||||||
|
@ -472,7 +498,7 @@ public class FSEditLogLoader {
|
||||||
long recentOpcodeOffsets[], long txid) {
|
long recentOpcodeOffsets[], long txid) {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
sb.append("Error replaying edit log at offset " + in.getPosition());
|
sb.append("Error replaying edit log at offset " + in.getPosition());
|
||||||
sb.append(" on transaction ID ").append(txid);
|
sb.append(". Expected transaction ID was ").append(txid);
|
||||||
if (recentOpcodeOffsets[0] != -1) {
|
if (recentOpcodeOffsets[0] != -1) {
|
||||||
Arrays.sort(recentOpcodeOffsets);
|
Arrays.sort(recentOpcodeOffsets);
|
||||||
sb.append("\nRecent opcode offsets:");
|
sb.append("\nRecent opcode offsets:");
|
||||||
|
@ -519,7 +545,7 @@ public class FSEditLogLoader {
|
||||||
if (oldBlock.getBlockId() != newBlock.getBlockId() ||
|
if (oldBlock.getBlockId() != newBlock.getBlockId() ||
|
||||||
(oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() &&
|
(oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() &&
|
||||||
!(isGenStampUpdate && isLastBlock))) {
|
!(isGenStampUpdate && isLastBlock))) {
|
||||||
throw new IOException("Mismatched block IDs or generation stamps, " +
|
throw new IOException("Mismatched block IDs or generation stamps, " +
|
||||||
"attempting to replace block " + oldBlock + " with " + newBlock +
|
"attempting to replace block " + oldBlock + " with " + newBlock +
|
||||||
" as block # " + i + "/" + newBlocks.length + " of " +
|
" as block # " + i + "/" + newBlocks.length + " of " +
|
||||||
path);
|
path);
|
||||||
|
@ -605,7 +631,7 @@ public class FSEditLogLoader {
|
||||||
* Throw appropriate exception during upgrade from 203, when editlog loading
|
* Throw appropriate exception during upgrade from 203, when editlog loading
|
||||||
* could fail due to opcode conflicts.
|
* could fail due to opcode conflicts.
|
||||||
*/
|
*/
|
||||||
private void check203UpgradeFailure(int logVersion, IOException ex)
|
private void check203UpgradeFailure(int logVersion, Throwable e)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// 0.20.203 version version has conflicting opcodes with the later releases.
|
// 0.20.203 version version has conflicting opcodes with the later releases.
|
||||||
// The editlog must be emptied by restarting the namenode, before proceeding
|
// The editlog must be emptied by restarting the namenode, before proceeding
|
||||||
|
@ -616,9 +642,7 @@ public class FSEditLogLoader {
|
||||||
+ logVersion + " from release 0.20.203. Please go back to the old "
|
+ logVersion + " from release 0.20.203. Please go back to the old "
|
||||||
+ " release and restart the namenode. This empties the editlog "
|
+ " release and restart the namenode. This empties the editlog "
|
||||||
+ " and saves the namespace. Resume the upgrade after this step.";
|
+ " and saves the namespace. Resume the upgrade after this step.";
|
||||||
throw new IOException(msg, ex);
|
throw new IOException(msg, e);
|
||||||
} else {
|
|
||||||
throw ex;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -643,14 +667,14 @@ public class FSEditLogLoader {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (firstTxId == HdfsConstants.INVALID_TXID) {
|
if (firstTxId == HdfsConstants.INVALID_TXID) {
|
||||||
firstTxId = op.txid;
|
firstTxId = op.getTransactionId();
|
||||||
}
|
}
|
||||||
if (lastTxId == HdfsConstants.INVALID_TXID
|
if (lastTxId == HdfsConstants.INVALID_TXID
|
||||||
|| op.txid == lastTxId + 1) {
|
|| op.getTransactionId() == lastTxId + 1) {
|
||||||
lastTxId = op.txid;
|
lastTxId = op.getTransactionId();
|
||||||
} else {
|
} else {
|
||||||
FSImage.LOG.error("Out of order txid found. Found " + op.txid
|
FSImage.LOG.error("Out of order txid found. Found " +
|
||||||
+ ", expected " + (lastTxId + 1));
|
op.getTransactionId() + ", expected " + (lastTxId + 1));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
numValid++;
|
numValid++;
|
||||||
|
@ -743,4 +767,7 @@ public class FSEditLogLoader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getLastAppliedTxId() {
|
||||||
|
return lastAppliedTxId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Options.Rename;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
||||||
import org.apache.hadoop.util.PureJavaCrc32;
|
import org.apache.hadoop.util.PureJavaCrc32;
|
||||||
|
@ -54,6 +56,8 @@ import org.xml.sax.ContentHandler;
|
||||||
import org.xml.sax.SAXException;
|
import org.xml.sax.SAXException;
|
||||||
import org.xml.sax.helpers.AttributesImpl;
|
import org.xml.sax.helpers.AttributesImpl;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import java.io.DataInput;
|
import java.io.DataInput;
|
||||||
import java.io.DataOutput;
|
import java.io.DataOutput;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
|
@ -74,42 +78,44 @@ public abstract class FSEditLogOp {
|
||||||
|
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
private static ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>> opInstances =
|
final public static class OpInstanceCache {
|
||||||
new ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>>() {
|
private EnumMap<FSEditLogOpCodes, FSEditLogOp> inst =
|
||||||
@Override
|
new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
|
||||||
protected EnumMap<FSEditLogOpCodes, FSEditLogOp> initialValue() {
|
|
||||||
EnumMap<FSEditLogOpCodes, FSEditLogOp> instances
|
public OpInstanceCache() {
|
||||||
= new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
|
inst.put(OP_ADD, new AddOp());
|
||||||
instances.put(OP_ADD, new AddOp());
|
inst.put(OP_CLOSE, new CloseOp());
|
||||||
instances.put(OP_CLOSE, new CloseOp());
|
inst.put(OP_SET_REPLICATION, new SetReplicationOp());
|
||||||
instances.put(OP_SET_REPLICATION, new SetReplicationOp());
|
inst.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
|
||||||
instances.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
|
inst.put(OP_RENAME_OLD, new RenameOldOp());
|
||||||
instances.put(OP_RENAME_OLD, new RenameOldOp());
|
inst.put(OP_DELETE, new DeleteOp());
|
||||||
instances.put(OP_DELETE, new DeleteOp());
|
inst.put(OP_MKDIR, new MkdirOp());
|
||||||
instances.put(OP_MKDIR, new MkdirOp());
|
inst.put(OP_SET_GENSTAMP, new SetGenstampOp());
|
||||||
instances.put(OP_SET_GENSTAMP, new SetGenstampOp());
|
inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
|
||||||
instances.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
|
inst.put(OP_SET_OWNER, new SetOwnerOp());
|
||||||
instances.put(OP_SET_OWNER, new SetOwnerOp());
|
inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
|
||||||
instances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
|
inst.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
|
||||||
instances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
|
inst.put(OP_SET_QUOTA, new SetQuotaOp());
|
||||||
instances.put(OP_SET_QUOTA, new SetQuotaOp());
|
inst.put(OP_TIMES, new TimesOp());
|
||||||
instances.put(OP_TIMES, new TimesOp());
|
inst.put(OP_SYMLINK, new SymlinkOp());
|
||||||
instances.put(OP_SYMLINK, new SymlinkOp());
|
inst.put(OP_RENAME, new RenameOp());
|
||||||
instances.put(OP_RENAME, new RenameOp());
|
inst.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
|
||||||
instances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
|
inst.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
|
||||||
instances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
|
inst.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
|
||||||
instances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
|
inst.put(OP_CANCEL_DELEGATION_TOKEN,
|
||||||
instances.put(OP_CANCEL_DELEGATION_TOKEN,
|
new CancelDelegationTokenOp());
|
||||||
new CancelDelegationTokenOp());
|
inst.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
|
||||||
instances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
|
inst.put(OP_START_LOG_SEGMENT,
|
||||||
instances.put(OP_START_LOG_SEGMENT,
|
new LogSegmentOp(OP_START_LOG_SEGMENT));
|
||||||
new LogSegmentOp(OP_START_LOG_SEGMENT));
|
inst.put(OP_END_LOG_SEGMENT,
|
||||||
instances.put(OP_END_LOG_SEGMENT,
|
new LogSegmentOp(OP_END_LOG_SEGMENT));
|
||||||
new LogSegmentOp(OP_END_LOG_SEGMENT));
|
inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
|
||||||
instances.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
|
}
|
||||||
return instances;
|
|
||||||
}
|
public FSEditLogOp get(FSEditLogOpCodes opcode) {
|
||||||
};
|
return inst.get(opcode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor for an EditLog Op. EditLog ops cannot be constructed
|
* Constructor for an EditLog Op. EditLog ops cannot be constructed
|
||||||
|
@ -117,13 +123,22 @@ public abstract class FSEditLogOp {
|
||||||
*/
|
*/
|
||||||
private FSEditLogOp(FSEditLogOpCodes opCode) {
|
private FSEditLogOp(FSEditLogOpCodes opCode) {
|
||||||
this.opCode = opCode;
|
this.opCode = opCode;
|
||||||
this.txid = 0;
|
this.txid = HdfsConstants.INVALID_TXID;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getTransactionId() {
|
public long getTransactionId() {
|
||||||
|
Preconditions.checkState(txid != HdfsConstants.INVALID_TXID);
|
||||||
return txid;
|
return txid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getTransactionIdStr() {
|
||||||
|
return (txid == HdfsConstants.INVALID_TXID) ? "(none)" : "" + txid;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean hasTransactionId() {
|
||||||
|
return (txid != HdfsConstants.INVALID_TXID);
|
||||||
|
}
|
||||||
|
|
||||||
public void setTransactionId(long txid) {
|
public void setTransactionId(long txid) {
|
||||||
this.txid = txid;
|
this.txid = txid;
|
||||||
}
|
}
|
||||||
|
@ -373,8 +388,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_ADD);
|
super(OP_ADD);
|
||||||
}
|
}
|
||||||
|
|
||||||
static AddOp getInstance() {
|
static AddOp getInstance(OpInstanceCache cache) {
|
||||||
return (AddOp)opInstances.get().get(OP_ADD);
|
return (AddOp)cache.get(OP_ADD);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean shouldCompleteLastBlock() {
|
public boolean shouldCompleteLastBlock() {
|
||||||
|
@ -395,8 +410,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_CLOSE);
|
super(OP_CLOSE);
|
||||||
}
|
}
|
||||||
|
|
||||||
static CloseOp getInstance() {
|
static CloseOp getInstance(OpInstanceCache cache) {
|
||||||
return (CloseOp)opInstances.get().get(OP_CLOSE);
|
return (CloseOp)cache.get(OP_CLOSE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean shouldCompleteLastBlock() {
|
public boolean shouldCompleteLastBlock() {
|
||||||
|
@ -420,9 +435,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_UPDATE_BLOCKS);
|
super(OP_UPDATE_BLOCKS);
|
||||||
}
|
}
|
||||||
|
|
||||||
static UpdateBlocksOp getInstance() {
|
static UpdateBlocksOp getInstance(OpInstanceCache cache) {
|
||||||
return (UpdateBlocksOp)opInstances.get()
|
return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS);
|
||||||
.get(OP_UPDATE_BLOCKS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -500,9 +514,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_SET_REPLICATION);
|
super(OP_SET_REPLICATION);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SetReplicationOp getInstance() {
|
static SetReplicationOp getInstance(OpInstanceCache cache) {
|
||||||
return (SetReplicationOp)opInstances.get()
|
return (SetReplicationOp)cache.get(OP_SET_REPLICATION);
|
||||||
.get(OP_SET_REPLICATION);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SetReplicationOp setPath(String path) {
|
SetReplicationOp setPath(String path) {
|
||||||
|
@ -571,9 +584,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_CONCAT_DELETE);
|
super(OP_CONCAT_DELETE);
|
||||||
}
|
}
|
||||||
|
|
||||||
static ConcatDeleteOp getInstance() {
|
static ConcatDeleteOp getInstance(OpInstanceCache cache) {
|
||||||
return (ConcatDeleteOp)opInstances.get()
|
return (ConcatDeleteOp)cache.get(OP_CONCAT_DELETE);
|
||||||
.get(OP_CONCAT_DELETE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ConcatDeleteOp setTarget(String trg) {
|
ConcatDeleteOp setTarget(String trg) {
|
||||||
|
@ -697,9 +709,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_RENAME_OLD);
|
super(OP_RENAME_OLD);
|
||||||
}
|
}
|
||||||
|
|
||||||
static RenameOldOp getInstance() {
|
static RenameOldOp getInstance(OpInstanceCache cache) {
|
||||||
return (RenameOldOp)opInstances.get()
|
return (RenameOldOp)cache.get(OP_RENAME_OLD);
|
||||||
.get(OP_RENAME_OLD);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
RenameOldOp setSource(String src) {
|
RenameOldOp setSource(String src) {
|
||||||
|
@ -790,9 +801,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_DELETE);
|
super(OP_DELETE);
|
||||||
}
|
}
|
||||||
|
|
||||||
static DeleteOp getInstance() {
|
static DeleteOp getInstance(OpInstanceCache cache) {
|
||||||
return (DeleteOp)opInstances.get()
|
return (DeleteOp)cache.get(OP_DELETE);
|
||||||
.get(OP_DELETE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
DeleteOp setPath(String path) {
|
DeleteOp setPath(String path) {
|
||||||
|
@ -872,9 +882,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_MKDIR);
|
super(OP_MKDIR);
|
||||||
}
|
}
|
||||||
|
|
||||||
static MkdirOp getInstance() {
|
static MkdirOp getInstance(OpInstanceCache cache) {
|
||||||
return (MkdirOp)opInstances.get()
|
return (MkdirOp)cache.get(OP_MKDIR);
|
||||||
.get(OP_MKDIR);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
MkdirOp setPath(String path) {
|
MkdirOp setPath(String path) {
|
||||||
|
@ -977,9 +986,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_SET_GENSTAMP);
|
super(OP_SET_GENSTAMP);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SetGenstampOp getInstance() {
|
static SetGenstampOp getInstance(OpInstanceCache cache) {
|
||||||
return (SetGenstampOp)opInstances.get()
|
return (SetGenstampOp)cache.get(OP_SET_GENSTAMP);
|
||||||
.get(OP_SET_GENSTAMP);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SetGenstampOp setGenerationStamp(long genStamp) {
|
SetGenstampOp setGenerationStamp(long genStamp) {
|
||||||
|
@ -1031,9 +1039,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_SET_PERMISSIONS);
|
super(OP_SET_PERMISSIONS);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SetPermissionsOp getInstance() {
|
static SetPermissionsOp getInstance(OpInstanceCache cache) {
|
||||||
return (SetPermissionsOp)opInstances.get()
|
return (SetPermissionsOp)cache.get(OP_SET_PERMISSIONS);
|
||||||
.get(OP_SET_PERMISSIONS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SetPermissionsOp setSource(String src) {
|
SetPermissionsOp setSource(String src) {
|
||||||
|
@ -1098,9 +1105,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_SET_OWNER);
|
super(OP_SET_OWNER);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SetOwnerOp getInstance() {
|
static SetOwnerOp getInstance(OpInstanceCache cache) {
|
||||||
return (SetOwnerOp)opInstances.get()
|
return (SetOwnerOp)cache.get(OP_SET_OWNER);
|
||||||
.get(OP_SET_OWNER);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SetOwnerOp setSource(String src) {
|
SetOwnerOp setSource(String src) {
|
||||||
|
@ -1179,9 +1185,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_SET_NS_QUOTA);
|
super(OP_SET_NS_QUOTA);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SetNSQuotaOp getInstance() {
|
static SetNSQuotaOp getInstance(OpInstanceCache cache) {
|
||||||
return (SetNSQuotaOp)opInstances.get()
|
return (SetNSQuotaOp)cache.get(OP_SET_NS_QUOTA);
|
||||||
.get(OP_SET_NS_QUOTA);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1232,9 +1237,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_CLEAR_NS_QUOTA);
|
super(OP_CLEAR_NS_QUOTA);
|
||||||
}
|
}
|
||||||
|
|
||||||
static ClearNSQuotaOp getInstance() {
|
static ClearNSQuotaOp getInstance(OpInstanceCache cache) {
|
||||||
return (ClearNSQuotaOp)opInstances.get()
|
return (ClearNSQuotaOp)cache.get(OP_CLEAR_NS_QUOTA);
|
||||||
.get(OP_CLEAR_NS_QUOTA);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1281,9 +1285,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_SET_QUOTA);
|
super(OP_SET_QUOTA);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SetQuotaOp getInstance() {
|
static SetQuotaOp getInstance(OpInstanceCache cache) {
|
||||||
return (SetQuotaOp)opInstances.get()
|
return (SetQuotaOp)cache.get(OP_SET_QUOTA);
|
||||||
.get(OP_SET_QUOTA);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SetQuotaOp setSource(String src) {
|
SetQuotaOp setSource(String src) {
|
||||||
|
@ -1360,9 +1363,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_TIMES);
|
super(OP_TIMES);
|
||||||
}
|
}
|
||||||
|
|
||||||
static TimesOp getInstance() {
|
static TimesOp getInstance(OpInstanceCache cache) {
|
||||||
return (TimesOp)opInstances.get()
|
return (TimesOp)cache.get(OP_TIMES);
|
||||||
.get(OP_TIMES);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TimesOp setPath(String path) {
|
TimesOp setPath(String path) {
|
||||||
|
@ -1458,9 +1460,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_SYMLINK);
|
super(OP_SYMLINK);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SymlinkOp getInstance() {
|
static SymlinkOp getInstance(OpInstanceCache cache) {
|
||||||
return (SymlinkOp)opInstances.get()
|
return (SymlinkOp)cache.get(OP_SYMLINK);
|
||||||
.get(OP_SYMLINK);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SymlinkOp setPath(String path) {
|
SymlinkOp setPath(String path) {
|
||||||
|
@ -1579,9 +1580,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_RENAME);
|
super(OP_RENAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
static RenameOp getInstance() {
|
static RenameOp getInstance(OpInstanceCache cache) {
|
||||||
return (RenameOp)opInstances.get()
|
return (RenameOp)cache.get(OP_RENAME);
|
||||||
.get(OP_RENAME);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
RenameOp setSource(String src) {
|
RenameOp setSource(String src) {
|
||||||
|
@ -1723,9 +1723,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_REASSIGN_LEASE);
|
super(OP_REASSIGN_LEASE);
|
||||||
}
|
}
|
||||||
|
|
||||||
static ReassignLeaseOp getInstance() {
|
static ReassignLeaseOp getInstance(OpInstanceCache cache) {
|
||||||
return (ReassignLeaseOp)opInstances.get()
|
return (ReassignLeaseOp)cache.get(OP_REASSIGN_LEASE);
|
||||||
.get(OP_REASSIGN_LEASE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ReassignLeaseOp setLeaseHolder(String leaseHolder) {
|
ReassignLeaseOp setLeaseHolder(String leaseHolder) {
|
||||||
|
@ -1798,9 +1797,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_GET_DELEGATION_TOKEN);
|
super(OP_GET_DELEGATION_TOKEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
static GetDelegationTokenOp getInstance() {
|
static GetDelegationTokenOp getInstance(OpInstanceCache cache) {
|
||||||
return (GetDelegationTokenOp)opInstances.get()
|
return (GetDelegationTokenOp)cache.get(OP_GET_DELEGATION_TOKEN);
|
||||||
.get(OP_GET_DELEGATION_TOKEN);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
GetDelegationTokenOp setDelegationTokenIdentifier(
|
GetDelegationTokenOp setDelegationTokenIdentifier(
|
||||||
|
@ -1870,9 +1868,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_RENEW_DELEGATION_TOKEN);
|
super(OP_RENEW_DELEGATION_TOKEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
static RenewDelegationTokenOp getInstance() {
|
static RenewDelegationTokenOp getInstance(OpInstanceCache cache) {
|
||||||
return (RenewDelegationTokenOp)opInstances.get()
|
return (RenewDelegationTokenOp)cache.get(OP_RENEW_DELEGATION_TOKEN);
|
||||||
.get(OP_RENEW_DELEGATION_TOKEN);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
RenewDelegationTokenOp setDelegationTokenIdentifier(
|
RenewDelegationTokenOp setDelegationTokenIdentifier(
|
||||||
|
@ -1941,9 +1938,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_CANCEL_DELEGATION_TOKEN);
|
super(OP_CANCEL_DELEGATION_TOKEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
static CancelDelegationTokenOp getInstance() {
|
static CancelDelegationTokenOp getInstance(OpInstanceCache cache) {
|
||||||
return (CancelDelegationTokenOp)opInstances.get()
|
return (CancelDelegationTokenOp)cache.get(OP_CANCEL_DELEGATION_TOKEN);
|
||||||
.get(OP_CANCEL_DELEGATION_TOKEN);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CancelDelegationTokenOp setDelegationTokenIdentifier(
|
CancelDelegationTokenOp setDelegationTokenIdentifier(
|
||||||
|
@ -1996,9 +1992,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_UPDATE_MASTER_KEY);
|
super(OP_UPDATE_MASTER_KEY);
|
||||||
}
|
}
|
||||||
|
|
||||||
static UpdateMasterKeyOp getInstance() {
|
static UpdateMasterKeyOp getInstance(OpInstanceCache cache) {
|
||||||
return (UpdateMasterKeyOp)opInstances.get()
|
return (UpdateMasterKeyOp)cache.get(OP_UPDATE_MASTER_KEY);
|
||||||
.get(OP_UPDATE_MASTER_KEY);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
UpdateMasterKeyOp setDelegationKey(DelegationKey key) {
|
UpdateMasterKeyOp setDelegationKey(DelegationKey key) {
|
||||||
|
@ -2050,8 +2045,9 @@ public abstract class FSEditLogOp {
|
||||||
code == OP_END_LOG_SEGMENT : "Bad op: " + code;
|
code == OP_END_LOG_SEGMENT : "Bad op: " + code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static LogSegmentOp getInstance(FSEditLogOpCodes code) {
|
static LogSegmentOp getInstance(OpInstanceCache cache,
|
||||||
return (LogSegmentOp)opInstances.get().get(code);
|
FSEditLogOpCodes code) {
|
||||||
|
return (LogSegmentOp)cache.get(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void readFields(DataInputStream in, int logVersion)
|
public void readFields(DataInputStream in, int logVersion)
|
||||||
|
@ -2091,8 +2087,8 @@ public abstract class FSEditLogOp {
|
||||||
super(OP_INVALID);
|
super(OP_INVALID);
|
||||||
}
|
}
|
||||||
|
|
||||||
static InvalidOp getInstance() {
|
static InvalidOp getInstance(OpInstanceCache cache) {
|
||||||
return (InvalidOp)opInstances.get().get(OP_INVALID);
|
return (InvalidOp)cache.get(OP_INVALID);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2207,6 +2203,7 @@ public abstract class FSEditLogOp {
|
||||||
private final DataInputStream in;
|
private final DataInputStream in;
|
||||||
private final int logVersion;
|
private final int logVersion;
|
||||||
private final Checksum checksum;
|
private final Checksum checksum;
|
||||||
|
private final OpInstanceCache cache;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct the reader
|
* Construct the reader
|
||||||
|
@ -2228,6 +2225,7 @@ public abstract class FSEditLogOp {
|
||||||
} else {
|
} else {
|
||||||
this.in = in;
|
this.in = in;
|
||||||
}
|
}
|
||||||
|
this.cache = new OpInstanceCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2236,16 +2234,42 @@ public abstract class FSEditLogOp {
|
||||||
* Note that the objects returned from this method may be re-used by future
|
* Note that the objects returned from this method may be re-used by future
|
||||||
* calls to the same method.
|
* calls to the same method.
|
||||||
*
|
*
|
||||||
|
* @param skipBrokenEdits If true, attempt to skip over damaged parts of
|
||||||
|
* the input stream, rather than throwing an IOException
|
||||||
* @return the operation read from the stream, or null at the end of the file
|
* @return the operation read from the stream, or null at the end of the file
|
||||||
* @throws IOException on error.
|
* @throws IOException on error.
|
||||||
*/
|
*/
|
||||||
public FSEditLogOp readOp() throws IOException {
|
public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
|
||||||
|
FSEditLogOp op = null;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
in.mark(in.available());
|
||||||
|
try {
|
||||||
|
op = decodeOp();
|
||||||
|
} finally {
|
||||||
|
// If we encountered an exception or an end-of-file condition,
|
||||||
|
// do not advance the input stream.
|
||||||
|
if (op == null) {
|
||||||
|
in.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return op;
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (!skipBrokenEdits) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
if (in.skip(1) < 1) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private FSEditLogOp decodeOp() throws IOException {
|
||||||
if (checksum != null) {
|
if (checksum != null) {
|
||||||
checksum.reset();
|
checksum.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
in.mark(1);
|
|
||||||
|
|
||||||
byte opCodeByte;
|
byte opCodeByte;
|
||||||
try {
|
try {
|
||||||
opCodeByte = in.readByte();
|
opCodeByte = in.readByte();
|
||||||
|
@ -2255,12 +2279,10 @@ public abstract class FSEditLogOp {
|
||||||
}
|
}
|
||||||
|
|
||||||
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
|
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
|
||||||
if (opCode == OP_INVALID) {
|
if (opCode == OP_INVALID)
|
||||||
in.reset(); // reset back to end of file if somebody reads it again
|
|
||||||
return null;
|
return null;
|
||||||
}
|
|
||||||
|
|
||||||
FSEditLogOp op = opInstances.get().get(opCode);
|
FSEditLogOp op = cache.get(opCode);
|
||||||
if (op == null) {
|
if (op == null) {
|
||||||
throw new IOException("Read invalid opcode " + opCode);
|
throw new IOException("Read invalid opcode " + opCode);
|
||||||
}
|
}
|
||||||
|
@ -2268,6 +2290,8 @@ public abstract class FSEditLogOp {
|
||||||
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
|
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
|
||||||
// Read the txid
|
// Read the txid
|
||||||
op.setTransactionId(in.readLong());
|
op.setTransactionId(in.readLong());
|
||||||
|
} else {
|
||||||
|
op.setTransactionId(HdfsConstants.INVALID_TXID);
|
||||||
}
|
}
|
||||||
|
|
||||||
op.readFields(in, logVersion);
|
op.readFields(in, logVersion);
|
||||||
|
@ -2426,8 +2450,4 @@ public abstract class FSEditLogOp {
|
||||||
short mode = Short.valueOf(st.getValue("MODE"));
|
short mode = Short.valueOf(st.getValue("MODE"));
|
||||||
return new PermissionStatus(username, groupname, new FsPermission(mode));
|
return new PermissionStatus(username, groupname, new FsPermission(mode));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
public static FSEditLogOp getOpInstance(FSEditLogOpCodes opCode) {
|
|
||||||
return opInstances.get().get(opCode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -158,8 +158,8 @@ public class FSImage implements Closeable {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* @return true if the image needs to be saved or false otherwise
|
* @return true if the image needs to be saved or false otherwise
|
||||||
*/
|
*/
|
||||||
boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target)
|
boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
|
||||||
throws IOException {
|
MetaRecoveryContext recovery) throws IOException {
|
||||||
assert startOpt != StartupOption.FORMAT :
|
assert startOpt != StartupOption.FORMAT :
|
||||||
"NameNode formatting should be performed before reading the image";
|
"NameNode formatting should be performed before reading the image";
|
||||||
|
|
||||||
|
@ -244,7 +244,7 @@ public class FSImage implements Closeable {
|
||||||
// just load the image
|
// just load the image
|
||||||
}
|
}
|
||||||
|
|
||||||
return loadFSImage(target);
|
return loadFSImage(target, recovery);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -304,7 +304,7 @@ public class FSImage implements Closeable {
|
||||||
if(storage.getDistributedUpgradeState()) {
|
if(storage.getDistributedUpgradeState()) {
|
||||||
// only distributed upgrade need to continue
|
// only distributed upgrade need to continue
|
||||||
// don't do version upgrade
|
// don't do version upgrade
|
||||||
this.loadFSImage(target);
|
this.loadFSImage(target, null);
|
||||||
storage.initializeDistributedUpgrade();
|
storage.initializeDistributedUpgrade();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -319,7 +319,7 @@ public class FSImage implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
// load the latest image
|
// load the latest image
|
||||||
this.loadFSImage(target);
|
this.loadFSImage(target, null);
|
||||||
|
|
||||||
// Do upgrade for each directory
|
// Do upgrade for each directory
|
||||||
long oldCTime = storage.getCTime();
|
long oldCTime = storage.getCTime();
|
||||||
|
@ -505,7 +505,7 @@ public class FSImage implements Closeable {
|
||||||
target.dir.fsImage = ckptImage;
|
target.dir.fsImage = ckptImage;
|
||||||
// load from the checkpoint dirs
|
// load from the checkpoint dirs
|
||||||
try {
|
try {
|
||||||
ckptImage.recoverTransitionRead(StartupOption.REGULAR, target);
|
ckptImage.recoverTransitionRead(StartupOption.REGULAR, target, null);
|
||||||
} finally {
|
} finally {
|
||||||
ckptImage.close();
|
ckptImage.close();
|
||||||
}
|
}
|
||||||
|
@ -550,7 +550,7 @@ public class FSImage implements Closeable {
|
||||||
target.dir.reset();
|
target.dir.reset();
|
||||||
|
|
||||||
LOG.debug("Reloading namespace from " + file);
|
LOG.debug("Reloading namespace from " + file);
|
||||||
loadFSImage(file, target);
|
loadFSImage(file, target, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -568,7 +568,8 @@ public class FSImage implements Closeable {
|
||||||
* @return whether the image should be saved
|
* @return whether the image should be saved
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
boolean loadFSImage(FSNamesystem target) throws IOException {
|
boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
|
||||||
|
throws IOException {
|
||||||
FSImageStorageInspector inspector = storage.readAndInspectDirs();
|
FSImageStorageInspector inspector = storage.readAndInspectDirs();
|
||||||
|
|
||||||
isUpgradeFinalized = inspector.isUpgradeFinalized();
|
isUpgradeFinalized = inspector.isUpgradeFinalized();
|
||||||
|
@ -583,7 +584,6 @@ public class FSImage implements Closeable {
|
||||||
// We only want to recover streams if we're going into Active mode.
|
// We only want to recover streams if we're going into Active mode.
|
||||||
editLog.recoverUnclosedStreams();
|
editLog.recoverUnclosedStreams();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
|
if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
|
||||||
getLayoutVersion())) {
|
getLayoutVersion())) {
|
||||||
// If we're open for write, we're either non-HA or we're the active NN, so
|
// If we're open for write, we're either non-HA or we're the active NN, so
|
||||||
|
@ -610,7 +610,7 @@ public class FSImage implements Closeable {
|
||||||
getLayoutVersion())) {
|
getLayoutVersion())) {
|
||||||
// For txid-based layout, we should have a .md5 file
|
// For txid-based layout, we should have a .md5 file
|
||||||
// next to the image file
|
// next to the image file
|
||||||
loadFSImage(imageFile.getFile(), target);
|
loadFSImage(imageFile.getFile(), target, recovery);
|
||||||
} else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
|
} else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
|
||||||
getLayoutVersion())) {
|
getLayoutVersion())) {
|
||||||
// In 0.22, we have the checksum stored in the VERSION file.
|
// In 0.22, we have the checksum stored in the VERSION file.
|
||||||
|
@ -622,22 +622,19 @@ public class FSImage implements Closeable {
|
||||||
NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
|
NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
|
||||||
" not set for storage directory " + sdForProperties.getRoot());
|
" not set for storage directory " + sdForProperties.getRoot());
|
||||||
}
|
}
|
||||||
loadFSImage(imageFile.getFile(), new MD5Hash(md5), target);
|
loadFSImage(imageFile.getFile(), new MD5Hash(md5), target, recovery);
|
||||||
} else {
|
} else {
|
||||||
// We don't have any record of the md5sum
|
// We don't have any record of the md5sum
|
||||||
loadFSImage(imageFile.getFile(), null, target);
|
loadFSImage(imageFile.getFile(), null, target, recovery);
|
||||||
}
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
FSEditLog.closeAllStreams(editStreams);
|
FSEditLog.closeAllStreams(editStreams);
|
||||||
throw new IOException("Failed to load image from " + imageFile, ioe);
|
throw new IOException("Failed to load image from " + imageFile, ioe);
|
||||||
}
|
}
|
||||||
|
long txnsAdvanced = loadEdits(editStreams, target, recovery);
|
||||||
long numLoaded = loadEdits(editStreams, target);
|
|
||||||
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
|
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
|
||||||
numLoaded);
|
txnsAdvanced);
|
||||||
|
editLog.setNextTxId(lastAppliedTxId + 1);
|
||||||
// update the txid for the edit log
|
|
||||||
editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
|
|
||||||
return needToSave;
|
return needToSave;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -664,33 +661,29 @@ public class FSImage implements Closeable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load the specified list of edit files into the image.
|
* Load the specified list of edit files into the image.
|
||||||
* @return the number of transactions loaded
|
|
||||||
*/
|
*/
|
||||||
public long loadEdits(Iterable<EditLogInputStream> editStreams,
|
public long loadEdits(Iterable<EditLogInputStream> editStreams,
|
||||||
FSNamesystem target) throws IOException, EditLogInputException {
|
FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
|
||||||
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
|
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
|
||||||
|
|
||||||
long startingTxId = getLastAppliedTxId() + 1;
|
long prevLastAppliedTxId = lastAppliedTxId;
|
||||||
long numLoaded = 0;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
FSEditLogLoader loader = new FSEditLogLoader(target);
|
FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);
|
||||||
|
|
||||||
// Load latest edits
|
// Load latest edits
|
||||||
for (EditLogInputStream editIn : editStreams) {
|
for (EditLogInputStream editIn : editStreams) {
|
||||||
LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId);
|
LOG.info("Reading " + editIn + " expecting start txid #" +
|
||||||
long thisNumLoaded = 0;
|
(lastAppliedTxId + 1));
|
||||||
try {
|
try {
|
||||||
thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
|
loader.loadFSEdits(editIn, lastAppliedTxId + 1, recovery);
|
||||||
} catch (EditLogInputException elie) {
|
|
||||||
thisNumLoaded = elie.getNumEditsLoaded();
|
|
||||||
throw elie;
|
|
||||||
} finally {
|
} finally {
|
||||||
// Update lastAppliedTxId even in case of error, since some ops may
|
// Update lastAppliedTxId even in case of error, since some ops may
|
||||||
// have been successfully applied before the error.
|
// have been successfully applied before the error.
|
||||||
lastAppliedTxId = startingTxId + thisNumLoaded - 1;
|
lastAppliedTxId = loader.getLastAppliedTxId();
|
||||||
startingTxId += thisNumLoaded;
|
}
|
||||||
numLoaded += thisNumLoaded;
|
// If we are in recovery mode, we may have skipped over some txids.
|
||||||
|
if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID) {
|
||||||
|
lastAppliedTxId = editIn.getLastTxId();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -698,8 +691,7 @@ public class FSImage implements Closeable {
|
||||||
// update the counts
|
// update the counts
|
||||||
target.dir.updateCountForINodeWithQuota();
|
target.dir.updateCountForINodeWithQuota();
|
||||||
}
|
}
|
||||||
|
return lastAppliedTxId - prevLastAppliedTxId;
|
||||||
return numLoaded;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -707,14 +699,14 @@ public class FSImage implements Closeable {
|
||||||
* Load the image namespace from the given image file, verifying
|
* Load the image namespace from the given image file, verifying
|
||||||
* it against the MD5 sum stored in its associated .md5 file.
|
* it against the MD5 sum stored in its associated .md5 file.
|
||||||
*/
|
*/
|
||||||
private void loadFSImage(File imageFile, FSNamesystem target)
|
private void loadFSImage(File imageFile, FSNamesystem target,
|
||||||
throws IOException {
|
MetaRecoveryContext recovery) throws IOException {
|
||||||
MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
|
MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
|
||||||
if (expectedMD5 == null) {
|
if (expectedMD5 == null) {
|
||||||
throw new IOException("No MD5 file found corresponding to image file "
|
throw new IOException("No MD5 file found corresponding to image file "
|
||||||
+ imageFile);
|
+ imageFile);
|
||||||
}
|
}
|
||||||
loadFSImage(imageFile, expectedMD5, target);
|
loadFSImage(imageFile, expectedMD5, target, recovery);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -722,7 +714,7 @@ public class FSImage implements Closeable {
|
||||||
* filenames and blocks.
|
* filenames and blocks.
|
||||||
*/
|
*/
|
||||||
private void loadFSImage(File curFile, MD5Hash expectedMd5,
|
private void loadFSImage(File curFile, MD5Hash expectedMd5,
|
||||||
FSNamesystem target) throws IOException {
|
FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
|
||||||
FSImageFormat.Loader loader = new FSImageFormat.Loader(
|
FSImageFormat.Loader loader = new FSImageFormat.Loader(
|
||||||
conf, target);
|
conf, target);
|
||||||
loader.load(curFile);
|
loader.load(curFile);
|
||||||
|
|
|
@ -56,7 +56,14 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
|
// Check for a seen_txid file, which marks a minimum transaction ID that
|
||||||
|
// must be included in our load plan.
|
||||||
|
try {
|
||||||
|
maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
File currentDir = sd.getCurrentDir();
|
File currentDir = sd.getCurrentDir();
|
||||||
File filesInStorage[];
|
File filesInStorage[];
|
||||||
|
@ -91,15 +98,6 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Check for a seen_txid file, which marks a minimum transaction ID that
|
|
||||||
// must be included in our load plan.
|
|
||||||
try {
|
|
||||||
maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
|
|
||||||
}
|
|
||||||
|
|
||||||
// set finalized flag
|
// set finalized flag
|
||||||
isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
|
isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
|
||||||
}
|
}
|
||||||
|
|
|
@ -380,9 +380,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
|
|
||||||
FSImage fsImage = new FSImage(conf, namespaceDirs, namespaceEditsDirs);
|
FSImage fsImage = new FSImage(conf, namespaceDirs, namespaceEditsDirs);
|
||||||
FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
|
FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
|
||||||
|
StartupOption startOpt = NameNode.getStartupOption(conf);
|
||||||
|
if (startOpt == StartupOption.RECOVER) {
|
||||||
|
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
||||||
|
}
|
||||||
|
|
||||||
long loadStart = now();
|
long loadStart = now();
|
||||||
StartupOption startOpt = NameNode.getStartupOption(conf);
|
|
||||||
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
|
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
|
||||||
namesystem.loadFSImage(startOpt, fsImage,
|
namesystem.loadFSImage(startOpt, fsImage,
|
||||||
HAUtil.isHAEnabled(conf, nameserviceId));
|
HAUtil.isHAEnabled(conf, nameserviceId));
|
||||||
|
@ -491,7 +494,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
// We shouldn't be calling saveNamespace if we've come up in standby state.
|
// We shouldn't be calling saveNamespace if we've come up in standby state.
|
||||||
if (fsImage.recoverTransitionRead(startOpt, this) && !haEnabled) {
|
MetaRecoveryContext recovery = startOpt.createRecoveryContext();
|
||||||
|
if (fsImage.recoverTransitionRead(startOpt, this, recovery) && !haEnabled) {
|
||||||
fsImage.saveNamespace(this);
|
fsImage.saveNamespace(this);
|
||||||
}
|
}
|
||||||
// This will start a new log segment and write to the seen_txid file, so
|
// This will start a new log segment and write to the seen_txid file, so
|
||||||
|
|
|
@ -232,7 +232,10 @@ class FileJournalManager implements JournalManager {
|
||||||
LOG.info(String.format("Log begins at txid %d, but requested start "
|
LOG.info(String.format("Log begins at txid %d, but requested start "
|
||||||
+ "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId,
|
+ "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId,
|
||||||
transactionsToSkip));
|
transactionsToSkip));
|
||||||
elfis.skipTransactions(transactionsToSkip);
|
}
|
||||||
|
if (elfis.skipUntil(fromTxId) == false) {
|
||||||
|
throw new IOException("failed to advance input stream to txid " +
|
||||||
|
fromTxId);
|
||||||
}
|
}
|
||||||
return elfis;
|
return elfis;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,56 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A generic interface for journal input and output streams.
|
|
||||||
*/
|
|
||||||
interface JournalStream {
|
|
||||||
/**
|
|
||||||
* Type of the underlying persistent storage type the stream is based upon.
|
|
||||||
* <ul>
|
|
||||||
* <li>{@link JournalType#FILE} - streams edits into a local file, see
|
|
||||||
* {@link FSEditLog.EditLogFileOutputStream} and
|
|
||||||
* {@link FSEditLog.EditLogFileInputStream}</li>
|
|
||||||
* <li>{@link JournalType#BACKUP} - streams edits to a backup node, see
|
|
||||||
* {@link EditLogBackupOutputStream} and {@link EditLogBackupInputStream}</li>
|
|
||||||
* </ul>
|
|
||||||
*/
|
|
||||||
static enum JournalType {
|
|
||||||
FILE,
|
|
||||||
BACKUP;
|
|
||||||
boolean isOfType(JournalType other) {
|
|
||||||
return other == null || this == other;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get this stream name.
|
|
||||||
*
|
|
||||||
* @return name of the stream
|
|
||||||
*/
|
|
||||||
String getName();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the type of the stream.
|
|
||||||
* Determines the underlying persistent storage type.
|
|
||||||
* @see JournalType
|
|
||||||
* @return type
|
|
||||||
*/
|
|
||||||
JournalType getType();
|
|
||||||
}
|
|
|
@ -0,0 +1,130 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/** Context data for an ongoing NameNode metadata recovery process. */
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public final class MetaRecoveryContext {
|
||||||
|
public static final Log LOG = LogFactory.getLog(MetaRecoveryContext.class.getName());
|
||||||
|
public final static int FORCE_NONE = 0;
|
||||||
|
public final static int FORCE_FIRST_CHOICE = 1;
|
||||||
|
public final static int FORCE_ALL = 2;
|
||||||
|
private int force;
|
||||||
|
|
||||||
|
/** Exception thrown when the user has requested processing to stop. */
|
||||||
|
static public class RequestStopException extends IOException {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
public RequestStopException(String msg) {
|
||||||
|
super(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public MetaRecoveryContext(int force) {
|
||||||
|
this.force = force;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Display a prompt to the user and get his or her choice.
|
||||||
|
*
|
||||||
|
* @param prompt The prompt to display
|
||||||
|
* @param default First choice (will be taken if autoChooseDefault is
|
||||||
|
* true)
|
||||||
|
* @param choices Other choies
|
||||||
|
*
|
||||||
|
* @return The choice that was taken
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public String ask(String prompt, String firstChoice, String... choices)
|
||||||
|
throws IOException {
|
||||||
|
while (true) {
|
||||||
|
LOG.info(prompt);
|
||||||
|
if (force > FORCE_NONE) {
|
||||||
|
LOG.info("automatically choosing " + firstChoice);
|
||||||
|
return firstChoice;
|
||||||
|
}
|
||||||
|
StringBuilder responseBuilder = new StringBuilder();
|
||||||
|
while (true) {
|
||||||
|
int c = System.in.read();
|
||||||
|
if (c == -1 || c == '\r' || c == '\n') {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
responseBuilder.append((char)c);
|
||||||
|
}
|
||||||
|
String response = responseBuilder.toString();
|
||||||
|
if (response.equalsIgnoreCase(firstChoice))
|
||||||
|
return firstChoice;
|
||||||
|
for (String c : choices) {
|
||||||
|
if (response.equalsIgnoreCase(c)) {
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.error("I'm sorry, I cannot understand your response.\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void editLogLoaderPrompt(String prompt,
|
||||||
|
MetaRecoveryContext recovery, String contStr)
|
||||||
|
throws IOException, RequestStopException
|
||||||
|
{
|
||||||
|
if (recovery == null) {
|
||||||
|
throw new IOException(prompt);
|
||||||
|
}
|
||||||
|
LOG.error(prompt);
|
||||||
|
String answer = recovery.ask("\nEnter 'c' to continue, " + contStr + "\n" +
|
||||||
|
"Enter 's' to stop reading the edit log here, abandoning any later " +
|
||||||
|
"edits\n" +
|
||||||
|
"Enter 'q' to quit without saving\n" +
|
||||||
|
"Enter 'a' to always select the first choice in the future " +
|
||||||
|
"without prompting. " +
|
||||||
|
"(c/s/q/a)\n", "c", "s", "q", "a");
|
||||||
|
if (answer.equals("c")) {
|
||||||
|
LOG.info("Continuing.");
|
||||||
|
return;
|
||||||
|
} else if (answer.equals("s")) {
|
||||||
|
throw new RequestStopException("user requested stop");
|
||||||
|
} else if (answer.equals("q")) {
|
||||||
|
recovery.quit();
|
||||||
|
} else {
|
||||||
|
recovery.setForce(FORCE_FIRST_CHOICE);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Log a message and quit */
|
||||||
|
public void quit() {
|
||||||
|
LOG.error("Exiting on user request.");
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getForce() {
|
||||||
|
return this.force;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setForce(int force) {
|
||||||
|
this.force = force;
|
||||||
|
}
|
||||||
|
}
|
|
@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
|
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
|
||||||
import org.apache.hadoop.hdfs.server.common.Util;
|
import org.apache.hadoop.hdfs.server.common.Util;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
|
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
|
||||||
|
|
||||||
|
@ -299,8 +298,7 @@ public class NNStorage extends Storage implements Closeable {
|
||||||
NameNodeDirType.IMAGE;
|
NameNodeDirType.IMAGE;
|
||||||
// Add to the list of storage directories, only if the
|
// Add to the list of storage directories, only if the
|
||||||
// URI is of type file://
|
// URI is of type file://
|
||||||
if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
|
if(dirName.getScheme().compareTo("file") == 0) {
|
||||||
== 0){
|
|
||||||
this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
|
this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
|
||||||
dirType,
|
dirType,
|
||||||
!sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared.
|
!sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared.
|
||||||
|
@ -312,8 +310,7 @@ public class NNStorage extends Storage implements Closeable {
|
||||||
checkSchemeConsistency(dirName);
|
checkSchemeConsistency(dirName);
|
||||||
// Add to the list of storage directories, only if the
|
// Add to the list of storage directories, only if the
|
||||||
// URI is of type file://
|
// URI is of type file://
|
||||||
if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase())
|
if(dirName.getScheme().compareTo("file") == 0)
|
||||||
== 0)
|
|
||||||
this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
|
this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
|
||||||
NameNodeDirType.EDITS, !sharedEditsDirs.contains(dirName)));
|
NameNodeDirType.EDITS, !sharedEditsDirs.contains(dirName)));
|
||||||
}
|
}
|
||||||
|
|
|
@ -514,6 +514,8 @@ public class NameNode {
|
||||||
* <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
|
* <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
|
||||||
* <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster
|
* <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster
|
||||||
* upgrade and create a snapshot of the current file system state</li>
|
* upgrade and create a snapshot of the current file system state</li>
|
||||||
|
* <li>{@link StartupOption#RECOVERY RECOVERY} - recover name node
|
||||||
|
* metadata</li>
|
||||||
* <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the
|
* <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the
|
||||||
* cluster back to the previous state</li>
|
* cluster back to the previous state</li>
|
||||||
* <li>{@link StartupOption#FINALIZE FINALIZE} - finalize
|
* <li>{@link StartupOption#FINALIZE FINALIZE} - finalize
|
||||||
|
@ -832,7 +834,10 @@ public class NameNode {
|
||||||
StartupOption.FINALIZE.getName() + "] | [" +
|
StartupOption.FINALIZE.getName() + "] | [" +
|
||||||
StartupOption.IMPORT.getName() + "] | [" +
|
StartupOption.IMPORT.getName() + "] | [" +
|
||||||
StartupOption.BOOTSTRAPSTANDBY.getName() + "] | [" +
|
StartupOption.BOOTSTRAPSTANDBY.getName() + "] | [" +
|
||||||
StartupOption.INITIALIZESHAREDEDITS.getName() + "]");
|
StartupOption.INITIALIZESHAREDEDITS.getName() + "] | [" +
|
||||||
|
StartupOption.BOOTSTRAPSTANDBY.getName() + "] | [" +
|
||||||
|
StartupOption.RECOVER.getName() + " [ " +
|
||||||
|
StartupOption.FORCE.getName() + " ] ]");
|
||||||
}
|
}
|
||||||
|
|
||||||
private static StartupOption parseArguments(String args[]) {
|
private static StartupOption parseArguments(String args[]) {
|
||||||
|
@ -876,6 +881,21 @@ public class NameNode {
|
||||||
} else if (StartupOption.INITIALIZESHAREDEDITS.getName().equalsIgnoreCase(cmd)) {
|
} else if (StartupOption.INITIALIZESHAREDEDITS.getName().equalsIgnoreCase(cmd)) {
|
||||||
startOpt = StartupOption.INITIALIZESHAREDEDITS;
|
startOpt = StartupOption.INITIALIZESHAREDEDITS;
|
||||||
return startOpt;
|
return startOpt;
|
||||||
|
} else if (StartupOption.RECOVER.getName().equalsIgnoreCase(cmd)) {
|
||||||
|
if (startOpt != StartupOption.REGULAR) {
|
||||||
|
throw new RuntimeException("Can't combine -recover with " +
|
||||||
|
"other startup options.");
|
||||||
|
}
|
||||||
|
startOpt = StartupOption.RECOVER;
|
||||||
|
while (++i < argsLen) {
|
||||||
|
if (args[i].equalsIgnoreCase(
|
||||||
|
StartupOption.FORCE.getName())) {
|
||||||
|
startOpt.setForce(MetaRecoveryContext.FORCE_FIRST_CHOICE);
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("Error parsing recovery options: " +
|
||||||
|
"can't understand option \"" + args[i] + "\"");
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -892,6 +912,39 @@ public class NameNode {
|
||||||
StartupOption.REGULAR.toString()));
|
StartupOption.REGULAR.toString()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void doRecovery(StartupOption startOpt, Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
if (startOpt.getForce() < MetaRecoveryContext.FORCE_ALL) {
|
||||||
|
if (!confirmPrompt("You have selected Metadata Recovery mode. " +
|
||||||
|
"This mode is intended to recover lost metadata on a corrupt " +
|
||||||
|
"filesystem. Metadata recovery mode often permanently deletes " +
|
||||||
|
"data from your HDFS filesystem. Please back up your edit log " +
|
||||||
|
"and fsimage before trying this!\n\n" +
|
||||||
|
"Are you ready to proceed? (Y/N)\n")) {
|
||||||
|
System.err.println("Recovery aborted at user request.\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MetaRecoveryContext.LOG.info("starting recovery...");
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
NameNode.initMetrics(conf, startOpt.toNodeRole());
|
||||||
|
FSNamesystem fsn = null;
|
||||||
|
try {
|
||||||
|
fsn = FSNamesystem.loadFromDisk(conf);
|
||||||
|
fsn.saveNamespace();
|
||||||
|
MetaRecoveryContext.LOG.info("RECOVERY COMPLETE");
|
||||||
|
} catch (IOException e) {
|
||||||
|
MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
|
||||||
|
throw e;
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
if (fsn != null)
|
||||||
|
fsn.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Print out a prompt to the user, and return true if the user
|
* Print out a prompt to the user, and return true if the user
|
||||||
* responds with "Y" or "yes".
|
* responds with "Y" or "yes".
|
||||||
|
@ -973,6 +1026,10 @@ public class NameNode {
|
||||||
DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
|
DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
|
||||||
return new BackupNode(conf, role);
|
return new BackupNode(conf, role);
|
||||||
}
|
}
|
||||||
|
case RECOVER: {
|
||||||
|
NameNode.doRecovery(startOpt, conf);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
DefaultMetricsSystem.initialize("NameNode");
|
DefaultMetricsSystem.initialize("NameNode");
|
||||||
return new NameNode(conf);
|
return new NameNode(conf);
|
||||||
|
|
|
@ -219,7 +219,7 @@ public class EditLogTailer {
|
||||||
// disk are ignored.
|
// disk are ignored.
|
||||||
long editsLoaded = 0;
|
long editsLoaded = 0;
|
||||||
try {
|
try {
|
||||||
editsLoaded = image.loadEdits(streams, namesystem);
|
editsLoaded = image.loadEdits(streams, namesystem, null);
|
||||||
} catch (EditLogInputException elie) {
|
} catch (EditLogInputException elie) {
|
||||||
editsLoaded = elie.getNumEditsLoaded();
|
editsLoaded = elie.getNumEditsLoaded();
|
||||||
throw elie;
|
throw elie;
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
|
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
|
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
|
||||||
import org.xml.sax.Attributes;
|
import org.xml.sax.Attributes;
|
||||||
|
@ -54,6 +55,7 @@ class OfflineEditsXmlLoader
|
||||||
private FSEditLogOpCodes opCode;
|
private FSEditLogOpCodes opCode;
|
||||||
private StringBuffer cbuf;
|
private StringBuffer cbuf;
|
||||||
private long nextTxId;
|
private long nextTxId;
|
||||||
|
private final OpInstanceCache opCache = new OpInstanceCache();
|
||||||
|
|
||||||
static enum ParseState {
|
static enum ParseState {
|
||||||
EXPECT_EDITS_TAG,
|
EXPECT_EDITS_TAG,
|
||||||
|
@ -207,7 +209,7 @@ class OfflineEditsXmlLoader
|
||||||
throw new InvalidXmlException("expected </DATA>");
|
throw new InvalidXmlException("expected </DATA>");
|
||||||
}
|
}
|
||||||
state = ParseState.EXPECT_RECORD;
|
state = ParseState.EXPECT_RECORD;
|
||||||
FSEditLogOp op = FSEditLogOp.getOpInstance(opCode);
|
FSEditLogOp op = opCache.get(opCode);
|
||||||
opCode = null;
|
opCode = null;
|
||||||
try {
|
try {
|
||||||
op.decodeXml(stanza);
|
op.decodeXml(stanza);
|
||||||
|
|
|
@ -581,6 +581,10 @@ public class MiniDFSCluster {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (operation == StartupOption.RECOVER) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Start the DataNodes
|
// Start the DataNodes
|
||||||
startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
|
startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
|
||||||
hosts, simulatedCapacities, setupHostsFile);
|
hosts, simulatedCapacities, setupHostsFile);
|
||||||
|
@ -781,6 +785,9 @@ public class MiniDFSCluster {
|
||||||
operation == StartupOption.REGULAR) ?
|
operation == StartupOption.REGULAR) ?
|
||||||
new String[] {} : new String[] {operation.getName()};
|
new String[] {} : new String[] {operation.getName()};
|
||||||
NameNode nn = NameNode.createNameNode(args, conf);
|
NameNode nn = NameNode.createNameNode(args, conf);
|
||||||
|
if (operation == StartupOption.RECOVER) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// After the NN has started, set back the bound ports into
|
// After the NN has started, set back the bound ports into
|
||||||
// the conf
|
// the conf
|
||||||
|
@ -956,6 +963,9 @@ public class MiniDFSCluster {
|
||||||
long[] simulatedCapacities,
|
long[] simulatedCapacities,
|
||||||
boolean setupHostsFile,
|
boolean setupHostsFile,
|
||||||
boolean checkDataNodeAddrConfig) throws IOException {
|
boolean checkDataNodeAddrConfig) throws IOException {
|
||||||
|
if (operation == StartupOption.RECOVER) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
|
conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
|
||||||
|
|
||||||
int curDatanodesNum = dataNodes.size();
|
int curDatanodesNum = dataNodes.size();
|
||||||
|
|
|
@ -179,8 +179,8 @@ public class TestEditLog extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private long testLoad(byte[] data, FSNamesystem namesys) throws IOException {
|
private long testLoad(byte[] data, FSNamesystem namesys) throws IOException {
|
||||||
FSEditLogLoader loader = new FSEditLogLoader(namesys);
|
FSEditLogLoader loader = new FSEditLogLoader(namesys, 0);
|
||||||
return loader.loadFSEdits(new EditLogByteInputStream(data), 1);
|
return loader.loadFSEdits(new EditLogByteInputStream(data), 1, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -315,7 +315,7 @@ public class TestEditLog extends TestCase {
|
||||||
//
|
//
|
||||||
for (Iterator<StorageDirectory> it =
|
for (Iterator<StorageDirectory> it =
|
||||||
fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
|
fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
|
||||||
FSEditLogLoader loader = new FSEditLogLoader(namesystem);
|
FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0);
|
||||||
|
|
||||||
File editFile = NNStorage.getFinalizedEditsFile(it.next(), 3,
|
File editFile = NNStorage.getFinalizedEditsFile(it.next(), 3,
|
||||||
3 + expectedTxns - 1);
|
3 + expectedTxns - 1);
|
||||||
|
@ -323,7 +323,7 @@ public class TestEditLog extends TestCase {
|
||||||
|
|
||||||
System.out.println("Verifying file: " + editFile);
|
System.out.println("Verifying file: " + editFile);
|
||||||
long numEdits = loader.loadFSEdits(
|
long numEdits = loader.loadFSEdits(
|
||||||
new EditLogFileInputStream(editFile), 3);
|
new EditLogFileInputStream(editFile), 3, null);
|
||||||
int numLeases = namesystem.leaseManager.countLease();
|
int numLeases = namesystem.leaseManager.countLease();
|
||||||
System.out.println("Number of outstanding leases " + numLeases);
|
System.out.println("Number of outstanding leases " + numLeases);
|
||||||
assertEquals(0, numLeases);
|
assertEquals(0, numLeases);
|
||||||
|
@ -774,8 +774,8 @@ public class TestEditLog extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FSEditLogOp readOp() throws IOException {
|
protected FSEditLogOp nextOp() throws IOException {
|
||||||
return reader.readOp();
|
return reader.readOp(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -788,16 +788,11 @@ public class TestEditLog extends TestCase {
|
||||||
input.close();
|
input.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // JournalStream
|
@Override
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return "AnonEditLogByteInputStream";
|
return "AnonEditLogByteInputStream";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // JournalStream
|
|
||||||
public JournalType getType() {
|
|
||||||
return JournalType.FILE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isInProgress() {
|
public boolean isInProgress() {
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -236,9 +236,9 @@ public class TestEditLogRace {
|
||||||
File editFile = new File(sd.getCurrentDir(), logFileName);
|
File editFile = new File(sd.getCurrentDir(), logFileName);
|
||||||
|
|
||||||
System.out.println("Verifying file: " + editFile);
|
System.out.println("Verifying file: " + editFile);
|
||||||
FSEditLogLoader loader = new FSEditLogLoader(namesystem);
|
FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId);
|
||||||
long numEditsThisLog = loader.loadFSEdits(new EditLogFileInputStream(editFile),
|
long numEditsThisLog = loader.loadFSEdits(new EditLogFileInputStream(editFile),
|
||||||
startTxId);
|
startTxId, null);
|
||||||
|
|
||||||
System.out.println("Number of edits: " + numEditsThisLog);
|
System.out.println("Number of edits: " + numEditsThisLog);
|
||||||
assertTrue(numEdits == -1 || numEditsThisLog == numEdits);
|
assertTrue(numEdits == -1 || numEditsThisLog == numEdits);
|
||||||
|
|
|
@ -92,8 +92,8 @@ public class TestFSEditLogLoader {
|
||||||
rwf.close();
|
rwf.close();
|
||||||
|
|
||||||
StringBuilder bld = new StringBuilder();
|
StringBuilder bld = new StringBuilder();
|
||||||
bld.append("^Error replaying edit log at offset \\d+");
|
bld.append("^Error replaying edit log at offset \\d+. ");
|
||||||
bld.append(" on transaction ID \\d+\n");
|
bld.append("Expected transaction ID was \\d+\n");
|
||||||
bld.append("Recent opcode offsets: (\\d+\\s*){4}$");
|
bld.append("Recent opcode offsets: (\\d+\\s*){4}$");
|
||||||
try {
|
try {
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)
|
||||||
|
|
|
@ -0,0 +1,305 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.RandomAccessFile;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This tests data recovery mode for the NameNode.
|
||||||
|
*/
|
||||||
|
public class TestNameNodeRecovery {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestNameNodeRecovery.class);
|
||||||
|
private static StartupOption recoverStartOpt = StartupOption.RECOVER;
|
||||||
|
|
||||||
|
static {
|
||||||
|
recoverStartOpt.setForce(MetaRecoveryContext.FORCE_ALL);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void runEditLogTest(EditLogTestSetup elts) throws IOException {
|
||||||
|
final String TEST_LOG_NAME = "test_edit_log";
|
||||||
|
final OpInstanceCache cache = new OpInstanceCache();
|
||||||
|
|
||||||
|
EditLogFileOutputStream elfos = null;
|
||||||
|
File file = null;
|
||||||
|
EditLogFileInputStream elfis = null;
|
||||||
|
try {
|
||||||
|
file = new File(TEST_LOG_NAME);
|
||||||
|
elfos = new EditLogFileOutputStream(file, 0);
|
||||||
|
elfos.create();
|
||||||
|
|
||||||
|
elts.addTransactionsToLog(elfos, cache);
|
||||||
|
elfos.setReadyToFlush();
|
||||||
|
elfos.flushAndSync();
|
||||||
|
elfos.close();
|
||||||
|
elfos = null;
|
||||||
|
file = new File(TEST_LOG_NAME);
|
||||||
|
elfis = new EditLogFileInputStream(file);
|
||||||
|
|
||||||
|
// reading through normally will get you an exception
|
||||||
|
Set<Long> validTxIds = elts.getValidTxIds();
|
||||||
|
FSEditLogOp op = null;
|
||||||
|
long prevTxId = 0;
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
op = elfis.nextOp();
|
||||||
|
if (op == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
LOG.debug("read txid " + op.txid);
|
||||||
|
if (!validTxIds.contains(op.getTransactionId())) {
|
||||||
|
fail("read txid " + op.getTransactionId() +
|
||||||
|
", which we did not expect to find.");
|
||||||
|
}
|
||||||
|
validTxIds.remove(op.getTransactionId());
|
||||||
|
prevTxId = op.getTransactionId();
|
||||||
|
}
|
||||||
|
if (elts.getLastValidTxId() != -1) {
|
||||||
|
fail("failed to throw IoException as expected");
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (elts.getLastValidTxId() == -1) {
|
||||||
|
fail("expected all transactions to be valid, but got exception " +
|
||||||
|
"on txid " + prevTxId);
|
||||||
|
} else {
|
||||||
|
assertEquals(prevTxId, elts.getLastValidTxId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (elts.getLastValidTxId() != -1) {
|
||||||
|
// let's skip over the bad transaction
|
||||||
|
op = null;
|
||||||
|
prevTxId = 0;
|
||||||
|
try {
|
||||||
|
while (true) {
|
||||||
|
op = elfis.nextValidOp();
|
||||||
|
if (op == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
prevTxId = op.getTransactionId();
|
||||||
|
assertTrue(validTxIds.remove(op.getTransactionId()));
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
fail("caught IOException while trying to skip over bad " +
|
||||||
|
"transaction. message was " + e.getMessage() +
|
||||||
|
"\nstack trace\n" + StringUtils.stringifyException(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// We should have read every valid transaction.
|
||||||
|
assertTrue(validTxIds.isEmpty());
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanup(LOG, elfos, elfis);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private interface EditLogTestSetup {
|
||||||
|
/**
|
||||||
|
* Set up the edit log.
|
||||||
|
*/
|
||||||
|
abstract public void addTransactionsToLog(EditLogOutputStream elos,
|
||||||
|
OpInstanceCache cache) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the transaction ID right before the transaction which causes the
|
||||||
|
* normal edit log loading process to bail out-- or -1 if the first
|
||||||
|
* transaction should be bad.
|
||||||
|
*/
|
||||||
|
abstract public long getLastValidTxId();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the transaction IDs which should exist and be valid in this
|
||||||
|
* edit log.
|
||||||
|
**/
|
||||||
|
abstract public Set<Long> getValidTxIds();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class EltsTestEmptyLog implements EditLogTestSetup {
|
||||||
|
public void addTransactionsToLog(EditLogOutputStream elos,
|
||||||
|
OpInstanceCache cache) throws IOException {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLastValidTxId() {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<Long> getValidTxIds() {
|
||||||
|
return new HashSet<Long>();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Test an empty edit log */
|
||||||
|
@Test(timeout=180000)
|
||||||
|
public void testEmptyLog() throws IOException {
|
||||||
|
runEditLogTest(new EltsTestEmptyLog());
|
||||||
|
}
|
||||||
|
|
||||||
|
private class EltsTestGarbageInEditLog implements EditLogTestSetup {
|
||||||
|
final private long BAD_TXID = 4;
|
||||||
|
final private long MAX_TXID = 10;
|
||||||
|
|
||||||
|
public void addTransactionsToLog(EditLogOutputStream elos,
|
||||||
|
OpInstanceCache cache) throws IOException {
|
||||||
|
for (long txid = 1; txid <= MAX_TXID; txid++) {
|
||||||
|
if (txid == BAD_TXID) {
|
||||||
|
byte garbage[] = { 0x1, 0x2, 0x3 };
|
||||||
|
elos.writeRaw(garbage, 0, garbage.length);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
DeleteOp op;
|
||||||
|
op = DeleteOp.getInstance(cache);
|
||||||
|
op.setTransactionId(txid);
|
||||||
|
op.setPath("/foo." + txid);
|
||||||
|
op.setTimestamp(txid);
|
||||||
|
elos.write(op);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLastValidTxId() {
|
||||||
|
return BAD_TXID - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<Long> getValidTxIds() {
|
||||||
|
return Sets.newHashSet(1L , 2L, 3L, 5L, 6L, 7L, 8L, 9L, 10L);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Test that we can successfully recover from a situation where there is
|
||||||
|
* garbage in the middle of the edit log file output stream. */
|
||||||
|
@Test(timeout=180000)
|
||||||
|
public void testSkipEdit() throws IOException {
|
||||||
|
runEditLogTest(new EltsTestGarbageInEditLog());
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Test that we can successfully recover from a situation where the last
|
||||||
|
* entry in the edit log has been truncated. */
|
||||||
|
@Test(timeout=180000)
|
||||||
|
public void testRecoverTruncatedEditLog() throws IOException {
|
||||||
|
final String TEST_PATH = "/test/path/dir";
|
||||||
|
final int NUM_TEST_MKDIRS = 10;
|
||||||
|
|
||||||
|
// start a cluster
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
FileSystem fileSys = null;
|
||||||
|
StorageDirectory sd = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
|
||||||
|
.build();
|
||||||
|
cluster.waitActive();
|
||||||
|
fileSys = cluster.getFileSystem();
|
||||||
|
final FSNamesystem namesystem = cluster.getNamesystem();
|
||||||
|
FSImage fsimage = namesystem.getFSImage();
|
||||||
|
for (int i = 0; i < NUM_TEST_MKDIRS; i++) {
|
||||||
|
fileSys.mkdirs(new Path(TEST_PATH));
|
||||||
|
}
|
||||||
|
sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next();
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
File editFile = FSImageTestUtil.findLatestEditsLog(sd).getFile();
|
||||||
|
assertTrue("Should exist: " + editFile, editFile.exists());
|
||||||
|
|
||||||
|
// Corrupt the last edit
|
||||||
|
long fileLen = editFile.length();
|
||||||
|
RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
|
||||||
|
rwf.setLength(fileLen - 1);
|
||||||
|
rwf.close();
|
||||||
|
|
||||||
|
// Make sure that we can't start the cluster normally before recovery
|
||||||
|
cluster = null;
|
||||||
|
try {
|
||||||
|
LOG.debug("trying to start normally (this should fail)...");
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
|
||||||
|
.format(false).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
cluster.shutdown();
|
||||||
|
fail("expected the truncated edit log to prevent normal startup");
|
||||||
|
} catch (IOException e) {
|
||||||
|
// success
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform recovery
|
||||||
|
cluster = null;
|
||||||
|
try {
|
||||||
|
LOG.debug("running recovery...");
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
|
||||||
|
.format(false).startupOption(recoverStartOpt).build();
|
||||||
|
} catch (IOException e) {
|
||||||
|
fail("caught IOException while trying to recover. " +
|
||||||
|
"message was " + e.getMessage() +
|
||||||
|
"\nstack trace\n" + StringUtils.stringifyException(e));
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure that we can start the cluster normally after recovery
|
||||||
|
cluster = null;
|
||||||
|
try {
|
||||||
|
LOG.debug("starting cluster normally after recovery...");
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
|
||||||
|
.format(false).build();
|
||||||
|
LOG.debug("testRecoverTruncatedEditLog: successfully recovered the " +
|
||||||
|
"truncated edit log");
|
||||||
|
assertTrue(cluster.getFileSystem().exists(new Path(TEST_PATH)));
|
||||||
|
} catch (IOException e) {
|
||||||
|
fail("failed to recover. Error message: " + e.getMessage());
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -143,9 +143,9 @@ public class TestSecurityTokenEditLog extends TestCase {
|
||||||
File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 1 + expectedTransactions - 1);
|
File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 1 + expectedTransactions - 1);
|
||||||
System.out.println("Verifying file: " + editFile);
|
System.out.println("Verifying file: " + editFile);
|
||||||
|
|
||||||
FSEditLogLoader loader = new FSEditLogLoader(namesystem);
|
FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0);
|
||||||
long numEdits = loader.loadFSEdits(
|
long numEdits = loader.loadFSEdits(
|
||||||
new EditLogFileInputStream(editFile), 1);
|
new EditLogFileInputStream(editFile), 1, null);
|
||||||
assertEquals("Verification for " + editFile, expectedTransactions, numEdits);
|
assertEquals("Verification for " + editFile, expectedTransactions, numEdits);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
Loading…
Reference in New Issue