YARN-7894. Improve ATS response for DShell DS_CONTAINER when container launch fails. Contributed by Chandni Singh
This commit is contained in:
parent
a2ea756420
commit
1ef0a1db1d
|
@ -117,6 +117,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.util.BoundedAppender;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
|
@ -345,6 +346,7 @@ public class ApplicationMaster {
|
|||
static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
|
||||
static final String APPID_TIMELINE_FILTER_NAME = "appId";
|
||||
static final String USER_TIMELINE_FILTER_NAME = "user";
|
||||
static final String DIAGNOSTICS = "Diagnostics";
|
||||
|
||||
private final String linux_bash_command = "bash";
|
||||
private final String windows_command = "cmd /c";
|
||||
|
@ -356,6 +358,8 @@ public class ApplicationMaster {
|
|||
protected final Set<ContainerId> launchedContainers =
|
||||
Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
|
||||
|
||||
private BoundedAppender diagnostics = new BoundedAppender(64 * 1024);
|
||||
|
||||
/**
|
||||
* Container start times used to set id prefix while publishing entity
|
||||
* to ATSv2.
|
||||
|
@ -390,7 +394,7 @@ public class ApplicationMaster {
|
|||
LOG.info("Application Master completed successfully. exiting");
|
||||
System.exit(0);
|
||||
} else {
|
||||
LOG.info("Application Master failed. exiting");
|
||||
LOG.error("Application Master failed. exiting");
|
||||
System.exit(2);
|
||||
}
|
||||
}
|
||||
|
@ -931,28 +935,25 @@ public class ApplicationMaster {
|
|||
LOG.info("Application completed. Signalling finish to RM");
|
||||
|
||||
FinalApplicationStatus appStatus;
|
||||
String appMessage = null;
|
||||
boolean success = true;
|
||||
String message = null;
|
||||
if (numCompletedContainers.get() - numFailedContainers.get()
|
||||
>= numTotalContainers) {
|
||||
appStatus = FinalApplicationStatus.SUCCEEDED;
|
||||
} else {
|
||||
appStatus = FinalApplicationStatus.FAILED;
|
||||
appMessage = "Diagnostics." + ", total=" + numTotalContainers
|
||||
+ ", completed=" + numCompletedContainers.get() + ", allocated="
|
||||
+ numAllocatedContainers.get() + ", failed="
|
||||
+ numFailedContainers.get();
|
||||
LOG.info(appMessage);
|
||||
message = String.format("Application Failure: desired = %d, " +
|
||||
"completed = %d, allocated = %d, failed = %d, " +
|
||||
"diagnostics = %s", numRequestedContainers.get(),
|
||||
numCompletedContainers.get(), numAllocatedContainers.get(),
|
||||
numFailedContainers.get(), diagnostics);
|
||||
success = false;
|
||||
}
|
||||
try {
|
||||
amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
|
||||
} catch (YarnException ex) {
|
||||
amRMClient.unregisterApplicationMaster(appStatus, message, null);
|
||||
} catch (YarnException | IOException ex) {
|
||||
LOG.error("Failed to unregister application", ex);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to unregister application", e);
|
||||
}
|
||||
|
||||
amRMClient.stop();
|
||||
|
||||
// Stop Timeline Client
|
||||
|
@ -974,11 +975,17 @@ public class ApplicationMaster {
|
|||
LOG.info("Got response from RM for container ask, completedCnt="
|
||||
+ completedContainers.size());
|
||||
for (ContainerStatus containerStatus : completedContainers) {
|
||||
LOG.info(appAttemptID + " got container status for containerID="
|
||||
String message = appAttemptID + " got container status for containerID="
|
||||
+ containerStatus.getContainerId() + ", state="
|
||||
+ containerStatus.getState() + ", exitStatus="
|
||||
+ containerStatus.getExitStatus() + ", diagnostics="
|
||||
+ containerStatus.getDiagnostics());
|
||||
+ containerStatus.getDiagnostics();
|
||||
if (containerStatus.getExitStatus() != 0) {
|
||||
LOG.error(message);
|
||||
diagnostics.append(containerStatus.getDiagnostics());
|
||||
} else {
|
||||
LOG.info(message);
|
||||
}
|
||||
|
||||
// non complete containers should not be here
|
||||
assert (containerStatus.getState() == ContainerState.COMPLETE);
|
||||
|
@ -1244,10 +1251,17 @@ public class ApplicationMaster {
|
|||
|
||||
@Override
|
||||
public void onStartContainerError(ContainerId containerId, Throwable t) {
|
||||
LOG.error("Failed to start Container " + containerId, t);
|
||||
LOG.error("Failed to start Container {}", containerId, t);
|
||||
containers.remove(containerId);
|
||||
applicationMaster.numCompletedContainers.incrementAndGet();
|
||||
applicationMaster.numFailedContainers.incrementAndGet();
|
||||
if (timelineServiceV2Enabled) {
|
||||
publishContainerStartFailedEventOnTimelineServiceV2(containerId,
|
||||
t.getMessage());
|
||||
}
|
||||
if (timelineServiceV1Enabled) {
|
||||
publishContainerStartFailedEvent(containerId, t.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1525,6 +1539,7 @@ public class ApplicationMaster {
|
|||
event.setEventType(DSEvent.DS_CONTAINER_END.toString());
|
||||
event.addEventInfo("State", container.getState().name());
|
||||
event.addEventInfo("Exit Status", container.getExitStatus());
|
||||
event.addEventInfo(DIAGNOSTICS, container.getDiagnostics());
|
||||
entity.addEvent(event);
|
||||
try {
|
||||
processTimelineResponseErrors(
|
||||
|
@ -1653,6 +1668,58 @@ public class ApplicationMaster {
|
|||
}
|
||||
}
|
||||
|
||||
private void publishContainerStartFailedEventOnTimelineServiceV2(
|
||||
final ContainerId containerId, String diagnostics) {
|
||||
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
entity = new org.apache.hadoop.yarn.api.records.timelineservice.
|
||||
TimelineEntity();
|
||||
entity.setId(containerId.toString());
|
||||
entity.setType(DSEntity.DS_CONTAINER.toString());
|
||||
entity.addInfo("user", appSubmitterUgi.getShortUserName());
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
|
||||
new org.apache.hadoop.yarn.api.records.timelineservice
|
||||
.TimelineEvent();
|
||||
event.setTimestamp(System.currentTimeMillis());
|
||||
event.setId(DSEvent.DS_CONTAINER_END.toString());
|
||||
event.addInfo(DIAGNOSTICS, diagnostics);
|
||||
entity.addEvent(event);
|
||||
try {
|
||||
appSubmitterUgi.doAs((PrivilegedExceptionAction<Object>) () -> {
|
||||
timelineV2Client.putEntitiesAsync(entity);
|
||||
return null;
|
||||
});
|
||||
} catch (Exception e) {
|
||||
LOG.error("Container start failed event could not be published for {}",
|
||||
containerId,
|
||||
e instanceof UndeclaredThrowableException ? e.getCause() : e);
|
||||
}
|
||||
}
|
||||
|
||||
private void publishContainerStartFailedEvent(final ContainerId containerId,
|
||||
String diagnostics) {
|
||||
final TimelineEntity entityV1 = new TimelineEntity();
|
||||
entityV1.setEntityId(containerId.toString());
|
||||
entityV1.setEntityType(DSEntity.DS_CONTAINER.toString());
|
||||
entityV1.setDomainId(domainId);
|
||||
entityV1.addPrimaryFilter(USER_TIMELINE_FILTER_NAME, appSubmitterUgi
|
||||
.getShortUserName());
|
||||
entityV1.addPrimaryFilter(APPID_TIMELINE_FILTER_NAME,
|
||||
containerId.getApplicationAttemptId().getApplicationId().toString());
|
||||
|
||||
TimelineEvent eventV1 = new TimelineEvent();
|
||||
eventV1.setTimestamp(System.currentTimeMillis());
|
||||
eventV1.setEventType(DSEvent.DS_CONTAINER_END.toString());
|
||||
eventV1.addEventInfo(DIAGNOSTICS, diagnostics);
|
||||
entityV1.addEvent(eventV1);
|
||||
try {
|
||||
processTimelineResponseErrors(putContainerEntity(timelineClient,
|
||||
containerId.getApplicationAttemptId(), entityV1));
|
||||
} catch (YarnException | IOException | ClientHandlerException e) {
|
||||
LOG.error("Container end event could not be published for {}",
|
||||
containerId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void publishContainerEndEventOnTimelineServiceV2(
|
||||
final ContainerStatus container, long containerStartTime) {
|
||||
final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
|
@ -1669,6 +1736,7 @@ public class ApplicationMaster {
|
|||
event.setId(DSEvent.DS_CONTAINER_END.toString());
|
||||
event.addInfo("State", container.getState().name());
|
||||
event.addInfo("Exit Status", container.getExitStatus());
|
||||
event.addInfo(DIAGNOSTICS, container.getDiagnostics());
|
||||
entity.addEvent(event);
|
||||
entity.setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
|
||||
|
||||
|
|
|
@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
|||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
|
||||
|
@ -665,6 +666,15 @@ public class TestDistributedShell {
|
|||
if (entityLine.contains(expectedEvent)) {
|
||||
actualCount++;
|
||||
}
|
||||
if (expectedEvent.equals(DSEvent.DS_CONTAINER_END.toString()) &&
|
||||
entityLine.contains(expectedEvent)) {
|
||||
TimelineEntity entity = FileSystemTimelineReaderImpl.
|
||||
getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
|
||||
TimelineEvent event = entity.getEvents().pollFirst();
|
||||
Assert.assertNotNull(event);
|
||||
Assert.assertTrue("diagnostics",
|
||||
event.getInfo().containsKey(ApplicationMaster.DIAGNOSTICS));
|
||||
}
|
||||
if (checkIdPrefix) {
|
||||
TimelineEntity entity = FileSystemTimelineReaderImpl.
|
||||
getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
|
||||
|
|
Loading…
Reference in New Issue