diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java index 24976694f39..11c8e7f2d68 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java @@ -53,6 +53,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.PrintStream; import java.net.InetSocketAddress; import java.net.URI; import java.net.URL; @@ -82,6 +83,7 @@ public abstract class Command extends Configured { private FileSystem fs = null; private DiskBalancerCluster cluster = null; private int topNodes; + private PrintStream ps; private static final Path DEFAULT_LOG_DIR = new Path("/system/diskbalancer"); @@ -91,9 +93,25 @@ public abstract class Command extends Configured { * Constructs a command. */ public Command(Configuration conf) { + this(conf, System.out); + } + + /** + * Constructs a command. + */ + public Command(Configuration conf, final PrintStream ps) { super(conf); // These arguments are valid for all commands. topNodes = 0; + this.ps = ps; + } + + /** + * Gets printing stream. + * @return print stream + */ + PrintStream getPrintStream() { + return ps; } /** @@ -423,7 +441,8 @@ public abstract class Command extends Configured { * * @return Cluster. */ - protected DiskBalancerCluster getCluster() { + @VisibleForTesting + DiskBalancerCluster getCluster() { return cluster; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/HelpCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/HelpCommand.java index c7352997e2b..f7c84e16f7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/HelpCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/HelpCommand.java @@ -78,7 +78,7 @@ public class HelpCommand extends Command { command = new CancelCommand(getConf()); break; case DiskBalancerCLI.REPORT: - command = new ReportCommand(getConf(), null); + command = new ReportCommand(getConf()); break; default: command = this; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java index 97494097e6d..1d07a63d037 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java @@ -18,9 +18,12 @@ package org.apache.hadoop.hdfs.server.diskbalancer.command; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.text.StrBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -31,6 +34,7 @@ import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step; import org.apache.hadoop.hdfs.tools.DiskBalancerCLI; import java.nio.charset.StandardCharsets; import java.util.List; +import java.io.PrintStream; /** * Class that implements Plan Command. @@ -49,7 +53,14 @@ public class PlanCommand extends Command { * Constructs a plan command. */ public PlanCommand(Configuration conf) { - super(conf); + this(conf, System.out); + } + + /** + * Constructs a plan command. + */ + public PlanCommand(Configuration conf, final PrintStream ps) { + super(conf, ps); this.thresholdPercentage = 1; this.bandwidth = 0; this.maxError = 0; @@ -73,9 +84,12 @@ public class PlanCommand extends Command { * -plan -node IP -plan -node hostName -plan -node DatanodeUUID * * @param cmd - CommandLine + * @throws Exception */ @Override public void execute(CommandLine cmd) throws Exception { + StrBuilder result = new StrBuilder(); + String outputLine = ""; LOG.debug("Processing Plan Command."); Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.PLAN)); verifyCommandOptions(DiskBalancerCLI.PLAN, cmd); @@ -131,22 +145,35 @@ public class PlanCommand extends Command { .getBytes(StandardCharsets.UTF_8)); } - if (plan != null && plan.getVolumeSetPlans().size() > 0) { - LOG.info("Writing plan to : {}", getOutputPath()); - try (FSDataOutputStream planStream = create(String.format( - DiskBalancerCLI.PLAN_TEMPLATE, - cmd.getOptionValue(DiskBalancerCLI.PLAN)))) { - planStream.write(plan.toJson().getBytes(StandardCharsets.UTF_8)); + try { + if (plan != null && plan.getVolumeSetPlans().size() > 0) { + outputLine = String.format("Writing plan to: %s", getOutputPath()); + recordOutput(result, outputLine); + try (FSDataOutputStream planStream = create(String.format( + DiskBalancerCLI.PLAN_TEMPLATE, + cmd.getOptionValue(DiskBalancerCLI.PLAN)))) { + planStream.write(plan.toJson().getBytes(StandardCharsets.UTF_8)); + } + } else { + outputLine = String.format( + "No plan generated. DiskBalancing not needed for node: %s" + + " threshold used: %s", + cmd.getOptionValue(DiskBalancerCLI.PLAN), this.thresholdPercentage); + recordOutput(result, outputLine); } - } else { - LOG.info("No plan generated. DiskBalancing not needed for node: {} " + - "threshold used: {}", cmd.getOptionValue(DiskBalancerCLI.PLAN), - this.thresholdPercentage); + + if (cmd.hasOption(DiskBalancerCLI.VERBOSE) && plans.size() > 0) { + printToScreen(plans); + } + } catch (Exception e) { + final String errMsg = + "Errors while recording the output of plan command."; + LOG.error(errMsg, e); + result.appendln(errMsg); + result.appendln(Throwables.getStackTraceAsString(e)); } - if (cmd.hasOption(DiskBalancerCLI.VERBOSE) && plans.size() > 0) { - printToScreen(plans); - } + getPrintStream().println(result.toString()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java index 79ba14f5fe3..e10ffacc735 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java @@ -47,11 +47,12 @@ import com.google.common.collect.Lists; */ public class ReportCommand extends Command { - private PrintStream out; + public ReportCommand(Configuration conf) { + this(conf, System.out); + } - public ReportCommand(Configuration conf, final PrintStream out) { - super(conf); - this.out = out; + public ReportCommand(Configuration conf, final PrintStream ps) { + super(conf, ps); addValidCommandParameters(DiskBalancerCLI.REPORT, "Report volume information of nodes."); @@ -95,7 +96,7 @@ public class ReportCommand extends Command { handleTopReport(cmd, result, nodeFormat); } - out.println(result.toString()); + getPrintStream().println(result.toString()); } private void handleTopReport(final CommandLine cmd, final StrBuilder result, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java index c216a3042cd..5bcf939c868 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java @@ -137,6 +137,8 @@ public class DiskBalancerCLI extends Configured implements Tool { private final PrintStream printStream; + private Command currentCommand = null; + /** * Construct a DiskBalancer. * @@ -431,6 +433,13 @@ public class DiskBalancerCLI extends Configured implements Tool { return parser.parse(opts, argv); } + /** + * Gets current command associated with this instance of DiskBalancer. + */ + public Command getCurrentCommand() { + return currentCommand; + } + /** * Dispatches calls to the right command Handler classes. * @@ -440,38 +449,38 @@ public class DiskBalancerCLI extends Configured implements Tool { */ private int dispatch(CommandLine cmd, Options opts) throws Exception { - Command currentCommand = null; + Command dbCmd = null; if (cmd.hasOption(DiskBalancerCLI.PLAN)) { - currentCommand = new PlanCommand(getConf()); + dbCmd = new PlanCommand(getConf(), printStream); } if (cmd.hasOption(DiskBalancerCLI.EXECUTE)) { - currentCommand = new ExecuteCommand(getConf()); + dbCmd = new ExecuteCommand(getConf()); } if (cmd.hasOption(DiskBalancerCLI.QUERY)) { - currentCommand = new QueryCommand(getConf()); + dbCmd = new QueryCommand(getConf()); } if (cmd.hasOption(DiskBalancerCLI.CANCEL)) { - currentCommand = new CancelCommand(getConf()); + dbCmd = new CancelCommand(getConf()); } if (cmd.hasOption(DiskBalancerCLI.REPORT)) { - currentCommand = new ReportCommand(getConf(), this.printStream); + dbCmd = new ReportCommand(getConf(), this.printStream); } if (cmd.hasOption(DiskBalancerCLI.HELP)) { - currentCommand = new HelpCommand(getConf()); + dbCmd = new HelpCommand(getConf()); } // Invoke main help here. - if (currentCommand == null) { + if (dbCmd == null) { new HelpCommand(getConf()).execute(null); return 1; } - currentCommand.execute(cmd); + dbCmd.execute(cmd); return 0; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java index 5e98eb2b061..a575097595f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerTestUtil.java @@ -18,10 +18,20 @@ package org.apache.hadoop.hdfs.server.diskbalancer; import com.google.common.base.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.balancer.TestBalancer; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.diskbalancer.connectors.NullConnector; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; @@ -29,9 +39,12 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet; import org.apache.hadoop.util.Time; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.Random; import java.util.UUID; +import java.util.concurrent.TimeoutException; /** * Helper class to create various cluster configrations at run time. @@ -242,6 +255,65 @@ public class DiskBalancerTestUtil { return count; } + public static MiniDFSCluster newImbalancedCluster( + final Configuration conf, + final int numDatanodes, + final long[] storageCapacities, + final int defaultBlockSize, + final int fileLen) + throws IOException, InterruptedException, TimeoutException { + conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + + final String fileName = "/" + UUID.randomUUID().toString(); + final Path filePath = new Path(fileName); + + Preconditions.checkNotNull(storageCapacities); + Preconditions.checkArgument( + storageCapacities.length == 2, + "need to specify capacities for two storages."); + + // Write a file and restart the cluster + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numDatanodes) + .storageCapacities(storageCapacities) + .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK}) + .storagesPerDatanode(2) + .build(); + FsVolumeImpl source = null; + FsVolumeImpl dest = null; + + cluster.waitActive(); + Random r = new Random(); + FileSystem fs = cluster.getFileSystem(0); + TestBalancer.createFile(cluster, filePath, fileLen, (short) 1, 0); + + DFSTestUtil.waitReplication(fs, filePath, (short) 1); + cluster.restartDataNodes(); + cluster.waitActive(); + + // Get the data node and move all data to one disk. + for (int i = 0; i < numDatanodes; i++) { + DataNode dnNode = cluster.getDataNodes().get(i); + try (FsDatasetSpi.FsVolumeReferences refs = + dnNode.getFSDataset().getFsVolumeReferences()) { + source = (FsVolumeImpl) refs.get(0); + dest = (FsVolumeImpl) refs.get(1); + assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0); + DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(), + source, dest); + assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0); + } + } + + cluster.restartDataNodes(); + cluster.waitActive(); + + return cluster; + } + /** * Moves all blocks to the destination volume. * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java index 9985210f249..556803228d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java @@ -244,7 +244,9 @@ public class TestDiskBalancer { } catch (Exception e) { Assert.fail("Unexpected exception: " + e); } finally { - cluster.shutdown(); + if (cluster != null) { + cluster.shutdown(); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java index 66977856203..9f9c7b8c1b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/command/TestDiskBalancerCommand.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerTestUtil; import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector; import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster; @@ -409,14 +410,53 @@ public class TestDiskBalancerCommand { runCommand(cmdLine); } - private List runCommandInternal(final String cmdLine) throws - Exception { + @Test + public void testPrintFullPathOfPlan() + throws Exception { + MiniDFSCluster miniCluster = null; + try { + Configuration hdfsConf = new HdfsConfiguration(); + final int numDatanodes = 1; + final int defaultBlockSize = 1024; + final int fileLen = 200 * 1024; + final long capcacity = 300 * 1024; + final long[] capacities = new long[] {capcacity, capcacity}; + List outputs = null; + + /* new cluster with imbalanced capacity */ + miniCluster = DiskBalancerTestUtil.newImbalancedCluster( + hdfsConf, + numDatanodes, + capacities, + defaultBlockSize, + fileLen); + + /* run plan command */ + final String cmdLine = String.format( + "hdfs diskbalancer -%s %s", + PLAN, + miniCluster.getDataNodes().get(0).getDatanodeUuid()); + outputs = runCommand(cmdLine, hdfsConf, miniCluster); + + /* verify the path of plan */ + assertThat(outputs.get(0), containsString("Writing plan to")); + assertThat(outputs.get(0), containsString("/system/diskbalancer")); + } finally { + if (miniCluster != null) { + miniCluster.shutdown(); + } + } + } + + private List runCommandInternal( + final String cmdLine, + final Configuration clusterConf) throws Exception { String[] cmds = StringUtils.split(cmdLine, ' '); ByteArrayOutputStream bufOut = new ByteArrayOutputStream(); PrintStream out = new PrintStream(bufOut); - Tool diskBalancerTool = new DiskBalancerCLI(conf, out); - ToolRunner.run(conf, diskBalancerTool, cmds); + Tool diskBalancerTool = new DiskBalancerCLI(clusterConf, out); + ToolRunner.run(clusterConf, diskBalancerTool, cmds); Scanner scanner = new Scanner(bufOut.toString()); List outputs = Lists.newArrayList(); @@ -426,6 +466,11 @@ public class TestDiskBalancerCommand { return outputs; } + private List runCommandInternal(final String cmdLine) + throws Exception { + return runCommandInternal(cmdLine, conf); + } + private List runCommand(final String cmdLine) throws Exception { FileSystem.setDefaultUri(conf, clusterJson); return runCommandInternal(cmdLine); @@ -437,6 +482,14 @@ public class TestDiskBalancerCommand { return runCommandInternal(cmdLine); } + private List runCommand( + final String cmdLine, + Configuration clusterConf, + MiniDFSCluster miniCluster) throws Exception { + FileSystem.setDefaultUri(clusterConf, miniCluster.getURI()); + return runCommandInternal(cmdLine, clusterConf); + } + /** * Making sure that we can query the node without having done a submit. * @throws Exception