YARN-8362. Bugfix logic in container retries in node manager.
Contributed by Chandni Singh
(cherry picked from commit 135941e00d
)
This commit is contained in:
parent
11a425d11a
commit
03209e8966
|
@ -1602,8 +1602,10 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
container.addDiagnostics(exitEvent.getDiagnosticInfo() + "\n");
|
||||
}
|
||||
|
||||
if (container.shouldRetry(container.exitCode)) {
|
||||
// Updates to the retry context should be protected from concurrent
|
||||
// writes. It should only be called from this transition.
|
||||
container.retryPolicy.updateRetryContext(container.windowRetryContext);
|
||||
container.storeRetryContext();
|
||||
doRelaunch(container,
|
||||
container.windowRetryContext.getRemainingRetries(),
|
||||
|
|
|
@ -42,38 +42,58 @@ public class SlidingWindowRetryPolicy {
|
|||
|
||||
public boolean shouldRetry(RetryContext retryContext,
|
||||
int errorCode) {
|
||||
ContainerRetryContext containerRC = retryContext
|
||||
.containerRetryContext;
|
||||
ContainerRetryContext containerRC = retryContext.containerRetryContext;
|
||||
Preconditions.checkNotNull(containerRC, "container retry context null");
|
||||
ContainerRetryPolicy retryPolicy = containerRC.getRetryPolicy();
|
||||
if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
|
||||
|| (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
|
||||
&& containerRC.getErrorCodes() != null
|
||||
&& containerRC.getErrorCodes().contains(errorCode))) {
|
||||
if (containerRC.getMaxRetries() == ContainerRetryContext.RETRY_FOREVER) {
|
||||
return true;
|
||||
}
|
||||
int pendingRetries = calculatePendingRetries(retryContext);
|
||||
updateRetryContext(retryContext, pendingRetries);
|
||||
return pendingRetries > 0;
|
||||
return containerRC.getMaxRetries() == ContainerRetryContext.RETRY_FOREVER
|
||||
|| calculateRemainingRetries(retryContext) > 0;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the pending number of retries.
|
||||
* <p>
|
||||
* When failuresValidityInterval is > 0, it also removes time entries from
|
||||
* <code>restartTimes</code> which are outside the validity interval.
|
||||
* Calculates the remaining number of retries.
|
||||
*
|
||||
* @return the pending retries.
|
||||
* @return the remaining retries.
|
||||
*/
|
||||
private int calculatePendingRetries(RetryContext retryContext) {
|
||||
private int calculateRemainingRetries(RetryContext retryContext) {
|
||||
ContainerRetryContext containerRC =
|
||||
retryContext.containerRetryContext;
|
||||
if (containerRC.getFailuresValidityInterval() > 0) {
|
||||
int validFailuresCount = 0;
|
||||
long currentTime = clock.getTime();
|
||||
for (int i = retryContext.restartTimes.size() - 1; i >= 0; i--) {
|
||||
long restartTime = retryContext.restartTimes.get(i);
|
||||
if (currentTime - restartTime
|
||||
<= containerRC.getFailuresValidityInterval()) {
|
||||
validFailuresCount++;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return containerRC.getMaxRetries() - validFailuresCount;
|
||||
} else {
|
||||
return retryContext.getRemainingRetries();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates remaining retries and the restart time when
|
||||
* required in the retryContext.
|
||||
* <p>
|
||||
* When failuresValidityInterval is > 0, it also removes time entries from
|
||||
* <code>restartTimes</code> which are outside the validity interval.
|
||||
*/
|
||||
protected void updateRetryContext(RetryContext retryContext) {
|
||||
if (retryContext.containerRetryContext.getFailuresValidityInterval() > 0) {
|
||||
ContainerRetryContext containerRC = retryContext.containerRetryContext;
|
||||
Iterator<Long> iterator = retryContext.getRestartTimes().iterator();
|
||||
long currentTime = clock.getTime();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
long restartTime = iterator.next();
|
||||
if (currentTime - restartTime
|
||||
|
@ -83,23 +103,11 @@ public class SlidingWindowRetryPolicy {
|
|||
break;
|
||||
}
|
||||
}
|
||||
return containerRC.getMaxRetries() -
|
||||
retryContext.getRestartTimes().size();
|
||||
retryContext.setRemainingRetries(containerRC.getMaxRetries() -
|
||||
retryContext.restartTimes.size());
|
||||
retryContext.getRestartTimes().add(currentTime);
|
||||
} else {
|
||||
return retryContext.getRemainingRetries();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates remaining retries and the restart time when
|
||||
* required in the retryContext.
|
||||
*/
|
||||
private void updateRetryContext(RetryContext retryContext,
|
||||
int pendingRetries) {
|
||||
retryContext.setRemainingRetries(pendingRetries - 1);
|
||||
if (retryContext.containerRetryContext.getFailuresValidityInterval()
|
||||
> 0) {
|
||||
retryContext.getRestartTimes().add(clock.getTime());
|
||||
retryContext.remainingRetries--;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -64,12 +64,18 @@ public class TestSlidingWindowRetryPolicy {
|
|||
new SlidingWindowRetryPolicy.RetryContext(retryContext);
|
||||
Assert.assertTrue("retry 1",
|
||||
retryPolicy.shouldRetry(windowRetryContext, 12));
|
||||
retryPolicy.updateRetryContext(windowRetryContext);
|
||||
|
||||
clock.setTime(20);
|
||||
Assert.assertTrue("retry 2",
|
||||
retryPolicy.shouldRetry(windowRetryContext, 12));
|
||||
retryPolicy.updateRetryContext(windowRetryContext);
|
||||
|
||||
clock.setTime(40);
|
||||
Assert.assertTrue("retry 3",
|
||||
retryPolicy.shouldRetry(windowRetryContext, 12));
|
||||
retryPolicy.updateRetryContext(windowRetryContext);
|
||||
|
||||
clock.setTime(45);
|
||||
Assert.assertFalse("retry failed",
|
||||
retryPolicy.shouldRetry(windowRetryContext, 12));
|
||||
|
|
Loading…
Reference in New Issue