HDFS-2667. Fix transition from active to standby. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1215037 13f79535-47bb-0310-9956-ffa450edef68
commit cdb9f01ad4
parent 71071b904d
@@ -59,3 +59,5 @@ HDFS-2683. Authority-based lookup of proxy provider fails if path becomes canoni
 HDFS-2689. HA: BookKeeperEditLogInputStream doesn't implement isInProgress() (atm)
 
 HDFS-2602. NN should log newly-allocated blocks without losing BlockInfo (atm)
+
+HDFS-2667. Fix transition from active to standby (todd)
@@ -678,9 +678,9 @@ public class FSImage implements Closeable {
       for (EditLogInputStream editIn : editStreams) {
         LOG.info("Reading " + editIn + " expecting start txid #" + startingTxId);
         int thisNumLoaded = loader.loadFSEdits(editIn, startingTxId);
+        lastAppliedTxId = startingTxId + thisNumLoaded - 1;
         startingTxId += thisNumLoaded;
         numLoaded += thisNumLoaded;
-        lastAppliedTxId += thisNumLoaded;
       }
     } finally {
       // TODO(HA): Should this happen when called by the tailer?
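Note on the hunk above: the old code accumulated into lastAppliedTxId with "+=", which silently compounds any stale starting value in the field; the new code derives it from startingTxId, which is authoritative for the stream being replayed. A minimal arithmetic sketch of the difference (the values are hypothetical, not from the commit):

public class TxIdArithmeticSketch {
  public static void main(String[] args) {
    long startingTxId = 101;  // first txid expected from this edit stream
    int thisNumLoaded = 50;   // transactions actually applied from it

    long stale = 90;          // field value left over from a prior state
    long accumulated = stale + thisNumLoaded;        // old form: 140, carries the drift
    long derived = startingTxId + thisNumLoaded - 1; // new form: 150, self-correcting
    System.out.println("accumulated=" + accumulated + ", derived=" + derived);
  }
}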
@@ -1117,4 +1117,13 @@ public class FSImage implements Closeable {
     return lastAppliedTxId;
   }
 
+  public long getLastAppliedOrWrittenTxId() {
+    return Math.max(lastAppliedTxId,
+        editLog != null ? editLog.getLastWrittenTxId() : 0);
+  }
+
+  public void updateLastAppliedTxIdFromWritten() {
+    this.lastAppliedTxId = editLog.getLastWrittenTxId();
+  }
+
 }
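The two accessors added above give callers a single answer to "how far has this NN progressed": a standby advances lastAppliedTxId by replaying edits, while an active advances the edit log's written txid, so the max of the two is correct in either state. A minimal sketch of that semantics, using stand-in fields rather than the real FSImage/FSEditLog:

public class LastTxIdSketch {
  static long lastAppliedTxId = 120;  // progress made by replaying edits (standby)
  static Long lastWrittenTxId = 150L; // progress made writing edits (active); null = no log yet

  static long lastAppliedOrWritten() {
    // Mirrors the null guard in the hunk above: fall back to 0 when no edit log exists.
    return Math.max(lastAppliedTxId, lastWrittenTxId != null ? lastWrittenTxId : 0);
  }

  public static void main(String[] args) {
    System.out.println(lastAppliedOrWritten()); // 150: the written txid wins here
  }
}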
@@ -535,6 +535,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         leaseManager.stopMonitor();
       }
       dir.fsImage.editLog.close();
+      // Update the fsimage with the last txid that we wrote
+      // so that the tailer starts from the right spot.
+      dir.fsImage.updateLastAppliedTxIdFromWritten();
     } finally {
       writeUnlock();
     }
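The three added lines above close the gap that let a node double-play its own edits: after writing edits as active, its lastAppliedTxId still reflected only what it had replayed, so on the next transition to standby the tailer would re-read the segment it had just written. A condensed, hypothetical rendering of the ordering (not the real FSNamesystem code):

public class StopActiveServicesSketch {
  static long lastWrittenTxId = 42; // edits this node wrote while active
  static long lastAppliedTxId = 0;  // where the standby tailer would resume

  public static void main(String[] args) {
    // 1. The edit log is closed first, so no write can land after the snapshot.
    // 2. Then the applied txid is bumped to the written txid, so tailing
    //    resumes at lastAppliedTxId + 1 instead of replaying the old segment.
    lastAppliedTxId = lastWrittenTxId;
    System.out.println("tailer resumes at txid " + (lastAppliedTxId + 1));
  }
}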
@@ -2795,8 +2798,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       throw new AssertionError("Invalid state: " + state.getClass());
     }
     return new NNHAStatusHeartbeat(hbState,
-        Math.max(getFSImage().getLastAppliedTxId(),
-            getFSImage().getEditLog().getLastWrittenTxId()));
+        getFSImage().getLastAppliedOrWrittenTxId());
   }
 
   /**
@@ -304,7 +304,7 @@ class FileJournalManager implements JournalManager {
     for (EditLogFile elf : allLogFiles) {
       if (fromTxId > elf.getFirstTxId()
           && fromTxId <= elf.getLastTxId()) {
-        throw new IOException("Asked for fromTxId " + fromTxId
+        throw new IllegalStateException("Asked for fromTxId " + fromTxId
            + " which is in middle of file " + elf.file);
      }
      if (fromTxId <= elf.getFirstTxId()) {
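Switching IOException to IllegalStateException above reclassifies a mid-segment request as a caller bug rather than an I/O condition, so generic IOException handling (retries, failover wrappers) no longer swallows it; the TestFileJournalManager hunks further below update their expectations to match. An illustrative precondition-style sketch with assumed segment bounds, not the real EditLogFile API:

public class MidSegmentCheckSketch {
  static void checkSegmentBoundary(long fromTxId, long firstTxId, long lastTxId) {
    if (fromTxId > firstTxId && fromTxId <= lastTxId) {
      throw new IllegalStateException("Asked for fromTxId " + fromTxId
          + " which is in middle of segment [" + firstTxId + "," + lastTxId + "]");
    }
  }

  public static void main(String[] args) {
    checkSegmentBoundary(101, 101, 200); // ok: aligned with a segment start
    checkSegmentBoundary(150, 101, 200); // throws: mid-segment request
  }
}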
@@ -1553,7 +1553,7 @@ public class MiniDFSCluster {
 
   public void transitionToStandby(int nnIndex) throws IOException,
       ServiceFailedException {
-    getHaServiceClient(nnIndex).transitionToActive();
+    getHaServiceClient(nnIndex).transitionToStandby();
   }
 
   /** Wait until the given namenode gets registration from all the datanodes */
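The MiniDFSCluster fix above corrects a copy-paste bug: transitionToStandby() was invoking transitionToActive() on the HA service client, so tests could never actually demote a NameNode. A hypothetical regression pin for that delegation (the Mockito wiring is assumed; HAServiceProtocol is the real interface the client speaks):

import static org.mockito.Mockito.*;
import org.apache.hadoop.ha.HAServiceProtocol;

public class TransitionDelegationSketch {
  public static void main(String[] args) throws Exception {
    HAServiceProtocol client = mock(HAServiceProtocol.class);
    client.transitionToStandby();                 // what transitionToStandby(nnIndex) should do
    verify(client).transitionToStandby();         // passes with the fixed code
    verify(client, never()).transitionToActive(); // would catch the old bug
  }
}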
@@ -199,7 +199,7 @@ public class TestFileJournalManager {
    * This should fail as edit logs must currently be treated as indevisable
    * units.
    */
-  @Test(expected=IOException.class)
+  @Test(expected=IllegalStateException.class)
   public void testAskForTransactionsMidfile() throws IOException {
     File f = new File(TestEditLog.TEST_DIR + "/filejournaltest2");
     NNStorage storage = setupEdits(Collections.<URI>singletonList(f.toURI()),
@@ -295,7 +295,7 @@ public class TestFileJournalManager {
     try {
       assertEquals("[]", getLogsAsString(fjm, 150));
       fail("Did not throw when asking for a txn in the middle of a log");
-    } catch (IllegalStateException ioe) {
+    } catch (IllegalStateException ioe) {
       GenericTestUtils.assertExceptionContains(
           "150 which is in the middle", ioe);
     }
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.TestDFSClientFailover;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests state transition from active->standby, and manual failover
+ * and failback between two namenodes.
+ */
+public class TestHAStateTransitions {
+  protected static final Log LOG = LogFactory.getLog(
+      TestStandbyIsHot.class);
+  private static final Path TEST_DIR = new Path("/test");
+  private static final Path TEST_FILE_PATH = new Path(TEST_DIR, "foo");
+  private static final String TEST_FILE_DATA =
+    "Hello state transitioning world";
+
+  /**
+   * Test which takes a single node and flip flops between
+   * active and standby mode, making sure it doesn't
+   * double-play any edits.
+   */
+  @Test
+  public void testTransitionActiveToStandby() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(1)
+      .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+      FileSystem fs = cluster.getFileSystem(0);
+
+      fs.mkdirs(TEST_DIR);
+      cluster.transitionToStandby(0);
+      try {
+        fs.mkdirs(new Path("/x"));
+        fail("Didn't throw trying to mutate FS in standby state");
+      } catch (Throwable t) {
+        GenericTestUtils.assertExceptionContains(
+            "Operation category WRITE is not supported", t);
+      }
+      cluster.transitionToActive(0);
+
+      // Create a file, then delete the whole directory recursively.
+      DFSTestUtil.createFile(fs, new Path(TEST_DIR, "foo"),
+          10, (short)1, 1L);
+      fs.delete(TEST_DIR, true);
+
+      // Now if the standby tries to replay the last segment that it just
+      // wrote as active, it would fail since it's trying to create a file
+      // in a non-existent directory.
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(0);
+
+      assertFalse(fs.exists(TEST_DIR));
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Tests manual failover back and forth between two NameNodes.
+   */
+  @Test
+  public void testManualFailoverAndFailback() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .nnTopology(MiniDFSNNTopology.simpleHATopology())
+      .numDataNodes(1)
+      .build();
+    try {
+      cluster.waitActive();
+      cluster.transitionToActive(0);
+
+      LOG.info("Starting with NN 0 active");
+      FileSystem fs = TestDFSClientFailover.configureFailoverFs(cluster, conf);
+      fs.mkdirs(TEST_DIR);
+
+      LOG.info("Failing over to NN 1");
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      assertTrue(fs.exists(TEST_DIR));
+      DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
+
+      LOG.info("Failing over to NN 0");
+      cluster.transitionToStandby(1);
+      cluster.transitionToActive(0);
+      assertTrue(fs.exists(TEST_DIR));
+      assertEquals(TEST_FILE_DATA,
+          DFSTestUtil.readFile(fs, TEST_FILE_PATH));
+
+      LOG.info("Removing test file");
+      fs.delete(TEST_DIR, true);
+      assertFalse(fs.exists(TEST_DIR));
+
+      LOG.info("Failing over to NN 1");
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+      assertFalse(fs.exists(TEST_DIR));
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}