HDFS-9671. DiskBalancer: SubmitPlan implementation. (Contributed by Anu Engineer)
commit 2b1b2faf76 (parent 66f0bb646d)
ClientDatanodeProtocol.java

@@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenInfo;
-import org.apache.hadoop.hdfs.server.datanode.WorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
 
 /** An client-datanode protocol for block recovery
  */
@@ -182,7 +182,7 @@ public interface ClientDatanodeProtocol {
   /**
    * Gets the status of an executing diskbalancer Plan.
    */
-  WorkStatus queryDiskBalancerPlan() throws IOException;
+  DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException;
 
   /**
    * Gets a run-time configuration value from running diskbalancer instance.
ClientDatanodeProtocolTranslatorPB.java

@@ -59,7 +59,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.QueryP
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.WorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
@@ -345,8 +345,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
    * to zero allows datanode to use the value defined in
    * configration.
    * @param plan - Actual plan.
-   * @return Success or throws Exception.
-   * @throws Exception
+   * @throws IOException
    */
   @Override
   public void submitDiskBalancerPlan(String planID, long planVersion,
@@ -387,13 +386,14 @@ public class ClientDatanodeProtocolTranslatorPB implements
    * Gets the status of an executing diskbalancer Plan.
    */
   @Override
-  public WorkStatus queryDiskBalancerPlan() throws IOException {
+  public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException {
     try {
       QueryPlanStatusRequestProto request =
           QueryPlanStatusRequestProto.newBuilder().build();
       QueryPlanStatusResponseProto response =
           rpcProxy.queryDiskBalancerPlan(NULL_CONTROLLER, request);
-      return new WorkStatus(response.hasResult() ? response.getResult() : 0,
+      return new DiskBalancerWorkStatus(response.hasResult() ?
+          response.getResult() : 0,
           response.hasPlanID() ? response.getPlanID() : null,
           response.hasStatus() ? response.getStatus() : null,
           response.hasCurrentStatus() ? response.getCurrentStatus() : null);
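After this change the translator hands back the richer DiskBalancerWorkStatus instead of the old WorkStatus. For illustration only, a caller holding a ClientDatanodeProtocol proxy would read it roughly like this; only the constructor and getResult() are visible in this commit, so the snippet sticks to those:

    // Sketch, not part of the commit: query the status of the executing plan.
    DiskBalancerWorkStatus status = proxy.queryDiskBalancerPlan();
    int result = status.getResult(); // getResult() is what the server-side translator uses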
DiskBalancerWorkItem.java (new file, 160 added lines)

@@ -0,0 +1,160 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;

/**
 * Keeps track of how much work has finished.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class DiskBalancerWorkItem {
  private final long bytesToCopy;
  private long bytesCopied;
  private long errorCount;
  private String errMsg;
  private long blocksCopied;

  /**
   * Constructs a DiskBalancerWorkItem.
   *
   * @param bytesToCopy - Total bytes to copy from a disk
   * @param bytesCopied - Copied So far.
   */
  public DiskBalancerWorkItem(long bytesToCopy, long bytesCopied) {
    this.bytesToCopy = bytesToCopy;
    this.bytesCopied = bytesCopied;
  }

  /**
   * Reads a DiskBalancerWorkItem Object from a Json String.
   *
   * @param json - Json String.
   * @return DiskBalancerWorkItem Object
   * @throws IOException
   */
  public static DiskBalancerWorkItem parseJson(String json) throws IOException {
    Preconditions.checkNotNull(json);
    ObjectMapper mapper = new ObjectMapper();
    return mapper.readValue(json, DiskBalancerWorkItem.class);
  }

  /**
   * Gets the error message.
   */
  public String getErrMsg() {
    return errMsg;
  }

  /**
   * Sets the error message.
   *
   * @param errMsg - Msg.
   */
  public void setErrMsg(String errMsg) {
    this.errMsg = errMsg;
  }

  /**
   * Returns the number of errors encountered.
   *
   * @return long
   */
  public long getErrorCount() {
    return errorCount;
  }

  /**
   * Incs Error Count.
   */
  public void incErrorCount() {
    this.errorCount++;
  }

  /**
   * Returns bytes copied so far.
   *
   * @return long
   */
  public long getBytesCopied() {
    return bytesCopied;
  }

  /**
   * Sets bytes copied so far.
   *
   * @param bytesCopied - long
   */
  public void setBytesCopied(long bytesCopied) {
    this.bytesCopied = bytesCopied;
  }

  /**
   * Increments bytesCopied by delta.
   *
   * @param delta - long
   */
  public void incCopiedSoFar(long delta) {
    this.bytesCopied += delta;
  }

  /**
   * Returns bytes to copy.
   *
   * @return - long
   */
  public long getBytesToCopy() {
    return bytesToCopy;
  }

  /**
   * Returns number of blocks copied for this DiskBalancerWorkItem.
   *
   * @return long count of blocks.
   */
  public long getBlocksCopied() {
    return blocksCopied;
  }

  /**
   * increments the number of blocks copied.
   */
  public void incBlocksCopied() {
    blocksCopied++;
  }

  /**
   * returns a serialized json string.
   *
   * @return String - json
   * @throws IOException
   */
  public String toJson() throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.writeValueAsString(this);
  }
}
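For illustration, the class above serializes its progress counters through Jackson so the datanode can ship them around as a string. A minimal sketch using only the members added in this file (the variable names are ours, not part of the commit):

    // Sketch, not part of the commit: record and serialize copy progress.
    DiskBalancerWorkItem item = new DiskBalancerWorkItem(1024L, 0L);
    item.incCopiedSoFar(512L);    // 512 of 1024 bytes moved so far
    item.incBlocksCopied();       // one block copied
    String json = item.toJson();  // Jackson ObjectMapper serialization
    DiskBalancerWorkItem restored = DiskBalancerWorkItem.parseJson(json);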
DiskBalancerWorkStatus.java (renamed from WorkStatus)

@@ -20,16 +20,18 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * Helper class that reports how much work has has been done by the node.
  */
 @InterfaceAudience.Private
-public class WorkStatus {
-  private int result;
-  private String planID;
-  private String status;
-  private String currentState;
+@InterfaceStability.Unstable
+public class DiskBalancerWorkStatus {
+  private final int result;
+  private final String planID;
+  private final String status;
+  private final String currentState;
 
   /**
    * Constructs a workStatus Object.
@@ -39,8 +41,8 @@ public class WorkStatus {
    * @param status - Current Status
    * @param currentState - Current State
    */
-  public WorkStatus(int result, String planID, String status,
+  public DiskBalancerWorkStatus(int result, String planID, String status,
                     String currentState) {
     this.result = result;
     this.planID = planID;
     this.status = status;
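The renamed status object is now immutable (all fields final), so it is built once per query response, as the translator change earlier in this commit shows. A hypothetical construction for illustration; the string values below are placeholders, not values defined anywhere in this commit:

    DiskBalancerWorkStatus status = new DiskBalancerWorkStatus(
        0,              // result code from QueryPlanStatusResponseProto
        planID,         // SHA512 hex of the executing plan (hypothetical variable)
        "in progress",  // status string (placeholder)
        "{}");          // current state, e.g. serialized work items (placeholder)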
HDFS-1312 change log

@@ -21,6 +21,9 @@ HDFS-1312 Change Log
     HDFS-9645. DiskBalancer: Add Query RPC. (Anu Engineer via Arpit Agarwal)
 
-    HDFS-9647. DiskBalancer: Add getRuntimeSettings. (Anu Engineer
-    via Arpit Agarwal)
+    HDFS-9647. DiskBalancer: Add getRuntimeSettings. (Anu Engineer via
+    Arpit Agarwal)
+
+    HDFS-9671. DiskBalancer: SubmitPlan implementation. (Anu Engineer via
+    Arpit Agarwal)
 
DFSConfigKeys.java

@@ -930,6 +930,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT =
       HdfsConstants.DEFAULT_DATA_SOCKET_SIZE;
 
+  // Disk Balancer Keys
+  public static final String DFS_DISK_BALANCER_ENABLED =
+      "dfs.disk.balancer.enabled";
+  public static final boolean DFS_DISK_BALANCER_ENABLED_DEFAULT = false;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
   @Deprecated
   public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
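The feature is off by default. The test added at the end of this commit enables it programmatically before starting the MiniDFSCluster:

    // From TestDiskBalancerRPC#setUp below: enable the balancer before cluster start.
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);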
ClientDatanodeProtocolServerSideTranslatorPB.java

@@ -57,7 +57,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBa
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBalancerSettingResponseProto;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
-import org.apache.hadoop.hdfs.server.datanode.WorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
 
 /**
  * Implementation for protobuf service that forwards requests
@@ -293,7 +293,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
       RpcController controller, QueryPlanStatusRequestProto request)
       throws ServiceException {
     try {
-      WorkStatus result = impl.queryDiskBalancerPlan();
+      DiskBalancerWorkStatus result = impl.queryDiskBalancerPlan();
       return QueryPlanStatusResponseProto
           .newBuilder()
           .setResult(result.getResult())
DataNode.java

@@ -169,7 +169,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
-import org.apache.hadoop.hdfs.server.diskbalancer.DiskbalancerException;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -386,6 +386,8 @@ public class DataNode extends ReconfigurableBase
   private static final int NUM_CORES = Runtime.getRuntime()
       .availableProcessors();
   private static final double CONGESTION_RATIO = 1.5;
+  private DiskBalancer diskBalancer;
+
 
   private static Tracer createTracer(Configuration conf) {
     return new Tracer.Builder("DataNode").
@@ -1023,6 +1025,32 @@ public class DataNode extends ReconfigurableBase
     }
   }
 
+  /**
+   * Initilizes {@link DiskBalancer}.
+   * @param data - FSDataSet
+   * @param conf - Config
+   */
+  private synchronized void initDiskBalancer(FsDatasetSpi data,
+                                             Configuration conf) {
+    if (this.diskBalancer != null) {
+      return;
+    }
+
+    DiskBalancer.BlockMover mover = new DiskBalancer.DiskBalancerMover(data,
+        conf);
+    this.diskBalancer = new DiskBalancer(getDatanodeUuid(), conf, mover);
+  }
+
+  /**
+   * Shutdown disk balancer.
+   */
+  private synchronized void shutdownDiskBalancer() {
+    if (this.diskBalancer != null) {
+      this.diskBalancer.shutdown();
+      this.diskBalancer = null;
+    }
+  }
+
   private void initDataXceiver(Configuration conf) throws IOException {
     // find free port or use privileged port provided
     TcpPeerServer tcpPeerServer;
@@ -1530,6 +1558,7 @@ public class DataNode extends ReconfigurableBase
     data.addBlockPool(nsInfo.getBlockPoolID(), conf);
     blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
     initDirectoryScanner(conf);
+    initDiskBalancer(data, conf);
   }
 
   List<BPOfferService> getAllBpOs() {
@@ -1867,6 +1896,7 @@ public class DataNode extends ReconfigurableBase
 
     // Terminate directory scanner and block scanner
     shutdownPeriodicScanners();
+    shutdownDiskBalancer();
 
     // Stop the web server
     if (httpServer != null) {
@@ -3296,31 +3326,30 @@ public class DataNode extends ReconfigurableBase
    * @param bandwidth - Max disk bandwidth to use, 0 means use value defined
    *                  in the configration.
    * @param plan - Actual plan
-   * @return success or throws an exception.
-   * @throws Exception
+   * @throws IOException
    */
   @Override
   public void submitDiskBalancerPlan(String planID,
       long planVersion, long bandwidth, String plan) throws IOException {
 
-    // TODO : This will be replaced with actual code later.
-    // Right now throwing DiskbalancerException instead
-    // NotImplementedException to indicate the eventually disk balancer code
-    // will throw DiskbalancerException.
-    throw new DiskbalancerException("Not Implemented", 0);
+    checkSuperuserPrivilege();
+    // TODO : Support force option
+    this.diskBalancer.submitPlan(planID, planVersion, plan, bandwidth, false);
   }
 
   @Override
   public void cancelDiskBalancePlan(String planID) throws
       IOException {
     checkSuperuserPrivilege();
-    throw new DiskbalancerException("Not Implemented", 0);
+    throw new DiskBalancerException("Not Implemented",
+        DiskBalancerException.Result.INTERNAL_ERROR);
   }
 
   @Override
-  public WorkStatus queryDiskBalancerPlan() throws IOException {
+  public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException {
     checkSuperuserPrivilege();
-    throw new DiskbalancerException("Not Implemented", 0);
+    throw new DiskBalancerException("Not Implemented",
+        DiskBalancerException.Result.INTERNAL_ERROR);
   }
 
   /**
@@ -3334,6 +3363,7 @@ public class DataNode extends ReconfigurableBase
   @Override
   public String getDiskBalancerSetting(String key) throws IOException {
     checkSuperuserPrivilege();
-    throw new DiskbalancerException("Not Implemented", 0);
+    throw new DiskBalancerException("Not Implemented",
+        DiskBalancerException.Result.INTERNAL_ERROR);
   }
 }
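With submitDiskBalancerPlan now delegating to DiskBalancer#submitPlan, a client drives a submission by hashing the plan JSON and sending both, exactly as the updated test at the bottom of this commit does (variable names are ours):

    // Sketch of the client-side submit flow, mirroring TestDiskBalancerRPC.
    String planJson = plan.toJson();                    // NodePlan serialized to JSON
    String planHash = DigestUtils.sha512Hex(planJson);  // 128-char hex plan ID
    final int planVersion = 1;                          // only version 1 exists so far
    dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, planJson);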
DiskBalancer.java (new file, 542 added lines)

@@ -0,0 +1,542 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.base.Preconditions;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
import org.apache.hadoop.util.Time;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Worker class for Disk Balancer.
 * <p/>
 * Here is the high level logic executed by this class. Users can submit disk
 * balancing plans using submitPlan calls. After a set of sanity checks the plan
 * is admitted and put into workMap.
 * <p/>
 * The executePlan launches a thread that picks up work from workMap and hands
 * it over to the BlockMover#copyBlocks function.
 * <p/>
 * Constraints :
 * <p/>
 * Only one plan can be executing in a datanode at any given time. This is
 * ensured by checking the future handle of the worker thread in submitPlan.
 */
@InterfaceAudience.Private
public class DiskBalancer {

  private static final Log LOG = LogFactory.getLog(DiskBalancer.class);
  private final FsDatasetSpi<?> dataset;
  private final String dataNodeUUID;
  private final BlockMover blockMover;
  private final ReentrantLock lock;
  private final ConcurrentHashMap<VolumePair, DiskBalancerWorkItem> workMap;
  private boolean isDiskBalancerEnabled = false;
  private ExecutorService scheduler;
  private Future future;
  private String planID;

  /**
   * Constructs a Disk Balancer object. This object takes care of reading a
   * NodePlan and executing it against a set of volumes.
   *
   * @param dataNodeUUID - Data node UUID
   * @param conf         - Hdfs Config
   * @param blockMover   - Object that supports moving blocks.
   */
  public DiskBalancer(String dataNodeUUID,
                      Configuration conf, BlockMover blockMover) {
    this.blockMover = blockMover;
    this.dataset = this.blockMover.getDataset();
    this.dataNodeUUID = dataNodeUUID;
    scheduler = Executors.newSingleThreadExecutor();
    lock = new ReentrantLock();
    workMap = new ConcurrentHashMap<>();
    this.isDiskBalancerEnabled = conf.getBoolean(
        DFSConfigKeys.DFS_DISK_BALANCER_ENABLED,
        DFSConfigKeys.DFS_DISK_BALANCER_ENABLED_DEFAULT);
  }

  /**
   * Shutdown disk balancer services.
   */
  public void shutdown() {
    lock.lock();
    try {
      this.isDiskBalancerEnabled = false;
      if ((this.future != null) && (!this.future.isDone())) {
        this.blockMover.setExitFlag();
        shutdownExecutor();
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Shutdown the executor.
   */
  private void shutdownExecutor() {
    scheduler.shutdown();
    try {
      if(!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
        scheduler.shutdownNow();
        if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
          LOG.error("Disk Balancer : Scheduler did not terminate.");
        }
      }
    } catch (InterruptedException ex) {
      scheduler.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Takes a client submitted plan and converts into a set of work items that
   * can be executed by the blockMover.
   *
   * @param planID      - A SHA512 of the plan string
   * @param planVersion - version of the plan string - for future use.
   * @param plan        - Actual Plan
   * @param bandwidth   - BytesPerSec to copy
   * @param force       - Skip some validations and execute the plan file.
   * @throws DiskBalancerException
   */
  public void submitPlan(String planID, long planVersion, String plan,
                         long bandwidth, boolean force)
      throws DiskBalancerException {

    lock.lock();
    try {
      checkDiskBalancerEnabled();
      if ((this.future != null) && (!this.future.isDone())) {
        LOG.error("Disk Balancer - Executing another plan, submitPlan failed.");
        throw new DiskBalancerException("Executing another plan",
            DiskBalancerException.Result.PLAN_ALREADY_IN_PROGRESS);
      }
      NodePlan nodePlan =
          verifyPlan(planID, planVersion, plan, bandwidth, force);
      createWorkPlan(nodePlan);
      this.planID = planID;
      executePlan();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Throws if Disk balancer is disabled.
   *
   * @throws DiskBalancerException
   */
  private void checkDiskBalancerEnabled()
      throws DiskBalancerException {
    if (!isDiskBalancerEnabled) {
      LOG.error("Disk Balancer is not enabled.");
      throw new DiskBalancerException("Disk Balancer is not enabled.",
          DiskBalancerException.Result.DISK_BALANCER_NOT_ENABLED);
    }
  }

  /**
   * Verifies that user provided plan is valid.
   *
   * @param planID      - SHA 512 of the plan.
   * @param planVersion - Version of the plan, for future use.
   * @param plan        - Plan String in Json.
   * @param bandwidth   - Max disk bandwidth to use per second.
   * @param force       - Skip verifying when the plan was generated.
   * @return a NodePlan Object.
   * @throws DiskBalancerException
   */
  private NodePlan verifyPlan(String planID, long planVersion, String plan,
                              long bandwidth, boolean force)
      throws DiskBalancerException {

    Preconditions.checkState(lock.isHeldByCurrentThread());
    verifyPlanVersion(planVersion);
    NodePlan nodePlan = verifyPlanHash(planID, plan);
    if (!force) {
      verifyTimeStamp(nodePlan);
    }
    verifyNodeUUID(nodePlan);
    return nodePlan;
  }

  /**
   * Verifies the plan version is something that we support.
   *
   * @param planVersion - Long version.
   * @throws DiskBalancerException
   */
  private void verifyPlanVersion(long planVersion)
      throws DiskBalancerException {
    if ((planVersion < DiskBalancerConstants.DISKBALANCER_MIN_VERSION) ||
        (planVersion > DiskBalancerConstants.DISKBALANCER_MAX_VERSION)) {
      LOG.error("Disk Balancer - Invalid plan version.");
      throw new DiskBalancerException("Invalid plan version.",
          DiskBalancerException.Result.INVALID_PLAN_VERSION);
    }
  }

  /**
   * Verifies that plan matches the SHA512 provided by the client.
   *
   * @param planID - Sha512 Hex Bytes
   * @param plan   - Plan String
   * @throws DiskBalancerException
   */
  private NodePlan verifyPlanHash(String planID, String plan)
      throws DiskBalancerException {
    final long sha512Length = 128;
    if (plan == null || plan.length() == 0) {
      LOG.error("Disk Balancer - Invalid plan.");
      throw new DiskBalancerException("Invalid plan.",
          DiskBalancerException.Result.INVALID_PLAN);
    }

    if ((planID == null) ||
        (planID.length() != sha512Length) ||
        !DigestUtils.sha512Hex(plan.getBytes(Charset.forName("UTF-8")))
            .equalsIgnoreCase(planID)) {
      LOG.error("Disk Balancer - Invalid plan hash.");
      throw new DiskBalancerException("Invalid or mis-matched hash.",
          DiskBalancerException.Result.INVALID_PLAN_HASH);
    }

    try {
      return NodePlan.parseJson(plan);
    } catch (IOException ex) {
      throw new DiskBalancerException("Parsing plan failed.", ex,
          DiskBalancerException.Result.MALFORMED_PLAN);
    }
  }

  /**
   * Verifies that this plan is not older than 24 hours.
   *
   * @param plan - Node Plan
   */
  private void verifyTimeStamp(NodePlan plan) throws DiskBalancerException {
    long now = Time.now();
    long planTime = plan.getTimeStamp();

    // TODO : Support Valid Plan hours as a user configurable option.
    if ((planTime +
        (TimeUnit.HOURS.toMillis(
            DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS))) < now) {
      String hourString = "Plan was generated more than " +
          Integer.toString(DiskBalancerConstants.DISKBALANCER_VALID_PLAN_HOURS)
          + " hours ago.";
      LOG.error("Disk Balancer - " + hourString);
      throw new DiskBalancerException(hourString,
          DiskBalancerException.Result.OLD_PLAN_SUBMITTED);
    }
  }

  /**
   * Verify Node UUID.
   *
   * @param plan - Node Plan
   */
  private void verifyNodeUUID(NodePlan plan) throws DiskBalancerException {
    if ((plan.getNodeUUID() == null) ||
        !plan.getNodeUUID().equals(this.dataNodeUUID)) {
      LOG.error("Disk Balancer - Plan was generated for another node.");
      throw new DiskBalancerException(
          "Plan was generated for another node.",
          DiskBalancerException.Result.DATANODE_ID_MISMATCH);
    }
  }

  /**
   * Convert a node plan to DiskBalancerWorkItem that Datanode can execute.
   *
   * @param plan - Node Plan
   */
  private void createWorkPlan(NodePlan plan) throws DiskBalancerException {
    Preconditions.checkState(lock.isHeldByCurrentThread());

    // Cleanup any residual work in the map.
    workMap.clear();
    Map<String, FsVolumeSpi> pathMap = getStorageIDToVolumeMap();

    for (Step step : plan.getVolumeSetPlans()) {
      String sourceuuid = step.getSourceVolume().getUuid();
      String destinationuuid = step.getDestinationVolume().getUuid();

      FsVolumeSpi sourceVol = pathMap.get(sourceuuid);
      if (sourceVol == null) {
        LOG.error("Disk Balancer - Unable to find source volume. submitPlan " +
            "failed.");
        throw new DiskBalancerException("Unable to find source volume.",
            DiskBalancerException.Result.INVALID_VOLUME);
      }

      FsVolumeSpi destVol = pathMap.get(destinationuuid);
      if (destVol == null) {
        LOG.error("Disk Balancer - Unable to find destination volume. " +
            "submitPlan failed.");
        throw new DiskBalancerException("Unable to find destination volume.",
            DiskBalancerException.Result.INVALID_VOLUME);
      }
      createWorkPlan(sourceVol, destVol, step.getBytesToMove());
    }
  }

  /**
   * Returns a path to Volume Map.
   *
   * @return Map
   * @throws DiskBalancerException
   */
  private Map<String, FsVolumeSpi> getStorageIDToVolumeMap()
      throws DiskBalancerException {
    Map<String, FsVolumeSpi> pathMap = new HashMap<>();
    FsDatasetSpi.FsVolumeReferences references;
    try {
      synchronized (this.dataset) {
        references = this.dataset.getFsVolumeReferences();
        for (int ndx = 0; ndx < references.size(); ndx++) {
          FsVolumeSpi vol = references.get(ndx);
          pathMap.put(vol.getStorageID(), vol);
        }
        references.close();
      }
    } catch (IOException ex) {
      LOG.error("Disk Balancer - Internal Error.", ex);
      throw new DiskBalancerException("Internal error", ex,
          DiskBalancerException.Result.INTERNAL_ERROR);
    }
    return pathMap;
  }

  /**
   * Starts Executing the plan, exits when the plan is done executing.
   */
  private void executePlan() {
    Preconditions.checkState(lock.isHeldByCurrentThread());
    this.blockMover.setRunnable();
    if (this.scheduler.isShutdown()) {
      this.scheduler = Executors.newSingleThreadExecutor();
    }

    this.future = scheduler.submit(new Runnable() {
      @Override
      public void run() {
        Thread.currentThread().setName("DiskBalancerThread");
        LOG.info("Executing Disk balancer plan. Plan ID - " + planID);

        for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
            workMap.entrySet()) {
          blockMover.copyBlocks(entry.getKey(), entry.getValue());
        }
      }
    });
  }

  /**
   * Insert work items to work map.
   *
   * @param source      - Source vol
   * @param dest        - destination volume
   * @param bytesToMove - number of bytes to move
   */
  private void createWorkPlan(FsVolumeSpi source, FsVolumeSpi dest,
                              long bytesToMove) throws DiskBalancerException {

    if(source.getStorageID().equals(dest.getStorageID())) {
      throw new DiskBalancerException("Same source and destination",
          DiskBalancerException.Result.INVALID_MOVE);
    }
    VolumePair pair = new VolumePair(source, dest);

    // In case we have a plan with more than
    // one line of same <source, dest>
    // we compress that into one work order.
    if (workMap.containsKey(pair)) {
      bytesToMove += workMap.get(pair).getBytesToCopy();
    }

    DiskBalancerWorkItem work = new DiskBalancerWorkItem(bytesToMove, 0);
    workMap.put(pair, work);
  }

  /**
   * BlockMover supports moving blocks across Volumes.
   */
  public interface BlockMover {
    /**
     * Copies blocks from a set of volumes.
     *
     * @param pair - Source and Destination Volumes.
     * @param item - Number of bytes to move from volumes.
     */
    void copyBlocks(VolumePair pair, DiskBalancerWorkItem item);

    /**
     * Begin the actual copy operations. This is useful in testing.
     */
    void setRunnable();

    /**
     * Tells copyBlocks to exit from the copy routine.
     */
    void setExitFlag();

    /**
     * Returns a pointer to the current dataset we are operating against.
     *
     * @return FsDatasetSpi
     */
    FsDatasetSpi getDataset();
  }

  /**
   * Holds references to actual volumes that we will be operating against.
   */
  static class VolumePair {
    private final FsVolumeSpi source;
    private final FsVolumeSpi dest;

    /**
     * Constructs a volume pair.
     *
     * @param source - Source Volume
     * @param dest   - Destination Volume
     */
    public VolumePair(FsVolumeSpi source, FsVolumeSpi dest) {
      this.source = source;
      this.dest = dest;
    }

    /**
     * gets source volume.
     *
     * @return volume
     */
    public FsVolumeSpi getSource() {
      return source;
    }

    /**
     * Gets Destination volume.
     *
     * @return volume.
     */
    public FsVolumeSpi getDest() {
      return dest;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }

      VolumePair that = (VolumePair) o;
      return source.equals(that.source) && dest.equals(that.dest);
    }

    @Override
    public int hashCode() {
      int result = source.getBasePath().hashCode();
      result = 31 * result + dest.getBasePath().hashCode();
      return result;
    }
  }

  /**
   * Actual DataMover class for DiskBalancer.
   * <p/>
   * TODO : Add implementation for this class. This is here as a place holder so
   * that Datanode can make calls into this class.
   */
  public static class DiskBalancerMover implements BlockMover {
    private final FsDatasetSpi dataset;

    /**
     * Constructs diskBalancerMover.
     *
     * @param dataset Dataset
     * @param conf    Configuration
     */
    public DiskBalancerMover(FsDatasetSpi dataset, Configuration conf) {
      this.dataset = dataset;
      // TODO : Read Config values.
    }

    /**
     * Copies blocks from a set of volumes.
     *
     * @param pair - Source and Destination Volumes.
     * @param item - Number of bytes to move from volumes.
     */
    @Override
    public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {

    }

    /**
     * Begin the actual copy operations. This is useful in testing.
     */
    @Override
    public void setRunnable() {

    }

    /**
     * Tells copyBlocks to exit from the copy routine.
     */
    @Override
    public void setExitFlag() {

    }

    /**
     * Returns a pointer to the current dataset we are operating against.
     *
     * @return FsDatasetSpi
     */
    @Override
    public FsDatasetSpi getDataset() {
      return dataset;
    }
  }
}
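The DataNode change earlier in this commit wires this worker up in initDiskBalancer; standalone, the same wiring looks roughly like the sketch below (dataset, conf, datanodeUuid and planJson are assumed to be in scope, not defined here):

    // Sketch: construct the worker and submit a plan, as DataNode does.
    DiskBalancer.BlockMover mover =
        new DiskBalancer.DiskBalancerMover(dataset, conf);
    DiskBalancer balancer = new DiskBalancer(datanodeUuid, conf, mover);
    // submitPlan re-computes the SHA512 of the plan JSON and rejects mismatched
    // hashes, stale plans (older than 24h unless force), and other nodes' plans.
    balancer.submitPlan(DigestUtils.sha512Hex(planJson), 1, planJson, 10, false);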
DiskBalancerConstants.java

@@ -29,6 +29,15 @@ public final class DiskBalancerConstants {
   public static final String DISKBALANCER_VOLUME_NAME =
       "DiskBalancerVolumeName";
 
+  /** Min and Max Plan file versions that we know of. **/
+  public static final int DISKBALANCER_MIN_VERSION = 1;
+  public static final int DISKBALANCER_MAX_VERSION = 1;
+
+  /**
+   * We treat a plan as stale if it was generated before the hours
+   * defined by the constant below. Defaults to 24 hours.
+   */
+  public static final int DISKBALANCER_VALID_PLAN_HOURS = 24;
   // never constructed.
   private DiskBalancerConstants() {
   }
DiskBalancerException.java (renamed from DiskbalancerException)

@@ -1,18 +1,19 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.hadoop.hdfs.server.diskbalancer;
@@ -21,17 +22,31 @@ import java.io.IOException;
 /**
  * Disk Balancer Exceptions.
  */
-public class DiskbalancerException extends IOException {
-  private int result;
+public class DiskBalancerException extends IOException {
+  /** Possible results from DiskBalancer. **/
+  public enum Result {
+    DISK_BALANCER_NOT_ENABLED,
+    INVALID_PLAN_VERSION,
+    INVALID_PLAN,
+    INVALID_PLAN_HASH,
+    OLD_PLAN_SUBMITTED,
+    DATANODE_ID_MISMATCH,
+    MALFORMED_PLAN,
+    PLAN_ALREADY_IN_PROGRESS,
+    INVALID_VOLUME,
+    INVALID_MOVE,
+    INTERNAL_ERROR
+  }
+
+  private final Result result;
 
   /**
    * Constructs an {@code IOException} with the specified detail message.
    *
    * @param message The detail message (which is saved for later retrieval by
-   *                the
-   *                {@link #getMessage()} method)
+   *                the {@link #getMessage()} method)
    */
-  public DiskbalancerException(String message, int result) {
+  public DiskBalancerException(String message, Result result) {
     super(message);
     this.result = result;
   }
@@ -50,9 +65,8 @@ public class DiskbalancerException extends IOException {
    * @param cause   The cause (which is saved for later retrieval by the {@link
    *                #getCause()} method). (A null value is permitted, and
    *                indicates that the cause is nonexistent or unknown.)
-   * @since 1.6
    */
-  public DiskbalancerException(String message, Throwable cause, int result) {
+  public DiskBalancerException(String message, Throwable cause, Result result) {
     super(message, cause);
     this.result = result;
   }
@@ -61,17 +75,15 @@ public class DiskbalancerException extends IOException {
    * Constructs an {@code IOException} with the specified cause and a detail
    * message of {@code (cause==null ? null : cause.toString())} (which typically
    * contains the class and detail message of {@code cause}). This
-   * constructor is
-   * useful for IO exceptions that are little more than wrappers for other
-   * throwables.
+   * constructor is useful for IO exceptions that are little more than
+   * wrappers for other throwables.
    *
    * @param cause The cause (which is saved for later retrieval by the {@link
    *              #getCause()} method). (A null value is permitted, and
    *              indicates
    *              that the cause is nonexistent or unknown.)
-   * @since 1.6
    */
-  public DiskbalancerException(Throwable cause, int result) {
+  public DiskBalancerException(Throwable cause, Result result) {
     super(cause);
     this.result = result;
   }
@@ -80,7 +92,7 @@ public class DiskbalancerException extends IOException {
   /**
    * Returns the result.
    * @return int
    */
-  public int getResult() {
+  public Result getResult() {
     return result;
   }
 }
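Since the result code is now a typed enum rather than a bare int, callers can branch on the failure mode. A hedged sketch, using only enum values and methods defined in this commit:

    try {
      dataNode.submitDiskBalancerPlan(planHash, 1, 10, planJson);
    } catch (DiskBalancerException ex) {
      switch (ex.getResult()) {
      case DISK_BALANCER_NOT_ENABLED:
        // dfs.disk.balancer.enabled is false on this datanode.
        break;
      case OLD_PLAN_SUBMITTED:
        // Plan is older than DISKBALANCER_VALID_PLAN_HOURS (24h); regenerate it.
        break;
      default:
        throw ex;
      }
    }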
DiskBalancerCluster.java

@@ -358,4 +358,18 @@ public class DiskBalancerCluster {
     return (10 - modValue) + threadRatio;
    }
  }
+
+  /**
+   * Returns a node by UUID.
+   * @param uuid - Node's UUID
+   * @return DiskBalancerDataNode.
+   */
+  public DiskBalancerDataNode getNodeByUUID(String uuid) {
+    for(DiskBalancerDataNode node : this.getNodes()) {
+      if(node.getDataNodeUUID().equals(uuid)) {
+        return node;
+      }
+    }
+    return null;
+  }
 }
TestDiskBalancerRPC.java

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.diskbalancer;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -35,9 +36,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-import java.io.IOException;
-import java.net.URI;
-
 public class TestDiskBalancerRPC {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
@@ -48,6 +46,7 @@
   @Before
   public void setUp() throws Exception {
     conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     cluster.waitActive();
   }
@@ -72,22 +71,19 @@
     Assert.assertEquals(cluster.getDataNodes().size(),
                         diskBalancerCluster.getNodes().size());
     diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
-    DiskBalancerDataNode node = diskBalancerCluster.getNodes().get(dnIndex);
+    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
+    DiskBalancerDataNode node = diskBalancerCluster.getNodeByUUID(
+        dataNode.getDatanodeUuid());
     GreedyPlanner planner = new GreedyPlanner(10.0f, node);
     NodePlan plan = new NodePlan(node.getDataNodeName(), node.getDataNodePort
         ());
     planner.balanceVolumeSet(node, node.getVolumeSets().get("DISK"), plan);
-    final int planVersion = 0; // So far we support only one version.
-    DataNode dataNode = cluster.getDataNodes().get(dnIndex);
+    final int planVersion = 1; // So far we support only one version.
 
     String planHash = DigestUtils.sha512Hex(plan.toJson());
 
-    // Since submitDiskBalancerPlan is not implemented yet, it throws an
-    // Exception, this will be modified with the actual implementation.
-    thrown.expect(DiskbalancerException.class);
     dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
 
   }
 
   @Test
@@ -117,10 +113,10 @@
     // Exception, this will be modified with the actual implementation.
     try {
       dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
-    } catch (DiskbalancerException ex) {
+    } catch (DiskBalancerException ex) {
       // Let us ignore this for time being.
     }
-    thrown.expect(DiskbalancerException.class);
+    thrown.expect(DiskBalancerException.class);
     dataNode.cancelDiskBalancePlan(planHash);
   }
 
@@ -152,13 +148,13 @@
     // Exception, this will be modified with the actual implementation.
     try {
       dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
-    } catch (DiskbalancerException ex) {
+    } catch (DiskBalancerException ex) {
      // Let us ignore this for time being.
    }
 
    // TODO : This will be fixed when we have implementation for this
    // function in server side.
-    thrown.expect(DiskbalancerException.class);
+    thrown.expect(DiskBalancerException.class);
    dataNode.queryDiskBalancerPlan();
  }
 
@@ -166,7 +162,7 @@
   public void testgetDiskBalancerSetting() throws Exception {
     final int dnIndex = 0;
     DataNode dataNode = cluster.getDataNodes().get(dnIndex);
-    thrown.expect(DiskbalancerException.class);
+    thrown.expect(DiskBalancerException.class);
     dataNode.getDiskBalancerSetting(
         DiskBalancerConstants.DISKBALANCER_BANDWIDTH);
   }
Reference in New Issue