HDFS-3004. svn merge -c 1311394 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1311401 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2012-04-09 19:47:08 +00:00
parent 75f105d0b4
commit 70142209b9
29 changed files with 1006 additions and 389 deletions

View File

@ -77,6 +77,8 @@ Release 2.0.0 - UNRELEASED
HDFS-3102. Add CLI tool to initialize the shared-edits dir. (atm) HDFS-3102. Add CLI tool to initialize the shared-edits dir. (atm)
HDFS-3004. Implement Recovery Mode. (Colin Patrick McCabe via eli)
IMPROVEMENTS IMPROVEMENTS
HDFS-2018. Move all journal stream management code into one place. HDFS-2018. Move all journal stream management code into one place.

View File

@ -264,4 +264,10 @@
<Method name="doRefreshNamenodes" /> <Method name="doRefreshNamenodes" />
<Bug category="PERFORMANCE" /> <Bug category="PERFORMANCE" />
</Match> </Match>
<!-- Don't complain about System.exit() being called from quit() -->
<Match>
<Class name="org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext" />
<Method name="quit" />
<Bug pattern="DM_EXIT" />
</Match>
</FindBugsFilter> </FindBugsFilter>

View File

@ -537,7 +537,32 @@
For command usage, see <a href="http://hadoop.apache.org/common/docs/current/commands_manual.html#fetchdt"><code>fetchdt</code> command</a>. For command usage, see <a href="http://hadoop.apache.org/common/docs/current/commands_manual.html#fetchdt"><code>fetchdt</code> command</a>.
</p> </p>
</section><section> <title> Upgrade and Rollback </title> </section>
<section> <title>Recovery Mode</title>
<p>Typically, you will configure multiple metadata storage locations.
Then, if one storage location is corrupt, you can read the
metadata from one of the other storage locations.</p>
<p>However, what can you do if the only storage locations available are
corrupt? In this case, there is a special NameNode startup mode called
Recovery mode that may allow you to recover most of your data.</p>
<p>You can start the NameNode in recovery mode like so:
<code>namenode -recover</code></p>
<p>When in recovery mode, the NameNode will interactively prompt you at
the command line about possible courses of action you can take to
recover your data.</p>
<p>If you don't want to be prompted, you can give the
<code>-force</code> option. This option will force
recovery mode to always select the first choice. Normally, this
will be the most reasonable choice.</p>
<p>Because Recovery mode can cause you to lose data, you should always
back up your edit log and fsimage before using it.</p>
</section>
<section> <title> Upgrade and Rollback </title>
<p> <p>
When Hadoop is upgraded on an existing cluster, as with any When Hadoop is upgraded on an existing cluster, as with any
software upgrade, it is possible there are new bugs or software upgrade, it is possible there are new bugs or

View File

@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.namenode.MetaRecoveryContext;
/************************************ /************************************
* Some handy internal HDFS constants * Some handy internal HDFS constants
@ -54,13 +55,18 @@ public final class HdfsServerConstants {
FINALIZE("-finalize"), FINALIZE("-finalize"),
IMPORT ("-importCheckpoint"), IMPORT ("-importCheckpoint"),
BOOTSTRAPSTANDBY("-bootstrapStandby"), BOOTSTRAPSTANDBY("-bootstrapStandby"),
INITIALIZESHAREDEDITS("-initializeSharedEdits"); INITIALIZESHAREDEDITS("-initializeSharedEdits"),
RECOVER ("-recover"),
FORCE("-force");
private String name = null; private String name = null;
// Used only with format and upgrade options // Used only with format and upgrade options
private String clusterId = null; private String clusterId = null;
// Used only with recovery option
private int force = 0;
private StartupOption(String arg) {this.name = arg;} private StartupOption(String arg) {this.name = arg;}
public String getName() {return name;} public String getName() {return name;}
public NamenodeRole toNodeRole() { public NamenodeRole toNodeRole() {
@ -77,10 +83,24 @@ public final class HdfsServerConstants {
public void setClusterId(String cid) { public void setClusterId(String cid) {
clusterId = cid; clusterId = cid;
} }
public String getClusterId() { public String getClusterId() {
return clusterId; return clusterId;
} }
public MetaRecoveryContext createRecoveryContext() {
if (!name.equals(RECOVER.name))
return null;
return new MetaRecoveryContext(force);
}
public void setForce(int force) {
this.force = force;
}
public int getForce() {
return this.force;
}
} }
// Timeouts for communicating with DataNode for streaming writes/reads // Timeouts for communicating with DataNode for streaming writes/reads

View File

@ -215,19 +215,21 @@ public class BackupImage extends FSImage {
LOG.debug("data:" + StringUtils.byteToHexString(data)); LOG.debug("data:" + StringUtils.byteToHexString(data));
} }
FSEditLogLoader logLoader = new FSEditLogLoader(namesystem); FSEditLogLoader logLoader =
new FSEditLogLoader(namesystem, lastAppliedTxId);
int logVersion = storage.getLayoutVersion(); int logVersion = storage.getLayoutVersion();
backupInputStream.setBytes(data, logVersion); backupInputStream.setBytes(data, logVersion);
long numLoaded = logLoader.loadEditRecords(logVersion, backupInputStream, long numTxnsAdvanced = logLoader.loadEditRecords(logVersion,
true, lastAppliedTxId + 1); backupInputStream, true, lastAppliedTxId + 1, null);
if (numLoaded != numTxns) { if (numTxnsAdvanced != numTxns) {
throw new IOException("Batch of txns starting at txnid " + throw new IOException("Batch of txns starting at txnid " +
firstTxId + " was supposed to contain " + numTxns + firstTxId + " was supposed to contain " + numTxns +
" transactions but only was able to apply " + numLoaded); " transactions, but we were only able to advance by " +
numTxnsAdvanced);
} }
lastAppliedTxId += numTxns; lastAppliedTxId = logLoader.getLastAppliedTxId();
namesystem.dir.updateCountForINodeWithQuota(); // inefficient! namesystem.dir.updateCountForINodeWithQuota(); // inefficient!
} finally { } finally {
backupInputStream.clear(); backupInputStream.clear();
@ -277,7 +279,7 @@ public class BackupImage extends FSImage {
editStreams.add(s); editStreams.add(s);
} }
} }
loadEdits(editStreams, namesystem); loadEdits(editStreams, namesystem, null);
} }
// now, need to load the in-progress file // now, need to load the in-progress file
@ -311,12 +313,11 @@ public class BackupImage extends FSImage {
LOG.info("Going to finish converging with remaining " + remainingTxns LOG.info("Going to finish converging with remaining " + remainingTxns
+ " txns from in-progress stream " + stream); + " txns from in-progress stream " + stream);
FSEditLogLoader loader = new FSEditLogLoader(namesystem); FSEditLogLoader loader =
long numLoaded = loader.loadFSEdits(stream, lastAppliedTxId + 1); new FSEditLogLoader(namesystem, lastAppliedTxId);
lastAppliedTxId += numLoaded; loader.loadFSEdits(stream, lastAppliedTxId + 1, null);
assert numLoaded == remainingTxns : lastAppliedTxId = loader.getLastAppliedTxId();
"expected to load " + remainingTxns + " but loaded " + assert lastAppliedTxId == getEditLog().getLastWrittenTxId();
numLoaded + " from " + stream;
} finally { } finally {
FSEditLog.closeAllStreams(editStreams); FSEditLog.closeAllStreams(editStreams);
} }

View File

@ -292,6 +292,6 @@ class Checkpointer extends Daemon {
} }
LOG.info("Checkpointer about to load edits from " + LOG.info("Checkpointer about to load edits from " +
editsStreams.size() + " stream(s)."); editsStreams.size() + " stream(s).");
dstImage.loadEdits(editsStreams, dstNamesystem); dstImage.loadEdits(editsStreams, dstNamesystem, null);
} }
} }

View File

@ -70,21 +70,25 @@ class EditLogBackupInputStream extends EditLogInputStream {
reader = null; reader = null;
} }
@Override // JournalStream @Override
public String getName() { public String getName() {
return address; return address;
} }
@Override // JournalStream @Override
public JournalType getType() { protected FSEditLogOp nextOp() throws IOException {
return JournalType.BACKUP; Preconditions.checkState(reader != null,
"Must call setBytes() before readOp()");
return reader.readOp(false);
} }
@Override @Override
public FSEditLogOp readOp() throws IOException { protected FSEditLogOp nextValidOp() {
Preconditions.checkState(reader != null, try {
"Must call setBytes() before readOp()"); return reader.readOp(true);
return reader.readOp(); } catch (IOException e) {
throw new RuntimeException("got unexpected IOException " + e, e);
}
} }
@Override @Override

View File

@ -91,24 +91,6 @@ public class EditLogFileInputStream extends EditLogInputStream {
this.isInProgress = isInProgress; this.isInProgress = isInProgress;
} }
/**
* Skip over a number of transactions. Subsequent calls to
* {@link EditLogFileInputStream#readOp()} will begin after these skipped
* transactions. If more transactions are requested to be skipped than remain
* in the edit log, all edit log ops in the log will be skipped and subsequent
* calls to {@link EditLogInputStream#readOp} will return null.
*
* @param transactionsToSkip number of transactions to skip over.
* @throws IOException if there's an error while reading an operation
*/
public void skipTransactions(long transactionsToSkip) throws IOException {
assert firstTxId != HdfsConstants.INVALID_TXID &&
lastTxId != HdfsConstants.INVALID_TXID;
for (long i = 0; i < transactionsToSkip; i++) {
reader.readOp();
}
}
@Override @Override
public long getFirstTxId() throws IOException { public long getFirstTxId() throws IOException {
return firstTxId; return firstTxId;
@ -119,19 +101,23 @@ public class EditLogFileInputStream extends EditLogInputStream {
return lastTxId; return lastTxId;
} }
@Override // JournalStream @Override
public String getName() { public String getName() {
return file.getPath(); return file.getPath();
} }
@Override // JournalStream
public JournalType getType() {
return JournalType.FILE;
}
@Override @Override
public FSEditLogOp readOp() throws IOException { protected FSEditLogOp nextOp() throws IOException {
return reader.readOp(); return reader.readOp(false);
}
@Override
protected FSEditLogOp nextValidOp() {
try {
return reader.readOp(true);
} catch (IOException e) {
return null;
}
} }
@Override @Override

View File

@ -34,7 +34,14 @@ import org.apache.hadoop.classification.InterfaceStability;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public abstract class EditLogInputStream implements JournalStream, Closeable { public abstract class EditLogInputStream implements Closeable {
private FSEditLogOp cachedOp = null;
/**
* @return the name of the EditLogInputStream
*/
public abstract String getName();
/** /**
* @return the first transaction which will be found in this stream * @return the first transaction which will be found in this stream
*/ */
@ -57,8 +64,81 @@ public abstract class EditLogInputStream implements JournalStream, Closeable {
* @return an operation from the stream or null if at end of stream * @return an operation from the stream or null if at end of stream
* @throws IOException if there is an error reading from the stream * @throws IOException if there is an error reading from the stream
*/ */
public abstract FSEditLogOp readOp() throws IOException; public FSEditLogOp readOp() throws IOException {
FSEditLogOp ret;
if (cachedOp != null) {
ret = cachedOp;
cachedOp = null;
return ret;
}
return nextOp();
}
/**
* Position the stream so that a valid operation can be read from it with
* readOp().
*
* This method can be used to skip over corrupted sections of edit logs.
*/
public void resync() throws IOException {
if (cachedOp != null) {
return;
}
cachedOp = nextValidOp();
}
/**
* Get the next operation from the stream storage.
*
* @return an operation from the stream or null if at end of stream
* @throws IOException if there is an error reading from the stream
*/
protected abstract FSEditLogOp nextOp() throws IOException;
/**
* Get the next valid operation from the stream storage.
*
* This is exactly like nextOp, except that we attempt to skip over damaged
* parts of the edit log
*
* @return an operation from the stream or null if at end of stream
*/
protected FSEditLogOp nextValidOp() {
// This is a trivial implementation which just assumes that any errors mean
// that there is nothing more of value in the log. Subclasses that support
// error recovery will want to override this.
try {
return nextOp();
} catch (IOException e) {
return null;
}
}
/**
* Skip edit log operations up to a given transaction ID, or until the
* end of the edit log is reached.
*
* After this function returns, the next call to readOp will return either
* end-of-file (null) or a transaction with a txid equal to or higher than
* the one we asked for.
*
* @param txid The transaction ID to read up until.
* @return Returns true if we found a transaction ID greater than
* or equal to 'txid' in the log.
*/
public boolean skipUntil(long txid) throws IOException {
while (true) {
FSEditLogOp op = readOp();
if (op == null) {
return false;
}
if (op.getTransactionId() >= txid) {
cachedOp = op;
return true;
}
}
}
/** /**
* Get the layout version of the data in the stream. * Get the layout version of the data in the stream.
* @return the layout version of the ops in the stream. * @return the layout version of the ops in the stream.

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException; import java.io.IOException;
import java.io.Closeable;
import static org.apache.hadoop.hdfs.server.common.Util.now; import static org.apache.hadoop.hdfs.server.common.Util.now;
@ -30,7 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public abstract class EditLogOutputStream { public abstract class EditLogOutputStream implements Closeable {
// these are statistics counters // these are statistics counters
private long numSync; // number of sync(s) to disk private long numSync; // number of sync(s) to disk
private long totalTimeSync; // total time to sync private long totalTimeSync; // total time to sync

View File

@ -128,6 +128,14 @@ public class FSEditLog {
private Configuration conf; private Configuration conf;
private List<URI> editsDirs; private List<URI> editsDirs;
private ThreadLocal<OpInstanceCache> cache =
new ThreadLocal<OpInstanceCache>() {
@Override
protected OpInstanceCache initialValue() {
return new OpInstanceCache();
}
};
/** /**
* The edit directories that are shared between primary and secondary. * The edit directories that are shared between primary and secondary.
@ -597,7 +605,7 @@ public class FSEditLog {
* Records the block locations of the last block. * Records the block locations of the last block.
*/ */
public void logOpenFile(String path, INodeFileUnderConstruction newNode) { public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
AddOp op = AddOp.getInstance() AddOp op = AddOp.getInstance(cache.get())
.setPath(path) .setPath(path)
.setReplication(newNode.getReplication()) .setReplication(newNode.getReplication())
.setModificationTime(newNode.getModificationTime()) .setModificationTime(newNode.getModificationTime())
@ -615,7 +623,7 @@ public class FSEditLog {
* Add close lease record to edit log. * Add close lease record to edit log.
*/ */
public void logCloseFile(String path, INodeFile newNode) { public void logCloseFile(String path, INodeFile newNode) {
CloseOp op = CloseOp.getInstance() CloseOp op = CloseOp.getInstance(cache.get())
.setPath(path) .setPath(path)
.setReplication(newNode.getReplication()) .setReplication(newNode.getReplication())
.setModificationTime(newNode.getModificationTime()) .setModificationTime(newNode.getModificationTime())
@ -628,7 +636,7 @@ public class FSEditLog {
} }
public void logUpdateBlocks(String path, INodeFileUnderConstruction file) { public void logUpdateBlocks(String path, INodeFileUnderConstruction file) {
UpdateBlocksOp op = UpdateBlocksOp.getInstance() UpdateBlocksOp op = UpdateBlocksOp.getInstance(cache.get())
.setPath(path) .setPath(path)
.setBlocks(file.getBlocks()); .setBlocks(file.getBlocks());
logEdit(op); logEdit(op);
@ -638,7 +646,7 @@ public class FSEditLog {
* Add create directory record to edit log * Add create directory record to edit log
*/ */
public void logMkDir(String path, INode newNode) { public void logMkDir(String path, INode newNode) {
MkdirOp op = MkdirOp.getInstance() MkdirOp op = MkdirOp.getInstance(cache.get())
.setPath(path) .setPath(path)
.setTimestamp(newNode.getModificationTime()) .setTimestamp(newNode.getModificationTime())
.setPermissionStatus(newNode.getPermissionStatus()); .setPermissionStatus(newNode.getPermissionStatus());
@ -650,7 +658,7 @@ public class FSEditLog {
* TODO: use String parameters until just before writing to disk * TODO: use String parameters until just before writing to disk
*/ */
void logRename(String src, String dst, long timestamp) { void logRename(String src, String dst, long timestamp) {
RenameOldOp op = RenameOldOp.getInstance() RenameOldOp op = RenameOldOp.getInstance(cache.get())
.setSource(src) .setSource(src)
.setDestination(dst) .setDestination(dst)
.setTimestamp(timestamp); .setTimestamp(timestamp);
@ -661,7 +669,7 @@ public class FSEditLog {
* Add rename record to edit log * Add rename record to edit log
*/ */
void logRename(String src, String dst, long timestamp, Options.Rename... options) { void logRename(String src, String dst, long timestamp, Options.Rename... options) {
RenameOp op = RenameOp.getInstance() RenameOp op = RenameOp.getInstance(cache.get())
.setSource(src) .setSource(src)
.setDestination(dst) .setDestination(dst)
.setTimestamp(timestamp) .setTimestamp(timestamp)
@ -673,7 +681,7 @@ public class FSEditLog {
* Add set replication record to edit log * Add set replication record to edit log
*/ */
void logSetReplication(String src, short replication) { void logSetReplication(String src, short replication) {
SetReplicationOp op = SetReplicationOp.getInstance() SetReplicationOp op = SetReplicationOp.getInstance(cache.get())
.setPath(src) .setPath(src)
.setReplication(replication); .setReplication(replication);
logEdit(op); logEdit(op);
@ -685,7 +693,7 @@ public class FSEditLog {
* @param quota the directory size limit * @param quota the directory size limit
*/ */
void logSetQuota(String src, long nsQuota, long dsQuota) { void logSetQuota(String src, long nsQuota, long dsQuota) {
SetQuotaOp op = SetQuotaOp.getInstance() SetQuotaOp op = SetQuotaOp.getInstance(cache.get())
.setSource(src) .setSource(src)
.setNSQuota(nsQuota) .setNSQuota(nsQuota)
.setDSQuota(dsQuota); .setDSQuota(dsQuota);
@ -694,7 +702,7 @@ public class FSEditLog {
/** Add set permissions record to edit log */ /** Add set permissions record to edit log */
void logSetPermissions(String src, FsPermission permissions) { void logSetPermissions(String src, FsPermission permissions) {
SetPermissionsOp op = SetPermissionsOp.getInstance() SetPermissionsOp op = SetPermissionsOp.getInstance(cache.get())
.setSource(src) .setSource(src)
.setPermissions(permissions); .setPermissions(permissions);
logEdit(op); logEdit(op);
@ -702,7 +710,7 @@ public class FSEditLog {
/** Add set owner record to edit log */ /** Add set owner record to edit log */
void logSetOwner(String src, String username, String groupname) { void logSetOwner(String src, String username, String groupname) {
SetOwnerOp op = SetOwnerOp.getInstance() SetOwnerOp op = SetOwnerOp.getInstance(cache.get())
.setSource(src) .setSource(src)
.setUser(username) .setUser(username)
.setGroup(groupname); .setGroup(groupname);
@ -713,7 +721,7 @@ public class FSEditLog {
* concat(trg,src..) log * concat(trg,src..) log
*/ */
void logConcat(String trg, String [] srcs, long timestamp) { void logConcat(String trg, String [] srcs, long timestamp) {
ConcatDeleteOp op = ConcatDeleteOp.getInstance() ConcatDeleteOp op = ConcatDeleteOp.getInstance(cache.get())
.setTarget(trg) .setTarget(trg)
.setSources(srcs) .setSources(srcs)
.setTimestamp(timestamp); .setTimestamp(timestamp);
@ -724,7 +732,7 @@ public class FSEditLog {
* Add delete file record to edit log * Add delete file record to edit log
*/ */
void logDelete(String src, long timestamp) { void logDelete(String src, long timestamp) {
DeleteOp op = DeleteOp.getInstance() DeleteOp op = DeleteOp.getInstance(cache.get())
.setPath(src) .setPath(src)
.setTimestamp(timestamp); .setTimestamp(timestamp);
logEdit(op); logEdit(op);
@ -734,7 +742,7 @@ public class FSEditLog {
* Add generation stamp record to edit log * Add generation stamp record to edit log
*/ */
void logGenerationStamp(long genstamp) { void logGenerationStamp(long genstamp) {
SetGenstampOp op = SetGenstampOp.getInstance() SetGenstampOp op = SetGenstampOp.getInstance(cache.get())
.setGenerationStamp(genstamp); .setGenerationStamp(genstamp);
logEdit(op); logEdit(op);
} }
@ -743,7 +751,7 @@ public class FSEditLog {
* Add access time record to edit log * Add access time record to edit log
*/ */
void logTimes(String src, long mtime, long atime) { void logTimes(String src, long mtime, long atime) {
TimesOp op = TimesOp.getInstance() TimesOp op = TimesOp.getInstance(cache.get())
.setPath(src) .setPath(src)
.setModificationTime(mtime) .setModificationTime(mtime)
.setAccessTime(atime); .setAccessTime(atime);
@ -755,7 +763,7 @@ public class FSEditLog {
*/ */
void logSymlink(String path, String value, long mtime, void logSymlink(String path, String value, long mtime,
long atime, INodeSymlink node) { long atime, INodeSymlink node) {
SymlinkOp op = SymlinkOp.getInstance() SymlinkOp op = SymlinkOp.getInstance(cache.get())
.setPath(path) .setPath(path)
.setValue(value) .setValue(value)
.setModificationTime(mtime) .setModificationTime(mtime)
@ -771,7 +779,7 @@ public class FSEditLog {
*/ */
void logGetDelegationToken(DelegationTokenIdentifier id, void logGetDelegationToken(DelegationTokenIdentifier id,
long expiryTime) { long expiryTime) {
GetDelegationTokenOp op = GetDelegationTokenOp.getInstance() GetDelegationTokenOp op = GetDelegationTokenOp.getInstance(cache.get())
.setDelegationTokenIdentifier(id) .setDelegationTokenIdentifier(id)
.setExpiryTime(expiryTime); .setExpiryTime(expiryTime);
logEdit(op); logEdit(op);
@ -779,26 +787,26 @@ public class FSEditLog {
void logRenewDelegationToken(DelegationTokenIdentifier id, void logRenewDelegationToken(DelegationTokenIdentifier id,
long expiryTime) { long expiryTime) {
RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance() RenewDelegationTokenOp op = RenewDelegationTokenOp.getInstance(cache.get())
.setDelegationTokenIdentifier(id) .setDelegationTokenIdentifier(id)
.setExpiryTime(expiryTime); .setExpiryTime(expiryTime);
logEdit(op); logEdit(op);
} }
void logCancelDelegationToken(DelegationTokenIdentifier id) { void logCancelDelegationToken(DelegationTokenIdentifier id) {
CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance() CancelDelegationTokenOp op = CancelDelegationTokenOp.getInstance(cache.get())
.setDelegationTokenIdentifier(id); .setDelegationTokenIdentifier(id);
logEdit(op); logEdit(op);
} }
void logUpdateMasterKey(DelegationKey key) { void logUpdateMasterKey(DelegationKey key) {
UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance() UpdateMasterKeyOp op = UpdateMasterKeyOp.getInstance(cache.get())
.setDelegationKey(key); .setDelegationKey(key);
logEdit(op); logEdit(op);
} }
void logReassignLease(String leaseHolder, String src, String newHolder) { void logReassignLease(String leaseHolder, String src, String newHolder) {
ReassignLeaseOp op = ReassignLeaseOp.getInstance() ReassignLeaseOp op = ReassignLeaseOp.getInstance(cache.get())
.setLeaseHolder(leaseHolder) .setLeaseHolder(leaseHolder)
.setPath(src) .setPath(src)
.setNewHolder(newHolder); .setNewHolder(newHolder);
@ -897,7 +905,7 @@ public class FSEditLog {
state = State.IN_SEGMENT; state = State.IN_SEGMENT;
if (writeHeaderTxn) { if (writeHeaderTxn) {
logEdit(LogSegmentOp.getInstance( logEdit(LogSegmentOp.getInstance(cache.get(),
FSEditLogOpCodes.OP_START_LOG_SEGMENT)); FSEditLogOpCodes.OP_START_LOG_SEGMENT));
logSync(); logSync();
} }
@ -913,7 +921,7 @@ public class FSEditLog {
"Bad state: %s", state); "Bad state: %s", state);
if (writeEndTxn) { if (writeEndTxn) {
logEdit(LogSegmentOp.getInstance( logEdit(LogSegmentOp.getInstance(cache.get(),
FSEditLogOpCodes.OP_END_LOG_SEGMENT)); FSEditLogOpCodes.OP_END_LOG_SEGMENT));
logSync(); logSync();
} }

View File

@ -73,9 +73,11 @@ public class FSEditLogLoader {
static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName()); static final Log LOG = LogFactory.getLog(FSEditLogLoader.class.getName());
static long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec static long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
private final FSNamesystem fsNamesys; private final FSNamesystem fsNamesys;
private long lastAppliedTxId;
public FSEditLogLoader(FSNamesystem fsNamesys) {
public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
this.fsNamesys = fsNamesys; this.fsNamesys = fsNamesys;
this.lastAppliedTxId = lastAppliedTxId;
} }
/** /**
@ -83,32 +85,29 @@ public class FSEditLogLoader {
* This is where we apply edits that we've been writing to disk all * This is where we apply edits that we've been writing to disk all
* along. * along.
*/ */
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId) long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
throws IOException { MetaRecoveryContext recovery) throws IOException {
long numEdits = 0;
int logVersion = edits.getVersion(); int logVersion = edits.getVersion();
fsNamesys.writeLock(); fsNamesys.writeLock();
try { try {
long startTime = now(); long startTime = now();
numEdits = loadEditRecords(logVersion, edits, false, long numEdits = loadEditRecords(logVersion, edits, false,
expectedStartingTxId); expectedStartingTxId, recovery);
FSImage.LOG.info("Edits file " + edits.getName() FSImage.LOG.info("Edits file " + edits.getName()
+ " of size " + edits.length() + " edits # " + numEdits + " of size " + edits.length() + " edits # " + numEdits
+ " loaded in " + (now()-startTime)/1000 + " seconds."); + " loaded in " + (now()-startTime)/1000 + " seconds.");
return numEdits;
} finally { } finally {
edits.close(); edits.close();
fsNamesys.writeUnlock(); fsNamesys.writeUnlock();
} }
return numEdits;
} }
long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit, long loadEditRecords(int logVersion, EditLogInputStream in, boolean closeOnExit,
long expectedStartingTxId) long expectedStartingTxId, MetaRecoveryContext recovery)
throws IOException, EditLogInputException { throws IOException {
FSDirectory fsDir = fsNamesys.dir; FSDirectory fsDir = fsNamesys.dir;
long numEdits = 0;
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts = EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts =
new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class); new EnumMap<FSEditLogOpCodes, Holder<Integer>>(FSEditLogOpCodes.class);
@ -122,72 +121,99 @@ public class FSEditLogLoader {
long recentOpcodeOffsets[] = new long[4]; long recentOpcodeOffsets[] = new long[4];
Arrays.fill(recentOpcodeOffsets, -1); Arrays.fill(recentOpcodeOffsets, -1);
long txId = expectedStartingTxId - 1; long expectedTxId = expectedStartingTxId;
long numEdits = 0;
long lastTxId = in.getLastTxId(); long lastTxId = in.getLastTxId();
long numTxns = (lastTxId - expectedStartingTxId) + 1; long numTxns = (lastTxId - expectedStartingTxId) + 1;
long lastLogTime = now(); long lastLogTime = now();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("edit log length: " + in.length() + ", start txid: " LOG.debug("edit log length: " + in.length() + ", start txid: "
+ expectedStartingTxId + ", last txid: " + lastTxId); + expectedStartingTxId + ", last txid: " + lastTxId);
} }
try { try {
try { while (true) {
while (true) { try {
FSEditLogOp op; FSEditLogOp op;
try { try {
if ((op = in.readOp()) == null) { op = in.readOp();
if (op == null) {
break; break;
} }
} catch (IOException ioe) { } catch (Throwable e) {
long badTxId = txId + 1; // because txId hasn't been incremented yet // Handle a problem with our input
String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, badTxId); check203UpgradeFailure(logVersion, e);
String errorMessage =
formatEditLogReplayError(in, recentOpcodeOffsets, expectedTxId);
FSImage.LOG.error(errorMessage); FSImage.LOG.error(errorMessage);
throw new EditLogInputException(errorMessage, if (recovery == null) {
ioe, numEdits); // We will only try to skip over problematic opcodes when in
// recovery mode.
throw new EditLogInputException(errorMessage, e, numEdits);
}
MetaRecoveryContext.editLogLoaderPrompt(
"We failed to read txId " + expectedTxId,
recovery, "skipping the bad section in the log");
in.resync();
continue;
} }
recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] = recentOpcodeOffsets[(int)(numEdits % recentOpcodeOffsets.length)] =
in.getPosition(); in.getPosition();
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) { if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
long expectedTxId = txId + 1; if (op.getTransactionId() > expectedTxId) {
txId = op.txid; MetaRecoveryContext.editLogLoaderPrompt("There appears " +
if (txId != expectedTxId) { "to be a gap in the edit log. We expected txid " +
throw new IOException("Expected transaction ID " + expectedTxId + ", but got txid " +
expectedTxId + " but got " + txId); op.getTransactionId() + ".", recovery, "ignoring missing " +
" transaction IDs");
} else if (op.getTransactionId() < expectedTxId) {
MetaRecoveryContext.editLogLoaderPrompt("There appears " +
"to be an out-of-order edit in the edit log. We " +
"expected txid " + expectedTxId + ", but got txid " +
op.getTransactionId() + ".", recovery,
"skipping the out-of-order edit");
continue;
} }
} }
incrOpCount(op.opCode, opCounts);
try { try {
applyEditLogOp(op, fsDir, logVersion); applyEditLogOp(op, fsDir, logVersion);
} catch (Throwable t) { } catch (Throwable e) {
// Catch Throwable because in the case of a truly corrupt edits log, any LOG.error("Encountered exception on operation " + op, e);
// sort of error might be thrown (NumberFormat, NullPointer, EOF, etc.) MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
String errorMessage = formatEditLogReplayError(in, recentOpcodeOffsets, txId); "apply edit log operation " + op + ": error " +
FSImage.LOG.error(errorMessage); e.getMessage(), recovery, "applying edits");
throw new IOException(errorMessage, t); }
// Now that the operation has been successfully decoded and
// applied, update our bookkeeping.
incrOpCount(op.opCode, opCounts);
if (op.hasTransactionId()) {
lastAppliedTxId = op.getTransactionId();
expectedTxId = lastAppliedTxId + 1;
} else {
expectedTxId = lastAppliedTxId = expectedStartingTxId;
} }
// log progress // log progress
if (now() - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) { if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
int percent = Math.round((float) txId / numTxns * 100); long now = now();
LOG.info("replaying edit log: " + txId + "/" + numTxns if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
+ " transactions completed. (" + percent + "%)"); int percent = Math.round((float)lastAppliedTxId / numTxns * 100);
lastLogTime = now(); LOG.info("replaying edit log: " + lastAppliedTxId + "/" + numTxns
+ " transactions completed. (" + percent + "%)");
lastLogTime = now;
}
} }
numEdits++; numEdits++;
} catch (MetaRecoveryContext.RequestStopException e) {
MetaRecoveryContext.LOG.warn("Stopped reading edit log at " +
in.getPosition() + "/" + in.length());
break;
} }
} catch (IOException ex) {
check203UpgradeFailure(logVersion, ex);
} finally {
if(closeOnExit)
in.close();
} }
} finally { } finally {
if(closeOnExit) {
in.close();
}
fsDir.writeUnlock(); fsDir.writeUnlock();
fsNamesys.writeUnlock(); fsNamesys.writeUnlock();
@ -474,7 +500,7 @@ public class FSEditLogLoader {
long recentOpcodeOffsets[], long txid) { long recentOpcodeOffsets[], long txid) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("Error replaying edit log at offset " + in.getPosition()); sb.append("Error replaying edit log at offset " + in.getPosition());
sb.append(" on transaction ID ").append(txid); sb.append(". Expected transaction ID was ").append(txid);
if (recentOpcodeOffsets[0] != -1) { if (recentOpcodeOffsets[0] != -1) {
Arrays.sort(recentOpcodeOffsets); Arrays.sort(recentOpcodeOffsets);
sb.append("\nRecent opcode offsets:"); sb.append("\nRecent opcode offsets:");
@ -521,7 +547,7 @@ public class FSEditLogLoader {
if (oldBlock.getBlockId() != newBlock.getBlockId() || if (oldBlock.getBlockId() != newBlock.getBlockId() ||
(oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() && (oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() &&
!(isGenStampUpdate && isLastBlock))) { !(isGenStampUpdate && isLastBlock))) {
throw new IOException("Mismatched block IDs or generation stamps, " + throw new IOException("Mismatched block IDs or generation stamps, " +
"attempting to replace block " + oldBlock + " with " + newBlock + "attempting to replace block " + oldBlock + " with " + newBlock +
" as block # " + i + "/" + newBlocks.length + " of " + " as block # " + i + "/" + newBlocks.length + " of " +
path); path);
@ -607,7 +633,7 @@ public class FSEditLogLoader {
* Throw appropriate exception during upgrade from 203, when editlog loading * Throw appropriate exception during upgrade from 203, when editlog loading
* could fail due to opcode conflicts. * could fail due to opcode conflicts.
*/ */
private void check203UpgradeFailure(int logVersion, IOException ex) private void check203UpgradeFailure(int logVersion, Throwable e)
throws IOException { throws IOException {
// 0.20.203 version version has conflicting opcodes with the later releases. // 0.20.203 version version has conflicting opcodes with the later releases.
// The editlog must be emptied by restarting the namenode, before proceeding // The editlog must be emptied by restarting the namenode, before proceeding
@ -618,9 +644,7 @@ public class FSEditLogLoader {
+ logVersion + " from release 0.20.203. Please go back to the old " + logVersion + " from release 0.20.203. Please go back to the old "
+ " release and restart the namenode. This empties the editlog " + " release and restart the namenode. This empties the editlog "
+ " and saves the namespace. Resume the upgrade after this step."; + " and saves the namespace. Resume the upgrade after this step.";
throw new IOException(msg, ex); throw new IOException(msg, e);
} else {
throw ex;
} }
} }
@ -645,14 +669,14 @@ public class FSEditLogLoader {
break; break;
} }
if (firstTxId == HdfsConstants.INVALID_TXID) { if (firstTxId == HdfsConstants.INVALID_TXID) {
firstTxId = op.txid; firstTxId = op.getTransactionId();
} }
if (lastTxId == HdfsConstants.INVALID_TXID if (lastTxId == HdfsConstants.INVALID_TXID
|| op.txid == lastTxId + 1) { || op.getTransactionId() == lastTxId + 1) {
lastTxId = op.txid; lastTxId = op.getTransactionId();
} else { } else {
FSImage.LOG.error("Out of order txid found. Found " + op.txid FSImage.LOG.error("Out of order txid found. Found " +
+ ", expected " + (lastTxId + 1)); op.getTransactionId() + ", expected " + (lastTxId + 1));
break; break;
} }
numValid++; numValid++;
@ -745,4 +769,7 @@ public class FSEditLogLoader {
} }
} }
public long getLastAppliedTxId() {
return lastAppliedTxId;
}
} }

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature; import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.util.PureJavaCrc32; import org.apache.hadoop.util.PureJavaCrc32;
@ -54,6 +56,8 @@ import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
import org.xml.sax.helpers.AttributesImpl; import org.xml.sax.helpers.AttributesImpl;
import com.google.common.base.Preconditions;
import java.io.DataInput; import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -74,42 +78,44 @@ public abstract class FSEditLogOp {
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
private static ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>> opInstances = final public static class OpInstanceCache {
new ThreadLocal<EnumMap<FSEditLogOpCodes, FSEditLogOp>>() { private EnumMap<FSEditLogOpCodes, FSEditLogOp> inst =
@Override new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class);
protected EnumMap<FSEditLogOpCodes, FSEditLogOp> initialValue() {
EnumMap<FSEditLogOpCodes, FSEditLogOp> instances public OpInstanceCache() {
= new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class); inst.put(OP_ADD, new AddOp());
instances.put(OP_ADD, new AddOp()); inst.put(OP_CLOSE, new CloseOp());
instances.put(OP_CLOSE, new CloseOp()); inst.put(OP_SET_REPLICATION, new SetReplicationOp());
instances.put(OP_SET_REPLICATION, new SetReplicationOp()); inst.put(OP_CONCAT_DELETE, new ConcatDeleteOp());
instances.put(OP_CONCAT_DELETE, new ConcatDeleteOp()); inst.put(OP_RENAME_OLD, new RenameOldOp());
instances.put(OP_RENAME_OLD, new RenameOldOp()); inst.put(OP_DELETE, new DeleteOp());
instances.put(OP_DELETE, new DeleteOp()); inst.put(OP_MKDIR, new MkdirOp());
instances.put(OP_MKDIR, new MkdirOp()); inst.put(OP_SET_GENSTAMP, new SetGenstampOp());
instances.put(OP_SET_GENSTAMP, new SetGenstampOp()); inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp());
instances.put(OP_SET_PERMISSIONS, new SetPermissionsOp()); inst.put(OP_SET_OWNER, new SetOwnerOp());
instances.put(OP_SET_OWNER, new SetOwnerOp()); inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp());
instances.put(OP_SET_NS_QUOTA, new SetNSQuotaOp()); inst.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp());
instances.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp()); inst.put(OP_SET_QUOTA, new SetQuotaOp());
instances.put(OP_SET_QUOTA, new SetQuotaOp()); inst.put(OP_TIMES, new TimesOp());
instances.put(OP_TIMES, new TimesOp()); inst.put(OP_SYMLINK, new SymlinkOp());
instances.put(OP_SYMLINK, new SymlinkOp()); inst.put(OP_RENAME, new RenameOp());
instances.put(OP_RENAME, new RenameOp()); inst.put(OP_REASSIGN_LEASE, new ReassignLeaseOp());
instances.put(OP_REASSIGN_LEASE, new ReassignLeaseOp()); inst.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp());
instances.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp()); inst.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp());
instances.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp()); inst.put(OP_CANCEL_DELEGATION_TOKEN,
instances.put(OP_CANCEL_DELEGATION_TOKEN, new CancelDelegationTokenOp());
new CancelDelegationTokenOp()); inst.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
instances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp()); inst.put(OP_START_LOG_SEGMENT,
instances.put(OP_START_LOG_SEGMENT, new LogSegmentOp(OP_START_LOG_SEGMENT));
new LogSegmentOp(OP_START_LOG_SEGMENT)); inst.put(OP_END_LOG_SEGMENT,
instances.put(OP_END_LOG_SEGMENT, new LogSegmentOp(OP_END_LOG_SEGMENT));
new LogSegmentOp(OP_END_LOG_SEGMENT)); inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp());
instances.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp()); }
return instances;
} public FSEditLogOp get(FSEditLogOpCodes opcode) {
}; return inst.get(opcode);
}
}
/** /**
* Constructor for an EditLog Op. EditLog ops cannot be constructed * Constructor for an EditLog Op. EditLog ops cannot be constructed
@ -117,13 +123,22 @@ public abstract class FSEditLogOp {
*/ */
private FSEditLogOp(FSEditLogOpCodes opCode) { private FSEditLogOp(FSEditLogOpCodes opCode) {
this.opCode = opCode; this.opCode = opCode;
this.txid = 0; this.txid = HdfsConstants.INVALID_TXID;
} }
public long getTransactionId() { public long getTransactionId() {
Preconditions.checkState(txid != HdfsConstants.INVALID_TXID);
return txid; return txid;
} }
public String getTransactionIdStr() {
return (txid == HdfsConstants.INVALID_TXID) ? "(none)" : "" + txid;
}
public boolean hasTransactionId() {
return (txid != HdfsConstants.INVALID_TXID);
}
public void setTransactionId(long txid) { public void setTransactionId(long txid) {
this.txid = txid; this.txid = txid;
} }
@ -373,8 +388,8 @@ public abstract class FSEditLogOp {
super(OP_ADD); super(OP_ADD);
} }
static AddOp getInstance() { static AddOp getInstance(OpInstanceCache cache) {
return (AddOp)opInstances.get().get(OP_ADD); return (AddOp)cache.get(OP_ADD);
} }
public boolean shouldCompleteLastBlock() { public boolean shouldCompleteLastBlock() {
@ -395,8 +410,8 @@ public abstract class FSEditLogOp {
super(OP_CLOSE); super(OP_CLOSE);
} }
static CloseOp getInstance() { static CloseOp getInstance(OpInstanceCache cache) {
return (CloseOp)opInstances.get().get(OP_CLOSE); return (CloseOp)cache.get(OP_CLOSE);
} }
public boolean shouldCompleteLastBlock() { public boolean shouldCompleteLastBlock() {
@ -420,9 +435,8 @@ public abstract class FSEditLogOp {
super(OP_UPDATE_BLOCKS); super(OP_UPDATE_BLOCKS);
} }
static UpdateBlocksOp getInstance() { static UpdateBlocksOp getInstance(OpInstanceCache cache) {
return (UpdateBlocksOp)opInstances.get() return (UpdateBlocksOp)cache.get(OP_UPDATE_BLOCKS);
.get(OP_UPDATE_BLOCKS);
} }
@ -500,9 +514,8 @@ public abstract class FSEditLogOp {
super(OP_SET_REPLICATION); super(OP_SET_REPLICATION);
} }
static SetReplicationOp getInstance() { static SetReplicationOp getInstance(OpInstanceCache cache) {
return (SetReplicationOp)opInstances.get() return (SetReplicationOp)cache.get(OP_SET_REPLICATION);
.get(OP_SET_REPLICATION);
} }
SetReplicationOp setPath(String path) { SetReplicationOp setPath(String path) {
@ -571,9 +584,8 @@ public abstract class FSEditLogOp {
super(OP_CONCAT_DELETE); super(OP_CONCAT_DELETE);
} }
static ConcatDeleteOp getInstance() { static ConcatDeleteOp getInstance(OpInstanceCache cache) {
return (ConcatDeleteOp)opInstances.get() return (ConcatDeleteOp)cache.get(OP_CONCAT_DELETE);
.get(OP_CONCAT_DELETE);
} }
ConcatDeleteOp setTarget(String trg) { ConcatDeleteOp setTarget(String trg) {
@ -697,9 +709,8 @@ public abstract class FSEditLogOp {
super(OP_RENAME_OLD); super(OP_RENAME_OLD);
} }
static RenameOldOp getInstance() { static RenameOldOp getInstance(OpInstanceCache cache) {
return (RenameOldOp)opInstances.get() return (RenameOldOp)cache.get(OP_RENAME_OLD);
.get(OP_RENAME_OLD);
} }
RenameOldOp setSource(String src) { RenameOldOp setSource(String src) {
@ -790,9 +801,8 @@ public abstract class FSEditLogOp {
super(OP_DELETE); super(OP_DELETE);
} }
static DeleteOp getInstance() { static DeleteOp getInstance(OpInstanceCache cache) {
return (DeleteOp)opInstances.get() return (DeleteOp)cache.get(OP_DELETE);
.get(OP_DELETE);
} }
DeleteOp setPath(String path) { DeleteOp setPath(String path) {
@ -872,9 +882,8 @@ public abstract class FSEditLogOp {
super(OP_MKDIR); super(OP_MKDIR);
} }
static MkdirOp getInstance() { static MkdirOp getInstance(OpInstanceCache cache) {
return (MkdirOp)opInstances.get() return (MkdirOp)cache.get(OP_MKDIR);
.get(OP_MKDIR);
} }
MkdirOp setPath(String path) { MkdirOp setPath(String path) {
@ -977,9 +986,8 @@ public abstract class FSEditLogOp {
super(OP_SET_GENSTAMP); super(OP_SET_GENSTAMP);
} }
static SetGenstampOp getInstance() { static SetGenstampOp getInstance(OpInstanceCache cache) {
return (SetGenstampOp)opInstances.get() return (SetGenstampOp)cache.get(OP_SET_GENSTAMP);
.get(OP_SET_GENSTAMP);
} }
SetGenstampOp setGenerationStamp(long genStamp) { SetGenstampOp setGenerationStamp(long genStamp) {
@ -1031,9 +1039,8 @@ public abstract class FSEditLogOp {
super(OP_SET_PERMISSIONS); super(OP_SET_PERMISSIONS);
} }
static SetPermissionsOp getInstance() { static SetPermissionsOp getInstance(OpInstanceCache cache) {
return (SetPermissionsOp)opInstances.get() return (SetPermissionsOp)cache.get(OP_SET_PERMISSIONS);
.get(OP_SET_PERMISSIONS);
} }
SetPermissionsOp setSource(String src) { SetPermissionsOp setSource(String src) {
@ -1098,9 +1105,8 @@ public abstract class FSEditLogOp {
super(OP_SET_OWNER); super(OP_SET_OWNER);
} }
static SetOwnerOp getInstance() { static SetOwnerOp getInstance(OpInstanceCache cache) {
return (SetOwnerOp)opInstances.get() return (SetOwnerOp)cache.get(OP_SET_OWNER);
.get(OP_SET_OWNER);
} }
SetOwnerOp setSource(String src) { SetOwnerOp setSource(String src) {
@ -1179,9 +1185,8 @@ public abstract class FSEditLogOp {
super(OP_SET_NS_QUOTA); super(OP_SET_NS_QUOTA);
} }
static SetNSQuotaOp getInstance() { static SetNSQuotaOp getInstance(OpInstanceCache cache) {
return (SetNSQuotaOp)opInstances.get() return (SetNSQuotaOp)cache.get(OP_SET_NS_QUOTA);
.get(OP_SET_NS_QUOTA);
} }
@Override @Override
@ -1232,9 +1237,8 @@ public abstract class FSEditLogOp {
super(OP_CLEAR_NS_QUOTA); super(OP_CLEAR_NS_QUOTA);
} }
static ClearNSQuotaOp getInstance() { static ClearNSQuotaOp getInstance(OpInstanceCache cache) {
return (ClearNSQuotaOp)opInstances.get() return (ClearNSQuotaOp)cache.get(OP_CLEAR_NS_QUOTA);
.get(OP_CLEAR_NS_QUOTA);
} }
@Override @Override
@ -1281,9 +1285,8 @@ public abstract class FSEditLogOp {
super(OP_SET_QUOTA); super(OP_SET_QUOTA);
} }
static SetQuotaOp getInstance() { static SetQuotaOp getInstance(OpInstanceCache cache) {
return (SetQuotaOp)opInstances.get() return (SetQuotaOp)cache.get(OP_SET_QUOTA);
.get(OP_SET_QUOTA);
} }
SetQuotaOp setSource(String src) { SetQuotaOp setSource(String src) {
@ -1360,9 +1363,8 @@ public abstract class FSEditLogOp {
super(OP_TIMES); super(OP_TIMES);
} }
static TimesOp getInstance() { static TimesOp getInstance(OpInstanceCache cache) {
return (TimesOp)opInstances.get() return (TimesOp)cache.get(OP_TIMES);
.get(OP_TIMES);
} }
TimesOp setPath(String path) { TimesOp setPath(String path) {
@ -1458,9 +1460,8 @@ public abstract class FSEditLogOp {
super(OP_SYMLINK); super(OP_SYMLINK);
} }
static SymlinkOp getInstance() { static SymlinkOp getInstance(OpInstanceCache cache) {
return (SymlinkOp)opInstances.get() return (SymlinkOp)cache.get(OP_SYMLINK);
.get(OP_SYMLINK);
} }
SymlinkOp setPath(String path) { SymlinkOp setPath(String path) {
@ -1579,9 +1580,8 @@ public abstract class FSEditLogOp {
super(OP_RENAME); super(OP_RENAME);
} }
static RenameOp getInstance() { static RenameOp getInstance(OpInstanceCache cache) {
return (RenameOp)opInstances.get() return (RenameOp)cache.get(OP_RENAME);
.get(OP_RENAME);
} }
RenameOp setSource(String src) { RenameOp setSource(String src) {
@ -1723,9 +1723,8 @@ public abstract class FSEditLogOp {
super(OP_REASSIGN_LEASE); super(OP_REASSIGN_LEASE);
} }
static ReassignLeaseOp getInstance() { static ReassignLeaseOp getInstance(OpInstanceCache cache) {
return (ReassignLeaseOp)opInstances.get() return (ReassignLeaseOp)cache.get(OP_REASSIGN_LEASE);
.get(OP_REASSIGN_LEASE);
} }
ReassignLeaseOp setLeaseHolder(String leaseHolder) { ReassignLeaseOp setLeaseHolder(String leaseHolder) {
@ -1798,9 +1797,8 @@ public abstract class FSEditLogOp {
super(OP_GET_DELEGATION_TOKEN); super(OP_GET_DELEGATION_TOKEN);
} }
static GetDelegationTokenOp getInstance() { static GetDelegationTokenOp getInstance(OpInstanceCache cache) {
return (GetDelegationTokenOp)opInstances.get() return (GetDelegationTokenOp)cache.get(OP_GET_DELEGATION_TOKEN);
.get(OP_GET_DELEGATION_TOKEN);
} }
GetDelegationTokenOp setDelegationTokenIdentifier( GetDelegationTokenOp setDelegationTokenIdentifier(
@ -1870,9 +1868,8 @@ public abstract class FSEditLogOp {
super(OP_RENEW_DELEGATION_TOKEN); super(OP_RENEW_DELEGATION_TOKEN);
} }
static RenewDelegationTokenOp getInstance() { static RenewDelegationTokenOp getInstance(OpInstanceCache cache) {
return (RenewDelegationTokenOp)opInstances.get() return (RenewDelegationTokenOp)cache.get(OP_RENEW_DELEGATION_TOKEN);
.get(OP_RENEW_DELEGATION_TOKEN);
} }
RenewDelegationTokenOp setDelegationTokenIdentifier( RenewDelegationTokenOp setDelegationTokenIdentifier(
@ -1941,9 +1938,8 @@ public abstract class FSEditLogOp {
super(OP_CANCEL_DELEGATION_TOKEN); super(OP_CANCEL_DELEGATION_TOKEN);
} }
static CancelDelegationTokenOp getInstance() { static CancelDelegationTokenOp getInstance(OpInstanceCache cache) {
return (CancelDelegationTokenOp)opInstances.get() return (CancelDelegationTokenOp)cache.get(OP_CANCEL_DELEGATION_TOKEN);
.get(OP_CANCEL_DELEGATION_TOKEN);
} }
CancelDelegationTokenOp setDelegationTokenIdentifier( CancelDelegationTokenOp setDelegationTokenIdentifier(
@ -1996,9 +1992,8 @@ public abstract class FSEditLogOp {
super(OP_UPDATE_MASTER_KEY); super(OP_UPDATE_MASTER_KEY);
} }
static UpdateMasterKeyOp getInstance() { static UpdateMasterKeyOp getInstance(OpInstanceCache cache) {
return (UpdateMasterKeyOp)opInstances.get() return (UpdateMasterKeyOp)cache.get(OP_UPDATE_MASTER_KEY);
.get(OP_UPDATE_MASTER_KEY);
} }
UpdateMasterKeyOp setDelegationKey(DelegationKey key) { UpdateMasterKeyOp setDelegationKey(DelegationKey key) {
@ -2050,8 +2045,9 @@ public abstract class FSEditLogOp {
code == OP_END_LOG_SEGMENT : "Bad op: " + code; code == OP_END_LOG_SEGMENT : "Bad op: " + code;
} }
static LogSegmentOp getInstance(FSEditLogOpCodes code) { static LogSegmentOp getInstance(OpInstanceCache cache,
return (LogSegmentOp)opInstances.get().get(code); FSEditLogOpCodes code) {
return (LogSegmentOp)cache.get(code);
} }
public void readFields(DataInputStream in, int logVersion) public void readFields(DataInputStream in, int logVersion)
@ -2091,8 +2087,8 @@ public abstract class FSEditLogOp {
super(OP_INVALID); super(OP_INVALID);
} }
static InvalidOp getInstance() { static InvalidOp getInstance(OpInstanceCache cache) {
return (InvalidOp)opInstances.get().get(OP_INVALID); return (InvalidOp)cache.get(OP_INVALID);
} }
@Override @Override
@ -2207,6 +2203,7 @@ public abstract class FSEditLogOp {
private final DataInputStream in; private final DataInputStream in;
private final int logVersion; private final int logVersion;
private final Checksum checksum; private final Checksum checksum;
private final OpInstanceCache cache;
/** /**
* Construct the reader * Construct the reader
@ -2228,6 +2225,7 @@ public abstract class FSEditLogOp {
} else { } else {
this.in = in; this.in = in;
} }
this.cache = new OpInstanceCache();
} }
/** /**
@ -2236,16 +2234,42 @@ public abstract class FSEditLogOp {
* Note that the objects returned from this method may be re-used by future * Note that the objects returned from this method may be re-used by future
* calls to the same method. * calls to the same method.
* *
* @param skipBrokenEdits If true, attempt to skip over damaged parts of
* the input stream, rather than throwing an IOException
* @return the operation read from the stream, or null at the end of the file * @return the operation read from the stream, or null at the end of the file
* @throws IOException on error. * @throws IOException on error.
*/ */
public FSEditLogOp readOp() throws IOException { public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
FSEditLogOp op = null;
while (true) {
try {
in.mark(in.available());
try {
op = decodeOp();
} finally {
// If we encountered an exception or an end-of-file condition,
// do not advance the input stream.
if (op == null) {
in.reset();
}
}
return op;
} catch (IOException e) {
if (!skipBrokenEdits) {
throw e;
}
if (in.skip(1) < 1) {
return null;
}
}
}
}
private FSEditLogOp decodeOp() throws IOException {
if (checksum != null) { if (checksum != null) {
checksum.reset(); checksum.reset();
} }
in.mark(1);
byte opCodeByte; byte opCodeByte;
try { try {
opCodeByte = in.readByte(); opCodeByte = in.readByte();
@ -2255,12 +2279,10 @@ public abstract class FSEditLogOp {
} }
FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte); FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
if (opCode == OP_INVALID) { if (opCode == OP_INVALID)
in.reset(); // reset back to end of file if somebody reads it again
return null; return null;
}
FSEditLogOp op = opInstances.get().get(opCode); FSEditLogOp op = cache.get(opCode);
if (op == null) { if (op == null) {
throw new IOException("Read invalid opcode " + opCode); throw new IOException("Read invalid opcode " + opCode);
} }
@ -2268,6 +2290,8 @@ public abstract class FSEditLogOp {
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) { if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
// Read the txid // Read the txid
op.setTransactionId(in.readLong()); op.setTransactionId(in.readLong());
} else {
op.setTransactionId(HdfsConstants.INVALID_TXID);
} }
op.readFields(in, logVersion); op.readFields(in, logVersion);
@ -2426,8 +2450,4 @@ public abstract class FSEditLogOp {
short mode = Short.valueOf(st.getValue("MODE")); short mode = Short.valueOf(st.getValue("MODE"));
return new PermissionStatus(username, groupname, new FsPermission(mode)); return new PermissionStatus(username, groupname, new FsPermission(mode));
} }
}
public static FSEditLogOp getOpInstance(FSEditLogOpCodes opCode) {
return opInstances.get().get(opCode);
}
}

View File

@ -158,8 +158,8 @@ public class FSImage implements Closeable {
* @throws IOException * @throws IOException
* @return true if the image needs to be saved or false otherwise * @return true if the image needs to be saved or false otherwise
*/ */
boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target) boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
throws IOException { MetaRecoveryContext recovery) throws IOException {
assert startOpt != StartupOption.FORMAT : assert startOpt != StartupOption.FORMAT :
"NameNode formatting should be performed before reading the image"; "NameNode formatting should be performed before reading the image";
@ -244,7 +244,7 @@ public class FSImage implements Closeable {
// just load the image // just load the image
} }
return loadFSImage(target); return loadFSImage(target, recovery);
} }
/** /**
@ -304,7 +304,7 @@ public class FSImage implements Closeable {
if(storage.getDistributedUpgradeState()) { if(storage.getDistributedUpgradeState()) {
// only distributed upgrade need to continue // only distributed upgrade need to continue
// don't do version upgrade // don't do version upgrade
this.loadFSImage(target); this.loadFSImage(target, null);
storage.initializeDistributedUpgrade(); storage.initializeDistributedUpgrade();
return; return;
} }
@ -319,7 +319,7 @@ public class FSImage implements Closeable {
} }
// load the latest image // load the latest image
this.loadFSImage(target); this.loadFSImage(target, null);
// Do upgrade for each directory // Do upgrade for each directory
long oldCTime = storage.getCTime(); long oldCTime = storage.getCTime();
@ -505,7 +505,7 @@ public class FSImage implements Closeable {
target.dir.fsImage = ckptImage; target.dir.fsImage = ckptImage;
// load from the checkpoint dirs // load from the checkpoint dirs
try { try {
ckptImage.recoverTransitionRead(StartupOption.REGULAR, target); ckptImage.recoverTransitionRead(StartupOption.REGULAR, target, null);
} finally { } finally {
ckptImage.close(); ckptImage.close();
} }
@ -550,7 +550,7 @@ public class FSImage implements Closeable {
target.dir.reset(); target.dir.reset();
LOG.debug("Reloading namespace from " + file); LOG.debug("Reloading namespace from " + file);
loadFSImage(file, target); loadFSImage(file, target, null);
} }
/** /**
@ -568,7 +568,8 @@ public class FSImage implements Closeable {
* @return whether the image should be saved * @return whether the image should be saved
* @throws IOException * @throws IOException
*/ */
boolean loadFSImage(FSNamesystem target) throws IOException { boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery)
throws IOException {
FSImageStorageInspector inspector = storage.readAndInspectDirs(); FSImageStorageInspector inspector = storage.readAndInspectDirs();
isUpgradeFinalized = inspector.isUpgradeFinalized(); isUpgradeFinalized = inspector.isUpgradeFinalized();
@ -583,7 +584,6 @@ public class FSImage implements Closeable {
// We only want to recover streams if we're going into Active mode. // We only want to recover streams if we're going into Active mode.
editLog.recoverUnclosedStreams(); editLog.recoverUnclosedStreams();
} }
if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
getLayoutVersion())) { getLayoutVersion())) {
// If we're open for write, we're either non-HA or we're the active NN, so // If we're open for write, we're either non-HA or we're the active NN, so
@ -610,7 +610,7 @@ public class FSImage implements Closeable {
getLayoutVersion())) { getLayoutVersion())) {
// For txid-based layout, we should have a .md5 file // For txid-based layout, we should have a .md5 file
// next to the image file // next to the image file
loadFSImage(imageFile.getFile(), target); loadFSImage(imageFile.getFile(), target, recovery);
} else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM, } else if (LayoutVersion.supports(Feature.FSIMAGE_CHECKSUM,
getLayoutVersion())) { getLayoutVersion())) {
// In 0.22, we have the checksum stored in the VERSION file. // In 0.22, we have the checksum stored in the VERSION file.
@ -622,22 +622,19 @@ public class FSImage implements Closeable {
NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY + NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY +
" not set for storage directory " + sdForProperties.getRoot()); " not set for storage directory " + sdForProperties.getRoot());
} }
loadFSImage(imageFile.getFile(), new MD5Hash(md5), target); loadFSImage(imageFile.getFile(), new MD5Hash(md5), target, recovery);
} else { } else {
// We don't have any record of the md5sum // We don't have any record of the md5sum
loadFSImage(imageFile.getFile(), null, target); loadFSImage(imageFile.getFile(), null, target, recovery);
} }
} catch (IOException ioe) { } catch (IOException ioe) {
FSEditLog.closeAllStreams(editStreams); FSEditLog.closeAllStreams(editStreams);
throw new IOException("Failed to load image from " + imageFile, ioe); throw new IOException("Failed to load image from " + imageFile, ioe);
} }
long txnsAdvanced = loadEdits(editStreams, target, recovery);
long numLoaded = loadEdits(editStreams, target);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(), needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
numLoaded); txnsAdvanced);
editLog.setNextTxId(lastAppliedTxId + 1);
// update the txid for the edit log
editLog.setNextTxId(storage.getMostRecentCheckpointTxId() + numLoaded + 1);
return needToSave; return needToSave;
} }
@ -664,33 +661,29 @@ public class FSImage implements Closeable {
/** /**
* Load the specified list of edit files into the image. * Load the specified list of edit files into the image.
* @return the number of transactions loaded
*/ */
public long loadEdits(Iterable<EditLogInputStream> editStreams, public long loadEdits(Iterable<EditLogInputStream> editStreams,
FSNamesystem target) throws IOException, EditLogInputException { FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams)); LOG.debug("About to load edits:\n " + Joiner.on("\n ").join(editStreams));
long startingTxId = getLastAppliedTxId() + 1; long prevLastAppliedTxId = lastAppliedTxId;
long numLoaded = 0;
try { try {
FSEditLogLoader loader = new FSEditLogLoader(target); FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);
// Load latest edits // Load latest edits
for (EditLogInputStream editIn : editStreams) { for (EditLogInputStream editIn : editStreams) {
LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId); LOG.info("Reading " + editIn + " expecting start txid #" +
long thisNumLoaded = 0; (lastAppliedTxId + 1));
try { try {
thisNumLoaded = loader.loadFSEdits(editIn, startingTxId); loader.loadFSEdits(editIn, lastAppliedTxId + 1, recovery);
} catch (EditLogInputException elie) {
thisNumLoaded = elie.getNumEditsLoaded();
throw elie;
} finally { } finally {
// Update lastAppliedTxId even in case of error, since some ops may // Update lastAppliedTxId even in case of error, since some ops may
// have been successfully applied before the error. // have been successfully applied before the error.
lastAppliedTxId = startingTxId + thisNumLoaded - 1; lastAppliedTxId = loader.getLastAppliedTxId();
startingTxId += thisNumLoaded; }
numLoaded += thisNumLoaded; // If we are in recovery mode, we may have skipped over some txids.
if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID) {
lastAppliedTxId = editIn.getLastTxId();
} }
} }
} finally { } finally {
@ -698,8 +691,7 @@ public class FSImage implements Closeable {
// update the counts // update the counts
target.dir.updateCountForINodeWithQuota(); target.dir.updateCountForINodeWithQuota();
} }
return lastAppliedTxId - prevLastAppliedTxId;
return numLoaded;
} }
@ -707,14 +699,14 @@ public class FSImage implements Closeable {
* Load the image namespace from the given image file, verifying * Load the image namespace from the given image file, verifying
* it against the MD5 sum stored in its associated .md5 file. * it against the MD5 sum stored in its associated .md5 file.
*/ */
private void loadFSImage(File imageFile, FSNamesystem target) private void loadFSImage(File imageFile, FSNamesystem target,
throws IOException { MetaRecoveryContext recovery) throws IOException {
MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile); MD5Hash expectedMD5 = MD5FileUtils.readStoredMd5ForFile(imageFile);
if (expectedMD5 == null) { if (expectedMD5 == null) {
throw new IOException("No MD5 file found corresponding to image file " throw new IOException("No MD5 file found corresponding to image file "
+ imageFile); + imageFile);
} }
loadFSImage(imageFile, expectedMD5, target); loadFSImage(imageFile, expectedMD5, target, recovery);
} }
/** /**
@ -722,7 +714,7 @@ public class FSImage implements Closeable {
* filenames and blocks. * filenames and blocks.
*/ */
private void loadFSImage(File curFile, MD5Hash expectedMd5, private void loadFSImage(File curFile, MD5Hash expectedMd5,
FSNamesystem target) throws IOException { FSNamesystem target, MetaRecoveryContext recovery) throws IOException {
FSImageFormat.Loader loader = new FSImageFormat.Loader( FSImageFormat.Loader loader = new FSImageFormat.Loader(
conf, target); conf, target);
loader.load(curFile); loader.load(curFile);

View File

@ -65,7 +65,14 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
return; return;
} }
maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd)); // Check for a seen_txid file, which marks a minimum transaction ID that
// must be included in our load plan.
try {
maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
} catch (IOException ioe) {
LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
return;
}
File currentDir = sd.getCurrentDir(); File currentDir = sd.getCurrentDir();
File filesInStorage[]; File filesInStorage[];
@ -100,15 +107,6 @@ class FSImageTransactionalStorageInspector extends FSImageStorageInspector {
} }
} }
// Check for a seen_txid file, which marks a minimum transaction ID that
// must be included in our load plan.
try {
maxSeenTxId = Math.max(maxSeenTxId, NNStorage.readTransactionIdFile(sd));
} catch (IOException ioe) {
LOG.warn("Unable to determine the max transaction ID seen by " + sd, ioe);
}
// set finalized flag // set finalized flag
isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists(); isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
} }

View File

@ -380,9 +380,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
FSImage fsImage = new FSImage(conf, namespaceDirs, namespaceEditsDirs); FSImage fsImage = new FSImage(conf, namespaceDirs, namespaceEditsDirs);
FSNamesystem namesystem = new FSNamesystem(conf, fsImage); FSNamesystem namesystem = new FSNamesystem(conf, fsImage);
StartupOption startOpt = NameNode.getStartupOption(conf);
if (startOpt == StartupOption.RECOVER) {
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
}
long loadStart = now(); long loadStart = now();
StartupOption startOpt = NameNode.getStartupOption(conf);
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf); String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
namesystem.loadFSImage(startOpt, fsImage, namesystem.loadFSImage(startOpt, fsImage,
HAUtil.isHAEnabled(conf, nameserviceId)); HAUtil.isHAEnabled(conf, nameserviceId));
@ -491,7 +494,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
writeLock(); writeLock();
try { try {
// We shouldn't be calling saveNamespace if we've come up in standby state. // We shouldn't be calling saveNamespace if we've come up in standby state.
if (fsImage.recoverTransitionRead(startOpt, this) && !haEnabled) { MetaRecoveryContext recovery = startOpt.createRecoveryContext();
if (fsImage.recoverTransitionRead(startOpt, this, recovery) && !haEnabled) {
fsImage.saveNamespace(this); fsImage.saveNamespace(this);
} }
// This will start a new log segment and write to the seen_txid file, so // This will start a new log segment and write to the seen_txid file, so

View File

@ -232,7 +232,10 @@ class FileJournalManager implements JournalManager {
LOG.info(String.format("Log begins at txid %d, but requested start " LOG.info(String.format("Log begins at txid %d, but requested start "
+ "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId, + "txid is %d. Skipping %d edits.", elf.getFirstTxId(), fromTxId,
transactionsToSkip)); transactionsToSkip));
elfis.skipTransactions(transactionsToSkip); }
if (elfis.skipUntil(fromTxId) == false) {
throw new IOException("failed to advance input stream to txid " +
fromTxId);
} }
return elfis; return elfis;
} }

View File

@ -1,56 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
/**
* A generic interface for journal input and output streams.
*/
interface JournalStream {
/**
* Type of the underlying persistent storage type the stream is based upon.
* <ul>
* <li>{@link JournalType#FILE} - streams edits into a local file, see
* {@link FSEditLog.EditLogFileOutputStream} and
* {@link FSEditLog.EditLogFileInputStream}</li>
* <li>{@link JournalType#BACKUP} - streams edits to a backup node, see
* {@link EditLogBackupOutputStream} and {@link EditLogBackupInputStream}</li>
* </ul>
*/
static enum JournalType {
FILE,
BACKUP;
boolean isOfType(JournalType other) {
return other == null || this == other;
}
};
/**
* Get this stream name.
*
* @return name of the stream
*/
String getName();
/**
* Get the type of the stream.
* Determines the underlying persistent storage type.
* @see JournalType
* @return type
*/
JournalType getType();
}

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** Context data for an ongoing NameNode metadata recovery process. */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class MetaRecoveryContext {
public static final Log LOG = LogFactory.getLog(MetaRecoveryContext.class.getName());
public final static int FORCE_NONE = 0;
public final static int FORCE_FIRST_CHOICE = 1;
public final static int FORCE_ALL = 2;
private int force;
/** Exception thrown when the user has requested processing to stop. */
static public class RequestStopException extends IOException {
private static final long serialVersionUID = 1L;
public RequestStopException(String msg) {
super(msg);
}
}
public MetaRecoveryContext(int force) {
this.force = force;
}
/**
* Display a prompt to the user and get his or her choice.
*
* @param prompt The prompt to display
* @param default First choice (will be taken if autoChooseDefault is
* true)
* @param choices Other choies
*
* @return The choice that was taken
* @throws IOException
*/
public String ask(String prompt, String firstChoice, String... choices)
throws IOException {
while (true) {
LOG.info(prompt);
if (force > FORCE_NONE) {
LOG.info("automatically choosing " + firstChoice);
return firstChoice;
}
StringBuilder responseBuilder = new StringBuilder();
while (true) {
int c = System.in.read();
if (c == -1 || c == '\r' || c == '\n') {
break;
}
responseBuilder.append((char)c);
}
String response = responseBuilder.toString();
if (response.equalsIgnoreCase(firstChoice))
return firstChoice;
for (String c : choices) {
if (response.equalsIgnoreCase(c)) {
return c;
}
}
LOG.error("I'm sorry, I cannot understand your response.\n");
}
}
public static void editLogLoaderPrompt(String prompt,
MetaRecoveryContext recovery, String contStr)
throws IOException, RequestStopException
{
if (recovery == null) {
throw new IOException(prompt);
}
LOG.error(prompt);
String answer = recovery.ask("\nEnter 'c' to continue, " + contStr + "\n" +
"Enter 's' to stop reading the edit log here, abandoning any later " +
"edits\n" +
"Enter 'q' to quit without saving\n" +
"Enter 'a' to always select the first choice in the future " +
"without prompting. " +
"(c/s/q/a)\n", "c", "s", "q", "a");
if (answer.equals("c")) {
LOG.info("Continuing.");
return;
} else if (answer.equals("s")) {
throw new RequestStopException("user requested stop");
} else if (answer.equals("q")) {
recovery.quit();
} else {
recovery.setForce(FORCE_FIRST_CHOICE);
return;
}
}
/** Log a message and quit */
public void quit() {
LOG.error("Exiting on user request.");
System.exit(0);
}
public int getForce() {
return this.force;
}
public void setForce(int force) {
this.force = force;
}
}

View File

@ -49,7 +49,6 @@ import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeManager; import org.apache.hadoop.hdfs.server.common.UpgradeManager;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream; import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
@ -297,8 +296,7 @@ public class NNStorage extends Storage implements Closeable {
NameNodeDirType.IMAGE; NameNodeDirType.IMAGE;
// Add to the list of storage directories, only if the // Add to the list of storage directories, only if the
// URI is of type file:// // URI is of type file://
if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase()) if(dirName.getScheme().compareTo("file") == 0) {
== 0){
this.addStorageDir(new StorageDirectory(new File(dirName.getPath()), this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
dirType, dirType,
!sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared. !sharedEditsDirs.contains(dirName))); // Don't lock the dir if it's shared.
@ -310,8 +308,7 @@ public class NNStorage extends Storage implements Closeable {
checkSchemeConsistency(dirName); checkSchemeConsistency(dirName);
// Add to the list of storage directories, only if the // Add to the list of storage directories, only if the
// URI is of type file:// // URI is of type file://
if(dirName.getScheme().compareTo(JournalType.FILE.name().toLowerCase()) if(dirName.getScheme().compareTo("file") == 0)
== 0)
this.addStorageDir(new StorageDirectory(new File(dirName.getPath()), this.addStorageDir(new StorageDirectory(new File(dirName.getPath()),
NameNodeDirType.EDITS, !sharedEditsDirs.contains(dirName))); NameNodeDirType.EDITS, !sharedEditsDirs.contains(dirName)));
} }

View File

@ -514,6 +514,8 @@ public class NameNode {
* <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li> * <li>{@link StartupOption#CHECKPOINT CHECKPOINT} - start checkpoint node</li>
* <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster * <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster
* upgrade and create a snapshot of the current file system state</li> * upgrade and create a snapshot of the current file system state</li>
* <li>{@link StartupOption#RECOVERY RECOVERY} - recover name node
* metadata</li>
* <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the * <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the
* cluster back to the previous state</li> * cluster back to the previous state</li>
* <li>{@link StartupOption#FINALIZE FINALIZE} - finalize * <li>{@link StartupOption#FINALIZE FINALIZE} - finalize
@ -832,7 +834,10 @@ public class NameNode {
StartupOption.FINALIZE.getName() + "] | [" + StartupOption.FINALIZE.getName() + "] | [" +
StartupOption.IMPORT.getName() + "] | [" + StartupOption.IMPORT.getName() + "] | [" +
StartupOption.BOOTSTRAPSTANDBY.getName() + "] | [" + StartupOption.BOOTSTRAPSTANDBY.getName() + "] | [" +
StartupOption.INITIALIZESHAREDEDITS.getName() + "]"); StartupOption.INITIALIZESHAREDEDITS.getName() + "] | [" +
StartupOption.BOOTSTRAPSTANDBY.getName() + "] | [" +
StartupOption.RECOVER.getName() + " [ " +
StartupOption.FORCE.getName() + " ] ]");
} }
private static StartupOption parseArguments(String args[]) { private static StartupOption parseArguments(String args[]) {
@ -876,6 +881,21 @@ public class NameNode {
} else if (StartupOption.INITIALIZESHAREDEDITS.getName().equalsIgnoreCase(cmd)) { } else if (StartupOption.INITIALIZESHAREDEDITS.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.INITIALIZESHAREDEDITS; startOpt = StartupOption.INITIALIZESHAREDEDITS;
return startOpt; return startOpt;
} else if (StartupOption.RECOVER.getName().equalsIgnoreCase(cmd)) {
if (startOpt != StartupOption.REGULAR) {
throw new RuntimeException("Can't combine -recover with " +
"other startup options.");
}
startOpt = StartupOption.RECOVER;
while (++i < argsLen) {
if (args[i].equalsIgnoreCase(
StartupOption.FORCE.getName())) {
startOpt.setForce(MetaRecoveryContext.FORCE_FIRST_CHOICE);
} else {
throw new RuntimeException("Error parsing recovery options: " +
"can't understand option \"" + args[i] + "\"");
}
}
} else { } else {
return null; return null;
} }
@ -892,6 +912,39 @@ public class NameNode {
StartupOption.REGULAR.toString())); StartupOption.REGULAR.toString()));
} }
private static void doRecovery(StartupOption startOpt, Configuration conf)
throws IOException {
if (startOpt.getForce() < MetaRecoveryContext.FORCE_ALL) {
if (!confirmPrompt("You have selected Metadata Recovery mode. " +
"This mode is intended to recover lost metadata on a corrupt " +
"filesystem. Metadata recovery mode often permanently deletes " +
"data from your HDFS filesystem. Please back up your edit log " +
"and fsimage before trying this!\n\n" +
"Are you ready to proceed? (Y/N)\n")) {
System.err.println("Recovery aborted at user request.\n");
return;
}
}
MetaRecoveryContext.LOG.info("starting recovery...");
UserGroupInformation.setConfiguration(conf);
NameNode.initMetrics(conf, startOpt.toNodeRole());
FSNamesystem fsn = null;
try {
fsn = FSNamesystem.loadFromDisk(conf);
fsn.saveNamespace();
MetaRecoveryContext.LOG.info("RECOVERY COMPLETE");
} catch (IOException e) {
MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
throw e;
} catch (RuntimeException e) {
MetaRecoveryContext.LOG.info("RECOVERY FAILED: caught exception", e);
throw e;
} finally {
if (fsn != null)
fsn.close();
}
}
/** /**
* Print out a prompt to the user, and return true if the user * Print out a prompt to the user, and return true if the user
* responds with "Y" or "yes". * responds with "Y" or "yes".
@ -973,6 +1026,10 @@ public class NameNode {
DefaultMetricsSystem.initialize(role.toString().replace(" ", "")); DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
return new BackupNode(conf, role); return new BackupNode(conf, role);
} }
case RECOVER: {
NameNode.doRecovery(startOpt, conf);
return null;
}
default: default:
DefaultMetricsSystem.initialize("NameNode"); DefaultMetricsSystem.initialize("NameNode");
return new NameNode(conf); return new NameNode(conf);

View File

@ -219,7 +219,7 @@ public class EditLogTailer {
// disk are ignored. // disk are ignored.
long editsLoaded = 0; long editsLoaded = 0;
try { try {
editsLoaded = image.loadEdits(streams, namesystem); editsLoaded = image.loadEdits(streams, namesystem, null);
} catch (EditLogInputException elie) { } catch (EditLogInputException elie) {
editsLoaded = elie.getNumEditsLoaded(); editsLoaded = elie.getNumEditsLoaded();
throw elie; throw elie;

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException; import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
import org.apache.hadoop.hdfs.util.XMLUtils.Stanza; import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
import org.xml.sax.Attributes; import org.xml.sax.Attributes;
@ -54,6 +55,7 @@ class OfflineEditsXmlLoader
private FSEditLogOpCodes opCode; private FSEditLogOpCodes opCode;
private StringBuffer cbuf; private StringBuffer cbuf;
private long nextTxId; private long nextTxId;
private final OpInstanceCache opCache = new OpInstanceCache();
static enum ParseState { static enum ParseState {
EXPECT_EDITS_TAG, EXPECT_EDITS_TAG,
@ -207,7 +209,7 @@ class OfflineEditsXmlLoader
throw new InvalidXmlException("expected </DATA>"); throw new InvalidXmlException("expected </DATA>");
} }
state = ParseState.EXPECT_RECORD; state = ParseState.EXPECT_RECORD;
FSEditLogOp op = FSEditLogOp.getOpInstance(opCode); FSEditLogOp op = opCache.get(opCode);
opCode = null; opCode = null;
try { try {
op.decodeXml(stanza); op.decodeXml(stanza);

View File

@ -581,6 +581,10 @@ public class MiniDFSCluster {
} }
} }
if (operation == StartupOption.RECOVER) {
return;
}
// Start the DataNodes // Start the DataNodes
startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks, startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
hosts, simulatedCapacities, setupHostsFile); hosts, simulatedCapacities, setupHostsFile);
@ -781,6 +785,9 @@ public class MiniDFSCluster {
operation == StartupOption.REGULAR) ? operation == StartupOption.REGULAR) ?
new String[] {} : new String[] {operation.getName()}; new String[] {} : new String[] {operation.getName()};
NameNode nn = NameNode.createNameNode(args, conf); NameNode nn = NameNode.createNameNode(args, conf);
if (operation == StartupOption.RECOVER) {
return;
}
// After the NN has started, set back the bound ports into // After the NN has started, set back the bound ports into
// the conf // the conf
@ -956,6 +963,9 @@ public class MiniDFSCluster {
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException { boolean checkDataNodeAddrConfig) throws IOException {
if (operation == StartupOption.RECOVER) {
return;
}
conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1"); conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
int curDatanodesNum = dataNodes.size(); int curDatanodesNum = dataNodes.size();

View File

@ -180,8 +180,8 @@ public class TestEditLog extends TestCase {
} }
private long testLoad(byte[] data, FSNamesystem namesys) throws IOException { private long testLoad(byte[] data, FSNamesystem namesys) throws IOException {
FSEditLogLoader loader = new FSEditLogLoader(namesys); FSEditLogLoader loader = new FSEditLogLoader(namesys, 0);
return loader.loadFSEdits(new EditLogByteInputStream(data), 1); return loader.loadFSEdits(new EditLogByteInputStream(data), 1, null);
} }
/** /**
@ -316,7 +316,7 @@ public class TestEditLog extends TestCase {
// //
for (Iterator<StorageDirectory> it = for (Iterator<StorageDirectory> it =
fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) { fsimage.getStorage().dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
FSEditLogLoader loader = new FSEditLogLoader(namesystem); FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0);
File editFile = NNStorage.getFinalizedEditsFile(it.next(), 3, File editFile = NNStorage.getFinalizedEditsFile(it.next(), 3,
3 + expectedTxns - 1); 3 + expectedTxns - 1);
@ -324,7 +324,7 @@ public class TestEditLog extends TestCase {
System.out.println("Verifying file: " + editFile); System.out.println("Verifying file: " + editFile);
long numEdits = loader.loadFSEdits( long numEdits = loader.loadFSEdits(
new EditLogFileInputStream(editFile), 3); new EditLogFileInputStream(editFile), 3, null);
int numLeases = namesystem.leaseManager.countLease(); int numLeases = namesystem.leaseManager.countLease();
System.out.println("Number of outstanding leases " + numLeases); System.out.println("Number of outstanding leases " + numLeases);
assertEquals(0, numLeases); assertEquals(0, numLeases);
@ -775,8 +775,8 @@ public class TestEditLog extends TestCase {
} }
@Override @Override
public FSEditLogOp readOp() throws IOException { protected FSEditLogOp nextOp() throws IOException {
return reader.readOp(); return reader.readOp(false);
} }
@Override @Override
@ -789,16 +789,11 @@ public class TestEditLog extends TestCase {
input.close(); input.close();
} }
@Override // JournalStream @Override
public String getName() { public String getName() {
return "AnonEditLogByteInputStream"; return "AnonEditLogByteInputStream";
} }
@Override // JournalStream
public JournalType getType() {
return JournalType.FILE;
}
@Override @Override
public boolean isInProgress() { public boolean isInProgress() {
return true; return true;

View File

@ -236,9 +236,9 @@ public class TestEditLogRace {
File editFile = new File(sd.getCurrentDir(), logFileName); File editFile = new File(sd.getCurrentDir(), logFileName);
System.out.println("Verifying file: " + editFile); System.out.println("Verifying file: " + editFile);
FSEditLogLoader loader = new FSEditLogLoader(namesystem); FSEditLogLoader loader = new FSEditLogLoader(namesystem, startTxId);
long numEditsThisLog = loader.loadFSEdits(new EditLogFileInputStream(editFile), long numEditsThisLog = loader.loadFSEdits(new EditLogFileInputStream(editFile),
startTxId); startTxId, null);
System.out.println("Number of edits: " + numEditsThisLog); System.out.println("Number of edits: " + numEditsThisLog);
assertTrue(numEdits == -1 || numEditsThisLog == numEdits); assertTrue(numEdits == -1 || numEditsThisLog == numEdits);

View File

@ -92,8 +92,8 @@ public class TestFSEditLogLoader {
rwf.close(); rwf.close();
StringBuilder bld = new StringBuilder(); StringBuilder bld = new StringBuilder();
bld.append("^Error replaying edit log at offset \\d+"); bld.append("^Error replaying edit log at offset \\d+. ");
bld.append(" on transaction ID \\d+\n"); bld.append("Expected transaction ID was \\d+\n");
bld.append("Recent opcode offsets: (\\d+\\s*){4}$"); bld.append("Recent opcode offsets: (\\d+\\s*){4}$");
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES)

View File

@ -0,0 +1,305 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashSet;
import java.util.Set;
import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DeleteOp;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import com.google.common.collect.Sets;
/**
* This tests data recovery mode for the NameNode.
*/
public class TestNameNodeRecovery {
private static final Log LOG = LogFactory.getLog(TestNameNodeRecovery.class);
private static StartupOption recoverStartOpt = StartupOption.RECOVER;
static {
recoverStartOpt.setForce(MetaRecoveryContext.FORCE_ALL);
}
static void runEditLogTest(EditLogTestSetup elts) throws IOException {
final String TEST_LOG_NAME = "test_edit_log";
final OpInstanceCache cache = new OpInstanceCache();
EditLogFileOutputStream elfos = null;
File file = null;
EditLogFileInputStream elfis = null;
try {
file = new File(TEST_LOG_NAME);
elfos = new EditLogFileOutputStream(file, 0);
elfos.create();
elts.addTransactionsToLog(elfos, cache);
elfos.setReadyToFlush();
elfos.flushAndSync();
elfos.close();
elfos = null;
file = new File(TEST_LOG_NAME);
elfis = new EditLogFileInputStream(file);
// reading through normally will get you an exception
Set<Long> validTxIds = elts.getValidTxIds();
FSEditLogOp op = null;
long prevTxId = 0;
try {
while (true) {
op = elfis.nextOp();
if (op == null) {
break;
}
LOG.debug("read txid " + op.txid);
if (!validTxIds.contains(op.getTransactionId())) {
fail("read txid " + op.getTransactionId() +
", which we did not expect to find.");
}
validTxIds.remove(op.getTransactionId());
prevTxId = op.getTransactionId();
}
if (elts.getLastValidTxId() != -1) {
fail("failed to throw IoException as expected");
}
} catch (IOException e) {
if (elts.getLastValidTxId() == -1) {
fail("expected all transactions to be valid, but got exception " +
"on txid " + prevTxId);
} else {
assertEquals(prevTxId, elts.getLastValidTxId());
}
}
if (elts.getLastValidTxId() != -1) {
// let's skip over the bad transaction
op = null;
prevTxId = 0;
try {
while (true) {
op = elfis.nextValidOp();
if (op == null) {
break;
}
prevTxId = op.getTransactionId();
assertTrue(validTxIds.remove(op.getTransactionId()));
}
} catch (Throwable e) {
fail("caught IOException while trying to skip over bad " +
"transaction. message was " + e.getMessage() +
"\nstack trace\n" + StringUtils.stringifyException(e));
}
}
// We should have read every valid transaction.
assertTrue(validTxIds.isEmpty());
} finally {
IOUtils.cleanup(LOG, elfos, elfis);
}
}
private interface EditLogTestSetup {
/**
* Set up the edit log.
*/
abstract public void addTransactionsToLog(EditLogOutputStream elos,
OpInstanceCache cache) throws IOException;
/**
* Get the transaction ID right before the transaction which causes the
* normal edit log loading process to bail out-- or -1 if the first
* transaction should be bad.
*/
abstract public long getLastValidTxId();
/**
* Get the transaction IDs which should exist and be valid in this
* edit log.
**/
abstract public Set<Long> getValidTxIds();
}
private class EltsTestEmptyLog implements EditLogTestSetup {
public void addTransactionsToLog(EditLogOutputStream elos,
OpInstanceCache cache) throws IOException {
// do nothing
}
public long getLastValidTxId() {
return -1;
}
public Set<Long> getValidTxIds() {
return new HashSet<Long>();
}
}
/** Test an empty edit log */
@Test(timeout=180000)
public void testEmptyLog() throws IOException {
runEditLogTest(new EltsTestEmptyLog());
}
private class EltsTestGarbageInEditLog implements EditLogTestSetup {
final private long BAD_TXID = 4;
final private long MAX_TXID = 10;
public void addTransactionsToLog(EditLogOutputStream elos,
OpInstanceCache cache) throws IOException {
for (long txid = 1; txid <= MAX_TXID; txid++) {
if (txid == BAD_TXID) {
byte garbage[] = { 0x1, 0x2, 0x3 };
elos.writeRaw(garbage, 0, garbage.length);
}
else {
DeleteOp op;
op = DeleteOp.getInstance(cache);
op.setTransactionId(txid);
op.setPath("/foo." + txid);
op.setTimestamp(txid);
elos.write(op);
}
}
}
public long getLastValidTxId() {
return BAD_TXID - 1;
}
public Set<Long> getValidTxIds() {
return Sets.newHashSet(1L , 2L, 3L, 5L, 6L, 7L, 8L, 9L, 10L);
}
}
/** Test that we can successfully recover from a situation where there is
* garbage in the middle of the edit log file output stream. */
@Test(timeout=180000)
public void testSkipEdit() throws IOException {
runEditLogTest(new EltsTestGarbageInEditLog());
}
/** Test that we can successfully recover from a situation where the last
* entry in the edit log has been truncated. */
@Test(timeout=180000)
public void testRecoverTruncatedEditLog() throws IOException {
final String TEST_PATH = "/test/path/dir";
final int NUM_TEST_MKDIRS = 10;
// start a cluster
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
StorageDirectory sd = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.build();
cluster.waitActive();
fileSys = cluster.getFileSystem();
final FSNamesystem namesystem = cluster.getNamesystem();
FSImage fsimage = namesystem.getFSImage();
for (int i = 0; i < NUM_TEST_MKDIRS; i++) {
fileSys.mkdirs(new Path(TEST_PATH));
}
sd = fsimage.getStorage().dirIterator(NameNodeDirType.EDITS).next();
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
File editFile = FSImageTestUtil.findLatestEditsLog(sd).getFile();
assertTrue("Should exist: " + editFile, editFile.exists());
// Corrupt the last edit
long fileLen = editFile.length();
RandomAccessFile rwf = new RandomAccessFile(editFile, "rw");
rwf.setLength(fileLen - 1);
rwf.close();
// Make sure that we can't start the cluster normally before recovery
cluster = null;
try {
LOG.debug("trying to start normally (this should fail)...");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false).build();
cluster.waitActive();
cluster.shutdown();
fail("expected the truncated edit log to prevent normal startup");
} catch (IOException e) {
// success
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
// Perform recovery
cluster = null;
try {
LOG.debug("running recovery...");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false).startupOption(recoverStartOpt).build();
} catch (IOException e) {
fail("caught IOException while trying to recover. " +
"message was " + e.getMessage() +
"\nstack trace\n" + StringUtils.stringifyException(e));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
// Make sure that we can start the cluster normally after recovery
cluster = null;
try {
LOG.debug("starting cluster normally after recovery...");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
.format(false).build();
LOG.debug("testRecoverTruncatedEditLog: successfully recovered the " +
"truncated edit log");
assertTrue(cluster.getFileSystem().exists(new Path(TEST_PATH)));
} catch (IOException e) {
fail("failed to recover. Error message: " + e.getMessage());
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -143,9 +143,9 @@ public class TestSecurityTokenEditLog extends TestCase {
File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 1 + expectedTransactions - 1); File editFile = NNStorage.getFinalizedEditsFile(sd, 1, 1 + expectedTransactions - 1);
System.out.println("Verifying file: " + editFile); System.out.println("Verifying file: " + editFile);
FSEditLogLoader loader = new FSEditLogLoader(namesystem); FSEditLogLoader loader = new FSEditLogLoader(namesystem, 0);
long numEdits = loader.loadFSEdits( long numEdits = loader.loadFSEdits(
new EditLogFileInputStream(editFile), 1); new EditLogFileInputStream(editFile), 1, null);
assertEquals("Verification for " + editFile, expectedTransactions, numEdits); assertEquals("Verification for " + editFile, expectedTransactions, numEdits);
} }
} finally { } finally {