HDFS-2202. Add a new DFSAdmin command to set balancer bandwidth of datanodes without restarting. Contributed by Eric Payne
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1152401 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8830af00ee
commit
8390152d08
|
@ -303,6 +303,9 @@ Trunk (unreleased changes)
|
||||||
HDFS-2156. Make hdfs and mapreduce rpm only depend on the same major
|
HDFS-2156. Make hdfs and mapreduce rpm only depend on the same major
|
||||||
version for common and hdfs. (eyang via omalley)
|
version for common and hdfs. (eyang via omalley)
|
||||||
|
|
||||||
|
HDFS-2202. Add a new DFSAdmin command to set balancer bandwidth of
|
||||||
|
datanodes without restarting. (Eric Payne via szetszwo)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost
|
HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost
|
||||||
|
|
|
@ -1269,6 +1269,18 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
||||||
public void metaSave(String pathname) throws IOException {
|
public void metaSave(String pathname) throws IOException {
|
||||||
namenode.metaSave(pathname);
|
namenode.metaSave(pathname);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Requests the namenode to tell all datanodes to use a new, non-persistent
|
||||||
|
* bandwidth value for dfs.balance.bandwidthPerSec.
|
||||||
|
* See {@link ClientProtocol#setBalancerBandwidth(long)}
|
||||||
|
* for more details.
|
||||||
|
*
|
||||||
|
* @see ClientProtocol#setBalancerBandwidth(long)
|
||||||
|
*/
|
||||||
|
public void setBalancerBandwidth(long bandwidth) throws IOException {
|
||||||
|
namenode.setBalancerBandwidth(bandwidth);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @see ClientProtocol#finalizeUpgrade()
|
* @see ClientProtocol#finalizeUpgrade()
|
||||||
|
|
|
@ -867,4 +867,17 @@ public class DistributedFileSystem extends FileSystem {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
dfs.cancelDelegationToken(token);
|
dfs.cancelDelegationToken(token);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Requests the namenode to tell all datanodes to use a new, non-persistent
|
||||||
|
* bandwidth value for dfs.balance.bandwidthPerSec.
|
||||||
|
* The bandwidth parameter is the max number of bytes per second of network
|
||||||
|
* bandwidth to be used by a datanode during balancing.
|
||||||
|
*
|
||||||
|
* @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void setBalancerBandwidth(long bandwidth) throws IOException {
|
||||||
|
dfs.setBalancerBandwidth(bandwidth);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,9 +67,9 @@ public interface ClientProtocol extends VersionedProtocol {
|
||||||
* Compared to the previous version the following changes have been introduced:
|
* Compared to the previous version the following changes have been introduced:
|
||||||
* (Only the latest change is reflected.
|
* (Only the latest change is reflected.
|
||||||
* The log of historical changes can be retrieved from the svn).
|
* The log of historical changes can be retrieved from the svn).
|
||||||
* 67: Add block pool ID to Block
|
* 68: Add Balancer Bandwidth Command protocol
|
||||||
*/
|
*/
|
||||||
public static final long versionID = 67L;
|
public static final long versionID = 68L;
|
||||||
|
|
||||||
///////////////////////////////////////
|
///////////////////////////////////////
|
||||||
// File contents
|
// File contents
|
||||||
|
@ -715,6 +715,15 @@ public interface ClientProtocol extends VersionedProtocol {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void metaSave(String filename) throws IOException;
|
public void metaSave(String filename) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tell all datanodes to use a new, non-persistent bandwidth value for
|
||||||
|
* dfs.balance.bandwidthPerSec.
|
||||||
|
*
|
||||||
|
* @param bandwidth Blanacer bandwidth in bytes per second for this datanode.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void setBalancerBandwidth(long bandwidth) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the file info for a specific file or directory.
|
* Get the file info for a specific file or directory.
|
||||||
|
|
|
@ -106,6 +106,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
public boolean isAlive = false;
|
public boolean isAlive = false;
|
||||||
public boolean needKeyUpdate = false;
|
public boolean needKeyUpdate = false;
|
||||||
|
|
||||||
|
// A system administrator can tune the balancer bandwidth parameter
|
||||||
|
// (dfs.balance.bandwidthPerSec) dynamically by calling
|
||||||
|
// "dfsadmin -setBalanacerBandwidth <newbandwidth>", at which point the
|
||||||
|
// following 'bandwidth' variable gets updated with the new value for each
|
||||||
|
// node. Once the heartbeat command is issued to update the value on the
|
||||||
|
// specified datanode, this value will be set back to 0.
|
||||||
|
private long bandwidth;
|
||||||
|
|
||||||
/** A queue of blocks to be replicated by this datanode */
|
/** A queue of blocks to be replicated by this datanode */
|
||||||
private BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
|
private BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<BlockTargetPair>();
|
||||||
/** A queue of blocks to be recovered by this datanode */
|
/** A queue of blocks to be recovered by this datanode */
|
||||||
|
@ -569,4 +577,20 @@ public class DatanodeDescriptor extends DatanodeInfo {
|
||||||
public void updateRegInfo(DatanodeID nodeReg) {
|
public void updateRegInfo(DatanodeID nodeReg) {
|
||||||
super.updateRegInfo(nodeReg);
|
super.updateRegInfo(nodeReg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Blanacer bandwidth in bytes per second for this datanode.
|
||||||
|
*/
|
||||||
|
public long getBalancerBandwidth() {
|
||||||
|
return this.bandwidth;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param bandwidth Blanacer bandwidth in bytes per second for this datanode.
|
||||||
|
*/
|
||||||
|
public void setBalancerBandwidth(long bandwidth) {
|
||||||
|
this.bandwidth = bandwidth;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
|
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
|
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
|
||||||
import org.apache.hadoop.net.DNSToSwitchMapping;
|
import org.apache.hadoop.net.DNSToSwitchMapping;
|
||||||
|
@ -749,7 +750,7 @@ public class DatanodeManager {
|
||||||
return new DatanodeCommand[] { brCommand };
|
return new DatanodeCommand[] { brCommand };
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
|
final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
|
||||||
//check pending replication
|
//check pending replication
|
||||||
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
|
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
|
||||||
maxTransfers);
|
maxTransfers);
|
||||||
|
@ -765,6 +766,14 @@ public class DatanodeManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
namesystem.addKeyUpdateCommand(cmds, nodeinfo);
|
namesystem.addKeyUpdateCommand(cmds, nodeinfo);
|
||||||
|
|
||||||
|
// check for balancer bandwidth update
|
||||||
|
if (nodeinfo.getBalancerBandwidth() > 0) {
|
||||||
|
cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
|
||||||
|
// set back to 0 to indicate that datanode has been sent the new value
|
||||||
|
nodeinfo.setBalancerBandwidth(0);
|
||||||
|
}
|
||||||
|
|
||||||
if (!cmds.isEmpty()) {
|
if (!cmds.isEmpty()) {
|
||||||
return cmds.toArray(new DatanodeCommand[cmds.size()]);
|
return cmds.toArray(new DatanodeCommand[cmds.size()]);
|
||||||
}
|
}
|
||||||
|
@ -773,4 +782,26 @@ public class DatanodeManager {
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tell all datanodes to use a new, non-persistent bandwidth value for
|
||||||
|
* dfs.balance.bandwidthPerSec.
|
||||||
|
*
|
||||||
|
* A system administrator can tune the balancer bandwidth parameter
|
||||||
|
* (dfs.datanode.balance.bandwidthPerSec) dynamically by calling
|
||||||
|
* "dfsadmin -setBalanacerBandwidth newbandwidth", at which point the
|
||||||
|
* following 'bandwidth' variable gets updated with the new value for each
|
||||||
|
* node. Once the heartbeat command is issued to update the value on the
|
||||||
|
* specified datanode, this value will be set back to 0.
|
||||||
|
*
|
||||||
|
* @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void setBalancerBandwidth(long bandwidth) throws IOException {
|
||||||
|
synchronized(datanodeMap) {
|
||||||
|
for (DatanodeDescriptor nodeInfo : datanodeMap.values()) {
|
||||||
|
nodeInfo.setBalancerBandwidth(bandwidth);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -108,6 +108,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
|
||||||
import org.apache.hadoop.http.HttpServer;
|
import org.apache.hadoop.http.HttpServer;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.ProtocolSignature;
|
import org.apache.hadoop.ipc.ProtocolSignature;
|
||||||
|
@ -1340,6 +1341,16 @@ public class DataNode extends Configured
|
||||||
((KeyUpdateCommand) cmd).getExportedKeys());
|
((KeyUpdateCommand) cmd).getExportedKeys());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
|
||||||
|
LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
|
||||||
|
long bandwidth =
|
||||||
|
((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
|
||||||
|
if (bandwidth > 0) {
|
||||||
|
DataXceiverServer dxcs =
|
||||||
|
(DataXceiverServer) dataXceiverServer.getRunnable();
|
||||||
|
dxcs.balanceThrottler.setBandwidth(bandwidth);
|
||||||
|
}
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
|
LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
|
||||||
}
|
}
|
||||||
|
@ -2774,4 +2785,15 @@ public class DataNode extends Configured
|
||||||
return new DatanodeID(getMachineName(), getStorageId(),
|
return new DatanodeID(getMachineName(), getStorageId(),
|
||||||
infoServer.getPort(), getIpcPort());
|
infoServer.getPort(), getIpcPort());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current value of the max balancer bandwidth in bytes per second.
|
||||||
|
*
|
||||||
|
* @return bandwidth Blanacer bandwidth in bytes per second for this datanode.
|
||||||
|
*/
|
||||||
|
public Long getBalancerBandwidth() {
|
||||||
|
DataXceiverServer dxcs =
|
||||||
|
(DataXceiverServer) this.dataXceiverServer.getRunnable();
|
||||||
|
return dxcs.balanceThrottler.getBandwidth();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5066,5 +5066,13 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
||||||
getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList);
|
getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tell all datanodes to use a new, non-persistent bandwidth value for
|
||||||
|
* dfs.datanode.balance.bandwidthPerSec.
|
||||||
|
* @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void setBalancerBandwidth(long bandwidth) throws IOException {
|
||||||
|
getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1089,6 +1089,16 @@ public class NameNode implements NamenodeProtocols, FSConstants {
|
||||||
}
|
}
|
||||||
return new CorruptFileBlocks(files, lastCookie);
|
return new CorruptFileBlocks(files, lastCookie);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tell all datanodes to use a new, non-persistent bandwidth value for
|
||||||
|
* dfs.datanode.balance.bandwidthPerSec.
|
||||||
|
* @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void setBalancerBandwidth(long bandwidth) throws IOException {
|
||||||
|
namesystem.setBalancerBandwidth(bandwidth);
|
||||||
|
}
|
||||||
|
|
||||||
@Override // ClientProtocol
|
@Override // ClientProtocol
|
||||||
public ContentSummary getContentSummary(String path) throws IOException {
|
public ContentSummary getContentSummary(String path) throws IOException {
|
||||||
|
|
|
@ -0,0 +1,105 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.protocol;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* A system administrator can tune the balancer bandwidth parameter
|
||||||
|
* (dfs.balance.bandwidthPerSec) dynamically by calling
|
||||||
|
* "dfsadmin -setBalanacerBandwidth newbandwidth".
|
||||||
|
* This class is to define the command which sends the new bandwidth value to
|
||||||
|
* each datanode.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.io.WritableFactories;
|
||||||
|
import org.apache.hadoop.io.WritableFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Balancer bandwidth command instructs each datanode to change its value for
|
||||||
|
* the max amount of network bandwidth it may use during the block balancing
|
||||||
|
* operation.
|
||||||
|
*
|
||||||
|
* The Balancer Bandwidth Command contains the new bandwidth value as its
|
||||||
|
* payload. The bandwidth value is in bytes per second.
|
||||||
|
*/
|
||||||
|
public class BalancerBandwidthCommand extends DatanodeCommand {
|
||||||
|
private final static long BBC_DEFAULTBANDWIDTH = 0L;
|
||||||
|
|
||||||
|
private long bandwidth;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Balancer Bandwidth Command constructor. Sets bandwidth to 0.
|
||||||
|
*/
|
||||||
|
BalancerBandwidthCommand() {
|
||||||
|
this(BBC_DEFAULTBANDWIDTH);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Balancer Bandwidth Command constructor.
|
||||||
|
*
|
||||||
|
* @param bandwidth Blanacer bandwidth in bytes per second.
|
||||||
|
*/
|
||||||
|
public BalancerBandwidthCommand(long bandwidth) {
|
||||||
|
super(DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE);
|
||||||
|
this.bandwidth = bandwidth;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get current value of the max balancer bandwidth in bytes per second.
|
||||||
|
*
|
||||||
|
* @return bandwidth Blanacer bandwidth in bytes per second for this datanode.
|
||||||
|
*/
|
||||||
|
public long getBalancerBandwidthValue() {
|
||||||
|
return this.bandwidth;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ///////////////////////////////////////////////
|
||||||
|
// Writable
|
||||||
|
// ///////////////////////////////////////////////
|
||||||
|
static { // register a ctor
|
||||||
|
WritableFactories.setFactory(BalancerBandwidthCommand.class, new WritableFactory() {
|
||||||
|
public Writable newInstance() {
|
||||||
|
return new BalancerBandwidthCommand();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes the bandwidth payload to the Balancer Bandwidth Command packet.
|
||||||
|
* @param out DataOutput stream used for writing commands to the datanode.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
super.write(out);
|
||||||
|
out.writeLong(this.bandwidth);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads the bandwidth payload from the Balancer Bandwidth Command packet.
|
||||||
|
* @param in DataInput stream used for reading commands to the datanode.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
super.readFields(in);
|
||||||
|
this.bandwidth = in.readLong();
|
||||||
|
}
|
||||||
|
}
|
|
@ -45,9 +45,9 @@ import org.apache.avro.reflect.Nullable;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface DatanodeProtocol extends VersionedProtocol {
|
public interface DatanodeProtocol extends VersionedProtocol {
|
||||||
/**
|
/**
|
||||||
* 27: Add block pool ID to Block
|
* 28: Add Balancer Bandwidth Command protocol.
|
||||||
*/
|
*/
|
||||||
public static final long versionID = 27L;
|
public static final long versionID = 28L;
|
||||||
|
|
||||||
// error code
|
// error code
|
||||||
final static int NOTIFY = 0;
|
final static int NOTIFY = 0;
|
||||||
|
@ -67,6 +67,7 @@ public interface DatanodeProtocol extends VersionedProtocol {
|
||||||
final static int DNA_FINALIZE = 5; // finalize previous upgrade
|
final static int DNA_FINALIZE = 5; // finalize previous upgrade
|
||||||
final static int DNA_RECOVERBLOCK = 6; // request a block recovery
|
final static int DNA_RECOVERBLOCK = 6; // request a block recovery
|
||||||
final static int DNA_ACCESSKEYUPDATE = 7; // update access key
|
final static int DNA_ACCESSKEYUPDATE = 7; // update access key
|
||||||
|
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register Datanode.
|
* Register Datanode.
|
||||||
|
|
|
@ -452,6 +452,40 @@ public class DFSAdmin extends FsShell {
|
||||||
return exitCode;
|
return exitCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Command to ask the namenode to set the balancer bandwidth for all of the
|
||||||
|
* datanodes.
|
||||||
|
* Usage: java DFSAdmin -setBalancerBandwidth bandwidth
|
||||||
|
* @param argv List of of command line parameters.
|
||||||
|
* @param idx The index of the command that is being processed.
|
||||||
|
* @exception IOException
|
||||||
|
*/
|
||||||
|
public int setBalancerBandwidth(String[] argv, int idx) throws IOException {
|
||||||
|
long bandwidth;
|
||||||
|
int exitCode = -1;
|
||||||
|
|
||||||
|
try {
|
||||||
|
bandwidth = Long.parseLong(argv[idx]);
|
||||||
|
} catch (NumberFormatException nfe) {
|
||||||
|
System.err.println("NumberFormatException: " + nfe.getMessage());
|
||||||
|
System.err.println("Usage: java DFSAdmin"
|
||||||
|
+ " [-setBalancerBandwidth <bandwidth in bytes per second>]");
|
||||||
|
return exitCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
FileSystem fs = getFS();
|
||||||
|
if (!(fs instanceof DistributedFileSystem)) {
|
||||||
|
System.err.println("FileSystem is " + fs.getUri());
|
||||||
|
return exitCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||||
|
dfs.setBalancerBandwidth(bandwidth);
|
||||||
|
exitCode = 0;
|
||||||
|
|
||||||
|
return exitCode;
|
||||||
|
}
|
||||||
|
|
||||||
private void printHelp(String cmd) {
|
private void printHelp(String cmd) {
|
||||||
String summary = "hadoop dfsadmin is the command to execute DFS administrative commands.\n" +
|
String summary = "hadoop dfsadmin is the command to execute DFS administrative commands.\n" +
|
||||||
"The full syntax is: \n\n" +
|
"The full syntax is: \n\n" +
|
||||||
|
@ -469,6 +503,7 @@ public class DFSAdmin extends FsShell {
|
||||||
"\t[-printTopology]\n" +
|
"\t[-printTopology]\n" +
|
||||||
"\t[-refreshNamenodes datanodehost:port]\n"+
|
"\t[-refreshNamenodes datanodehost:port]\n"+
|
||||||
"\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+
|
"\t[-deleteBlockPool datanodehost:port blockpoolId [force]]\n"+
|
||||||
|
"\t[-setBalancerBandwidth <bandwidth>]\n" +
|
||||||
"\t[-help [cmd]]\n";
|
"\t[-help [cmd]]\n";
|
||||||
|
|
||||||
String report ="-report: \tReports basic filesystem information and statistics.\n";
|
String report ="-report: \tReports basic filesystem information and statistics.\n";
|
||||||
|
@ -546,6 +581,14 @@ public class DFSAdmin extends FsShell {
|
||||||
"\t\t will fail if datanode is still serving the block pool.\n" +
|
"\t\t will fail if datanode is still serving the block pool.\n" +
|
||||||
"\t\t Refer to refreshNamenodes to shutdown a block pool\n" +
|
"\t\t Refer to refreshNamenodes to shutdown a block pool\n" +
|
||||||
"\t\t service on a datanode.\n";
|
"\t\t service on a datanode.\n";
|
||||||
|
|
||||||
|
String setBalancerBandwidth = "-setBalancerBandwidth <bandwidth>:\n" +
|
||||||
|
"\tChanges the network bandwidth used by each datanode during\n" +
|
||||||
|
"\tHDFS block balancing.\n\n" +
|
||||||
|
"\t\t<bandwidth> is the maximum number of bytes per second\n" +
|
||||||
|
"\t\tthat will be used by each datanode. This value overrides\n" +
|
||||||
|
"\t\tthe dfs.balance.bandwidthPerSec parameter.\n\n" +
|
||||||
|
"\t\t--- NOTE: The new value is not persistent on the DataNode.---\n";
|
||||||
|
|
||||||
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
|
String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
|
||||||
"\t\tis specified.\n";
|
"\t\tis specified.\n";
|
||||||
|
@ -586,6 +629,8 @@ public class DFSAdmin extends FsShell {
|
||||||
System.out.println(refreshNamenodes);
|
System.out.println(refreshNamenodes);
|
||||||
} else if ("deleteBlockPool".equals(cmd)) {
|
} else if ("deleteBlockPool".equals(cmd)) {
|
||||||
System.out.println(deleteBlockPool);
|
System.out.println(deleteBlockPool);
|
||||||
|
} else if ("setBalancerBandwidth".equals(cmd)) {
|
||||||
|
System.out.println(setBalancerBandwidth);
|
||||||
} else if ("help".equals(cmd)) {
|
} else if ("help".equals(cmd)) {
|
||||||
System.out.println(help);
|
System.out.println(help);
|
||||||
} else {
|
} else {
|
||||||
|
@ -879,6 +924,9 @@ public class DFSAdmin extends FsShell {
|
||||||
} else if ("-deleteBlockPool".equals(cmd)) {
|
} else if ("-deleteBlockPool".equals(cmd)) {
|
||||||
System.err.println("Usage: java DFSAdmin"
|
System.err.println("Usage: java DFSAdmin"
|
||||||
+ " [-deleteBlockPool datanode-host:port blockpoolId [force]]");
|
+ " [-deleteBlockPool datanode-host:port blockpoolId [force]]");
|
||||||
|
} else if ("-setBalancerBandwidth".equals(cmd)) {
|
||||||
|
System.err.println("Usage: java DFSAdmin"
|
||||||
|
+ " [-setBalancerBandwidth <bandwidth in bytes per second>]");
|
||||||
} else {
|
} else {
|
||||||
System.err.println("Usage: java DFSAdmin");
|
System.err.println("Usage: java DFSAdmin");
|
||||||
System.err.println(" [-report]");
|
System.err.println(" [-report]");
|
||||||
|
@ -899,6 +947,7 @@ public class DFSAdmin extends FsShell {
|
||||||
System.err.println(" ["+ClearQuotaCommand.USAGE+"]");
|
System.err.println(" ["+ClearQuotaCommand.USAGE+"]");
|
||||||
System.err.println(" ["+SetSpaceQuotaCommand.USAGE+"]");
|
System.err.println(" ["+SetSpaceQuotaCommand.USAGE+"]");
|
||||||
System.err.println(" ["+ClearSpaceQuotaCommand.USAGE+"]");
|
System.err.println(" ["+ClearSpaceQuotaCommand.USAGE+"]");
|
||||||
|
System.err.println(" [-setBalancerBandwidth <bandwidth in bytes per second>]");
|
||||||
System.err.println(" [-help [cmd]]");
|
System.err.println(" [-help [cmd]]");
|
||||||
System.err.println();
|
System.err.println();
|
||||||
ToolRunner.printGenericCommandUsage(System.err);
|
ToolRunner.printGenericCommandUsage(System.err);
|
||||||
|
@ -990,6 +1039,11 @@ public class DFSAdmin extends FsShell {
|
||||||
printUsage(cmd);
|
printUsage(cmd);
|
||||||
return exitCode;
|
return exitCode;
|
||||||
}
|
}
|
||||||
|
} else if ("-setBalancerBandwidth".equals(cmd)) {
|
||||||
|
if (argv.length != 2) {
|
||||||
|
printUsage(cmd);
|
||||||
|
return exitCode;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// initialize DFSAdmin
|
// initialize DFSAdmin
|
||||||
|
@ -1042,6 +1096,8 @@ public class DFSAdmin extends FsShell {
|
||||||
exitCode = refreshNamenodes(argv, i);
|
exitCode = refreshNamenodes(argv, i);
|
||||||
} else if ("-deleteBlockPool".equals(cmd)) {
|
} else if ("-deleteBlockPool".equals(cmd)) {
|
||||||
exitCode = deleteBlockPool(argv, i);
|
exitCode = deleteBlockPool(argv, i);
|
||||||
|
} else if ("-setBalancerBandwidth".equals(cmd)) {
|
||||||
|
exitCode = setBalancerBandwidth(argv, i);
|
||||||
} else if ("-help".equals(cmd)) {
|
} else if ("-help".equals(cmd)) {
|
||||||
if (i < argv.length) {
|
if (i < argv.length) {
|
||||||
printHelp(argv[i]);
|
printHelp(argv[i]);
|
||||||
|
|
|
@ -0,0 +1,93 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test ensures that the balancer bandwidth is dynamically adjusted
|
||||||
|
* correctly.
|
||||||
|
*/
|
||||||
|
public class TestBalancerBandwidth extends TestCase {
|
||||||
|
final static private Configuration conf = new Configuration();
|
||||||
|
final static private int NUM_OF_DATANODES = 2;
|
||||||
|
final static private int DEFAULT_BANDWIDTH = 1024*1024;
|
||||||
|
public static final Log LOG = LogFactory.getLog(TestBalancerBandwidth.class);
|
||||||
|
|
||||||
|
public void testBalancerBandwidth() throws Exception {
|
||||||
|
/* Set bandwidthPerSec to a low value of 1M bps. */
|
||||||
|
conf.setLong(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
|
||||||
|
DEFAULT_BANDWIDTH);
|
||||||
|
|
||||||
|
/* Create and start cluster */
|
||||||
|
MiniDFSCluster cluster =
|
||||||
|
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES).build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
|
||||||
|
DistributedFileSystem fs = (DistributedFileSystem) cluster.getFileSystem();
|
||||||
|
|
||||||
|
ArrayList<DataNode> datanodes = cluster.getDataNodes();
|
||||||
|
// Ensure value from the configuration is reflected in the datanodes.
|
||||||
|
assertEquals(DEFAULT_BANDWIDTH, (long) datanodes.get(0).getBalancerBandwidth());
|
||||||
|
assertEquals(DEFAULT_BANDWIDTH, (long) datanodes.get(1).getBalancerBandwidth());
|
||||||
|
|
||||||
|
// Dynamically change balancer bandwidth and ensure the updated value
|
||||||
|
// is reflected on the datanodes.
|
||||||
|
long newBandwidth = 12 * DEFAULT_BANDWIDTH; // 12M bps
|
||||||
|
fs.setBalancerBandwidth(newBandwidth);
|
||||||
|
|
||||||
|
// Give it a few seconds to propogate new the value to the datanodes.
|
||||||
|
try {
|
||||||
|
Thread.sleep(5000);
|
||||||
|
} catch (Exception e) {}
|
||||||
|
|
||||||
|
assertEquals(newBandwidth, (long) datanodes.get(0).getBalancerBandwidth());
|
||||||
|
assertEquals(newBandwidth, (long) datanodes.get(1).getBalancerBandwidth());
|
||||||
|
|
||||||
|
// Dynamically change balancer bandwidth to 0. Balancer bandwidth on the
|
||||||
|
// datanodes should remain as it was.
|
||||||
|
fs.setBalancerBandwidth(0);
|
||||||
|
|
||||||
|
// Give it a few seconds to propogate new the value to the datanodes.
|
||||||
|
try {
|
||||||
|
Thread.sleep(5000);
|
||||||
|
} catch (Exception e) {}
|
||||||
|
|
||||||
|
assertEquals(newBandwidth, (long) datanodes.get(0).getBalancerBandwidth());
|
||||||
|
assertEquals(newBandwidth, (long) datanodes.get(1).getBalancerBandwidth());
|
||||||
|
}finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
new TestBalancerBandwidth().testBalancerBandwidth();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue