diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index 763245d69a5..925af24d937 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -47,3 +47,5 @@ HDFS-2624. ConfiguredFailoverProxyProvider doesn't correctly stop ProtocolTransl HDFS-2625. TestDfsOverAvroRpc failing after introduction of HeartbeatResponse type (todd) HDFS-2627. Determine DN's view of which NN is active based on heartbeat responses (todd) + +HDFS-2634. Standby needs to ingest latest edit logs before transitioning to active (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java index 4de70367a5a..4e28d83a528 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java @@ -345,7 +345,7 @@ private synchronized void setState(BNState newState) { synchronized void namenodeStartedLogSegment(long txid) throws IOException { LOG.info("NameNode started a new log segment at txid " + txid); - if (editLog.isOpenForWrite()) { + if (editLog.isSegmentOpen()) { if (editLog.getLastWrittenTxId() == txid - 1) { // We are in sync with the NN, so end and finalize the current segment editLog.endCurrentLogSegment(false); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index 97961b26af2..92ef2b5ee3b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -249,14 +249,42 @@ synchronized void openForWrite() throws IOException { Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS, "Bad state: %s", state); - startLogSegment(getLastWrittenTxId() + 1, true); + long segmentTxId = getLastWrittenTxId() + 1; + // Safety check: we should never start a segment if there are + // newer txids readable. + EditLogInputStream s = journalSet.getInputStream(segmentTxId); + try { + Preconditions.checkState(s == null, + "Cannot start writing at txid %s when there is a stream " + + "available for read: %s", segmentTxId, s); + } finally { + IOUtils.closeStream(s); + } + + startLogSegment(segmentTxId, true); assert state == State.IN_SEGMENT : "Bad state: " + state; } + /** + * @return true if the log is currently open in write mode, regardless + * of whether it actually has an open segment. + */ synchronized boolean isOpenForWrite() { + return state == State.IN_SEGMENT || + state == State.BETWEEN_LOG_SEGMENTS; + } + + /** + * @return true if the log is open in write mode and has a segment open + * ready to take edits. + */ + synchronized boolean isSegmentOpen() { return state == State.IN_SEGMENT; } + /** + * @return true if the log is open in read mode. + */ synchronized boolean isOpenForRead() { return state == State.OPEN_FOR_READING; } @@ -290,7 +318,7 @@ synchronized void close() { */ void logEdit(final FSEditLogOp op) { synchronized (this) { - assert state != State.CLOSED && state != State.OPEN_FOR_READING : + assert isOpenForWrite() : "bad state: " + state; // wait if an automatic sync is scheduled @@ -386,7 +414,7 @@ public synchronized long getLastWrittenTxId() { * @return the first transaction ID in the current log segment */ synchronized long getCurSegmentTxId() { - Preconditions.checkState(state == State.IN_SEGMENT, + Preconditions.checkState(isSegmentOpen(), "Bad state: %s", state); return curSegmentTxId; } @@ -856,7 +884,7 @@ synchronized void startLogSegment(final long segmentTxId, */ synchronized void endCurrentLogSegment(boolean writeEndTxn) { LOG.info("Ending log segment " + curSegmentTxId); - Preconditions.checkState(state == State.IN_SEGMENT, + Preconditions.checkState(isSegmentOpen(), "Bad state: %s", state); if (writeEndTxn) { @@ -1017,6 +1045,9 @@ synchronized void logEdit(final int length, final byte[] data) { * Run recovery on all journals to recover any unclosed segments */ void recoverUnclosedStreams() { + Preconditions.checkState( + state == State.BETWEEN_LOG_SEGMENTS, + "May not recover segments - wrong state: %s", state); try { journalSet.recoverUnfinalizedSegments(); } catch (IOException ex) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index 6f5533cdfd7..8ce90eb0e2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -342,7 +342,7 @@ private void doUpgrade(FSNamesystem target) throws IOException { assert curDir.exists() : "Current directory must exist."; assert !prevDir.exists() : "prvious directory must not exist."; assert !tmpDir.exists() : "prvious.tmp directory must not exist."; - assert !editLog.isOpenForWrite() : "Edits log must not be open."; + assert !editLog.isSegmentOpen() : "Edits log must not be open."; // rename current to tmp NNStorage.rename(curDir, tmpDir); @@ -537,8 +537,6 @@ public FSEditLog getEditLog() { void openEditLogForWrite() throws IOException { assert editLog != null : "editLog must be initialized"; - Preconditions.checkState(!editLog.isOpenForWrite(), - "edit log should not yet be open"); editLog.openForWrite(); storage.writeTransactionIdFileToStorage(editLog.getCurSegmentTxId()); }; @@ -580,13 +578,16 @@ boolean loadFSImage(FSNamesystem target) throws IOException { Iterable editStreams = null; - // TODO(HA): We shouldn't run this when coming up in standby state - editLog.recoverUnclosedStreams(); + if (editLog.isOpenForWrite()) { + // We only want to recover streams if we're going into Active mode. + editLog.recoverUnclosedStreams(); + } if (LayoutVersion.supports(Feature.TXID_BASED_LAYOUT, getLayoutVersion())) { editStreams = editLog.selectInputStreams(imageFile.getCheckpointTxId() + 1, - inspector.getMaxSeenTxId()); + inspector.getMaxSeenTxId(), + false); } else { editStreams = FSImagePreTransactionalStorageInspector .getEditLogStreams(storage); @@ -811,7 +812,7 @@ synchronized void saveNamespace(FSNamesystem source) throws IOException { assert editLog != null : "editLog must be initialized"; storage.attemptRestoreRemovedStorage(); - boolean editLogWasOpen = editLog.isOpenForWrite(); + boolean editLogWasOpen = editLog.isSegmentOpen(); if (editLogWasOpen) { editLog.endCurrentLogSegment(true); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index de7817987ee..9229926cae2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -490,12 +490,24 @@ void startActiveServices() throws IOException { LOG.info("Starting services required for active state"); writeLock(); try { - if (!dir.fsImage.editLog.isOpenForWrite()) { + FSEditLog editLog = dir.fsImage.getEditLog(); + + if (!editLog.isSegmentOpen()) { // During startup, we're already open for write during initialization. // TODO(HA): consider adding a startup state? - dir.fsImage.editLog.initJournalsForWrite(); + editLog.initJournalsForWrite(); // May need to recover - dir.fsImage.editLog.recoverUnclosedStreams(); + editLog.recoverUnclosedStreams(); + + LOG.info("Catching up to latest edits from old active before " + + "taking over writer role in edits logs."); + editLogTailer.catchupDuringFailover(); + + long nextTxId = dir.fsImage.getLastAppliedTxId() + 1; + LOG.info("Will take over writing edit logs at txnid " + + nextTxId); + editLog.setNextTxId(nextTxId); + dir.fsImage.editLog.openForWrite(); } if (UserGroupInformation.isSecurityEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index bf7bfde2da3..bbab3e58f54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -96,7 +96,7 @@ synchronized public void finalizeLogSegment(long firstTxId, long lastTxId) "Can't finalize edits file " + inprogressFile + " since finalized file " + "already exists"); if (!inprogressFile.renameTo(dstFile)) { - throw new IOException("Unable to finalize edits file " + inprogressFile); + throw new IllegalStateException("Unable to finalize edits file " + inprogressFile); } if (inprogressFile.equals(currentInProgress)) { currentInProgress = null; @@ -147,7 +147,7 @@ List getRemoteEditLogs(long firstTxId) throws IOException { ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId)); } else if ((firstTxId > elf.getFirstTxId()) && (firstTxId <= elf.getLastTxId())) { - throw new IOException("Asked for firstTxId " + firstTxId + throw new IllegalStateException("Asked for firstTxId " + firstTxId + " which is in the middle of file " + elf.file); } } @@ -237,7 +237,17 @@ public long getNumberOfTransactions(long fromTxId) if (elf.isInProgress()) { break; } - } // else skip + } else if (elf.getFirstTxId() < fromTxId && + elf.getLastTxId() >= fromTxId) { + // Middle of a log segment - this should never happen + // since getLogFiles checks for it. But we should be + // paranoid about this case since it might result in + // overlapping txid ranges, etc, if we had a bug. + IOException ioe = new IOException("txid " + fromTxId + + " falls in the middle of file " + elf); + LOG.error("Broken invariant in edit log file management", ioe); + throw ioe; + } } if (LOG.isDebugEnabled()) { @@ -263,6 +273,7 @@ public long getNumberOfTransactions(long fromTxId) @Override synchronized public void recoverUnfinalizedSegments() throws IOException { File currentDir = sd.getCurrentDir(); + LOG.info("Recovering unfinalized segments in " + currentDir); List allLogFiles = matchEditLogs(currentDir.listFiles()); // make sure journal is aware of max seen transaction before moving corrupt diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java index 58b1ca09e64..7af0b51b909 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java @@ -204,6 +204,8 @@ public EditLogInputStream getInputStream(long fromTxnId) throws IOException { CorruptionException corruption = null; for (JournalAndStream jas : journals) { + if (jas.isDisabled()) continue; + JournalManager candidate = jas.getManager(); long candidateNumTxns = 0; try { @@ -211,6 +213,8 @@ public EditLogInputStream getInputStream(long fromTxnId) throws IOException { } catch (CorruptionException ce) { corruption = ce; } catch (IOException ioe) { + LOG.warn("Unable to read input streams from JournalManager " + candidate, + ioe); continue; // error reading disk, just skip } @@ -235,7 +239,10 @@ public EditLogInputStream getInputStream(long fromTxnId) throws IOException { public long getNumberOfTransactions(long fromTxnId) throws IOException { long num = 0; for (JournalAndStream jas: journals) { - if (jas.isActive()) { + if (jas.isDisabled()) { + LOG.info("Skipping jas " + jas + " since it's disabled"); + continue; + } else { long newNum = jas.getManager().getNumberOfTransactions(fromTxnId); if (newNum > num) { num = newNum; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index c15629f38b8..e1ce570c093 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * EditLogTailer represents a thread which periodically reads from edits @@ -44,8 +45,15 @@ public class EditLogTailer { private final EditLogTailerThread tailerThread; + private final FSNamesystem namesystem; + private final FSImage image; + private final FSEditLog editLog; + public EditLogTailer(FSNamesystem namesystem) { - this.tailerThread = new EditLogTailerThread(namesystem); + this.tailerThread = new EditLogTailerThread(); + this.namesystem = namesystem; + this.image = namesystem.getFSImage(); + this.editLog = namesystem.getEditLog(); } public void start() { @@ -72,25 +80,45 @@ public void setSleepTime(long sleepTime) { public void interrupt() { tailerThread.interrupt(); } + + public void catchupDuringFailover() throws IOException { + Preconditions.checkState(tailerThread == null || + !tailerThread.isAlive(), + "Tailer thread should not be running once failover starts"); + doTailEdits(); + } + + private void doTailEdits() throws IOException { + // TODO(HA) in a transition from active to standby, + // the following is wrong and ends up causing all of the + // last log segment to get re-read + long lastTxnId = image.getLastAppliedTxId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("lastTxnId: " + lastTxnId); + } + Collection streams = editLog + .selectInputStreams(lastTxnId + 1, 0, false); + if (LOG.isDebugEnabled()) { + LOG.debug("edit streams to load from: " + streams.size()); + } + + long editsLoaded = image.loadEdits(streams, namesystem); + if (LOG.isDebugEnabled()) { + LOG.debug("editsLoaded: " + editsLoaded); + } + } /** * The thread which does the actual work of tailing edits journals and * applying the transactions to the FSNS. */ - private static class EditLogTailerThread extends Thread { - - private FSNamesystem namesystem; - private FSImage image; - private FSEditLog editLog; - + private class EditLogTailerThread extends Thread { private volatile boolean shouldRun = true; private long sleepTime = 60 * 1000; - private EditLogTailerThread(FSNamesystem namesystem) { + private EditLogTailerThread() { super("Edit log tailer"); - this.namesystem = namesystem; - image = namesystem.getFSImage(); - editLog = namesystem.getEditLog(); } private void setShouldRun(boolean shouldRun) { @@ -105,23 +133,8 @@ private void setSleepTime(long sleepTime) { public void run() { while (shouldRun) { try { - long lastTxnId = image.getLastAppliedTxId(); - - if (LOG.isDebugEnabled()) { - LOG.debug("lastTxnId: " + lastTxnId); - } try { - // At least one record should be available. - Collection streams = editLog - .selectInputStreams(lastTxnId + 1, lastTxnId + 1, false); - if (LOG.isDebugEnabled()) { - LOG.debug("edit streams to load from: " + streams.size()); - } - - long editsLoaded = image.loadEdits(streams, namesystem); - if (LOG.isDebugEnabled()) { - LOG.debug("editsLoaded: " + editsLoaded); - } + doTailEdits(); } catch (IOException e) { // Will try again LOG.info("Got error, will try again.", e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 2a2699048d1..13352ab82aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -604,8 +604,7 @@ private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology, conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, nameservice.getId()), Joiner.on(",").join(nnIds)); if (manageNameDfsDirs) { - URI sharedEditsUri = fileAsURI(new File(base_dir, "shared-edits-" + - nnCounter + "-through-" + (nnCounter+nnIds.size()-1))); + URI sharedEditsUri = getSharedEditsDir(nnCounter, nnCounter+nnIds.size()-1); conf.set(DFS_NAMENODE_SHARED_EDITS_DIR_KEY, sharedEditsUri.toString()); } } @@ -638,6 +637,11 @@ private void createNameNodesAndSetConf(MiniDFSNNTopology nnTopology, } + public URI getSharedEditsDir(int minNN, int maxNN) throws IOException { + return fileAsURI(new File(base_dir, "shared-edits-" + + minNN + "-through-" + maxNN)); + } + private void initNameNodeConf(Configuration conf, String nameserviceId, String nnId, boolean manageNameDfsDirs, int nnIndex) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java index 0269166b57d..9e9af7af617 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/FSImageTestUtil.java @@ -34,6 +34,8 @@ import java.util.Set; import org.apache.commons.logging.Log; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; @@ -189,6 +191,26 @@ public static FSEditLog createStandaloneEditLog(File logDir) return editLog; } + /** + * Create an aborted in-progress log in the given directory, containing + * only a specified number of "mkdirs" operations. + */ + public static void createAbortedLogWithMkdirs(File editsLogDir, int numDirs) + throws IOException { + FSEditLog editLog = FSImageTestUtil.createStandaloneEditLog(editsLogDir); + editLog.openForWrite(); + + PermissionStatus perms = PermissionStatus.createImmutable("fakeuser", "fakegroup", + FsPermission.createImmutable((short)0755)); + for (int i = 1; i <= numDirs; i++) { + String dirName = "dir" + i; + INodeDirectory dir = new INodeDirectory(dirName, perms); + editLog.logMkDir("/" + dirName, dir); + } + editLog.logSync(); + editLog.abortCurrentLogSegment(); + } + /** * Assert that all of the given directories have the same newest filename * for fsimage that they hold the same data. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java new file mode 100644 index 00000000000..1bbe33b72d8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogsDuringFailover.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil; +import org.apache.hadoop.hdfs.server.namenode.NNStorage; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Test; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + +/** + * Test cases for the handling of edit logs during failover + * and startup of the standby node. + */ +public class TestEditLogsDuringFailover { + private static final Log LOG = + LogFactory.getLog(TestEditLogsDuringFailover.class); + private static final int NUM_DIRS_IN_LOG = 5; + + @Test + public void testStartup() throws Exception { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .build(); + try { + // During HA startup, both nodes should be in + // standby and we shouldn't have any edits files + // in any edits directory! + List allDirs = Lists.newArrayList(); + allDirs.addAll(cluster.getNameDirs(0)); + allDirs.addAll(cluster.getNameDirs(1)); + allDirs.add(cluster.getSharedEditsDir(0, 1)); + assertNoEditFiles(allDirs); + + // Set the first NN to active, make sure it creates edits + // in its own dirs and the shared dir. The standby + // should still have no edits! + cluster.getNameNode(0).getRpcServer().transitionToActive(); + + assertEditFiles(cluster.getNameDirs(0), + NNStorage.getInProgressEditsFileName(1)); + assertEditFiles( + Collections.singletonList(cluster.getSharedEditsDir(0, 1)), + NNStorage.getInProgressEditsFileName(1)); + assertNoEditFiles(cluster.getNameDirs(1)); + + cluster.getNameNode(0).getRpcServer().mkdirs("/test", + FsPermission.createImmutable((short)0755), true); + + // Restarting the standby should not finalize any edits files + // in the shared directory when it starts up! + cluster.restartNameNode(1); + + assertEditFiles(cluster.getNameDirs(0), + NNStorage.getInProgressEditsFileName(1)); + assertEditFiles( + Collections.singletonList(cluster.getSharedEditsDir(0, 1)), + NNStorage.getInProgressEditsFileName(1)); + assertNoEditFiles(cluster.getNameDirs(1)); + + // Additionally it should not have applied any in-progress logs + // at start-up -- otherwise, it would have read half-way into + // the current log segment, and on the next roll, it would have to + // either replay starting in the middle of the segment (not allowed) + // or double-replay the edits (incorrect). + assertNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1), "/test", true)); + + cluster.getNameNode(0).getRpcServer().mkdirs("/test2", + FsPermission.createImmutable((short)0755), true); + + // If we restart NN0, it'll come back as standby, and we can + // transition NN1 to active and make sure it reads edits correctly at this point. + cluster.restartNameNode(0); + cluster.getNameNode(1).getRpcServer().transitionToActive(); + + // NN1 should have both the edits that came before its restart, and the edits that + // came after its restart. + assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1), "/test", true)); + assertNotNull(NameNodeAdapter.getFileInfo(cluster.getNameNode(1), "/test2", true)); + } finally { + cluster.shutdown(); + } + } + + @Test + public void testFailoverFinalizesAndReadsInProgress() throws Exception { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .build(); + try { + // Create a fake in-progress edit-log in the shared directory + URI sharedUri = cluster.getSharedEditsDir(0, 1); + File sharedDir = new File(sharedUri.getPath(), "current"); + FSImageTestUtil.createAbortedLogWithMkdirs(sharedDir, NUM_DIRS_IN_LOG); + assertEditFiles(Collections.singletonList(sharedUri), + NNStorage.getInProgressEditsFileName(1)); + + // Transition one of the NNs to active + cluster.getNameNode(0).getRpcServer().transitionToActive(); + + // In the transition to active, it should have read the log -- and + // hence see one of the dirs we made in the fake log. + String testPath = "/dir" + NUM_DIRS_IN_LOG; + assertNotNull(cluster.getNameNode(0).getRpcServer().getFileInfo(testPath)); + + // It also should have finalized that log in the shared directory and started + // writing to a new one at the next txid. + assertEditFiles(Collections.singletonList(sharedUri), + NNStorage.getFinalizedEditsFileName(1, NUM_DIRS_IN_LOG + 1), + NNStorage.getInProgressEditsFileName(NUM_DIRS_IN_LOG + 2)); + } finally { + cluster.shutdown(); + } + + } + + /** + * Check that no edits files are present in the given storage dirs. + */ + private void assertNoEditFiles(Iterable dirs) throws IOException { + assertEditFiles(dirs, new String[]{}); + } + + /** + * Check that the given list of edits files are present in the given storage + * dirs. + */ + private void assertEditFiles(Iterable dirs, String ... files) + throws IOException { + for (URI u : dirs) { + File editDirRoot = new File(u.getPath()); + File editDir = new File(editDirRoot, "current"); + GenericTestUtils.assertExists(editDir); + if (files.length == 0) { + LOG.info("Checking no edit files exist in " + editDir); + } else { + LOG.info("Checking for following edit files in " + editDir + + ": " + Joiner.on(",").join(files)); + } + + GenericTestUtils.assertGlobEquals(editDir, "edits_.*", files); + } + } +}