MAPREDUCE-5542. Killing a job just as it finishes can generate an NPE in client. Contributed by Rohith

This commit is contained in:
Jason Lowe 2014-10-17 19:51:10 +00:00
parent a6aa6e42ca
commit 209b1699fc
3 changed files with 69 additions and 21 deletions

View File

@ -435,6 +435,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting
for maps (Siqi Li via jlowe)
MAPREDUCE-5542. Killing a job just as it finishes can generate an NPE in
client (Rohith via jlowe)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -594,16 +594,50 @@ public class YARNRunner implements ClientProtocol {
.getTaskReports(jobID, taskType);
}
private void killUnFinishedApplication(ApplicationId appId)
throws IOException {
ApplicationReport application = null;
try {
application = resMgrDelegate.getApplicationReport(appId);
} catch (YarnException e) {
throw new IOException(e);
}
if (application.getYarnApplicationState() == YarnApplicationState.FINISHED
|| application.getYarnApplicationState() == YarnApplicationState.FAILED
|| application.getYarnApplicationState() == YarnApplicationState.KILLED) {
return;
}
killApplication(appId);
}
private void killApplication(ApplicationId appId) throws IOException {
try {
resMgrDelegate.killApplication(appId);
} catch (YarnException e) {
throw new IOException(e);
}
}
private boolean isJobInTerminalState(JobStatus status) {
return status.getState() == JobStatus.State.KILLED
|| status.getState() == JobStatus.State.FAILED
|| status.getState() == JobStatus.State.SUCCEEDED;
}
@Override
public void killJob(JobID arg0) throws IOException, InterruptedException {
/* check if the status is not running, if not send kill to RM */
JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
ApplicationId appId = TypeConverter.toYarn(arg0).getAppId();
// get status from RM and return
if (status == null) {
killUnFinishedApplication(appId);
return;
}
if (status.getState() != JobStatus.State.RUNNING) {
try {
resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
} catch (YarnException e) {
throw new IOException(e);
}
killApplication(appId);
return;
}
@ -612,26 +646,26 @@ public class YARNRunner implements ClientProtocol {
clientCache.getClient(arg0).killJob(arg0);
long currentTimeMillis = System.currentTimeMillis();
long timeKillIssued = currentTimeMillis;
while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
!= JobStatus.State.KILLED)) {
try {
Thread.sleep(1000L);
} catch(InterruptedException ie) {
/** interrupted, just break */
break;
}
currentTimeMillis = System.currentTimeMillis();
status = clientCache.getClient(arg0).getJobStatus(arg0);
while ((currentTimeMillis < timeKillIssued + 10000L)
&& !isJobInTerminalState(status)) {
try {
Thread.sleep(1000L);
} catch (InterruptedException ie) {
/** interrupted, just break */
break;
}
currentTimeMillis = System.currentTimeMillis();
status = clientCache.getClient(arg0).getJobStatus(arg0);
if (status == null) {
killUnFinishedApplication(appId);
return;
}
}
} catch(IOException io) {
LOG.debug("Error when checking for application status", io);
}
if (status.getState() != JobStatus.State.KILLED) {
try {
resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
} catch (YarnException e) {
throw new IOException(e);
}
if (status != null && !isJobInTerminalState(status)) {
killApplication(appId);
}
}

View File

@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -188,6 +189,16 @@ public class TestYARNRunner extends TestCase {
State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
yarnRunner.killJob(jobId);
verify(clientDelegate).killJob(jobId);
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(null);
when(resourceMgrDelegate.getApplicationReport(any(ApplicationId.class)))
.thenReturn(
ApplicationReport.newInstance(appId, null, "tmp", "tmp", "tmp",
"tmp", 0, null, YarnApplicationState.FINISHED, "tmp", "tmp",
0l, 0l, FinalApplicationStatus.SUCCEEDED, null, null, 0f,
"tmp", null));
yarnRunner.killJob(jobId);
verify(clientDelegate).killJob(jobId);
}
@Test(timeout=20000)