HDFS-2634. Standby needs to ingest latest edit logs before transitioning to active. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1212187 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-12-08 23:55:40 +00:00
parent d9ea5bb489
commit 2481474bd9
11 changed files with 331 additions and 48 deletions

View File

@ -47,3 +47,5 @@ HDFS-2624. ConfiguredFailoverProxyProvider doesn't correctly stop ProtocolTransl
HDFS-2625. TestDfsOverAvroRpc failing after introduction of HeartbeatResponse type (todd)
HDFS-2627. Determine DN's view of which NN is active based on heartbeat responses (todd)
HDFS-2634. Standby needs to ingest latest edit logs before transitioning to active (todd)

View File

@ -345,7 +345,7 @@ private synchronized void setState(BNState newState) {
synchronized void namenodeStartedLogSegment(long txid)
throws IOException {
LOG.info("NameNode started a new log segment at txid " + txid);
if (editLog.isOpenForWrite()) {
if (editLog.isSegmentOpen()) {
if (editLog.getLastWrittenTxId() == txid - 1) {
// We are in sync with the NN, so end and finalize the current segment
editLog.endCurrentLogSegment(false);

View File

@ -249,14 +249,42 @@ synchronized void openForWrite() throws IOException {
Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
"Bad state: %s", state);
startLogSegment(getLastWrittenTxId() + 1, true);
long segmentTxId = getLastWrittenTxId() + 1;
// Safety check: we should never start a segment if there are
// newer txids readable.
EditLogInputStream s = journalSet.getInputStream(segmentTxId);
try {
Preconditions.checkState(s == null,
"Cannot start writing at txid %s when there is a stream " +
"available for read: %s", segmentTxId, s);
} finally {
IOUtils.closeStream(s);
}
startLogSegment(segmentTxId, true);
assert state == State.IN_SEGMENT : "Bad state: " + state;
}
/**
* @return true if the log is currently open in write mode, regardless
* of whether it actually has an open segment.
*/
synchronized boolean isOpenForWrite() {
return state == State.IN_SEGMENT ||
state == State.BETWEEN_LOG_SEGMENTS;
}
/**
* @return true if the log is open in write mode and has a segment open
* ready to take edits.
*/
synchronized boolean isSegmentOpen() {
return state == State.IN_SEGMENT;
}
/**
* @return true if the log is open in read mode.
*/
synchronized boolean isOpenForRead() {
return state == State.OPEN_FOR_READING;
}
@ -290,7 +318,7 @@ synchronized void close() {
*/
void logEdit(final FSEditLogOp op) {
synchronized (this) {
assert state != State.CLOSED && state != State.OPEN_FOR_READING :
assert isOpenForWrite() :
"bad state: " + state;
// wait if an automatic sync is scheduled
@ -386,7 +414,7 @@ public synchronized long getLastWrittenTxId() {
* @return the first transaction ID in the current log segment
*/
synchronized long getCurSegmentTxId() {
Preconditions.checkState(state == State.IN_SEGMENT,
Preconditions.checkState(isSegmentOpen(),
"Bad state: %s", state);
return curSegmentTxId;
}
@ -856,7 +884,7 @@ synchronized void startLogSegment(final long segmentTxId,
*/
synchronized void endCurrentLogSegment(boolean writeEndTxn) {
LOG.info("Ending log segment " + curSegmentTxId);
Preconditions.checkState(state == State.IN_SEGMENT,
Preconditions.checkState(isSegmentOpen(),
"Bad state: %s", state);
if (writeEndTxn) {
@ -1017,6 +1045,9 @@ synchronized void logEdit(final int length, final byte[] data) {
* Run recovery on all journals to recover any unclosed segments
*/
void recoverUnclosedStreams() {
Preconditions.checkState(
state == State.BETWEEN_LOG_SEGMENTS,
"May not recover segments - wrong state: %s", state);
try {
journalSet.recoverUnfinalizedSegments();
} catch (IOException ex) {

View File

@ -342,7 +342,7 @@ private void doUpgrade(FSNamesystem target) throws IOException {
assert curDir.exists() : "Current directory must exist.";
assert !prevDir.exists() : "prvious directory must not exist.";
assert !tmpDir.exists() : "prvious.tmp directory must not exist.";
assert !editLog.isOpenForWrite() : "Edits log must not be open.";
assert !editLog.isSegmentOpen() : "Edits log must not be open.";
// rename current to tmp
NNStorage.rename(curDir, tmpDir);
@ -537,8 +537,6 @@ public FSEditLog getEditLog() {
void openEditLogForWrite() throws IOException {
assert editLog != null : "editLog must be initialized";
Preconditions.checkState(!editLog.isOpenForWrite(),
"edit log should not yet be open");
editLog.openForWrite();
storage.writeTransactionIdFileToStorage(editLog.getCurSegmentTxId());
};
@ -580,13 +578,16 @@ boolean loadFSImage(FSNamesystem target) throws IOException {
Iterable<EditLogInputStream> editStreams = null;
// TODO(HA): We shouldn't run this when coming up in standby state
editLog.recoverUnclosedStreams();
if (editLog.isOpenForWrite()) {
// We only want to recover streams if we're going into Active mode.
editLog.recoverUnclosedStreams();
}
if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,
getLayoutVersion())) {
editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1,
inspector.getMaxSeenTxId());
inspector.getMaxSeenTxId(),
false);
} else {
editStreams = FSImagePreTransactionalStorageInspector
.getEditLogStreams(storage);
@ -811,7 +812,7 @@ synchronized void saveNamespace(FSNamesystem source) throws IOException {
assert editLog != null : "editLog must be initialized";
storage.attemptRestoreRemovedStorage();
boolean editLogWasOpen = editLog.isOpenForWrite();
boolean editLogWasOpen = editLog.isSegmentOpen();
if (editLogWasOpen) {
editLog.endCurrentLogSegment(true);

View File

@ -490,12 +490,24 @@ void startActiveServices() throws IOException {
LOG.info("Starting services required for active state");
writeLock();
try {
if (!dir.fsImage.editLog.isOpenForWrite()) {
FSEditLog editLog = dir.fsImage.getEditLog();
if (!editLog.isSegmentOpen()) {
// During startup, we're already open for write during initialization.
// TODO(HA): consider adding a startup state?
dir.fsImage.editLog.initJournalsForWrite();
editLog.initJournalsForWrite();
// May need to recover
dir.fsImage.editLog.recoverUnclosedStreams();
editLog.recoverUnclosedStreams();
LOG.info("Catching up to latest edits from old active before " +
"taking over writer role in edits logs.");
editLogTailer.catchupDuringFailover();
long nextTxId = dir.fsImage.getLastAppliedTxId() + 1;
LOG.info("Will take over writing edit logs at txnid " +
nextTxId);
editLog.setNextTxId(nextTxId);
dir.fsImage.editLog.openForWrite();
}
if (UserGroupInformation.isSecurityEnabled()) {

View File

@ -96,7 +96,7 @@ synchronized public void finalizeLogSegment(long firstTxId, long lastTxId)
"Can't finalize edits file " + inprogressFile + " since finalized file " +
"already exists");
if (!inprogressFile.renameTo(dstFile)) {
throw new IOException("Unable to finalize edits file " + inprogressFile);
throw new IllegalStateException("Unable to finalize edits file " + inprogressFile);
}
if (inprogressFile.equals(currentInProgress)) {
currentInProgress = null;
@ -147,7 +147,7 @@ List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
} else if ((firstTxId > elf.getFirstTxId()) &&
(firstTxId <= elf.getLastTxId())) {
throw new IOException("Asked for firstTxId " + firstTxId
throw new IllegalStateException("Asked for firstTxId " + firstTxId
+ " which is in the middle of file " + elf.file);
}
}
@ -237,7 +237,17 @@ public long getNumberOfTransactions(long fromTxId)
if (elf.isInProgress()) {
break;
}
} // else skip
} else if (elf.getFirstTxId() < fromTxId &&
elf.getLastTxId() >= fromTxId) {
// Middle of a log segment - this should never happen
// since getLogFiles checks for it. But we should be
// paranoid about this case since it might result in
// overlapping txid ranges, etc, if we had a bug.
IOException ioe = new IOException("txid " + fromTxId +
" falls in the middle of file " + elf);
LOG.error("Broken invariant in edit log file management", ioe);
throw ioe;
}
}
if (LOG.isDebugEnabled()) {
@ -263,6 +273,7 @@ public long getNumberOfTransactions(long fromTxId)
@Override
synchronized public void recoverUnfinalizedSegments() throws IOException {
File currentDir = sd.getCurrentDir();
LOG.info("Recovering unfinalized segments in " + currentDir);
List<EditLogFile> allLogFiles = matchEditLogs(currentDir.listFiles());
// make sure journal is aware of max seen transaction before moving corrupt

View File

@ -204,6 +204,8 @@ public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
CorruptionException corruption = null;
for (JournalAndStream jas : journals) {
if (jas.isDisabled()) continue;
JournalManager candidate = jas.getManager();
long candidateNumTxns = 0;
try {
@ -211,6 +213,8 @@ public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
} catch (CorruptionException ce) {
corruption = ce;
} catch (IOException ioe) {
LOG.warn("Unable to read input streams from JournalManager " + candidate,
ioe);
continue; // error reading disk, just skip
}
@ -235,7 +239,10 @@ public EditLogInputStream getInputStream(long fromTxnId) throws IOException {
public long getNumberOfTransactions(long fromTxnId) throws IOException {
long num = 0;
for (JournalAndStream jas: journals) {
if (jas.isActive()) {
if (jas.isDisabled()) {
LOG.info("Skipping jas " + jas + " since it's disabled");
continue;
} else {
long newNum = jas.getManager().getNumberOfTransactions(fromTxnId);
if (newNum > num) {
num = newNum;

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* EditLogTailer represents a thread which periodically reads from edits
@ -44,8 +45,15 @@ public class EditLogTailer {
private final EditLogTailerThread tailerThread;
private final FSNamesystem namesystem;
private final FSImage image;
private final FSEditLog editLog;
public EditLogTailer(FSNamesystem namesystem) {
this.tailerThread = new EditLogTailerThread(namesystem);
this.tailerThread = new EditLogTailerThread();
this.namesystem = namesystem;
this.image = namesystem.getFSImage();
this.editLog = namesystem.getEditLog();
}
public void start() {
@ -72,25 +80,45 @@ public void setSleepTime(long sleepTime) {
public void interrupt() {
tailerThread.interrupt();
}
public void catchupDuringFailover() throws IOException {
Preconditions.checkState(tailerThread == null ||
!tailerThread.isAlive(),
"Tailer thread should not be running once failover starts");
doTailEdits();
}
private void doTailEdits() throws IOException {
// TODO(HA) in a transition from active to standby,
// the following is wrong and ends up causing all of the
// last log segment to get re-read
long lastTxnId = image.getLastAppliedTxId();
if (LOG.isDebugEnabled()) {
LOG.debug("lastTxnId: " + lastTxnId);
}
Collection<EditLogInputStream> streams = editLog
.selectInputStreams(lastTxnId + 1, 0, false);
if (LOG.isDebugEnabled()) {
LOG.debug("edit streams to load from: " + streams.size());
}
long editsLoaded = image.loadEdits(streams, namesystem);
if (LOG.isDebugEnabled()) {
LOG.debug("editsLoaded: " + editsLoaded);
}
}
/**
* The thread which does the actual work of tailing edits journals and
* applying the transactions to the FSNS.
*/
private static class EditLogTailerThread extends Thread {
private FSNamesystem namesystem;
private FSImage image;
private FSEditLog editLog;
private class EditLogTailerThread extends Thread {
private volatile boolean shouldRun = true;
private long sleepTime = 60 * 1000;
private EditLogTailerThread(FSNamesystem namesystem) {
private EditLogTailerThread() {
super("Edit log tailer");
this.namesystem = namesystem;
image = namesystem.getFSImage();
editLog = namesystem.getEditLog();
}
private void setShouldRun(boolean shouldRun) {
@ -105,23 +133,8 @@ private void setSleepTime(long sleepTime) {
public void run() {
while (shouldRun) {
try {
long lastTxnId = image.getLastAppliedTxId();
if (LOG.isDebugEnabled()) {
LOG.debug("lastTxnId: " + lastTxnId);
}
try {
// At least one record should be available.
Collection<EditLogInputStream> streams = editLog
.selectInputStreams(lastTxnId + 1, lastTxnId + 1, false);
if (LOG.isDebugEnabled()) {
LOG.debug("edit streams to load from: " + streams.size());
}
long editsLoaded = image.loadEdits(streams, namesystem);
if (LOG.isDebugEnabled()) {
LOG.debug("editsLoaded: " + editsLoaded);
}
doTailEdits();
} catch (IOException e) {
// Will try again
LOG.info("Got error, will try again.", e);

View File

@ -604,8 +604,7 @@ private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, nameservice.getId()),
Joiner.on(",").join(nnIds));
if (manageNameDfsDirs) {
URI sharedEditsUri = fileAsURI(new File(base_dir, "shared-edits-" +
nnCounter + "-through-" + (nnCounter+nnIds.size()-1)));
URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1);
conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString());
}
}
@ -638,6 +637,11 @@ private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology,
}
public URI getSharedEditsDir(int minNN, int maxNN) throws IOException {
return fileAsURI(new File(base_dir, "shared-edits-" +
minNN + "-through-" + maxNN));
}
private void initNameNodeConf(Configuration conf,
String nameserviceId, String nnId,
boolean manageNameDfsDirs, int nnIndex)

View File

@ -34,6 +34,8 @@
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@ -189,6 +191,26 @@ public static FSEditLog createStandaloneEditLog(File logDir)
return editLog;
}
/**
* Create an aborted in-progress log in the given directory, containing
* only a specified number of "mkdirs" operations.
*/
public static void createAbortedLogWithMkdirs(File editsLogDir, int numDirs)
throws IOException {
FSEditLog editLog = FSImageTestUtil.createStandaloneEditLog(editsLogDir);
editLog.openForWrite();
PermissionStatus perms = PermissionStatus.createImmutable("fakeuser", "fakegroup",
FsPermission.createImmutable((short)0755));
for (int i = 1; i <= numDirs; i++) {
String dirName = "dir" + i;
INodeDirectory dir = new INodeDirectory(dirName, perms);
editLog.logMkDir("/" + dirName, dir);
}
editLog.logSync();
editLog.abortCurrentLogSegment();
}
/**
* Assert that all of the given directories have the same newest filename
* for fsimage that they hold the same data.

View File

@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
/**
* Test cases for the handling of edit logs during failover
* and startup of the standby node.
*/
public class TestEditLogsDuringFailover {
private static final Log LOG =
LogFactory.getLog(TestEditLogsDuringFailover.class);
private static final int NUM_DIRS_IN_LOG = 5;
@Test
public void testStartup() throws Exception {
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
try {
// During HA startup, both nodes should be in
// standby and we shouldn't have any edits files
// in any edits directory!
List<URI> allDirs = Lists.newArrayList();
allDirs.addAll(cluster.getNameDirs(0));
allDirs.addAll(cluster.getNameDirs(1));
allDirs.add(cluster.getSharedEditsDir(0, 1));
assertNoEditFiles(allDirs);
// Set the first NN to active, make sure it creates edits
// in its own dirs and the shared dir. The standby
// should still have no edits!
cluster.getNameNode(0).getRpcServer().transitionToActive();
assertEditFiles(cluster.getNameDirs(0),
NNStorage.getInProgressEditsFileName(1));
assertEditFiles(
Collections.singletonList(cluster.getSharedEditsDir(0, 1)),
NNStorage.getInProgressEditsFileName(1));
assertNoEditFiles(cluster.getNameDirs(1));
cluster.getNameNode(0).getRpcServer().mkdirs("/test",
FsPermission.createImmutable((short)0755), true);
// Restarting the standby should not finalize any edits files
// in the shared directory when it starts up!
cluster.restartNameNode(1);
assertEditFiles(cluster.getNameDirs(0),
NNStorage.getInProgressEditsFileName(1));
assertEditFiles(
Collections.singletonList(cluster.getSharedEditsDir(0, 1)),
NNStorage.getInProgressEditsFileName(1));
assertNoEditFiles(cluster.getNameDirs(1));
// Additionally it should not have applied any in-progress logs
// at start-up -- otherwise, it would have read half-way into
// the current log segment, and on the next roll, it would have to
// either replay starting in the middle of the segment (not allowed)
// or double-replay the edits (incorrect).
assertNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1), "/test", true));
cluster.getNameNode(0).getRpcServer().mkdirs("/test2",
FsPermission.createImmutable((short)0755), true);
// If we restart NN0, it'll come back as standby, and we can
// transition NN1 to active and make sure it reads edits correctly at this point.
cluster.restartNameNode(0);
cluster.getNameNode(1).getRpcServer().transitionToActive();
// NN1 should have both the edits that came before its restart, and the edits that
// came after its restart.
assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1), "/test", true));
assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1), "/test2", true));
} finally {
cluster.shutdown();
}
}
@Test
public void testFailoverFinalizesAndReadsInProgress() throws Exception {
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
try {
// Create a fake in-progress edit-log in the shared directory
URI sharedUri = cluster.getSharedEditsDir(0, 1);
File sharedDir = new File(sharedUri.getPath(), "current");
FSImageTestUtil.createAbortedLogWithMkdirs(sharedDir, NUM_DIRS_IN_LOG);
assertEditFiles(Collections.singletonList(sharedUri),
NNStorage.getInProgressEditsFileName(1));
// Transition one of the NNs to active
cluster.getNameNode(0).getRpcServer().transitionToActive();
// In the transition to active, it should have read the log -- and
// hence see one of the dirs we made in the fake log.
String testPath = "/dir" + NUM_DIRS_IN_LOG;
assertNotNull(cluster.getNameNode(0).getRpcServer().getFileInfo(testPath));
// It also should have finalized that log in the shared directory and started
// writing to a new one at the next txid.
assertEditFiles(Collections.singletonList(sharedUri),
NNStorage.getFinalizedEditsFileName(1, NUM_DIRS_IN_LOG + 1),
NNStorage.getInProgressEditsFileName(NUM_DIRS_IN_LOG + 2));
} finally {
cluster.shutdown();
}
}
/**
* Check that no edits files are present in the given storage dirs.
*/
private void assertNoEditFiles(Iterable<URI> dirs) throws IOException {
assertEditFiles(dirs, new String[]{});
}
/**
* Check that the given list of edits files are present in the given storage
* dirs.
*/
private void assertEditFiles(Iterable<URI> dirs, String ... files)
throws IOException {
for (URI u : dirs) {
File editDirRoot = new File(u.getPath());
File editDir = new File(editDirRoot, "current");
GenericTestUtils.assertExists(editDir);
if (files.length == 0) {
LOG.info("Checking no edit files exist in " + editDir);
} else {
LOG.info("Checking for following edit files in " + editDir
+ ": " + Joiner.on(",").join(files));
}
GenericTestUtils.assertGlobEquals(editDir, "edits_.*", files);
}
}
}