HDFS-7116. Add a command to get the balancer bandwidth (Contributed by Rakesh R)
This commit is contained in:
parent
4d13335fc9
commit
0f0e897bf1
|
@ -149,4 +149,11 @@ public interface ClientDatanodeProtocol {
|
|||
*/
|
||||
void triggerBlockReport(BlockReportOptions options)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Get current value of the balancer bandwidth in bytes per second.
|
||||
*
|
||||
* @return balancer bandwidth
|
||||
*/
|
||||
long getBalancerBandwidth() throws IOException;
|
||||
}
|
||||
|
|
|
@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
|
||||
|
@ -98,6 +100,9 @@ public class ClientDatanodeProtocolTranslatorPB implements
|
|||
private static final ListReconfigurablePropertiesRequestProto
|
||||
VOID_LIST_RECONFIGURABLE_PROPERTIES =
|
||||
ListReconfigurablePropertiesRequestProto.newBuilder().build();
|
||||
private static final GetBalancerBandwidthRequestProto
|
||||
VOID_GET_BALANCER_BANDWIDTH =
|
||||
GetBalancerBandwidthRequestProto.newBuilder().build();
|
||||
|
||||
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
|
||||
Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
|
||||
|
@ -323,4 +328,16 @@ public class ClientDatanodeProtocolTranslatorPB implements
|
|||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getBalancerBandwidth() throws IOException {
|
||||
GetBalancerBandwidthResponseProto response;
|
||||
try {
|
||||
response = rpcProxy.getBalancerBandwidth(NULL_CONTROLLER,
|
||||
VOID_GET_BALANCER_BANDWIDTH);
|
||||
return response.getBandwidth();
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -162,6 +162,16 @@ message ListReconfigurablePropertiesResponseProto {
|
|||
repeated string name = 1;
|
||||
}
|
||||
|
||||
message GetBalancerBandwidthRequestProto {
|
||||
}
|
||||
|
||||
/**
|
||||
* bandwidth - balancer bandwidth value of the datanode.
|
||||
*/
|
||||
message GetBalancerBandwidthResponseProto {
|
||||
required uint64 bandwidth = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Protocol used from client to the Datanode.
|
||||
* See the request and response for details of rpc call.
|
||||
|
@ -211,4 +221,10 @@ service ClientDatanodeProtocolService {
|
|||
|
||||
rpc triggerBlockReport(TriggerBlockReportRequestProto)
|
||||
returns(TriggerBlockReportResponseProto);
|
||||
|
||||
/**
|
||||
* Returns the balancer bandwidth value of datanode.
|
||||
*/
|
||||
rpc getBalancerBandwidth(GetBalancerBandwidthRequestProto)
|
||||
returns(GetBalancerBandwidthResponseProto);
|
||||
}
|
||||
|
|
|
@ -908,6 +908,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-8716. Introduce a new config specifically for safe mode block count
|
||||
(Chang Li via kihwal)
|
||||
|
||||
HDFS-7116. Add a command to get the balancer bandwidth
|
||||
(Rakesh R via vinayakumarb)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBalancerBandwidthResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetDatanodeInfoRequestProto;
|
||||
|
@ -231,4 +233,18 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
|
|||
}
|
||||
return TRIGGER_BLOCK_REPORT_RESP;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetBalancerBandwidthResponseProto getBalancerBandwidth(
|
||||
RpcController controller, GetBalancerBandwidthRequestProto request)
|
||||
throws ServiceException {
|
||||
long bandwidth;
|
||||
try {
|
||||
bandwidth = impl.getBalancerBandwidth();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
return GetBalancerBandwidthResponseProto.newBuilder()
|
||||
.setBandwidth(bandwidth).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3128,12 +3128,8 @@ public class DataNode extends ReconfigurableBase
|
|||
blockPoolTokenSecretManager.clearAllKeysForTesting();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current value of the max balancer bandwidth in bytes per second.
|
||||
*
|
||||
* @return Balancer bandwidth in bytes per second for this datanode.
|
||||
*/
|
||||
public Long getBalancerBandwidth() {
|
||||
@Override // ClientDatanodeProtocol
|
||||
public long getBalancerBandwidth() {
|
||||
DataXceiverServer dxcs =
|
||||
(DataXceiverServer) this.dataXceiverServer.getRunnable();
|
||||
return dxcs.balanceThrottler.getBandwidth();
|
||||
|
|
|
@ -419,6 +419,7 @@ public class DFSAdmin extends FsShell {
|
|||
"\t[-refreshNamenodes datanode_host:ipc_port]\n"+
|
||||
"\t[-deleteBlockPool datanode_host:ipc_port blockpoolId [force]]\n"+
|
||||
"\t[-setBalancerBandwidth <bandwidth in bytes per second>]\n" +
|
||||
"\t[-getBalancerBandwidth <datanode_host:ipc_port>]\n" +
|
||||
"\t[-fetchImage <local directory>]\n" +
|
||||
"\t[-allowSnapshot <snapshotDir>]\n" +
|
||||
"\t[-disallowSnapshot <snapshotDir>]\n" +
|
||||
|
@ -887,6 +888,26 @@ public class DFSAdmin extends FsShell {
|
|||
return exitCode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Command to get balancer bandwidth for the given datanode. Usage: hdfs
|
||||
* dfsadmin -getBalancerBandwidth {@literal <datanode_host:ipc_port>}
|
||||
* @param argv List of of command line parameters.
|
||||
* @param idx The index of the command that is being processed.
|
||||
* @exception IOException
|
||||
*/
|
||||
public int getBalancerBandwidth(String[] argv, int idx) throws IOException {
|
||||
ClientDatanodeProtocol dnProxy = getDataNodeProxy(argv[idx]);
|
||||
try {
|
||||
long bandwidth = dnProxy.getBalancerBandwidth();
|
||||
System.out.println("Balancer bandwidth is " + bandwidth
|
||||
+ " bytes per second.");
|
||||
} catch (IOException ioe) {
|
||||
System.err.println("Datanode unreachable.");
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Download the most recent fsimage from the name node, and save it to a local
|
||||
* file in the given directory.
|
||||
|
@ -1026,6 +1047,12 @@ public class DFSAdmin extends FsShell {
|
|||
"\t\tthe dfs.balance.bandwidthPerSec parameter.\n\n" +
|
||||
"\t\t--- NOTE: The new value is not persistent on the DataNode.---\n";
|
||||
|
||||
String getBalancerBandwidth = "-getBalancerBandwidth <datanode_host:ipc_port>:\n" +
|
||||
"\tGet the network bandwidth for the given datanode.\n" +
|
||||
"\tThis is the maximum network bandwidth used by the datanode\n" +
|
||||
"\tduring HDFS block balancing.\n\n" +
|
||||
"\t--- NOTE: This value is not persistent on the DataNode.---\n";
|
||||
|
||||
String fetchImage = "-fetchImage <local directory>:\n" +
|
||||
"\tDownloads the most recent fsimage from the Name Node and saves it in" +
|
||||
"\tthe specified local directory.\n";
|
||||
|
@ -1103,6 +1130,8 @@ public class DFSAdmin extends FsShell {
|
|||
System.out.println(deleteBlockPool);
|
||||
} else if ("setBalancerBandwidth".equals(cmd)) {
|
||||
System.out.println(setBalancerBandwidth);
|
||||
} else if ("getBalancerBandwidth".equals(cmd)) {
|
||||
System.out.println(getBalancerBandwidth);
|
||||
} else if ("fetchImage".equals(cmd)) {
|
||||
System.out.println(fetchImage);
|
||||
} else if ("allowSnapshot".equalsIgnoreCase(cmd)) {
|
||||
|
@ -1140,6 +1169,7 @@ public class DFSAdmin extends FsShell {
|
|||
System.out.println(refreshNamenodes);
|
||||
System.out.println(deleteBlockPool);
|
||||
System.out.println(setBalancerBandwidth);
|
||||
System.out.println(getBalancerBandwidth);
|
||||
System.out.println(fetchImage);
|
||||
System.out.println(allowSnapshot);
|
||||
System.out.println(disallowSnapshot);
|
||||
|
@ -1682,6 +1712,9 @@ public class DFSAdmin extends FsShell {
|
|||
} else if ("-setBalancerBandwidth".equals(cmd)) {
|
||||
System.err.println("Usage: hdfs dfsadmin"
|
||||
+ " [-setBalancerBandwidth <bandwidth in bytes per second>]");
|
||||
} else if ("-getBalancerBandwidth".equalsIgnoreCase(cmd)) {
|
||||
System.err.println("Usage: hdfs dfsadmin"
|
||||
+ " [-getBalancerBandwidth <datanode_host:ipc_port>]");
|
||||
} else if ("-fetchImage".equals(cmd)) {
|
||||
System.err.println("Usage: hdfs dfsadmin"
|
||||
+ " [-fetchImage <local directory>]");
|
||||
|
@ -1817,6 +1850,11 @@ public class DFSAdmin extends FsShell {
|
|||
printUsage(cmd);
|
||||
return exitCode;
|
||||
}
|
||||
} else if ("-getBalancerBandwidth".equalsIgnoreCase(cmd)) {
|
||||
if (argv.length != 2) {
|
||||
printUsage(cmd);
|
||||
return exitCode;
|
||||
}
|
||||
} else if ("-fetchImage".equals(cmd)) {
|
||||
if (argv.length != 2) {
|
||||
printUsage(cmd);
|
||||
|
@ -1902,6 +1940,8 @@ public class DFSAdmin extends FsShell {
|
|||
exitCode = deleteBlockPool(argv, i);
|
||||
} else if ("-setBalancerBandwidth".equals(cmd)) {
|
||||
exitCode = setBalancerBandwidth(argv, i);
|
||||
} else if ("-getBalancerBandwidth".equals(cmd)) {
|
||||
exitCode = getBalancerBandwidth(argv, i);
|
||||
} else if ("-fetchImage".equals(cmd)) {
|
||||
exitCode = fetchImage(argv, i);
|
||||
} else if ("-shutdownDatanode".equals(cmd)) {
|
||||
|
|
|
@ -338,6 +338,7 @@ Usage:
|
|||
[-refreshNamenodes datanodehost:port]
|
||||
[-deleteBlockPool datanode-host:port blockpoolId [force]]
|
||||
[-setBalancerBandwidth <bandwidth in bytes per second>]
|
||||
[-getBalancerBandwidth <datanode_host:ipc_port>]
|
||||
[-allowSnapshot <snapshotDir>]
|
||||
[-disallowSnapshot <snapshotDir>]
|
||||
[-fetchImage <local directory>]
|
||||
|
@ -370,7 +371,8 @@ Usage:
|
|||
| `-printTopology` | Print a tree of the racks and their nodes as reported by the Namenode |
|
||||
| `-refreshNamenodes` datanodehost:port | For the given datanode, reloads the configuration files, stops serving the removed block-pools and starts serving new block-pools. |
|
||||
| `-deleteBlockPool` datanode-host:port blockpoolId [force] | If force is passed, block pool directory for the given blockpool id on the given datanode is deleted along with its contents, otherwise the directory is deleted only if it is empty. The command will fail if datanode is still serving the block pool. Refer to refreshNamenodes to shutdown a block pool service on a datanode. |
|
||||
| `-setBalancerBandwidth` \<bandwidth in bytes per second\> | Changes the network bandwidth used by each datanode during HDFS block balancing. \<bandwidth\> is the maximum number of bytes per second that will be used by each datanode. This value overrides the dfs.balance.bandwidthPerSec parameter. NOTE: The new value is not persistent on the DataNode. |
|
||||
| `-setBalancerBandwidth` \<bandwidth in bytes per second\> | Changes the network bandwidth used by each datanode during HDFS block balancing. \<bandwidth\> is the maximum number of bytes per second that will be used by each datanode. This value overrides the dfs.balance.bandwidthPerSec parameter. NOTE: The new value is not persistent on the DataNode. |
|
||||
| `-getBalancerBandwidth` \<datanode\_host:ipc\_port\> | Get the network bandwidth(in bytes per second) for the given datanode. This is the maximum network bandwidth used by the datanode during HDFS block balancing.|
|
||||
| `-allowSnapshot` \<snapshotDir\> | Allowing snapshots of a directory to be created. If the operation completes successfully, the directory becomes snapshottable. See the [HDFS Snapshot Documentation](./HdfsSnapshots.html) for more information. |
|
||||
| `-disallowSnapshot` \<snapshotDir\> | Disallowing snapshots of a directory to be created. All snapshots of the directory must be deleted before disallowing snapshots. See the [HDFS Snapshot Documentation](./HdfsSnapshots.html) for more information. |
|
||||
| `-fetchImage` \<local directory\> | Downloads the most recent fsimage from the NameNode and saves it in the specified local directory. |
|
||||
|
|
|
@ -18,13 +18,19 @@
|
|||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -36,6 +42,9 @@ public class TestBalancerBandwidth {
|
|||
final static private int NUM_OF_DATANODES = 2;
|
||||
final static private int DEFAULT_BANDWIDTH = 1024*1024;
|
||||
public static final Log LOG = LogFactory.getLog(TestBalancerBandwidth.class);
|
||||
private static final Charset UTF8 = Charset.forName("UTF-8");
|
||||
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
|
||||
private final PrintStream outStream = new PrintStream(outContent);
|
||||
|
||||
@Test
|
||||
public void testBalancerBandwidth() throws Exception {
|
||||
|
@ -56,6 +65,23 @@ public class TestBalancerBandwidth {
|
|||
// Ensure value from the configuration is reflected in the datanodes.
|
||||
assertEquals(DEFAULT_BANDWIDTH, (long) datanodes.get(0).getBalancerBandwidth());
|
||||
assertEquals(DEFAULT_BANDWIDTH, (long) datanodes.get(1).getBalancerBandwidth());
|
||||
ClientDatanodeProtocol dn1Proxy = DFSUtilClient
|
||||
.createClientDatanodeProtocolProxy(datanodes.get(0).getDatanodeId(),
|
||||
conf, 60000, false);
|
||||
ClientDatanodeProtocol dn2Proxy = DFSUtilClient
|
||||
.createClientDatanodeProtocolProxy(datanodes.get(1).getDatanodeId(),
|
||||
conf, 60000, false);
|
||||
DFSAdmin admin = new DFSAdmin(conf);
|
||||
String dn1Address = datanodes.get(0).ipcServer.getListenerAddress()
|
||||
.getHostName() + ":" + datanodes.get(0).getIpcPort();
|
||||
String dn2Address = datanodes.get(1).ipcServer.getListenerAddress()
|
||||
.getHostName() + ":" + datanodes.get(1).getIpcPort();
|
||||
|
||||
// verifies the dfsadmin command execution
|
||||
String[] args = new String[] { "-getBalancerBandwidth", dn1Address };
|
||||
runGetBalancerBandwidthCmd(admin, args, dn1Proxy, DEFAULT_BANDWIDTH);
|
||||
args = new String[] { "-getBalancerBandwidth", dn2Address };
|
||||
runGetBalancerBandwidthCmd(admin, args, dn2Proxy, DEFAULT_BANDWIDTH);
|
||||
|
||||
// Dynamically change balancer bandwidth and ensure the updated value
|
||||
// is reflected on the datanodes.
|
||||
|
@ -69,6 +95,11 @@ public class TestBalancerBandwidth {
|
|||
|
||||
assertEquals(newBandwidth, (long) datanodes.get(0).getBalancerBandwidth());
|
||||
assertEquals(newBandwidth, (long) datanodes.get(1).getBalancerBandwidth());
|
||||
// verifies the dfsadmin command execution
|
||||
args = new String[] { "-getBalancerBandwidth", dn1Address };
|
||||
runGetBalancerBandwidthCmd(admin, args, dn1Proxy, newBandwidth);
|
||||
args = new String[] { "-getBalancerBandwidth", dn2Address };
|
||||
runGetBalancerBandwidthCmd(admin, args, dn2Proxy, newBandwidth);
|
||||
|
||||
// Dynamically change balancer bandwidth to 0. Balancer bandwidth on the
|
||||
// datanodes should remain as it was.
|
||||
|
@ -81,11 +112,33 @@ public class TestBalancerBandwidth {
|
|||
|
||||
assertEquals(newBandwidth, (long) datanodes.get(0).getBalancerBandwidth());
|
||||
assertEquals(newBandwidth, (long) datanodes.get(1).getBalancerBandwidth());
|
||||
}finally {
|
||||
// verifies the dfsadmin command execution
|
||||
args = new String[] { "-getBalancerBandwidth", dn1Address };
|
||||
runGetBalancerBandwidthCmd(admin, args, dn1Proxy, newBandwidth);
|
||||
args = new String[] { "-getBalancerBandwidth", dn2Address };
|
||||
runGetBalancerBandwidthCmd(admin, args, dn2Proxy, newBandwidth);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void runGetBalancerBandwidthCmd(DFSAdmin admin, String[] args,
|
||||
ClientDatanodeProtocol proxy, long expectedBandwidth) throws Exception {
|
||||
PrintStream initialStdOut = System.out;
|
||||
outContent.reset();
|
||||
try {
|
||||
System.setOut(outStream);
|
||||
int exitCode = admin.run(args);
|
||||
assertEquals("DFSAdmin should return 0", 0, exitCode);
|
||||
String bandwidthOutMsg = "Balancer bandwidth is " + expectedBandwidth
|
||||
+ " bytes per second.";
|
||||
String strOut = new String(outContent.toByteArray(), UTF8);
|
||||
assertTrue("Wrong balancer bandwidth!", strOut.contains(bandwidthOutMsg));
|
||||
} finally {
|
||||
System.setOut(initialStdOut);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
new TestBalancerBandwidth().testBalancerBandwidth();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue