From 93d50b782494af7eef980c4d596a59ff4e11646e Mon Sep 17 00:00:00 2001 From: Zhihai Xu Date: Thu, 30 Jul 2015 23:07:31 -0700 Subject: [PATCH] MAPREDUCE-6433. launchTime may be negative. Contributed by Zhihai Xu --- hadoop-mapreduce-project/CHANGES.txt | 2 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 2 +- .../v2/app/job/event/JobStartEvent.java | 2 +- .../mapreduce/v2/app/job/impl/JobImpl.java | 2 +- .../mapreduce/v2/app/TestMRAppMaster.java | 88 ++++++++++++++++++- .../mapreduce/jobhistory/EventWriter.java | 19 +++- 6 files changed, 107 insertions(+), 8 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 398ffc667b3..738dea5320e 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -542,6 +542,8 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6427. Fix typo in JobHistoryEventHandler. (Ray Chiang via cdouglas) + MAPREDUCE-6433. launchTime may be negative. (Zhihai Xu) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index f199ecbbfe3..6dc830fa84a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -233,7 +233,7 @@ public class MRAppMaster extends CompositeService { JobStateInternal forcedState = null; private final ScheduledExecutorService logSyncer; - private long recoveredJobStartTime = 0; + private long recoveredJobStartTime = -1L; private static boolean mainStarted = false; @VisibleForTesting diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java index 39051da000f..a142c316b55 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobStartEvent.java @@ -25,7 +25,7 @@ public class JobStartEvent extends JobEvent { long recoveredJobStartTime; public JobStartEvent(JobId jobID) { - this(jobID, 0); + this(jobID, -1L); } public JobStartEvent(JobId jobID, long recoveredJobStartTime) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 4c3b3fed278..9d141eb7a5c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -1629,7 +1629,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, @Override public void transition(JobImpl job, JobEvent event) { JobStartEvent jse = (JobStartEvent) event; - if (jse.getRecoveredJobStartTime() != 0) { + if (jse.getRecoveredJobStartTime() != -1L) { job.startTime = jse.getRecoveredJobStartTime(); } else { job.startTime = job.clock.getTime(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java index 63b201d5a07..9e0dafcbed7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java @@ -31,9 +31,11 @@ import static org.mockito.Mockito.times; import java.io.File; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.lang.reflect.Field; import java.util.Collections; +import java.util.concurrent.atomic.AtomicLong; import java.util.HashMap; import java.util.Map; @@ -44,16 +46,21 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.EventType; +import org.apache.hadoop.mapreduce.jobhistory.EventWriter; import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; +import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent; +import org.apache.hadoop.mapreduce.split.JobSplitWriter; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent; @@ -61,6 +68,8 @@ import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.AccessControlException; @@ -111,7 +120,7 @@ public class TestMRAppMaster { } dir.mkdirs(); } - + @Test public void testMRAppMasterForDifferentUser() throws IOException, InterruptedException { @@ -170,7 +179,46 @@ public class TestMRAppMaster { // verify the final status is FAILED verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED"); } - + + @Test + public void testMRAppMasterJobLaunchTime() throws IOException, + InterruptedException { + String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002"; + String containerIdStr = "container_1317529182569_0004_000002_1"; + String userName = "TestAppMasterUser"; + JobConf conf = new JobConf(); + conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir); + conf.setInt(MRJobConfig.NUM_REDUCES, 0); + conf.set(JHAdminConfig.MR_HS_JHIST_FORMAT, "json"); + ApplicationAttemptId applicationAttemptId = ConverterUtils + .toApplicationAttemptId(applicationAttemptIdStr); + JobId jobId = TypeConverter.toYarn( + TypeConverter.fromYarn(applicationAttemptId.getApplicationId())); + + File dir = new File(MRApps.getStagingAreaDir(conf, userName).toString(), + jobId.toString()); + dir.mkdirs(); + File historyFile = new File(JobHistoryUtils.getStagingJobHistoryFile( + new Path(dir.toURI().toString()), jobId, + (applicationAttemptId.getAttemptId() - 1)).toUri().getRawPath()); + historyFile.createNewFile(); + FSDataOutputStream out = new FSDataOutputStream( + new FileOutputStream(historyFile), null); + EventWriter writer = new EventWriter(out, EventWriter.WriteMode.JSON); + writer.close(); + FileSystem fs = FileSystem.get(conf); + JobSplitWriter.createSplitFiles(new Path(dir.getAbsolutePath()), conf, + fs, new org.apache.hadoop.mapred.InputSplit[0]); + ContainerId containerId = ConverterUtils.toContainerId(containerIdStr); + MRAppMasterTestLaunchTime appMaster = + new MRAppMasterTestLaunchTime(applicationAttemptId, containerId, + "host", -1, -1, System.currentTimeMillis()); + MRAppMaster.initAndStartAppMaster(appMaster, conf, userName); + appMaster.stop(); + assertTrue("Job launch time should not be negative.", + appMaster.jobLaunchTime.get() >= 0); + } + @Test public void testMRAppMasterSuccessLock() throws IOException, InterruptedException { @@ -585,3 +633,39 @@ class MRAppMasterTest extends MRAppMaster { return spyHistoryService; } } + +class MRAppMasterTestLaunchTime extends MRAppMasterTest { + final AtomicLong jobLaunchTime = new AtomicLong(0L); + public MRAppMasterTestLaunchTime(ApplicationAttemptId applicationAttemptId, + ContainerId containerId, String host, int port, int httpPort, + long submitTime) { + super(applicationAttemptId, containerId, host, port, httpPort, + submitTime, false, false); + } + + @Override + protected EventHandler createCommitterEventHandler( + AppContext context, OutputCommitter committer) { + return new CommitterEventHandler(context, committer, + getRMHeartbeatHandler()) { + @Override + public void handle(CommitterEvent event) { + } + }; + } + + @Override + protected EventHandler createJobHistoryHandler( + AppContext context) { + return new JobHistoryEventHandler(context, getStartCount()) { + @Override + public void handle(JobHistoryEvent event) { + if (event.getHistoryEvent().getEventType() == EventType.JOB_INITED) { + JobInitedEvent jie = (JobInitedEvent) event.getHistoryEvent(); + jobLaunchTime.set(jie.getLaunchTime()); + } + super.handle(event); + } + }; + } +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java index 29489a5c8b5..b1cb6dcf87e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java @@ -29,19 +29,25 @@ import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.util.Utf8; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.CounterGroup; import org.apache.hadoop.mapreduce.Counters; +import com.google.common.annotations.VisibleForTesting; + /** * Event Writer is an utility class used to write events to the underlying * stream. Typically, one event writer (which translates to one stream) * is created per job * */ -class EventWriter { +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class EventWriter { static final String VERSION = "Avro-Json"; static final String VERSION_BINARY = "Avro-Binary"; @@ -50,11 +56,17 @@ class EventWriter { new SpecificDatumWriter(Event.class); private Encoder encoder; private static final Log LOG = LogFactory.getLog(EventWriter.class); + + /** + * avro encoding format supported by EventWriter. + */ public enum WriteMode { JSON, BINARY } private final WriteMode writeMode; private final boolean jsonOutput; // Cache value while we have 2 modes - EventWriter(FSDataOutputStream out, WriteMode mode) throws IOException { + @VisibleForTesting + public EventWriter(FSDataOutputStream out, WriteMode mode) + throws IOException { this.out = out; this.writeMode = mode; if (this.writeMode==WriteMode.JSON) { @@ -93,7 +105,8 @@ class EventWriter { out.hflush(); } - void close() throws IOException { + @VisibleForTesting + public void close() throws IOException { try { encoder.flush(); out.close();