HDFS-16568. dfsadmin -reconfig option to start/query reconfig on all live datanodes (#4264)

Signed-off-by: Tao Li <tomscut@apache.org>
This commit is contained in:
Viraj Jasani 2022-05-10 17:10:03 -07:00 committed by tom lee
parent bb398f78cc
commit e16ae55833
4 changed files with 171 additions and 25 deletions

View File

@ -36,7 +36,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
@ -447,8 +450,7 @@ public class DFSAdmin extends FsShell {
"\t[-refreshSuperUserGroupsConfiguration]\n" +
"\t[-refreshCallQueue]\n" +
"\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
"\t[-reconfig <namenode|datanode> <host:ipc_port> " +
"<start|status|properties>]\n" +
"\t[-reconfig <namenode|datanode> <host:ipc_port|livenodes> <start|status|properties>]\n" +
"\t[-printTopology]\n" +
"\t[-refreshNamenodes datanode_host:ipc_port]\n" +
"\t[-getVolumeReport datanode_host:ipc_port]\n" +
@ -1199,12 +1201,14 @@ public class DFSAdmin extends FsShell {
String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
String reconfig = "-reconfig <namenode|datanode> <host:ipc_port> " +
String reconfig = "-reconfig <namenode|datanode> <host:ipc_port|livenodes> " +
"<start|status|properties>:\n" +
"\tStarts or gets the status of a reconfiguration operation, \n" +
"\tor gets a list of reconfigurable properties.\n" +
"\tThe second parameter specifies the node type\n";
"\tThe second parameter specifies the node type\n" +
"\tThe third parameter specifies host address. For start or status, \n" +
"\tdatanode supports livenodes as third parameter, which will start \n" +
"\tor retrieve reconfiguration on all live datanodes.";
String genericRefresh = "-refresh: Arguments are <hostname:ipc_port>" +
" <resource_identifier> [arg1..argn]\n" +
"\tTriggers a runtime-refresh of the resource specified by " +
@ -1844,15 +1848,15 @@ public class DFSAdmin extends FsShell {
return 0;
}
public int reconfig(String[] argv, int i) throws IOException {
public int reconfig(String[] argv, int i) throws IOException, InterruptedException {
String nodeType = argv[i];
String address = argv[i + 1];
String op = argv[i + 2];
if ("start".equals(op)) {
return startReconfiguration(nodeType, address, System.out, System.err);
return startReconfigurationUtil(nodeType, address, System.out, System.err);
} else if ("status".equals(op)) {
return getReconfigurationStatus(nodeType, address, System.out, System.err);
return getReconfigurationStatusUtil(nodeType, address, System.out, System.err);
} else if ("properties".equals(op)) {
return getReconfigurableProperties(nodeType, address, System.out,
System.err);
@ -1862,12 +1866,57 @@ public class DFSAdmin extends FsShell {
}
int startReconfiguration(final String nodeThpe, final String address)
throws IOException {
return startReconfiguration(nodeThpe, address, System.out, System.err);
throws IOException, InterruptedException {
return startReconfigurationUtil(nodeThpe, address, System.out, System.err);
}
int startReconfigurationUtil(final String nodeType, final String address, final PrintStream out,
final PrintStream err) throws IOException, InterruptedException {
if (!"livenodes".equals(address)) {
return startReconfiguration(nodeType, address, out, err);
}
if (!"datanode".equals(nodeType)) {
err.println("Only datanode type supports reconfiguration in bulk.");
return 1;
}
ExecutorService executorService = Executors.newFixedThreadPool(5);
DistributedFileSystem dfs = getDFS();
DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
AtomicInteger successCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
if (nodes != null) {
for (DatanodeInfo node : nodes) {
executorService.submit(() -> {
int status = startReconfiguration(nodeType, node.getIpcAddr(false), out, err);
if (status == 0) {
successCount.incrementAndGet();
} else {
failCount.incrementAndGet();
}
});
}
while ((successCount.get() + failCount.get()) < nodes.length) {
Thread.sleep(1000);
}
executorService.shutdown();
if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
err.println("Executor service could not be terminated in 60s. Please wait for"
+ " sometime before the system cools down.");
}
out.println("Starting of reconfiguration task successful on " + successCount.get()
+ " nodes, failed on " + failCount.get() + " nodes.");
if (failCount.get() == 0) {
return 0;
} else {
return 1;
}
}
err.println("DFS datanode stats could not be retrieved.");
return 1;
}
int startReconfiguration(final String nodeType, final String address,
final PrintStream out, final PrintStream err) throws IOException {
final PrintStream out, final PrintStream err) {
String outMsg = null;
String errMsg = null;
int ret = 0;
@ -1908,8 +1957,53 @@ public class DFSAdmin extends FsShell {
}
}
int getReconfigurationStatus(final String nodeType, final String address,
final PrintStream out, final PrintStream err) throws IOException {
int getReconfigurationStatusUtil(final String nodeType, final String address,
final PrintStream out, final PrintStream err) throws IOException, InterruptedException {
if (!"livenodes".equals(address)) {
return getReconfigurationStatus(nodeType, address, out, err);
}
if (!"datanode".equals(nodeType)) {
err.println("Only datanode type supports reconfiguration in bulk.");
return 1;
}
ExecutorService executorService = Executors.newFixedThreadPool(5);
DistributedFileSystem dfs = getDFS();
DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
AtomicInteger successCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
if (nodes != null) {
for (DatanodeInfo node : nodes) {
executorService.submit(() -> {
int status = getReconfigurationStatus(nodeType, node.getIpcAddr(false), out, err);
if (status == 0) {
successCount.incrementAndGet();
} else {
failCount.incrementAndGet();
}
});
}
while ((successCount.get() + failCount.get()) < nodes.length) {
Thread.sleep(1000);
}
executorService.shutdown();
if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
err.println("Executor service could not be terminated in 60s. Please wait for"
+ " sometime before the system cools down.");
}
out.println("Retrieval of reconfiguration status successful on " + successCount.get()
+ " nodes, failed on " + failCount.get() + " nodes.");
if (failCount.get() == 0) {
return 0;
} else {
return 1;
}
}
err.println("DFS datanode stats could not be retrieved.");
return 1;
}
int getReconfigurationStatus(final String nodeType, final String address, final PrintStream out,
final PrintStream err) {
String outMsg = null;
String errMsg = null;
ReconfigurationTaskStatus status = null;
@ -2152,7 +2246,7 @@ public class DFSAdmin extends FsShell {
+ " [-refreshCallQueue]");
} else if ("-reconfig".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"
+ " [-reconfig <namenode|datanode> <host:ipc_port> "
+ " [-reconfig <namenode|datanode> <host:ipc_port|livenodes> "
+ "<start|status|properties>]");
} else if ("-refresh".equals(cmd)) {
System.err.println("Usage: hdfs dfsadmin"

View File

@ -363,7 +363,7 @@ Usage:
hdfs dfsadmin [-refreshSuperUserGroupsConfiguration]
hdfs dfsadmin [-refreshCallQueue]
hdfs dfsadmin [-refresh <host:ipc_port> <key> [arg1..argn]]
hdfs dfsadmin [-reconfig <namenode|datanode> <host:ipc_port> <start |status |properties>]
hdfs dfsadmin [-reconfig <namenode|datanode> <host:ipc_port|livenodes> <start |status |properties>]
hdfs dfsadmin [-printTopology]
hdfs dfsadmin [-refreshNamenodes datanodehost:port]
hdfs dfsadmin [-getVolumeReport datanodehost:port]
@ -401,7 +401,7 @@ Usage:
| `-refreshSuperUserGroupsConfiguration` | Refresh superuser proxy groups mappings |
| `-refreshCallQueue` | Reload the call queue from config. |
| `-refresh` \<host:ipc\_port\> \<key\> [arg1..argn] | Triggers a runtime-refresh of the resource specified by \<key\> on \<host:ipc\_port\>. All other args after are sent to the host. |
| `-reconfig` \<datanode \|namenode\> \<host:ipc\_port\> \<start\|status\|properties\> | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. |
| `-reconfig` \<datanode \|namenode\> \<host:ipc\_port\|livenodes> \<start\|status\|properties\> | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. The third parameter specifies host address. For start or status, datanode supports livenodes as third parameter, which will start or retrieve reconfiguration on all live datanodes. |
| `-printTopology` | Print a tree of the racks and their nodes as reported by the Namenode |
| `-refreshNamenodes` datanodehost:port | For the given datanode, reloads the configuration files, stops serving the removed block-pools and starts serving new block-pools. |
| `-getVolumeReport` datanodehost:port | For the given datanode, get the volume report. |

View File

@ -351,7 +351,13 @@ Datanode supports hot swappable drives. The user can add or replace HDFS data vo
* The user runs `dfsadmin -reconfig datanode HOST:PORT start` to start
the reconfiguration process. The user can use
`dfsadmin -reconfig datanode HOST:PORT status`
to query the running status of the reconfiguration task.
to query the running status of the reconfiguration task. In place of
HOST:PORT, we can also specify livenodes for datanode. It would allow
start or query reconfiguration on all live datanodes, whereas specifying
HOST:PORT would only allow start or query of reconfiguration on the
particular datanode represented by HOST:PORT. The examples for livenodes
queries are `dfsadmin -reconfig datanode livenodes start` and
`dfsadmin -reconfig datanode livenodes status`.
* Once the reconfiguration task has completed, the user can safely `umount`
the removed data volume directories and physically remove the disks.

View File

@ -177,20 +177,20 @@ public class TestDFSAdmin {
}
private void getReconfigurableProperties(String nodeType, String address,
final List<String> outs, final List<String> errs) throws IOException {
final List<String> outs, final List<String> errs) throws IOException, InterruptedException {
reconfigurationOutErrFormatter("getReconfigurableProperties", nodeType,
address, outs, errs);
}
private void getReconfigurationStatus(String nodeType, String address,
final List<String> outs, final List<String> errs) throws IOException {
final List<String> outs, final List<String> errs) throws IOException, InterruptedException {
reconfigurationOutErrFormatter("getReconfigurationStatus", nodeType,
address, outs, errs);
}
private void reconfigurationOutErrFormatter(String methodName,
String nodeType, String address, final List<String> outs,
final List<String> errs) throws IOException {
final List<String> errs) throws IOException, InterruptedException {
ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
PrintStream outStream = new PrintStream(bufOut);
ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
@ -203,9 +203,9 @@ public class TestDFSAdmin {
outStream,
errStream);
} else if (methodName.equals("getReconfigurationStatus")) {
admin.getReconfigurationStatus(nodeType, address, outStream, errStream);
admin.getReconfigurationStatusUtil(nodeType, address, outStream, errStream);
} else if (methodName.equals("startReconfiguration")) {
admin.startReconfiguration(nodeType, address, outStream, errStream);
admin.startReconfigurationUtil(nodeType, address, outStream, errStream);
}
scanIntoList(bufOut, outs);
@ -326,7 +326,7 @@ public class TestDFSAdmin {
}
@Test(timeout = 30000)
public void testDataNodeGetReconfigurableProperties() throws IOException {
public void testDataNodeGetReconfigurableProperties() throws IOException, InterruptedException {
final int port = datanode.getIpcPort();
final String address = "localhost:" + port;
final List<String> outs = Lists.newArrayList();
@ -422,7 +422,7 @@ public class TestDFSAdmin {
}
@Test(timeout = 30000)
public void testNameNodeGetReconfigurableProperties() throws IOException {
public void testNameNodeGetReconfigurableProperties() throws IOException, InterruptedException {
final String address = namenode.getHostAndPort();
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
@ -452,7 +452,7 @@ public class TestDFSAdmin {
errs.clear();
try {
getReconfigurationStatus(nodeType, address, outs, errs);
} catch (IOException e) {
} catch (IOException | InterruptedException e) {
LOG.error(String.format(
"call getReconfigurationStatus on %s[%s] failed.", nodeType,
address), e);
@ -1062,4 +1062,50 @@ public class TestDFSAdmin {
}
});
}
@Test
public void testAllDatanodesReconfig()
throws IOException, InterruptedException, TimeoutException {
ReconfigurationUtil reconfigurationUtil = mock(ReconfigurationUtil.class);
cluster.getDataNodes().get(0).setReconfigurationUtil(reconfigurationUtil);
cluster.getDataNodes().get(1).setReconfigurationUtil(reconfigurationUtil);
List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<>();
changes.add(new ReconfigurationUtil.PropertyChange(
DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true",
datanode.getConf().get(DFS_DATANODE_PEER_STATS_ENABLED_KEY)));
when(reconfigurationUtil.parseChangedProperties(any(Configuration.class),
any(Configuration.class))).thenReturn(changes);
assertEquals(0, admin.startReconfiguration("datanode", "livenodes"));
final List<String> outsForStartReconf = new ArrayList<>();
final List<String> errsForStartReconf = new ArrayList<>();
reconfigurationOutErrFormatter("startReconfiguration", "datanode",
"livenodes", outsForStartReconf, errsForStartReconf);
assertEquals(3, outsForStartReconf.size());
assertEquals(0, errsForStartReconf.size());
assertTrue(outsForStartReconf.get(0).startsWith("Started reconfiguration task on node"));
assertTrue(outsForStartReconf.get(1).startsWith("Started reconfiguration task on node"));
assertEquals("Starting of reconfiguration task successful on 2 nodes, failed on 0 nodes.",
outsForStartReconf.get(2));
Thread.sleep(1000);
final List<String> outs = new ArrayList<>();
final List<String> errs = new ArrayList<>();
awaitReconfigurationFinished("datanode", "livenodes", outs, errs);
assertEquals(9, outs.size());
assertEquals(0, errs.size());
LOG.info("dfsadmin -status -livenodes output:");
outs.forEach(s -> LOG.info("{}", s));
assertTrue(outs.get(0).startsWith("Reconfiguring status for node"));
assertEquals("SUCCESS: Changed property dfs.datanode.peer.stats.enabled", outs.get(2));
assertEquals("\tFrom: \"false\"", outs.get(3));
assertEquals("\tTo: \"true\"", outs.get(4));
assertEquals("SUCCESS: Changed property dfs.datanode.peer.stats.enabled", outs.get(5));
assertEquals("\tFrom: \"false\"", outs.get(6));
assertEquals("\tTo: \"true\"", outs.get(7));
assertEquals("Retrieval of reconfiguration status successful on 2 nodes, failed on 0 nodes.",
outs.get(8));
}
}