MAPREDUCE-6433. launchTime may be negative. Contributed by Zhihai Xu

(cherry picked from commit 93d50b7824)
This commit is contained in:
Zhihai Xu 2015-07-30 23:07:31 -07:00
parent 0e2019fa30
commit 9d40eead81
6 changed files with 107 additions and 8 deletions

View File

@@ -275,6 +275,8 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6427. Fix typo in JobHistoryEventHandler. (Ray Chiang via cdouglas)
MAPREDUCE-6433. launchTime may be negative. (Zhihai Xu)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@@ -230,7 +230,7 @@ public class MRAppMaster extends CompositeService {
JobStateInternal forcedState = null; JobStateInternal forcedState = null;
private final ScheduledExecutorService logSyncer; private final ScheduledExecutorService logSyncer;
private long recoveredJobStartTime = 0; private long recoveredJobStartTime = -1L;
private static boolean mainStarted = false; private static boolean mainStarted = false;
@VisibleForTesting @VisibleForTesting

View File

@@ -25,7 +25,7 @@ public class JobStartEvent extends JobEvent {
long recoveredJobStartTime; long recoveredJobStartTime;
public JobStartEvent(JobId jobID) { public JobStartEvent(JobId jobID) {
this(jobID, 0); this(jobID, -1L);
} }
public JobStartEvent(JobId jobID, long recoveredJobStartTime) { public JobStartEvent(JobId jobID, long recoveredJobStartTime) {

View File

@@ -1627,7 +1627,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
@Override @Override
public void transition(JobImpl job, JobEvent event) { public void transition(JobImpl job, JobEvent event) {
JobStartEvent jse = (JobStartEvent) event; JobStartEvent jse = (JobStartEvent) event;
if (jse.getRecoveredJobStartTime() != 0) { if (jse.getRecoveredJobStartTime() != -1L) {
job.startTime = jse.getRecoveredJobStartTime(); job.startTime = jse.getRecoveredJobStartTime();
} else { } else {
job.startTime = job.clock.getTime(); job.startTime = job.clock.getTime();

View File

@@ -31,9 +31,11 @@ import static org.mockito.Mockito.times;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@@ -44,16 +46,21 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.EventType;
import org.apache.hadoop.mapreduce.jobhistory.EventWriter;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent; import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent; import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
@@ -61,6 +68,8 @@ import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
@@ -111,7 +120,7 @@ public class TestMRAppMaster {
} }
dir.mkdirs(); dir.mkdirs();
} }
@Test @Test
public void testMRAppMasterForDifferentUser() throws IOException, public void testMRAppMasterForDifferentUser() throws IOException,
InterruptedException { InterruptedException {
@ -170,7 +179,46 @@ public class TestMRAppMaster {
// verify the final status is FAILED // verify the final status is FAILED
verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED"); verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
} }
// Regression test for MAPREDUCE-6433: when the AM recovers a job from a
// prior attempt, the launchTime reported in the JOB_INITED history event
// must not be negative.
@Test
public void testMRAppMasterJobLaunchTime() throws IOException,
InterruptedException {
// Attempt 000002 so the AM takes the recovery path (a previous attempt
// "000001" is simulated via the staged history file below).
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
String containerIdStr = "container_1317529182569_0004_000002_1";
String userName = "TestAppMasterUser";
JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
// Use the JSON jhist format so the EventWriter below matches what the
// recovering AM will try to read back.
conf.set(JHAdminConfig.MR_HS_JHIST_FORMAT, "json");
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
JobId jobId = TypeConverter.toYarn(
TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
// Create the job's staging directory so the AM can find its state.
File dir = new File(MRApps.getStagingAreaDir(conf, userName).toString(),
jobId.toString());
dir.mkdirs();
// Stage an (empty) history file for the PREVIOUS attempt
// (getAttemptId() - 1); its presence triggers job recovery on startup.
File historyFile = new File(JobHistoryUtils.getStagingJobHistoryFile(
new Path(dir.toURI().toString()), jobId,
(applicationAttemptId.getAttemptId() - 1)).toUri().getRawPath());
historyFile.createNewFile();
FSDataOutputStream out = new FSDataOutputStream(
new FileOutputStream(historyFile), null);
// Write only the jhist header/version, no events — an "empty" recovery
// source, so no recovered start time is available.
EventWriter writer = new EventWriter(out, EventWriter.WriteMode.JSON);
writer.close();
FileSystem fs = FileSystem.get(conf);
// Empty split metadata so job init can proceed without real input.
JobSplitWriter.createSplitFiles(new Path(dir.getAbsolutePath()), conf,
fs, new org.apache.hadoop.mapred.InputSplit[0]);
ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
// MRAppMasterTestLaunchTime captures the launchTime carried by the
// JOB_INITED history event into jobLaunchTime.
MRAppMasterTestLaunchTime appMaster =
new MRAppMasterTestLaunchTime(applicationAttemptId, containerId,
"host", -1, -1, System.currentTimeMillis());
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
appMaster.stop();
assertTrue("Job launch time should not be negative.",
appMaster.jobLaunchTime.get() >= 0);
}
@Test @Test
public void testMRAppMasterSuccessLock() throws IOException, public void testMRAppMasterSuccessLock() throws IOException,
InterruptedException { InterruptedException {
@@ -585,3 +633,39 @@ class MRAppMasterTest extends MRAppMaster {
return spyHistoryService; return spyHistoryService;
} }
} }
/**
 * Test AM subclass used by testMRAppMasterJobLaunchTime: it records the
 * launchTime carried by the JOB_INITED history event so the test can assert
 * it is non-negative, and it swallows committer events so no real commit
 * work happens during the test run.
 */
class MRAppMasterTestLaunchTime extends MRAppMasterTest {
  // Holds the launchTime observed in the JOB_INITED event; 0 until seen.
  final AtomicLong jobLaunchTime = new AtomicLong(0L);
public MRAppMasterTestLaunchTime(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort,
long submitTime) {
super(applicationAttemptId, containerId, host, port, httpPort,
submitTime, false, false);
}
// Replace the committer handler with a no-op so committer side effects
// (e.g. output commit/abort) cannot interfere with the test.
@Override
protected EventHandler<CommitterEvent> createCommitterEventHandler(
AppContext context, OutputCommitter committer) {
return new CommitterEventHandler(context, committer,
getRMHeartbeatHandler()) {
@Override
public void handle(CommitterEvent event) {
}
};
}
// Intercept history events to capture the job's launchTime from the
// JOB_INITED event before delegating to the real handler.
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
return new JobHistoryEventHandler(context, getStartCount()) {
@Override
public void handle(JobHistoryEvent event) {
if (event.getHistoryEvent().getEventType() == EventType.JOB_INITED) {
JobInitedEvent jie = (JobInitedEvent) event.getHistoryEvent();
jobLaunchTime.set(jie.getLaunchTime());
}
super.handle(event);
}
};
}
}

View File

@@ -29,19 +29,25 @@ import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8; import org.apache.avro.util.Utf8;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup; import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Counters;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Event Writer is an utility class used to write events to the underlying * Event Writer is an utility class used to write events to the underlying
* stream. Typically, one event writer (which translates to one stream) * stream. Typically, one event writer (which translates to one stream)
* is created per job * is created per job
* *
*/ */
class EventWriter { @InterfaceAudience.Private
@InterfaceStability.Unstable
public class EventWriter {
static final String VERSION = "Avro-Json"; static final String VERSION = "Avro-Json";
static final String VERSION_BINARY = "Avro-Binary"; static final String VERSION_BINARY = "Avro-Binary";
@@ -50,11 +56,17 @@ class EventWriter {
new SpecificDatumWriter<Event>(Event.class); new SpecificDatumWriter<Event>(Event.class);
private Encoder encoder; private Encoder encoder;
private static final Log LOG = LogFactory.getLog(EventWriter.class); private static final Log LOG = LogFactory.getLog(EventWriter.class);
/**
* avro encoding format supported by EventWriter.
*/
public enum WriteMode { JSON, BINARY } public enum WriteMode { JSON, BINARY }
private final WriteMode writeMode; private final WriteMode writeMode;
private final boolean jsonOutput; // Cache value while we have 2 modes private final boolean jsonOutput; // Cache value while we have 2 modes
EventWriter(FSDataOutputStream out, WriteMode mode) throws IOException { @VisibleForTesting
public EventWriter(FSDataOutputStream out, WriteMode mode)
throws IOException {
this.out = out; this.out = out;
this.writeMode = mode; this.writeMode = mode;
if (this.writeMode==WriteMode.JSON) { if (this.writeMode==WriteMode.JSON) {
@@ -93,7 +105,8 @@ class EventWriter {
out.hflush(); out.hflush();
} }
void close() throws IOException { @VisibleForTesting
public void close() throws IOException {
try { try {
encoder.flush(); encoder.flush();
out.close(); out.close();