From 61900651b1b85cf235e01142acf2a51727fc5537 Mon Sep 17 00:00:00 2001
From: Vinod Kumar Vavilapalli
Date: Sun, 18 Sep 2011 07:16:18 +0000
Subject: [PATCH] MAPREDUCE-3006. Fixed MapReduce AM to exit only after properly writing out history file. (vinodkv)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1172206 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../jobhistory/JobHistoryEventHandler.java    |   2 +-
 .../hadoop/mapreduce/v2/app/MRAppMaster.java  | 252 ++++++++++++------
 .../apache/hadoop/mapreduce/v2/app/MRApp.java |  36 +--
 .../mapreduce/v2/app/MRAppBenchmark.java      |   2 +-
 .../hadoop/mapreduce/v2/app/TestFail.java     |  12 +-
 .../hadoop/mapreduce/v2/app/TestMRApp.java    |   3 +-
 .../mapreduce/v2/hs/TestJobHistoryEvents.java |  89 ++++++-
 8 files changed, 290 insertions(+), 109 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 38c3412696b..b236427a17c 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -1339,6 +1339,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2987. Fixed display of logged user on RM Web-UI. (Thomas Graves
     via acmurthy)
 
+    MAPREDUCE-3006. Fixed MapReduce AM to exit only after properly writing out
+    history file. (vinodkv)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
index 9650d821c3d..797d48a2f45 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
@@ -74,7 +74,7 @@ public class JobHistoryEventHandler extends AbstractService
 
   private BlockingQueue eventQueue = new LinkedBlockingQueue();
-  private Thread eventHandlingThread;
+  protected Thread eventHandlingThread;
   private volatile boolean stopped;
   private final Object lock = new Object();
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index af04b3ff57b..20c7e9779e8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -56,12 +56,14 @@
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
 import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
 import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
 import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
@@ -83,6 +85,7 @@
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
@@ -126,6 +129,7 @@ public class MRAppMaster extends CompositeService {
   private TaskAttemptListener taskAttemptListener;
   private JobTokenSecretManager jobTokenSecretManager =
       new JobTokenSecretManager();
+  private JobEventDispatcher jobEventDispatcher;
 
   private Job job;
 
@@ -148,7 +152,7 @@ public MRAppMaster(ApplicationId applicationId, Clock clock, int startCount) {
 
   @Override
   public void init(final Configuration conf) {
-    context = new RunningAppContext();
+    context = new RunningAppContext(conf);
 
     // Job name is the same as the app name util we support DAG of jobs
     // for an app later
@@ -182,18 +186,17 @@ public void init(final Configuration conf) {
 
     //service to log job history events
     EventHandler historyService = createJobHistoryHandler(context);
-    addIfService(historyService);
+    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+        historyService);
 
-    JobEventDispatcher synchronousJobEventDispatcher = new JobEventDispatcher();
+    this.jobEventDispatcher = new JobEventDispatcher();
 
     //register the event dispatchers
-    dispatcher.register(JobEventType.class, synchronousJobEventDispatcher);
+    dispatcher.register(JobEventType.class, jobEventDispatcher);
     dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
     dispatcher.register(TaskAttemptEventType.class,
         new TaskAttemptEventDispatcher());
     dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
-    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
-        historyService);
 
     if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
         || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
@@ -203,10 +206,34 @@ public void init(final Configuration conf) {
     }
 
     dispatcher.register(Speculator.EventType.class,
-        new SpeculatorEventDispatcher());
+        new SpeculatorEventDispatcher(conf));
 
+    // service to allocate containers from RM (if non-uber) or to fake it (uber)
+    containerAllocator = createContainerAllocator(clientService, context);
+    addIfService(containerAllocator);
+    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
+
+    // corresponding service to launch allocated containers via NodeManager
+    containerLauncher = createContainerLauncher(context);
+    addIfService(containerLauncher);
+    dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
+
+    // Add the JobHistoryEventHandler last so that it is properly stopped first.
+    // This will guarantee that all history-events are flushed before AM goes
+    // ahead with shutdown.
+    // Note: Even though JobHistoryEventHandler is started last, if any
+    // component creates a JobHistoryEvent in the meanwhile, it will be just be
+    // queued inside the JobHistoryEventHandler
+    addIfService(historyService);
+
+    super.init(conf);
+  } // end of init()
+
+  /** Create and initialize (but don't start) a single job. */
+  protected Job createJob(Configuration conf) {
+
+    // ////////// Obtain the tokens needed by the job. //////////
     Credentials fsTokens = new Credentials();
-
     UserGroupInformation currentUser = null;
     try {
@@ -234,66 +261,12 @@ public void init(final Configuration conf) {
     } catch (IOException e) {
       throw new YarnException(e);
     }
-
-    super.init(conf);
-
-    //---- start of what used to be startJobs() code:
-
-    Configuration config = getConfig();
-
-    job = createJob(config, fsTokens, currentUser.getUserName());
-
-    /** create a job event for job intialization */
-    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
-    /** send init to the job (this does NOT trigger job execution) */
-    synchronousJobEventDispatcher.handle(initJobEvent);
-
-    // send init to speculator. This won't yest start as dispatcher isn't
-    // started yet.
-    dispatcher.getEventHandler().handle(
-        new SpeculatorEvent(job.getID(), clock.getTime()));
-
-    // JobImpl's InitTransition is done (call above is synchronous), so the
-    // "uber-decision" (MR-1220) has been made. Query job and switch to
-    // ubermode if appropriate (by registering different container-allocator
-    // and container-launcher services/event-handlers).
-
-    if (job.isUber()) {
-      LOG.info("MRAppMaster uberizing job " + job.getID()
-          + " in local container (\"uber-AM\").");
-    } else {
-      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
-          + "job " + job.getID() + ".");
-    }
-
-    // service to allocate containers from RM (if non-uber) or to fake it (uber)
-    containerAllocator =
-        createContainerAllocator(clientService, context, job.isUber());
-    addIfService(containerAllocator);
-    dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
-    if (containerAllocator instanceof Service) {
-      ((Service) containerAllocator).init(config);
-    }
-
-    // corresponding service to launch allocated containers via NodeManager
-    containerLauncher = createContainerLauncher(context, job.isUber());
-    addIfService(containerLauncher);
-    dispatcher.register(ContainerLauncher.EventType.class, containerLauncher);
-    if (containerLauncher instanceof Service) {
-      ((Service) containerLauncher).init(config);
-    }
-
-  } // end of init()
-
-  /** Create and initialize (but don't start) a single job.
-   * @param fsTokens */
-  protected Job createJob(Configuration conf, Credentials fsTokens,
-      String user) {
+    // ////////// End of obtaining the tokens needed by the job. //////////
 
     // create single job
     Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(),
         taskAttemptListener, jobTokenSecretManager, fsTokens, clock, startCount,
-        completedTasksFromPreviousRun, metrics, user);
+        completedTasksFromPreviousRun, metrics, currentUser.getUserName());
     ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
 
     dispatcher.register(JobFinishEvent.Type.class,
@@ -388,19 +361,13 @@ protected TaskCleaner createTaskCleaner(AppContext context) {
   }
 
   protected ContainerAllocator createContainerAllocator(
-      ClientService clientService, AppContext context, boolean isLocal) {
-    //return new StaticContainerAllocator(context);
-    return isLocal
-        ? new LocalContainerAllocator(clientService, context)
-        : new RMContainerAllocator(clientService, context);
+      final ClientService clientService, final AppContext context) {
+    return new ContainerAllocatorRouter(clientService, context);
   }
 
-  protected ContainerLauncher createContainerLauncher(AppContext context,
-      boolean isLocal) {
-    return isLocal
-        ? new LocalContainerLauncher(context,
-            (TaskUmbilicalProtocol) taskAttemptListener)
-        : new ContainerLauncherImpl(context);
+  protected ContainerLauncher
+      createContainerLauncher(final AppContext context) {
+    return new ContainerLauncherRouter(context);
   }
 
   //TODO:should have an interface for MRClientService
@@ -440,9 +407,96 @@ public TaskAttemptListener getTaskAttemptListener() {
     return taskAttemptListener;
   }
 
-  class RunningAppContext implements AppContext {
+  /**
+   * By the time life-cycle of this router starts, job-init would have already
+   * happened.
+   */
+  private final class ContainerAllocatorRouter extends AbstractService
+      implements ContainerAllocator {
+    private final ClientService clientService;
+    private final AppContext context;
+    private ContainerAllocator containerAllocator;
 
-    private Map jobs = new ConcurrentHashMap();
+    ContainerAllocatorRouter(ClientService clientService,
+        AppContext context) {
+      super(ContainerAllocatorRouter.class.getName());
+      this.clientService = clientService;
+      this.context = context;
+    }
+
+    @Override
+    public synchronized void start() {
+      if (job.isUber()) {
+        this.containerAllocator = new LocalContainerAllocator(
+            this.clientService, this.context);
+      } else {
+        this.containerAllocator = new RMContainerAllocator(
+            this.clientService, this.context);
+      }
+      ((Service)this.containerAllocator).init(getConfig());
+      ((Service)this.containerAllocator).start();
+      super.start();
+    }
+
+    @Override
+    public synchronized void stop() {
+      ((Service)this.containerAllocator).stop();
+      super.stop();
+    }
+
+    @Override
+    public void handle(ContainerAllocatorEvent event) {
+      this.containerAllocator.handle(event);
+    }
+  }
+
+  /**
+   * By the time life-cycle of this router starts, job-init would have already
+   * happened.
+   */
+  private final class ContainerLauncherRouter extends AbstractService
+      implements ContainerLauncher {
+    private final AppContext context;
+    private ContainerLauncher containerLauncher;
+
+    ContainerLauncherRouter(AppContext context) {
+      super(ContainerLauncherRouter.class.getName());
+      this.context = context;
+    }
+
+    @Override
+    public synchronized void start() {
+      if (job.isUber()) {
+        this.containerLauncher = new LocalContainerLauncher(context,
+            (TaskUmbilicalProtocol) taskAttemptListener);
+      } else {
+        this.containerLauncher = new ContainerLauncherImpl(context);
+      }
+      ((Service)this.containerLauncher).init(getConfig());
+      ((Service)this.containerLauncher).start();
+      super.start();
+    }
+
+    @Override
+    public void handle(ContainerLauncherEvent event) {
+      this.containerLauncher.handle(event);
+    }
+
+    @Override
+    public synchronized void stop() {
+      ((Service)this.containerLauncher).stop();
+      super.stop();
+    }
+  }
+
+  private class RunningAppContext implements AppContext {
+
+    private final Map jobs = new ConcurrentHashMap();
+    private final Configuration conf;
+
+    public RunningAppContext(Configuration config) {
+      this.conf = config;
+    }
 
     @Override
     public ApplicationAttemptId getApplicationAttemptId() {
@@ -481,7 +535,7 @@ public EventHandler getEventHandler() {
 
     @Override
     public CharSequence getUser() {
-      return getConfig().get(MRJobConfig.USER_NAME);
+      return this.conf.get(MRJobConfig.USER_NAME);
     }
 
     @Override
@@ -492,13 +546,45 @@ public Clock getClock() {
 
   @Override
   public void start() {
+
+    ///////////////////// Create the job itself.
+    job = createJob(getConfig());
+    // End of creating the job.
+
     // metrics system init is really init & start.
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("MRAppMaster");
 
-    startJobs();
+    /** create a job event for job intialization */
+    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
+    /** send init to the job (this does NOT trigger job execution) */
+    // This is a synchronous call, not an event through dispatcher. We want
+    // job-init to be done completely here.
+    jobEventDispatcher.handle(initJobEvent);
+
+    // send init to speculator. This won't yest start as dispatcher isn't
+    // started yet.
+    dispatcher.getEventHandler().handle(
+        new SpeculatorEvent(job.getID(), clock.getTime()));
+
+    // JobImpl's InitTransition is done (call above is synchronous), so the
+    // "uber-decision" (MR-1220) has been made. Query job and switch to
+    // ubermode if appropriate (by registering different container-allocator
+    // and container-launcher services/event-handlers).
+
+    if (job.isUber()) {
+      LOG.info("MRAppMaster uberizing job " + job.getID()
+          + " in local container (\"uber-AM\").");
+    } else {
+      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+          + "job " + job.getID() + ".");
+    }
+
     //start all the components
     super.start();
+
+    // All components have started, start the job.
+    startJobs();
   }
 
   /**
@@ -546,10 +632,14 @@ public void handle(TaskAttemptEvent event) {
 
   private class SpeculatorEventDispatcher implements EventHandler {
+    private final Configuration conf;
+    public SpeculatorEventDispatcher(Configuration config) {
+      this.conf = config;
+    }
     @Override
     public void handle(SpeculatorEvent event) {
-      if (getConfig().getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
-          || getConfig().getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+      if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+          || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
         // Speculator IS enabled, direct the event to there.
         speculator.handle(event);
       }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index 499831c5f87..548d754a6c6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -63,6 +63,7 @@
 import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -234,11 +235,16 @@ public void verifyCompleted() {
   }
 
   @Override
-  protected Job createJob(Configuration conf, Credentials fsTokens,
-      String user) {
-    Job newJob = new TestJob(getAppID(), getDispatcher().getEventHandler(),
+  protected Job createJob(Configuration conf) {
+    UserGroupInformation currentUser = null;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+    Job newJob = new TestJob(conf, getAppID(), getDispatcher().getEventHandler(),
        getTaskAttemptListener(), getContext().getClock(),
-        user);
+        currentUser.getUserName());
     ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
 
     getDispatcher().register(JobFinishEvent.Type.class,
@@ -279,8 +285,7 @@ public void handle(JobHistoryEvent event) {
   }
 
   @Override
-  protected ContainerLauncher createContainerLauncher(AppContext context,
-      boolean isLocal) {
+  protected ContainerLauncher createContainerLauncher(AppContext context) {
     return new MockContainerLauncher();
   }
 
@@ -317,7 +322,7 @@ protected void attemptLaunched(TaskAttemptId attemptID) {
 
   @Override
   protected ContainerAllocator createContainerAllocator(
-      ClientService clientService, AppContext context, boolean isLocal) {
+      ClientService clientService, AppContext context) {
     return new ContainerAllocator(){
       private int containerCount;
       @Override
@@ -369,12 +374,14 @@ public int getHttpPort() {
 
   class TestJob extends JobImpl {
     //override the init transition
+    private final TestInitTransition initTransition = new TestInitTransition(
+        maps, reduces);
     StateMachineFactory localFactory
         = stateMachineFactory.addTransition(JobState.NEW,
            EnumSet.of(JobState.INITED, JobState.FAILED),
            JobEventType.JOB_INIT,
            // This is abusive.
-           new TestInitTransition(getConfig(), maps, reduces));
+           initTransition);
 
     private final StateMachine localStateMachine;
 
@@ -384,10 +391,10 @@ protected StateMachine getStateMachine() {
       return localStateMachine;
     }
 
-    public TestJob(ApplicationId appID, EventHandler eventHandler,
-        TaskAttemptListener taskAttemptListener, Clock clock,
-        String user) {
-      super(appID, new Configuration(), eventHandler, taskAttemptListener,
+    public TestJob(Configuration conf, ApplicationId appID,
+        EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
+        Clock clock, String user) {
+      super(appID, conf, eventHandler, taskAttemptListener,
           new JobTokenSecretManager(), new Credentials(), clock,
           getStartCount(), getCompletedTaskFromPreviousRun(), metrics, user);
@@ -399,17 +406,14 @@ public TestJob(ApplicationId appID, EventHandler eventHandler,
 
   //Override InitTransition to not look for split files etc
   static class TestInitTransition extends JobImpl.InitTransition {
-    private Configuration config;
     private int maps;
     private int reduces;
-    TestInitTransition(Configuration config, int maps, int reduces) {
-      this.config = config;
+    TestInitTransition(int maps, int reduces) {
      this.maps = maps;
      this.reduces = reduces;
    }
    @Override
    protected void setup(JobImpl job) throws IOException {
-      job.conf = config;
      job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
      job.remoteJobConfFile = new Path("test");
    }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
index 3615c27b152..c5a117ce4f8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRAppBenchmark.java
@@ -94,7 +94,7 @@ protected void attemptLaunched(TaskAttemptId attemptID) {
 
   @Override
   protected ContainerAllocator createContainerAllocator(
-      ClientService clientService, AppContext context, boolean isLocal) {
+      ClientService clientService, AppContext context) {
     return new ThrottledContainerAllocator();
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
index e08d4f2a316..5598cecb5ab 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
@@ -169,7 +169,7 @@ public void testTimedOutTask() throws Exception {
 
   @Test
   public void testTaskFailWithUnusedContainer() throws Exception {
-    MRApp app = new FailingTaskWithUnusedContainer();
+    MRApp app = new MRAppWithFailingTaskAndUnusedContainer();
     Configuration conf = new Configuration();
     int maxAttempts = 1;
     conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
@@ -194,21 +194,21 @@ public void testTaskFailWithUnusedContainer() throws Exception {
     app.waitForState(job, JobState.FAILED);
   }
 
-  static class FailingTaskWithUnusedContainer extends MRApp {
+  static class MRAppWithFailingTaskAndUnusedContainer extends MRApp {
 
-    public FailingTaskWithUnusedContainer() {
+    public MRAppWithFailingTaskAndUnusedContainer() {
       super(1, 0, false, "TaskFailWithUnsedContainer", true);
     }
 
-    protected ContainerLauncher createContainerLauncher(AppContext context,
-        boolean isLocal) {
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
       return new ContainerLauncherImpl(context) {
         @Override
         public void handle(ContainerLauncherEvent event) {
           switch (event.getType()) {
           case CONTAINER_REMOTE_LAUNCH:
-            super.handle(event);
+            super.handle(event); // Unused event and container.
             break;
           case CONTAINER_REMOTE_CLEANUP:
             getContext().getEventHandler().handle(
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
index d1dd7203d1f..e4bac1ffd54 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
@@ -24,10 +24,10 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -195,6 +195,7 @@ public void checkTaskStateTypeConversion() {
   public static void main(String[] args) throws Exception {
     TestMRApp t = new TestMRApp();
     t.testMapReduce();
+    t.testZeroMapReduces();
     t.testCommitPending();
     t.testCompletedMapsForReduceSlowstart();
     t.testJobError();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
index 634c596ff57..f94714e133d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
@@ -92,6 +92,60 @@ public void testHistoryEvents() throws Exception {
         parsedJob.getState());
   }
 
+  /**
+   * Verify that all the events are flushed on stopping the HistoryHandler
+   * @throws Exception
+   */
+  @Test
+  public void testEventsFlushOnStop() throws Exception {
+
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.USER_NAME, "test");
+    MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
+        .getClass().getName(), true);
+    app.submit(conf);
+    Job job = app.getContext().getAllJobs().values().iterator().next();
+    JobId jobId = job.getID();
+    LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
+    app.waitForState(job, JobState.SUCCEEDED);
+
+    // make sure all events are flushed
+    app.waitForState(Service.STATE.STOPPED);
+    /*
+     * Use HistoryContext to read logged events and verify the number of
+     * completed maps
+     */
+    HistoryContext context = new JobHistory();
+    ((JobHistory) context).init(conf);
+    Job parsedJob = context.getJob(jobId);
+    Assert.assertEquals("CompletedMaps not correct", 1, parsedJob
+        .getCompletedMaps());
+
+    Map tasks = parsedJob.getTasks();
+    Assert.assertEquals("No of tasks not correct", 1, tasks.size());
+    verifyTask(tasks.values().iterator().next());
+
+    Map maps = parsedJob.getTasks(TaskType.MAP);
+    Assert.assertEquals("No of maps not correct", 1, maps.size());
+
+    Assert.assertEquals("Job state not currect", JobState.SUCCEEDED,
+        parsedJob.getState());
+  }
+
+  @Test
+  public void testJobHistoryEventHandlerIsFirstServiceToStop() {
+    MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
+        .getClass().getName(), true);
+    Configuration conf = new Configuration();
+    app.init(conf);
+    Service[] services = app.getServices().toArray(new Service[0]);
+    // Verifying that it is the last to be added is same as verifying that it is
+    // the first to be stopped. CompositeService related tests already validate
+    // this.
+    Assert.assertEquals("JobHistoryEventHandler",
+        services[services.length - 1].getName());
+  }
+
   private void verifyTask(Task task) {
     Assert.assertEquals("Task state not currect", TaskState.SUCCEEDED,
         task.getState());
@@ -116,14 +170,43 @@ public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
 
     @Override
     protected EventHandler createJobHistoryHandler(
        AppContext context) {
-      JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
-          getStartCount());
+      JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(
+          context, getStartCount());
      return eventHandler;
    }
  }
-
+
+  /**
+   * MRapp with special HistoryEventHandler that writes events only during stop.
+   * This is to simulate events that don't get written by the eventHandling
+   * thread due to say a slow DFS and verify that they are flushed during stop.
+   */
+  private static class MRAppWithSpecialHistoryHandler extends MRApp {
+
+    public MRAppWithSpecialHistoryHandler(int maps, int reduces,
+        boolean autoComplete, String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @Override
+    protected EventHandler createJobHistoryHandler(
+        AppContext context) {
+      return new JobHistoryEventHandler(context, getStartCount()) {
+        @Override
+        public void start() {
+          // Don't start any event draining thread.
+          super.eventHandlingThread = new Thread();
+          super.eventHandlingThread.start();
+        }
+      };
+    }
+
+  }
+
  public static void main(String[] args) throws Exception {
    TestJobHistoryEvents t = new TestJobHistoryEvents();
    t.testHistoryEvents();
+    t.testEventsFlushOnStop();
+    t.testJobHistoryEventHandlerIsFirstServiceToStop();
  }
 }
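
The ordering change at the heart of this patch is the move of addIfService(historyService) to the very end of MRAppMaster.init(): CompositeService stops its children in reverse order of addition, so the handler added last is the first one stopped, and JobHistoryEventHandler flushes whatever is still queued during stop(), before the rest of the AM shuts down. That is exactly what the new testJobHistoryEventHandlerIsFirstServiceToStop and testEventsFlushOnStop tests assert. The following is a minimal, self-contained sketch of that mechanism; ToyService, ToyCompositeService, ToyHistoryHandler and StopOrderSketch are invented stand-ins for illustration only, not the real org.apache.hadoop.yarn.service classes.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy stand-in for the YARN Service interface (illustration only). */
interface ToyService {
  String getName();
  void start();
  void stop();
}

/** Stops its children in reverse order of addition, like CompositeService. */
class ToyCompositeService implements ToyService {
  private final List<ToyService> services = new ArrayList<ToyService>();
  private final String name;

  ToyCompositeService(String name) { this.name = name; }

  void addService(ToyService service) { services.add(service); }

  public String getName() { return name; }

  public void start() {
    for (ToyService service : services) {
      service.start();
    }
  }

  public void stop() {
    // Reverse order: the service added last is the first one stopped.
    for (int i = services.size() - 1; i >= 0; i--) {
      services.get(i).stop();
    }
  }
}

/** Queues "history events" and drains the queue in stop(), like the flush-on-stop handler. */
class ToyHistoryHandler implements ToyService {
  private final Queue<String> eventQueue = new ArrayDeque<String>();

  void handle(String event) { eventQueue.add(event); }

  public String getName() { return "ToyHistoryHandler"; }

  public void start() { }

  public void stop() {
    // Flush everything still queued before reporting the service as stopped.
    while (!eventQueue.isEmpty()) {
      System.out.println("flushing history event: " + eventQueue.poll());
    }
  }
}

public class StopOrderSketch {
  public static void main(String[] args) {
    ToyCompositeService appMaster = new ToyCompositeService("toy-app-master");
    ToyHistoryHandler historyHandler = new ToyHistoryHandler();

    appMaster.addService(new ToyService() {
      public String getName() { return "toy-container-allocator"; }
      public void start() { }
      public void stop() { System.out.println("toy-container-allocator stopped"); }
    });

    // Added last, therefore stopped first: its queue is flushed before any
    // other component of the toy AM shuts down.
    appMaster.addService(historyHandler);

    appMaster.start();
    historyHandler.handle("JOB_FINISHED");
    appMaster.stop(); // prints the flushed event, then the allocator shutdown
  }
}

Running the sketch prints the flushed event before "toy-container-allocator stopped", which mirrors the guarantee the patch needs: queued history events reach the history file before the AM exits.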
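
A second pattern the patch introduces is the pair of router services (ContainerAllocatorRouter, ContainerLauncherRouter): init() can register and addIfService() a single handler up front, while the uber-vs-normal choice is deferred to start(), by which time the synchronous JOB_INIT has made the uber decision. Below is a rough sketch of that deferred-delegate idea under the same disclaimer: ToyAbstractService, AllocatorRouter, LocalAllocator and RemoteAllocator are invented names, not the actual Hadoop classes.

import java.util.function.Supplier;

/** Minimal service life-cycle, standing in for the YARN AbstractService. */
abstract class ToyAbstractService {
  private final String name;
  ToyAbstractService(String name) { this.name = name; }
  String getName() { return name; }
  void start() { System.out.println(name + " started"); }
  void stop() { System.out.println(name + " stopped"); }
}

interface Allocator {
  void allocate(String request);
}

class LocalAllocator implements Allocator {
  public void allocate(String request) { System.out.println("uber/local allocation: " + request); }
}

class RemoteAllocator implements Allocator {
  public void allocate(String request) { System.out.println("RM-backed allocation: " + request); }
}

/**
 * Registered during init(), but the real delegate is only chosen in start(),
 * by which time the synchronous job-init has made the uber decision.
 */
class AllocatorRouter extends ToyAbstractService implements Allocator {
  private final Supplier<Boolean> isUberJob; // queried lazily, at start() time
  private Allocator delegate;

  AllocatorRouter(Supplier<Boolean> isUberJob) {
    super(AllocatorRouter.class.getName());
    this.isUberJob = isUberJob;
  }

  @Override
  void start() {
    // Pick the delegate only now; in the real router this is job.isUber().
    delegate = isUberJob.get() ? new LocalAllocator() : new RemoteAllocator();
    super.start();
  }

  @Override
  public void allocate(String request) {
    delegate.allocate(request);
  }

  public static void main(String[] args) {
    // Pretend the job went uber; swap to () -> false for the RM-backed path.
    AllocatorRouter router = new AllocatorRouter(() -> true);
    router.start();
    router.allocate("container for a map attempt");
    router.stop();
  }
}

The real routers apply the same idea through the YARN Service life-cycle: the chosen delegate is created, init'ed and started inside start(), and stop() is forwarded to it so the shutdown ordering established in init() is preserved.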