diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1ede42469f7..ba8c673f615 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -216,6 +216,9 @@ Release 2.6.0 - UNRELEASED MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting for maps (Siqi Li via jlowe) + MAPREDUCE-5542. Killing a job just as it finishes can generate an NPE in + client (Rohith via jlowe) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index d3b80f3830e..40ef9822783 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -594,16 +594,50 @@ public class YARNRunner implements ClientProtocol { .getTaskReports(jobID, taskType); } + private void killUnFinishedApplication(ApplicationId appId) + throws IOException { + ApplicationReport application = null; + try { + application = resMgrDelegate.getApplicationReport(appId); + } catch (YarnException e) { + throw new IOException(e); + } + if (application.getYarnApplicationState() == YarnApplicationState.FINISHED + || application.getYarnApplicationState() == YarnApplicationState.FAILED + || application.getYarnApplicationState() == YarnApplicationState.KILLED) { + return; + } + killApplication(appId); + } + + private void killApplication(ApplicationId appId) throws IOException { + try { + resMgrDelegate.killApplication(appId); + } catch (YarnException e) { + throw new IOException(e); + } + } + + private boolean isJobInTerminalState(JobStatus status) { + return status.getState() == JobStatus.State.KILLED + || status.getState() == JobStatus.State.FAILED + || status.getState() == JobStatus.State.SUCCEEDED; + } + @Override public void killJob(JobID arg0) throws IOException, InterruptedException { /* check if the status is not running, if not send kill to RM */ JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0); + ApplicationId appId = TypeConverter.toYarn(arg0).getAppId(); + + // get status from RM and return + if (status == null) { + killUnFinishedApplication(appId); + return; + } + if (status.getState() != JobStatus.State.RUNNING) { - try { - resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId()); - } catch (YarnException e) { - throw new IOException(e); - } + killApplication(appId); return; } @@ -612,26 +646,26 @@ public class YARNRunner implements ClientProtocol { clientCache.getClient(arg0).killJob(arg0); long currentTimeMillis = System.currentTimeMillis(); long timeKillIssued = currentTimeMillis; - while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState() - != JobStatus.State.KILLED)) { - try { - Thread.sleep(1000L); - } catch(InterruptedException ie) { - /** interrupted, just break */ - break; - } - currentTimeMillis = System.currentTimeMillis(); - status = clientCache.getClient(arg0).getJobStatus(arg0); + while ((currentTimeMillis < timeKillIssued + 10000L) + && !isJobInTerminalState(status)) { + try { + Thread.sleep(1000L); + } catch (InterruptedException ie) { + /** interrupted, just break */ + break; + } + currentTimeMillis = System.currentTimeMillis(); + status = clientCache.getClient(arg0).getJobStatus(arg0); + if (status == null) { + killUnFinishedApplication(appId); + return; + } } } catch(IOException io) { LOG.debug("Error when checking for application status", io); } - if (status.getState() != JobStatus.State.KILLED) { - try { - resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId()); - } catch (YarnException e) { - throw new IOException(e); - } + if (status != null && !isJobInTerminalState(status)) { + killApplication(appId); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index 256778548fd..420a95f9bbf 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; @@ -188,6 +189,16 @@ public class TestYARNRunner extends TestCase { State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp")); yarnRunner.killJob(jobId); verify(clientDelegate).killJob(jobId); + + when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(null); + when(resourceMgrDelegate.getApplicationReport(any(ApplicationId.class))) + .thenReturn( + ApplicationReport.newInstance(appId, null, "tmp", "tmp", "tmp", + "tmp", 0, null, YarnApplicationState.FINISHED, "tmp", "tmp", + 0l, 0l, FinalApplicationStatus.SUCCEEDED, null, null, 0f, + "tmp", null)); + yarnRunner.killJob(jobId); + verify(clientDelegate).killJob(jobId); } @Test(timeout=20000)