HDFS-2158. Merging change r1177473 from trunk to 0.23

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1297856 13f79535-47bb-0310-9956-ffa450edef68
Suresh Srinivas 2012-03-07 06:15:35 +00:00
parent b6d9e6b5f7
commit e765df40dd
11 changed files with 724 additions and 507 deletions


@@ -98,6 +98,8 @@ Release 0.23.3 - UNRELEASED
     HDFS-3030. Remove getProtocolVersion and getProtocolSignature from
     translators. (jitendra)
+    HDFS-2158. Add JournalSet to manage the set of journals. (jitendra)
   OPTIMIZATIONS
   BUG FIXES


@@ -54,7 +54,6 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
     this.nnRegistration = nnReg;
     InetSocketAddress bnAddress =
       NetUtils.createSocketAddr(bnRegistration.getAddress());
-    Storage.LOG.info("EditLogBackupOutputStream connects to: " + bnAddress);
     try {
       this.backupNode =
           new JournalProtocolTranslatorPB(bnAddress, new HdfsConfiguration());
@@ -66,16 +65,6 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
     this.out = new DataOutputBuffer(DEFAULT_BUFFER_SIZE);
   }
 
-  @Override // JournalStream
-  public String getName() {
-    return bnRegistration.getAddress();
-  }
-
-  @Override // JournalStream
-  public JournalType getType() {
-    return JournalType.BACKUP;
-  }
-
   @Override // EditLogOutputStream
   void write(FSEditLogOp op) throws IOException {
     doubleBuf.writeOp(op);
@@ -140,16 +129,6 @@ protected void flushAndSync() throws IOException {
     }
   }
 
-  /**
-   * There is no persistent storage. Therefore length is 0.<p>
-   * Length is used to check when it is large enough to start a checkpoint.
-   * This criteria should not be used for backup streams.
-   */
-  @Override // EditLogOutputStream
-  long length() throws IOException {
-    return 0;
-  }
-
   /**
    * Get backup node registration.
    */


@@ -37,9 +37,7 @@
  * stores edits in a local file.
  */
 class EditLogFileOutputStream extends EditLogOutputStream {
-  private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);;
-  private static int EDITS_FILE_HEADER_SIZE_BYTES = Integer.SIZE / Byte.SIZE;
+  private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
 
   private File file;
   private FileOutputStream fp; // file stream for storing edit logs
@@ -73,16 +71,6 @@ class EditLogFileOutputStream extends EditLogOutputStream {
     fc.position(fc.size());
   }
 
-  @Override // JournalStream
-  public String getName() {
-    return file.getPath();
-  }
-
-  @Override // JournalStream
-  public JournalType getType() {
-    return JournalType.FILE;
-  }
-
   @Override
   void write(FSEditLogOp op) throws IOException {
     doubleBuf.writeOp(op);
@@ -175,7 +163,10 @@ protected void flushAndSync() throws IOException {
     if (fp == null) {
       throw new IOException("Trying to use aborted output stream");
     }
+    if (doubleBuf.isFlushed()) {
+      LOG.info("Nothing to flush");
+      return;
+    }
     preallocate(); // preallocate file if necessary
     doubleBuf.flushTo(fp);
     fc.force(false); // metadata updates not needed because of preallocation
@@ -189,16 +180,6 @@ protected void flushAndSync() throws IOException {
   public boolean shouldForceSync() {
     return doubleBuf.shouldForceSync();
   }
 
-  /**
-   * Return the size of the current edit log including buffered data.
-   */
-  @Override
-  long length() throws IOException {
-    // file size - header size + size of both buffers
-    return fc.size() - EDITS_FILE_HEADER_SIZE_BYTES +
-      doubleBuf.countBufferedBytes();
-  }
-
   // allocate a big chunk of data
   private void preallocate() throws IOException {
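The change to flushAndSync() above adds an early return when the double buffer has already been flushed. Below is a minimal, self-contained sketch of that guard pattern; DoubleBuffer and SketchOutputStream are illustrative stand-ins, not the HDFS EditsDoubleBuffer or EditLogFileOutputStream classes.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Stand-in for a double buffer: one side accumulates edits, the other is
// staged for flushing once setReadyToFlush() swaps them.
class DoubleBuffer {
  private ByteArrayOutputStream current = new ByteArrayOutputStream();
  private ByteArrayOutputStream ready = new ByteArrayOutputStream();

  void write(byte[] b) throws IOException { current.write(b); }

  // Swap buffers: everything accumulated so far becomes ready to flush.
  void setReadyToFlush() {
    ByteArrayOutputStream tmp = ready;
    ready = current;
    current = tmp;
  }

  boolean isFlushed() { return ready.size() == 0; }

  void flushTo(OutputStream out) throws IOException {
    ready.writeTo(out);
    ready.reset();
  }
}

class SketchOutputStream {
  private final DoubleBuffer doubleBuf = new DoubleBuffer();
  private final OutputStream fp;

  SketchOutputStream(OutputStream fp) { this.fp = fp; }

  void write(byte[] op) throws IOException { doubleBuf.write(op); }

  void setReadyToFlush() { doubleBuf.setReadyToFlush(); }

  // Mirrors the new guard: a second flushAndSync() with nothing staged
  // returns immediately instead of touching the file again.
  void flushAndSync() throws IOException {
    if (doubleBuf.isFlushed()) {
      return; // nothing to flush
    }
    doubleBuf.flushTo(fp);
    fp.flush();
  }
}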


@@ -18,18 +18,20 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.IOException;
 
 /**
  * A generic abstract class to support journaling of edits logs into
  * a persistent storage.
  */
-abstract class EditLogOutputStream implements JournalStream {
+abstract class EditLogOutputStream {
   // these are statistics counters
   private long numSync;        // number of sync(s) to disk
   private long totalTimeSync;  // total time to sync
 
-  EditLogOutputStream() throws IOException {
+  EditLogOutputStream() {
     numSync = totalTimeSync = 0;
   }
@@ -100,12 +102,6 @@ public void flush() throws IOException {
     totalTimeSync += (end - start);
   }
 
-  /**
-   * Return the size of the current edits log.
-   * Length is used to check when it is large enough to start a checkpoint.
-   */
-  abstract long length() throws IOException;
-
   /**
    * Implement the policy when to automatically sync the buffered edits log
    * The buffered edits can be flushed when the buffer becomes full or
@@ -127,12 +123,7 @@ long getTotalSyncTime() {
   /**
    * Return number of calls to {@link #flushAndSync()}
    */
-  long getNumSync() {
+  protected long getNumSync() {
     return numSync;
   }
 
-  @Override // Object
-  public String toString() {
-    return getName();
-  }
 }
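With getName(), getType(), length() and toString() removed, the abstract output-stream contract is smaller. The sketch below is a rough outline of that surface as it can be inferred from this diff; it is not the real class, and the signatures are simplified (write() takes a byte[] stand-in instead of FSEditLogOp, modifiers are approximated).

import java.io.IOException;

abstract class EditLogOutputStreamSketch {
  private long numSync;          // number of sync(s) to disk
  private long totalTimeSync;    // total time spent in flushAndSync()

  abstract void create() throws IOException;              // format the output
  abstract void write(byte[] op) throws IOException;      // buffer one edit op
  abstract void writeRaw(byte[] data, int off, int len) throws IOException;
  abstract void setReadyToFlush() throws IOException;     // swap double buffers
  protected abstract void flushAndSync() throws IOException;
  abstract void close() throws IOException;
  abstract void abort() throws IOException;

  // Subclasses may ask for an early sync (e.g. when a buffer fills up).
  boolean shouldForceSync() {
    return false;
  }

  // Statistics stay in the base class; note getNumSync() is protected now.
  public void flush() throws IOException {
    numSync++;
    long start = System.currentTimeMillis();
    flushAndSync();
    totalTimeSync += System.currentTimeMillis() - start;
  }

  protected long getNumSync() {
    return numSync;
  }

  long getTotalSyncTime() {
    return totalTimeSync;
  }
}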


@@ -17,12 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.SortedSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,25 +34,17 @@
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
-import org.apache.hadoop.hdfs.server.namenode.JournalManager.CorruptionException;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
-
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
 
 /**
  * FSEditLog maintains a log of the namespace modifications.
@@ -62,9 +54,6 @@
 @InterfaceStability.Evolving
 public class FSEditLog {
 
-  static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!!" +
-    " File system changes are not persistent. No journal streams.";
-
   static final Log LOG = LogFactory.getLog(FSEditLog.class);
 
   /**
@@ -82,10 +71,11 @@ private enum State {
     CLOSED;
   }
   private State state = State.UNINITIALIZED;
 
-  private List<JournalAndStream> journals = Lists.newArrayList();
+  //initialize
+  final private JournalSet journalSet;
+  private EditLogOutputStream editLogStream = null;
 
   // a monotonically increasing counter that represents transactionIds.
   private long txid = 0;
@@ -137,15 +127,15 @@ protected synchronized TransactionId initialValue() {
     this.storage = storage;
     metrics = NameNode.getNameNodeMetrics();
     lastPrintTime = now();
+    this.journalSet = new JournalSet();
     for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.EDITS)) {
-      journals.add(new JournalAndStream(new FileJournalManager(sd)));
+      journalSet.add(new FileJournalManager(sd));
     }
-    if (journals.isEmpty()) {
+    if (journalSet.isEmpty()) {
       LOG.error("No edits directories configured!");
     }
     state = State.BETWEEN_LOG_SEGMENTS;
   }
@@ -172,9 +162,8 @@ synchronized void close() {
       LOG.debug("Closing log when already closed");
       return;
     }
     if (state == State.IN_SEGMENT) {
-      assert !journals.isEmpty();
+      assert editLogStream != null;
       waitForSyncToFinish();
       endCurrentLogSegment(true);
     }
@@ -193,20 +182,14 @@ void logEdit(final FSEditLogOp op) {
       // wait if an automatic sync is scheduled
       waitIfAutoSyncScheduled();
 
-      if (journals.isEmpty()) {
-        throw new java.lang.IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
-      }
-
       long start = beginTransaction();
       op.setTransactionId(txid);
 
-      mapJournalsAndReportErrors(new JournalClosure() {
-        @Override
-        public void apply(JournalAndStream jas) throws IOException {
-          if (!jas.isActive()) return;
-          jas.stream.write(op);
-        }
-      }, "logging edit");
+      try {
+        editLogStream.write(op);
+      } catch (IOException ex) {
+        // All journals failed, it is handled in logSync.
+      }
 
       endTransaction(start);
@@ -251,14 +234,7 @@ synchronized void doneWithAutoSyncScheduling() {
    * @return true if any of the edit stream says that it should sync
    */
   private boolean shouldForceSync() {
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      if (jas.getCurrentStream().shouldForceSync()) {
-        return true;
-      }
-    }
-    return false;
+    return editLogStream.shouldForceSync();
   }
 
   private long beginTransaction() {
@@ -322,7 +298,7 @@ synchronized void setNextTxId(long nextTxId) {
    * NOTE: this should be done while holding the FSNamesystem lock, or
    * else more operations can start writing while this is in progress.
    */
-  void logSyncAll() throws IOException {
+  void logSyncAll() {
     // Record the most recent transaction ID as our own id
     synchronized (this) {
       TransactionId id = myTransactionId.get();
@@ -366,74 +342,73 @@ public void logSync() {
     // Fetch the transactionId of this thread.
     long mytxid = myTransactionId.get().txid;
 
-    List<JournalAndStream> candidateJournals =
-      Lists.newArrayListWithCapacity(journals.size());
-    List<JournalAndStream> badJournals = Lists.newArrayList();
-
     boolean sync = false;
     try {
+      EditLogOutputStream logStream = null;
       synchronized (this) {
         try {
           printStatistics(false);
 
           // if somebody is already syncing, then wait
           while (mytxid > synctxid && isSyncRunning) {
             try {
               wait(1000);
             } catch (InterruptedException ie) {
             }
           }
 
           //
           // If this transaction was already flushed, then nothing to do
           //
           if (mytxid <= synctxid) {
             numTransactionsBatchedInSync++;
-            if (metrics != null) // Metrics is non-null only when used inside name node
-              metrics.incrTransactionsBatchedInSync();
-            return;
-          }
+            if (metrics != null) {
+              // Metrics is non-null only when used inside name node
+              metrics.incrTransactionsBatchedInSync();
+            }
+            return;
+          }
 
           // now, this thread will do the sync
           syncStart = txid;
           isSyncRunning = true;
           sync = true;
 
           // swap buffers
-          assert !journals.isEmpty() : "no editlog streams";
-          for (JournalAndStream jas : journals) {
-            if (!jas.isActive()) continue;
-            try {
-              jas.getCurrentStream().setReadyToFlush();
-              candidateJournals.add(jas);
-            } catch (IOException ie) {
-              LOG.error("Unable to get ready to flush.", ie);
-              badJournals.add(jas);
-            }
-          }
+          try {
+            if (journalSet.isEmpty()) {
+              throw new IOException("No journals available to flush");
+            }
+            editLogStream.setReadyToFlush();
+          } catch (IOException e) {
+            LOG.fatal("Could not sync any journal to persistent storage. "
+                + "Unsynced transactions: " + (txid - synctxid),
+                new Exception());
+            runtime.exit(1);
+          }
         } finally {
           // Prevent RuntimeException from blocking other log edit write
           doneWithAutoSyncScheduling();
         }
+        //editLogStream may become null,
+        //so store a local variable for flush.
+        logStream = editLogStream;
       }
 
       // do the sync
       long start = now();
-      for (JournalAndStream jas : candidateJournals) {
-        if (!jas.isActive()) continue;
-        try {
-          jas.getCurrentStream().flush();
-        } catch (IOException ie) {
-          LOG.error("Unable to sync edit log.", ie);
-          //
-          // remember the streams that encountered an error.
-          //
-          badJournals.add(jas);
-        }
-      }
+      try {
+        if (logStream != null) {
+          logStream.flush();
+        }
+      } catch (IOException ex) {
+        synchronized (this) {
+          LOG.fatal("Could not sync any journal to persistent storage. "
+              + "Unsynced transactions: " + (txid - synctxid), new Exception());
+          runtime.exit(1);
+        }
+      }
       long elapsed = now() - start;
-      disableAndReportErrorOnJournals(badJournals);
 
       if (metrics != null) { // Metrics non-null only when used inside name node
         metrics.addSync(elapsed);
@@ -443,13 +418,6 @@ public void logSync() {
       // Prevent RuntimeException from blocking other log edit sync
       synchronized (this) {
         if (sync) {
-          if (badJournals.size() >= journals.size()) {
-            LOG.fatal("Could not sync any journal to persistent storage. " +
-                "Unsynced transactions: " + (txid - synctxid),
-                new Exception());
-            runtime.exit(1);
-          }
           synctxid = syncStart;
           isSyncRunning = false;
         }
@@ -466,9 +434,6 @@ private void printStatistics(boolean force) {
     if (lastPrintTime + 60000 > now && !force) {
       return;
     }
-    if (journals.isEmpty()) {
-      return;
-    }
     lastPrintTime = now;
     StringBuilder buf = new StringBuilder();
     buf.append("Number of transactions: ");
@@ -478,20 +443,9 @@ private void printStatistics(boolean force) {
     buf.append("Number of transactions batched in Syncs: ");
     buf.append(numTransactionsBatchedInSync);
     buf.append(" Number of syncs: ");
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      buf.append(jas.getCurrentStream().getNumSync());
-      break;
-    }
+    buf.append(editLogStream.getNumSync());
     buf.append(" SyncTimes(ms): ");
-    for (JournalAndStream jas : journals) {
-      if (!jas.isActive()) continue;
-      EditLogOutputStream eStream = jas.getCurrentStream();
-      buf.append(eStream.getTotalSyncTime());
-      buf.append(" ");
-    }
+    buf.append(journalSet.getSyncTimes());
     LOG.info(buf);
   }
@@ -664,7 +618,6 @@ void logSymlink(String path, String value, long mtime,
    * log delegation token to edit log
    * @param id DelegationTokenIdentifier
    * @param expiryTime of the token
-   * @return
    */
   void logGetDelegationToken(DelegationTokenIdentifier id,
       long expiryTime) {
@@ -702,25 +655,12 @@ void logReassignLease(String leaseHolder, String src, String newHolder) {
     logEdit(op);
   }
 
-  /**
-   * @return the number of active (non-failed) journals
-   */
-  private int countActiveJournals() {
-    int count = 0;
-    for (JournalAndStream jas : journals) {
-      if (jas.isActive()) {
-        count++;
-      }
-    }
-    return count;
-  }
-
   /**
    * Used only by unit tests.
    */
   @VisibleForTesting
   List<JournalAndStream> getJournals() {
-    return journals;
+    return journalSet.getAllJournalStreams();
   }
 
   /**
@@ -742,62 +682,9 @@ void setMetricsForTests(NameNodeMetrics metrics) {
   /**
    * Return a manifest of what finalized edit logs are available
    */
-  public synchronized RemoteEditLogManifest getEditLogManifest(
-      long fromTxId) throws IOException {
-    // Collect RemoteEditLogs available from each FileJournalManager
-    List<RemoteEditLog> allLogs = Lists.newArrayList();
-    for (JournalAndStream j : journals) {
-      if (j.getManager() instanceof FileJournalManager) {
-        FileJournalManager fjm = (FileJournalManager)j.getManager();
-        try {
-          allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
-        } catch (Throwable t) {
-          LOG.warn("Cannot list edit logs in " + fjm, t);
-        }
-      }
-    }
-
-    // Group logs by their starting txid
-    ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
-        Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
-    long curStartTxId = fromTxId;
-
-    List<RemoteEditLog> logs = Lists.newArrayList();
-    while (true) {
-      ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
-      if (logGroup.isEmpty()) {
-        // we have a gap in logs - for example because we recovered some old
-        // storage directory with ancient logs. Clear out any logs we've
-        // accumulated so far, and then skip to the next segment of logs
-        // after the gap.
-        SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
-        startTxIds = startTxIds.tailSet(curStartTxId);
-        if (startTxIds.isEmpty()) {
-          break;
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found gap in logs at " + curStartTxId + ": " +
-                "not returning previous logs in manifest.");
-          }
-          logs.clear();
-          curStartTxId = startTxIds.first();
-          continue;
-        }
-      }
-
-      // Find the one that extends the farthest forward
-      RemoteEditLog bestLog = Collections.max(logGroup);
-      logs.add(bestLog);
-      // And then start looking from after that point
-      curStartTxId = bestLog.getEndTxId() + 1;
-    }
-
-    RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Generated manifest for logs since " + fromTxId + ":"
-          + ret);
-    }
-    return ret;
+  public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId)
+      throws IOException {
+    return journalSet.getEditLogManifest(fromTxId);
   }
 
   /**
@@ -840,14 +727,9 @@ synchronized void startLogSegment(final long segmentTxId,
     // See HDFS-2174.
     storage.attemptRestoreRemovedStorage();
 
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.startLogSegment(segmentTxId);
-      }
-    }, "starting log segment " + segmentTxId);
-
-    if (countActiveJournals() == 0) {
+    try {
+      editLogStream = journalSet.startLogSegment(segmentTxId);
+    } catch (IOException ex) {
       throw new IOException("Unable to start log segment " +
           segmentTxId + ": no journals successfully started.");
     }
@@ -881,14 +763,12 @@ synchronized void endCurrentLogSegment(boolean writeEndTxn) {
     final long lastTxId = getLastWrittenTxId();
 
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        if (jas.isActive()) {
-          jas.close(lastTxId);
-        }
-      }
-    }, "ending log segment");
+    try {
+      journalSet.finalizeLogSegment(curSegmentTxId, lastTxId);
+      editLogStream = null;
+    } catch (IOException e) {
+      //All journals have failed, it will be handled in logSync.
+    }
 
     state = State.BETWEEN_LOG_SEGMENTS;
   }
@@ -897,14 +777,15 @@ public void apply(JournalAndStream jas) throws IOException {
    * Abort all current logs. Called from the backup node.
    */
   synchronized void abortCurrentLogSegment() {
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.abort();
-      }
-    }, "aborting all streams");
-    state = State.BETWEEN_LOG_SEGMENTS;
+    try {
+      //Check for null, as abort can be called any time.
+      if (editLogStream != null) {
+        editLogStream.abort();
+        editLogStream = null;
+      }
+    } catch (IOException e) {
+      LOG.warn("All journals failed to abort", e);
+    }
   }
 
   /**
@@ -920,13 +801,12 @@ public void purgeLogsOlderThan(final long minTxIdToKeep) {
         "cannot purge logs older than txid " + minTxIdToKeep +
         " when current segment starts at " + curSegmentTxId;
     }
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.manager.purgeLogsOlderThan(minTxIdToKeep);
-      }
-    }, "purging logs older than " + minTxIdToKeep);
+    try {
+      journalSet.purgeLogsOlderThan(minTxIdToKeep);
+    } catch (IOException ex) {
+      //All journals have failed, it will be handled in logSync.
+    }
   }
@@ -954,9 +834,7 @@ synchronized long getSyncTxId() {
   // sets the initial capacity of the flush buffer.
   public void setOutputBufferCapacity(int size) {
-    for (JournalAndStream jas : journals) {
-      jas.manager.setOutputBufferCapacity(size);
-    }
+    journalSet.setOutputBufferCapacity(size);
   }
 
   /**
@@ -977,7 +855,7 @@ synchronized void registerBackupNode(
     if(bnReg.isRole(NamenodeRole.CHECKPOINT))
       return; // checkpoint node does not stream edits
 
-    JournalAndStream jas = findBackupJournalAndStream(bnReg);
+    JournalManager jas = findBackupJournal(bnReg);
     if (jas != null) {
       // already registered
       LOG.info("Backup node " + bnReg + " re-registers");
@@ -986,35 +864,29 @@ synchronized void registerBackupNode(
     LOG.info("Registering new backup node: " + bnReg);
     BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
-    journals.add(new JournalAndStream(bjm));
+    journalSet.add(bjm);
   }
 
-  synchronized void releaseBackupStream(NamenodeRegistration registration) {
-    for (Iterator<JournalAndStream> iter = journals.iterator();
-         iter.hasNext();) {
-      JournalAndStream jas = iter.next();
-      if (jas.manager instanceof BackupJournalManager &&
-          ((BackupJournalManager)jas.manager).matchesRegistration(
-              registration)) {
-        jas.abort();
-        LOG.info("Removing backup journal " + jas);
-        iter.remove();
-      }
+  synchronized void releaseBackupStream(NamenodeRegistration registration)
+      throws IOException {
+    BackupJournalManager bjm = this.findBackupJournal(registration);
+    if (bjm != null) {
+      LOG.info("Removing backup journal " + bjm);
+      journalSet.remove(bjm);
     }
   }
 
   /**
    * Find the JournalAndStream associated with this BackupNode.
-   *
    * @return null if it cannot be found
    */
-  private synchronized JournalAndStream findBackupJournalAndStream(
+  private synchronized BackupJournalManager findBackupJournal(
       NamenodeRegistration bnReg) {
-    for (JournalAndStream jas : journals) {
-      if (jas.manager instanceof BackupJournalManager) {
-        BackupJournalManager bjm = (BackupJournalManager)jas.manager;
-        if (bjm.matchesRegistration(bnReg)) {
-          return jas;
-        }
-      }
+    for (JournalManager bjm : journalSet.getJournalManagers()) {
+      if ((bjm instanceof BackupJournalManager)
+          && ((BackupJournalManager) bjm).matchesRegistration(bnReg)) {
+        return (BackupJournalManager) bjm;
+      }
     }
     return null;
@@ -1026,124 +898,24 @@ private synchronized JournalAndStream findBackupJournalAndStream(
    */
   synchronized void logEdit(final int length, final byte[] data) {
     long start = beginTransaction();
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        if (jas.isActive()) {
-          jas.getCurrentStream().writeRaw(data, 0, length); // TODO writeRaw
-        }
-      }
-    }, "Logging edit");
+    try {
+      editLogStream.writeRaw(data, 0, length);
+    } catch (IOException ex) {
+      // All journals have failed, it will be handled in logSync.
+    }
 
     endTransaction(start);
   }
 
-  //// Iteration across journals
-  private interface JournalClosure {
-    public void apply(JournalAndStream jas) throws IOException;
-  }
-
-  /**
-   * Apply the given function across all of the journal managers, disabling
-   * any for which the closure throws an IOException.
-   * @param status message used for logging errors (e.g. "opening journal")
-   */
-  private void mapJournalsAndReportErrors(
-      JournalClosure closure, String status) {
-    List<JournalAndStream> badJAS = Lists.newLinkedList();
-    for (JournalAndStream jas : journals) {
-      try {
-        closure.apply(jas);
-      } catch (Throwable t) {
-        LOG.error("Error " + status + " (journal " + jas + ")", t);
-        badJAS.add(jas);
-      }
-    }
-    disableAndReportErrorOnJournals(badJAS);
-  }
-
-  /**
-   * Called when some journals experience an error in some operation.
-   * This propagates errors to the storage level.
-   */
-  private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
-    if (badJournals == null || badJournals.isEmpty()) {
-      return; // nothing to do
-    }
-    for (JournalAndStream j : badJournals) {
-      LOG.error("Disabling journal " + j);
-      j.abort();
-    }
-  }
-
-  /**
-   * Find the best editlog input stream to read from txid. In this case
-   * best means the editlog which has the largest continuous range of
-   * transactions starting from the transaction id, fromTxId.
-   *
-   * If a journal throws an CorruptionException while reading from a txn id,
-   * it means that it has more transactions, but can't find any from fromTxId.
-   * If this is the case and no other journal has transactions, we should throw
-   * an exception as it means more transactions exist, we just can't load them.
-   *
-   * @param fromTxId Transaction id to start from.
-   * @return a edit log input stream with tranactions fromTxId
-   *         or null if no more exist
-   */
-  private EditLogInputStream selectStream(long fromTxId)
-      throws IOException {
-    JournalManager bestjm = null;
-    long bestjmNumTxns = 0;
-    CorruptionException corruption = null;
-
-    for (JournalAndStream jas : journals) {
-      JournalManager candidate = jas.getManager();
-      long candidateNumTxns = 0;
-      try {
-        candidateNumTxns = candidate.getNumberOfTransactions(fromTxId);
-      } catch (CorruptionException ce) {
-        corruption = ce;
-      } catch (IOException ioe) {
-        LOG.warn("Error reading number of transactions from " + candidate);
-        continue; // error reading disk, just skip
-      }
-
-      if (candidateNumTxns > bestjmNumTxns) {
-        bestjm = candidate;
-        bestjmNumTxns = candidateNumTxns;
-      }
-    }
-
-    if (bestjm == null) {
-      /**
-       * If all candidates either threw a CorruptionException or
-       * found 0 transactions, then a gap exists.
-       */
-      if (corruption != null) {
-        throw new IOException("Gap exists in logs from "
-            + fromTxId, corruption);
-      } else {
-        return null;
-      }
-    }
-
-    return bestjm.getInputStream(fromTxId);
-  }
-
   /**
    * Run recovery on all journals to recover any unclosed segments
    */
   void recoverUnclosedStreams() {
-    mapJournalsAndReportErrors(new JournalClosure() {
-      @Override
-      public void apply(JournalAndStream jas) throws IOException {
-        jas.manager.recoverUnfinalizedSegments();
-      }
-    }, "recovering unclosed streams");
+    try {
+      journalSet.recoverUnfinalizedSegments();
+    } catch (IOException ex) {
+      // All journals have failed, it is handled in logSync.
+    }
   }
 
   /**
@@ -1151,23 +923,16 @@ public void apply(JournalAndStream jas) throws IOException {
    * @param fromTxId first transaction in the selected streams
    * @param toAtLeast the selected streams must contain this transaction
    */
-  Collection<EditLogInputStream> selectInputStreams(long fromTxId, long toAtLeastTxId)
-      throws IOException {
-    List<EditLogInputStream> streams = Lists.newArrayList();
-
-    boolean gapFound = false;
-    EditLogInputStream stream = selectStream(fromTxId);
+  Collection<EditLogInputStream> selectInputStreams(long fromTxId,
+      long toAtLeastTxId) throws IOException {
+    List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
+    EditLogInputStream stream = journalSet.getInputStream(fromTxId);
     while (stream != null) {
       fromTxId = stream.getLastTxId() + 1;
       streams.add(stream);
-      try {
-        stream = selectStream(fromTxId);
-      } catch (IOException ioe) {
-        gapFound = true;
-        break;
-      }
+      stream = journalSet.getInputStream(fromTxId);
     }
-    if (fromTxId <= toAtLeastTxId || gapFound) {
+    if (fromTxId <= toAtLeastTxId) {
       closeAllStreams(streams);
       throw new IOException("No non-corrupt logs for txid "
           + fromTxId);
@@ -1184,75 +949,4 @@ static void closeAllStreams(Iterable<EditLogInputStream> streams) {
       IOUtils.closeStream(s);
     }
   }
-
-  /**
-   * Container for a JournalManager paired with its currently
-   * active stream.
-   *
-   * If a Journal gets disabled due to an error writing to its
-   * stream, then the stream will be aborted and set to null.
-   */
-  static class JournalAndStream {
-    private final JournalManager manager;
-    private EditLogOutputStream stream;
-    private long segmentStartsAtTxId = HdfsConstants.INVALID_TXID;
-
-    private JournalAndStream(JournalManager manager) {
-      this.manager = manager;
-    }
-
-    private void startLogSegment(long txId) throws IOException {
-      Preconditions.checkState(stream == null);
-      stream = manager.startLogSegment(txId);
-      segmentStartsAtTxId = txId;
-    }
-
-    private void close(long lastTxId) throws IOException {
-      Preconditions.checkArgument(lastTxId >= segmentStartsAtTxId,
-          "invalid segment: lastTxId %s >= " +
-          "segment starting txid %s", lastTxId, segmentStartsAtTxId);
-
-      if (stream == null) return;
-      stream.close();
-      manager.finalizeLogSegment(segmentStartsAtTxId, lastTxId);
-      stream = null;
-    }
-
-    @VisibleForTesting
-    void abort() {
-      if (stream == null) return;
-      try {
-        stream.abort();
-      } catch (IOException ioe) {
-        LOG.error("Unable to abort stream " + stream, ioe);
-      }
-      stream = null;
-      segmentStartsAtTxId = HdfsConstants.INVALID_TXID;
-    }
-
-    private boolean isActive() {
-      return stream != null;
-    }
-
-    @VisibleForTesting
-    EditLogOutputStream getCurrentStream() {
-      return stream;
-    }
-
-    @Override
-    public String toString() {
-      return "JournalAndStream(mgr=" + manager +
-          ", " + "stream=" + stream + ")";
-    }
-
-    @VisibleForTesting
-    void setCurrentStreamForTests(EditLogOutputStream stream) {
-      this.stream = stream;
-    }
-
-    @VisibleForTesting
-    JournalManager getManager() {
-      return manager;
-    }
-  }
 }
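The overall shape of the refactored FSEditLog is easier to see outside the diff: it holds one editLogStream obtained from the JournalSet, individual journal failures are absorbed inside JournalSet, and only the all-journals-failed case is treated as fatal during sync. The following condensed sketch is hypothetical and heavily simplified (locking, state transitions and metrics omitted); it is not the actual FSEditLog code, and JournalSetLike/StreamLike are stand-ins for JournalSet and EditLogOutputStream.

import java.io.IOException;

class EditLogFlowSketch {
  interface StreamLike {
    void write(byte[] op) throws IOException;
    void setReadyToFlush() throws IOException;
    void flush() throws IOException;
  }

  interface JournalSetLike {
    StreamLike startLogSegment(long txId) throws IOException;
  }

  private final JournalSetLike journalSet;
  private StreamLike editLogStream;
  private long txid;
  private long synctxid;

  EditLogFlowSketch(JournalSetLike journalSet) {
    this.journalSet = journalSet;
  }

  void startLogSegment(long segmentTxId) throws IOException {
    // JournalSet hands back a single stream that fans out to every journal.
    editLogStream = journalSet.startLogSegment(segmentTxId);
  }

  synchronized void logEdit(byte[] op) {
    txid++;
    try {
      editLogStream.write(op);
    } catch (IOException ex) {
      // All journals failed; logSync() is where this becomes fatal.
    }
  }

  void logSync() {
    try {
      synchronized (this) {
        editLogStream.setReadyToFlush();   // swap buffers on every journal
      }
      editLogStream.flush();               // fan-out flush; throws only if all journals fail
      synchronized (this) {
        synctxid = txid;
      }
    } catch (IOException e) {
      // Could not sync any journal to persistent storage: treated as fatal
      // (the real code logs FATAL and calls runtime.exit(1)).
      Runtime.getRuntime().halt(1);
    }
  }
}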


@@ -23,7 +23,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import java.util.HashMap;
 import java.util.Comparator;
 import java.util.Collections;
 import java.util.regex.Matcher;


@@ -19,6 +19,7 @@
 import java.io.IOException;
 
 /**
  * A JournalManager is responsible for managing a single place of storing
  * edit logs. It may correspond to multiple files, a backup node, etc.
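JournalSet (next file) implements JournalManager and also calls these methods on each member journal, so the interface surface involved in this change looks roughly like the sketch below. It is inferred from the calls visible in this diff, not copied from the real declaration, and the exact signatures may differ.

import java.io.IOException;

// Inferred sketch of the JournalManager surface exercised by JournalSet;
// EditLogOutputStream and EditLogInputStream are the NameNode classes above.
interface JournalManagerSketch {
  EditLogOutputStream startLogSegment(long txId) throws IOException;
  void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException;
  EditLogInputStream getInputStream(long fromTxnId) throws IOException;
  long getNumberOfTransactions(long fromTxnId) throws IOException;
  void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
  void recoverUnfinalizedSegments() throws IOException;
  void setOutputBufferCapacity(int size);
}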


@@ -0,0 +1,549 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
/**
 * Manages a collection of Journals. None of the methods are synchronized; it is
 * assumed that the FSEditLog methods that use this class provide proper
 * synchronization.
*/
public class JournalSet implements JournalManager {
static final Log LOG = LogFactory.getLog(FSEditLog.class);
/**
* Container for a JournalManager paired with its currently
* active stream.
*
* If a Journal gets disabled due to an error writing to its
* stream, then the stream will be aborted and set to null.
*
* This should be used outside JournalSet only for testing.
*/
@VisibleForTesting
static class JournalAndStream {
private final JournalManager journal;
private boolean disabled = false;
private EditLogOutputStream stream;
public JournalAndStream(JournalManager manager) {
this.journal = manager;
}
public void startLogSegment(long txId) throws IOException {
Preconditions.checkState(stream == null);
disabled = false;
stream = journal.startLogSegment(txId);
}
/**
* Closes the stream, also sets it to null.
*/
public void close() throws IOException {
if (stream == null) return;
stream.close();
stream = null;
}
/**
* Aborts the stream, also sets it to null.
*/
public void abort() {
if (stream == null) return;
try {
stream.abort();
} catch (IOException ioe) {
LOG.error("Unable to abort stream " + stream, ioe);
}
stream = null;
}
boolean isActive() {
return stream != null;
}
/**
* Should be used outside JournalSet only for testing.
*/
EditLogOutputStream getCurrentStream() {
return stream;
}
@Override
public String toString() {
return "JournalAndStream(mgr=" + journal +
", " + "stream=" + stream + ")";
}
void setCurrentStreamForTests(EditLogOutputStream stream) {
this.stream = stream;
}
JournalManager getManager() {
return journal;
}
private boolean isDisabled() {
return disabled;
}
private void setDisabled(boolean disabled) {
this.disabled = disabled;
}
}
private List<JournalAndStream> journals = Lists.newArrayList();
@Override
public EditLogOutputStream startLogSegment(final long txId) throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.startLogSegment(txId);
}
}, "starting log segment " + txId);
return new JournalSetOutputStream();
}
@Override
public void finalizeLogSegment(final long firstTxId, final long lastTxId)
throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.close();
jas.getManager().finalizeLogSegment(firstTxId, lastTxId);
}
}
}, "finalize log segment " + firstTxId + ", " + lastTxId);
}
/**
* Find the best editlog input stream to read from txid.
 * If a journal throws a CorruptionException while reading from a txn id,
* it means that it has more transactions, but can't find any from fromTxId.
* If this is the case and no other journal has transactions, we should throw
* an exception as it means more transactions exist, we just can't load them.
*
* @param fromTxnId Transaction id to start from.
 * @return An edit log input stream with transactions fromTxId
* or null if no more exist
*/
@Override
public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
JournalManager bestjm = null;
long bestjmNumTxns = 0;
CorruptionException corruption = null;
for (JournalAndStream jas : journals) {
JournalManager candidate = jas.getManager();
long candidateNumTxns = 0;
try {
candidateNumTxns = candidate.getNumberOfTransactions(fromTxnId);
} catch (CorruptionException ce) {
corruption = ce;
} catch (IOException ioe) {
continue; // error reading disk, just skip
}
if (candidateNumTxns > bestjmNumTxns) {
bestjm = candidate;
bestjmNumTxns = candidateNumTxns;
}
}
if (bestjm == null) {
if (corruption != null) {
throw new IOException("No non-corrupt logs for txid "
+ fromTxnId, corruption);
} else {
return null;
}
}
return bestjm.getInputStream(fromTxnId);
}
@Override
public long getNumberOfTransactions(long fromTxnId) throws IOException {
long num = 0;
for (JournalAndStream jas: journals) {
if (jas.isActive()) {
long newNum = jas.getManager().getNumberOfTransactions(fromTxnId);
if (newNum > num) {
num = newNum;
}
}
}
return num;
}
/**
* Returns true if there are no journals or all are disabled.
* @return True if no journals or all are disabled.
*/
public boolean isEmpty() {
for (JournalAndStream jas : journals) {
if (!jas.isDisabled()) {
return false;
}
}
return true;
}
/**
* Called when some journals experience an error in some operation.
*/
private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
if (badJournals == null || badJournals.isEmpty()) {
return; // nothing to do
}
for (JournalAndStream j : badJournals) {
LOG.error("Disabling journal " + j);
j.abort();
j.setDisabled(true);
}
}
/**
* Implementations of this interface encapsulate operations that can be
* iteratively applied on all the journals. For example see
* {@link JournalSet#mapJournalsAndReportErrors}.
*/
private interface JournalClosure {
/**
* The operation on JournalAndStream.
* @param jas Object on which operations are performed.
* @throws IOException
*/
public void apply(JournalAndStream jas) throws IOException;
}
/**
* Apply the given operation across all of the journal managers, disabling
* any for which the closure throws an IOException.
* @param closure {@link JournalClosure} object encapsulating the operation.
* @param status message used for logging errors (e.g. "opening journal")
* @throws IOException If the operation fails on all the journals.
*/
private void mapJournalsAndReportErrors(
JournalClosure closure, String status) throws IOException{
List<JournalAndStream> badJAS = Lists.newLinkedList();
for (JournalAndStream jas : journals) {
try {
closure.apply(jas);
} catch (Throwable t) {
LOG.error("Error: " + status + " failed for (journal " + jas + ")", t);
badJAS.add(jas);
}
}
disableAndReportErrorOnJournals(badJAS);
if (badJAS.size() >= journals.size()) {
LOG.error("Error: "+status+" failed for all journals");
throw new IOException(status+" failed on all the journals");
}
}
/**
* An implementation of EditLogOutputStream that applies a requested method on
* all the journals that are currently active.
*/
private class JournalSetOutputStream extends EditLogOutputStream {
JournalSetOutputStream() throws IOException {
super();
}
@Override
void write(final FSEditLogOp op)
throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().write(op);
}
}
}, "write op");
}
@Override
void writeRaw(final byte[] data, final int offset, final int length)
throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().writeRaw(data, offset, length);
}
}
}, "write bytes");
}
@Override
void create() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().create();
}
}
}, "create");
}
@Override
public void close() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.close();
}
}, "close");
}
@Override
public void abort() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.abort();
}
}, "abort");
}
@Override
void setReadyToFlush() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().setReadyToFlush();
}
}
}, "setReadyToFlush");
}
@Override
protected void flushAndSync() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().flushAndSync();
}
}
}, "flushAndSync");
}
@Override
public void flush() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
if (jas.isActive()) {
jas.getCurrentStream().flush();
}
}
}, "flush");
}
@Override
public boolean shouldForceSync() {
for (JournalAndStream js : journals) {
if (js.isActive() && js.getCurrentStream().shouldForceSync()) {
return true;
}
}
return false;
}
@Override
protected long getNumSync() {
for (JournalAndStream jas : journals) {
if (jas.isActive()) {
return jas.getCurrentStream().getNumSync();
}
}
return 0;
}
}
@Override
public void setOutputBufferCapacity(final int size) {
try {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.getManager().setOutputBufferCapacity(size);
}
}, "setOutputBufferCapacity");
} catch (IOException e) {
LOG.error("Error in setting outputbuffer capacity");
}
}
@VisibleForTesting
List<JournalAndStream> getAllJournalStreams() {
return journals;
}
List<JournalManager> getJournalManagers() {
List<JournalManager> jList = new ArrayList<JournalManager>();
for (JournalAndStream j : journals) {
jList.add(j.getManager());
}
return jList;
}
void add(JournalManager j) {
journals.add(new JournalAndStream(j));
}
void remove(JournalManager j) {
JournalAndStream jasToRemove = null;
for (JournalAndStream jas: journals) {
if (jas.getManager().equals(j)) {
jasToRemove = jas;
break;
}
}
if (jasToRemove != null) {
jasToRemove.abort();
journals.remove(jasToRemove);
}
}
@Override
public void purgeLogsOlderThan(final long minTxIdToKeep) throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.getManager().purgeLogsOlderThan(minTxIdToKeep);
}
}, "purgeLogsOlderThan " + minTxIdToKeep);
}
@Override
public void recoverUnfinalizedSegments() throws IOException {
mapJournalsAndReportErrors(new JournalClosure() {
@Override
public void apply(JournalAndStream jas) throws IOException {
jas.getManager().recoverUnfinalizedSegments();
}
}, "recoverUnfinalizedSegments");
}
/**
* Return a manifest of what finalized edit logs are available. All available
* edit logs are returned starting from the transaction id passed.
*
* @param fromTxId Starting transaction id to read the logs.
* @return RemoteEditLogManifest object.
*/
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
// Collect RemoteEditLogs available from each FileJournalManager
List<RemoteEditLog> allLogs = Lists.newArrayList();
for (JournalAndStream j : journals) {
if (j.getManager() instanceof FileJournalManager) {
FileJournalManager fjm = (FileJournalManager)j.getManager();
try {
allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
} catch (Throwable t) {
LOG.warn("Cannot list edit logs in " + fjm, t);
}
}
}
// Group logs by their starting txid
ImmutableListMultimap<Long, RemoteEditLog> logsByStartTxId =
Multimaps.index(allLogs, RemoteEditLog.GET_START_TXID);
long curStartTxId = fromTxId;
List<RemoteEditLog> logs = Lists.newArrayList();
while (true) {
ImmutableList<RemoteEditLog> logGroup = logsByStartTxId.get(curStartTxId);
if (logGroup.isEmpty()) {
// we have a gap in logs - for example because we recovered some old
// storage directory with ancient logs. Clear out any logs we've
// accumulated so far, and then skip to the next segment of logs
// after the gap.
SortedSet<Long> startTxIds = Sets.newTreeSet(logsByStartTxId.keySet());
startTxIds = startTxIds.tailSet(curStartTxId);
if (startTxIds.isEmpty()) {
break;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Found gap in logs at " + curStartTxId + ": " +
"not returning previous logs in manifest.");
}
logs.clear();
curStartTxId = startTxIds.first();
continue;
}
}
// Find the one that extends the farthest forward
RemoteEditLog bestLog = Collections.max(logGroup);
logs.add(bestLog);
// And then start looking from after that point
curStartTxId = bestLog.getEndTxId() + 1;
}
RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
if (LOG.isDebugEnabled()) {
LOG.debug("Generated manifest for logs since " + fromTxId + ":"
+ ret);
}
return ret;
}
/**
* Add sync times to the buffer.
*/
String getSyncTimes() {
StringBuilder buf = new StringBuilder();
for (JournalAndStream jas : journals) {
if (jas.isActive()) {
buf.append(jas.getCurrentStream().getTotalSyncTime());
buf.append(" ");
}
}
return buf.toString();
}
}
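Taken together, the intended lifecycle is: register journals, open a segment (which hands back a single fan-out stream), write and sync through that stream, and finalize the segment. A hypothetical usage sketch follows; it relies on the NameNode classes shown above, so it is not standalone, and the helper method and parameter names are illustrative only.

// Hypothetical usage sketch of the new JournalSet API, based on how
// FSEditLog drives it in this change.
void journalSetLifecycleSketch(FileJournalManager fjm,
    BackupJournalManager bjm, FSEditLogOp op,
    long firstTxId, long lastTxId) throws IOException {
  JournalSet journalSet = new JournalSet();
  journalSet.add(fjm);                    // e.g. one per dfs.namenode.edits.dir
  journalSet.add(bjm);                    // added when a BackupNode registers

  EditLogOutputStream stream = journalSet.startLogSegment(firstTxId);
  stream.write(op);                       // fans out to every active journal
  stream.setReadyToFlush();               // swap buffers on every journal
  stream.flush();                         // throws only if *all* journals fail

  journalSet.finalizeLogSegment(firstTxId, lastTxId);
}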


@@ -20,6 +20,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.spy;
@@ -33,6 +34,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -73,7 +75,7 @@ public void shutDownMiniCluster() throws IOException {
   public void testSingleFailedEditsDirOnFlush() throws IOException {
     assertTrue(doAnEdit());
     // Invalidate one edits journal.
-    invalidateEditsDirAtIndex(0, true);
+    invalidateEditsDirAtIndex(0, true, false);
     // Make sure runtime.exit(...) hasn't been called at all yet.
     assertExitInvocations(0);
     assertTrue(doAnEdit());
@@ -86,8 +88,22 @@ public void testSingleFailedEditsDirOnFlush() throws IOException {
   public void testAllEditsDirsFailOnFlush() throws IOException {
     assertTrue(doAnEdit());
     // Invalidate both edits journals.
-    invalidateEditsDirAtIndex(0, true);
-    invalidateEditsDirAtIndex(1, true);
+    invalidateEditsDirAtIndex(0, true, false);
+    invalidateEditsDirAtIndex(1, true, false);
+    // Make sure runtime.exit(...) hasn't been called at all yet.
+    assertExitInvocations(0);
+    assertTrue(doAnEdit());
+    // The previous edit could not be synced to any persistent storage, should
+    // have halted the NN.
+    assertExitInvocations(1);
+  }
+
+  @Test
+  public void testAllEditsDirFailOnWrite() throws IOException {
+    assertTrue(doAnEdit());
+    // Invalidate both edits journals.
+    invalidateEditsDirAtIndex(0, true, true);
+    invalidateEditsDirAtIndex(1, true, true);
     // Make sure runtime.exit(...) hasn't been called at all yet.
     assertExitInvocations(0);
     assertTrue(doAnEdit());
@@ -100,7 +116,7 @@ public void testAllEditsDirsFailOnFlush() throws IOException {
   public void testSingleFailedEditsDirOnSetReadyToFlush() throws IOException {
     assertTrue(doAnEdit());
     // Invalidate one edits journal.
-    invalidateEditsDirAtIndex(0, false);
+    invalidateEditsDirAtIndex(0, false, false);
     // Make sure runtime.exit(...) hasn't been called at all yet.
     assertExitInvocations(0);
     assertTrue(doAnEdit());
@@ -117,16 +133,18 @@ public void testSingleFailedEditsDirOnSetReadyToFlush() throws IOException {
    * @return the original <code>EditLogOutputStream</code> of the journal.
    */
   private EditLogOutputStream invalidateEditsDirAtIndex(int index,
-      boolean failOnFlush) throws IOException {
+      boolean failOnFlush, boolean failOnWrite) throws IOException {
     FSImage fsimage = cluster.getNamesystem().getFSImage();
     FSEditLog editLog = fsimage.getEditLog();
 
-    FSEditLog.JournalAndStream jas = editLog.getJournals().get(index);
+    JournalAndStream jas = editLog.getJournals().get(index);
     EditLogFileOutputStream elos =
       (EditLogFileOutputStream) jas.getCurrentStream();
     EditLogFileOutputStream spyElos = spy(elos);
 
+    if (failOnWrite) {
+      doThrow(new IOException("fail on write()")).when(spyElos).write(
+          (FSEditLogOp) any());
+    }
     if (failOnFlush) {
       doThrow(new IOException("fail on flush()")).when(spyElos).flush();
     } else {
@@ -151,7 +169,7 @@ private void restoreEditsDirAtIndex(int index, EditLogOutputStream elos) {
     FSImage fsimage = cluster.getNamesystem().getFSImage();
     FSEditLog editLog = fsimage.getEditLog();
 
-    FSEditLog.JournalAndStream jas = editLog.getJournals().get(index);
+    JournalAndStream jas = editLog.getJournals().get(index);
     jas.setCurrentStreamForTests(elos);
   }
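The new failOnWrite flag reuses the existing spy-and-stub pattern; in outline, the added scenario reads as below. This is a paraphrase of the testAllEditsDirFailOnWrite() case shown above, not additional test code; doAnEdit(), invalidateEditsDirAtIndex() and assertExitInvocations() are the test's own helpers.

// Outline of the added write-failure scenario.
@Test
public void allJournalsFailOnWriteSketch() throws IOException {
  assertTrue(doAnEdit());
  invalidateEditsDirAtIndex(0, true, true);   // failOnFlush, failOnWrite
  invalidateEditsDirAtIndex(1, true, true);
  assertExitInvocations(0);                   // nothing fatal yet
  assertTrue(doAnEdit());                     // every journal's write() throws...
  assertExitInvocations(1);                   // ...so the NameNode must halt
}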


@@ -40,6 +40,7 @@
 import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.log4j.Level;
@@ -356,7 +357,7 @@ public void testSaveImageWhileSyncInProgress() throws Exception {
     FSImage fsimage = namesystem.getFSImage();
     FSEditLog editLog = fsimage.getEditLog();
 
-    FSEditLog.JournalAndStream jas = editLog.getJournals().get(0);
+    JournalAndStream jas = editLog.getJournals().get(0);
     EditLogFileOutputStream spyElos =
         spy((EditLogFileOutputStream)jas.getCurrentStream());
     jas.setCurrentStreamForTests(spyElos);


@@ -44,6 +44,8 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName;
 import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getFinalizedEditsFileName;
 import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getImageFileName;
@@ -120,7 +122,7 @@ public void invalidateStorage(FSImage fi, Set<File> filesToInvalidate) throws IO
     // simulate an error
     fi.getStorage().reportErrorsOnDirectories(al);
 
-    for (FSEditLog.JournalAndStream j : fi.getEditLog().getJournals()) {
+    for (JournalAndStream j : fi.getEditLog().getJournals()) {
       if (j.getManager() instanceof FileJournalManager) {
         FileJournalManager fm = (FileJournalManager)j.getManager();
         if (fm.getStorageDirectory().getRoot().equals(path2)