HDFS-10737. disk balancer add volume path to report command. Contributed by Yuanbo Liu.
This commit is contained in:
parent
d677b68c25
commit
9f29f423e4
|
@ -33,13 +33,17 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
|
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
|
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
|
||||||
import org.apache.hadoop.hdfs.tools.DiskBalancer;
|
import org.apache.hadoop.hdfs.tools.DiskBalancer;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -420,6 +424,37 @@ public abstract class Command extends Configured {
|
||||||
return Math.min(nodes, cluster.getNodes().size());
|
return Math.min(nodes, cluster.getNodes().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads the Physical path of the disks we are balancing. This is needed to
|
||||||
|
* make the disk balancer human friendly and not used in balancing.
|
||||||
|
*
|
||||||
|
* @param node - Disk Balancer Node.
|
||||||
|
*/
|
||||||
|
protected void populatePathNames(
|
||||||
|
DiskBalancerDataNode node) throws IOException {
|
||||||
|
// if the cluster is a local file system, there is no need to
|
||||||
|
// invoke rpc call to dataNode.
|
||||||
|
if (getClusterURI().getScheme().startsWith("file")) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String dnAddress = node.getDataNodeIP() + ":" + node.getDataNodePort();
|
||||||
|
ClientDatanodeProtocol dnClient = getDataNodeProxy(dnAddress);
|
||||||
|
String volumeNameJson = dnClient.getDiskBalancerSetting(
|
||||||
|
DiskBalancerConstants.DISKBALANCER_VOLUME_NAME);
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Map<String, String> volumeMap =
|
||||||
|
mapper.readValue(volumeNameJson, HashMap.class);
|
||||||
|
for (DiskBalancerVolumeSet set : node.getVolumeSets().values()) {
|
||||||
|
for (DiskBalancerVolume vol : set.getVolumes()) {
|
||||||
|
if (volumeMap.containsKey(vol.getUuid())) {
|
||||||
|
vol.setPath(volumeMap.get(vol.getUuid()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set top number of nodes to be processed.
|
* Set top number of nodes to be processed.
|
||||||
* */
|
* */
|
||||||
|
|
|
@ -24,23 +24,13 @@ import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
|
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
|
||||||
.DiskBalancerDataNode;
|
.DiskBalancerDataNode;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
|
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
|
|
||||||
.DiskBalancerVolumeSet;
|
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
|
||||||
import org.apache.hadoop.hdfs.tools.DiskBalancer;
|
import org.apache.hadoop.hdfs.tools.DiskBalancer;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class that implements Plan Command.
|
* Class that implements Plan Command.
|
||||||
|
@ -158,30 +148,6 @@ public class PlanCommand extends Command {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Reads the Physical path of the disks we are balancing. This is needed to
|
|
||||||
* make the disk balancer human friendly and not used in balancing.
|
|
||||||
*
|
|
||||||
* @param node - Disk Balancer Node.
|
|
||||||
*/
|
|
||||||
private void populatePathNames(DiskBalancerDataNode node) throws IOException {
|
|
||||||
String dnAddress = node.getDataNodeIP() + ":" + node.getDataNodePort();
|
|
||||||
ClientDatanodeProtocol dnClient = getDataNodeProxy(dnAddress);
|
|
||||||
String volumeNameJson = dnClient.getDiskBalancerSetting(
|
|
||||||
DiskBalancerConstants.DISKBALANCER_VOLUME_NAME);
|
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
Map<String, String> volumeMap =
|
|
||||||
mapper.readValue(volumeNameJson, HashMap.class);
|
|
||||||
for (DiskBalancerVolumeSet set : node.getVolumeSets().values()) {
|
|
||||||
for (DiskBalancerVolume vol : set.getVolumes()) {
|
|
||||||
if (volumeMap.containsKey(vol.getUuid())) {
|
|
||||||
vol.setPath(volumeMap.get(vol.getUuid()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets extended help for this command.
|
* Gets extended help for this command.
|
||||||
|
|
|
@ -130,7 +130,7 @@ public class ReportCommand extends Command {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleNodeReport(final CommandLine cmd, StrBuilder result,
|
private void handleNodeReport(final CommandLine cmd, StrBuilder result,
|
||||||
final String nodeFormat, final String volumeFormat) {
|
final String nodeFormat, final String volumeFormat) throws Exception {
|
||||||
String outputLine = "";
|
String outputLine = "";
|
||||||
/*
|
/*
|
||||||
* get value that identifies a DataNode from command line, it could be UUID,
|
* get value that identifies a DataNode from command line, it could be UUID,
|
||||||
|
@ -152,6 +152,8 @@ public class ReportCommand extends Command {
|
||||||
final String trueStr = "True";
|
final String trueStr = "True";
|
||||||
final String falseStr = "False";
|
final String falseStr = "False";
|
||||||
DiskBalancerDataNode dbdn = getNode(nodeVal);
|
DiskBalancerDataNode dbdn = getNode(nodeVal);
|
||||||
|
// get storage path of datanode
|
||||||
|
populatePathNames(dbdn);
|
||||||
|
|
||||||
if (dbdn == null) {
|
if (dbdn == null) {
|
||||||
outputLine = String.format(
|
outputLine = String.format(
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.diskbalancer.command;
|
package org.apache.hadoop.hdfs.server.diskbalancer.command;
|
||||||
|
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.allOf;
|
import static org.hamcrest.CoreMatchers.allOf;
|
||||||
import static org.hamcrest.CoreMatchers.containsString;
|
import static org.hamcrest.CoreMatchers.containsString;
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
@ -50,6 +51,7 @@ import static org.apache.hadoop.hdfs.tools.DiskBalancer.HELP;
|
||||||
import static org.apache.hadoop.hdfs.tools.DiskBalancer.NODE;
|
import static org.apache.hadoop.hdfs.tools.DiskBalancer.NODE;
|
||||||
import static org.apache.hadoop.hdfs.tools.DiskBalancer.PLAN;
|
import static org.apache.hadoop.hdfs.tools.DiskBalancer.PLAN;
|
||||||
import static org.apache.hadoop.hdfs.tools.DiskBalancer.QUERY;
|
import static org.apache.hadoop.hdfs.tools.DiskBalancer.QUERY;
|
||||||
|
import static org.apache.hadoop.hdfs.tools.DiskBalancer.REPORT;
|
||||||
|
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
|
@ -261,6 +263,41 @@ public class TestDiskBalancerCommand {
|
||||||
containsString("0.25 free: 490407853993/2000000000000"))));
|
containsString("0.25 free: 490407853993/2000000000000"))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testReportNodeWithoutJson() throws Exception {
|
||||||
|
String dataNodeUuid = cluster.getDataNodes().get(0).getDatanodeUuid();
|
||||||
|
final String planArg = String.format("-%s -%s %s",
|
||||||
|
REPORT, NODE, dataNodeUuid);
|
||||||
|
final String cmdLine = String
|
||||||
|
.format(
|
||||||
|
"hdfs diskbalancer %s", planArg);
|
||||||
|
List<String> outputs = runCommand(cmdLine, cluster);
|
||||||
|
|
||||||
|
assertThat(
|
||||||
|
outputs.get(0),
|
||||||
|
containsString("Processing report command"));
|
||||||
|
assertThat(
|
||||||
|
outputs.get(1),
|
||||||
|
is(allOf(containsString("Reporting volume information for DataNode"),
|
||||||
|
containsString(dataNodeUuid))));
|
||||||
|
assertThat(
|
||||||
|
outputs.get(2),
|
||||||
|
is(allOf(containsString(dataNodeUuid),
|
||||||
|
containsString("2 volumes with node data density 0.00"))));
|
||||||
|
assertThat(
|
||||||
|
outputs.get(3),
|
||||||
|
is(allOf(containsString("DISK"),
|
||||||
|
containsString("/dfs/data/data1"),
|
||||||
|
containsString("0.00"),
|
||||||
|
containsString("1.00"))));
|
||||||
|
assertThat(
|
||||||
|
outputs.get(4),
|
||||||
|
is(allOf(containsString("DISK"),
|
||||||
|
containsString("/dfs/data/data2"),
|
||||||
|
containsString("0.00"),
|
||||||
|
containsString("1.00"))));
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testReadClusterFromJson() throws Exception {
|
public void testReadClusterFromJson() throws Exception {
|
||||||
ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson,
|
ClusterConnector jsonConnector = ConnectorFactory.getCluster(clusterJson,
|
||||||
|
@ -283,6 +320,17 @@ public class TestDiskBalancerCommand {
|
||||||
runCommand(cmdLine, cluster);
|
runCommand(cmdLine, cluster);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* test -plan DataNodeID */
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testPlanJsonNode() throws Exception {
|
||||||
|
final String planArg = String.format("-%s %s", PLAN,
|
||||||
|
"a87654a9-54c7-4693-8dd9-c9c7021dc340");
|
||||||
|
final String cmdLine = String
|
||||||
|
.format(
|
||||||
|
"hdfs diskbalancer %s", planArg);
|
||||||
|
runCommand(cmdLine);
|
||||||
|
}
|
||||||
|
|
||||||
/* Test that illegal arguments are handled correctly*/
|
/* Test that illegal arguments are handled correctly*/
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testIllegalArgument() throws Exception {
|
public void testIllegalArgument() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue