YARN-11122. Support getClusterNodes API in FederationClientInterceptor (#4274)

This commit is contained in:
slfan1989 2022-05-15 09:16:06 -07:00 committed by GitHub
parent 6985f9aabe
commit 6896c35a8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 127 additions and 2 deletions

View File

@ -55,6 +55,8 @@ public final class RouterMetrics {
private MutableGaugeInt numAppAttemptsFailedRetrieved;
@Metric("# of getClusterMetrics failed to be retrieved")
private MutableGaugeInt numGetClusterMetricsFailedRetrieved;
@Metric("# of getClusterNodes failed to be retrieved")
private MutableGaugeInt numGetClusterNodesFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -74,7 +76,8 @@ public final class RouterMetrics {
@Metric("Total number of successful Retrieved getClusterMetrics and "
+ "latency(ms)")
private MutableRate totalSucceededGetClusterMetricsRetrieved;
@Metric("Total number of successful Retrieved getClusterNodes and latency(ms)")
private MutableRate totalSucceededGetClusterNodesRetrieved;
/**
* Provide quantile counters for all latencies.
@ -86,6 +89,7 @@ public final class RouterMetrics {
private MutableQuantiles getApplicationsReportLatency;
private MutableQuantiles getApplicationAttemptReportLatency;
private MutableQuantiles getClusterMetricsLatency;
private MutableQuantiles getClusterNodesLatency;
private static volatile RouterMetrics INSTANCE = null;
private static MetricsRegistry registry;
@ -112,6 +116,10 @@ public final class RouterMetrics {
getClusterMetricsLatency =
registry.newQuantiles("getClusterMetricsLatency",
"latency of get cluster metrics", "ops", "latency", 10);
getClusterNodesLatency =
registry.newQuantiles("getClusterNodesLatency",
"latency of get cluster nodes", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -168,6 +176,11 @@ public final class RouterMetrics {
return totalSucceededGetClusterMetricsRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetClusterNodesRetrieved(){
return totalSucceededGetClusterNodesRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
@ -203,6 +216,11 @@ public final class RouterMetrics {
return totalSucceededGetClusterMetricsRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetClusterNodesRetrieved() {
return totalSucceededGetClusterNodesRetrieved.lastStat().mean();
}
@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
@ -238,6 +256,11 @@ public final class RouterMetrics {
return numGetClusterMetricsFailedRetrieved.value();
}
@VisibleForTesting
public int getClusterNodesFailedRetrieved() {
return numGetClusterNodesFailedRetrieved.value();
}
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@ -273,6 +296,11 @@ public final class RouterMetrics {
getClusterMetricsLatency.add(duration);
}
public void succeededGetClusterNodesRetrieved(long duration) {
totalSucceededGetClusterNodesRetrieved.add(duration);
getClusterNodesLatency.add(duration);
}
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@ -301,4 +329,7 @@ public final class RouterMetrics {
numGetClusterMetricsFailedRetrieved.incr();
}
public void incrClusterNodesFailedRetrieved() {
numGetClusterNodesFailedRetrieved.incr();
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.router.clientrm;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.Method;
@ -791,7 +792,30 @@ public class FederationClientInterceptor
@Override
public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null) {
routerMetrics.incrClusterNodesFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getClusterNodes request.", null);
}
long startTime = clock.getTime();
Map<SubClusterId, SubClusterInfo> subClusters =
federationFacade.getSubClusters(true);
Map<SubClusterId, GetClusterNodesResponse> clusterNodes = Maps.newHashMap();
for (SubClusterId subClusterId : subClusters.keySet()) {
ApplicationClientProtocol client;
try {
client = getClientRMProxyForSubCluster(subClusterId);
GetClusterNodesResponse response = client.getClusterNodes(request);
clusterNodes.put(subClusterId, response);
} catch (Exception ex) {
routerMetrics.incrClusterNodesFailedRetrieved();
LOG.error("Unable to get cluster nodes due to exception.", ex);
throw ex;
}
}
long stopTime = clock.getTime();
routerMetrics.succeededGetClusterNodesRetrieved(stopTime - startTime);
// Merge the NodesResponse
return RouterYarnClientUtils.mergeClusterNodesResponse(clusterNodes.values());
}
@Override

View File

@ -20,14 +20,19 @@ package org.apache.hadoop.yarn.server.router.clientrm;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@ -194,4 +199,23 @@ public final class RouterYarnClientUtils {
return !(appName.startsWith(UnmanagedApplicationManager.APP_NAME) ||
appName.startsWith(PARTIAL_REPORT));
}
/**
* Merges a list of GetClusterNodesResponse.
*
* @param responses a list of GetClusterNodesResponse to merge.
* @return the merged GetClusterNodesResponse.
*/
public static GetClusterNodesResponse mergeClusterNodesResponse(
Collection<GetClusterNodesResponse> responses) {
GetClusterNodesResponse clusterNodesResponse = Records.newRecord(GetClusterNodesResponse.class);
List<NodeReport> nodeReports = new ArrayList<>();
for (GetClusterNodesResponse response : responses) {
if (response != null && response.getNodeReports() != null) {
nodeReports.addAll(response.getNodeReports());
}
}
clusterNodesResponse.setNodeReports(nodeReports);
return clusterNodesResponse;
}
}

View File

@ -38,6 +38,8 @@ public class TestRouterMetrics {
private static RouterMetrics metrics = RouterMetrics.getMetrics();
private static final Double ASSERT_DOUBLE_DELTA = 0.01;
@BeforeClass
public static void init() {
@ -346,6 +348,11 @@ public class TestRouterMetrics {
LOG.info("Mocked: failed getClusterMetrics call");
metrics.incrGetClusterMetricsFailedRetrieved();
}
public void getClusterNodes() {
LOG.info("Mocked: failed getClusterNodes call");
metrics.incrClusterNodesFailedRetrieved();
}
}
// Records successes for all calls
@ -392,5 +399,30 @@ public class TestRouterMetrics {
duration);
metrics.succeededGetClusterMetricsRetrieved(duration);
}
public void getClusterNodes(long duration) {
LOG.info("Mocked: successful getClusterNodes call with duration {}", duration);
metrics.succeededGetClusterNodesRetrieved(duration);
}
}
@Test
public void testSucceededGetClusterNodes() {
long totalGoodBefore = metrics.getNumSucceededGetClusterNodesRetrieved();
goodSubCluster.getClusterNodes(150);
Assert.assertEquals(totalGoodBefore + 1, metrics.getNumSucceededGetClusterNodesRetrieved());
Assert.assertEquals(150, metrics.getLatencySucceededGetClusterNodesRetrieved(),
ASSERT_DOUBLE_DELTA);
goodSubCluster.getClusterNodes(300);
Assert.assertEquals(totalGoodBefore + 2, metrics.getNumSucceededGetClusterNodesRetrieved());
Assert.assertEquals(225, metrics.getLatencySucceededGetClusterNodesRetrieved(),
ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetClusterNodesFailed() {
long totalBadBefore = metrics.getClusterNodesFailedRetrieved();
badSubCluster.getClusterNodes();
Assert.assertEquals(totalBadBefore + 1, metrics.getClusterNodesFailedRetrieved());
}
}

View File

@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -641,4 +643,16 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
Assert.assertNotNull(responseGet);
Assert.assertTrue(responseGet.getApplicationList().isEmpty());
}
@Test
public void testGetClusterNodesRequest() throws Exception {
LOG.info("Test FederationClientInterceptor : Get Cluster Nodeds request");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodes request.",
() -> interceptor.getClusterNodes(null));
// normal request.
GetClusterNodesResponse response =
interceptor.getClusterNodes(GetClusterNodesRequest.newInstance());
Assert.assertEquals(subClusters.size(), response.getNodeReports().size());
}
}