MAPREDUCE-6927. MR job should only set tracking url if history was successfully written. Contributed by Eric Badger

(cherry picked from commit 735fce5bec)

Conflicts:
	hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
This commit is contained in:
Jason Lowe 2017-08-08 14:46:47 -05:00
parent bc87a293d9
commit ea67e1f266
8 changed files with 167 additions and 10 deletions

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
@ -1151,7 +1152,12 @@ public class JobHistoryEventHandler extends AbstractService
qualifiedDoneFile =
doneDirFS.makeQualified(new Path(doneDirPrefixPath,
doneJobHistoryFileName));
moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
if(moveToDoneNow(qualifiedLogFile, qualifiedDoneFile)) {
String historyUrl = MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(
getConfig(), context.getApplicationID());
context.setHistoryUrl(historyUrl);
LOG.info("Set historyUrl to " + historyUrl);
}
}
// Move confFile to Done Folder
@ -1357,7 +1363,7 @@ public class JobHistoryEventHandler extends AbstractService
}
}
private void moveTmpToDone(Path tmpPath) throws IOException {
protected void moveTmpToDone(Path tmpPath) throws IOException {
if (tmpPath != null) {
String tmpFileName = tmpPath.getName();
String fileName = getFileNameFromTmpFN(tmpFileName);
@ -1369,7 +1375,9 @@ public class JobHistoryEventHandler extends AbstractService
// TODO If the FS objects are the same, this should be a rename instead of a
// copy.
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
protected boolean moveToDoneNow(Path fromPath, Path toPath)
throws IOException {
boolean success = false;
// check if path exists, in case of retries it may not exist
if (stagingDirFS.exists(fromPath)) {
LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString());
@ -1380,14 +1388,19 @@ public class JobHistoryEventHandler extends AbstractService
boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
false, getConfig());
if (copied)
LOG.info("Copied to done location: " + toPath);
else
LOG.info("copy failed");
doneDirFS.setPermission(toPath, new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
if (copied) {
LOG.info("Copied from: " + fromPath.toString()
+ " to done location: " + toPath.toString());
success = true;
} else {
LOG.info("Copy failed from: " + fromPath.toString()
+ " to done location: " + toPath.toString());
}
}
return success;
}
boolean pathExists(FileSystem fileSys, Path path) throws IOException {
return fileSys.exists(path);

View File

@ -69,4 +69,8 @@ public interface AppContext {
String getNMHostname();
TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor();
/**
 * Returns the history URL held by this context.
 * NOTE(review): implementations shown alongside this declaration can return
 * {@code null}; confirm that every caller tolerates a null URL.
 *
 * @return the history URL, possibly {@code null}
 */
String getHistoryUrl();
/**
 * Hands a history URL to this context. Whether the value is retained is up to
 * the implementation.
 *
 * @param historyUrl the history URL to record
 */
void setHistoryUrl(String historyUrl);
}

View File

@ -1046,6 +1046,7 @@ public class MRAppMaster extends CompositeService {
private final Configuration conf;
private final ClusterInfo clusterInfo = new ClusterInfo();
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
private String historyUrl = null;
private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
@ -1145,6 +1146,15 @@ public class MRAppMaster extends CompositeService {
return taskAttemptFinishingMonitor;
}
/**
 * Returns the current value of the {@code historyUrl} field.
 *
 * @return the stored history URL; {@code null} if the field still holds its
 *         initial value
 */
@Override
public String getHistoryUrl() {
return historyUrl;
}
/**
 * Replaces the stored history URL with the given value.
 *
 * @param historyUrl the new history URL; stored as-is, with no validation
 */
@Override
public void setHistoryUrl(String historyUrl) {
this.historyUrl = historyUrl;
}
}
@SuppressWarnings("unchecked")

View File

@ -215,9 +215,7 @@ public abstract class RMCommunicator extends AbstractService
}
LOG.info("Setting job diagnostics to " + sb.toString());
String historyUrl =
MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(getConfig(),
context.getApplicationID());
String historyUrl = context.getHistoryUrl();
LOG.info("History url is " + historyUrl);
FinishApplicationMasterRequest request =
FinishApplicationMasterRequest.newInstance(finishState,

View File

@ -21,6 +21,9 @@ package org.apache.hadoop.mapreduce.jobhistory;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@ -60,6 +63,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.mapreduce.v2.util.MRWebAppUtil;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -899,6 +903,104 @@ public class TestJobHistoryEventHandler {
jheh.lastEventHandled.getHistoryEvent()
instanceof JobUnsuccessfulCompletionEvent);
}
/**
 * Checks that the history URL is handed to the app context once the job has
 * finished: no call to setHistoryUrl after only an AMStartedEvent, and exactly
 * one call, carrying the URL computed by MRWebAppUtil, after a
 * JobFinishedEvent has been handled.
 */
@Test (timeout=50000)
public void testSetTrackingURLAfterHistoryIsWritten() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0, false);
// Wrap the handler in a Mockito spy so calls made on it can be verified.
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
try {
jheh.start();
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
// After the AM-started event alone, neither processDoneFiles nor
// setHistoryUrl may have been invoked.
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
// Job finishes and successfully writes history
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
// Expected URL is built from the same conf that was passed to init().
String historyUrl = MRWebAppUtil.getApplicationWebURLOnJHSWithScheme(
conf, t.mockAppContext.getApplicationID());
verify(t.mockAppContext, times(1)).setHistoryUrl(historyUrl);
} finally {
// Always stop the handler, even when a verification above fails.
jheh.stop();
}
}
/**
 * Checks that setHistoryUrl is never called on the app context when
 * moveToDoneNow reports failure by returning {@code false}, even though
 * processDoneFiles itself runs once for the finished job.
 */
@Test (timeout=50000)
public void testDontSetTrackingURLIfHistoryWriteFailed() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0, false);
// Wrap the handler in a Mockito spy so individual methods can be stubbed.
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
try {
jheh.start();
// Simulate a failed move of the history file: moveToDoneNow returns false.
doReturn(false).when(jheh).moveToDoneNow(any(Path.class),
any(Path.class));
// Stub moveTmpToDone to a no-op.
// NOTE(review): presumably needed because no file was really moved -- confirm.
doNothing().when(jheh).moveTmpToDone(any(Path.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
// After the AM-started event alone, neither processDoneFiles nor
// setHistoryUrl may have been invoked.
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
// Job finishes, but doesn't successfully write history
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
// processDoneFiles ran once, yet the URL must still be unset.
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
} finally {
// Always stop the handler, even when a verification above fails.
jheh.stop();
}
}
/**
 * Checks that setHistoryUrl is never called on the app context when
 * processDoneFiles throws a YarnRuntimeException, and that the exception
 * propagates out of handleEvent for the JobFinishedEvent.
 */
@Test (timeout=50000)
public void testDontSetTrackingURLIfHistoryWriteThrows() throws Exception {
TestParams t = new TestParams(true);
Configuration conf = new Configuration();
JHEvenHandlerForTest realJheh =
new JHEvenHandlerForTest(t.mockAppContext, 0, false);
// Wrap the handler in a Mockito spy so processDoneFiles can be stubbed.
JHEvenHandlerForTest jheh = spy(realJheh);
jheh.init(conf);
try {
jheh.start();
// Make every processDoneFiles call fail with a YarnRuntimeException
// wrapping an IOException.
doThrow(new YarnRuntimeException(new IOException()))
.when(jheh).processDoneFiles(any(JobId.class));
handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000, -1)));
// After the AM-started event alone, neither processDoneFiles nor
// setHistoryUrl may have been invoked.
verify(jheh, times(0)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
// Job finishes, but doesn't successfully write history
try {
handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
new Counters(), new Counters())));
// Reached only if handleEvent returned normally. The catch below handles
// only YarnRuntimeException, so this plain RuntimeException escapes it
// and fails the test.
throw new RuntimeException(
"processDoneFiles didn't throw, but should have");
} catch (YarnRuntimeException yre) {
// Exception expected, do nothing
}
// The stubbed call was attempted once, and the URL must remain unset.
verify(jheh, times(1)).processDoneFiles(any(JobId.class));
verify(t.mockAppContext, times(0)).setHistoryUrl(any(String.class));
} finally {
// Always stop the handler, even when a verification above fails.
jheh.stop();
}
}
}
class JHEvenHandlerForTest extends JobHistoryEventHandler {

View File

@ -154,4 +154,14 @@ public class MockAppContext implements AppContext {
return null;
}
/**
 * Stub implementation: this mock context keeps no history URL.
 *
 * @return always {@code null}
 */
@Override
public String getHistoryUrl() {
return null;
}
/**
 * Stub implementation: the supplied URL is discarded.
 *
 * @param historyUrl ignored
 */
@Override
public void setHistoryUrl(String historyUrl) {
// Intentionally a no-op.
return;
}
}

View File

@ -895,5 +895,15 @@ public class TestRuntimeEstimators {
public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
return null;
}
/**
 * Stub implementation: this test context keeps no history URL.
 *
 * @return always {@code null}
 */
@Override
public String getHistoryUrl() {
return null;
}
/**
 * Stub implementation: the supplied URL is discarded.
 *
 * @param historyUrl ignored
 */
@Override
public void setHistoryUrl(String historyUrl) {
// Intentionally a no-op.
return;
}
}
}

View File

@ -405,4 +405,14 @@ public class JobHistory extends AbstractService implements HistoryContext {
public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
return null;
}
/**
 * Stub implementation: no history URL is kept by this class.
 *
 * @return always {@code null}
 */
@Override
public String getHistoryUrl() {
return null;
}
/**
 * Stub implementation: the supplied URL is discarded.
 *
 * @param historyUrl ignored
 */
@Override
public void setHistoryUrl(String historyUrl) {
// Intentionally a no-op.
return;
}
}