HDFS-10998. Add unit tests for HDFS command 'dfsadmin -fetchImage' in HA. Contributed by Xiaobing Zhou

Mingliang Liu 2016-10-20 19:51:48 -07:00
parent 262827cf75
commit d7d87deece
1 changed file with 79 additions and 26 deletions
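
For context, the command under test, 'hdfs dfsadmin -fetchImage <local directory>', downloads the most recent fsimage from the active NameNode into a local directory. Below is a minimal sketch of the equivalent programmatic invocation that the test drives through DFSAdmin; the target directory path is illustrative and not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.util.ToolRunner;

public class FetchImageExample {
  public static void main(String[] args) throws Exception {
    // Client configuration; in the test this comes from the MiniDFSCluster's
    // HA setup via HATestUtil.configureFailoverFs(cluster, conf).
    Configuration conf = new Configuration();

    // Same effect as running `hdfs dfsadmin -fetchImage /tmp/fetched-images`:
    // fetch the latest fsimage from the active NameNode into the local dir.
    int exitCode = ToolRunner.run(conf, new DFSAdmin(),
        new String[] {"-fetchImage", "/tmp/fetched-images"});
    System.exit(exitCode);
  }
}
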


@@ -17,10 +17,15 @@
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -29,11 +34,16 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestFetchImage {
@@ -43,46 +53,89 @@ public class TestFetchImage {
// Shamelessly stolen from NNStorage.
private static final Pattern IMAGE_REGEX = Pattern.compile("fsimage_(\\d+)");
private MiniDFSCluster cluster;
private NameNode nn0 = null;
private NameNode nn1 = null;
private Configuration conf = null;
@BeforeClass
public static void setupImageDir() {
FETCHED_IMAGE_FILE.mkdirs();
}
@AfterClass
public static void cleanup() {
FileUtil.fullyDelete(FETCHED_IMAGE_FILE);
}
@Before
public void setupCluster() throws IOException, URISyntaxException {
conf = new Configuration();
conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.setLong(DFS_BLOCK_SIZE_KEY, 1024);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(1)
.build();
nn0 = cluster.getNameNode(0);
nn1 = cluster.getNameNode(1);
HATestUtil.configureFailoverFs(cluster, conf);
cluster.waitActive();
}
/**
* Download a few fsimages using `hdfs dfsadmin -fetchImage ...' and verify
* the results.
*/
@Test
public void testFetchImage() throws Exception {
FETCHED_IMAGE_FILE.mkdirs();
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = null;
try {
DFSAdmin dfsAdmin = new DFSAdmin();
dfsAdmin.setConf(conf);
@Test(timeout=30000)
public void testFetchImageHA() throws Exception {
final Path parent = new Path(
PathUtils.getTestPath(getClass()),
GenericTestUtils.getMethodName());
int nnIndex = 0;
/* run on nn0 as active */
cluster.transitionToActive(nnIndex);
testFetchImageInternal(
nnIndex,
new Path(parent, "dir1"),
new Path(parent, "dir2"));
/* run on nn1 as active */
nnIndex = 1;
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
cluster.transitionToActive(nnIndex);
testFetchImageInternal(
nnIndex,
new Path(parent, "dir3"),
new Path(parent, "dir4"));
}
private void testFetchImageInternal(
final int nnIndex,
final Path dir1,
final Path dir2) throws Exception {
final Configuration dfsConf = cluster.getConfiguration(nnIndex);
final DFSAdmin dfsAdmin = new DFSAdmin();
dfsAdmin.setConf(dfsConf);
try (FileSystem fs = cluster.getFileSystem(nnIndex)) {
runFetchImage(dfsAdmin, cluster);
fs = cluster.getFileSystem();
fs.mkdirs(new Path("/foo"));
fs.mkdirs(new Path("/foo2"));
fs.mkdirs(new Path("/foo3"));
fs.mkdirs(dir1);
fs.mkdirs(dir2);
cluster.getNameNodeRpc()
.setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
cluster.getNameNodeRpc().saveNamespace(0, 0);
cluster.getNameNodeRpc()
.setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
cluster.getNameNodeRpc(nnIndex).setSafeMode(
SafeModeAction.SAFEMODE_ENTER,
false);
cluster.getNameNodeRpc(nnIndex).saveNamespace(0, 0);
cluster.getNameNodeRpc(nnIndex).setSafeMode(
SafeModeAction.SAFEMODE_LEAVE,
false);
runFetchImage(dfsAdmin, cluster);
} finally {
if (fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
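
Note: the helper runFetchImage(dfsAdmin, cluster) called above is pre-existing code outside the changed hunks, so it does not appear in this diff. The following is a rough sketch of what it presumably does, inferred from the fields and imports visible above (FETCHED_IMAGE_FILE, IMAGE_REGEX, MD5FileUtils, MD5Hash); the helper name getHighestFsImageOnCluster and the exact signatures are assumptions, not the committed code.

// Assumed sketch, not part of this commit's diff. Relies on the class
// fields shown above: FETCHED_IMAGE_FILE (download target directory) and
// IMAGE_REGEX (matches fsimage_<txid> file names).
private void runFetchImage(DFSAdmin dfsAdmin, MiniDFSCluster cluster)
    throws Exception {
  // Run `dfsadmin -fetchImage` into the shared download directory and
  // expect a zero exit code.
  int retVal = dfsAdmin.run(
      new String[] {"-fetchImage", FETCHED_IMAGE_FILE.getPath()});
  assertEquals(0, retVal);

  // Locate the newest fsimage_<txid> file in the NameNode's storage
  // directory (hypothetical helper using IMAGE_REGEX) ...
  File highestImageOnNn = getHighestFsImageOnCluster(cluster);

  // ... and verify the downloaded copy is identical by comparing
  // MD5 checksums of the two files.
  MD5Hash expected = MD5FileUtils.computeMd5ForFile(highestImageOnNn);
  MD5Hash actual = MD5FileUtils.computeMd5ForFile(
      new File(FETCHED_IMAGE_FILE, highestImageOnNn.getName()));
  assertEquals(expected, actual);
}
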