MAPREDUCE-5542. Killing a job just as it finishes can generate an NPE in client. Contributed by Rohith

(cherry picked from commit 209b1699fc)
This commit is contained in:
Jason Lowe 2014-10-17 19:51:10 +00:00
parent 79428e021f
commit 8885b75d78
3 changed files with 69 additions and 21 deletions

View File

@ -216,6 +216,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting
for maps (Siqi Li via jlowe) for maps (Siqi Li via jlowe)
MAPREDUCE-5542. Killing a job just as it finishes can generate an NPE in
client (Rohith via jlowe)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -594,16 +594,50 @@ public class YARNRunner implements ClientProtocol {
.getTaskReports(jobID, taskType); .getTaskReports(jobID, taskType);
} }
private void killUnFinishedApplication(ApplicationId appId)
throws IOException {
ApplicationReport application = null;
try {
application = resMgrDelegate.getApplicationReport(appId);
} catch (YarnException e) {
throw new IOException(e);
}
if (application.getYarnApplicationState() == YarnApplicationState.FINISHED
|| application.getYarnApplicationState() == YarnApplicationState.FAILED
|| application.getYarnApplicationState() == YarnApplicationState.KILLED) {
return;
}
killApplication(appId);
}
private void killApplication(ApplicationId appId) throws IOException {
try {
resMgrDelegate.killApplication(appId);
} catch (YarnException e) {
throw new IOException(e);
}
}
private boolean isJobInTerminalState(JobStatus status) {
return status.getState() == JobStatus.State.KILLED
|| status.getState() == JobStatus.State.FAILED
|| status.getState() == JobStatus.State.SUCCEEDED;
}
@Override @Override
public void killJob(JobID arg0) throws IOException, InterruptedException { public void killJob(JobID arg0) throws IOException, InterruptedException {
/* check if the status is not running, if not send kill to RM */ /* check if the status is not running, if not send kill to RM */
JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0); JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
ApplicationId appId = TypeConverter.toYarn(arg0).getAppId();
// get status from RM and return
if (status == null) {
killUnFinishedApplication(appId);
return;
}
if (status.getState() != JobStatus.State.RUNNING) { if (status.getState() != JobStatus.State.RUNNING) {
try { killApplication(appId);
resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
} catch (YarnException e) {
throw new IOException(e);
}
return; return;
} }
@ -612,26 +646,26 @@ public class YARNRunner implements ClientProtocol {
clientCache.getClient(arg0).killJob(arg0); clientCache.getClient(arg0).killJob(arg0);
long currentTimeMillis = System.currentTimeMillis(); long currentTimeMillis = System.currentTimeMillis();
long timeKillIssued = currentTimeMillis; long timeKillIssued = currentTimeMillis;
while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState() while ((currentTimeMillis < timeKillIssued + 10000L)
!= JobStatus.State.KILLED)) { && !isJobInTerminalState(status)) {
try { try {
Thread.sleep(1000L); Thread.sleep(1000L);
} catch(InterruptedException ie) { } catch (InterruptedException ie) {
/** interrupted, just break */ /** interrupted, just break */
break; break;
} }
currentTimeMillis = System.currentTimeMillis(); currentTimeMillis = System.currentTimeMillis();
status = clientCache.getClient(arg0).getJobStatus(arg0); status = clientCache.getClient(arg0).getJobStatus(arg0);
if (status == null) {
killUnFinishedApplication(appId);
return;
}
} }
} catch(IOException io) { } catch(IOException io) {
LOG.debug("Error when checking for application status", io); LOG.debug("Error when checking for application status", io);
} }
if (status.getState() != JobStatus.State.KILLED) { if (status != null && !isJobInTerminalState(status)) {
try { killApplication(appId);
resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
} catch (YarnException e) {
throw new IOException(e);
}
} }
} }

View File

@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@ -188,6 +189,16 @@ public class TestYARNRunner extends TestCase {
State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp")); State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
yarnRunner.killJob(jobId); yarnRunner.killJob(jobId);
verify(clientDelegate).killJob(jobId); verify(clientDelegate).killJob(jobId);
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(null);
when(resourceMgrDelegate.getApplicationReport(any(ApplicationId.class)))
.thenReturn(
ApplicationReport.newInstance(appId, null, "tmp", "tmp", "tmp",
"tmp", 0, null, YarnApplicationState.FINISHED, "tmp", "tmp",
0l, 0l, FinalApplicationStatus.SUCCEEDED, null, null, 0f,
"tmp", null));
yarnRunner.killJob(jobId);
verify(clientDelegate).killJob(jobId);
} }
@Test(timeout=20000) @Test(timeout=20000)