YARN-2709. Made timeline client getDelegationToken API retry if ConnectException happens. Contributed by Li Lu.
(cherry picked from commit b2942762d7
)
This commit is contained in:
parent
2d0996ae6d
commit
296bbd8971
|
@ -352,6 +352,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-90. NodeManager should identify failed disks becoming good again
|
YARN-90. NodeManager should identify failed disks becoming good again
|
||||||
(Varun Vasudev via jlowe)
|
(Varun Vasudev via jlowe)
|
||||||
|
|
||||||
|
YARN-2709. Made timeline client getDelegationToken API retry if ConnectException
|
||||||
|
happens. (Li Lu via zjshen)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -107,9 +107,23 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
private URI resURI;
|
private URI resURI;
|
||||||
private boolean isEnabled;
|
private boolean isEnabled;
|
||||||
|
|
||||||
private TimelineJerseyRetryFilter retryFilter;
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
TimelineClientConnectionRetry connectionRetry;
|
||||||
|
|
||||||
static class TimelineJerseyRetryFilter extends ClientFilter {
|
// Abstract class for an operation that should be retried by timeline client
|
||||||
|
private static abstract class TimelineClientRetryOp {
|
||||||
|
// The operation that should be retried
|
||||||
|
public abstract Object run() throws IOException;
|
||||||
|
// The method to indicate if we should retry given the incoming exception
|
||||||
|
public abstract boolean shouldRetryOn(Exception e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Class to handle retry
|
||||||
|
// Outside this class, only visible to tests
|
||||||
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
static class TimelineClientConnectionRetry {
|
||||||
// maxRetries < 0 means keep trying
|
// maxRetries < 0 means keep trying
|
||||||
@Private
|
@Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -119,14 +133,14 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long retryInterval;
|
public long retryInterval;
|
||||||
|
|
||||||
// Indicates if retries happened last time
|
// Indicates if retries happened last time. Only tests should read it.
|
||||||
|
// In unit tests, retryOn() calls should _not_ be concurrent.
|
||||||
@Private
|
@Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public boolean retried = false;
|
public boolean retried = false;
|
||||||
|
|
||||||
// Constructor with default retry settings
|
// Constructor with default retry settings
|
||||||
public TimelineJerseyRetryFilter(Configuration conf) {
|
public TimelineClientConnectionRetry(Configuration conf) {
|
||||||
super();
|
|
||||||
maxRetries = conf.getInt(
|
maxRetries = conf.getInt(
|
||||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
||||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
|
||||||
|
@ -135,32 +149,36 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
public Object retryOn(TimelineClientRetryOp op)
|
||||||
public ClientResponse handle(ClientRequest cr)
|
throws RuntimeException, IOException {
|
||||||
throws ClientHandlerException {
|
|
||||||
int leftRetries = maxRetries;
|
int leftRetries = maxRetries;
|
||||||
retried = false;
|
retried = false;
|
||||||
|
|
||||||
// keep trying
|
// keep trying
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
// try pass the request on, if fail, keep retrying
|
// try perform the op, if fail, keep retrying
|
||||||
return getNext().handle(cr);
|
return op.run();
|
||||||
} catch (ClientHandlerException e) {
|
} catch (IOException e) {
|
||||||
|
// We may only throw runtime and IO exceptions. After switching to
|
||||||
|
// Java 1.7, we can merge these two catch blocks into one.
|
||||||
|
|
||||||
// break if there's no retries left
|
// break if there's no retries left
|
||||||
if (leftRetries == 0) {
|
if (leftRetries == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if(e.getCause() instanceof ConnectException) {
|
if (op.shouldRetryOn(e)) {
|
||||||
if (leftRetries > 0) {
|
logException(e, leftRetries);
|
||||||
LOG.info("Connection Timeout (" + cr.getURI() + "), will try "
|
} else {
|
||||||
+ leftRetries + " more time(s).");
|
throw e;
|
||||||
} else {
|
}
|
||||||
// note that maxRetries may be -1 at the very beginning
|
} catch (RuntimeException e) {
|
||||||
// maxRetries = -1 means keep trying
|
// break if there's no retries left
|
||||||
LOG.info("Connection Timeout (" + cr.getURI()
|
if (leftRetries == 0) {
|
||||||
+ "), will keep retrying.");
|
break;
|
||||||
}
|
}
|
||||||
retried = true;
|
if (op.shouldRetryOn(e)) {
|
||||||
|
logException(e, leftRetries);
|
||||||
} else {
|
} else {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
@ -168,6 +186,7 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
if (leftRetries > 0) {
|
if (leftRetries > 0) {
|
||||||
leftRetries--;
|
leftRetries--;
|
||||||
}
|
}
|
||||||
|
retried = true;
|
||||||
try {
|
try {
|
||||||
// sleep for the given time interval
|
// sleep for the given time interval
|
||||||
Thread.sleep(retryInterval);
|
Thread.sleep(retryInterval);
|
||||||
|
@ -175,10 +194,51 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
LOG.warn("Client retry sleep interrupted! ");
|
LOG.warn("Client retry sleep interrupted! ");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw new ClientHandlerException("Failed to connect to timeline server. "
|
throw new RuntimeException("Failed to connect to timeline server. "
|
||||||
+ "Connection retries limit exceeded. "
|
+ "Connection retries limit exceeded. "
|
||||||
+ "The posted timeline event may be missing");
|
+ "The posted timeline event may be missing");
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private void logException(Exception e, int leftRetries) {
|
||||||
|
if (leftRetries > 0) {
|
||||||
|
LOG.info("Exception caught by TimelineClientConnectionRetry,"
|
||||||
|
+ " will try " + leftRetries + " more time(s).\nMessage: "
|
||||||
|
+ e.getMessage());
|
||||||
|
} else {
|
||||||
|
// note that maxRetries may be -1 at the very beginning
|
||||||
|
LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
|
||||||
|
+ " will keep retrying.\nMessage: "
|
||||||
|
+ e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class TimelineJerseyRetryFilter extends ClientFilter {
|
||||||
|
@Override
|
||||||
|
public ClientResponse handle(final ClientRequest cr)
|
||||||
|
throws ClientHandlerException {
|
||||||
|
// Set up the retry operation
|
||||||
|
TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
|
||||||
|
@Override
|
||||||
|
public Object run() {
|
||||||
|
// Try pass the request, if fail, keep retrying
|
||||||
|
return getNext().handle(cr);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean shouldRetryOn(Exception e) {
|
||||||
|
// Only retry on connection exceptions
|
||||||
|
return (e instanceof ClientHandlerException)
|
||||||
|
&& (e.getCause() instanceof ConnectException);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
try {
|
||||||
|
return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ClientHandlerException("Jersey retry failed!\nMessage: "
|
||||||
|
+ e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public TimelineClientImpl() {
|
public TimelineClientImpl() {
|
||||||
|
@ -201,10 +261,12 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
authenticator = new PseudoDelegationTokenAuthenticator();
|
authenticator = new PseudoDelegationTokenAuthenticator();
|
||||||
}
|
}
|
||||||
authenticator.setConnectionConfigurator(connConfigurator);
|
authenticator.setConnectionConfigurator(connConfigurator);
|
||||||
|
token = new DelegationTokenAuthenticatedURL.Token();
|
||||||
|
|
||||||
|
connectionRetry = new TimelineClientConnectionRetry(conf);
|
||||||
client = new Client(new URLConnectionClientHandler(
|
client = new Client(new URLConnectionClientHandler(
|
||||||
new TimelineURLConnectionFactory()), cc);
|
new TimelineURLConnectionFactory()), cc);
|
||||||
token = new DelegationTokenAuthenticatedURL.Token();
|
TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
|
||||||
retryFilter = new TimelineJerseyRetryFilter(conf);
|
|
||||||
client.addFilter(retryFilter);
|
client.addFilter(retryFilter);
|
||||||
|
|
||||||
if (YarnConfiguration.useHttps(conf)) {
|
if (YarnConfiguration.useHttps(conf)) {
|
||||||
|
@ -282,36 +344,45 @@ public class TimelineClientImpl extends TimelineClient {
|
||||||
@Override
|
@Override
|
||||||
public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
|
public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
|
||||||
final String renewer) throws IOException, YarnException {
|
final String renewer) throws IOException, YarnException {
|
||||||
boolean isProxyAccess =
|
// Set up the retry operation
|
||||||
UserGroupInformation.getCurrentUser().getAuthenticationMethod()
|
TimelineClientRetryOp tokenRetryOp = new TimelineClientRetryOp() {
|
||||||
== UserGroupInformation.AuthenticationMethod.PROXY;
|
@Override
|
||||||
UserGroupInformation callerUGI = isProxyAccess ?
|
public Object run() throws IOException {
|
||||||
UserGroupInformation.getCurrentUser().getRealUser()
|
// Try pass the request, if fail, keep retrying
|
||||||
: UserGroupInformation.getCurrentUser();
|
boolean isProxyAccess =
|
||||||
final String doAsUser = isProxyAccess ?
|
UserGroupInformation.getCurrentUser().getAuthenticationMethod()
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName() : null;
|
== UserGroupInformation.AuthenticationMethod.PROXY;
|
||||||
try {
|
UserGroupInformation callerUGI = isProxyAccess ?
|
||||||
return callerUGI.doAs(
|
UserGroupInformation.getCurrentUser().getRealUser()
|
||||||
new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
|
: UserGroupInformation.getCurrentUser();
|
||||||
@Override
|
final String doAsUser = isProxyAccess ?
|
||||||
public Token<TimelineDelegationTokenIdentifier> run() throws Exception {
|
UserGroupInformation.getCurrentUser().getShortUserName() : null;
|
||||||
DelegationTokenAuthenticatedURL authUrl =
|
try {
|
||||||
new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
|
return callerUGI.doAs(
|
||||||
return (Token) authUrl.getDelegationToken(
|
new PrivilegedExceptionAction<Token<TimelineDelegationTokenIdentifier>>() {
|
||||||
resURI.toURL(), token, renewer, doAsUser);
|
@Override
|
||||||
|
public Token<TimelineDelegationTokenIdentifier> run() throws Exception {
|
||||||
|
DelegationTokenAuthenticatedURL authUrl =
|
||||||
|
new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
|
||||||
|
return (Token) authUrl.getDelegationToken(
|
||||||
|
resURI.toURL(), token, renewer, doAsUser);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (UndeclaredThrowableException e) {
|
||||||
|
throw new IOException(e.getCause());
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
} catch (UndeclaredThrowableException e) {
|
@Override
|
||||||
throw new IOException(e.getCause());
|
public boolean shouldRetryOn(Exception e) {
|
||||||
} catch (InterruptedException e) {
|
// Only retry on connection exceptions
|
||||||
throw new IOException(e);
|
return (e instanceof ConnectException);
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
|
||||||
@Private
|
return (Token<TimelineDelegationTokenIdentifier>)
|
||||||
@VisibleForTesting
|
connectionRetry.retryOn(tokenRetryOp);
|
||||||
public TimelineJerseyRetryFilter getRetryFilter() {
|
|
||||||
return retryFilter;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
|
|
@ -27,6 +27,8 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
||||||
|
@ -183,8 +185,8 @@ public class TestTimelineClient {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCheckRetryCount() throws Exception {
|
public void testCheckRetryCount() throws Exception {
|
||||||
int newMaxRetries = 1;
|
int newMaxRetries = 5;
|
||||||
long newIntervalMs = 1500;
|
long newIntervalMs = 500;
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
||||||
newMaxRetries);
|
newMaxRetries);
|
||||||
|
@ -197,13 +199,44 @@ public class TestTimelineClient {
|
||||||
client.putEntities(generateEntity());
|
client.putEntities(generateEntity());
|
||||||
Assert.fail("Exception expected!"
|
Assert.fail("Exception expected!"
|
||||||
+ "Timeline server should be off to run this test. ");
|
+ "Timeline server should be off to run this test. ");
|
||||||
} catch (ClientHandlerException ce) {
|
} catch (RuntimeException ce) {
|
||||||
Assert.assertTrue(
|
Assert.assertTrue(
|
||||||
"Handler exception for reason other than retry: " + ce.getMessage(),
|
"Handler exception for reason other than retry: " + ce.getMessage(),
|
||||||
ce.getMessage().contains("Connection retries limit exceeded"));
|
ce.getMessage().contains("Connection retries limit exceeded"));
|
||||||
// we would expect this exception here, check if the client has retried
|
// we would expect this exception here, check if the client has retried
|
||||||
Assert.assertTrue("Retry filter didn't perform any retries! ", client
|
Assert.assertTrue("Retry filter didn't perform any retries! ", client
|
||||||
.getRetryFilter().retried);
|
.connectionRetry.retried);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTokenRetry() throws Exception {
|
||||||
|
int newMaxRetries = 5;
|
||||||
|
long newIntervalMs = 500;
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
||||||
|
newMaxRetries);
|
||||||
|
conf.setLong(YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
|
||||||
|
newIntervalMs);
|
||||||
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
|
// use kerberos to bypass the issue in HADOOP-11215
|
||||||
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||||
|
"kerberos");
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
TimelineClientImpl client = createTimelineClient(conf);
|
||||||
|
try {
|
||||||
|
// try getting a delegation token
|
||||||
|
client.getDelegationToken(
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||||
|
Assert.fail("Exception expected!"
|
||||||
|
+ "Timeline server should be off to run this test. ");
|
||||||
|
} catch (RuntimeException ce) {
|
||||||
|
Assert.assertTrue(
|
||||||
|
"Handler exception for reason other than retry: " + ce.toString(), ce
|
||||||
|
.getMessage().contains("Connection retries limit exceeded"));
|
||||||
|
// we would expect this exception here, check if the client has retried
|
||||||
|
Assert.assertTrue("Retry filter didn't perform any retries! ",
|
||||||
|
client.connectionRetry.retried);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue