HDFS-11038. DiskBalancer: support running multiple commands in single test. Contributed by Xiaobing Zhou.

Anu Engineer 2016-10-26 19:39:02 -07:00
parent 22ff0eff4d
commit 9f32364d28
5 changed files with 162 additions and 54 deletions

Command.java

@@ -52,6 +52,7 @@ import org.codehaus.jackson.map.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
@@ -74,7 +75,7 @@ import java.util.TreeSet;
/**
* Common interface for command handling.
*/
public abstract class Command extends Configured {
public abstract class Command extends Configured implements Closeable {
private static final ObjectReader READER =
new ObjectMapper().reader(HashMap.class);
static final Logger LOG = LoggerFactory.getLogger(Command.class);
@@ -106,6 +107,22 @@ public abstract class Command extends Configured {
this.ps = ps;
}
/**
* Cleans any resources held by this command.
* <p>
* The main goal is to delete the ID file created in
* {@link org.apache.hadoop.hdfs.server.balancer
* .NameNodeConnector#checkAndMarkRunning};
* otherwise, it's not allowed to run multiple commands in a row.
* </p>
*/
@Override
public void close() throws IOException {
if (fs != null) {
fs.close();
}
}
/**
* Gets printing stream.
* @return print stream

PlanCommand.java

@@ -26,6 +26,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
.DiskBalancerDataNode;
@@ -147,11 +148,17 @@ public class PlanCommand extends Command {
try {
if (plan != null && plan.getVolumeSetPlans().size() > 0) {
outputLine = String.format("Writing plan to: %s", getOutputPath());
outputLine = String.format("Writing plan to:");
recordOutput(result, outputLine);
try (FSDataOutputStream planStream = create(String.format(
final String planFileName = String.format(
DiskBalancerCLI.PLAN_TEMPLATE,
cmd.getOptionValue(DiskBalancerCLI.PLAN)))) {
cmd.getOptionValue(DiskBalancerCLI.PLAN));
final String planFileFullName =
new Path(getOutputPath(), planFileName).toString();
recordOutput(result, planFileFullName);
try (FSDataOutputStream planStream = create(planFileName)) {
planStream.write(plan.toJson().getBytes(StandardCharsets.UTF_8));
}
} else {
@@ -173,7 +180,7 @@
result.appendln(Throwables.getStackTraceAsString(e));
}
getPrintStream().println(result.toString());
getPrintStream().print(result.toString());
}
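For orientation (reconstructed from this hunk and the assertions in the new test below; the concrete directory layout is not shown here): the plan command now reports its result on two output lines instead of one, a fixed heading followed by the full plan path, whose file name is derived from the datanode UUID through DiskBalancerCLI.PLAN_TEMPLATE:

Writing plan to:
<output-path>/<plan-file-derived-from-datanode-uuid>

The switch from println to print drops the extra trailing newline, which appears to be what keeps the output at exactly the two lines the new test asserts.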

DiskBalancerCLI.java

@@ -450,37 +450,44 @@ public class DiskBalancerCLI extends Configured implements Tool {
private int dispatch(CommandLine cmd, Options opts)
throws Exception {
Command dbCmd = null;
if (cmd.hasOption(DiskBalancerCLI.PLAN)) {
dbCmd = new PlanCommand(getConf(), printStream);
}
try {
if (cmd.hasOption(DiskBalancerCLI.PLAN)) {
dbCmd = new PlanCommand(getConf(), printStream);
}
if (cmd.hasOption(DiskBalancerCLI.EXECUTE)) {
dbCmd = new ExecuteCommand(getConf());
}
if (cmd.hasOption(DiskBalancerCLI.EXECUTE)) {
dbCmd = new ExecuteCommand(getConf());
}
if (cmd.hasOption(DiskBalancerCLI.QUERY)) {
dbCmd = new QueryCommand(getConf());
}
if (cmd.hasOption(DiskBalancerCLI.QUERY)) {
dbCmd = new QueryCommand(getConf());
}
if (cmd.hasOption(DiskBalancerCLI.CANCEL)) {
dbCmd = new CancelCommand(getConf());
}
if (cmd.hasOption(DiskBalancerCLI.CANCEL)) {
dbCmd = new CancelCommand(getConf());
}
if (cmd.hasOption(DiskBalancerCLI.REPORT)) {
dbCmd = new ReportCommand(getConf(), this.printStream);
}
if (cmd.hasOption(DiskBalancerCLI.REPORT)) {
dbCmd = new ReportCommand(getConf(), this.printStream);
}
if (cmd.hasOption(DiskBalancerCLI.HELP)) {
dbCmd = new HelpCommand(getConf());
}
if (cmd.hasOption(DiskBalancerCLI.HELP)) {
dbCmd = new HelpCommand(getConf());
}
// Invoke main help here.
if (dbCmd == null) {
new HelpCommand(getConf()).execute(null);
return 1;
}
// Invoke main help here.
if (dbCmd == null) {
dbCmd = new HelpCommand(getConf());
dbCmd.execute(null);
return 1;
}
dbCmd.execute(cmd);
return 0;
dbCmd.execute(cmd);
return 0;
} finally {
if (dbCmd != null) {
dbCmd.close();
}
}
}
}
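Hypothetical driver (not part of the commit) illustrating what the new try/finally enables: two disk-balancer invocations from one JVM no longer collide, because dispatch() closes the command, and with it the FileSystem holding the checkAndMarkRunning marker, even when execution fails. The DiskBalancerCLI(Configuration, PrintStream) constructor is an assumption; the literal option names are assumed to match the PLAN/EXECUTE constants used by the test below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
import org.apache.hadoop.util.ToolRunner;

// Hedged sketch: plan first, then execute the produced plan file, all in
// the same process. Before this change the second run could be refused
// because the first command never released its marker file.
static void planThenExecute(String datanodeUuid, String planFileFullName)
    throws Exception {
  Configuration conf = new HdfsConfiguration();
  ToolRunner.run(conf, new DiskBalancerCLI(conf, System.out),  // constructor assumed
      new String[] {"-plan", datanodeUuid});
  ToolRunner.run(conf, new DiskBalancerCLI(conf, System.out),
      new String[] {"-execute", planFileFullName});
}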

DiskBalancerTestUtil.java

@@ -47,7 +47,7 @@ import java.util.UUID;
import java.util.concurrent.TimeoutException;
/**
* Helper class to create various cluster configrations at run time.
* Helper class to create various cluster configurations at run time.
*/
public class DiskBalancerTestUtil {
public static final long MB = 1024 * 1024L;

TestDiskBalancerCommand.java

@@ -18,6 +18,14 @@
package org.apache.hadoop.hdfs.server.diskbalancer.command;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.CANCEL;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.EXECUTE;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.HELP;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.NODE;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.OUTFILE;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.PLAN;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.QUERY;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.REPORT;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
@@ -34,6 +42,7 @@ import java.util.Scanner;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -44,34 +53,34 @@ import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import com.google.common.collect.Lists;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.CANCEL;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.HELP;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.NODE;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.PLAN;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.QUERY;
import static org.apache.hadoop.hdfs.tools.DiskBalancerCLI.REPORT;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
/**
* Tests various CLI commands of DiskBalancer.
*/
public class TestDiskBalancerCommand {
@Rule
public ExpectedException thrown = ExpectedException.none();
private MiniDFSCluster cluster;
private URI clusterJson;
private Configuration conf = new HdfsConfiguration();
private final static int DEFAULT_BLOCK_SIZE = 1024;
private final static int FILE_LEN = 200 * 1024;
private final static long CAPCACITY = 300 * 1024;
private final static long[] CAPACITIES = new long[] {CAPCACITY, CAPCACITY};
@Before
public void setUp() throws Exception {
conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
@@ -87,11 +96,69 @@ public class TestDiskBalancerCommand {
public void tearDown() throws Exception {
if (cluster != null) {
// Just make sure we can shutdown datanodes.
cluster.getDataNodes().get(0).shutdown();
for (int i = 0; i < cluster.getDataNodes().size(); i++) {
cluster.getDataNodes().get(i).shutdown();
}
cluster.shutdown();
}
}
/**
* Tests running multiple commands under one setup. This mainly covers
* {@link org.apache.hadoop.hdfs.server.diskbalancer.command.Command#close}
*/
@Test(timeout = 60000)
public void testRunMultipleCommandsUnderOneSetup() throws Exception {
final int numDatanodes = 1;
MiniDFSCluster miniCluster = null;
final Configuration hdfsConf = new HdfsConfiguration();
try {
/* new cluster with imbalanced capacity */
miniCluster = DiskBalancerTestUtil.newImbalancedCluster(
hdfsConf,
numDatanodes,
CAPACITIES,
DEFAULT_BLOCK_SIZE,
FILE_LEN);
String cmdLine = "";
List<String> outputs = null;
final DataNode dn = miniCluster.getDataNodes().get(0);
/* run plan command */
cmdLine = String.format(
"hdfs diskbalancer -%s %s",
PLAN,
dn.getDatanodeUuid());
outputs = runCommand(cmdLine, hdfsConf, miniCluster);
/* get path of plan file*/
final String planFileName = dn.getDatanodeUuid();
/* verify plan command */
assertEquals(
"There must be two lines: the 1st is writing plan to...,"
+ " the 2nd is actual full path of plan file.",
2, outputs.size());
assertThat(outputs.get(1), containsString(planFileName));
/* get full path of plan file*/
final String planFileFullName = outputs.get(1);
/* run execute command */
cmdLine = String.format(
"hdfs diskbalancer -%s %s",
EXECUTE,
planFileFullName);
outputs = runCommand(cmdLine, hdfsConf, miniCluster);
} finally {
if (miniCluster != null) {
miniCluster.shutdown();
}
}
}
/* test basic report */
@Test(timeout = 60000)
public void testReportSimple() throws Exception {
@@ -413,34 +480,44 @@ public class TestDiskBalancerCommand {
@Test
public void testPrintFullPathOfPlan()
throws Exception {
final Path parent = new Path(
PathUtils.getTestPath(getClass()),
GenericTestUtils.getMethodName());
MiniDFSCluster miniCluster = null;
try {
Configuration hdfsConf = new HdfsConfiguration();
final int numDatanodes = 1;
final int defaultBlockSize = 1024;
final int fileLen = 200 * 1024;
final long capcacity = 300 * 1024;
final long[] capacities = new long[] {capcacity, capcacity};
List<String> outputs = null;
/* new cluster with imbalanced capacity */
miniCluster = DiskBalancerTestUtil.newImbalancedCluster(
hdfsConf,
numDatanodes,
capacities,
defaultBlockSize,
fileLen);
1,
CAPACITIES,
DEFAULT_BLOCK_SIZE,
FILE_LEN);
/* run plan command */
final String cmdLine = String.format(
"hdfs diskbalancer -%s %s",
"hdfs diskbalancer -%s %s -%s %s",
PLAN,
miniCluster.getDataNodes().get(0).getDatanodeUuid());
miniCluster.getDataNodes().get(0).getDatanodeUuid(),
OUTFILE,
parent);
outputs = runCommand(cmdLine, hdfsConf, miniCluster);
/* get full path */
final String planFileFullName = new Path(
parent,
miniCluster.getDataNodes().get(0).getDatanodeUuid()).toString();
/* verify the path of plan */
assertEquals(
"There must be two lines: the 1st is writing plan to,"
+ " the 2nd is actual full path of plan file.",
2, outputs.size());
assertThat(outputs.get(0), containsString("Writing plan to"));
assertThat(outputs.get(0), containsString("/system/diskbalancer"));
assertThat(outputs.get(1), containsString(planFileFullName));
} finally {
if (miniCluster != null) {
miniCluster.shutdown();