Configurable timeout between YARNRunner terminate the application and forcefully kill. Contributed by Eric Payne.

This commit is contained in:
Junping Du 2015-03-10 06:21:59 -07:00
parent 7711049837
commit d39bc903a0
5 changed files with 46 additions and 1 deletions

View File

@ -335,6 +335,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6267. Refactor JobSubmitter#copyAndConfigureFiles into it's own
class. (Chris Trezzo via kasha)
MAPREDUCE-6263. Configurable timeout between YARNRunner terminate the
application and forcefully kill. (Eric Payne via junping_du)
OPTIMIZATIONS
MAPREDUCE-6169. MergeQueue should release reference to the current item

View File

@ -644,6 +644,11 @@ public interface MRJobConfig {
public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
50;
public static final String MR_AM_HARD_KILL_TIMEOUT_MS =
MR_AM_PREFIX + "hard-kill-timeout-ms";
public static final long DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS =
10 * 1000l;
/**
* The threshold in terms of seconds after which an unsatisfied mapper request
* triggers reducer preemption to free space. Default 0 implies that the reduces

View File

@ -1359,6 +1359,14 @@
</description>
</property>
<property>
<name>yarn.app.mapreduce.am.hard-kill-timeout-ms</name>
<value>10000</value>
<description>
Number of milliseconds to wait before the job client kills the application.
</description>
</property>
<property>
<description>CLASSPATH for MR applications. A comma-separated list
of CLASSPATH entries. If mapreduce.application.framework is set then this

View File

@ -640,7 +640,10 @@ public class YARNRunner implements ClientProtocol {
clientCache.getClient(arg0).killJob(arg0);
long currentTimeMillis = System.currentTimeMillis();
long timeKillIssued = currentTimeMillis;
while ((currentTimeMillis < timeKillIssued + 10000L)
long killTimeOut =
conf.getLong(MRJobConfig.MR_AM_HARD_KILL_TIMEOUT_MS,
MRJobConfig.DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS);
while ((currentTimeMillis < timeKillIssued + killTimeOut)
&& !isJobInTerminalState(status)) {
try {
Thread.sleep(1000L);

View File

@ -201,6 +201,32 @@ public class TestYARNRunner extends TestCase {
verify(clientDelegate).killJob(jobId);
}
@Test(timeout=60000)
public void testJobKillTimeout() throws Exception {
long timeToWaitBeforeHardKill =
10000 + MRJobConfig.DEFAULT_MR_AM_HARD_KILL_TIMEOUT_MS;
conf.setLong(MRJobConfig.MR_AM_HARD_KILL_TIMEOUT_MS,
timeToWaitBeforeHardKill);
clientDelegate = mock(ClientServiceDelegate.class);
doAnswer(
new Answer<ClientServiceDelegate>() {
@Override
public ClientServiceDelegate answer(InvocationOnMock invocation)
throws Throwable {
return clientDelegate;
}
}
).when(clientCache).getClient(any(JobID.class));
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new
org.apache.hadoop.mapreduce.JobStatus(jobId, 0f, 0f, 0f, 0f,
State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
long startTimeMillis = System.currentTimeMillis();
yarnRunner.killJob(jobId);
assertTrue("killJob should have waited at least " + timeToWaitBeforeHardKill
+ " ms.", System.currentTimeMillis() - startTimeMillis
>= timeToWaitBeforeHardKill);
}
@Test(timeout=20000)
public void testJobSubmissionFailure() throws Exception {
when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))).