YARN-8142. Improve SIGTERM handling for YARN Service Application Master.
Contributed by Billie Rinaldi
This commit is contained in:
parent
e66e287efe
commit
9031a76d44
|
@ -125,6 +125,7 @@ public class ClientAMService extends AbstractService
|
|||
LOG.info("Stop the service by {}", UserGroupInformation.getCurrentUser());
|
||||
context.scheduler.getDiagnostics()
|
||||
.append("Stopped by user " + UserGroupInformation.getCurrentUser());
|
||||
context.scheduler.setGracefulStop();
|
||||
|
||||
// Stop the service in 2 seconds delay to make sure this rpc call is completed.
|
||||
// shutdown hook will be executed which will stop AM gracefully.
|
||||
|
|
|
@ -156,6 +156,8 @@ public class ServiceScheduler extends CompositeService {
|
|||
// requests for a single service is not recommended.
|
||||
private boolean hasAtLeastOnePlacementConstraint;
|
||||
|
||||
private boolean gracefulStop = false;
|
||||
|
||||
public ServiceScheduler(ServiceContext context) {
|
||||
super(context.service.getName());
|
||||
this.context = context;
|
||||
|
@ -199,6 +201,7 @@ public class ServiceScheduler extends CompositeService {
|
|||
addIfService(amRMClient);
|
||||
|
||||
nmClient = createNMClient();
|
||||
nmClient.getClient().cleanupRunningContainersOnStop(false);
|
||||
addIfService(nmClient);
|
||||
|
||||
dispatcher = new AsyncDispatcher("Component dispatcher");
|
||||
|
@ -252,6 +255,11 @@ public class ServiceScheduler extends CompositeService {
|
|||
.createAMRMClientAsync(1000, new AMRMClientCallback());
|
||||
}
|
||||
|
||||
protected void setGracefulStop() {
|
||||
this.gracefulStop = true;
|
||||
nmClient.getClient().cleanupRunningContainersOnStop(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
try {
|
||||
|
@ -266,26 +274,31 @@ public class ServiceScheduler extends CompositeService {
|
|||
public void serviceStop() throws Exception {
|
||||
LOG.info("Stopping service scheduler");
|
||||
|
||||
// Mark component-instances/containers as STOPPED
|
||||
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
|
||||
for (ContainerId containerId : getLiveInstances().keySet()) {
|
||||
serviceTimelinePublisher.componentInstanceFinished(containerId,
|
||||
KILLED_AFTER_APP_COMPLETION, diagnostics.toString());
|
||||
}
|
||||
}
|
||||
if (executorService != null) {
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
|
||||
DefaultMetricsSystem.shutdown();
|
||||
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
|
||||
serviceTimelinePublisher
|
||||
.serviceAttemptUnregistered(context, diagnostics.toString());
|
||||
|
||||
// only stop the entire service when a graceful stop has been initiated
|
||||
// (e.g. via client RPC, not through the AM receiving a SIGTERM)
|
||||
if (gracefulStop) {
|
||||
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
|
||||
// mark component-instances/containers as STOPPED
|
||||
for (ContainerId containerId : getLiveInstances().keySet()) {
|
||||
serviceTimelinePublisher.componentInstanceFinished(containerId,
|
||||
KILLED_AFTER_APP_COMPLETION, diagnostics.toString());
|
||||
}
|
||||
// mark attempt as unregistered
|
||||
serviceTimelinePublisher
|
||||
.serviceAttemptUnregistered(context, diagnostics.toString());
|
||||
}
|
||||
// unregister AM
|
||||
amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED,
|
||||
diagnostics.toString(), "");
|
||||
LOG.info("Service {} unregistered with RM, with attemptId = {} " +
|
||||
", diagnostics = {} ", app.getName(), context.attemptId, diagnostics);
|
||||
}
|
||||
amRMClient.unregisterApplicationMaster(FinalApplicationStatus.ENDED,
|
||||
diagnostics.toString(), "");
|
||||
LOG.info("Service {} unregistered with RM, with attemptId = {} " +
|
||||
", diagnostics = {} ", app.getName(), context.attemptId, diagnostics);
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.http.HttpServer2;
|
|||
import org.apache.hadoop.registry.client.impl.zk.CuratorService;
|
||||
import org.apache.hadoop.service.ServiceOperations;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
|
@ -305,6 +306,16 @@ public class ServiceTestUtils {
|
|||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a YarnClient for test purposes.
|
||||
*/
|
||||
public static YarnClient createYarnClient(Configuration conf) {
|
||||
YarnClient client = YarnClient.createYarnClient();
|
||||
client.init(conf);
|
||||
client.start();
|
||||
return client;
|
||||
}
|
||||
|
||||
protected CuratorService getCuratorService() throws IOException {
|
||||
return curatorService;
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.records.*;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
|
@ -489,6 +490,76 @@ public class TestYarnNativeServices extends ServiceTestUtils {
|
|||
client.actionDestroy(exampleApp.getName());
|
||||
}
|
||||
|
||||
@Test(timeout = 200000)
|
||||
public void testAMSigtermDoesNotKillApplication() throws Exception {
|
||||
runAMSignalTest(SignalContainerCommand.GRACEFUL_SHUTDOWN);
|
||||
}
|
||||
|
||||
@Test(timeout = 200000)
|
||||
public void testAMSigkillDoesNotKillApplication() throws Exception {
|
||||
runAMSignalTest(SignalContainerCommand.FORCEFUL_SHUTDOWN);
|
||||
}
|
||||
|
||||
public void runAMSignalTest(SignalContainerCommand signal) throws Exception {
|
||||
setupInternal(NUM_NMS);
|
||||
ServiceClient client = createClient(getConf());
|
||||
Service exampleApp = createExampleApplication();
|
||||
client.actionCreate(exampleApp);
|
||||
waitForServiceToBeStable(client, exampleApp);
|
||||
Service appStatus1 = client.getStatus(exampleApp.getName());
|
||||
ApplicationId exampleAppId = ApplicationId.fromString(appStatus1.getId());
|
||||
|
||||
YarnClient yarnClient = createYarnClient(getConf());
|
||||
ApplicationReport applicationReport = yarnClient.getApplicationReport(
|
||||
exampleAppId);
|
||||
|
||||
ApplicationAttemptId firstAttemptId = applicationReport
|
||||
.getCurrentApplicationAttemptId();
|
||||
ApplicationAttemptReport attemptReport = yarnClient
|
||||
.getApplicationAttemptReport(firstAttemptId);
|
||||
|
||||
// the AM should not perform a graceful shutdown since the operation was not
|
||||
// initiated through the service client
|
||||
yarnClient.signalToContainer(attemptReport.getAMContainerId(), signal);
|
||||
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
try {
|
||||
ApplicationReport ar = client.getYarnClient()
|
||||
.getApplicationReport(exampleAppId);
|
||||
YarnApplicationState state = ar.getYarnApplicationState();
|
||||
Assert.assertTrue(state == YarnApplicationState.RUNNING ||
|
||||
state == YarnApplicationState.ACCEPTED);
|
||||
if (state != YarnApplicationState.RUNNING) {
|
||||
return false;
|
||||
}
|
||||
if (ar.getCurrentApplicationAttemptId() == null ||
|
||||
ar.getCurrentApplicationAttemptId().equals(firstAttemptId)) {
|
||||
return false;
|
||||
}
|
||||
Service appStatus2 = client.getStatus(exampleApp.getName());
|
||||
if (appStatus2.getState() != ServiceState.STABLE) {
|
||||
return false;
|
||||
}
|
||||
Assert.assertEquals(getSortedContainerIds(appStatus1).toString(),
|
||||
getSortedContainerIds(appStatus2).toString());
|
||||
return true;
|
||||
} catch (YarnException | IOException e) {
|
||||
throw new RuntimeException("while waiting", e);
|
||||
}
|
||||
}, 2000, 200000);
|
||||
}
|
||||
|
||||
private static List<String> getSortedContainerIds(Service s) {
|
||||
List<String> containerIds = new ArrayList<>();
|
||||
for (Component component : s.getComponents()) {
|
||||
for (Container container : component.getContainers()) {
|
||||
containerIds.add(container.getId());
|
||||
}
|
||||
}
|
||||
Collections.sort(containerIds);
|
||||
return containerIds;
|
||||
}
|
||||
|
||||
// Check containers launched are in dependency order
|
||||
// Get all containers into a list and sort based on container launch time e.g.
|
||||
// compa-c1, compa-c2, compb-c1, compb-c2;
|
||||
|
|
Loading…
Reference in New Issue