YARN-11378. [Federation] Support checkForDecommissioningNodes、refreshClusterMaxPriority API's for Federation. (#5551)

This commit is contained in:
slfan1989 2023-04-26 05:12:38 +08:00 committed by GitHub
parent a716459cdf
commit 55eebcf277
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 351 additions and 9 deletions

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.util.Records;
@Private
@ -32,4 +34,31 @@ public abstract class CheckForDecommissioningNodesRequest {
.newRecord(CheckForDecommissioningNodesRequest.class);
return request;
}
@Private
@Unstable
public static CheckForDecommissioningNodesRequest newInstance(String subClusterId) {
CheckForDecommissioningNodesRequest request = Records
.newRecord(CheckForDecommissioningNodesRequest.class);
request.setSubClusterId(subClusterId);
return request;
}
/**
* Get the subClusterId.
*
* @return subClusterId.
*/
@Public
@Evolving
public abstract String getSubClusterId();
/**
* Set the subClusterId.
*
* @param subClusterId subCluster Id.
*/
@Public
@Evolving
public abstract void setSubClusterId(String subClusterId);
}

View File

@ -19,7 +19,9 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.util.Records;
@Private
@ -32,4 +34,31 @@ public abstract class RefreshClusterMaxPriorityRequest {
Records.newRecord(RefreshClusterMaxPriorityRequest.class);
return request;
}
@Private
@Unstable
public static RefreshClusterMaxPriorityRequest newInstance(String subClusterId) {
RefreshClusterMaxPriorityRequest request =
Records.newRecord(RefreshClusterMaxPriorityRequest.class);
request.setSubClusterId(subClusterId);
return request;
}
/**
* Get the subClusterId.
*
* @return subClusterId.
*/
@Public
@Evolving
public abstract String getSubClusterId();
/**
* Set the subClusterId.
*
* @param subClusterId subCluster Id.
*/
@Public
@Evolving
public abstract void setSubClusterId(String subClusterId);
}

View File

@ -122,12 +122,14 @@ message UpdateNodeLabelsResponseProto {
}
message CheckForDecommissioningNodesRequestProto {
optional string sub_cluster_id = 1;
}
message CheckForDecommissioningNodesResponseProto {
repeated NodeIdProto decommissioningNodes = 1;
}
message RefreshClusterMaxPriorityRequestProto {
optional string sub_cluster_id = 1;
}
message RefreshClusterMaxPriorityResponseProto {
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.CheckForDecommissioningNodesRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
@ -51,6 +52,13 @@ public class CheckForDecommissioningNodesRequestPBImpl extends
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = CheckForDecommissioningNodesRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int hashCode() {
return getProto().hashCode();
@ -70,4 +78,20 @@ public class CheckForDecommissioningNodesRequestPBImpl extends
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public String getSubClusterId() {
CheckForDecommissioningNodesRequestProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasSubClusterId()) ? p.getSubClusterId() : null;
}
@Override
public void setSubClusterId(String subClusterId) {
maybeInitBuilder();
if (subClusterId == null) {
builder.clearSubClusterId();
return;
}
builder.setSubClusterId(subClusterId);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshClusterMaxPriorityRequestProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
@ -27,9 +28,7 @@ import org.apache.hadoop.thirdparty.protobuf.TextFormat;
@Private
@Unstable
public class RefreshClusterMaxPriorityRequestPBImpl
extends
RefreshClusterMaxPriorityRequest {
public class RefreshClusterMaxPriorityRequestPBImpl extends RefreshClusterMaxPriorityRequest {
RefreshClusterMaxPriorityRequestProto proto =
RefreshClusterMaxPriorityRequestProto.getDefaultInstance();
@ -51,6 +50,13 @@ public class RefreshClusterMaxPriorityRequestPBImpl
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RefreshClusterMaxPriorityRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int hashCode() {
@ -71,4 +77,20 @@ public class RefreshClusterMaxPriorityRequestPBImpl
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public String getSubClusterId() {
RefreshClusterMaxPriorityRequestProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasSubClusterId()) ? p.getSubClusterId() : null;
}
@Override
public void setSubClusterId(String subClusterId) {
maybeInitBuilder();
if (subClusterId == null) {
builder.clearSubClusterId();
return;
}
builder.setSubClusterId(subClusterId);
}
}

View File

@ -171,6 +171,10 @@ public final class RouterMetrics {
private MutableGaugeInt numUpdateNodeResourceFailedRetrieved;
@Metric("# of refreshNodesResources failed to be retrieved")
private MutableGaugeInt numRefreshNodesResourcesFailedRetrieved;
@Metric("# of checkForDecommissioningNodes failed to be retrieved")
private MutableGaugeInt numCheckForDecommissioningNodesFailedRetrieved;
@Metric("# of refreshClusterMaxPriority failed to be retrieved")
private MutableGaugeInt numRefreshClusterMaxPriorityFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -303,6 +307,10 @@ public final class RouterMetrics {
private MutableRate totalSucceededUpdateNodeResourceRetrieved;
@Metric("Total number of successful Retrieved RefreshNodesResources and latency(ms)")
private MutableRate totalSucceededRefreshNodesResourcesRetrieved;
@Metric("Total number of successful Retrieved CheckForDecommissioningNodes and latency(ms)")
private MutableRate totalSucceededCheckForDecommissioningNodesRetrieved;
@Metric("Total number of successful Retrieved RefreshClusterMaxPriority and latency(ms)")
private MutableRate totalSucceededRefreshClusterMaxPriorityRetrieved;
/**
* Provide quantile counters for all latencies.
@ -372,6 +380,8 @@ public final class RouterMetrics {
private MutableQuantiles getClusterUserInfoLatency;
private MutableQuantiles updateNodeResourceLatency;
private MutableQuantiles refreshNodesResourcesLatency;
private MutableQuantiles checkForDecommissioningNodesLatency;
private MutableQuantiles refreshClusterMaxPriorityLatency;
private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
@ -599,6 +609,13 @@ public final class RouterMetrics {
refreshNodesResourcesLatency = registry.newQuantiles("refreshNodesResourcesLatency",
"latency of refresh nodes resources timeouts", "ops", "latency", 10);
checkForDecommissioningNodesLatency = registry.newQuantiles(
"checkForDecommissioningNodesLatency", "latency of check for decommissioningnodes timeouts",
"ops", "latency", 10);
refreshClusterMaxPriorityLatency = registry.newQuantiles("refreshClusterMaxPriorityLatency",
"latency of refresh cluster max priority timeouts", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -925,6 +942,16 @@ public final class RouterMetrics {
return totalSucceededRefreshNodesResourcesRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededCheckForDecommissioningNodesRetrieved() {
return totalSucceededCheckForDecommissioningNodesRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededRefreshClusterMaxPriorityRetrieved() {
return totalSucceededRefreshClusterMaxPriorityRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededRefreshSuperUserGroupsConfigurationRetrieved() {
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().numSamples();
@ -1245,6 +1272,16 @@ public final class RouterMetrics {
return totalSucceededRefreshNodesResourcesRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededCheckForDecommissioningNodesRetrieved() {
return totalSucceededCheckForDecommissioningNodesRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededRefreshClusterMaxPriorityRetrieved() {
return totalSucceededRefreshClusterMaxPriorityRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededRefreshSuperUserGroupsConfigurationRetrieved() {
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().mean();
@ -1514,6 +1551,14 @@ public final class RouterMetrics {
return numRefreshNodesResourcesFailedRetrieved.value();
}
public int getCheckForDecommissioningNodesFailedRetrieved() {
return numCheckForDecommissioningNodesFailedRetrieved.value();
}
public int getRefreshClusterMaxPriorityFailedRetrieved() {
return numRefreshClusterMaxPriorityFailedRetrieved.value();
}
public int getDelegationTokenFailedRetrieved() {
return numGetDelegationTokenFailedRetrieved.value();
}
@ -1847,6 +1892,16 @@ public final class RouterMetrics {
refreshNodesResourcesLatency.add(duration);
}
public void succeededCheckForDecommissioningNodesRetrieved(long duration) {
totalSucceededCheckForDecommissioningNodesRetrieved.add(duration);
checkForDecommissioningNodesLatency.add(duration);
}
public void succeededRefreshClusterMaxPriorityRetrieved(long duration) {
totalSucceededRefreshClusterMaxPriorityRetrieved.add(duration);
refreshClusterMaxPriorityLatency.add(duration);
}
public void succeededRefreshSuperUserGroupsConfRetrieved(long duration) {
totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.add(duration);
refreshSuperUserGroupsConfLatency.add(duration);
@ -2091,6 +2146,14 @@ public final class RouterMetrics {
numRefreshNodesResourcesFailedRetrieved.incr();
}
public void incrCheckForDecommissioningNodesFailedRetrieved() {
numCheckForDecommissioningNodesFailedRetrieved.incr();
}
public void incrRefreshClusterMaxPriorityFailedRetrieved() {
numRefreshClusterMaxPriorityFailedRetrieved.incr();
}
public void incrGetDelegationTokenFailedRetrieved() {
numGetDelegationTokenFailedRetrieved.incr();
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -70,14 +71,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class FederationRMAdminInterceptor extends AbstractRMAdminRequestInterceptor {
@ -623,16 +627,89 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
@Override
public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
throws YarnException, IOException {
throw new NotImplementedException();
CheckForDecommissioningNodesRequest request) throws YarnException, IOException {
// Parameter check
if (request == null) {
RouterServerUtil.logAndThrowException("Missing checkForDecommissioningNodes request.", null);
routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
}
String subClusterId = request.getSubClusterId();
if (StringUtils.isBlank(subClusterId)) {
routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing checkForDecommissioningNodes SubClusterId.",
null);
}
try {
long startTime = clock.getTime();
RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
new Class[]{CheckForDecommissioningNodesRequest.class}, new Object[]{request});
Collection<CheckForDecommissioningNodesResponse> responses =
remoteMethod.invokeConcurrent(this, CheckForDecommissioningNodesResponse.class,
subClusterId);
if (CollectionUtils.isNotEmpty(responses)) {
// We selected a subCluster, the list is not empty and size=1.
List<CheckForDecommissioningNodesResponse> collects =
responses.stream().collect(Collectors.toList());
if (!collects.isEmpty() && collects.size() == 1) {
CheckForDecommissioningNodesResponse response = collects.get(0);
long stopTime = clock.getTime();
routerMetrics.succeededCheckForDecommissioningNodesRetrieved((stopTime - startTime));
Set<NodeId> nodes = response.getDecommissioningNodes();
return CheckForDecommissioningNodesResponse.newInstance(nodes);
}
}
} catch (YarnException e) {
routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
RouterServerUtil.logAndThrowException(e,
"Unable to checkForDecommissioningNodes due to exception " + e.getMessage());
}
routerMetrics.incrCheckForDecommissioningNodesFailedRetrieved();
throw new YarnException("Unable to checkForDecommissioningNodes.");
}
@Override
public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
RefreshClusterMaxPriorityRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
RefreshClusterMaxPriorityRequest request) throws YarnException, IOException {
// parameter verification.
if (request == null) {
routerMetrics.incrRefreshClusterMaxPriorityFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing RefreshClusterMaxPriority request.", null);
}
String subClusterId = request.getSubClusterId();
if (StringUtils.isBlank(subClusterId)) {
routerMetrics.incrRefreshClusterMaxPriorityFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing RefreshClusterMaxPriority SubClusterId.",
null);
}
try {
long startTime = clock.getTime();
RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
new Class[]{RefreshClusterMaxPriorityRequest.class}, new Object[]{request});
Collection<RefreshClusterMaxPriorityResponse> refreshClusterMaxPriorityResps =
remoteMethod.invokeConcurrent(this, RefreshClusterMaxPriorityResponse.class,
subClusterId);
if (CollectionUtils.isNotEmpty(refreshClusterMaxPriorityResps)) {
long stopTime = clock.getTime();
routerMetrics.succeededRefreshClusterMaxPriorityRetrieved(stopTime - startTime);
return RefreshClusterMaxPriorityResponse.newInstance();
}
} catch (YarnException e) {
routerMetrics.incrRefreshClusterMaxPriorityFailedRetrieved();
RouterServerUtil.logAndThrowException(e,
"Unable to refreshClusterMaxPriority due to exception. " + e.getMessage());
}
routerMetrics.incrRefreshClusterMaxPriorityFailedRetrieved();
throw new YarnException("Unable to refreshClusterMaxPriority.");
}
@Override

View File

@ -598,6 +598,16 @@ public class TestRouterMetrics {
LOG.info("Mocked: failed refreshNodesResources call");
metrics.incrRefreshNodesResourcesFailedRetrieved();
}
public void getCheckForDecommissioningNodesFailed() {
LOG.info("Mocked: failed checkForDecommissioningNodes call");
metrics.incrCheckForDecommissioningNodesFailedRetrieved();
}
public void getRefreshClusterMaxPriorityFailed() {
LOG.info("Mocked: failed refreshClusterMaxPriority call");
metrics.incrRefreshClusterMaxPriorityFailedRetrieved();
}
}
// Records successes for all calls
@ -898,6 +908,18 @@ public class TestRouterMetrics {
LOG.info("Mocked: successful RefreshNodesResourcesRetrieved call with duration {}", duration);
metrics.succeededRefreshNodesResourcesRetrieved(duration);
}
public void getCheckForDecommissioningNodesRetrieved(long duration) {
LOG.info("Mocked: successful CheckForDecommissioningNodesRetrieved call with duration {}",
duration);
metrics.succeededCheckForDecommissioningNodesRetrieved(duration);
}
public void getRefreshClusterMaxPriorityRetrieved(long duration) {
LOG.info("Mocked: successful RefreshClusterMaxPriority call with duration {}",
duration);
metrics.succeededRefreshClusterMaxPriorityRetrieved(duration);
}
}
@Test
@ -2042,4 +2064,49 @@ public class TestRouterMetrics {
Assert.assertEquals(225,
metrics.getLatencySucceededRefreshNodesResourcesRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testCheckForDecommissioningNodesFailedRetrieved() {
long totalBadBefore = metrics.getCheckForDecommissioningNodesFailedRetrieved();
badSubCluster.getCheckForDecommissioningNodesFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getCheckForDecommissioningNodesFailedRetrieved());
}
@Test
public void testCheckForDecommissioningNodesRetrieved() {
long totalGoodBefore = metrics.getNumSucceededCheckForDecommissioningNodesRetrieved();
goodSubCluster.getCheckForDecommissioningNodesRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededCheckForDecommissioningNodesRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededCheckForDecommissioningNodesRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getCheckForDecommissioningNodesRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededCheckForDecommissioningNodesRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededCheckForDecommissioningNodesRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testRefreshClusterMaxPriorityFailedRetrieved() {
long totalBadBefore = metrics.getRefreshClusterMaxPriorityFailedRetrieved();
badSubCluster.getRefreshClusterMaxPriorityFailed();
Assert.assertEquals(totalBadBefore + 1, metrics.getRefreshClusterMaxPriorityFailedRetrieved());
}
@Test
public void testRefreshClusterMaxPriorityRetrieved() {
long totalGoodBefore = metrics.getNumSucceededRefreshClusterMaxPriorityRetrieved();
goodSubCluster.getRefreshClusterMaxPriorityRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededRefreshClusterMaxPriorityRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededRefreshClusterMaxPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getRefreshClusterMaxPriorityRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededRefreshClusterMaxPriorityRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededRefreshClusterMaxPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
}
}

View File

@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLa
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@ -65,6 +67,7 @@ import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
@ -521,4 +524,30 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
"subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.replaceLabelsOnNode(request2));
}
@Test
public void testCheckForDecommissioningNodesRequest() throws Exception {
// null request1.
LambdaTestUtils.intercept(YarnException.class, "Missing checkForDecommissioningNodes request.",
() -> interceptor.checkForDecommissioningNodes(null));
// null request2.
CheckForDecommissioningNodesRequest request =
CheckForDecommissioningNodesRequest.newInstance(null);
LambdaTestUtils.intercept(YarnException.class,
"Missing checkForDecommissioningNodes SubClusterId.",
() -> interceptor.checkForDecommissioningNodes(request));
}
@Test
public void testCheckForDecommissioningNodesNormalRequest() throws Exception {
CheckForDecommissioningNodesRequest request =
CheckForDecommissioningNodesRequest.newInstance("SC-1");
CheckForDecommissioningNodesResponse response =
interceptor.checkForDecommissioningNodes(request);
assertNotNull(response);
Set<NodeId> nodeIds = response.getDecommissioningNodes();
assertNotNull(nodeIds);
assertEquals(0, nodeIds.size());
}
}