YARN-5015. Support sliding window retry capability for container restart. (Chandni Singh via wangda)

Change-Id: I07addd3e4ba8d98456ee2ff1d5c540a38fe61dea
Wangda Tan 2018-03-13 17:55:17 -07:00
parent 9714fc1dd4
commit a5b27b3c67
14 changed files with 451 additions and 43 deletions

View File

@@ -49,6 +49,13 @@
* </li>
* <li><em>retryInterval</em> specifies delaying some time before relaunch
* container, the unit is millisecond.</li>
* <li>
* <em>failuresValidityInterval</em>: default value is -1.
* When failuresValidityInterval (in milliseconds) is set to {@literal >} 0,
* failures that occur outside the failuresValidityInterval are not
* counted toward the failure count. If the failure count reaches
* <em>maxRetries</em>, the container fails.
* </li>
* </ul>
*/
@Public
@@ -63,16 +70,25 @@ public abstract class ContainerRetryContext {
@Unstable
public static ContainerRetryContext newInstance(
ContainerRetryPolicy retryPolicy, Set<Integer> errorCodes,
int maxRetries, int retryInterval) {
int maxRetries, int retryInterval, long failuresValidityInterval) {
ContainerRetryContext containerRetryContext =
Records.newRecord(ContainerRetryContext.class);
containerRetryContext.setRetryPolicy(retryPolicy);
containerRetryContext.setErrorCodes(errorCodes);
containerRetryContext.setMaxRetries(maxRetries);
containerRetryContext.setRetryInterval(retryInterval);
containerRetryContext.setFailuresValidityInterval(failuresValidityInterval);
return containerRetryContext;
}
@Private
@Unstable
public static ContainerRetryContext newInstance(
ContainerRetryPolicy retryPolicy, Set<Integer> errorCodes,
int maxRetries, int retryInterval) {
return newInstance(retryPolicy, errorCodes, maxRetries, retryInterval, -1);
}
public abstract ContainerRetryPolicy getRetryPolicy();
public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy);
public abstract Set<Integer> getErrorCodes();
@@ -81,4 +97,7 @@ public static ContainerRetryContext newInstance(
public abstract void setMaxRetries(int maxRetries);
public abstract int getRetryInterval();
public abstract void setRetryInterval(int retryInterval);
public abstract long getFailuresValidityInterval();
public abstract void setFailuresValidityInterval(
long failuresValidityInterval);
}
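For orientation, a minimal sketch of calling the extended factory method above; the concrete values (three retries, a ten-minute window) are illustrative only, not taken from the patch:

ContainerRetryContext retryContext = ContainerRetryContext.newInstance(
    ContainerRetryPolicy.RETRY_ON_ALL_ERRORS,  // retry on any failure exit code
    null,               // no specific error codes
    3,                  // maxRetries
    1000,               // retryInterval in milliseconds
    10 * 60 * 1000L);   // failuresValidityInterval: only failures from the
                        // last 10 minutes count toward maxRetries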

View File

@@ -754,6 +754,7 @@ message ContainerRetryContextProto {
repeated int32 error_codes = 2;
optional int32 max_retries = 3 [default = 0];
optional int32 retry_interval = 4 [default = 0];
optional int64 failures_validity_interval = 5 [default = -1];
}
enum ContainerRetryPolicyProto {

View File

@@ -308,6 +308,7 @@ public enum DSEntity {
private Set<Integer> containerRetryErrorCodes = null;
private int containerMaxRetries = 0;
private int containerRetryInterval = 0;
private long containerFailuresValidityInterval = -1;
// Timeline domain ID
private String domainId = null;
@@ -471,6 +472,9 @@ public boolean init(String[] args) throws ParseException, IOException {
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
opts.addOption("container_failures_validity_interval", true,
"Failures which are out of the time window will not be added to"
+ " the number of container retry attempts");
opts.addOption("placement_spec", true, "Placement specification");
opts.addOption("debug", false, "Dump out debug information");
@@ -661,7 +665,8 @@ public boolean init(String[] args) throws ParseException, IOException {
cliParser.getOptionValue("container_max_retries", "0"));
containerRetryInterval = Integer.parseInt(cliParser.getOptionValue(
"container_retry_interval", "0"));
containerFailuresValidityInterval = Long.parseLong(
cliParser.getOptionValue("container_failures_validity_interval", "-1"));
if (!YarnConfiguration.timelineServiceEnabled(conf)) {
timelineClient = null;
timelineV2Client = null;
@@ -1385,7 +1390,8 @@ public void run() {
ContainerRetryContext containerRetryContext =
ContainerRetryContext.newInstance(
containerRetryPolicy, containerRetryErrorCodes,
containerMaxRetries, containerRetryInterval);
containerMaxRetries, containerRetryInterval,
containerFailuresValidityInterval);
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, myShellEnv, commands, null, allTokens.duplicate(),
null, containerRetryContext);

View File

@@ -373,6 +373,9 @@ public Client(Configuration conf) throws Exception {
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
opts.addOption("container_failures_validity_interval", true,
"Failures which are out of the time window will not be added to"
+ " the number of container retry attempts");
opts.addOption("docker_client_config", true,
"The docker client configuration path. The scheme should be supplied"
+ " (i.e. file:// or hdfs://)."
@@ -579,6 +582,10 @@ public boolean init(String[] args) throws ParseException {
containerRetryOptions.add("--container_retry_interval "
+ cliParser.getOptionValue("container_retry_interval"));
}
if (cliParser.hasOption("container_failures_validity_interval")) {
containerRetryOptions.add("--container_failures_validity_interval "
+ cliParser.getOptionValue("container_failures_validity_interval"));
}
if (cliParser.hasOption("flow_name")) {
flowName = cliParser.getOptionValue("flow_name");
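A hypothetical invocation combining the new flag with the pre-existing retry options might look like this; the jar path and shell command are placeholders, and treating the retry policy as the distributed shell's numeric encoding (1 selecting RETRY_ON_ALL_ERRORS) is an assumption about the existing CLI:

yarn org.apache.hadoop.yarn.applications.distributedshell.Client \
  -jar <distributed-shell-jar> \
  -shell_command <command> \
  -container_retry_policy 1 \
  -container_max_retries 3 \
  -container_retry_interval 1000 \
  -container_failures_validity_interval 600000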

View File

@@ -165,6 +165,21 @@ public void setRetryInterval(int retryInterval) {
builder.setRetryInterval(retryInterval);
}
@Override
public long getFailuresValidityInterval() {
ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasFailuresValidityInterval()) {
return -1;
}
return p.getFailuresValidityInterval();
}
@Override
public void setFailuresValidityInterval(long failuresValidityInterval) {
maybeInitBuilder();
builder.setFailuresValidityInterval(failuresValidityInterval);
}
private ContainerRetryPolicyProto convertToProtoFormat(
ContainerRetryPolicy containerRetryPolicy) {
return ProtoUtils.convertToProtoFormat(containerRetryPolicy);

View File

@@ -167,9 +167,11 @@ private ReInitializationContext createContextForRollback() {
private long containerLaunchStartTime;
private ContainerMetrics containerMetrics;
private static Clock clock = SystemClock.getInstance();
private ContainerRetryContext containerRetryContext;
// remaining retries to relaunch container if needed
private int remainingRetryAttempts;
private SlidingWindowRetryPolicy.RetryContext windowRetryContext;
private SlidingWindowRetryPolicy retryPolicy;
private String workDir;
private String logDir;
private String host;
@@ -246,7 +248,10 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
// Configure the Retry Context
this.containerRetryContext = configureRetryContext(
conf, launchContext, this.containerId);
this.remainingRetryAttempts = this.containerRetryContext.getMaxRetries();
this.windowRetryContext = new SlidingWindowRetryPolicy
.RetryContext(containerRetryContext);
this.retryPolicy = new SlidingWindowRetryPolicy(clock);
stateMachine = stateMachineFactory.make(this, ContainerState.NEW,
context.getContainerStateTransitionListener());
this.context = context;
@@ -289,7 +294,9 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher,
this.recoveredAsKilled = rcs.getKilled();
this.diagnostics.append(rcs.getDiagnostics());
this.version = rcs.getVersion();
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
this.windowRetryContext.setRemainingRetries(
rcs.getRemainingRetryAttempts());
this.windowRetryContext.setRestartTimes(rcs.getRestartTimes());
this.workDir = rcs.getWorkDir();
this.logDir = rcs.getLogDir();
this.resourceMappings = rcs.getResourceMappings();
@@ -1591,27 +1598,15 @@ public ContainerState transition(final ContainerImpl container,
if (exitEvent.getDiagnosticInfo() != null) {
if (container.containerRetryContext.getRetryPolicy()
!= ContainerRetryPolicy.NEVER_RETRY) {
int n = container.containerRetryContext.getMaxRetries()
- container.remainingRetryAttempts;
container.addDiagnostics("Diagnostic message from attempt "
+ n + " : ", "\n");
container.addDiagnostics("Diagnostic message from attempt : \n");
}
container.addDiagnostics(exitEvent.getDiagnosticInfo() + "\n");
}
if (container.shouldRetry(container.exitCode)) {
if (container.remainingRetryAttempts > 0) {
container.remainingRetryAttempts--;
try {
container.stateStore.storeContainerRemainingRetryAttempts(
container.getContainerId(), container.remainingRetryAttempts);
} catch (IOException e) {
LOG.warn(
"Unable to update remainingRetryAttempts in state store for "
+ container.getContainerId(), e);
}
}
doRelaunch(container, container.remainingRetryAttempts,
container.storeRetryContext();
doRelaunch(container,
container.windowRetryContext.getRemainingRetries(),
container.containerRetryContext.getRetryInterval());
return ContainerState.RELAUNCHING;
} else if (container.canRollback()) {
@@ -1671,29 +1666,14 @@ public boolean isRetryContextSet() {
@Override
public boolean shouldRetry(int errorCode) {
return shouldRetry(errorCode, containerRetryContext,
remainingRetryAttempts);
}
public static boolean shouldRetry(int errorCode,
ContainerRetryContext retryContext, int remainingRetryAttempts) {
if (errorCode == ExitCode.SUCCESS.getExitCode()
|| errorCode == ExitCode.FORCE_KILLED.getExitCode()
|| errorCode == ExitCode.TERMINATED.getExitCode()) {
return false;
}
ContainerRetryPolicy retryPolicy = retryContext.getRetryPolicy();
if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
|| (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
&& retryContext.getErrorCodes() != null
&& retryContext.getErrorCodes().contains(errorCode))) {
return remainingRetryAttempts > 0
|| remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER;
}
return false;
return retryPolicy.shouldRetry(windowRetryContext, errorCode);
}
/**
* Transition to EXITED_WITH_FAILURE
*/
@@ -1729,9 +1709,9 @@ public void transition(ContainerImpl container,
container.containerRetryContext =
configureRetryContext(container.context.getConf(),
container.launchContext, container.containerId);
// Reset the retry attempts since its a fresh start
container.remainingRetryAttempts =
container.containerRetryContext.getMaxRetries();
container.windowRetryContext = new SlidingWindowRetryPolicy
.RetryContext(container.containerRetryContext);
container.retryPolicy = new SlidingWindowRetryPolicy(clock);
container.resourceSet =
container.reInitContext.mergedResourceSet(container.resourceSet);
@@ -2209,4 +2189,30 @@ private static void removeDockerContainer(ContainerImpl container) {
container.getContainerId().toString());
deletionService.delete(deletionTask);
}
private void storeRetryContext() {
if (windowRetryContext.getRestartTimes() != null) {
try {
stateStore.storeContainerRestartTimes(containerId,
windowRetryContext.getRestartTimes());
} catch (IOException e) {
LOG.warn(
"Unable to update finishTimeForRetryAttempts in state store for "
+ containerId, e);
}
}
try {
stateStore.storeContainerRemainingRetryAttempts(containerId,
windowRetryContext.getRemainingRetries());
} catch (IOException e) {
LOG.warn(
"Unable to update remainingRetryAttempts in state store for "
+ containerId, e);
}
}
@VisibleForTesting
SlidingWindowRetryPolicy getRetryPolicy() {
return retryPolicy;
}
}

View File

@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.util.Clock;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* <p>Sliding window retry policy for relaunching a
* <code>Container</code> in Yarn.</p>
*/
@InterfaceStability.Unstable
public class SlidingWindowRetryPolicy {
private Clock clock;
public SlidingWindowRetryPolicy(Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
}
public boolean shouldRetry(RetryContext retryContext,
int errorCode) {
ContainerRetryContext containerRC = retryContext
.containerRetryContext;
Preconditions.checkNotNull(containerRC, "container retry context null");
ContainerRetryPolicy retryPolicy = containerRC.getRetryPolicy();
if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
|| (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
&& containerRC.getErrorCodes() != null
&& containerRC.getErrorCodes().contains(errorCode))) {
if (containerRC.getMaxRetries() == ContainerRetryContext.RETRY_FOREVER) {
return true;
}
int pendingRetries = calculatePendingRetries(retryContext);
updateRetryContext(retryContext, pendingRetries);
return pendingRetries > 0;
}
return false;
}
/**
* Calculates the pending number of retries.
* <p>
* When failuresValidityInterval is {@literal >} 0, it also removes time entries from
* <code>restartTimes</code> which are outside the validity interval.
*
* @return the pending retries.
*/
private int calculatePendingRetries(RetryContext retryContext) {
ContainerRetryContext containerRC =
retryContext.containerRetryContext;
if (containerRC.getFailuresValidityInterval() > 0) {
Iterator<Long> iterator = retryContext.getRestartTimes().iterator();
long currentTime = clock.getTime();
while (iterator.hasNext()) {
long restartTime = iterator.next();
if (currentTime - restartTime
> containerRC.getFailuresValidityInterval()) {
iterator.remove();
} else {
break;
}
}
return containerRC.getMaxRetries() -
retryContext.getRestartTimes().size();
} else {
return retryContext.getRemainingRetries();
}
}
/**
* Updates remaining retries and the restart time when
* required in the retryContext.
*/
private void updateRetryContext(RetryContext retryContext,
int pendingRetries) {
retryContext.setRemainingRetries(pendingRetries - 1);
if (retryContext.containerRetryContext.getFailuresValidityInterval()
> 0) {
retryContext.getRestartTimes().add(clock.getTime());
}
}
/**
* Sets the clock.
* @param clock clock
*/
public void setClock(Clock clock) {
this.clock = Preconditions.checkNotNull(clock);
}
/**
* Sliding window container retry context.
* <p>
* Besides {@link ContainerRetryContext}, it also provides details such as:
* <ul>
* <li>
* <em>remainingRetries</em>: specifies the number of pending retries. It is
* initially set to <code>containerRetryContext.maxRetries</code>.
* </li>
* <li>
* <em>restartTimes</em>: when
* <code>containerRetryContext.failuresValidityInterval</code> is set,
* this records the times at which the container was restarted.
* </li>
* </ul>
*/
static class RetryContext {
private final ContainerRetryContext containerRetryContext;
private List<Long> restartTimes = new ArrayList<>();
private int remainingRetries;
RetryContext(ContainerRetryContext containerRetryContext) {
this.containerRetryContext = Preconditions
.checkNotNull(containerRetryContext);
this.remainingRetries = containerRetryContext.getMaxRetries();
}
ContainerRetryContext getContainerRetryContext() {
return containerRetryContext;
}
int getRemainingRetries() {
return remainingRetries;
}
void setRemainingRetries(int remainingRetries) {
this.remainingRetries = remainingRetries;
}
List<Long> getRestartTimes() {
return restartTimes;
}
void setRestartTimes(List<Long> restartTimes) {
if (restartTimes != null) {
this.restartTimes.clear();
this.restartTimes.addAll(restartTimes);
}
}
}
}
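To make the window arithmetic concrete, here is a worked walk-through of shouldRetry with maxRetries = 1 and failuresValidityInterval = 10 ms (the same illustrative numbers as the unit test further below); each successful retry decision records the current time in restartTimes:

// t=0:  restartTimes=[]; pending = 1 - 0 = 1 > 0 -> retry, record t=0
// t=20: 20 - 0 > 10, so t=0 ages out; pending = 1 -> retry, record t=20
// t=40: 40 - 20 > 10, so t=20 ages out; pending = 1 -> retry, record t=40
// t=45: 45 - 40 <= 10, so t=40 stays; pending = 1 - 1 = 0 -> no more retries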

View File

@@ -127,6 +127,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX =
"/remainingRetryAttempts";
private static final String CONTAINER_RESTART_TIMES_SUFFIX =
"/restartTimes";
private static final String CONTAINER_WORK_DIR_KEY_SUFFIX = "/workdir";
private static final String CONTAINER_LOG_DIR_KEY_SUFFIX = "/logdir";
@@ -338,6 +340,16 @@ private RecoveredContainerState loadContainerState(ContainerId containerId,
} else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) {
rcs.setRemainingRetryAttempts(
Integer.parseInt(asString(entry.getValue())));
} else if (suffix.equals(CONTAINER_RESTART_TIMES_SUFFIX)) {
String value = asString(entry.getValue());
// parse the string format of List<Long>, e.g. [34, 21, 22]
String[] unparsedRestartTimes =
value.substring(1, value.length() - 1).split(", ");
List<Long> restartTimes = new ArrayList<>();
for (String restartTime : unparsedRestartTimes) {
restartTimes.add(Long.parseLong(restartTime));
}
rcs.setRestartTimes(restartTimes);
} else if (suffix.equals(CONTAINER_WORK_DIR_KEY_SUFFIX)) {
rcs.setWorkDir(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
@@ -581,6 +593,18 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
}
}
@Override
public void storeContainerRestartTimes(ContainerId containerId,
List<Long> restartTimes) throws IOException {
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ CONTAINER_RESTART_TIMES_SUFFIX;
try {
db.put(bytes(key), bytes(restartTimes.toString()));
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
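As the hunks above show, restart times are persisted via List.toString() and recovered with substring/split. A minimal standalone sketch of that round-trip using only the JDK (not NodeManager code); note that the parser assumes a non-empty list:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

List<Long> restartTimes = Arrays.asList(34L, 21L, 22L);
String stored = restartTimes.toString();                   // "[34, 21, 22]"
String[] parts = stored.substring(1, stored.length() - 1).split(", ");
List<Long> recovered = new ArrayList<>();
for (String part : parts) {
  recovered.add(Long.parseLong(part));                     // restores each entry
}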

View File

@@ -119,6 +119,11 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
int remainingRetryAttempts) throws IOException {
}
@Override
public void storeContainerRestartTimes(ContainerId containerId,
List<Long> restartTimes) throws IOException {
}
@Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {

View File

@@ -98,6 +98,7 @@ public static class RecoveredContainerState {
StartContainerRequest startRequest;
Resource capability;
private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID;
private List<Long> restartTimes;
private String workDir;
private String logDir;
int version;
@@ -150,6 +151,15 @@ public void setRemainingRetryAttempts(int retryAttempts) {
this.remainingRetryAttempts = retryAttempts;
}
public List<Long> getRestartTimes() {
return restartTimes;
}
public void setRestartTimes(
List<Long> restartTimes) {
this.restartTimes = restartTimes;
}
public String getWorkDir() {
return workDir;
}
@@ -177,6 +187,7 @@ public String toString() {
.append(", Capability: ").append(getCapability())
.append(", StartRequest: ").append(getStartRequest())
.append(", RemainingRetryAttempts: ").append(remainingRetryAttempts)
.append(", RestartTimes: ").append(restartTimes)
.append(", WorkDir: ").append(workDir)
.append(", LogDir: ").append(logDir)
.toString();
@@ -486,6 +497,16 @@ public abstract void storeContainerDiagnostics(ContainerId containerId,
public abstract void storeContainerRemainingRetryAttempts(
ContainerId containerId, int remainingRetryAttempts) throws IOException;
/**
* Record restart times for a container.
* @param containerId the container ID
* @param restartTimes the times at which the container was restarted
* @throws IOException if the restart times cannot be stored
*/
public abstract void storeContainerRestartTimes(
ContainerId containerId, List<Long> restartTimes)
throws IOException;
/**
* Record working directory for a container.
* @param containerId the container ID

View File

@@ -105,6 +105,7 @@
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
@@ -1109,6 +1110,38 @@ private void testContainerRestartInterval(
}
}
@Test
public void testContainerRetryFailureValidityInterval() throws Exception {
ContainerRetryContext containerRetryContext = ContainerRetryContext
.newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 1, 0, 10);
WrappedContainer wc = null;
try {
wc = new WrappedContainer(25, 314159265358980L, 4200, "test",
containerRetryContext);
ControlledClock clock = new ControlledClock();
wc.getRetryPolicy().setClock(clock);
wc.initContainer();
wc.localizeResources();
wc.launchContainer();
wc.containerFailed(12);
assertEquals(ContainerState.RUNNING, wc.c.getContainerState());
clock.setTime(20);
wc.containerFailed(12);
assertEquals(ContainerState.RUNNING, wc.c.getContainerState());
clock.setTime(40);
wc.containerFailed(12);
assertEquals(ContainerState.RUNNING, wc.c.getContainerState());
clock.setTime(45);
wc.containerFailed(12);
assertEquals(ContainerState.EXITED_WITH_FAILURE,
wc.c.getContainerState());
} finally {
if (wc != null) {
wc.finished();
}
}
}
private void verifyCleanupCall(WrappedContainer wc) throws Exception {
ResourcesReleasedMatcher matchesReq =
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
@@ -1574,5 +1607,9 @@ public int getLocalResourceCount() {
public String getDiagnostics() {
return c.cloneAndGetContainerStatus().getDiagnostics();
}
public SlidingWindowRetryPolicy getRetryPolicy() {
return ((ContainerImpl)c).getRetryPolicy();
}
}
}

View File

@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Tests for {@link SlidingWindowRetryPolicy}.
*/
public class TestSlidingWindowRetryPolicy {
private ControlledClock clock;
private SlidingWindowRetryPolicy retryPolicy;
@Before
public void setup() {
clock = new ControlledClock();
retryPolicy = new SlidingWindowRetryPolicy(clock);
}
@Test
public void testNeverRetry() {
ContainerRetryContext retryContext =
ContainerRetryContext.NEVER_RETRY_CONTEXT;
Assert.assertFalse("never retry", retryPolicy.shouldRetry(
new SlidingWindowRetryPolicy.RetryContext(retryContext), 12));
}
@Test
public void testAlwaysRetry() {
ContainerRetryContext retryContext = ContainerRetryContext.newInstance(
ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, -1,
0, 10);
Assert.assertTrue("always retry", retryPolicy.shouldRetry(
new SlidingWindowRetryPolicy.RetryContext(retryContext), 12));
}
@Test
public void testFailuresValidityInterval() {
ContainerRetryContext retryContext = ContainerRetryContext
.newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 1, 0, 10);
SlidingWindowRetryPolicy.RetryContext windowRetryContext =
new SlidingWindowRetryPolicy.RetryContext(retryContext);
Assert.assertTrue("retry 1",
retryPolicy.shouldRetry(windowRetryContext, 12));
clock.setTime(20);
Assert.assertTrue("retry 2",
retryPolicy.shouldRetry(windowRetryContext, 12));
clock.setTime(40);
Assert.assertTrue("retry 3",
retryPolicy.shouldRetry(windowRetryContext, 12));
clock.setTime(45);
Assert.assertFalse("retry failed",
retryPolicy.shouldRetry(windowRetryContext, 12));
}
}

View File

@@ -121,6 +121,7 @@ public synchronized List<RecoveredContainerState> loadContainersState()
rcsCopy.startRequest = rcs.startRequest;
rcsCopy.capability = rcs.capability;
rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
rcsCopy.setRestartTimes(rcs.getRestartTimes());
rcsCopy.setWorkDir(rcs.getWorkDir());
rcsCopy.setLogDir(rcs.getLogDir());
rcsCopy.setResourceMappings(rcs.getResourceMappings());
@@ -212,6 +213,14 @@ public void storeContainerRemainingRetryAttempts(ContainerId containerId,
rcs.setRemainingRetryAttempts(remainingRetryAttempts);
}
@Override
public void storeContainerRestartTimes(
ContainerId containerId, List<Long> restartTimes)
throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
rcs.setRestartTimes(restartTimes);
}
@Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {

View File

@@ -371,6 +371,7 @@ public void testContainerStorage() throws IOException {
assertEquals("/test/workdir", rcs.getWorkDir());
assertEquals("/test/logdir", rcs.getLogDir());
validateRetryAttempts(containerId);
// remove the container and verify not recovered
stateStore.removeContainer(containerId);
restartStateStore();
@@ -378,6 +379,21 @@ public void testContainerStorage() throws IOException {
assertTrue(recoveredContainers.isEmpty());
}
private void validateRetryAttempts(ContainerId containerId)
throws IOException {
// store restart times
List<Long> restartTimes = Arrays.asList(1462700529039L,
1462700529050L, 1462700529120L);
stateStore.storeContainerRestartTimes(containerId,
restartTimes);
restartStateStore();
RecoveredContainerState rcs = stateStore.loadContainersState().get(0);
List<Long> recoveredRestartTimes = rcs.getRestartTimes();
assertEquals(1462700529039L, (long)recoveredRestartTimes.get(0));
assertEquals(1462700529050L, (long)recoveredRestartTimes.get(1));
assertEquals(1462700529120L, (long)recoveredRestartTimes.get(2));
}
private StartContainerRequest createContainerRequest(
ContainerId containerId) {
LocalResource lrsrc = LocalResource.newInstance(