YARN-11159. Support failApplicationAttempt, updateApplicationPriority, updateApplicationTimeouts API's for Federation (#4396)

This commit is contained in:
slfan1989 2022-06-08 17:34:43 -07:00 committed by GitHub
parent 5ac55b405d
commit 98ca6fa10a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 487 additions and 9 deletions

View File

@ -75,6 +75,12 @@ public final class RouterMetrics {
private MutableGaugeInt numListReservationsFailedRetrieved; private MutableGaugeInt numListReservationsFailedRetrieved;
@Metric("# of getResourceTypeInfo failed to be retrieved") @Metric("# of getResourceTypeInfo failed to be retrieved")
private MutableGaugeInt numGetResourceTypeInfo; private MutableGaugeInt numGetResourceTypeInfo;
@Metric("# of failApplicationAttempt failed to be retrieved")
private MutableGaugeInt numFailAppAttemptFailedRetrieved;
@Metric("# of updateApplicationPriority failed to be retrieved")
private MutableGaugeInt numUpdateAppPriorityFailedRetrieved;
@Metric("# of updateApplicationPriority failed to be retrieved")
private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call // Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)") @Metric("Total number of successful Submitted apps and latency(ms)")
@ -114,6 +120,12 @@ public final class RouterMetrics {
private MutableRate totalSucceededListReservationsRetrieved; private MutableRate totalSucceededListReservationsRetrieved;
@Metric("Total number of successful Retrieved getResourceTypeInfo and latency(ms)") @Metric("Total number of successful Retrieved getResourceTypeInfo and latency(ms)")
private MutableRate totalSucceededGetResourceTypeInfoRetrieved; private MutableRate totalSucceededGetResourceTypeInfoRetrieved;
@Metric("Total number of successful Retrieved failApplicationAttempt and latency(ms)")
private MutableRate totalSucceededFailAppAttemptRetrieved;
@Metric("Total number of successful Retrieved updateApplicationPriority and latency(ms)")
private MutableRate totalSucceededUpdateAppPriorityRetrieved;
@Metric("Total number of successful Retrieved updateApplicationTimeouts and latency(ms)")
private MutableRate totalSucceededUpdateAppTimeoutsRetrieved;
/** /**
* Provide quantile counters for all latencies. * Provide quantile counters for all latencies.
@ -135,8 +147,11 @@ public final class RouterMetrics {
private MutableQuantiles getContainerLatency; private MutableQuantiles getContainerLatency;
private MutableQuantiles listReservationsLatency; private MutableQuantiles listReservationsLatency;
private MutableQuantiles listResourceTypeInfoLatency; private MutableQuantiles listResourceTypeInfoLatency;
private MutableQuantiles failAppAttemptLatency;
private MutableQuantiles updateAppPriorityLatency;
private MutableQuantiles updateAppTimeoutsLatency;
private static volatile RouterMetrics INSTANCE = null; private static volatile RouterMetrics instance = null;
private static MetricsRegistry registry; private static MetricsRegistry registry;
private RouterMetrics() { private RouterMetrics() {
@ -201,25 +216,37 @@ public final class RouterMetrics {
listResourceTypeInfoLatency = listResourceTypeInfoLatency =
registry.newQuantiles("getResourceTypeInfoLatency", registry.newQuantiles("getResourceTypeInfoLatency",
"latency of get resource type info", "ops", "latency", 10); "latency of get resource type info", "ops", "latency", 10);
failAppAttemptLatency =
registry.newQuantiles("failApplicationAttemptLatency",
"latency of fail application attempt", "ops", "latency", 10);
updateAppPriorityLatency =
registry.newQuantiles("updateApplicationPriorityLatency",
"latency of update application priority", "ops", "latency", 10);
updateAppTimeoutsLatency =
registry.newQuantiles("updateApplicationTimeoutsLatency",
"latency of update application timeouts", "ops", "latency", 10);
} }
public static RouterMetrics getMetrics() { public static RouterMetrics getMetrics() {
if (!isInitialized.get()) { if (!isInitialized.get()) {
synchronized (RouterMetrics.class) { synchronized (RouterMetrics.class) {
if (INSTANCE == null) { if (instance == null) {
INSTANCE = DefaultMetricsSystem.instance().register("RouterMetrics", instance = DefaultMetricsSystem.instance().register("RouterMetrics",
"Metrics for the Yarn Router", new RouterMetrics()); "Metrics for the Yarn Router", new RouterMetrics());
isInitialized.set(true); isInitialized.set(true);
} }
} }
} }
return INSTANCE; return instance;
} }
@VisibleForTesting @VisibleForTesting
synchronized static void destroy() { synchronized static void destroy() {
isInitialized.set(false); isInitialized.set(false);
INSTANCE = null; instance = null;
} }
@VisibleForTesting @VisibleForTesting
@ -307,6 +334,21 @@ public final class RouterMetrics {
return totalSucceededGetResourceTypeInfoRetrieved.lastStat().numSamples(); return totalSucceededGetResourceTypeInfoRetrieved.lastStat().numSamples();
} }
@VisibleForTesting
public long getNumSucceededFailAppAttemptRetrieved() {
return totalSucceededFailAppAttemptRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededUpdateAppPriorityRetrieved() {
return totalSucceededUpdateAppPriorityRetrieved.lastStat().numSamples();
}
@VisibleForTesting
public long getNumSucceededUpdateAppTimeoutsRetrieved() {
return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().numSamples();
}
@VisibleForTesting @VisibleForTesting
public double getLatencySucceededAppsCreated() { public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean(); return totalSucceededAppsCreated.lastStat().mean();
@ -392,6 +434,21 @@ public final class RouterMetrics {
return totalSucceededGetResourceTypeInfoRetrieved.lastStat().mean(); return totalSucceededGetResourceTypeInfoRetrieved.lastStat().mean();
} }
@VisibleForTesting
public double getLatencySucceededFailAppAttemptRetrieved() {
return totalSucceededFailAppAttemptRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededUpdateAppPriorityRetrieved() {
return totalSucceededUpdateAppPriorityRetrieved.lastStat().mean();
}
@VisibleForTesting
public double getLatencySucceededUpdateAppTimeoutsRetrieved() {
return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().mean();
}
@VisibleForTesting @VisibleForTesting
public int getAppsFailedCreated() { public int getAppsFailedCreated() {
return numAppsFailedCreated.value(); return numAppsFailedCreated.value();
@ -477,6 +534,21 @@ public final class RouterMetrics {
return numGetResourceTypeInfo.value(); return numGetResourceTypeInfo.value();
} }
@VisibleForTesting
public int getFailApplicationAttemptFailedRetrieved() {
return numFailAppAttemptFailedRetrieved.value();
}
@VisibleForTesting
public int getUpdateApplicationPriorityFailedRetrieved() {
return numUpdateAppPriorityFailedRetrieved.value();
}
@VisibleForTesting
public int getUpdateApplicationTimeoutsFailedRetrieved() {
return numUpdateAppTimeoutsFailedRetrieved.value();
}
public void succeededAppsCreated(long duration) { public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration); totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration); getNewApplicationLatency.add(duration);
@ -562,6 +634,21 @@ public final class RouterMetrics {
listResourceTypeInfoLatency.add(duration); listResourceTypeInfoLatency.add(duration);
} }
public void succeededFailAppAttemptRetrieved(long duration) {
totalSucceededFailAppAttemptRetrieved.add(duration);
failAppAttemptLatency.add(duration);
}
public void succeededUpdateAppPriorityRetrieved(long duration) {
totalSucceededUpdateAppPriorityRetrieved.add(duration);
updateAppPriorityLatency.add(duration);
}
public void succeededUpdateAppTimeoutsRetrieved(long duration) {
totalSucceededUpdateAppTimeoutsRetrieved.add(duration);
updateAppTimeoutsLatency.add(duration);
}
public void incrAppsFailedCreated() { public void incrAppsFailedCreated() {
numAppsFailedCreated.incr(); numAppsFailedCreated.incr();
} }
@ -629,4 +716,16 @@ public final class RouterMetrics {
public void incrResourceTypeInfoFailedRetrieved() { public void incrResourceTypeInfoFailedRetrieved() {
numGetResourceTypeInfo.incr(); numGetResourceTypeInfo.incr();
} }
public void incrFailAppAttemptFailedRetrieved() {
numFailAppAttemptFailedRetrieved.incr();
}
public void incrUpdateAppPriorityFailedRetrieved() {
numUpdateAppPriorityFailedRetrieved.incr();
}
public void incrUpdateApplicationTimeoutsRetrieved() {
numUpdateAppTimeoutsFailedRetrieved.incr();
}
} }

View File

@ -1213,14 +1213,92 @@ public class FederationClientInterceptor
@Override @Override
public FailApplicationAttemptResponse failApplicationAttempt( public FailApplicationAttemptResponse failApplicationAttempt(
FailApplicationAttemptRequest request) throws YarnException, IOException { FailApplicationAttemptRequest request) throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented"); if (request == null || request.getApplicationAttemptId() == null
|| request.getApplicationAttemptId().getApplicationId() == null) {
routerMetrics.incrFailAppAttemptFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing failApplicationAttempt request or applicationId " +
"or applicationAttemptId information.", null);
}
long startTime = clock.getTime();
SubClusterId subClusterId = null;
ApplicationId applicationId = request.getApplicationAttemptId().getApplicationId();
try {
subClusterId = getApplicationHomeSubCluster(applicationId);
} catch (YarnException e) {
routerMetrics.incrFailAppAttemptFailedRetrieved();
RouterServerUtil.logAndThrowException("ApplicationAttempt " +
request.getApplicationAttemptId() + " belongs to Application " +
request.getApplicationAttemptId().getApplicationId() +
" does not exist in FederationStateStore.", e);
}
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
FailApplicationAttemptResponse response = null;
try {
response = clientRMProxy.failApplicationAttempt(request);
} catch (Exception e) {
routerMetrics.incrFailAppAttemptFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to get the applicationAttempt report for " +
request.getApplicationAttemptId() + " to SubCluster " + subClusterId.getId(), e);
}
if (response == null) {
LOG.error("No response when attempting to retrieve the report of " +
"the applicationAttempt {} to SubCluster {}.",
request.getApplicationAttemptId(), subClusterId.getId());
}
long stopTime = clock.getTime();
routerMetrics.succeededFailAppAttemptRetrieved(stopTime - startTime);
return response;
} }
@Override @Override
public UpdateApplicationPriorityResponse updateApplicationPriority( public UpdateApplicationPriorityResponse updateApplicationPriority(
UpdateApplicationPriorityRequest request) UpdateApplicationPriorityRequest request)
throws YarnException, IOException { throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented"); if (request == null || request.getApplicationId() == null
|| request.getApplicationPriority() == null) {
routerMetrics.incrUpdateAppPriorityFailedRetrieved();
RouterServerUtil.logAndThrowException(
"Missing updateApplicationPriority request or applicationId " +
"or applicationPriority information.", null);
}
long startTime = clock.getTime();
SubClusterId subClusterId = null;
ApplicationId applicationId = request.getApplicationId();
try {
subClusterId = getApplicationHomeSubCluster(applicationId);
} catch (YarnException e) {
routerMetrics.incrUpdateAppPriorityFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " +
request.getApplicationId() +
" does not exist in FederationStateStore.", e);
}
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
UpdateApplicationPriorityResponse response = null;
try {
response = clientRMProxy.updateApplicationPriority(request);
} catch (Exception e) {
routerMetrics.incrFailAppAttemptFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to update application priority for " +
request.getApplicationId() + " to SubCluster " + subClusterId.getId(), e);
}
if (response == null) {
LOG.error("No response when update application priority of " +
"the applicationId {} to SubCluster {}.",
applicationId, subClusterId.getId());
}
long stopTime = clock.getTime();
routerMetrics.succeededUpdateAppPriorityRetrieved(stopTime - startTime);
return response;
} }
@Override @Override
@ -1233,7 +1311,45 @@ public class FederationClientInterceptor
public UpdateApplicationTimeoutsResponse updateApplicationTimeouts( public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
UpdateApplicationTimeoutsRequest request) UpdateApplicationTimeoutsRequest request)
throws YarnException, IOException { throws YarnException, IOException {
throw new NotImplementedException("Code is not implemented"); if (request == null || request.getApplicationId() == null
|| request.getApplicationTimeouts() == null) {
routerMetrics.incrUpdateApplicationTimeoutsRetrieved();
RouterServerUtil.logAndThrowException(
"Missing updateApplicationTimeouts request or applicationId " +
"or applicationTimeouts information.", null);
}
long startTime = clock.getTime();
SubClusterId subClusterId = null;
ApplicationId applicationId = request.getApplicationId();
try {
subClusterId = getApplicationHomeSubCluster(applicationId);
} catch (YarnException e) {
routerMetrics.incrFailAppAttemptFailedRetrieved();
RouterServerUtil.logAndThrowException("Application " +
request.getApplicationId() +
" does not exist in FederationStateStore.", e);
}
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
UpdateApplicationTimeoutsResponse response = null;
try {
response = clientRMProxy.updateApplicationTimeouts(request);
} catch (Exception e) {
routerMetrics.incrFailAppAttemptFailedRetrieved();
RouterServerUtil.logAndThrowException("Unable to update application timeout for " +
request.getApplicationId() + " to SubCluster " + subClusterId.getId(), e);
}
if (response == null) {
LOG.error("No response when update application timeout of " +
"the applicationId {} to SubCluster {}.",
applicationId, subClusterId.getId());
}
long stopTime = clock.getTime();
routerMetrics.succeededUpdateAppTimeoutsRetrieved(stopTime - startTime);
return response;
} }
@Override @Override

View File

@ -398,6 +398,21 @@ public class TestRouterMetrics {
LOG.info("Mocked: failed getResourceTypeInfo call"); LOG.info("Mocked: failed getResourceTypeInfo call");
metrics.incrResourceTypeInfoFailedRetrieved(); metrics.incrResourceTypeInfoFailedRetrieved();
} }
public void getFailApplicationAttempt() {
LOG.info("Mocked: failed failApplicationAttempt call");
metrics.incrFailAppAttemptFailedRetrieved();
}
public void getUpdateApplicationPriority() {
LOG.info("Mocked: failed updateApplicationPriority call");
metrics.incrUpdateAppPriorityFailedRetrieved();
}
public void getUpdateApplicationTimeouts() {
LOG.info("Mocked: failed updateApplicationTimeouts call");
metrics.incrUpdateApplicationTimeoutsRetrieved();
}
} }
// Records successes for all calls // Records successes for all calls
@ -493,6 +508,21 @@ public class TestRouterMetrics {
LOG.info("Mocked: successful getResourceTypeInfo call with duration {}", duration); LOG.info("Mocked: successful getResourceTypeInfo call with duration {}", duration);
metrics.succeededGetResourceTypeInfoRetrieved(duration); metrics.succeededGetResourceTypeInfoRetrieved(duration);
} }
public void getFailApplicationAttempt(long duration) {
LOG.info("Mocked: successful failApplicationAttempt call with duration {}", duration);
metrics.succeededFailAppAttemptRetrieved(duration);
}
public void getUpdateApplicationPriority(long duration) {
LOG.info("Mocked: successful updateApplicationPriority call with duration {}", duration);
metrics.succeededUpdateAppPriorityRetrieved(duration);
}
public void getUpdateApplicationTimeouts(long duration) {
LOG.info("Mocked: successful updateApplicationTimeouts call with duration {}", duration);
metrics.succeededUpdateAppTimeoutsRetrieved(duration);
}
} }
@Test @Test
@ -708,4 +738,72 @@ public class TestRouterMetrics {
Assert.assertEquals(totalBadBefore + 1, metrics.getGetResourceTypeInfoRetrieved()); Assert.assertEquals(totalBadBefore + 1, metrics.getGetResourceTypeInfoRetrieved());
} }
@Test
public void testSucceededFailApplicationAttempt() {
long totalGoodBefore = metrics.getNumSucceededFailAppAttemptRetrieved();
goodSubCluster.getFailApplicationAttempt(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededFailAppAttemptRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededFailAppAttemptRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getFailApplicationAttempt(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededFailAppAttemptRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededFailAppAttemptRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testFailApplicationAttemptFailed() {
long totalBadBefore = metrics.getFailApplicationAttemptFailedRetrieved();
badSubCluster.getFailApplicationAttempt();
Assert.assertEquals(totalBadBefore + 1, metrics.getFailApplicationAttemptFailedRetrieved());
}
@Test
public void testSucceededUpdateApplicationPriority() {
long totalGoodBefore = metrics.getNumSucceededUpdateAppPriorityRetrieved();
goodSubCluster.getUpdateApplicationPriority(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededUpdateAppPriorityRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededUpdateAppPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getUpdateApplicationPriority(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededUpdateAppPriorityRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededUpdateAppPriorityRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testUpdateApplicationPriorityFailed() {
long totalBadBefore = metrics.getUpdateApplicationPriorityFailedRetrieved();
badSubCluster.getUpdateApplicationPriority();
Assert.assertEquals(totalBadBefore + 1,
metrics.getUpdateApplicationPriorityFailedRetrieved());
}
@Test
public void testSucceededUpdateAppTimeoutsRetrieved() {
long totalGoodBefore = metrics.getNumSucceededUpdateAppTimeoutsRetrieved();
goodSubCluster.getUpdateApplicationTimeouts(150);
Assert.assertEquals(totalGoodBefore + 1,
metrics.getNumSucceededUpdateAppTimeoutsRetrieved());
Assert.assertEquals(150,
metrics.getLatencySucceededUpdateAppTimeoutsRetrieved(), ASSERT_DOUBLE_DELTA);
goodSubCluster.getUpdateApplicationTimeouts(300);
Assert.assertEquals(totalGoodBefore + 2,
metrics.getNumSucceededUpdateAppTimeoutsRetrieved());
Assert.assertEquals(225,
metrics.getLatencySucceededUpdateAppTimeoutsRetrieved(), ASSERT_DOUBLE_DELTA);
}
@Test
public void testUpdateAppTimeoutsFailed() {
long totalBadBefore = metrics.getUpdateApplicationTimeoutsFailedRetrieved();
badSubCluster.getUpdateApplicationTimeouts();
Assert.assertEquals(totalBadBefore + 1,
metrics.getUpdateApplicationTimeoutsFailedRetrieved());
}
} }

View File

@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.HashMap;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -65,6 +66,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllResourceTypeInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -75,6 +82,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@ -82,7 +90,12 @@ import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationState
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil; import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -109,6 +122,8 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
private final static int NUM_SUBCLUSTER = 4; private final static int NUM_SUBCLUSTER = 4;
private final static int APP_PRIORITY_ZERO = 0;
@Override @Override
public void setUp() { public void setUp() {
super.setUpConfig(); super.setUpConfig();
@ -212,7 +227,7 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class); ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
ApplicationSubmissionContext context = ApplicationSubmissionContext ApplicationSubmissionContext context = ApplicationSubmissionContext
.newInstance(appId, MockApps.newAppName(), "default", .newInstance(appId, MockApps.newAppName(), "default",
Priority.newInstance(0), amContainerSpec, false, false, -1, Priority.newInstance(APP_PRIORITY_ZERO), amContainerSpec, false, false, -1,
Resources.createResource( Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB), YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
"MockApp"); "MockApp");
@ -898,4 +913,147 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
interceptor.getResourceTypeInfo(GetAllResourceTypeInfoRequest.newInstance()); interceptor.getResourceTypeInfo(GetAllResourceTypeInfoRequest.newInstance());
Assert.assertEquals(2, response.getResourceTypeInfo().size()); Assert.assertEquals(2, response.getResourceTypeInfo().size());
} }
@Test
public void testFailApplicationAttempt() throws Exception {
LOG.info("Test FederationClientInterceptor : Fail Application Attempt request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing failApplicationAttempt request " +
"or applicationId or applicationAttemptId information.",
() -> interceptor.failApplicationAttempt(null));
// normal request
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
Assert.assertNotNull(subClusterId);
MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
mockRM.waitForState(appId, RMAppState.ACCEPTED);
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.SCHEDULED);
// Call GetApplicationAttempts
GetApplicationAttemptsRequest attemptsRequest =
GetApplicationAttemptsRequest.newInstance(appId);
GetApplicationAttemptsResponse attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
Assert.assertNotNull(attemptsResponse);
ApplicationAttemptId attemptId = attemptsResponse.getApplicationAttemptList().
get(0).getApplicationAttemptId();
FailApplicationAttemptRequest requestFailAppAttempt =
FailApplicationAttemptRequest.newInstance(attemptId);
FailApplicationAttemptResponse responseFailAppAttempt =
interceptor.failApplicationAttempt(requestFailAppAttempt);
Assert.assertNotNull(responseFailAppAttempt);
}
@Test
public void testUpdateApplicationPriority() throws Exception {
LOG.info("Test FederationClientInterceptor : Update Application Priority request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationPriority request " +
"or applicationId or applicationPriority information.",
() -> interceptor.updateApplicationPriority(null));
// normal request
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
Assert.assertNotNull(subClusterId);
MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
mockRM.waitForState(appId, RMAppState.ACCEPTED);
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.SCHEDULED);
// Call GetApplicationAttempts
GetApplicationAttemptsRequest attemptsRequest =
GetApplicationAttemptsRequest.newInstance(appId);
GetApplicationAttemptsResponse attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
Assert.assertNotNull(attemptsResponse);
Priority priority = Priority.newInstance(20);
UpdateApplicationPriorityRequest requestUpdateAppPriority =
UpdateApplicationPriorityRequest.newInstance(appId, priority);
UpdateApplicationPriorityResponse responseAppPriority =
interceptor.updateApplicationPriority(requestUpdateAppPriority);
Assert.assertNotNull(responseAppPriority);
Assert.assertEquals(20,
responseAppPriority.getApplicationPriority().getPriority());
}
@Test
public void testUpdateApplicationTimeouts() throws Exception {
LOG.info("Test FederationClientInterceptor : Update Application Timeouts request.");
// null request
LambdaTestUtils.intercept(YarnException.class, "Missing updateApplicationTimeouts request " +
"or applicationId or applicationTimeouts information.",
() -> interceptor.updateApplicationTimeouts(null));
// normal request
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
// Submit the application
SubmitApplicationResponse response = interceptor.submitApplication(request);
Assert.assertNotNull(response);
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
Assert.assertNotNull(subClusterId);
MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
mockRM.waitForState(appId, RMAppState.ACCEPTED);
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.SCHEDULED);
// Call GetApplicationAttempts
GetApplicationAttemptsRequest attemptsRequest =
GetApplicationAttemptsRequest.newInstance(appId);
GetApplicationAttemptsResponse attemptsResponse =
interceptor.getApplicationAttempts(attemptsRequest);
Assert.assertNotNull(attemptsResponse);
String appTimeout =
Times.formatISO8601(System.currentTimeMillis() + 5 * 1000);
Map<ApplicationTimeoutType, String> applicationTimeouts = new HashMap<>();
applicationTimeouts.put(ApplicationTimeoutType.LIFETIME, appTimeout);
UpdateApplicationTimeoutsRequest timeoutsRequest =
UpdateApplicationTimeoutsRequest.newInstance(appId, applicationTimeouts);
UpdateApplicationTimeoutsResponse timeoutsResponse =
interceptor.updateApplicationTimeouts(timeoutsRequest);
String responseTimeOut =
timeoutsResponse.getApplicationTimeouts().get(ApplicationTimeoutType.LIFETIME);
Assert.assertNotNull(timeoutsResponse);
Assert.assertEquals(appTimeout, responseTimeOut);
}
} }

View File

@ -115,4 +115,7 @@ public class TestableFederationClientInterceptor
} }
} }
public ConcurrentHashMap<SubClusterId, MockRM> getMockRMs() {
return mockRMs;
}
} }

View File

@ -27,4 +27,8 @@
<name>yarn.resourcemanager.webapp.address</name> <name>yarn.resourcemanager.webapp.address</name>
<value>0.0.0.0:8080</value> <value>0.0.0.0:8080</value>
</property> </property>
<property>
<name>yarn.cluster.max-application-priority</name>
<value>50</value>
</property>
</configuration> </configuration>