YARN-1630. Introduce timeout for async polling operations in YarnClientImpl (Aditya Acharya via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1562290 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-01-28 22:30:45 +00:00
parent be75f21b6f
commit a01eba048b
4 changed files with 76 additions and 4 deletions

View File

@ -317,6 +317,9 @@ Release 2.4.0 - UNRELEASED
YARN-1573. ZK store should use a private password for root-node-acls. YARN-1573. ZK store should use a private password for root-node-acls.
(kasha). (kasha).
YARN-1630. Introduce timeout for async polling operations in YarnClientImpl
(Aditya Acharya via Sandy Ryza)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -1020,6 +1020,17 @@ public class YarnConfiguration extends Configuration {
YARN_PREFIX + "client.application-client-protocol.poll-interval-ms"; YARN_PREFIX + "client.application-client-protocol.poll-interval-ms";
public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS = public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS =
200; 200;
/**
* The maximum time, in milliseconds, that the yarn client library waits,
* cumulatively across polls, for an expected state change to occur. Defaults
* to -1; any negative value indicates no limit. A value of 0 is still an
* enforced limit: the wait is abandoned after the first poll that does not
* observe the expected state.
*/
public static final String YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS =
YARN_PREFIX + "client.application-client-protocol.poll-timeout-ms";
public static final long DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS =
-1;
/** /**
* Max number of threads in NMClientAsync to process container management * Max number of threads in NMClientAsync to process container management
* events * events

View File

@ -86,6 +86,7 @@ public class YarnClientImpl extends YarnClient {
protected ApplicationClientProtocol rmClient; protected ApplicationClientProtocol rmClient;
protected long submitPollIntervalMillis; protected long submitPollIntervalMillis;
private long asyncApiPollIntervalMillis; private long asyncApiPollIntervalMillis;
// Cumulative limit, in milliseconds, on how long a polling loop may wait for
// the expected state change; a negative value means no limit is enforced.
private long asyncApiPollTimeoutMillis;
protected AHSClient historyClient; protected AHSClient historyClient;
private boolean historyServiceEnabled; private boolean historyServiceEnabled;
@ -101,6 +102,9 @@ public class YarnClientImpl extends YarnClient {
asyncApiPollIntervalMillis = asyncApiPollIntervalMillis =
conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS, conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS,
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS); YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
asyncApiPollTimeoutMillis =
conf.getLong(YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS,
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS);
submitPollIntervalMillis = asyncApiPollIntervalMillis; submitPollIntervalMillis = asyncApiPollIntervalMillis;
if (conf.get(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS) if (conf.get(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS)
!= null) { != null) {
@ -174,13 +178,24 @@ public class YarnClientImpl extends YarnClient {
rmClient.submitApplication(request); rmClient.submitApplication(request);
int pollCount = 0; int pollCount = 0;
long startTime = System.currentTimeMillis();
while (true) { while (true) {
YarnApplicationState state = YarnApplicationState state =
getApplicationReport(applicationId).getYarnApplicationState(); getApplicationReport(applicationId).getYarnApplicationState();
if (!state.equals(YarnApplicationState.NEW) && if (!state.equals(YarnApplicationState.NEW) &&
!state.equals(YarnApplicationState.NEW_SAVING)) { !state.equals(YarnApplicationState.NEW_SAVING)) {
LOG.info("Submitted application " + applicationId);
break; break;
} }
long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() &&
elapsedMillis >= asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " +
applicationId + " to be submitted successfully");
}
// Notify the client through the log every 10 poll, in case the client // Notify the client through the log every 10 poll, in case the client
// is blocked here too long. // is blocked here too long.
if (++pollCount % 10 == 0) { if (++pollCount % 10 == 0) {
@ -191,10 +206,11 @@ public class YarnClientImpl extends YarnClient {
try { try {
Thread.sleep(submitPollIntervalMillis); Thread.sleep(submitPollIntervalMillis);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
LOG.error("Interrupted while waiting for application " + applicationId
+ " to be successfully submitted.");
} }
} }
LOG.info("Submitted application " + applicationId);
return applicationId; return applicationId;
} }
@ -207,15 +223,25 @@ public class YarnClientImpl extends YarnClient {
try { try {
int pollCount = 0; int pollCount = 0;
long startTime = System.currentTimeMillis();
while (true) { while (true) {
KillApplicationResponse response = KillApplicationResponse response =
rmClient.forceKillApplication(request); rmClient.forceKillApplication(request);
if (response.getIsKillCompleted()) { if (response.getIsKillCompleted()) {
LOG.info("Killed application " + applicationId);
break; break;
} }
long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() &&
elapsedMillis >= this.asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " +
applicationId + " to be killed.");
}
if (++pollCount % 10 == 0) { if (++pollCount % 10 == 0) {
LOG.info("Watiting for application " + applicationId LOG.info("Waiting for application " + applicationId + " to be killed.");
+ " to be killed.");
} }
Thread.sleep(asyncApiPollIntervalMillis); Thread.sleep(asyncApiPollIntervalMillis);
} }
@ -223,7 +249,11 @@ public class YarnClientImpl extends YarnClient {
LOG.error("Interrupted while waiting for application " + applicationId LOG.error("Interrupted while waiting for application " + applicationId
+ " to be killed."); + " to be killed.");
} }
LOG.info("Killed application " + applicationId); }
/**
* Tells whether the polling loops should enforce the configured timeout.
*
* @return true when the configured poll timeout is zero or positive; false
*         when it is negative, which means "wait without limit"
*/
@VisibleForTesting
boolean enforceAsyncAPITimeout() {
return asyncApiPollTimeoutMillis >= 0;
} }
@Override @Override

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.client.api.impl; package org.apache.hadoop.yarn.client.api.impl;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -35,6 +36,7 @@ import java.util.Set;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@ -474,4 +476,30 @@ public class TestYarnClient {
} }
} }
/**
 * Verifies that timeout enforcement depends only on the sign of the
 * configured poll timeout: unset or negative disables it, zero or positive
 * enables it.
 */
@Test
public void testAsyncAPIPollTimeout() {
  // Nothing configured: the default (-1) leaves the timeout disabled.
  testAsyncAPIPollTimeoutHelper(null, false);
  // An explicitly configured negative value must disable the timeout too.
  testAsyncAPIPollTimeoutHelper(-1L, false);
  // Zero and positive values are real limits and must be enforced.
  testAsyncAPIPollTimeoutHelper(0L, true);
  testAsyncAPIPollTimeoutHelper(1L, true);
}
/**
 * Initializes a fresh client with the given poll-timeout setting and checks
 * whether it reports that the timeout will be enforced.
 *
 * @param valueForTimeout value to store under the poll-timeout key, or
 *        null to leave the key unset so the default applies
 * @param expectedTimeoutEnforcement the result expected from
 *        enforceAsyncAPITimeout() after init
 */
private void testAsyncAPIPollTimeoutHelper(Long valueForTimeout,
    boolean expectedTimeoutEnforcement) {
  YarnClientImpl yarnClient = new YarnClientImpl();
  try {
    Configuration clientConf = new Configuration();
    if (null != valueForTimeout) {
      final String timeoutKey =
          YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS;
      clientConf.setLong(timeoutKey, valueForTimeout);
    }
    yarnClient.init(clientConf);
    boolean actualEnforcement = yarnClient.enforceAsyncAPITimeout();
    Assert.assertEquals(expectedTimeoutEnforcement, actualEnforcement);
  } finally {
    // Always release the client, even when init or the assertion fails.
    IOUtils.closeQuietly(yarnClient);
  }
}
} }