HDFS-2623. Add test case for hot standby capability. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1209256 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f4fa76719e
commit
28dbd56de0
|
@ -37,3 +37,5 @@ HDFS-2616. Change DatanodeProtocol#sendHeartbeat() to return HeartbeatResponse.
|
||||||
HDFS-2622. Fix TestDFSUpgrade in HA branch. (todd)
|
HDFS-2622. Fix TestDFSUpgrade in HA branch. (todd)
|
||||||
|
|
||||||
HDFS-2612. Handle refreshNameNodes in federated HA clusters (todd)
|
HDFS-2612. Handle refreshNameNodes in federated HA clusters (todd)
|
||||||
|
|
||||||
|
HDFS-2623. Add test case for hot standby capability (todd)
|
||||||
|
|
|
@ -109,7 +109,7 @@ public class AppendTestUtil {
|
||||||
out.write(bytes);
|
out.write(bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void check(FileSystem fs, Path p, long length) throws IOException {
|
public static void check(FileSystem fs, Path p, long length) throws IOException {
|
||||||
int i = -1;
|
int i = -1;
|
||||||
try {
|
try {
|
||||||
final FileStatus status = fs.getFileStatus(p);
|
final FileStatus status = fs.getFileStatus(p);
|
||||||
|
|
|
@ -82,13 +82,28 @@ public class TestDFSClientFailover {
|
||||||
AppendTestUtil.write(out2, 0, FILE_LENGTH_TO_VERIFY);
|
AppendTestUtil.write(out2, 0, FILE_LENGTH_TO_VERIFY);
|
||||||
out1.close();
|
out1.close();
|
||||||
out2.close();
|
out2.close();
|
||||||
|
|
||||||
|
FileSystem fs = configureFailoverFs(cluster, conf);
|
||||||
|
|
||||||
|
AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
|
||||||
|
cluster.getNameNode(0).stop();
|
||||||
|
AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
|
||||||
|
|
||||||
|
fs.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static FileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf)
|
||||||
|
throws IOException, URISyntaxException {
|
||||||
|
InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
|
||||||
|
InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
|
||||||
|
|
||||||
String nsId = "nameserviceId1";
|
String nsId = "nameserviceId1";
|
||||||
|
|
||||||
final String logicalNameNodeId = "ha-nn-uri";
|
final String logicalNameNodeId = "ha-nn-uri";
|
||||||
String nameNodeId1 = "nn1";
|
String nameNodeId1 = "nn1";
|
||||||
String nameNodeId2 = "nn2";
|
String nameNodeId2 = "nn2";
|
||||||
|
|
||||||
|
conf = new Configuration(conf);
|
||||||
String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
|
String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
|
||||||
String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
|
String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
|
||||||
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY,
|
||||||
|
@ -103,12 +118,7 @@ public class TestDFSClientFailover {
|
||||||
ConfiguredFailoverProxyProvider.class.getName());
|
ConfiguredFailoverProxyProvider.class.getName());
|
||||||
|
|
||||||
FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalNameNodeId), conf);
|
FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalNameNodeId), conf);
|
||||||
|
return fs;
|
||||||
AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
|
|
||||||
cluster.getNameNode(0).stop();
|
|
||||||
AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
|
|
||||||
|
|
||||||
fs.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSImage;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestEditLogTailer {
|
public class TestEditLogTailer {
|
||||||
|
@ -99,12 +100,13 @@ public class TestEditLogTailer {
|
||||||
return DIR_PREFIX + suffix;
|
return DIR_PREFIX + suffix;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void waitForStandbyToCatchUp(NameNode active,
|
static void waitForStandbyToCatchUp(NameNode active,
|
||||||
NameNode standby) throws InterruptedException, IOException {
|
NameNode standby) throws InterruptedException, IOException {
|
||||||
|
|
||||||
long activeTxId = active.getNamesystem().getFSImage().getEditLog()
|
long activeTxId = active.getNamesystem().getFSImage().getEditLog()
|
||||||
.getLastWrittenTxId();
|
.getLastWrittenTxId();
|
||||||
|
|
||||||
|
// TODO: we should really just ask for a log roll here
|
||||||
doSaveNamespace(active);
|
doSaveNamespace(active);
|
||||||
|
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
|
@ -112,10 +114,13 @@ public class TestEditLogTailer {
|
||||||
long nn2HighestTxId = standby.getNamesystem().getFSImage()
|
long nn2HighestTxId = standby.getNamesystem().getFSImage()
|
||||||
.getLastAppliedTxId();
|
.getLastAppliedTxId();
|
||||||
if (nn2HighestTxId >= activeTxId) {
|
if (nn2HighestTxId >= activeTxId) {
|
||||||
break;
|
return;
|
||||||
}
|
}
|
||||||
Thread.sleep(SLEEP_TIME);
|
Thread.sleep(SLEEP_TIME);
|
||||||
}
|
}
|
||||||
|
Assert.fail("Standby did not catch up to txid " + activeTxId +
|
||||||
|
" (currently at " +
|
||||||
|
standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")");
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void doSaveNamespace(NameNode nn)
|
private static void doSaveNamespace(NameNode nn)
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
|
import org.apache.hadoop.hdfs.TestDFSClientFailover;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The hotornot.com of unit tests: makes sure that the standby not only
|
||||||
|
* has namespace information, but also has the correct block reports, etc.
|
||||||
|
*/
|
||||||
|
public class TestStandbyIsHot {
|
||||||
|
protected static final Log LOG = LogFactory.getLog(
|
||||||
|
TestStandbyIsHot.class);
|
||||||
|
private static final String TEST_FILE_DATA = "hello highly available world";
|
||||||
|
private static final String TEST_FILE = "/testStandbyIsHot";
|
||||||
|
private static final Path TEST_FILE_PATH = new Path(TEST_FILE);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStandbyIsHot() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
||||||
|
.numDataNodes(3)
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
|
||||||
|
NameNode nn1 = cluster.getNameNode(0);
|
||||||
|
NameNode nn2 = cluster.getNameNode(1);
|
||||||
|
nn2.getNamesystem().getEditLogTailer().setSleepTime(250);
|
||||||
|
nn2.getNamesystem().getEditLogTailer().interrupt();
|
||||||
|
|
||||||
|
FileSystem fs = TestDFSClientFailover.configureFailoverFs(cluster, conf);
|
||||||
|
|
||||||
|
Thread.sleep(1000);
|
||||||
|
System.err.println("==================================");
|
||||||
|
DFSTestUtil.writeFile(fs, TEST_FILE_PATH, TEST_FILE_DATA);
|
||||||
|
// Have to force an edit log roll so that the standby catches up
|
||||||
|
nn1.getRpcServer().rollEditLog();
|
||||||
|
System.err.println("==================================");
|
||||||
|
|
||||||
|
waitForBlockLocations(nn2, TEST_FILE, 3);
|
||||||
|
|
||||||
|
nn1.stop();
|
||||||
|
cluster.transitionToActive(1);
|
||||||
|
|
||||||
|
assertEquals(TEST_FILE_DATA, DFSTestUtil.readFile(fs, TEST_FILE_PATH));
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitForBlockLocations(final NameNode nn,
|
||||||
|
final String path, final int expectedReplicas)
|
||||||
|
throws Exception {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean get() {
|
||||||
|
try {
|
||||||
|
LocatedBlocks locs = NameNodeAdapter.getBlockLocations(nn, path, 0, 1000);
|
||||||
|
LOG.info("Got locs: " + locs);
|
||||||
|
return locs.getLastLocatedBlock().getLocations().length == expectedReplicas;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("No block locations yet: " + e.getMessage());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 500, 10000);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue