YARN-11161. Support getAttributesToNodes, getClusterNodeAttributes, getNodesToAttributes API's for Federation (#4610)

This commit is contained in:
slfan1989 2022-07-26 01:05:45 +08:00 committed by GitHub
parent 2f49eec5dd
commit edeb99548a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 604 additions and 32 deletions

View File

@ -91,6 +91,12 @@ public final class RouterMetrics {
private MutableGaugeInt numGetResourceProfilesFailedRetrieved;
@Metric("# of getResourceProfile failed to be retrieved")
private MutableGaugeInt numGetResourceProfileFailedRetrieved;
@Metric("# of getAttributesToNodes failed to be retrieved")
private MutableGaugeInt numGetAttributesToNodesFailedRetrieved;
@Metric("# of getClusterNodeAttributes failed to be retrieved")
private MutableGaugeInt numGetClusterNodeAttributesFailedRetrieved;
@Metric("# of getNodesToAttributes failed to be retrieved")
private MutableGaugeInt numGetNodesToAttributesFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@ -101,14 +107,11 @@ public final class RouterMetrics {
private MutableRate totalSucceededAppsCreated;
@Metric("Total number of successful Retrieved app reports and latency(ms)")
private MutableRate totalSucceededAppsRetrieved;
@Metric("Total number of successful Retrieved multiple apps reports and "
+ "latency(ms)")
@Metric("Total number of successful Retrieved multiple apps reports and latency(ms)")
private MutableRate totalSucceededMultipleAppsRetrieved;
@Metric("Total number of successful Retrieved " +
"appAttempt reports and latency(ms)")
@Metric("Total number of successful Retrieved appAttempt reports and latency(ms)")
private MutableRate totalSucceededAppAttemptsRetrieved;
@Metric("Total number of successful Retrieved getClusterMetrics and "
+ "latency(ms)")
@Metric("Total number of successful Retrieved getClusterMetrics and latency(ms)")
private MutableRate totalSucceededGetClusterMetricsRetrieved;
@Metric("Total number of successful Retrieved getClusterNodes and latency(ms)")
private MutableRate totalSucceededGetClusterNodesRetrieved;
@ -144,9 +147,14 @@ public final class RouterMetrics {
private MutableRate totalSucceededMoveApplicationAcrossQueuesRetrieved;
@Metric("Total number of successful Retrieved getResourceProfiles and latency(ms)")
private MutableRate totalSucceededGetResourceProfilesRetrieved;
@Metric("Total number of successful Retrieved getResourceProfile and latency(ms)")
private MutableRate totalSucceededGetResourceProfileRetrieved;
@Metric("Total number of successful Retrieved getAttributesToNodes and latency(ms)")
private MutableRate totalSucceededGetAttributesToNodesRetrieved;
@Metric("Total number of successful Retrieved getClusterNodeAttributes and latency(ms)")
private MutableRate totalSucceededGetClusterNodeAttributesRetrieved;
@Metric("Total number of successful Retrieved getNodesToAttributes and latency(ms)")
private MutableRate totalSucceededGetNodesToAttributesRetrieved;
/**
* Provide quantile counters for all latencies.
@ -176,6 +184,10 @@ public final class RouterMetrics {
private MutableQuantiles moveApplicationAcrossQueuesLatency;
private MutableQuantiles getResourceProfilesLatency;
private MutableQuantiles getResourceProfileLatency;
private MutableQuantiles getAttributesToNodesLatency;
private MutableQuantiles getClusterNodeAttributesLatency;
private MutableQuantiles getNodesToAttributesLatency;
private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry;
@ -274,6 +286,18 @@ public final class RouterMetrics {
getResourceProfileLatency =
registry.newQuantiles("getResourceProfileLatency",
"latency of get resource profile timeouts", "ops", "latency", 10);
getAttributesToNodesLatency =
registry.newQuantiles("getAttributesToNodesLatency",
"latency of get attributes to nodes timeouts", "ops", "latency", 10);
getClusterNodeAttributesLatency =
registry.newQuantiles("getClusterNodeAttributesLatency",
"latency of get cluster node attributes timeouts", "ops", "latency", 10);
getNodesToAttributesLatency =
registry.newQuantiles("getNodesToAttributesLatency",
"latency of get nodes to attributes timeouts", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@ -420,6 +444,21 @@ public final class RouterMetrics {
return totalSucceededGetResourceProfileRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetAttributesToNodesRetrieved() {
return totalSucceededGetAttributesToNodesRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetClusterNodeAttributesRetrieved() {
return totalSucceededGetClusterNodeAttributesRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededGetNodesToAttributesRetrieved() {
return totalSucceededGetNodesToAttributesRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
@ -545,6 +584,21 @@ public final class RouterMetrics {
return totalSucceededGetResourceProfileRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetAttributesToNodesRetrieved() {
return totalSucceededGetAttributesToNodesRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetClusterNodeAttributesRetrieved() {
return totalSucceededGetClusterNodeAttributesRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededGetNodesToAttributesRetrieved() {
return totalSucceededGetNodesToAttributesRetrieved.lastStat().mean();
}
@VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
@ -666,6 +720,18 @@ public final class RouterMetrics {
return numGetResourceProfileFailedRetrieved.value();
}
public int getAttributesToNodesFailedRetrieved() {
return numGetAttributesToNodesFailedRetrieved.value();
}
public int getClusterNodeAttributesFailedRetrieved() {
return numGetClusterNodeAttributesFailedRetrieved.value();
}
public int getNodesToAttributesFailedRetrieved() {
return numGetNodesToAttributesFailedRetrieved.value();
}
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@ -791,6 +857,21 @@ public final class RouterMetrics {
getResourceProfileLatency.add(duration);
}
public void succeededGetAttributesToNodesRetrieved(long duration) {
totalSucceededGetAttributesToNodesRetrieved.add(duration);
getAttributesToNodesLatency.add(duration);
}
public void succeededGetClusterNodeAttributesRetrieved(long duration) {
totalSucceededGetClusterNodeAttributesRetrieved.add(duration);
getClusterNodeAttributesLatency.add(duration);
}
public void succeededGetNodesToAttributesRetrieved(long duration) {
totalSucceededGetNodesToAttributesRetrieved.add(duration);
getNodesToAttributesLatency.add(duration);
}
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@ -890,4 +971,16 @@ public final class RouterMetrics {
public void incrGetResourceProfileFailedRetrieved() {
numGetResourceProfileFailedRetrieved.incr();
}
public void incrGetAttributesToNodesFailedRetrieved() {
numGetAttributesToNodesFailedRetrieved.incr();
}
public void incrGetClusterNodeAttributesFailedRetrieved() {
numGetClusterNodeAttributesFailedRetrieved.incr();
}
public void incrGetNodesToAttributesFailedRetrieved() {
numGetNodesToAttributesFailedRetrieved.incr();
}
}

View File

@ -175,7 +175,6 @@ public class FederationClientInterceptor
federationFacade = FederationStateStoreFacade.getInstance();
rand = new Random(System.currentTimeMillis());
int numThreads = getConf().getInt(
YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE);
@ -195,12 +194,11 @@ public class FederationClientInterceptor
LOG.error(e.getMessage());
}
numSubmitRetries =
conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
numSubmitRetries = conf.getInt(
YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
clientRMProxies =
new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
clientRMProxies = new ConcurrentHashMap<>();
routerMetrics = RouterMetrics.getMetrics();
returnPartialReport = conf.getBoolean(
@ -230,16 +228,14 @@ public class FederationClientInterceptor
CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
UserGroupInformation realUser = user;
if (serviceAuthEnabled) {
realUser = UserGroupInformation.createProxyUser(
user.getShortUserName(), UserGroupInformation.getLoginUser());
realUser = UserGroupInformation.createProxyUser(user.getShortUserName(),
UserGroupInformation.getLoginUser());
}
clientRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(),
ApplicationClientProtocol.class, subClusterId, realUser);
} catch (Exception e) {
RouterServerUtil.logAndThrowException(
"Unable to create the interface to reach the SubCluster "
+ subClusterId,
e);
"Unable to create the interface to reach the SubCluster " + subClusterId, e);
}
clientRMProxies.put(subClusterId, clientRMProxy);
@ -287,8 +283,7 @@ public class FederationClientInterceptor
for (int i = 0; i < numSubmitRetries; ++i) {
SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
LOG.debug(
"getNewApplication try #{} on SubCluster {}", i, subClusterId);
LOG.debug("getNewApplication try #{} on SubCluster {}", i, subClusterId);
ApplicationClientProtocol clientRMProxy =
getClientRMProxyForSubCluster(subClusterId);
GetNewApplicationResponse response = null;
@ -410,7 +405,7 @@ public class FederationClientInterceptor
ApplicationId applicationId =
request.getApplicationSubmissionContext().getApplicationId();
List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
List<SubClusterId> blacklist = new ArrayList<>();
for (int i = 0; i < numSubmitRetries; ++i) {
@ -561,8 +556,8 @@ public class FederationClientInterceptor
}
if (response == null) {
LOG.error("No response when attempting to kill the application "
+ applicationId + " to SubCluster " + subClusterId.getId());
LOG.error("No response when attempting to kill the application {} to SubCluster {}.",
applicationId, subClusterId.getId());
}
long stopTime = clock.getTime();
@ -1528,20 +1523,75 @@ public class FederationClientInterceptor
@Override
public GetAttributesToNodesResponse getAttributesToNodes(
GetAttributesToNodesRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null || request.getNodeAttributes() == null) {
routerMetrics.incrGetAttributesToNodesFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getAttributesToNodes request " +
"or nodeAttributes.", null);
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getAttributesToNodes",
new Class[] {GetAttributesToNodesRequest.class}, new Object[] {request});
Collection<GetAttributesToNodesResponse> attributesToNodesResponses = null;
try {
attributesToNodesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
GetAttributesToNodesResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetAttributesToNodesFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get attributes to nodes due to exception.",
ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededGetAttributesToNodesRetrieved(stopTime - startTime);
return RouterYarnClientUtils.mergeAttributesToNodesResponse(attributesToNodesResponses);
}
@Override
public GetClusterNodeAttributesResponse getClusterNodeAttributes(
GetClusterNodeAttributesRequest request)
throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
GetClusterNodeAttributesRequest request) throws YarnException, IOException {
if (request == null) {
routerMetrics.incrGetClusterNodeAttributesFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getClusterNodeAttributes request.", null);
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getClusterNodeAttributes",
new Class[] {GetClusterNodeAttributesRequest.class}, new Object[] {request});
Collection<GetClusterNodeAttributesResponse> clusterNodeAttributesResponses = null;
try {
clusterNodeAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
GetClusterNodeAttributesResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetClusterNodeAttributesFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get cluster node attributes due " +
" to exception.", ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededGetClusterNodeAttributesRetrieved(stopTime - startTime);
return RouterYarnClientUtils.mergeClusterNodeAttributesResponse(clusterNodeAttributesResponses);
}
@Override
public GetNodesToAttributesResponse getNodesToAttributes(
GetNodesToAttributesRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented");
if (request == null || request.getHostNames() == null) {
routerMetrics.incrGetNodesToAttributesFailedRetrieved();
RouterServerUtil.logAndThrowException("Missing getNodesToAttributes request or " +
"hostNames.", null);
}
long startTime = clock.getTime();
ClientMethod remoteMethod = new ClientMethod("getNodesToAttributes",
new Class[] {GetNodesToAttributesRequest.class}, new Object[] {request});
Collection<GetNodesToAttributesResponse> nodesToAttributesResponses = null;
try {
nodesToAttributesResponses = invokeAppClientProtocolMethod(true, remoteMethod,
GetNodesToAttributesResponse.class);
} catch (Exception ex) {
routerMetrics.incrGetNodesToAttributesFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get nodes to attributes due " +
" to exception.", ex);
}
long stopTime = clock.getTime();
routerMetrics.succeededGetNodesToAttributesRetrieved(stopTime - startTime);
return RouterYarnClientUtils.mergeNodesToAttributesResponse(nodesToAttributesResponses);
}
protected SubClusterId getApplicationHomeSubCluster(

View File

@ -37,6 +37,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -49,9 +52,12 @@ import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@ -468,5 +474,56 @@ public final class RouterYarnClientUtils {
profileResponse.setResource(resource);
return profileResponse;
}
/**
* Merges a list of GetAttributesToNodesResponse.
*
* @param responses a list of GetAttributesToNodesResponse to merge.
* @return the merged GetAttributesToNodesResponse.
*/
public static GetAttributesToNodesResponse mergeAttributesToNodesResponse(
Collection<GetAttributesToNodesResponse> responses) {
Map<NodeAttributeKey, List<NodeToAttributeValue>> nodeAttributeMap = new HashMap<>();
for (GetAttributesToNodesResponse response : responses) {
if (response != null && response.getAttributesToNodes() != null) {
nodeAttributeMap.putAll(response.getAttributesToNodes());
}
}
return GetAttributesToNodesResponse.newInstance(nodeAttributeMap);
}
/**
* Merges a list of GetClusterNodeAttributesResponse.
*
* @param responses a list of GetClusterNodeAttributesResponse to merge.
* @return the merged GetClusterNodeAttributesResponse.
*/
public static GetClusterNodeAttributesResponse mergeClusterNodeAttributesResponse(
Collection<GetClusterNodeAttributesResponse> responses) {
Set<NodeAttributeInfo> nodeAttributeInfo = new HashSet<>();
for (GetClusterNodeAttributesResponse response : responses) {
if (response != null && response.getNodeAttributes() != null) {
nodeAttributeInfo.addAll(response.getNodeAttributes());
}
}
return GetClusterNodeAttributesResponse.newInstance(nodeAttributeInfo);
}
/**
* Merges a list of GetNodesToAttributesResponse.
*
* @param responses a list of GetNodesToAttributesResponse to merge.
* @return the merged GetNodesToAttributesResponse.
*/
public static GetNodesToAttributesResponse mergeNodesToAttributesResponse(
Collection<GetNodesToAttributesResponse> responses) {
Map<String, Set<NodeAttribute>> attributesMap = new HashMap<>();
for (GetNodesToAttributesResponse response : responses) {
if (response != null && response.getNodeToAttributes() != null) {
attributesMap.putAll(response.getNodeToAttributes());
}
}
return GetNodesToAttributesResponse.newInstance(attributesMap);
}
}

View File

@ -438,6 +438,21 @@ public class TestRouterMetrics {
LOG.info("Mocked: failed getResourceProfileFailed call");
metrics.incrGetResourceProfileFailedRetrieved();
}
public void getAttributesToNodesFailed() {
LOG.info("Mocked: failed getAttributesToNodesFailed call");
metrics.incrGetAttributesToNodesFailedRetrieved();
}
public void getClusterNodeAttributesFailed() {
LOG.info("Mocked: failed getClusterNodeAttributesFailed call");
metrics.incrGetClusterNodeAttributesFailedRetrieved();
}
public void getNodesToAttributesFailed() {
LOG.info("Mocked: failed getNodesToAttributesFailed call");
metrics.incrGetNodesToAttributesFailedRetrieved();
}
}
// Records successes for all calls
@ -573,6 +588,21 @@ public class TestRouterMetrics {
LOG.info("Mocked: successful getResourceProfile call with duration {}", duration);
metrics.succeededGetResourceProfileRetrieved(duration);
}
public void getAttributesToNodesRetrieved(long duration) {
LOG.info("Mocked: successful getAttributesToNodes call with duration {}", duration);
metrics.succeededGetAttributesToNodesRetrieved(duration);
}
public void getClusterNodeAttributesRetrieved(long duration) {
LOG.info("Mocked: successful getClusterNodeAttributes call with duration {}", duration);
metrics.succeededGetClusterNodeAttributesRetrieved(duration);
}
public void getNodesToAttributesRetrieved(long duration) {
LOG.info("Mocked: successful getNodesToAttributes call with duration {}", duration);
metrics.succeededGetNodesToAttributesRetrieved(duration);
}
}
@Test
@ -970,4 +1000,73 @@ public class TestRouterMetrics {
Assert.assertEquals(totalBadBefore + 1,
metrics.getResourceProfileFailedRetrieved());
}
@Test
public void testSucceededGetAttributesToNodesRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetAttributesToNodesRetrieved();
goodSubCluster.getAttributesToNodesRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetAttributesToNodesRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetAttributesToNodesRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getAttributesToNodesRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetAttributesToNodesRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetAttributesToNodesRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetAttributesToNodesRetrievedFailed() {
long totalBadBefore = metrics.getAttributesToNodesFailedRetrieved();
badSubCluster.getAttributesToNodesFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getAttributesToNodesFailedRetrieved());
}
@Test
public void testGetClusterNodeAttributesRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetClusterNodeAttributesRetrieved();
goodSubCluster.getClusterNodeAttributesRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetClusterNodeAttributesRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetClusterNodeAttributesRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getClusterNodeAttributesRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetClusterNodeAttributesRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetClusterNodeAttributesRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetClusterNodeAttributesRetrievedFailed() {
long totalBadBefore = metrics.getClusterNodeAttributesFailedRetrieved();
badSubCluster.getClusterNodeAttributesFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getClusterNodeAttributesFailedRetrieved());
}
@Test
public void testGetNodesToAttributesRetrieved() {
long totalGoodBefore = metrics.getNumSucceededGetNodesToAttributesRetrieved();
goodSubCluster.getNodesToAttributesRetrieved(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededGetNodesToAttributesRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededGetNodesToAttributesRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getNodesToAttributesRetrieved(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededGetNodesToAttributesRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededGetNodesToAttributesRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testGetNodesToAttributesRetrievedFailed() {
long totalBadBefore = metrics.getNodesToAttributesFailedRetrieved();
badSubCluster.getNodesToAttributesFailed();
Assert.assertEquals(totalBadBefore + 1,
metrics.getNodesToAttributesFailedRetrieved());
}
}

View File

@ -82,6 +82,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -96,6 +102,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@ -1233,4 +1244,81 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
Assert.assertEquals(4096, response3.getResource().getMemorySize());
Assert.assertEquals(4, response3.getResource().getVirtualCores());
}
@Test
public void testGetAttributesToNodes() throws Exception {
LOG.info("Test FederationClientInterceptor : Get AttributesToNodes request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getAttributesToNodes request " +
"or nodeAttributes.", () -> interceptor.getAttributesToNodes(null));
// normal request
GetAttributesToNodesResponse response =
interceptor.getAttributesToNodes(GetAttributesToNodesRequest.newInstance());
Assert.assertNotNull(response);
Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs = response.getAttributesToNodes();
Assert.assertNotNull(attrs);
Assert.assertEquals(4, attrs.size());
NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
NodeAttributeType.STRING, "nvidia");
NodeToAttributeValue attributeValue1 =
NodeToAttributeValue.newInstance("0-host1", gpu.getAttributeValue());
NodeAttributeKey gpuKey = gpu.getAttributeKey();
Assert.assertTrue(attrs.get(gpuKey).contains(attributeValue1));
}
@Test
public void testClusterNodeAttributes() throws Exception {
LOG.info("Test FederationClientInterceptor : Get ClusterNodeAttributes request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing getClusterNodeAttributes request.",
() -> interceptor.getClusterNodeAttributes(null));
// normal request
GetClusterNodeAttributesResponse response =
interceptor.getClusterNodeAttributes(GetClusterNodeAttributesRequest.newInstance());
Assert.assertNotNull(response);
Set<NodeAttributeInfo> nodeAttributeInfos = response.getNodeAttributes();
Assert.assertNotNull(nodeAttributeInfos);
Assert.assertEquals(4, nodeAttributeInfos.size());
NodeAttributeInfo nodeAttributeInfo1 =
NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"),
NodeAttributeType.STRING);
Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1));
NodeAttributeInfo nodeAttributeInfo2 =
NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("OS"),
NodeAttributeType.STRING);
Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2));
}
@Test
public void testNodesToAttributes() throws Exception {
LOG.info("Test FederationClientInterceptor : Get NodesToAttributes request.");
// null request
LambdaTestUtils.intercept(YarnException.class,
"Missing getNodesToAttributes request or hostNames.",
() -> interceptor.getNodesToAttributes(null));
// normal request
Set<String> hostNames = Collections.singleton("0-host1");
GetNodesToAttributesResponse response =
interceptor.getNodesToAttributes(GetNodesToAttributesRequest.newInstance(hostNames));
Assert.assertNotNull(response);
Map<String, Set<NodeAttribute>> nodeAttributeMap = response.getNodeToAttributes();
Assert.assertNotNull(nodeAttributeMap);
Assert.assertEquals(1, nodeAttributeMap.size());
NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
NodeAttributeType.STRING, "nvida");
Assert.assertTrue(nodeAttributeMap.get("0-host1").contains(gpu));
}
}

View File

@ -37,6 +37,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAttributesToNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeAttributesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToAttributesResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -53,6 +56,11 @@ import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import org.apache.hadoop.yarn.api.records.NodeToAttributeValue;
import org.apache.hadoop.yarn.api.records.NodeAttributeInfo;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.junit.Assert;
@ -610,4 +618,147 @@ public class TestRouterYarnClientUtils {
Assert.assertEquals(3, resource.getVirtualCores());
Assert.assertEquals(3072, resource.getMemorySize());
}
@Test
public void testMergeAttributesToNodesResponse() {
// normal response1
NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
NodeAttributeType.STRING, "nvidia");
Map<NodeAttributeKey, List<NodeToAttributeValue>> map1 = new HashMap<>();
List<NodeToAttributeValue> lists1 = new ArrayList<>();
NodeToAttributeValue attributeValue1 =
NodeToAttributeValue.newInstance("node1", gpu.getAttributeValue());
lists1.add(attributeValue1);
map1.put(gpu.getAttributeKey(), lists1);
GetAttributesToNodesResponse response1 = GetAttributesToNodesResponse.newInstance(map1);
// normal response2
NodeAttribute docker = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
NodeAttributeType.STRING, "docker0");
Map<NodeAttributeKey, List<NodeToAttributeValue>> map2 = new HashMap<>();
List<NodeToAttributeValue> lists2 = new ArrayList<>();
NodeToAttributeValue attributeValue2 =
NodeToAttributeValue.newInstance("node2", docker.getAttributeValue());
lists2.add(attributeValue2);
map2.put(docker.getAttributeKey(), lists2);
GetAttributesToNodesResponse response2 = GetAttributesToNodesResponse.newInstance(map2);
// empty response3
GetAttributesToNodesResponse response3 =
GetAttributesToNodesResponse.newInstance(new HashMap<>());
// null response4
GetAttributesToNodesResponse response4 = null;
List<GetAttributesToNodesResponse> responses = new ArrayList<>();
responses.add(response1);
responses.add(response2);
responses.add(response3);
responses.add(response4);
GetAttributesToNodesResponse response =
RouterYarnClientUtils.mergeAttributesToNodesResponse(responses);
Assert.assertNotNull(response);
Assert.assertEquals(2, response.getAttributesToNodes().size());
Map<NodeAttributeKey, List<NodeToAttributeValue>> attrs = response.getAttributesToNodes();
NodeAttributeKey gpuKey = gpu.getAttributeKey();
Assert.assertEquals(attributeValue1.toString(), attrs.get(gpuKey).get(0).toString());
NodeAttributeKey dockerKey = docker.getAttributeKey();
Assert.assertEquals(attributeValue2.toString(), attrs.get(dockerKey).get(0).toString());
}
@Test
public void testMergeClusterNodeAttributesResponse() {
// normal response1
NodeAttributeInfo nodeAttributeInfo1 =
NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("GPU"),
NodeAttributeType.STRING);
Set<NodeAttributeInfo> attributes1 = new HashSet<>();
attributes1.add(nodeAttributeInfo1);
GetClusterNodeAttributesResponse response1 =
GetClusterNodeAttributesResponse.newInstance(attributes1);
// normal response2
NodeAttributeInfo nodeAttributeInfo2 =
NodeAttributeInfo.newInstance(NodeAttributeKey.newInstance("CPU"),
NodeAttributeType.STRING);
Set<NodeAttributeInfo> attributes2 = new HashSet<>();
attributes2.add(nodeAttributeInfo2);
GetClusterNodeAttributesResponse response2 =
GetClusterNodeAttributesResponse.newInstance(attributes2);
// empty response3
GetClusterNodeAttributesResponse response3 =
GetClusterNodeAttributesResponse.newInstance(new HashSet<>());
// null response4
GetClusterNodeAttributesResponse response4 = null;
List<GetClusterNodeAttributesResponse> responses = new ArrayList<>();
responses.add(response1);
responses.add(response2);
responses.add(response3);
responses.add(response4);
GetClusterNodeAttributesResponse response =
RouterYarnClientUtils.mergeClusterNodeAttributesResponse(responses);
Assert.assertNotNull(response);
Set<NodeAttributeInfo> nodeAttributeInfos = response.getNodeAttributes();
Assert.assertEquals(2, nodeAttributeInfos.size());
Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo1));
Assert.assertTrue(nodeAttributeInfos.contains(nodeAttributeInfo2));
}
@Test
public void testMergeNodesToAttributesResponse() {
// normal response1
NodeAttribute gpu = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
NodeAttributeType.STRING, "nvida");
NodeAttribute os = NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
NodeAttributeType.STRING, "windows64");
NodeAttribute dist = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION",
NodeAttributeType.STRING, "3_0_2");
Map<String, Set<NodeAttribute>> node1Map = new HashMap<>();
node1Map.put("node1", ImmutableSet.of(gpu, os, dist));
GetNodesToAttributesResponse response1 = GetNodesToAttributesResponse.newInstance(node1Map);
// normal response2
NodeAttribute docker = NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
NodeAttributeType.STRING, "docker0");
Map<String, Set<NodeAttribute>> node2Map = new HashMap<>();
node2Map.put("node2", ImmutableSet.of(docker));
GetNodesToAttributesResponse response2 = GetNodesToAttributesResponse.newInstance(node2Map);
// empty response3
GetNodesToAttributesResponse response3 =
GetNodesToAttributesResponse.newInstance(new HashMap<>());
// null response4
GetNodesToAttributesResponse response4 = null;
List<GetNodesToAttributesResponse> responses = new ArrayList<>();
responses.add(response1);
responses.add(response2);
responses.add(response3);
responses.add(response4);
GetNodesToAttributesResponse response =
RouterYarnClientUtils.mergeNodesToAttributesResponse(responses);
Assert.assertNotNull(response);
Map<String, Set<NodeAttribute>> hostToAttrs = response.getNodeToAttributes();
Assert.assertNotNull(hostToAttrs);
Assert.assertEquals(2, hostToAttrs.size());
Assert.assertTrue(hostToAttrs.get("node1").contains(dist));
Assert.assertTrue(hostToAttrs.get("node1").contains(gpu));
Assert.assertTrue(hostToAttrs.get("node1").contains(os));
Assert.assertTrue(hostToAttrs.get("node2").contains(docker));
}
}

View File

@ -24,12 +24,19 @@ import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeType;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -82,6 +89,7 @@ public class TestableFederationClientInterceptor
}
mockRMs.put(subClusterId, mockRM);
}
initNodeAttributes(subClusterId, mockRM);
return mockRM.getClientRMService();
}
}
@ -127,4 +135,30 @@ public class TestableFederationClientInterceptor
public ConcurrentHashMap<SubClusterId, MockNM> getMockNMs() {
return mockNMs;
}
private void initNodeAttributes(SubClusterId subClusterId, MockRM mockRM) {
String node1 = subClusterId.getId() +"-host1";
String node2 = subClusterId.getId() +"-host2";
NodeAttributesManager mgr = mockRM.getRMContext().getNodeAttributesManager();
NodeAttribute gpu =
NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "GPU",
NodeAttributeType.STRING, "nvidia");
NodeAttribute os =
NodeAttribute.newInstance(NodeAttribute.PREFIX_CENTRALIZED, "OS",
NodeAttributeType.STRING, "windows64");
NodeAttribute docker =
NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "DOCKER",
NodeAttributeType.STRING, "docker0");
NodeAttribute dist =
NodeAttribute.newInstance(NodeAttribute.PREFIX_DISTRIBUTED, "VERSION",
NodeAttributeType.STRING, "3_0_2");
Map<String, Set<NodeAttribute>> nodes = new HashMap<>();
nodes.put(node1, ImmutableSet.of(gpu, os, dist));
nodes.put(node2, ImmutableSet.of(docker, dist));
try {
mgr.addNodeAttributes(nodes);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}