MAPREDUCE-6433. launchTime may be negative. Contributed by Zhihai Xu

This commit is contained in:
Zhihai Xu 2015-07-30 23:07:31 -07:00
parent ab80e27703
commit 93d50b7824
6 changed files with 107 additions and 8 deletions

View File

@ -542,6 +542,8 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6427. Fix typo in JobHistoryEventHandler. (Ray Chiang via cdouglas)
MAPREDUCE-6433. launchTime may be negative. (Zhihai Xu)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -233,7 +233,7 @@ public class MRAppMaster extends CompositeService {
JobStateInternal forcedState = null;
private final ScheduledExecutorService logSyncer;
private long recoveredJobStartTime = 0;
private long recoveredJobStartTime = -1L;
private static boolean mainStarted = false;
@VisibleForTesting

View File

@ -25,7 +25,7 @@ public class JobStartEvent extends JobEvent {
long recoveredJobStartTime;
public JobStartEvent(JobId jobID) {
this(jobID, 0);
this(jobID, -1L);
}
public JobStartEvent(JobId jobID, long recoveredJobStartTime) {

View File

@ -1629,7 +1629,7 @@ public static class StartTransition
@Override
public void transition(JobImpl job, JobEvent event) {
JobStartEvent jse = (JobStartEvent) event;
if (jse.getRecoveredJobStartTime() != 0) {
if (jse.getRecoveredJobStartTime() != -1L) {
job.startTime = jse.getRecoveredJobStartTime();
} else {
job.startTime = job.clock.getTime();

View File

@ -31,9 +31,11 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;
import java.util.HashMap;
import java.util.Map;
@ -44,16 +46,21 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.EventWriter;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
@ -61,6 +68,8 @@
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.AccessControlException;
@ -171,6 +180,45 @@ public void testMRAppMasterMidLock() throws IOException,
verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
}
/**
 * Starts an app master for a second application attempt after placing a
 * job history file (with no events written through the writer) for the
 * preceding attempt number in the staging directory, then checks that the
 * launch time captured from the JOB_INITED history event is not negative.
 */
@Test
public void testMRAppMasterJobLaunchTime() throws IOException,
    InterruptedException {
  final String attemptIdText = "appattempt_1317529182569_0004_000002";
  final String containerIdText = "container_1317529182569_0004_000002_1";
  final String userName = "TestAppMasterUser";

  // Job configuration: staging dir, map-only job, JSON history format.
  JobConf conf = new JobConf();
  conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
  conf.setInt(MRJobConfig.NUM_REDUCES, 0);
  conf.set(JHAdminConfig.MR_HS_JHIST_FORMAT, "json");

  ApplicationAttemptId applicationAttemptId =
      ConverterUtils.toApplicationAttemptId(attemptIdText);
  JobId jobId = TypeConverter.toYarn(
      TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));

  // Per-job directory under the user's staging area.
  Path stagingArea = MRApps.getStagingAreaDir(conf, userName);
  File jobDir = new File(stagingArea.toString(), jobId.toString());
  jobDir.mkdirs();

  // History file keyed by (current attempt id - 1).
  int earlierAttempt = applicationAttemptId.getAttemptId() - 1;
  Path historyPath = JobHistoryUtils.getStagingJobHistoryFile(
      new Path(jobDir.toURI().toString()), jobId, earlierAttempt);
  File historyFile = new File(historyPath.toUri().getRawPath());
  historyFile.createNewFile();

  // Open a JSON-mode writer on the file and close it straight away; the
  // test itself never writes an event through it.
  FSDataOutputStream historyStream = new FSDataOutputStream(
      new FileOutputStream(historyFile), null);
  EventWriter historyWriter =
      new EventWriter(historyStream, EventWriter.WriteMode.JSON);
  historyWriter.close();

  // Split files for zero input splits, written into the job directory.
  FileSystem fs = FileSystem.get(conf);
  Path splitDir = new Path(jobDir.getAbsolutePath());
  JobSplitWriter.createSplitFiles(splitDir, conf, fs,
      new org.apache.hadoop.mapred.InputSplit[0]);

  ContainerId containerId = ConverterUtils.toContainerId(containerIdText);
  long submitTime = System.currentTimeMillis();
  MRAppMasterTestLaunchTime appMaster = new MRAppMasterTestLaunchTime(
      applicationAttemptId, containerId, "host", -1, -1, submitTime);
  MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
  appMaster.stop();

  long observedLaunchTime = appMaster.jobLaunchTime.get();
  assertTrue("Job launch time should not be negative.",
      observedLaunchTime >= 0);
}
@Test
public void testMRAppMasterSuccessLock() throws IOException,
InterruptedException {
@ -585,3 +633,39 @@ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
return spyHistoryService;
}
}
/**
 * Test app master that remembers the launch time carried by the
 * JOB_INITED history event and ignores all committer events.
 *
 * NOTE(review): {@link #jobLaunchTime} starts at 0, so a check of the form
 * {@code jobLaunchTime.get() >= 0} cannot tell "no JOB_INITED event was
 * seen" apart from a recorded launch time of zero.
 */
class MRAppMasterTestLaunchTime extends MRAppMasterTest {
  /** Launch time taken from the most recent JOB_INITED event; 0 initially. */
  final AtomicLong jobLaunchTime = new AtomicLong(0L);

  public MRAppMasterTestLaunchTime(ApplicationAttemptId applicationAttemptId,
      ContainerId containerId, String host, int port, int httpPort,
      long submitTime) {
    super(applicationAttemptId, containerId, host, port, httpPort,
        submitTime, false, false);
  }

  /**
   * Returns a committer event handler whose {@code handle} does nothing,
   * so committer events are dropped.
   */
  @Override
  protected EventHandler<CommitterEvent> createCommitterEventHandler(
      AppContext context, OutputCommitter committer) {
    final RMHeartbeatHandler heartbeatHandler = getRMHeartbeatHandler();
    return new CommitterEventHandler(context, committer, heartbeatHandler) {
      @Override
      public void handle(CommitterEvent event) {
        // Intentionally empty: committer events are discarded here.
      }
    };
  }

  /**
   * Returns a history handler that records the launch time of every
   * JOB_INITED event before delegating to the regular handler.
   */
  @Override
  protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
      AppContext context) {
    final int startCount = getStartCount();
    return new JobHistoryEventHandler(context, startCount) {
      @Override
      public void handle(JobHistoryEvent event) {
        HistoryEvent payload = event.getHistoryEvent();
        boolean isJobInited = payload.getEventType() == EventType.JOB_INITED;
        if (isJobInited) {
          long launchTime = ((JobInitedEvent) payload).getLaunchTime();
          jobLaunchTime.set(launchTime);
        }
        super.handle(event);
      }
    };
  }
}

View File

@ -29,19 +29,25 @@
import org.apache.avro.util.Utf8;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import com.google.common.annotations.VisibleForTesting;
/**
* Event Writer is a utility class used to write events to the underlying
* stream. Typically, one event writer (which translates to one stream)
* is created per job
*
*/
class EventWriter {
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class EventWriter {
static final String VERSION = "Avro-Json";
static final String VERSION_BINARY = "Avro-Binary";
@ -50,11 +56,17 @@ class EventWriter {
new SpecificDatumWriter<Event>(Event.class);
private Encoder encoder;
private static final Log LOG = LogFactory.getLog(EventWriter.class);
/**
* Avro encoding formats supported by EventWriter.
*/
public enum WriteMode { JSON, BINARY }
private final WriteMode writeMode;
private final boolean jsonOutput; // Cache value while we have 2 modes
EventWriter(FSDataOutputStream out, WriteMode mode) throws IOException {
@VisibleForTesting
public EventWriter(FSDataOutputStream out, WriteMode mode)
throws IOException {
this.out = out;
this.writeMode = mode;
if (this.writeMode==WriteMode.JSON) {
@ -93,7 +105,8 @@ void flush() throws IOException {
out.hflush();
}
void close() throws IOException {
@VisibleForTesting
public void close() throws IOException {
try {
encoder.flush();
out.close();