From 148322ca72ee7bc40ece551b485f9e939fe4a893 Mon Sep 17 00:00:00 2001 From: Eric Yang Date: Fri, 13 Apr 2018 15:34:33 -0400 Subject: [PATCH] YARN-8142. Improve SIGTERM handling for YARN Service Application Master. Contributed by Billie Rinaldi (cherry picked from commit 9031a76d447f0c5eaa392144fd17c5b9812e1b20) --- .../hadoop/yarn/service/ClientAMService.java | 1 + .../hadoop/yarn/service/ServiceScheduler.java | 41 +++++++---- .../hadoop/yarn/service/ServiceTestUtils.java | 11 +++ .../yarn/service/TestYarnNativeServices.java | 71 +++++++++++++++++++ 4 files changed, 110 insertions(+), 14 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index 08c36f443ed..3d037e7189b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -125,6 +125,7 @@ public class ClientAMService extends AbstractService LOG.info("Stop the service by {}", UserGroupInformation.getCurrentUser()); context.scheduler.getDiagnostics() .append("Stopped by user " + UserGroupInformation.getCurrentUser()); + context.scheduler.setGracefulStop(); // Stop the service in 2 seconds delay to make sure this rpc call is completed. // shutdown hook will be executed which will stop AM gracefully. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 0fcca165099..7eddef9c23d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -156,6 +156,8 @@ public class ServiceScheduler extends CompositeService { // requests for a single service is not recommended. private boolean hasAtLeastOnePlacementConstraint; + private boolean gracefulStop = false; + public ServiceScheduler(ServiceContext context) { super(context.service.getName()); this.context = context; @@ -199,6 +201,7 @@ public class ServiceScheduler extends CompositeService { addIfService(amRMClient); nmClient = createNMClient(); + nmClient.getClient().cleanupRunningContainersOnStop(false); addIfService(nmClient); dispatcher = new AsyncDispatcher("Component dispatcher"); @@ -252,6 +255,11 @@ public class ServiceScheduler extends CompositeService { .createAMRMClientAsync(1000, new AMRMClientCallback()); } + protected void setGracefulStop() { + this.gracefulStop = true; + nmClient.getClient().cleanupRunningContainersOnStop(true); + } + @Override public void serviceInit(Configuration conf) throws Exception { try { @@ -266,26 +274,31 @@ public class ServiceScheduler extends CompositeService { public void serviceStop() throws Exception { LOG.info("Stopping service scheduler"); - // Mark component-instances/containers as STOPPED - if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - for (ContainerId containerId : getLiveInstances().keySet()) { - serviceTimelinePublisher.componentInstanceFinished(containerId, - KILLED_AFTER_APP_COMPLETION, diagnostics.toString()); - } - } if (executorService != null) { executorService.shutdownNow(); } DefaultMetricsSystem.shutdown(); - if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - serviceTimelinePublisher - .serviceAttemptUnregistered(context, diagnostics.toString()); + + // only stop the entire service when a graceful stop has been initiated + // (e.g. via client RPC, not through the AM receiving a SIGTERM) + if (gracefulStop) { + if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { + // mark component-instances/containers as STOPPED + for (ContainerId containerId : getLiveInstances().keySet()) { + serviceTimelinePublisher.componentInstanceFinished(containerId, + KILLED_AFTER_APP_COMPLETION, diagnostics.toString()); + } + // mark attempt as unregistered + serviceTimelinePublisher + .serviceAttemptUnregistered(context, diagnostics.toString()); + } + // unregister AM + amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED, + diagnostics.toString(), ""); + LOG.info("Service {} unregistered with RM, with attemptId = {} " + + ", diagnostics = {} ", app.getName(), context.attemptId, diagnostics); } - amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED, - diagnostics.toString(), ""); - LOG.info("Service {} unregistered with RM, with attemptId = {} " + - ", diagnostics = {} ", app.getName(), context.attemptId, diagnostics); super.serviceStop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index d84f05fc6bb..599b8a7a0ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.registry.client.impl.zk.CuratorService; import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.service.api.records.Component; @@ -305,6 +306,16 @@ public class ServiceTestUtils { return client; } + /** + * Creates a YarnClient for test purposes. + */ + public static YarnClient createYarnClient(Configuration conf) { + YarnClient client = YarnClient.createYarnClient(); + client.init(conf); + client.start(); + return client; + } + protected CuratorService getCuratorService() throws IOException { return curatorService; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 5e267bb15b1..443ba0b4035 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -28,6 +28,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; @@ -489,6 +490,76 @@ public class TestYarnNativeServices extends ServiceTestUtils { client.actionDestroy(exampleApp.getName()); } + @Test(timeout = 200000) + public void testAMSigtermDoesNotKillApplication() throws Exception { + runAMSignalTest(SignalContainerCommand.GRACEFUL_SHUTDOWN); + } + + @Test(timeout = 200000) + public void testAMSigkillDoesNotKillApplication() throws Exception { + runAMSignalTest(SignalContainerCommand.FORCEFUL_SHUTDOWN); + } + + public void runAMSignalTest(SignalContainerCommand signal) throws Exception { + setupInternal(NUM_NMS); + ServiceClient client = createClient(getConf()); + Service exampleApp = createExampleApplication(); + client.actionCreate(exampleApp); + waitForServiceToBeStable(client, exampleApp); + Service appStatus1 = client.getStatus(exampleApp.getName()); + ApplicationId exampleAppId = ApplicationId.fromString(appStatus1.getId()); + + YarnClient yarnClient = createYarnClient(getConf()); + ApplicationReport applicationReport = yarnClient.getApplicationReport( + exampleAppId); + + ApplicationAttemptId firstAttemptId = applicationReport + .getCurrentApplicationAttemptId(); + ApplicationAttemptReport attemptReport = yarnClient + .getApplicationAttemptReport(firstAttemptId); + + // the AM should not perform a graceful shutdown since the operation was not + // initiated through the service client + yarnClient.signalToContainer(attemptReport.getAMContainerId(), signal); + + GenericTestUtils.waitFor(() -> { + try { + ApplicationReport ar = client.getYarnClient() + .getApplicationReport(exampleAppId); + YarnApplicationState state = ar.getYarnApplicationState(); + Assert.assertTrue(state == YarnApplicationState.RUNNING || + state == YarnApplicationState.ACCEPTED); + if (state != YarnApplicationState.RUNNING) { + return false; + } + if (ar.getCurrentApplicationAttemptId() == null || + ar.getCurrentApplicationAttemptId().equals(firstAttemptId)) { + return false; + } + Service appStatus2 = client.getStatus(exampleApp.getName()); + if (appStatus2.getState() != ServiceState.STABLE) { + return false; + } + Assert.assertEquals(getSortedContainerIds(appStatus1).toString(), + getSortedContainerIds(appStatus2).toString()); + return true; + } catch (YarnException | IOException e) { + throw new RuntimeException("while waiting", e); + } + }, 2000, 200000); + } + + private static List getSortedContainerIds(Service s) { + List containerIds = new ArrayList<>(); + for (Component component : s.getComponents()) { + for (Container container : component.getContainers()) { + containerIds.add(container.getId()); + } + } + Collections.sort(containerIds); + return containerIds; + } + // Check containers launched are in dependency order // Get all containers into a list and sort based on container launch time e.g. // compa-c1, compa-c2, compb-c1, compb-c2;