YARN-10122. Support signalToContainer API for Federation. (#4421)
This commit is contained in:
parent
80446dcd08
commit
62e4476102
|
@ -81,6 +81,8 @@ public final class RouterMetrics {
|
|||
private MutableGaugeInt numUpdateAppPriorityFailedRetrieved;
|
||||
@Metric("# of updateApplicationPriority failed to be retrieved")
|
||||
private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved;
|
||||
@Metric("# of signalToContainer failed to be retrieved")
|
||||
private MutableGaugeInt numSignalToContainerFailedRetrieved;
|
||||
|
||||
// Aggregate metrics are shared, and don't have to be looked up per call
|
||||
@Metric("Total number of successful Submitted apps and latency(ms)")
|
||||
|
@ -126,6 +128,8 @@ public final class RouterMetrics {
|
|||
private MutableRate totalSucceededUpdateAppPriorityRetrieved;
|
||||
@Metric("Total number of successful Retrieved updateApplicationTimeouts and latency(ms)")
|
||||
private MutableRate totalSucceededUpdateAppTimeoutsRetrieved;
|
||||
@Metric("Total number of successful Retrieved signalToContainer and latency(ms)")
|
||||
private MutableRate totalSucceededSignalToContainerRetrieved;
|
||||
|
||||
/**
|
||||
* Provide quantile counters for all latencies.
|
||||
|
@ -150,6 +154,7 @@ public final class RouterMetrics {
|
|||
private MutableQuantiles failAppAttemptLatency;
|
||||
private MutableQuantiles updateAppPriorityLatency;
|
||||
private MutableQuantiles updateAppTimeoutsLatency;
|
||||
private MutableQuantiles signalToContainerLatency;
|
||||
|
||||
private static volatile RouterMetrics instance = null;
|
||||
private static MetricsRegistry registry;
|
||||
|
@ -228,6 +233,10 @@ public final class RouterMetrics {
|
|||
updateAppTimeoutsLatency =
|
||||
registry.newQuantiles("updateApplicationTimeoutsLatency",
|
||||
"latency of update application timeouts", "ops", "latency", 10);
|
||||
|
||||
signalToContainerLatency =
|
||||
registry.newQuantiles("signalToContainerLatency",
|
||||
"latency of signal to container timeouts", "ops", "latency", 10);
|
||||
}
|
||||
|
||||
public static RouterMetrics getMetrics() {
|
||||
|
@ -349,6 +358,11 @@ public final class RouterMetrics {
|
|||
return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumSucceededSignalToContainerRetrieved() {
|
||||
return totalSucceededSignalToContainerRetrieved.lastStat().numSamples();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededAppsCreated() {
|
||||
return totalSucceededAppsCreated.lastStat().mean();
|
||||
|
@ -449,6 +463,11 @@ public final class RouterMetrics {
|
|||
return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public double getLatencySucceededSignalToContainerRetrieved() {
|
||||
return totalSucceededSignalToContainerRetrieved.lastStat().mean();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getAppsFailedCreated() {
|
||||
return numAppsFailedCreated.value();
|
||||
|
@ -549,6 +568,11 @@ public final class RouterMetrics {
|
|||
return numUpdateAppTimeoutsFailedRetrieved.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getSignalToContainerFailedRetrieved() {
|
||||
return numSignalToContainerFailedRetrieved.value();
|
||||
}
|
||||
|
||||
public void succeededAppsCreated(long duration) {
|
||||
totalSucceededAppsCreated.add(duration);
|
||||
getNewApplicationLatency.add(duration);
|
||||
|
@ -649,6 +673,11 @@ public final class RouterMetrics {
|
|||
updateAppTimeoutsLatency.add(duration);
|
||||
}
|
||||
|
||||
public void succeededSignalToContainerRetrieved(long duration) {
|
||||
totalSucceededSignalToContainerRetrieved.add(duration);
|
||||
signalToContainerLatency.add(duration);
|
||||
}
|
||||
|
||||
public void incrAppsFailedCreated() {
|
||||
numAppsFailedCreated.incr();
|
||||
}
|
||||
|
@ -728,4 +757,8 @@ public final class RouterMetrics {
|
|||
public void incrUpdateApplicationTimeoutsRetrieved() {
|
||||
numUpdateAppTimeoutsFailedRetrieved.incr();
|
||||
}
|
||||
|
||||
public void incrSignalToContainerFailedRetrieved() {
|
||||
numSignalToContainerFailedRetrieved.incr();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1304,7 +1304,43 @@ public class FederationClientInterceptor
|
|||
@Override
|
||||
public SignalContainerResponse signalToContainer(
|
||||
SignalContainerRequest request) throws YarnException, IOException {
|
||||
throw new NotImplementedException("Code is not implemented");
|
||||
if (request == null || request.getContainerId() == null
|
||||
|| request.getCommand() == null) {
|
||||
routerMetrics.incrSignalToContainerFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException(
|
||||
"Missing signalToContainer request or containerId " +
|
||||
"or command information.", null);
|
||||
}
|
||||
|
||||
long startTime = clock.getTime();
|
||||
SubClusterId subClusterId = null;
|
||||
ApplicationId applicationId =
|
||||
request.getContainerId().getApplicationAttemptId().getApplicationId();
|
||||
try {
|
||||
subClusterId = getApplicationHomeSubCluster(applicationId);
|
||||
} catch (YarnException ex) {
|
||||
routerMetrics.incrSignalToContainerFailedRetrieved();
|
||||
RouterServerUtil.logAndThrowException("Application " + applicationId +
|
||||
" does not exist in FederationStateStore.", ex);
|
||||
}
|
||||
|
||||
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
|
||||
SignalContainerResponse response = null;
|
||||
try {
|
||||
response = clientRMProxy.signalToContainer(request);
|
||||
} catch (Exception ex) {
|
||||
RouterServerUtil.logAndThrowException("Unable to signal to container for " +
|
||||
applicationId + " from SubCluster " + subClusterId.getId(), ex);
|
||||
}
|
||||
|
||||
if (response == null) {
|
||||
LOG.error("No response when signal to container of " +
|
||||
"the applicationId {} to SubCluster {}.", applicationId, subClusterId.getId());
|
||||
}
|
||||
|
||||
long stopTime = clock.getTime();
|
||||
routerMetrics.succeededSignalToContainerRetrieved(stopTime - startTime);
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -413,6 +413,11 @@ public class TestRouterMetrics {
|
|||
LOG.info("Mocked: failed updateApplicationTimeouts call");
|
||||
metrics.incrUpdateApplicationTimeoutsRetrieved();
|
||||
}
|
||||
|
||||
public void getSignalContainer() {
|
||||
LOG.info("Mocked: failed signalContainer call");
|
||||
metrics.incrSignalToContainerFailedRetrieved();
|
||||
}
|
||||
}
|
||||
|
||||
// Records successes for all calls
|
||||
|
@ -523,6 +528,11 @@ public class TestRouterMetrics {
|
|||
LOG.info("Mocked: successful updateApplicationTimeouts call with duration {}", duration);
|
||||
metrics.succeededUpdateAppTimeoutsRetrieved(duration);
|
||||
}
|
||||
|
||||
public void getSignalToContainerTimeouts(long duration) {
|
||||
LOG.info("Mocked: successful signalToContainer call with duration {}", duration);
|
||||
metrics.succeededSignalToContainerRetrieved(duration);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -806,4 +816,27 @@ public class TestRouterMetrics {
|
|||
metrics.getUpdateApplicationTimeoutsFailedRetrieved());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSucceededSignalToContainerRetrieved() {
|
||||
long totalGoodBefore = metrics.getNumSucceededSignalToContainerRetrieved();
|
||||
goodSubCluster.getSignalToContainerTimeouts(150);
|
||||
Assert.assertEquals(totalGoodBefore + 1,
|
||||
metrics.getNumSucceededSignalToContainerRetrieved());
|
||||
Assert.assertEquals(150,
|
||||
metrics.getLatencySucceededSignalToContainerRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
goodSubCluster.getSignalToContainerTimeouts(300);
|
||||
Assert.assertEquals(totalGoodBefore + 2,
|
||||
metrics.getNumSucceededSignalToContainerRetrieved());
|
||||
Assert.assertEquals(225,
|
||||
metrics.getLatencySucceededSignalToContainerRetrieved(), ASSERT_DOUBLE_DELTA);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSignalToContainerFailed() {
|
||||
long totalBadBefore = metrics.getSignalToContainerFailedRetrieved();
|
||||
badSubCluster.getSignalContainer();
|
||||
Assert.assertEquals(totalBadBefore + 1,
|
||||
metrics.getSignalToContainerFailedRetrieved());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityReque
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
|
@ -83,6 +85,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|||
import org.apache.hadoop.yarn.api.records.ReservationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
|
||||
|
@ -91,6 +94,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
|||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
||||
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
|
@ -1056,4 +1060,45 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
|
|||
Assert.assertNotNull(timeoutsResponse);
|
||||
Assert.assertEquals(appTimeout, responseTimeOut);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSignalContainer() throws Exception {
|
||||
LOG.info("Test FederationClientInterceptor : Signal Container request.");
|
||||
|
||||
// null request
|
||||
LambdaTestUtils.intercept(YarnException.class, "Missing signalToContainer request " +
|
||||
"or containerId or command information.", () -> interceptor.signalToContainer(null));
|
||||
|
||||
// normal request
|
||||
ApplicationId appId =
|
||||
ApplicationId.newInstance(System.currentTimeMillis(), 1);
|
||||
SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
|
||||
|
||||
// Submit the application
|
||||
SubmitApplicationResponse response = interceptor.submitApplication(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
|
||||
|
||||
SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
|
||||
Assert.assertNotNull(subClusterId);
|
||||
|
||||
MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
|
||||
mockRM.waitForState(appId, RMAppState.ACCEPTED);
|
||||
RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
|
||||
mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
|
||||
RMAppAttemptState.SCHEDULED);
|
||||
MockNM nm = interceptor.getMockNMs().get(subClusterId);
|
||||
nm.nodeHeartbeat(true);
|
||||
mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
|
||||
mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());
|
||||
|
||||
ContainerId containerId = rmApp.getCurrentAppAttempt().getMasterContainer().getId();
|
||||
|
||||
SignalContainerRequest signalContainerRequest =
|
||||
SignalContainerRequest.newInstance(containerId, SignalContainerCommand.GRACEFUL_SHUTDOWN);
|
||||
SignalContainerResponse signalContainerResponse =
|
||||
interceptor.signalToContainer(signalContainerRequest);
|
||||
|
||||
Assert.assertNotNull(signalContainerResponse);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
|
@ -51,6 +52,9 @@ public class TestableFederationClientInterceptor
|
|||
private ConcurrentHashMap<SubClusterId, MockRM> mockRMs =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
private ConcurrentHashMap<SubClusterId, MockNM> mockNMs =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();
|
||||
|
||||
@Override
|
||||
|
@ -71,7 +75,8 @@ public class TestableFederationClientInterceptor
|
|||
mockRM.init(super.getConf());
|
||||
mockRM.start();
|
||||
try {
|
||||
mockRM.registerNode("h1:1234", 1024);
|
||||
MockNM nm = mockRM.registerNode("127.0.0.1:1234", 8*1024, 4);
|
||||
mockNMs.put(subClusterId, nm);
|
||||
} catch (Exception e) {
|
||||
Assert.fail(e.getMessage());
|
||||
}
|
||||
|
@ -118,4 +123,8 @@ public class TestableFederationClientInterceptor
|
|||
public ConcurrentHashMap<SubClusterId, MockRM> getMockRMs() {
|
||||
return mockRMs;
|
||||
}
|
||||
|
||||
public ConcurrentHashMap<SubClusterId, MockNM> getMockNMs() {
|
||||
return mockNMs;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue