HDFS-9702. DiskBalancer: getVolumeMap implementation. (Contributed by Anu Engineer)
This commit is contained in:
parent
4b93ddae07
commit
918722bdd2
|
@ -169,6 +169,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
|
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
|
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
|
@ -3360,8 +3361,8 @@ public class DataNode extends ReconfigurableBase
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets a run-time configuration value from running diskbalancer instance. For
|
* Gets a runtime configuration value from diskbalancer instance. For
|
||||||
* example : Disk Balancer bandwidth of a running instance.
|
* example : DiskBalancer bandwidth.
|
||||||
*
|
*
|
||||||
* @param key - String that represents the run time key value.
|
* @param key - String that represents the run time key value.
|
||||||
* @return value of the key as a string.
|
* @return value of the key as a string.
|
||||||
|
@ -3370,7 +3371,15 @@ public class DataNode extends ReconfigurableBase
|
||||||
@Override
|
@Override
|
||||||
public String getDiskBalancerSetting(String key) throws IOException {
|
public String getDiskBalancerSetting(String key) throws IOException {
|
||||||
checkSuperuserPrivilege();
|
checkSuperuserPrivilege();
|
||||||
throw new DiskBalancerException("Not Implemented",
|
Preconditions.checkNotNull(key);
|
||||||
DiskBalancerException.Result.INTERNAL_ERROR);
|
switch (key) {
|
||||||
|
case DiskBalancerConstants.DISKBALANCER_VOLUME_NAME:
|
||||||
|
return this.diskBalancer.getVolumeNames();
|
||||||
|
default:
|
||||||
|
LOG.error("Disk Balancer - Unknown key in get balancer setting. Key: " +
|
||||||
|
key);
|
||||||
|
throw new DiskBalancerException("Unknown key",
|
||||||
|
DiskBalancerException.Result.UNKNOWN_KEY);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
|
@ -221,6 +222,31 @@ public class DiskBalancer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a volume ID to Volume base path map.
|
||||||
|
*
|
||||||
|
* @return Json string of the volume map.
|
||||||
|
* @throws DiskBalancerException
|
||||||
|
*/
|
||||||
|
public String getVolumeNames() throws DiskBalancerException {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
checkDiskBalancerEnabled();
|
||||||
|
Map<String, String> pathMap = new HashMap<>();
|
||||||
|
Map<String, FsVolumeSpi> volMap = getStorageIDToVolumeMap();
|
||||||
|
for (Map.Entry<String, FsVolumeSpi> entry : volMap.entrySet()) {
|
||||||
|
pathMap.put(entry.getKey(), entry.getValue().getBasePath());
|
||||||
|
}
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
return mapper.writeValueAsString(pathMap);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new DiskBalancerException("Internal error, Unable to " +
|
||||||
|
"create JSON string.", e,
|
||||||
|
DiskBalancerException.Result.INTERNAL_ERROR);
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -36,7 +36,8 @@ public class DiskBalancerException extends IOException {
|
||||||
INVALID_VOLUME,
|
INVALID_VOLUME,
|
||||||
INVALID_MOVE,
|
INVALID_MOVE,
|
||||||
INTERNAL_ERROR,
|
INTERNAL_ERROR,
|
||||||
NO_SUCH_PLAN
|
NO_SUCH_PLAN,
|
||||||
|
UNKNOWN_KEY
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Result result;
|
private final Result result;
|
||||||
|
|
|
@ -24,18 +24,24 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
|
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
|
||||||
|
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException.*;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
|
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
|
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
|
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
|
||||||
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
|
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
|
||||||
|
import org.hamcrest.*;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
|
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
|
||||||
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
|
import static org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
|
||||||
|
@ -84,6 +90,8 @@ public class TestDiskBalancerRPC {
|
||||||
int planVersion = rpcTestHelper.getPlanVersion();
|
int planVersion = rpcTestHelper.getPlanVersion();
|
||||||
NodePlan plan = rpcTestHelper.getPlan();
|
NodePlan plan = rpcTestHelper.getPlan();
|
||||||
thrown.expect(DiskBalancerException.class);
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new
|
||||||
|
ResultVerifier(Result.INVALID_PLAN_HASH));
|
||||||
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
|
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,6 +104,8 @@ public class TestDiskBalancerRPC {
|
||||||
planVersion++;
|
planVersion++;
|
||||||
NodePlan plan = rpcTestHelper.getPlan();
|
NodePlan plan = rpcTestHelper.getPlan();
|
||||||
thrown.expect(DiskBalancerException.class);
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new
|
||||||
|
ResultVerifier(Result.INVALID_PLAN_VERSION));
|
||||||
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
|
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,6 +117,8 @@ public class TestDiskBalancerRPC {
|
||||||
int planVersion = rpcTestHelper.getPlanVersion();
|
int planVersion = rpcTestHelper.getPlanVersion();
|
||||||
NodePlan plan = rpcTestHelper.getPlan();
|
NodePlan plan = rpcTestHelper.getPlan();
|
||||||
thrown.expect(DiskBalancerException.class);
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new
|
||||||
|
ResultVerifier(Result.INVALID_PLAN));
|
||||||
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, "");
|
dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,6 +143,8 @@ public class TestDiskBalancerRPC {
|
||||||
planHash = String.valueOf(hashArray);
|
planHash = String.valueOf(hashArray);
|
||||||
NodePlan plan = rpcTestHelper.getPlan();
|
NodePlan plan = rpcTestHelper.getPlan();
|
||||||
thrown.expect(DiskBalancerException.class);
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new
|
||||||
|
ResultVerifier(Result.NO_SUCH_PLAN));
|
||||||
dataNode.cancelDiskBalancePlan(planHash);
|
dataNode.cancelDiskBalancePlan(planHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,9 +155,38 @@ public class TestDiskBalancerRPC {
|
||||||
String planHash = "";
|
String planHash = "";
|
||||||
NodePlan plan = rpcTestHelper.getPlan();
|
NodePlan plan = rpcTestHelper.getPlan();
|
||||||
thrown.expect(DiskBalancerException.class);
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new
|
||||||
|
ResultVerifier(Result.NO_SUCH_PLAN));
|
||||||
dataNode.cancelDiskBalancePlan(planHash);
|
dataNode.cancelDiskBalancePlan(planHash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDiskBalancerVolumeMapping() throws Exception {
|
||||||
|
final int dnIndex = 0;
|
||||||
|
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
|
||||||
|
String volumeNameJson = dataNode.getDiskBalancerSetting(
|
||||||
|
DiskBalancerConstants.DISKBALANCER_VOLUME_NAME);
|
||||||
|
Assert.assertNotNull(volumeNameJson);
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
Map<String, String> volumemap =
|
||||||
|
mapper.readValue(volumeNameJson, HashMap.class);
|
||||||
|
|
||||||
|
Assert.assertEquals(2, volumemap.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetDiskBalancerInvalidSetting() throws Exception {
|
||||||
|
final int dnIndex = 0;
|
||||||
|
final String invalidSetting = "invalidSetting";
|
||||||
|
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
|
||||||
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new
|
||||||
|
ResultVerifier(Result.UNKNOWN_KEY));
|
||||||
|
dataNode.getDiskBalancerSetting(invalidSetting);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueryPlan() throws Exception {
|
public void testQueryPlan() throws Exception {
|
||||||
|
@ -173,6 +216,8 @@ public class TestDiskBalancerRPC {
|
||||||
final int dnIndex = 0;
|
final int dnIndex = 0;
|
||||||
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
|
DataNode dataNode = cluster.getDataNodes().get(dnIndex);
|
||||||
thrown.expect(DiskBalancerException.class);
|
thrown.expect(DiskBalancerException.class);
|
||||||
|
thrown.expect(new
|
||||||
|
ResultVerifier(Result.UNKNOWN_KEY));
|
||||||
dataNode.getDiskBalancerSetting(
|
dataNode.getDiskBalancerSetting(
|
||||||
DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
|
DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
|
||||||
}
|
}
|
||||||
|
@ -223,4 +268,25 @@ public class TestDiskBalancerRPC {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class ResultVerifier
|
||||||
|
extends TypeSafeMatcher<DiskBalancerException> {
|
||||||
|
private final DiskBalancerException.Result expectedResult;
|
||||||
|
|
||||||
|
ResultVerifier(DiskBalancerException.Result expectedResult){
|
||||||
|
this.expectedResult = expectedResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean matchesSafely(DiskBalancerException exception) {
|
||||||
|
return (this.expectedResult == exception.getResult());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void describeTo(Description description) {
|
||||||
|
description.appendText("expects Result: ")
|
||||||
|
.appendValue(this.expectedResult);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue