YARN-7894. Improve ATS response for DShell DS_CONTAINER when container launch fails. Contributed by Chandni Singh

(cherry picked from commit 1ef0a1db1d)
This commit is contained in:
Billie Rinaldi 2018-05-08 13:49:41 -07:00
parent 39236da2ff
commit f7cbfeb726
2 changed files with 93 additions and 15 deletions

View File

@ -117,6 +117,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.BoundedAppender;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.TimelineServiceHelper; import org.apache.hadoop.yarn.util.TimelineServiceHelper;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@ -345,6 +346,7 @@ public class ApplicationMaster {
static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS"; static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
static final String APPID_TIMELINE_FILTER_NAME = "appId"; static final String APPID_TIMELINE_FILTER_NAME = "appId";
static final String USER_TIMELINE_FILTER_NAME = "user"; static final String USER_TIMELINE_FILTER_NAME = "user";
static final String DIAGNOSTICS = "Diagnostics";
private final String linux_bash_command = "bash"; private final String linux_bash_command = "bash";
private final String windows_command = "cmd /c"; private final String windows_command = "cmd /c";
@ -356,6 +358,8 @@ public class ApplicationMaster {
protected final Set<ContainerId> launchedContainers = protected final Set<ContainerId> launchedContainers =
Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>()); Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
private BoundedAppender diagnostics = new BoundedAppender(64 * 1024);
/** /**
* Container start times used to set id prefix while publishing entity * Container start times used to set id prefix while publishing entity
* to ATSv2. * to ATSv2.
@ -390,7 +394,7 @@ public class ApplicationMaster {
LOG.info("Application Master completed successfully. exiting"); LOG.info("Application Master completed successfully. exiting");
System.exit(0); System.exit(0);
} else { } else {
LOG.info("Application Master failed. exiting"); LOG.error("Application Master failed. exiting");
System.exit(2); System.exit(2);
} }
} }
@ -931,28 +935,25 @@ public class ApplicationMaster {
LOG.info("Application completed. Signalling finish to RM"); LOG.info("Application completed. Signalling finish to RM");
FinalApplicationStatus appStatus; FinalApplicationStatus appStatus;
String appMessage = null;
boolean success = true; boolean success = true;
String message = null;
if (numCompletedContainers.get() - numFailedContainers.get() if (numCompletedContainers.get() - numFailedContainers.get()
>= numTotalContainers) { >= numTotalContainers) {
appStatus = FinalApplicationStatus.SUCCEEDED; appStatus = FinalApplicationStatus.SUCCEEDED;
} else { } else {
appStatus = FinalApplicationStatus.FAILED; appStatus = FinalApplicationStatus.FAILED;
appMessage = "Diagnostics." + ", total=" + numTotalContainers message = String.format("Application Failure: desired = %d, " +
+ ", completed=" + numCompletedContainers.get() + ", allocated=" "completed = %d, allocated = %d, failed = %d, " +
+ numAllocatedContainers.get() + ", failed=" "diagnostics = %s", numRequestedContainers.get(),
+ numFailedContainers.get(); numCompletedContainers.get(), numAllocatedContainers.get(),
LOG.info(appMessage); numFailedContainers.get(), diagnostics);
success = false; success = false;
} }
try { try {
amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); amRMClient.unregisterApplicationMaster(appStatus, message, null);
} catch (YarnException ex) { } catch (YarnException | IOException ex) {
LOG.error("Failed to unregister application", ex); LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
LOG.error("Failed to unregister application", e);
} }
amRMClient.stop(); amRMClient.stop();
// Stop Timeline Client // Stop Timeline Client
@ -974,11 +975,17 @@ public class ApplicationMaster {
LOG.info("Got response from RM for container ask, completedCnt=" LOG.info("Got response from RM for container ask, completedCnt="
+ completedContainers.size()); + completedContainers.size());
for (ContainerStatus containerStatus : completedContainers) { for (ContainerStatus containerStatus : completedContainers) {
LOG.info(appAttemptID + " got container status for containerID=" String message = appAttemptID + " got container status for containerID="
+ containerStatus.getContainerId() + ", state=" + containerStatus.getContainerId() + ", state="
+ containerStatus.getState() + ", exitStatus=" + containerStatus.getState() + ", exitStatus="
+ containerStatus.getExitStatus() + ", diagnostics=" + containerStatus.getExitStatus() + ", diagnostics="
+ containerStatus.getDiagnostics()); + containerStatus.getDiagnostics();
if (containerStatus.getExitStatus() != 0) {
LOG.error(message);
diagnostics.append(containerStatus.getDiagnostics());
} else {
LOG.info(message);
}
// non complete containers should not be here // non complete containers should not be here
assert (containerStatus.getState() == ContainerState.COMPLETE); assert (containerStatus.getState() == ContainerState.COMPLETE);
@ -1244,10 +1251,17 @@ public class ApplicationMaster {
@Override @Override
public void onStartContainerError(ContainerId containerId, Throwable t) { public void onStartContainerError(ContainerId containerId, Throwable t) {
LOG.error("Failed to start Container " + containerId, t); LOG.error("Failed to start Container {}", containerId, t);
containers.remove(containerId); containers.remove(containerId);
applicationMaster.numCompletedContainers.incrementAndGet(); applicationMaster.numCompletedContainers.incrementAndGet();
applicationMaster.numFailedContainers.incrementAndGet(); applicationMaster.numFailedContainers.incrementAndGet();
if (timelineServiceV2Enabled) {
publishContainerStartFailedEventOnTimelineServiceV2(containerId,
t.getMessage());
}
if (timelineServiceV1Enabled) {
publishContainerStartFailedEvent(containerId, t.getMessage());
}
} }
@Override @Override
@ -1525,6 +1539,7 @@ public class ApplicationMaster {
event.setEventType(DSEvent.DS_CONTAINER_END.toString()); event.setEventType(DSEvent.DS_CONTAINER_END.toString());
event.addEventInfo("State", container.getState().name()); event.addEventInfo("State", container.getState().name());
event.addEventInfo("Exit Status", container.getExitStatus()); event.addEventInfo("Exit Status", container.getExitStatus());
event.addEventInfo(DIAGNOSTICS, container.getDiagnostics());
entity.addEvent(event); entity.addEvent(event);
try { try {
processTimelineResponseErrors( processTimelineResponseErrors(
@ -1653,6 +1668,58 @@ public class ApplicationMaster {
} }
} }
private void publishContainerStartFailedEventOnTimelineServiceV2(
final ContainerId containerId, String diagnostics) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
entity = new org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity();
entity.setId(containerId.toString());
entity.setType(DSEntity.DS_CONTAINER.toString());
entity.addInfo("user", appSubmitterUgi.getShortUserName());
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
new org.apache.hadoop.yarn.api.records.timelineservice
.TimelineEvent();
event.setTimestamp(System.currentTimeMillis());
event.setId(DSEvent.DS_CONTAINER_END.toString());
event.addInfo(DIAGNOSTICS, diagnostics);
entity.addEvent(event);
try {
appSubmitterUgi.doAs((PrivilegedExceptionAction<Object>) () -> {
timelineV2Client.putEntitiesAsync(entity);
return null;
});
} catch (Exception e) {
LOG.error("Container start failed event could not be published for {}",
containerId,
e instanceof UndeclaredThrowableException ? e.getCause() : e);
}
}
private void publishContainerStartFailedEvent(final ContainerId containerId,
String diagnostics) {
final TimelineEntity entityV1 = new TimelineEntity();
entityV1.setEntityId(containerId.toString());
entityV1.setEntityType(DSEntity.DS_CONTAINER.toString());
entityV1.setDomainId(domainId);
entityV1.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, appSubmitterUgi
.getShortUserName());
entityV1.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME,
containerId.getApplicationAttemptId().getApplicationId().toString());
TimelineEvent eventV1 = new TimelineEvent();
eventV1.setTimestamp(System.currentTimeMillis());
eventV1.setEventType(DSEvent.DS_CONTAINER_END.toString());
eventV1.addEventInfo(DIAGNOSTICS, diagnostics);
entityV1.addEvent(eventV1);
try {
processTimelineResponseErrors(putContainerEntity(timelineClient,
containerId.getApplicationAttemptId(), entityV1));
} catch (YarnException | IOException | ClientHandlerException e) {
LOG.error("Container end event could not be published for {}",
containerId, e);
}
}
private void publishContainerEndEventOnTimelineServiceV2( private void publishContainerEndEventOnTimelineServiceV2(
final ContainerStatus container, long containerStartTime) { final ContainerStatus container, long containerStartTime) {
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
@ -1669,6 +1736,7 @@ public class ApplicationMaster {
event.setId(DSEvent.DS_CONTAINER_END.toString()); event.setId(DSEvent.DS_CONTAINER_END.toString());
event.addInfo("State", container.getState().name()); event.addInfo("State", container.getState().name());
event.addInfo("Exit Status", container.getExitStatus()); event.addInfo("Exit Status", container.getExitStatus());
event.addInfo(DIAGNOSTICS, container.getDiagnostics());
entity.addEvent(event); entity.addEvent(event);
entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime)); entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent; import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter; import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
@ -665,6 +666,15 @@ public class TestDistributedShell {
if (entityLine.contains(expectedEvent)) { if (entityLine.contains(expectedEvent)) {
actualCount++; actualCount++;
} }
if (expectedEvent.equals(DSEvent.DS_CONTAINER_END.toString()) &&
entityLine.contains(expectedEvent)) {
TimelineEntity entity = FileSystemTimelineReaderImpl.
getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
TimelineEvent event = entity.getEvents().pollFirst();
Assert.assertNotNull(event);
Assert.assertTrue("diagnostics",
event.getInfo().containsKey(ApplicationMaster.DIAGNOSTICS));
}
if (checkIdPrefix) { if (checkIdPrefix) {
TimelineEntity entity = FileSystemTimelineReaderImpl. TimelineEntity entity = FileSystemTimelineReaderImpl.
getTimelineRecordFromJSON(entityLine, TimelineEntity.class); getTimelineRecordFromJSON(entityLine, TimelineEntity.class);