YARN-11376. [Federation] Support updateNodeResource、refreshNodesResources API's for Federation. (#5496)

This commit is contained in:
slfan1989 2023-03-28 00:27:21 +08:00 committed by GitHub
parent 762d3ddb43
commit 926993cb73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 360 additions and 5 deletions

View File

@ -36,4 +36,31 @@ public abstract class RefreshNodesResourcesRequest {
Records.newRecord(RefreshNodesResourcesRequest.class);
return request;
}
@Public
@Evolving
public static RefreshNodesResourcesRequest newInstance(String subClusterId) {
RefreshNodesResourcesRequest request =
Records.newRecord(RefreshNodesResourcesRequest.class);
request.setSubClusterId(subClusterId);
return request;
}
/**
* Get the subClusterId.
*
* @return subClusterId.
*/
@Public
@Evolving
public abstract String getSubClusterId();
/**
* Set the subClusterId.
*
* @param subClusterId subCluster Id.
*/
@Public
@Evolving
public abstract void setSubClusterId(String subClusterId);
}

View File

@ -51,7 +51,18 @@ public abstract class UpdateNodeResourceRequest {
request.setNodeResourceMap(nodeResourceMap);
return request;
}
@Public
@Evolving
public static UpdateNodeResourceRequest newInstance(
Map<NodeId, ResourceOption> nodeResourceMap, String subClusterId) {
UpdateNodeResourceRequest request =
Records.newRecord(UpdateNodeResourceRequest.class);
request.setNodeResourceMap(nodeResourceMap);
request.setSubClusterId(subClusterId);
return request;
}
/**
* Get the map from <code>NodeId</code> to <code>ResourceOption</code>.
* @return the map of {@code <NodeId, ResourceOption>}
@ -68,4 +79,21 @@ public abstract class UpdateNodeResourceRequest {
@Evolving
public abstract void setNodeResourceMap(Map<NodeId, ResourceOption> nodeResourceMap);
/**
* Get the subClusterId.
*
* @return subClusterId.
*/
@Public
@Evolving
public abstract String getSubClusterId();
/**
* Set the subClusterId.
*
* @param subClusterId subCluster Id.
*/
@Public
@Evolving
public abstract void setSubClusterId(String subClusterId);
}

View File

@ -79,12 +79,14 @@ message GetGroupsForUserResponseProto {
message UpdateNodeResourceRequestProto {
repeated NodeResourceMapProto node_resource_map = 1;
optional string sub_cluster_id = 2;
}
message UpdateNodeResourceResponseProto {
}
message RefreshNodesResourcesRequestProto {
optional string sub_cluster_id = 1;
}
message RefreshNodesResourcesResponseProto {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResourcesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RefreshNodesResourcesRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
@ -69,4 +70,27 @@ public class RefreshNodesResourcesRequestPBImpl extends RefreshNodesResourcesReq
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = RefreshNodesResourcesRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public String getSubClusterId() {
RefreshNodesResourcesRequestProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasSubClusterId()) ? p.getSubClusterId() : null;
}
@Override
public void setSubClusterId(String subClusterId) {
maybeInitBuilder();
if (subClusterId == null) {
builder.clearSubClusterId();
return;
}
builder.setSubClusterId(subClusterId);
}
}

View File

@ -66,6 +66,22 @@ public class UpdateNodeResourceRequestPBImpl extends UpdateNodeResourceRequest {
this.nodeResourceMap.putAll(nodeResourceMap);
}
@Override
public String getSubClusterId() {
UpdateNodeResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasSubClusterId()) ? p.getSubClusterId() : null;
}
@Override
public void setSubClusterId(String subClusterId) {
maybeInitBuilder();
if (subClusterId == null) {
builder.clearSubClusterId();
return;
}
builder.setSubClusterId(subClusterId);
}
public UpdateNodeResourceRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();

View File

@ -163,6 +163,10 @@ public final class RouterMetrics {
private MutableGaugeInt numGetClusterInfoFailedRetrieved;
@Metric("# of getClusterUserInfo failed to be retrieved")
private MutableGaugeInt numGetClusterUserInfoFailedRetrieved;
@Metric("# of updateNodeResource failed to be retrieved")
private MutableGaugeInt numUpdateNodeResourceFailedRetrieved;
@Metric("# of refreshNodesResources failed to be retrieved")
private MutableGaugeInt numRefreshNodesResourcesFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -287,6 +291,10 @@ public final class RouterMetrics {
private MutableRate totalSucceededGetClusterInfoRetrieved;
@Metric("Total number of successful Retrieved GetClusterUserInfoRetrieved and latency(ms)")
private MutableRate totalSucceededGetClusterUserInfoRetrieved;
@Metric("Total number of successful Retrieved UpdateNodeResource and latency(ms)")
private MutableRate totalSucceededUpdateNodeResourceRetrieved;
@Metric("Total number of successful Retrieved RefreshNodesResources and latency(ms)")
private MutableRate totalSucceededRefreshNodesResourcesRetrieved;
/**
* Provide quantile counters for all latencies.
@ -352,6 +360,8 @@ public final class RouterMetrics {
private MutableQuantiles removeFromClusterNodeLabelsLatency;
private MutableQuantiles getClusterInfoLatency;
private MutableQuantiles getClusterUserInfoLatency;
private MutableQuantiles updateNodeResourceLatency;
private MutableQuantiles refreshNodesResourcesLatency;
private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
@ -567,6 +577,12 @@ public final class RouterMetrics {
getClusterUserInfoLatency = registry.newQuantiles("getClusterUserInfoLatency",
"latency of get cluster user info timeouts", "ops", "latency", 10);
updateNodeResourceLatency = registry.newQuantiles("updateNodeResourceLatency",
"latency of update node resource timeouts", "ops", "latency", 10);
refreshNodesResourcesLatency = registry.newQuantiles("refreshNodesResourcesLatency",
"latency of refresh nodes resources timeouts", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -873,6 +889,16 @@ public final class RouterMetrics {
return totalSucceededGetClusterUserInfoRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededUpdateNodeResourceRetrieved() {
return totalSucceededUpdateNodeResourceRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededRefreshNodesResourcesRetrieved() {
return totalSucceededRefreshNodesResourcesRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededRefreshSuperUserGroupsConfigurationRetrieved() {
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().numSamples();
@ -1173,6 +1199,16 @@ public final class RouterMetrics {
return totalSucceededGetClusterUserInfoRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededUpdateNodeResourceRetrieved() {
return totalSucceededUpdateNodeResourceRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededRefreshNodesResourcesRetrieved() {
return totalSucceededRefreshNodesResourcesRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededRefreshSuperUserGroupsConfigurationRetrieved() {
return totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.lastStat().mean();
@ -1426,6 +1462,14 @@ public final class RouterMetrics {
return numGetClusterUserInfoFailedRetrieved.value();
}
public int getUpdateNodeResourceFailedRetrieved() {
return numUpdateNodeResourceFailedRetrieved.value();
}
public int getRefreshNodesResourcesFailedRetrieved() {
return numRefreshNodesResourcesFailedRetrieved.value();
}
public int getDelegationTokenFailedRetrieved() {
return numGetDelegationTokenFailedRetrieved.value();
}
@ -1739,6 +1783,16 @@ public final class RouterMetrics {
getClusterUserInfoLatency.add(duration);
}
public void succeededUpdateNodeResourceRetrieved(long duration) {
totalSucceededUpdateNodeResourceRetrieved.add(duration);
updateNodeResourceLatency.add(duration);
}
public void succeededRefreshNodesResourcesRetrieved(long duration) {
totalSucceededRefreshNodesResourcesRetrieved.add(duration);
refreshNodesResourcesLatency.add(duration);
}
public void succeededRefreshSuperUserGroupsConfRetrieved(long duration) {
totalSucceededRefreshSuperUserGroupsConfigurationRetrieved.add(duration);
refreshSuperUserGroupsConfLatency.add(duration);
@ -1967,6 +2021,14 @@ public final class RouterMetrics {
numGetClusterUserInfoFailedRetrieved.incr();
}
public void incrUpdateNodeResourceFailedRetrieved() {
numUpdateNodeResourceFailedRetrieved.incr();
}
public void incrRefreshNodesResourcesFailedRetrieved() {
numRefreshNodesResourcesFailedRetrieved.incr();
}
public void incrGetDelegationTokenFailedRetrieved() {
numGetDelegationTokenFailedRetrieved.incr();
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.router.rmadmin;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -112,7 +113,7 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
@VisibleForTesting
protected ResourceManagerAdministrationProtocol getAdminRMProxyForSubCluster(
SubClusterId subClusterId) throws YarnException {
SubClusterId subClusterId) throws Exception {
if (adminRMProxies.containsKey(subClusterId)) {
return adminRMProxies.get(subClusterId);
@ -438,13 +439,75 @@ public class FederationRMAdminInterceptor extends AbstractRMAdminRequestIntercep
@Override
public UpdateNodeResourceResponse updateNodeResource(UpdateNodeResourceRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
// parameter verification.
if (request == null) {
routerMetrics.incrUpdateNodeResourceFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing UpdateNodeResource request.", null);
}
String subClusterId = request.getSubClusterId();
if (StringUtils.isBlank(subClusterId)) {
routerMetrics.incrUpdateNodeResourceFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing UpdateNodeResource SubClusterId.", null);
}
try {
long startTime = clock.getTime();
RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
new Class[]{UpdateNodeResourceRequest.class}, new Object[]{request});
Collection<UpdateNodeResourceResponse> updateNodeResourceResps =
remoteMethod.invokeConcurrent(this, UpdateNodeResourceResponse.class, subClusterId);
if (CollectionUtils.isNotEmpty(updateNodeResourceResps)) {
long stopTime = clock.getTime();
routerMetrics.succeededUpdateNodeResourceRetrieved(stopTime - startTime);
return UpdateNodeResourceResponse.newInstance();
}
} catch (YarnException e) {
routerMetrics.incrUpdateNodeResourceFailedRetrieved();
RouterServerUtil.logAndThrowException(e,
"Unable to updateNodeResource due to exception. " + e.getMessage());
}
routerMetrics.incrUpdateNodeResourceFailedRetrieved();
throw new YarnException("Unable to updateNodeResource.");
}
@Override
public RefreshNodesResourcesResponse refreshNodesResources(RefreshNodesResourcesRequest request)
throws YarnException, IOException {
throw new NotImplementedException();
// parameter verification.
if (request == null) {
routerMetrics.incrRefreshNodesResourcesFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing RefreshNodesResources request.", null);
}
String subClusterId = request.getSubClusterId();
if (StringUtils.isBlank(subClusterId)) {
routerMetrics.incrRefreshNodesResourcesFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing RefreshNodesResources SubClusterId.", null);
}
try {
long startTime = clock.getTime();
RMAdminProtocolMethod remoteMethod = new RMAdminProtocolMethod(
new Class[]{RefreshNodesResourcesRequest.class}, new Object[]{request});
Collection<RefreshNodesResourcesResponse> refreshNodesResourcesResps =
remoteMethod.invokeConcurrent(this, RefreshNodesResourcesResponse.class, subClusterId);
if (CollectionUtils.isNotEmpty(refreshNodesResourcesResps)) {
long stopTime = clock.getTime();
routerMetrics.succeededRefreshNodesResourcesRetrieved(stopTime - startTime);
return RefreshNodesResourcesResponse.newInstance();
}
} catch (YarnException e) {
routerMetrics.incrRefreshNodesResourcesFailedRetrieved();
RouterServerUtil.logAndThrowException(e,
"Unable to refreshNodesResources due to exception. " + e.getMessage());
}
routerMetrics.incrRefreshNodesResourcesFailedRetrieved();
throw new YarnException("Unable to refreshNodesResources.");
}
@Override

View File

@ -578,6 +578,16 @@ public class TestRouterMetrics {
LOG.info("Mocked: failed getClusterUserInfo call");
metrics.incrGetClusterUserInfoFailedRetrieved();
}
public void getUpdateNodeResourceFailed() {
LOG.info("Mocked: failed getClusterUserInfo call");
metrics.incrUpdateNodeResourceFailedRetrieved();
}
public void getRefreshNodesResourcesFailed() {
LOG.info("Mocked: failed refreshNodesResources call");
metrics.incrRefreshNodesResourcesFailedRetrieved();
}
}
// Records successes for all calls
@ -858,6 +868,16 @@ public class TestRouterMetrics {
LOG.info("Mocked: successful GetClusterUserInfoRetrieved call with duration {}", duration);
metrics.succeededGetClusterUserInfoRetrieved(duration);
}
public void getUpdateNodeResourceRetrieved(long duration) {
LOG.info("Mocked: successful UpdateNodeResourceRetrieved call with duration {}", duration);
metrics.succeededUpdateNodeResourceRetrieved(duration);
}
public void getRefreshNodesResourcesRetrieved(long duration) {
LOG.info("Mocked: successful RefreshNodesResourcesRetrieved call with duration {}", duration);
metrics.succeededRefreshNodesResourcesRetrieved(duration);
}
}
@Test
@ -1912,4 +1932,48 @@ public class TestRouterMetrics {
Assert.assertEquals(225,
metrics.getLatencySucceededGetClusterUserInfoRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testUpdateNodeResourceRetrievedFailed() {
long totalBadBefore = metrics.getUpdateNodeResourceFailedRetrieved();
badSubCluster.getUpdateNodeResourceFailed();
Assert.assertEquals(totalBadBefore + 1, metrics.getUpdateNodeResourceFailedRetrieved());
}
@Test
public void testUpdateNodeResourceRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetClusterUserInfoRetrieved();
goodSubCluster.getUpdateNodeResourceRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededUpdateNodeResourceRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededUpdateNodeResourceRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getUpdateNodeResourceRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededUpdateNodeResourceRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededUpdateNodeResourceRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testRefreshNodesResourcesRetrievedFailed() {
long totalBadBefore = metrics.getRefreshNodesResourcesFailedRetrieved();
badSubCluster.getRefreshNodesResourcesFailed();
Assert.assertEquals(totalBadBefore + 1, metrics.getRefreshNodesResourcesFailedRetrieved());
}
@Test
public void testRefreshNodesResourcesRetrieved() {
long totalGoodBefore = metrics.getNumSucceededRefreshNodesResourcesRetrieved();
goodSubCluster.getRefreshNodesResourcesRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededRefreshNodesResourcesRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededRefreshNodesResourcesRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getRefreshNodesResourcesRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededRefreshNodesResourcesRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededRefreshNodesResourcesRetrieved(), ASSERT_DOUBLE_DELTA);
}
}

View File

@ -22,6 +22,9 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.yarn.api.records.DecommissionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
@ -35,6 +38,10 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@ -45,7 +52,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertNotNull;
@ -63,6 +72,7 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
////////////////////////////////
private final static String USER_NAME = "test-user";
private final static int NUM_SUBCLUSTER = 4;
private final static int GB = 1024;
private TestableFederationRMAdminInterceptor interceptor;
private FederationStateStoreFacade facade;
@ -320,4 +330,62 @@ public class TestFederationRMAdminInterceptor extends BaseRouterRMAdminTest {
LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshServiceAcls(request1));
}
@Test
public void testUpdateNodeResourceEmptyRequest() throws Exception {
// null request1.
LambdaTestUtils.intercept(YarnException.class, "Missing UpdateNodeResource request.",
() -> interceptor.updateNodeResource(null));
// null request2.
Map<NodeId, ResourceOption> nodeResourceMap = new HashMap<>();
UpdateNodeResourceRequest request = UpdateNodeResourceRequest.newInstance(nodeResourceMap);
LambdaTestUtils.intercept(YarnException.class, "Missing UpdateNodeResource SubClusterId.",
() -> interceptor.updateNodeResource(request));
}
@Test
public void testUpdateNodeResourceNormalRequest() throws Exception {
// case 1, test the existing subCluster (SC-1).
Map<NodeId, ResourceOption> nodeResourceMap = new HashMap<>();
NodeId nodeId = NodeId.newInstance("127.0.0.1", 1);
ResourceOption resourceOption =
ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1);
nodeResourceMap.put(nodeId, resourceOption);
UpdateNodeResourceRequest request =
UpdateNodeResourceRequest.newInstance(nodeResourceMap, "SC-1");
UpdateNodeResourceResponse response = interceptor.updateNodeResource(request);
assertNotNull(response);
// case 2, test the non-exist subCluster.
UpdateNodeResourceRequest request1 =
UpdateNodeResourceRequest.newInstance(nodeResourceMap, "SC-NON");
LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.updateNodeResource(request1));
}
@Test
public void testRefreshNodesResourcesEmptyRequest() throws Exception {
// null request1.
LambdaTestUtils.intercept(YarnException.class, "Missing RefreshNodesResources request.",
() -> interceptor.refreshNodesResources(null));
// null request2.
RefreshNodesResourcesRequest request = RefreshNodesResourcesRequest.newInstance();
LambdaTestUtils.intercept(YarnException.class, "Missing RefreshNodesResources SubClusterId.",
() -> interceptor.refreshNodesResources(request));
}
@Test
public void testRefreshNodesResourcesNormalRequest() throws Exception {
// case 1, test the existing subCluster (SC-1).
RefreshNodesResourcesRequest request = RefreshNodesResourcesRequest.newInstance("SC-1");
RefreshNodesResourcesResponse response = interceptor.refreshNodesResources(request);
assertNotNull(response);
// case 2, test the non-exist subCluster.
RefreshNodesResourcesRequest request1 = RefreshNodesResourcesRequest.newInstance("SC-NON");
LambdaTestUtils.intercept(Exception.class, "subClusterId = SC-NON is not an active subCluster.",
() -> interceptor.refreshNodesResources(request1));
}
}

View File

@ -52,7 +52,7 @@ public class TestableFederationRMAdminInterceptor extends FederationRMAdminInter
@Override
protected ResourceManagerAdministrationProtocol getAdminRMProxyForSubCluster(
SubClusterId subClusterId) throws YarnException {
SubClusterId subClusterId) throws Exception {
MockRM mockRM;
synchronized (this) {
if (mockRMs.containsKey(subClusterId)) {
@ -66,6 +66,7 @@ public class TestableFederationRMAdminInterceptor extends FederationRMAdminInter
}
mockRM.init(config);
mockRM.start();
mockRM.registerNode("127.0.0.1:1", 102400, 100);
mockRMs.put(subClusterId, mockRM);
}
return mockRM.getAdminService();