MAPREDUCE-7042. Killed MR job data does not move to mapreduce.jobhistory.done-dir when ATS v2 is enabled. Contributed by Rohith Sharma K S.

(cherry picked from commit 83e60cd2db)
Authored by Sunil G on 2018-04-26 19:07:02 +05:30; committed by Rohith Sharma K S
parent a91d5c7e2c
commit 600f4d402f
2 changed files with 89 additions and 9 deletions
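Since the change is spread across several hunks, the following is a rough, self-contained sketch of the pattern this patch introduces in JobHistoryEventHandler: a dedicated AsyncDispatcher for timeline (ATS) publishing, fed through a forwarding handler, so the main job-history event path is not blocked by the timeline clients while it finishes the job's history data. The class and event names used here (AtsDispatcherSketch, DemoEventType, DemoHistoryEvent) are hypothetical stand-ins, not the actual MapReduce types.

// --- illustrative sketch, not part of this commit ---
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;

// Hypothetical stand-ins for JobHistoryEventHandler.EventType / JobHistoryEvent.
enum DemoEventType { JOB_HISTORY_EVENT }

class DemoHistoryEvent extends AbstractEvent<DemoEventType> {
  DemoHistoryEvent() { super(DemoEventType.JOB_HISTORY_EVENT); }
}

public class AtsDispatcherSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Dedicated dispatcher for timeline publishing, mirroring createDispatcher()
    // and the new atsEventDispatcher field in the patch.
    AsyncDispatcher atsEventDispatcher =
        new AsyncDispatcher("Job ATS Event Dispatcher");

    // Forwarding handler: in the real code this calls handleTimelineEvent(),
    // which talks to TimelineClient (v1) or TimelineV2Client (ATS v2).
    EventHandler<DemoHistoryEvent> timelineEventHandler =
        event -> System.out.println("publish to ATS: " + event.getType());

    atsEventDispatcher.register(DemoEventType.class, timelineEventHandler);
    atsEventDispatcher.setDrainEventsOnStop(); // flush queued events on stop()
    atsEventDispatcher.init(conf);
    atsEventDispatcher.start();

    // The history path only hands the event off and keeps going; the timeline
    // write happens on the dispatcher's own thread.
    atsEventDispatcher.getEventHandler().handle(new DemoHistoryEvent());

    // stop() drains whatever is still queued before shutting the thread down.
    atsEventDispatcher.stop();
  }
}
// --- end of sketch ---

The design point is that the existing eventQueue/eventHandlingThread path, which writes and finalizes the job history files, no longer waits on timeline publishing; that inline coupling appears to be what previously left a killed job's history data stranded when ATS v2 was enabled.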

JobHistoryEventHandler.java

@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -119,7 +120,11 @@ public class JobHistoryEventHandler extends AbstractService
protected BlockingQueue<JobHistoryEvent> eventQueue =
new LinkedBlockingQueue<JobHistoryEvent>();
protected boolean handleTimelineEvent = false;
protected AsyncDispatcher atsEventDispatcher = null;
protected Thread eventHandlingThread;
private volatile boolean stopped;
private final Object lock = new Object();
@@ -279,6 +284,7 @@ public class JobHistoryEventHandler extends AbstractService
((MRAppMaster.RunningAppContext) context).getTimelineClient();
timelineClient.init(conf);
}
handleTimelineEvent = true;
LOG.info("Timeline service is enabled; version: " + LOG.info("Timeline service is enabled; version: " +
YarnConfiguration.getTimelineServiceVersion(conf)); YarnConfiguration.getTimelineServiceVersion(conf));
} else { } else {
@ -302,10 +308,23 @@ public class JobHistoryEventHandler extends AbstractService
"'json' or 'binary'. Falling back to default value '" + "'json' or 'binary'. Falling back to default value '" +
JHAdminConfig.DEFAULT_MR_HS_JHIST_FORMAT + "'."); JHAdminConfig.DEFAULT_MR_HS_JHIST_FORMAT + "'.");
} }
// initiate the atsEventDispatcher for timeline event
// if timeline service is enabled.
if (handleTimelineEvent) {
atsEventDispatcher = createDispatcher();
EventHandler<JobHistoryEvent> timelineEventHandler =
new ForwardingEventHandler();
atsEventDispatcher.register(EventType.class, timelineEventHandler);
atsEventDispatcher.setDrainEventsOnStop();
atsEventDispatcher.init(conf);
}
super.serviceInit(conf);
}
protected AsyncDispatcher createDispatcher() {
return new AsyncDispatcher("Job ATS Event Dispatcher");
}
private void mkdir(FileSystem fs, Path path, FsPermission fsp)
throws IOException {
if (!fs.exists(path)) {
@@ -371,6 +390,10 @@ public class JobHistoryEventHandler extends AbstractService
}
}, "eventHandlingThread");
eventHandlingThread.start();
if (handleTimelineEvent) {
atsEventDispatcher.start();
}
super.serviceStart();
}
@@ -453,6 +476,11 @@ public class JobHistoryEventHandler extends AbstractService
LOG.info("Exception while closing file " + e.getMessage());
}
}
if (handleTimelineEvent && atsEventDispatcher != null) {
atsEventDispatcher.stop();
}
if (timelineClient != null) {
timelineClient.stop();
} else if (timelineV2Client != null) {
@@ -572,6 +600,10 @@ public class JobHistoryEventHandler extends AbstractService
}
eventQueue.put(event);
// Process it for ATS (if enabled)
if (handleTimelineEvent) {
atsEventDispatcher.getEventHandler().handle(event);
}
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
@@ -614,13 +646,6 @@ public class JobHistoryEventHandler extends AbstractService
}
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
event.getJobID());
if (timelineV2Client != null) {
processEventForNewTimelineService(historyEvent, event.getJobID(),
event.getTimestamp());
} else if (timelineClient != null) {
processEventForTimelineServer(historyEvent, event.getJobID(),
event.getTimestamp());
}
if (LOG.isDebugEnabled()) {
LOG.debug("In HistoryEventHandler "
+ event.getHistoryEvent().getEventType());
@@ -702,6 +727,23 @@ public class JobHistoryEventHandler extends AbstractService
}
}
private void handleTimelineEvent(JobHistoryEvent event) {
HistoryEvent historyEvent = event.getHistoryEvent();
if (handleTimelineEvent) {
if (timelineV2Client != null) {
processEventForNewTimelineService(historyEvent, event.getJobID(),
event.getTimestamp());
} else if (timelineClient != null) {
processEventForTimelineServer(historyEvent, event.getJobID(),
event.getTimestamp());
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("In HistoryEventHandler, handle timelineEvent:"
+ event.getHistoryEvent().getEventType());
}
}
public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
JobId jobId) {
// context.getJob could be used for some of this info as well.
@@ -1708,4 +1750,12 @@ public class JobHistoryEventHandler extends AbstractService
boolean getFlushTimerStatus() {
return isTimerActive;
}
private final class ForwardingEventHandler
implements EventHandler<JobHistoryEvent> {
@Override
public void handle(JobHistoryEvent event) {
handleTimelineEvent(event);
}
}
}
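The test changes below depend on substituting a DrainDispatcher (a test utility that extends AsyncDispatcher) for the production dispatcher via the overridden createDispatcher(), so that jheh.getDispatcher().await() can block until the asynchronously dispatched event has actually been handled before the timeline store is queried. A minimal sketch of that await pattern, using a hypothetical PingType/PingEvent rather than the MapReduce event classes:

// --- illustrative sketch, not part of this commit ---
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;

// Hypothetical event type used only for illustration.
enum PingType { PING }

class PingEvent extends AbstractEvent<PingType> {
  PingEvent() { super(PingType.PING); }
}

public class DrainDispatcherSketch {
  public static void main(String[] args) {
    DrainDispatcher dispatcher = new DrainDispatcher();
    EventHandler<PingEvent> handler =
        event -> System.out.println("handled " + event.getType());
    dispatcher.register(PingType.class, handler);
    dispatcher.init(new Configuration());
    dispatcher.start();

    dispatcher.getEventHandler().handle(new PingEvent());
    dispatcher.await(); // returns once the queued event has been processed
    dispatcher.stop();
  }
}
// --- end of sketch ---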

TestJobHistoryEventHandler.java

@@ -75,6 +75,8 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
@@ -586,6 +588,7 @@ public class TestJobHistoryEventHandler {
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1),
currentTime - 10));
jheh.getDispatcher().await();
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
null, null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
@@ -602,6 +605,7 @@ public class TestJobHistoryEventHandler {
"user", 200, "/foo/job.xml",
new HashMap<JobACL, AccessControlList>(), "default"),
currentTime + 10));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null); null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size()); Assert.assertEquals(1, entities.getEntities().size());
@ -620,6 +624,7 @@ public class TestJobHistoryEventHandler {
handleEvent(jheh, new JobHistoryEvent(t.jobId, handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"), new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"),
currentTime - 20)); currentTime - 20));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null); null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size()); Assert.assertEquals(1, entities.getEntities().size());
@ -642,6 +647,7 @@ public class TestJobHistoryEventHandler {
handleEvent(jheh, new JobHistoryEvent(t.jobId, handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
0, new Counters(), new Counters(), new Counters()), currentTime)); 0, new Counters(), new Counters(), new Counters()), currentTime));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null); null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size()); Assert.assertEquals(1, entities.getEntities().size());
@ -667,7 +673,9 @@ public class TestJobHistoryEventHandler {
handleEvent(jheh, new JobHistoryEvent(t.jobId, handleEvent(jheh, new JobHistoryEvent(t.jobId,
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId), new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20)); 0, 0, 0, JobStateInternal.KILLED.toString()),
currentTime + 20));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null, null); null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size()); Assert.assertEquals(1, entities.getEntities().size());
@ -697,6 +705,7 @@ public class TestJobHistoryEventHandler {
handleEvent(jheh, new JobHistoryEvent(t.jobId, handleEvent(jheh, new JobHistoryEvent(t.jobId,
new TaskStartedEvent(t.taskID, 0, TaskType.MAP, ""))); new TaskStartedEvent(t.taskID, 0, TaskType.MAP, "")));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null, entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
null, null, null, null, null, null); null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size()); Assert.assertEquals(1, entities.getEntities().size());
@ -710,6 +719,7 @@ public class TestJobHistoryEventHandler {
handleEvent(jheh, new JobHistoryEvent(t.jobId, handleEvent(jheh, new JobHistoryEvent(t.jobId,
new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, ""))); new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, "")));
jheh.getDispatcher().await();
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null, entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
null, null, null, null, null, null); null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size()); Assert.assertEquals(1, entities.getEntities().size());
@ -1027,6 +1037,7 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
private EventWriter eventWriter; private EventWriter eventWriter;
private boolean mockHistoryProcessing = true; private boolean mockHistoryProcessing = true;
private DrainDispatcher dispatcher;
public JHEvenHandlerForTest(AppContext context, int startCount) {
super(context, startCount);
JobHistoryEventHandler.fileMap.clear();
@@ -1038,6 +1049,12 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
JobHistoryEventHandler.fileMap.clear();
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
@Override
protected void serviceStart() {
if (timelineClient != null) {
@@ -1045,6 +1062,19 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
} else if (timelineV2Client != null) {
timelineV2Client.start();
}
if (handleTimelineEvent) {
atsEventDispatcher.start();
}
}
@Override
protected AsyncDispatcher createDispatcher() {
dispatcher = new DrainDispatcher();
return dispatcher;
}
public DrainDispatcher getDispatcher() {
return dispatcher;
}
@Override