YARN-11160. Support getResourceProfiles, getResourceProfile API's for Federation (#4540)
This commit is contained in:
parent
f4b635c4dc
commit
838020ce3b
|
@ -115,8 +115,7 @@ public class ResourceUtils {
|
|||
* Supporting 'memory', 'memory-mb', 'vcores' also as invalid resource
|
||||
* names, in addition to 'MEMORY' for historical reasons
|
||||
*/
|
||||
String[] keys = { "memory", ResourceInformation.MEMORY_URI,
|
||||
ResourceInformation.VCORES_URI };
|
||||
String[] keys = {"memory", ResourceInformation.MEMORY_URI, ResourceInformation.VCORES_URI};
|
||||
for(String key : keys) {
|
||||
if (resourceInformationMap.containsKey(key)) {
|
||||
LOG.warn("Attempt to define resource '" + key + "', but it is not allowed.");
|
||||
|
@ -234,7 +233,8 @@ public class ResourceUtils {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get maximum allocation from config, *THIS WILL NOT UPDATE INTERNAL DATA*
|
||||
* Get maximum allocation from config, *THIS WILL NOT UPDATE INTERNAL DATA.
|
||||
*
|
||||
* @param conf config
|
||||
* @return maximum allocation
|
||||
*/
|
||||
|
@ -379,7 +379,7 @@ public class ResourceUtils {
|
|||
|
||||
/**
|
||||
* Get the resource types to be supported by the system.
|
||||
* @return A map of the resource name to a ResouceInformation object
|
||||
* @return A map of the resource name to a ResourceInformation object
|
||||
* which contains details such as the unit.
|
||||
*/
|
||||
public static Map<String, ResourceInformation> getResourceTypes() {
|
||||
|
@ -473,10 +473,10 @@ public class ResourceUtils {
|
|||
LOG.debug("Found {}, adding to configuration", resourceFile);
|
||||
conf.addResource(ris);
|
||||
} catch (FileNotFoundException fe) {
|
||||
LOG.info("Unable to find '" + resourceFile + "'.");
|
||||
LOG.info("Unable to find '{}'.", resourceFile);
|
||||
} catch (IOException | YarnException ex) {
|
||||
LOG.error("Exception trying to read resource types configuration '"
|
||||
+ resourceFile + "'.", ex);
|
||||
LOG.error("Exception trying to read resource types configuration '{}'.",
|
||||
resourceFile, ex);
|
||||
throw new YarnRuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
@ -668,7 +668,7 @@ public class ResourceUtils {
|
|||
/**
|
||||
* Reinitialize all resource types from external source (in case of client,
|
||||
* server will send the updated list and local resourceutils cache will be
|
||||
* updated as per server's list of resources)
|
||||
* updated as per server's list of resources).
|
||||
*
|
||||
* @param resourceTypeInfo
|
||||
* List of resource types
|
||||
|
@ -857,6 +857,7 @@ public class ResourceUtils {
|
|||
units = "Gi";
|
||||
} else if (units.isEmpty()) {
|
||||
// do nothing;
|
||||
LOG.debug("units is empty.");
|
||||
} else {
|
||||
throw new IllegalArgumentException("Acceptable units are M/G or empty");
|
||||
}
|
||||
|
|
|
@ -87,6 +87,10 @@ public final class RouterMetrics {
|
|||
private MutableGaugeInt numGetQueueInfoFailedRetrieved;
|
||||
@Metric("# of moveApplicationAcrossQueues failed to be retrieved")
|
||||
private MutableGaugeInt numMoveApplicationAcrossQueuesFailedRetrieved;
|
||||
@Metric("# of getResourceProfiles failed to be retrieved")
|
||||
private MutableGaugeInt numGetResourceProfilesFailedRetrieved;
|
||||
@Metric("# of getResourceProfile failed to be retrieved")
|
||||
private MutableGaugeInt numGetResourceProfileFailedRetrieved;
|
||||
|
||||
// Aggregate metrics are shared, and don't have to be looked up per call
|
||||
@Metric("Total number of successful Submitted apps and latency(ms)")
|
||||
|
@ -138,6 +142,11 @@ public final class RouterMetrics {
|
|||
private MutableRate totalSucceededGetQueueInfoRetrieved;
|
||||
@Metric("Total number of successful Retrieved moveApplicationAcrossQueues and latency(ms)")
|
||||
private MutableRate totalSucceededMoveApplicationAcrossQueuesRetrieved;
|
||||
@Metric("Total number of successful Retrieved getResourceProfiles and latency(ms)")
|
||||
private MutableRate totalSucceededGetResourceProfilesRetrieved;
|
||||
|
||||
@Metric("Total number of successful Retrieved getResourceProfile and latency(ms)")
|
||||
private MutableRate totalSucceededGetResourceProfileRetrieved;
|
||||
|
||||
/**
|
||||
* Provide quantile counters for all latencies.
|
||||
|
@ -165,6 +174,8 @@ public final class RouterMetrics {
|
|||
private MutableQuantiles signalToContainerLatency;
|
||||
private MutableQuantiles getQueueInfoLatency;
|
||||
private MutableQuantiles moveApplicationAcrossQueuesLatency;
|
||||
private MutableQuantiles getResourceProfilesLatency;
|
||||
private MutableQuantiles getResourceProfileLatency;
|
||||
|
||||
private static volatile RouterMetrics instance = null;
|
||||
private static MetricsRegistry registry;
|
||||
|
@ -255,6 +266,14 @@ public final class RouterMetrics {
|
|||
moveApplicationAcrossQueuesLatency =
|
||||
registry.newQuantiles("moveApplicationAcrossQueuesLatency",
|
||||
"latency of move application across queues timeouts", "ops", "latency", 10);
|
||||
|
||||
getResourceProfilesLatency =
|
||||
registry.newQuantiles("getResourceProfilesLatency",
|
||||
"latency of get resource profiles timeouts", "ops", "latency", 10);
|
||||
|
||||
getResourceProfileLatency =
|
||||
registry.newQuantiles("getResourceProfileLatency",
|
||||
"latency of get resource profile timeouts", "ops", "latency", 10);
|
||||
}
|
||||
|
||||
public static RouterMetrics getMetrics() {
|
||||
|
@ -391,6 +410,16 @@ public final class RouterMetrics {
|
|||
return totalSucceededMoveApplicationAcrossQueuesRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededGetResourceProfilesRetrieved() {
|
||||
return totalSucceededGetResourceProfilesRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededGetResourceProfileRetrieved() {
|
||||
return totalSucceededGetResourceProfileRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededAppsCreated() {
|
||||
return totalSucceededAppsCreated.lastStat().mean();
|
||||
|
@ -506,6 +535,16 @@ public final class RouterMetrics {
|
|||
return totalSucceededMoveApplicationAcrossQueuesRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededGetResourceProfilesRetrieved() {
|
||||
return totalSucceededGetResourceProfilesRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededGetResourceProfileRetrieved() {
|
||||
return totalSucceededGetResourceProfileRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getAppsFailedCreated() {
|
||||
return numAppsFailedCreated.value();
|
||||
|
@ -619,6 +658,14 @@ public final class RouterMetrics {
|
|||
return numMoveApplicationAcrossQueuesFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getResourceProfilesFailedRetrieved() {
|
||||
return numGetResourceProfilesFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public int getResourceProfileFailedRetrieved() {
|
||||
return numGetResourceProfileFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public void succeededAppsCreated(long duration) {
|
||||
totalSucceededAppsCreated.add(duration);
|
||||
getNewApplicationLatency.add(duration);
|
||||
|
@ -734,6 +781,16 @@ public final class RouterMetrics {
|
|||
moveApplicationAcrossQueuesLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededGetResourceProfilesRetrieved(long duration) {
|
||||
totalSucceededGetResourceProfilesRetrieved.add(duration);
|
||||
getResourceProfilesLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededGetResourceProfileRetrieved(long duration) {
|
||||
totalSucceededGetResourceProfileRetrieved.add(duration);
|
||||
getResourceProfileLatency.add(duration);
|
||||
}
|
||||
|
||||
public void incrAppsFailedCreated() {
|
||||
numAppsFailedCreated.incr();
|
||||
}
|
||||
|
@ -825,4 +882,12 @@ public final class RouterMetrics {
|
|||
public void incrMoveApplicationAcrossQueuesFailedRetrieved() {
|
||||
numMoveApplicationAcrossQueuesFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrGetResourceProfilesFailedRetrieved() {
|
||||
numGetResourceProfilesFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrGetResourceProfileFailedRetrieved() {
|
||||
numGetResourceProfileFailedRetrieved.incr();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1448,13 +1448,50 @@ public class FederationClientInterceptor
|
|||
@Override
|
||||
public GetAllResourceProfilesResponse getResourceProfiles(
|
||||
GetAllResourceProfilesRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException("Code is not implemented");
|
||||
if (request == null) {
|
||||
routerMetrics.incrGetResourceProfilesFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException("Missing getResourceProfiles request.", null);
|
||||
}
|
||||
long startTime = clock.getTime();
|
||||
ClientMethod remoteMethod = new ClientMethod("getResourceProfiles",
|
||||
new Class[] {GetAllResourceProfilesRequest.class}, new Object[] {request});
|
||||
Collection<GetAllResourceProfilesResponse> resourceProfiles = null;
|
||||
try {
|
||||
resourceProfiles = invokeAppClientProtocolMethod(true, remoteMethod,
|
||||
GetAllResourceProfilesResponse.class);
|
||||
} catch (Exception ex) {
|
||||
routerMetrics.incrGetResourceProfilesFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException("Unable to get resource profiles due to exception.",
|
||||
ex);
|
||||
}
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededGetResourceProfilesRetrieved(stopTime - startTime);
|
||||
return RouterYarnClientUtils.mergeClusterResourceProfilesResponse(resourceProfiles);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetResourceProfileResponse getResourceProfile(
|
||||
GetResourceProfileRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException("Code is not implemented");
|
||||
if (request == null || request.getProfileName() == null) {
|
||||
routerMetrics.incrGetResourceProfileFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException("Missing getResourceProfile request or profileName.",
|
||||
null);
|
||||
}
|
||||
long startTime = clock.getTime();
|
||||
ClientMethod remoteMethod = new ClientMethod("getResourceProfile",
|
||||
new Class[] {GetResourceProfileRequest.class}, new Object[] {request});
|
||||
Collection<GetResourceProfileResponse> resourceProfile = null;
|
||||
try {
|
||||
resourceProfile = invokeAppClientProtocolMethod(true, remoteMethod,
|
||||
GetResourceProfileResponse.class);
|
||||
} catch (Exception ex) {
|
||||
routerMetrics.incrGetResourceProfileFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException("Unable to get resource profile due to exception.",
|
||||
ex);
|
||||
}
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededGetResourceProfileRetrieved(stopTime - startTime);
|
||||
return RouterYarnClientUtils.mergeClusterResourceProfileResponse(resourceProfile);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||
|
@ -46,8 +48,10 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|||
import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
/**
|
||||
|
@ -417,5 +421,52 @@ public final class RouterYarnClientUtils {
|
|||
queueResponse.setQueueInfo(queueInfo);
|
||||
return queueResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges a list of GetAllResourceProfilesResponse.
|
||||
*
|
||||
* @param responses a list of GetAllResourceProfilesResponse to merge.
|
||||
* @return the merged GetAllResourceProfilesResponse.
|
||||
*/
|
||||
public static GetAllResourceProfilesResponse mergeClusterResourceProfilesResponse(
|
||||
Collection<GetAllResourceProfilesResponse> responses) {
|
||||
GetAllResourceProfilesResponse profilesResponse =
|
||||
Records.newRecord(GetAllResourceProfilesResponse.class);
|
||||
Map<String, Resource> profilesMap = new HashMap<>();
|
||||
for (GetAllResourceProfilesResponse response : responses) {
|
||||
if (response != null && response.getResourceProfiles() != null) {
|
||||
for (Map.Entry<String, Resource> entry : response.getResourceProfiles().entrySet()) {
|
||||
String key = entry.getKey();
|
||||
Resource r1 = profilesMap.getOrDefault(key, null);
|
||||
Resource r2 = entry.getValue();
|
||||
Resource rAdd = r1 == null ? r2 : Resources.add(r1, r2);
|
||||
profilesMap.put(key, rAdd);
|
||||
}
|
||||
}
|
||||
}
|
||||
profilesResponse.setResourceProfiles(profilesMap);
|
||||
return profilesResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Merges a list of GetResourceProfileResponse.
|
||||
*
|
||||
* @param responses a list of GetResourceProfileResponse to merge.
|
||||
* @return the merged GetResourceProfileResponse.
|
||||
*/
|
||||
public static GetResourceProfileResponse mergeClusterResourceProfileResponse(
|
||||
Collection<GetResourceProfileResponse> responses) {
|
||||
GetResourceProfileResponse profileResponse =
|
||||
Records.newRecord(GetResourceProfileResponse.class);
|
||||
Resource resource = Resource.newInstance(0, 0);
|
||||
for (GetResourceProfileResponse response : responses) {
|
||||
if (response != null && response.getResource() != null) {
|
||||
Resource responseResource = response.getResource();
|
||||
resource = Resources.add(resource, responseResource);
|
||||
}
|
||||
}
|
||||
profileResponse.setResource(resource);
|
||||
return profileResponse;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -428,6 +428,16 @@ public class TestRouterMetrics {
|
|||
LOG.info("Mocked: failed moveApplicationAcrossQueuesFailed call");
|
||||
metrics.incrMoveApplicationAcrossQueuesFailedRetrieved();
|
||||
}
|
||||
|
||||
public void getResourceProfilesFailed() {
|
||||
LOG.info("Mocked: failed getResourceProfilesFailed call");
|
||||
metrics.incrGetResourceProfilesFailedRetrieved();
|
||||
}
|
||||
|
||||
public void getResourceProfileFailed() {
|
||||
LOG.info("Mocked: failed getResourceProfileFailed call");
|
||||
metrics.incrGetResourceProfileFailedRetrieved();
|
||||
}
|
||||
}
|
||||
|
||||
// Records successes for all calls
|
||||
|
@ -553,6 +563,16 @@ public class TestRouterMetrics {
|
|||
LOG.info("Mocked: successful moveApplicationAcrossQueues call with duration {}", duration);
|
||||
metrics.succeededMoveApplicationAcrossQueuesRetrieved(duration);
|
||||
}
|
||||
|
||||
public void getResourceProfilesRetrieved(long duration) {
|
||||
LOG.info("Mocked: successful getResourceProfiles call with duration {}", duration);
|
||||
metrics.succeededGetResourceProfilesRetrieved(duration);
|
||||
}
|
||||
|
||||
public void getResourceProfileRetrieved(long duration) {
|
||||
LOG.info("Mocked: successful getResourceProfile call with duration {}", duration);
|
||||
metrics.succeededGetResourceProfileRetrieved(duration);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -905,4 +925,49 @@ public class TestRouterMetrics {
|
|||
metrics.getMoveApplicationAcrossQueuesFailedRetrieved());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSucceededGetResourceProfilesRetrieved() {
|
||||
long totalGoodBefore = metrics.getNumSucceededGetResourceProfilesRetrieved();
|
||||
goodSubCluster.getResourceProfilesRetrieved(150);
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededGetResourceProfilesRetrieved());
|
||||
Assert.assertEquals(150,
|
||||
metrics.getLatencySucceededGetResourceProfilesRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
goodSubCluster.getResourceProfilesRetrieved(300);
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededGetResourceProfilesRetrieved());
|
||||
Assert.assertEquals(225,
|
||||
metrics.getLatencySucceededGetResourceProfilesRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetResourceProfilesRetrievedFailed() {
|
||||
long totalBadBefore = metrics.getResourceProfilesFailedRetrieved();
|
||||
badSubCluster.getResourceProfilesFailed();
|
||||
Assert.assertEquals(totalBadBefore + 1,
|
||||
metrics.getResourceProfilesFailedRetrieved());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSucceededGetResourceProfileRetrieved() {
|
||||
long totalGoodBefore = metrics.getNumSucceededGetResourceProfileRetrieved();
|
||||
goodSubCluster.getResourceProfileRetrieved(150);
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededGetResourceProfileRetrieved());
|
||||
Assert.assertEquals(150,
|
||||
metrics.getLatencySucceededGetResourceProfileRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
goodSubCluster.getResourceProfileRetrieved(300);
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededGetResourceProfileRetrieved());
|
||||
Assert.assertEquals(225,
|
||||
metrics.getLatencySucceededGetResourceProfileRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetResourceProfileRetrievedFailed() {
|
||||
long totalBadBefore = metrics.getResourceProfileFailedRetrieved();
|
||||
badSubCluster.getResourceProfileFailed();
|
||||
Assert.assertEquals(totalBadBefore + 1,
|
||||
metrics.getResourceProfileFailedRetrieved());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,6 +78,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesReq
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
|
@ -91,6 +95,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
|
||||
|
@ -188,7 +193,6 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
|
|||
|
||||
// Disable StateStoreFacade cache
|
||||
conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
|
@ -1169,4 +1173,64 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
|
|||
Assert.assertEquals(queueInfo.getChildQueues().size(), 12, 0);
|
||||
Assert.assertEquals(queueInfo.getAccessibleNodeLabels().size(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetResourceProfiles() throws Exception {
|
||||
LOG.info("Test FederationClientInterceptor : Get Resource Profiles request.");
|
||||
|
||||
// null request
|
||||
LambdaTestUtils.intercept(YarnException.class, "Missing getResourceProfiles request.",
|
||||
() -> interceptor.getResourceProfiles(null));
|
||||
|
||||
// normal request
|
||||
GetAllResourceProfilesRequest request = GetAllResourceProfilesRequest.newInstance();
|
||||
GetAllResourceProfilesResponse response = interceptor.getResourceProfiles(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Map<String, Resource> resProfiles = response.getResourceProfiles();
|
||||
|
||||
Resource maxResProfiles = resProfiles.get("maximum");
|
||||
Assert.assertEquals(32768, maxResProfiles.getMemorySize());
|
||||
Assert.assertEquals(16, maxResProfiles.getVirtualCores());
|
||||
|
||||
Resource defaultResProfiles = resProfiles.get("default");
|
||||
Assert.assertEquals(8192, defaultResProfiles.getMemorySize());
|
||||
Assert.assertEquals(8, defaultResProfiles.getVirtualCores());
|
||||
|
||||
Resource minimumResProfiles = resProfiles.get("minimum");
|
||||
Assert.assertEquals(4096, minimumResProfiles.getMemorySize());
|
||||
Assert.assertEquals(4, minimumResProfiles.getVirtualCores());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetResourceProfile() throws Exception {
|
||||
LOG.info("Test FederationClientInterceptor : Get Resource Profile request.");
|
||||
|
||||
// null request
|
||||
LambdaTestUtils.intercept(YarnException.class,
|
||||
"Missing getResourceProfile request or profileName.",
|
||||
() -> interceptor.getResourceProfile(null));
|
||||
|
||||
// normal request
|
||||
GetResourceProfileRequest request = GetResourceProfileRequest.newInstance("maximum");
|
||||
GetResourceProfileResponse response = interceptor.getResourceProfile(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(32768, response.getResource().getMemorySize());
|
||||
Assert.assertEquals(16, response.getResource().getVirtualCores());
|
||||
|
||||
GetResourceProfileRequest request2 = GetResourceProfileRequest.newInstance("default");
|
||||
GetResourceProfileResponse response2 = interceptor.getResourceProfile(request2);
|
||||
|
||||
Assert.assertNotNull(response2);
|
||||
Assert.assertEquals(8192, response2.getResource().getMemorySize());
|
||||
Assert.assertEquals(8, response2.getResource().getVirtualCores());
|
||||
|
||||
GetResourceProfileRequest request3 = GetResourceProfileRequest.newInstance("minimum");
|
||||
GetResourceProfileResponse response3 = interceptor.getResourceProfile(request3);
|
||||
|
||||
Assert.assertNotNull(response3);
|
||||
Assert.assertEquals(4096, response3.getResource().getMemorySize());
|
||||
Assert.assertEquals(4, response3.getResource().getVirtualCores());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetResourceProfileResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceProfilesResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
|
@ -539,4 +541,73 @@ public class TestRouterYarnClientUtils {
|
|||
Assert.assertTrue(CollectionUtils.isEqualCollection(expectedResponse,
|
||||
response.getResourceTypeInfo()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeResourceProfiles() {
|
||||
// normal response1
|
||||
Map<String, Resource> profiles = new HashMap<>();
|
||||
Resource resource1 = Resource.newInstance(1024, 1);
|
||||
GetAllResourceProfilesResponse response1 = GetAllResourceProfilesResponse.newInstance();
|
||||
profiles.put("maximum", resource1);
|
||||
response1.setResourceProfiles(profiles);
|
||||
|
||||
// normal response2
|
||||
profiles = new HashMap<>();
|
||||
Resource resource2 = Resource.newInstance(2048, 2);
|
||||
GetAllResourceProfilesResponse response2 = GetAllResourceProfilesResponse.newInstance();
|
||||
profiles.put("maximum", resource2);
|
||||
response2.setResourceProfiles(profiles);
|
||||
|
||||
// empty response
|
||||
GetAllResourceProfilesResponse response3 = GetAllResourceProfilesResponse.newInstance();
|
||||
|
||||
// null response
|
||||
GetAllResourceProfilesResponse response4 = null;
|
||||
|
||||
List<GetAllResourceProfilesResponse> responses = new ArrayList<>();
|
||||
responses.add(response1);
|
||||
responses.add(response2);
|
||||
responses.add(response3);
|
||||
responses.add(response4);
|
||||
|
||||
GetAllResourceProfilesResponse response =
|
||||
RouterYarnClientUtils.mergeClusterResourceProfilesResponse(responses);
|
||||
Resource resource = response.getResourceProfiles().get("maximum");
|
||||
Assert.assertEquals(3, resource.getVirtualCores());
|
||||
Assert.assertEquals(3072, resource.getMemorySize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMergeResourceProfile() {
|
||||
// normal response1
|
||||
Resource resource1 = Resource.newInstance(1024, 1);
|
||||
GetResourceProfileResponse response1 =
|
||||
Records.newRecord(GetResourceProfileResponse.class);
|
||||
response1.setResource(resource1);
|
||||
|
||||
// normal response2
|
||||
Resource resource2 = Resource.newInstance(2048, 2);
|
||||
GetResourceProfileResponse response2 =
|
||||
Records.newRecord(GetResourceProfileResponse.class);
|
||||
response2.setResource(resource2);
|
||||
|
||||
// empty response
|
||||
GetResourceProfileResponse response3 =
|
||||
Records.newRecord(GetResourceProfileResponse.class);
|
||||
|
||||
// null response
|
||||
GetResourceProfileResponse response4 = null;
|
||||
|
||||
List<GetResourceProfileResponse> responses = new ArrayList<>();
|
||||
responses.add(response1);
|
||||
responses.add(response2);
|
||||
responses.add(response3);
|
||||
responses.add(response4);
|
||||
|
||||
GetResourceProfileResponse response =
|
||||
RouterYarnClientUtils.mergeClusterResourceProfileResponse(responses);
|
||||
Resource resource = response.getResource();
|
||||
Assert.assertEquals(3, resource.getVirtualCores());
|
||||
Assert.assertEquals(3072, resource.getMemorySize());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
{
|
||||
"___asflicense__": [
|
||||
"",
|
||||
"Licensed to the Apache Software Foundation (ASF) under one",
|
||||
"or more contributor license agreements. See the NOTICE file",
|
||||
"distributed with this work for additional information",
|
||||
"regarding copyright ownership. The ASF licenses this file",
|
||||
"to you under the Apache License, Version 2.0 (the",
|
||||
"\"License\"); you may not use this file except in compliance",
|
||||
"with the License. You may obtain a copy of the License at",
|
||||
"",
|
||||
" http://www.apache.org/licenses/LICENSE-2.0",
|
||||
"",
|
||||
"Unless required by applicable law or agreed to in writing, software",
|
||||
"distributed under the License is distributed on an \"AS IS\" BASIS,",
|
||||
"WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.",
|
||||
"See the License for the specific language governing permissions and",
|
||||
"limitations under the License."
|
||||
],
|
||||
"default" : {
|
||||
"memory-mb" : 2048,
|
||||
"vcores" : 2
|
||||
}
|
||||
}
|
|
@ -31,4 +31,12 @@
|
|||
<name>yarn.cluster.max-application-priority</name>
|
||||
<value>50</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>yarn.resourcemanager.resource-profiles.enabled</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>yarn.resourcemanager.resource-profiles.source-file</name>
|
||||
<value>profiles/sample-profiles-1.json</value>
|
||||
</property>
|
||||
</configuration>
|
||||
|
|
Loading…
Reference in New Issue