YARN-3998. Add support in the NodeManager to re-launch containers. Contributed by Jun Gong.

(cherry picked from commit 0f25a1bb52)

 Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
This commit is contained in:
Varun Vasudev 2016-04-29 16:09:07 +05:30
parent 6ba39a1597
commit 6561e3b500
29 changed files with 1352 additions and 116 deletions

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService; import org.apache.hadoop.yarn.server.api.AuxiliaryService;
@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.util.Records;
* <li>Optional, application-specific binary service data.</li> * <li>Optional, application-specific binary service data.</li>
* <li>Environment variables for the launched process.</li> * <li>Environment variables for the launched process.</li>
* <li>Command to launch the container.</li> * <li>Command to launch the container.</li>
* <li>Retry strategy when container exits with failure.</li>
* </ul> * </ul>
* *
* @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest) * @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest)
@ -61,6 +63,18 @@ public abstract class ContainerLaunchContext {
Map<String, String> environment, List<String> commands, Map<String, String> environment, List<String> commands,
Map<String, ByteBuffer> serviceData, ByteBuffer tokens, Map<String, ByteBuffer> serviceData, ByteBuffer tokens,
Map<ApplicationAccessType, String> acls) { Map<ApplicationAccessType, String> acls) {
return newInstance(localResources, environment, commands, serviceData,
tokens, acls, null);
}
@Public
@Unstable
public static ContainerLaunchContext newInstance(
Map<String, LocalResource> localResources,
Map<String, String> environment, List<String> commands,
Map<String, ByteBuffer> serviceData, ByteBuffer tokens,
Map<ApplicationAccessType, String> acls,
ContainerRetryContext containerRetryContext) {
ContainerLaunchContext container = ContainerLaunchContext container =
Records.newRecord(ContainerLaunchContext.class); Records.newRecord(ContainerLaunchContext.class);
container.setLocalResources(localResources); container.setLocalResources(localResources);
@ -69,6 +83,7 @@ public abstract class ContainerLaunchContext {
container.setServiceData(serviceData); container.setServiceData(serviceData);
container.setTokens(tokens); container.setTokens(tokens);
container.setApplicationACLs(acls); container.setApplicationACLs(acls);
container.setContainerRetryContext(containerRetryContext);
return container; return container;
} }
@ -195,4 +210,22 @@ public abstract class ContainerLaunchContext {
@Public @Public
@Stable @Stable
public abstract void setApplicationACLs(Map<ApplicationAccessType, String> acls); public abstract void setApplicationACLs(Map<ApplicationAccessType, String> acls);
/**
* Get the <code>ContainerRetryContext</code> to relaunch container.
* @return <code>ContainerRetryContext</code> to relaunch container.
*/
@Public
@Unstable
public abstract ContainerRetryContext getContainerRetryContext();
/**
* Set the <code>ContainerRetryContext</code> to relaunch container.
* @param containerRetryContext <code>ContainerRetryContext</code> to
* relaunch container.
*/
@Public
@Unstable
public abstract void setContainerRetryContext(
ContainerRetryContext containerRetryContext);
} }

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
import java.util.Set;
/**
* {@code ContainerRetryContext} indicates how container retry after it fails
* to run.
* <p>
* It provides details such as:
* <ul>
* <li>
* {@link ContainerRetryPolicy} :
* - NEVER_RETRY(DEFAULT value): no matter what error code is when container
* fails to run, just do not retry.
* - RETRY_ON_ALL_ERRORS: no matter what error code is, when container fails
* to run, just retry.
* - RETRY_ON_SPECIFIC_ERROR_CODES: when container fails to run, do retry if
* the error code is one of <em>errorCodes</em>, otherwise do not retry.
*
* Note: if error code is 137(SIGKILL) or 143(SIGTERM), it will not retry
* because it is usually killed on purpose.
* </li>
* <li>
* <em>maxRetries</em> specifies how many times to retry if need to retry.
* If the value is -1, it means retry forever.
* </li>
* <li><em>retryInterval</em> specifies delaying some time before relaunch
* container, the unit is millisecond.</li>
* </ul>
*/
@Public
@Unstable
public abstract class ContainerRetryContext {
public static final int RETRY_FOREVER = -1;
public static final int RETRY_INVALID = -1000;
public static final ContainerRetryContext NEVER_RETRY_CONTEXT =
newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 0, 0);
@Private
@Unstable
public static ContainerRetryContext newInstance(
ContainerRetryPolicy retryPolicy, Set<Integer> errorCodes,
int maxRetries, int retryInterval) {
ContainerRetryContext containerRetryContext =
Records.newRecord(ContainerRetryContext.class);
containerRetryContext.setRetryPolicy(retryPolicy);
containerRetryContext.setErrorCodes(errorCodes);
containerRetryContext.setMaxRetries(maxRetries);
containerRetryContext.setRetryInterval(retryInterval);
return containerRetryContext;
}
public abstract ContainerRetryPolicy getRetryPolicy();
public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy);
public abstract Set<Integer> getErrorCodes();
public abstract void setErrorCodes(Set<Integer> errorCodes);
public abstract int getMaxRetries();
public abstract void setMaxRetries(int maxRetries);
public abstract int getRetryInterval();
public abstract void setRetryInterval(int retryInterval);
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* <p>Retry policy for relaunching a <code>Container</code>.</p>
*/
@Public
@Unstable
public enum ContainerRetryPolicy {
/** Never retry. */
NEVER_RETRY,
/** Retry for all error codes. */
RETRY_ON_ALL_ERRORS,
/** Retry for specific error codes. */
RETRY_ON_SPECIFIC_ERROR_CODES
}

View File

@ -782,6 +782,14 @@ public class YarnConfiguration extends Configuration {
NM_PREFIX + "resourcemanager.minimum.version"; NM_PREFIX + "resourcemanager.minimum.version";
public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE"; public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE";
/**
* Maximum size of contain's diagnostics to keep for relaunching container
* case.
**/
public static final String NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE =
NM_PREFIX + "container-diagnostics-maximum-size";
public static final int DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE = 10000;
/** Interval at which the delayed token removal thread runs */ /** Interval at which the delayed token removal thread runs */
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; RM_PREFIX + "delayed.delegation-token.removal-interval-ms";

View File

@ -504,6 +504,7 @@ message ContainerLaunchContextProto {
repeated StringStringMapProto environment = 4; repeated StringStringMapProto environment = 4;
repeated string command = 5; repeated string command = 5;
repeated ApplicationACLMapProto application_ACLs = 6; repeated ApplicationACLMapProto application_ACLs = 6;
optional ContainerRetryContextProto container_retry_context = 7;
} }
message ContainerStatusProto { message ContainerStatusProto {
@ -526,6 +527,19 @@ message ContainerResourceChangeRequestProto {
optional ResourceProto capability = 2; optional ResourceProto capability = 2;
} }
message ContainerRetryContextProto {
optional ContainerRetryPolicyProto retry_policy = 1 [default = NEVER_RETRY];
repeated int32 error_codes = 2;
optional int32 max_retries = 3 [default = 0];
optional int32 retry_interval = 4 [default = 0];
}
enum ContainerRetryPolicyProto {
NEVER_RETRY = 0;
RETRY_ON_ALL_ERRORS = 1;
RETRY_ON_SPECIFIC_ERROR_CODES = 2;
}
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
////// From common////////////////////////////////////////////////////// ////// From common//////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////

View File

@ -32,6 +32,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -76,6 +77,8 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -256,6 +259,13 @@ public class ApplicationMaster {
// File length needed for local resource // File length needed for local resource
private long shellScriptPathLen = 0; private long shellScriptPathLen = 0;
// Container retry options
private ContainerRetryPolicy containerRetryPolicy =
ContainerRetryPolicy.NEVER_RETRY;
private Set<Integer> containerRetryErrorCodes = null;
private int containerMaxRetries = 0;
private int containrRetryInterval = 0;
// Timeline domain ID // Timeline domain ID
private String domainId = null; private String domainId = null;
@ -378,6 +388,18 @@ public class ApplicationMaster {
opts.addOption("num_containers", true, opts.addOption("num_containers", true,
"No. of containers on which the shell command needs to be executed"); "No. of containers on which the shell command needs to be executed");
opts.addOption("priority", true, "Application Priority. Default 0"); opts.addOption("priority", true, "Application Priority. Default 0");
opts.addOption("container_retry_policy", true,
"Retry policy when container fails to run, "
+ "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, "
+ "2: RETRY_ON_SPECIFIC_ERROR_CODES");
opts.addOption("container_retry_error_codes", true,
"When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error "
+ "codes is specified with this option, "
+ "e.g. --container_retry_error_codes 1,2,3");
opts.addOption("container_max_retries", true,
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
opts.addOption("debug", false, "Dump out debug information"); opts.addOption("debug", false, "Dump out debug information");
opts.addOption("help", false, "Print usage"); opts.addOption("help", false, "Print usage");
@ -515,6 +537,21 @@ public class ApplicationMaster {
} }
requestPriority = Integer.parseInt(cliParser requestPriority = Integer.parseInt(cliParser
.getOptionValue("priority", "0")); .getOptionValue("priority", "0"));
containerRetryPolicy = ContainerRetryPolicy.values()[
Integer.parseInt(cliParser.getOptionValue(
"container_retry_policy", "0"))];
if (cliParser.hasOption("container_retry_error_codes")) {
containerRetryErrorCodes = new HashSet<>();
for (String errorCode :
cliParser.getOptionValue("container_retry_error_codes").split(",")) {
containerRetryErrorCodes.add(Integer.parseInt(errorCode));
}
}
containerMaxRetries = Integer.parseInt(
cliParser.getOptionValue("container_max_retries", "0"));
containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
"container_retry_interval", "0"));
return true; return true;
} }
@ -1069,9 +1106,13 @@ public class ApplicationMaster {
// "hadoop dfs" command inside the distributed shell. // "hadoop dfs" command inside the distributed shell.
Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv); Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
myShellEnv.put(YARN_SHELL_ID, shellId); myShellEnv.put(YARN_SHELL_ID, shellId);
ContainerRetryContext containerRetryContext =
ContainerRetryContext.newInstance(
containerRetryPolicy, containerRetryErrorCodes,
containerMaxRetries, containrRetryInterval);
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, myShellEnv, commands, null, allTokens.duplicate(), localResources, myShellEnv, commands, null, allTokens.duplicate(),
null); null, containerRetryContext);
containerListener.addContainer(container.getId(), container); containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx); nmClientAsync.startContainerAsync(container, ctx);
} }

View File

@ -169,6 +169,8 @@ public class Client {
private long attemptFailuresValidityInterval = -1; private long attemptFailuresValidityInterval = -1;
private Vector<CharSequence> containerRetryOptions = new Vector<>(5);
// Debug flag // Debug flag
boolean debugFlag = false; boolean debugFlag = false;
@ -288,6 +290,18 @@ public class Client {
+ " will be allocated, \"\" means containers" + " will be allocated, \"\" means containers"
+ " can be allocated anywhere, if you don't specify the option," + " can be allocated anywhere, if you don't specify the option,"
+ " default node_label_expression of queue will be used."); + " default node_label_expression of queue will be used.");
opts.addOption("container_retry_policy", true,
"Retry policy when container fails to run, "
+ "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, "
+ "2: RETRY_ON_SPECIFIC_ERROR_CODES");
opts.addOption("container_retry_error_codes", true,
"When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error "
+ "codes is specified with this option, "
+ "e.g. --container_retry_error_codes 1,2,3");
opts.addOption("container_max_retries", true,
"If container could retry, it specifies max retires");
opts.addOption("container_retry_interval", true,
"Interval between each retry, unit is milliseconds");
} }
/** /**
@ -430,6 +444,24 @@ public class Client {
} }
} }
// Get container retry options
if (cliParser.hasOption("container_retry_policy")) {
containerRetryOptions.add("--container_retry_policy "
+ cliParser.getOptionValue("container_retry_policy"));
}
if (cliParser.hasOption("container_retry_error_codes")) {
containerRetryOptions.add("--container_retry_error_codes "
+ cliParser.getOptionValue("container_retry_error_codes"));
}
if (cliParser.hasOption("container_max_retries")) {
containerRetryOptions.add("--container_max_retries "
+ cliParser.getOptionValue("container_max_retries"));
}
if (cliParser.hasOption("container_retry_interval")) {
containerRetryOptions.add("--container_retry_interval "
+ cliParser.getOptionValue("container_retry_interval"));
}
return true; return true;
} }
@ -639,6 +671,8 @@ public class Client {
vargs.add("--debug"); vargs.add("--debug");
} }
vargs.addAll(containerRetryOptions);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");

View File

@ -29,10 +29,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto;
@ -56,6 +58,7 @@ extends ContainerLaunchContext {
private Map<String, String> environment = null; private Map<String, String> environment = null;
private List<String> commands = null; private List<String> commands = null;
private Map<ApplicationAccessType, String> applicationACLS = null; private Map<ApplicationAccessType, String> applicationACLS = null;
private ContainerRetryContext containerRetryContext = null;
public ContainerLaunchContextPBImpl() { public ContainerLaunchContextPBImpl() {
builder = ContainerLaunchContextProto.newBuilder(); builder = ContainerLaunchContextProto.newBuilder();
@ -120,6 +123,10 @@ extends ContainerLaunchContext {
if (this.applicationACLS != null) { if (this.applicationACLS != null) {
addApplicationACLs(); addApplicationACLs();
} }
if (this.containerRetryContext != null) {
builder.setContainerRetryContext(
convertToProtoFormat(this.containerRetryContext));
}
} }
private void mergeLocalToProto() { private void mergeLocalToProto() {
@ -462,6 +469,27 @@ extends ContainerLaunchContext {
this.applicationACLS.putAll(appACLs); this.applicationACLS.putAll(appACLs);
} }
public ContainerRetryContext getContainerRetryContext() {
ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder;
if (this.containerRetryContext != null) {
return this.containerRetryContext;
}
if (!p.hasContainerRetryContext()) {
return null;
}
this.containerRetryContext = convertFromProtoFormat(
p.getContainerRetryContext());
return this.containerRetryContext;
}
public void setContainerRetryContext(ContainerRetryContext retryContext) {
maybeInitBuilder();
if (retryContext == null) {
builder.clearContainerRetryContext();
}
this.containerRetryContext = retryContext;
}
private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) {
return new LocalResourcePBImpl(p); return new LocalResourcePBImpl(p);
} }
@ -469,4 +497,14 @@ extends ContainerLaunchContext {
private LocalResourceProto convertToProtoFormat(LocalResource t) { private LocalResourceProto convertToProtoFormat(LocalResource t) {
return ((LocalResourcePBImpl)t).getProto(); return ((LocalResourcePBImpl)t).getProto();
} }
private ContainerRetryContextPBImpl convertFromProtoFormat(
ContainerRetryContextProto p) {
return new ContainerRetryContextPBImpl(p);
}
private ContainerRetryContextProto convertToProtoFormat(
ContainerRetryContext t) {
return ((ContainerRetryContextPBImpl)t).getProto();
}
} }

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import com.google.protobuf.TextFormat;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProtoOrBuilder;
import java.util.HashSet;
import java.util.Set;
/**
* Implementation of ContainerRetryContext.
*/
public class ContainerRetryContextPBImpl extends ContainerRetryContext {
private ContainerRetryContextProto proto =
ContainerRetryContextProto.getDefaultInstance();
private ContainerRetryContextProto.Builder builder = null;
private boolean viaProto = false;
private Set<Integer> errorCodes = null;
public ContainerRetryContextPBImpl() {
builder = ContainerRetryContextProto.newBuilder();
}
public ContainerRetryContextPBImpl(ContainerRetryContextProto proto) {
this.proto = proto;
viaProto = true;
}
public ContainerRetryContextProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
private void mergeLocalToBuilder() {
if (this.errorCodes != null) {
builder.clearErrorCodes();
builder.addAllErrorCodes(this.errorCodes);
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ContainerRetryContextProto.newBuilder(proto);
}
viaProto = false;
}
public ContainerRetryPolicy getRetryPolicy() {
ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasRetryPolicy()) {
return ContainerRetryPolicy.NEVER_RETRY;
}
return convertFromProtoFormat(p.getRetryPolicy());
}
public void setRetryPolicy(ContainerRetryPolicy containerRetryPolicy) {
maybeInitBuilder();
if (containerRetryPolicy == null) {
builder.clearRetryPolicy();
return;
}
builder.setRetryPolicy(convertToProtoFormat(containerRetryPolicy));
}
private void initErrorCodes() {
if (this.errorCodes != null) {
return;
}
ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
this.errorCodes = new HashSet<>();
this.errorCodes.addAll(p.getErrorCodesList());
}
public Set<Integer> getErrorCodes() {
initErrorCodes();
return this.errorCodes;
}
public void setErrorCodes(Set<Integer> errCodes) {
maybeInitBuilder();
if (errCodes == null || errCodes.isEmpty()) {
builder.clearErrorCodes();
}
this.errorCodes = errCodes;
}
public int getMaxRetries() {
ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasMaxRetries()) {
return 0;
}
return p.getMaxRetries();
}
public void setMaxRetries(int maxRetries) {
maybeInitBuilder();
builder.setMaxRetries(maxRetries);
}
public int getRetryInterval() {
ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasRetryInterval()) {
return 0;
}
return p.getRetryInterval();
}
public void setRetryInterval(int retryInterval) {
maybeInitBuilder();
builder.setRetryInterval(retryInterval);
}
private ContainerRetryPolicyProto convertToProtoFormat(
ContainerRetryPolicy containerRetryPolicy) {
return ProtoUtils.convertToProtoFormat(containerRetryPolicy);
}
private ContainerRetryPolicy convertFromProtoFormat(
ContainerRetryPolicyProto containerRetryPolicyProto) {
return ProtoUtils.convertFromProtoFormat(containerRetryPolicyProto);
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.server.api.ContainerType; import org.apache.hadoop.yarn.server.api.ContainerType;
@ -282,4 +284,17 @@ public class ProtoUtils {
public static ContainerType convertFromProtoFormat(ContainerTypeProto e) { public static ContainerType convertFromProtoFormat(ContainerTypeProto e) {
return ContainerType.valueOf(e.name()); return ContainerType.valueOf(e.name());
} }
/*
* ContainerRetryPolicy
*/
public static ContainerRetryPolicyProto convertToProtoFormat(
ContainerRetryPolicy e) {
return ContainerRetryPolicyProto.valueOf(e.name());
}
public static ContainerRetryPolicy convertFromProtoFormat(
ContainerRetryPolicyProto e) {
return ContainerRetryPolicy.valueOf(e.name());
}
} }

View File

@ -1574,6 +1574,13 @@
<value>NONE</value> <value>NONE</value>
</property> </property>
<property>
<description>Maximum size of contain's diagnostics to keep for relaunching
container case.</description>
<name>yarn.nodemanager.container-diagnostics-maximum-size</name>
<value>10000</value>
</property>
<property> <property>
<description>Max number of threads in NMClientAsync to process container <description>Max number of threads in NMClientAsync to process container
management events</description> management events</description>

View File

@ -121,6 +121,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@ -165,6 +166,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerLaunchContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
@ -199,6 +201,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@ -354,7 +357,7 @@ public class TestPBImplRecords {
return rand.nextBoolean(); return rand.nextBoolean();
} else if (type.equals(byte.class)) { } else if (type.equals(byte.class)) {
return bytes[rand.nextInt(4)]; return bytes[rand.nextInt(4)];
} else if (type.equals(int.class)) { } else if (type.equals(int.class) || type.equals(Integer.class)) {
return rand.nextInt(1000000); return rand.nextInt(1000000);
} else if (type.equals(long.class)) { } else if (type.equals(long.class)) {
return Long.valueOf(rand.nextInt(1000000)); return Long.valueOf(rand.nextInt(1000000));
@ -478,6 +481,7 @@ public class TestPBImplRecords {
generateByNewInstance(ApplicationResourceUsageReport.class); generateByNewInstance(ApplicationResourceUsageReport.class);
generateByNewInstance(ApplicationReport.class); generateByNewInstance(ApplicationReport.class);
generateByNewInstance(Container.class); generateByNewInstance(Container.class);
generateByNewInstance(ContainerRetryContext.class);
generateByNewInstance(ContainerLaunchContext.class); generateByNewInstance(ContainerLaunchContext.class);
generateByNewInstance(ApplicationSubmissionContext.class); generateByNewInstance(ApplicationSubmissionContext.class);
generateByNewInstance(ContainerReport.class); generateByNewInstance(ContainerReport.class);
@ -968,6 +972,12 @@ public class TestPBImplRecords {
validatePBImplRecord(ContainerIdPBImpl.class, ContainerIdProto.class); validatePBImplRecord(ContainerIdPBImpl.class, ContainerIdProto.class);
} }
@Test
public void testContainerRetryPBImpl() throws Exception {
validatePBImplRecord(ContainerRetryContextPBImpl.class,
ContainerRetryContextProto.class);
}
@Test @Test
public void testContainerLaunchContextPBImpl() throws Exception { public void testContainerLaunchContextPBImpl() throws Exception {
validatePBImplRecord(ContainerLaunchContextPBImpl.class, validatePBImplRecord(ContainerLaunchContextPBImpl.class,

View File

@ -308,6 +308,7 @@ public abstract class ContainerExecutor implements Configurable {
} }
public enum ExitCode { public enum ExitCode {
SUCCESS(0),
FORCE_KILLED(137), FORCE_KILLED(137),
TERMINATED(143), TERMINATED(143),
LOST(154); LOST(154);

View File

@ -408,6 +408,24 @@ public class LocalDirsHandlerService extends AbstractService {
return lastDisksCheckTime; return lastDisksCheckTime;
} }
public boolean isGoodLocalDir(String path) {
return isInGoodDirs(getLocalDirs(), path);
}
public boolean isGoodLogDir(String path) {
return isInGoodDirs(getLogDirs(), path);
}
private boolean isInGoodDirs(List<String> goodDirs, String path) {
for (String goodDir : goodDirs) {
if (path.startsWith(goodDir)) {
return true;
}
}
return false;
}
/** /**
* Set good local dirs and good log dirs in the configuration so that the * Set good local dirs and good log dirs in the configuration so that the
* LocalDirAllocator objects will use this updated configuration only. * LocalDirAllocator objects will use this updated configuration only.
@ -551,6 +569,10 @@ public class LocalDirsHandlerService extends AbstractService {
checkWrite); checkWrite);
} }
public Path getLocalPathForRead(String pathStr) throws IOException {
return getPathToRead(pathStr, getLocalDirsForRead());
}
public Path getLogPathForWrite(String pathStr, boolean checkWrite) public Path getLogPathForWrite(String pathStr, boolean checkWrite)
throws IOException { throws IOException {
return logDirsAllocator.getLocalPathForWrite(pathStr, return logDirsAllocator.getLocalPathForWrite(pathStr,

View File

@ -350,8 +350,7 @@ public class ContainerManagerImpl extends CompositeService implements
YarnServerSecurityUtils.parseCredentials(launchContext); YarnServerSecurityUtils.parseCredentials(launchContext);
Container container = new ContainerImpl(getConfig(), dispatcher, Container container = new ContainerImpl(getConfig(), dispatcher,
req.getContainerLaunchContext(), req.getContainerLaunchContext(),
credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), credentials, metrics, token, context, rcs);
rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(), context);
context.getContainers().put(containerId, container); context.getContainers().put(containerId, container);
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container)); new ApplicationContainerInitEvent(container));

View File

@ -55,6 +55,18 @@ public interface Container extends EventHandler<ContainerEvent> {
NMContainerStatus getNMContainerStatus(); NMContainerStatus getNMContainerStatus();
boolean isRetryContextSet();
boolean shouldRetry(int errorCode);
String getWorkDir();
void setWorkDir(String workDir);
String getLogDir();
void setLogDir(String logDir);
String toString(); String toString();
} }

View File

@ -41,6 +41,8 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
@ -71,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@ -98,11 +102,17 @@ public class ContainerImpl implements Container {
private final String user; private final String user;
private int exitCode = ContainerExitStatus.INVALID; private int exitCode = ContainerExitStatus.INVALID;
private final StringBuilder diagnostics; private final StringBuilder diagnostics;
private final int diagnosticsMaxSize;
private boolean wasLaunched; private boolean wasLaunched;
private long containerLocalizationStartTime; private long containerLocalizationStartTime;
private long containerLaunchStartTime; private long containerLaunchStartTime;
private ContainerMetrics containerMetrics; private ContainerMetrics containerMetrics;
private static Clock clock = SystemClock.getInstance(); private static Clock clock = SystemClock.getInstance();
private final ContainerRetryContext containerRetryContext;
// remaining retries to relaunch container if needed
private int remainingRetryAttempts;
private String workDir;
private String logDir;
/** The NM-wide configuration - not specific to this container */ /** The NM-wide configuration - not specific to this container */
private final Configuration daemonConf; private final Configuration daemonConf;
@ -138,6 +148,16 @@ public class ContainerImpl implements Container {
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
this.stateStore = context.getNMStateStore(); this.stateStore = context.getNMStateStore();
this.launchContext = launchContext; this.launchContext = launchContext;
if (launchContext != null
&& launchContext.getContainerRetryContext() != null) {
this.containerRetryContext = launchContext.getContainerRetryContext();
} else {
this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT;
}
this.remainingRetryAttempts = containerRetryContext.getMaxRetries();
this.diagnosticsMaxSize = conf.getInt(
YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE,
YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
this.containerTokenIdentifier = containerTokenIdentifier; this.containerTokenIdentifier = containerTokenIdentifier;
this.containerId = containerTokenIdentifier.getContainerID(); this.containerId = containerTokenIdentifier.getContainerID();
this.resource = containerTokenIdentifier.getResource(); this.resource = containerTokenIdentifier.getResource();
@ -172,22 +192,24 @@ public class ContainerImpl implements Container {
public ContainerImpl(Configuration conf, Dispatcher dispatcher, public ContainerImpl(Configuration conf, Dispatcher dispatcher,
ContainerLaunchContext launchContext, Credentials creds, ContainerLaunchContext launchContext, Credentials creds,
NodeManagerMetrics metrics, NodeManagerMetrics metrics,
ContainerTokenIdentifier containerTokenIdentifier, ContainerTokenIdentifier containerTokenIdentifier, Context context,
RecoveredContainerStatus recoveredStatus, int exitCode, RecoveredContainerState rcs) {
String diagnostics, boolean wasKilled, Resource recoveredCapability,
Context context) {
this(conf, dispatcher, launchContext, creds, metrics, this(conf, dispatcher, launchContext, creds, metrics,
containerTokenIdentifier, context); containerTokenIdentifier, context);
this.recoveredStatus = recoveredStatus; this.recoveredStatus = rcs.getStatus();
this.exitCode = exitCode; this.exitCode = rcs.getExitCode();
this.recoveredAsKilled = wasKilled; this.recoveredAsKilled = rcs.getKilled();
this.diagnostics.append(diagnostics); this.diagnostics.append(rcs.getDiagnostics());
Resource recoveredCapability = rcs.getCapability();
if (recoveredCapability != null if (recoveredCapability != null
&& !this.resource.equals(recoveredCapability)) { && !this.resource.equals(recoveredCapability)) {
// resource capability had been updated before NM was down // resource capability had been updated before NM was down
this.resource = Resource.newInstance(recoveredCapability.getMemory(), this.resource = Resource.newInstance(recoveredCapability.getMemory(),
recoveredCapability.getVirtualCores()); recoveredCapability.getVirtualCores());
} }
this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
this.workDir = rcs.getWorkDir();
this.logDir = rcs.getLogDir();
} }
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
@ -267,9 +289,10 @@ public class ContainerImpl implements Container {
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition(true)) new ExitedWithSuccessTransition(true))
.addTransition(ContainerState.RUNNING, .addTransition(ContainerState.RUNNING,
ContainerState.EXITED_WITH_FAILURE, EnumSet.of(ContainerState.RELAUNCHING,
ContainerState.EXITED_WITH_FAILURE),
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true)) new RetryFailureTransition())
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING, .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG, ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION) UPDATE_DIAGNOSTICS_TRANSITION)
@ -279,6 +302,19 @@ public class ContainerImpl implements Container {
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition()) new KilledExternallyTransition())
// From RELAUNCHING State
.addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
.addTransition(ContainerState.RELAUNCHING,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true))
.addTransition(ContainerState.RELAUNCHING, ContainerState.RELAUNCHING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
// From CONTAINER_EXITED_WITH_SUCCESS State // From CONTAINER_EXITED_WITH_SUCCESS State
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE, .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
@ -382,6 +418,7 @@ public class ContainerImpl implements Container {
case LOCALIZATION_FAILED: case LOCALIZATION_FAILED:
case LOCALIZED: case LOCALIZED:
case RUNNING: case RUNNING:
case RELAUNCHING:
case EXITED_WITH_SUCCESS: case EXITED_WITH_SUCCESS:
case EXITED_WITH_FAILURE: case EXITED_WITH_FAILURE:
case KILLING: case KILLING:
@ -408,7 +445,8 @@ public class ContainerImpl implements Container {
public Map<Path,List<String>> getLocalizedResources() { public Map<Path,List<String>> getLocalizedResources() {
this.readLock.lock(); this.readLock.lock();
try { try {
if (ContainerState.LOCALIZED == getContainerState()) { if (ContainerState.LOCALIZED == getContainerState()
|| ContainerState.RELAUNCHING == getContainerState()) {
return localizedResources; return localizedResources;
} else { } else {
return null; return null;
@ -500,6 +538,26 @@ public class ContainerImpl implements Container {
} }
} }
@Override
public String getWorkDir() {
return workDir;
}
@Override
public void setWorkDir(String workDir) {
this.workDir = workDir;
}
@Override
public String getLogDir() {
return logDir;
}
@Override
public void setLogDir(String logDir) {
this.logDir = logDir;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void sendFinishedEvents() { private void sendFinishedEvents() {
// Inform the application // Inform the application
@ -526,6 +584,14 @@ public class ContainerImpl implements Container {
new ContainersLauncherEvent(this, launcherEvent)); new ContainersLauncherEvent(this, launcherEvent));
} }
@SuppressWarnings("unchecked") // dispatcher not typed
private void sendRelaunchEvent() {
ContainersLauncherEventType launcherEvent =
ContainersLauncherEventType.RELAUNCH_CONTAINER;
dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(this, launcherEvent));
}
// Inform the ContainersMonitor to start monitoring the container's // Inform the ContainersMonitor to start monitoring the container's
// resource usage. // resource usage.
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
@ -551,6 +617,9 @@ public class ContainerImpl implements Container {
for (String s : diags) { for (String s : diags) {
this.diagnostics.append(s); this.diagnostics.append(s);
} }
if (isRetryContextSet() && diagnostics.length() > diagnosticsMaxSize) {
diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize);
}
try { try {
stateStore.storeContainerDiagnostics(containerId, diagnostics); stateStore.storeContainerDiagnostics(containerId, diagnostics);
} catch (IOException e) { } catch (IOException e) {
@ -875,6 +944,100 @@ public class ContainerImpl implements Container {
} }
} }
/**
* Transition to EXITED_WITH_FAILURE or LOCALIZED state upon
* CONTAINER_EXITED_WITH_FAILURE state.
**/
@SuppressWarnings("unchecked") // dispatcher not typed
static class RetryFailureTransition implements
MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {
@Override
public ContainerState transition(final ContainerImpl container,
ContainerEvent event) {
ContainerExitEvent exitEvent = (ContainerExitEvent) event;
container.exitCode = exitEvent.getExitCode();
if (exitEvent.getDiagnosticInfo() != null) {
if (container.containerRetryContext.getRetryPolicy()
!= ContainerRetryPolicy.NEVER_RETRY) {
int n = container.containerRetryContext.getMaxRetries()
- container.remainingRetryAttempts;
container.addDiagnostics("Diagnostic message from attempt "
+ n + " : ", "\n");
}
container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
}
if (container.shouldRetry(container.exitCode)) {
if (container.remainingRetryAttempts > 0) {
container.remainingRetryAttempts--;
try {
container.stateStore.storeContainerRemainingRetryAttempts(
container.getContainerId(), container.remainingRetryAttempts);
} catch (IOException e) {
LOG.warn(
"Unable to update remainingRetryAttempts in state store for "
+ container.getContainerId(), e);
}
}
LOG.info("Relaunching Container " + container.getContainerId()
+ ". Remaining retry attempts(after relaunch) : "
+ container.remainingRetryAttempts
+ ". Interval between retries is "
+ container.containerRetryContext.getRetryInterval() + "ms");
container.wasLaunched = false;
container.metrics.endRunningContainer();
if (container.containerRetryContext.getRetryInterval() == 0) {
container.sendRelaunchEvent();
} else {
// wait for some time, then send launch event
new Thread() {
@Override
public void run() {
try {
Thread.sleep(
container.containerRetryContext.getRetryInterval());
container.sendRelaunchEvent();
} catch (InterruptedException e) {
return;
}
}
}.start();
}
return ContainerState.RELAUNCHING;
} else {
new ExitedWithFailureTransition(true).transition(container, event);
return ContainerState.EXITED_WITH_FAILURE;
}
}
}
@Override
public boolean isRetryContextSet() {
return containerRetryContext.getRetryPolicy()
!= ContainerRetryPolicy.NEVER_RETRY;
}
@Override
public boolean shouldRetry(int errorCode) {
if (errorCode == ExitCode.SUCCESS.getExitCode()
|| errorCode == ExitCode.FORCE_KILLED.getExitCode()
|| errorCode == ExitCode.TERMINATED.getExitCode()) {
return false;
}
ContainerRetryPolicy retryPolicy = containerRetryContext.getRetryPolicy();
if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS
|| (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES
&& containerRetryContext.getErrorCodes() != null
&& containerRetryContext.getErrorCodes().contains(errorCode))) {
return remainingRetryAttempts > 0
|| remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER;
}
return false;
}
/** /**
* Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST
*/ */

View File

@ -19,7 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
public enum ContainerState { public enum ContainerState {
NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, EXITED_WITH_SUCCESS, NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, RELAUNCHING,
EXITED_WITH_FAILURE, KILLING, CONTAINER_CLEANEDUP_AFTER_KILL, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
CONTAINER_RESOURCES_CLEANINGUP, DONE CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE
} }

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
@ -98,7 +99,7 @@ public class ContainerLaunch implements Callable<Integer> {
protected final Dispatcher dispatcher; protected final Dispatcher dispatcher;
protected final ContainerExecutor exec; protected final ContainerExecutor exec;
private final Application app; protected final Application app;
protected final Container container; protected final Container container;
private final Configuration conf; private final Configuration conf;
private final Context context; private final Context context;
@ -112,7 +113,7 @@ public class ContainerLaunch implements Callable<Integer> {
protected Path pidFilePath = null; protected Path pidFilePath = null;
private final LocalDirsHandlerService dirsHandler; protected final LocalDirsHandlerService dirsHandler;
public ContainerLaunch(Context context, Configuration configuration, public ContainerLaunch(Context context, Configuration configuration,
Dispatcher dispatcher, ContainerExecutor exec, Application app, Dispatcher dispatcher, ContainerExecutor exec, Application app,
@ -156,33 +157,19 @@ public class ContainerLaunch implements Callable<Integer> {
@Override @Override
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
public Integer call() { public Integer call() {
if (!validateContainerState()) {
return 0;
}
final ContainerLaunchContext launchContext = container.getLaunchContext(); final ContainerLaunchContext launchContext = container.getLaunchContext();
Map<Path,List<String>> localResources = null;
ContainerId containerID = container.getContainerId(); ContainerId containerID = container.getContainerId();
String containerIdStr = ConverterUtils.toString(containerID); String containerIdStr = ConverterUtils.toString(containerID);
final List<String> command = launchContext.getCommands(); final List<String> command = launchContext.getCommands();
int ret = -1; int ret = -1;
// CONTAINER_KILLED_ON_REQUEST should not be missed if the container
// is already at KILLING
if (container.getContainerState() == ContainerState.KILLING) {
dispatcher.getEventHandler().handle(
new ContainerExitEvent(containerID,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
ExitCode.TERMINATED.getExitCode(),
"Container terminated before launch."));
return 0;
}
Path containerLogDir; Path containerLogDir;
try { try {
localResources = container.getLocalizedResources(); Map<Path, List<String>> localResources = getLocalizedResources();
if (localResources == null) {
throw RPCUtil.getRemoteException(
"Unable to get local resources when Container " + containerID +
" is at " + container.getContainerState());
}
final String user = container.getUser(); final String user = container.getUser();
// /////////////////////////// Variable expansion // /////////////////////////// Variable expansion
@ -193,6 +180,7 @@ public class ContainerLaunch implements Callable<Integer> {
.getRelativeContainerLogDir(appIdStr, containerIdStr); .getRelativeContainerLogDir(appIdStr, containerIdStr);
containerLogDir = containerLogDir =
dirsHandler.getLogPathForWrite(relativeContainerLogDir, false); dirsHandler.getLogPathForWrite(relativeContainerLogDir, false);
recordContainerLogDir(containerID, containerLogDir.toString());
for (String str : command) { for (String str : command) {
// TODO: Should we instead work via symlinks without this grammar? // TODO: Should we instead work via symlinks without this grammar?
newCmds.add(expandEnvironment(str, containerLogDir)); newCmds.add(expandEnvironment(str, containerLogDir));
@ -233,6 +221,7 @@ public class ContainerLaunch implements Callable<Integer> {
+ ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
+ Path.SEPARATOR + containerIdStr, + Path.SEPARATOR + containerIdStr,
LocalDirAllocator.SIZE_UNKNOWN, false); LocalDirAllocator.SIZE_UNKNOWN, false);
recordContainerWorkDir(containerID, containerWorkDir.toString());
String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr); String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
@ -241,11 +230,8 @@ public class ContainerLaunch implements Callable<Integer> {
pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath); pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
List<String> localDirs = dirsHandler.getLocalDirs(); List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs(); List<String> logDirs = dirsHandler.getLogDirs();
List<String> containerLocalDirs = getContainerLocalDirs(localDirs);
List<String> containerLogDirs = new ArrayList<String>(); List<String> containerLogDirs = getContainerLogDirs(logDirs);
for( String logDir : logDirs) {
containerLogDirs.add(logDir + Path.SEPARATOR + relativeContainerLogDir);
}
if (!dirsHandler.areDisksHealthy()) { if (!dirsHandler.areDisksHealthy()) {
ret = ContainerExitStatus.DISKS_FAILED; ret = ContainerExitStatus.DISKS_FAILED;
@ -253,7 +239,6 @@ public class ContainerLaunch implements Callable<Integer> {
+ dirsHandler.getDisksHealthReport(false)); + dirsHandler.getDisksHealthReport(false));
} }
List<String> containerLocalDirs = new ArrayList<>(localDirs.size());
try { try {
// /////////// Write out the container-script in the nmPrivate space. // /////////// Write out the container-script in the nmPrivate space.
List<Path> appDirs = new ArrayList<Path>(localDirs.size()); List<Path> appDirs = new ArrayList<Path>(localDirs.size());
@ -262,14 +247,6 @@ public class ContainerLaunch implements Callable<Integer> {
Path userdir = new Path(usersdir, user); Path userdir = new Path(usersdir, user);
Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
appDirs.add(new Path(appsdir, appIdStr)); appDirs.add(new Path(appsdir, appIdStr));
String containerLocalDir = localDir + Path.SEPARATOR +
ContainerLocalizer.USERCACHE + Path.SEPARATOR + user
+ Path.SEPARATOR
+ ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
+ Path.SEPARATOR;
containerLocalDirs.add(containerLocalDir);
} }
containerScriptOutStream = containerScriptOutStream =
lfs.create(nmPrivateContainerScriptPath, lfs.create(nmPrivateContainerScriptPath,
@ -301,35 +278,19 @@ public class ContainerLaunch implements Callable<Integer> {
IOUtils.cleanup(LOG, containerScriptOutStream, tokensOutStream); IOUtils.cleanup(LOG, containerScriptOutStream, tokensOutStream);
} }
// LaunchContainer is a blocking call. We are here almost means the ret = launchContainer(new ContainerStartContext.Builder()
// container is launched, so send out the event. .setContainer(container)
dispatcher.getEventHandler().handle(new ContainerEvent( .setLocalizedResources(localResources)
containerID, .setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
ContainerEventType.CONTAINER_LAUNCHED)); .setNmPrivateTokensPath(nmPrivateTokensPath)
context.getNMStateStore().storeContainerLaunched(containerID); .setUser(user)
.setAppId(appIdStr)
// Check if the container is signalled to be killed. .setContainerWorkDir(containerWorkDir)
if (!shouldLaunchContainer.compareAndSet(false, true)) { .setLocalDirs(localDirs)
LOG.info("Container " + containerIdStr + " not launched as " .setLogDirs(logDirs)
+ "cleanup already called"); .setContainerLocalDirs(containerLocalDirs)
ret = ExitCode.TERMINATED.getExitCode(); .setContainerLogDirs(containerLogDirs)
} .build());
else {
exec.activateContainer(containerID, pidFilePath);
ret = exec.launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setLocalizedResources(localResources)
.setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
.setNmPrivateTokensPath(nmPrivateTokensPath)
.setUser(user)
.setAppId(appIdStr)
.setContainerWorkDir(containerWorkDir)
.setLocalDirs(localDirs)
.setLogDirs(logDirs)
.setContainerLocalDirs(containerLocalDirs)
.setContainerLogDirs(containerLogDirs)
.build());
}
} catch (Throwable e) { } catch (Throwable e) {
LOG.warn("Failed to launch container.", e); LOG.warn("Failed to launch container.", e);
dispatcher.getEventHandler().handle(new ContainerExitEvent( dispatcher.getEventHandler().handle(new ContainerExitEvent(
@ -337,46 +298,138 @@ public class ContainerLaunch implements Callable<Integer> {
e.getMessage())); e.getMessage()));
return ret; return ret;
} finally { } finally {
completed.set(true); setContainerCompletedStatus(ret);
exec.deactivateContainer(containerID);
try {
context.getNMStateStore().storeContainerCompleted(containerID, ret);
} catch (IOException e) {
LOG.error("Unable to set exit code for container " + containerID);
}
} }
handleContainerExitCode(ret, containerLogDir);
return ret;
}
@SuppressWarnings("unchecked")
protected boolean validateContainerState() {
// CONTAINER_KILLED_ON_REQUEST should not be missed if the container
// is already at KILLING
if (container.getContainerState() == ContainerState.KILLING) {
dispatcher.getEventHandler().handle(
new ContainerExitEvent(container.getContainerId(),
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
ExitCode.TERMINATED.getExitCode(),
"Container terminated before launch."));
return false;
}
return true;
}
protected List<String> getContainerLogDirs(List<String> logDirs) {
List<String> containerLogDirs = new ArrayList<>(logDirs.size());
String appIdStr = app.getAppId().toString();
String containerIdStr = ConverterUtils.toString(container.getContainerId());
String relativeContainerLogDir = ContainerLaunch
.getRelativeContainerLogDir(appIdStr, containerIdStr);
for(String logDir : logDirs) {
containerLogDirs.add(logDir + Path.SEPARATOR + relativeContainerLogDir);
}
return containerLogDirs;
}
protected List<String> getContainerLocalDirs(List<String> localDirs) {
List<String> containerLocalDirs = new ArrayList<>(localDirs.size());
String user = container.getUser();
String appIdStr = app.getAppId().toString();
String relativeContainerLocalDir = ContainerLocalizer.USERCACHE
+ Path.SEPARATOR + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE
+ Path.SEPARATOR + appIdStr + Path.SEPARATOR;
for (String localDir : localDirs) {
containerLocalDirs.add(localDir + Path.SEPARATOR
+ relativeContainerLocalDir);
}
return containerLocalDirs;
}
protected Map<Path, List<String>> getLocalizedResources()
throws YarnException {
Map<Path, List<String>> localResources = container.getLocalizedResources();
if (localResources == null) {
throw RPCUtil.getRemoteException(
"Unable to get local resources when Container " + container
+ " is at " + container.getContainerState());
}
return localResources;
}
@SuppressWarnings("unchecked")
protected int launchContainer(ContainerStartContext ctx) throws IOException {
ContainerId containerId = container.getContainerId();
// LaunchContainer is a blocking call. We are here almost means the
// container is launched, so send out the event.
dispatcher.getEventHandler().handle(new ContainerEvent(
containerId,
ContainerEventType.CONTAINER_LAUNCHED));
context.getNMStateStore().storeContainerLaunched(containerId);
// Check if the container is signalled to be killed.
if (!shouldLaunchContainer.compareAndSet(false, true)) {
LOG.info("Container " + containerId + " not launched as "
+ "cleanup already called");
return ExitCode.TERMINATED.getExitCode();
} else {
exec.activateContainer(containerId, pidFilePath);
return exec.launchContainer(ctx);
}
}
protected void setContainerCompletedStatus(int exitCode) {
ContainerId containerId = container.getContainerId();
completed.set(true);
exec.deactivateContainer(containerId);
try {
if (!container.shouldRetry(exitCode)) {
context.getNMStateStore().storeContainerCompleted(containerId,
exitCode);
}
} catch (IOException e) {
LOG.error("Unable to set exit code for container " + containerId);
}
}
@SuppressWarnings("unchecked")
protected void handleContainerExitCode(int exitCode, Path containerLogDir) {
ContainerId containerId = container.getContainerId();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Container " + containerIdStr + " completed with exit code " LOG.debug("Container " + containerId + " completed with exit code "
+ ret); + exitCode);
} }
StringBuilder diagnosticInfo = StringBuilder diagnosticInfo =
new StringBuilder("Container exited with a non-zero exit code "); new StringBuilder("Container exited with a non-zero exit code ");
diagnosticInfo.append(ret); diagnosticInfo.append(exitCode);
diagnosticInfo.append(". "); diagnosticInfo.append(". ");
if (ret == ExitCode.FORCE_KILLED.getExitCode() if (exitCode == ExitCode.FORCE_KILLED.getExitCode()
|| ret == ExitCode.TERMINATED.getExitCode()) { || exitCode == ExitCode.TERMINATED.getExitCode()) {
// If the process was killed, Send container_cleanedup_after_kill and // If the process was killed, Send container_cleanedup_after_kill and
// just break out of this method. // just break out of this method.
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new ContainerExitEvent(containerID, new ContainerExitEvent(containerId,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ret, ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode,
diagnosticInfo.toString())); diagnosticInfo.toString()));
return ret; } else if (exitCode != 0) {
} handleContainerExitWithFailure(containerId, exitCode, containerLogDir,
if (ret != 0) {
handleContainerExitWithFailure(containerID, ret, containerLogDir,
diagnosticInfo); diagnosticInfo);
return ret; } else {
LOG.info("Container " + containerId + " succeeded ");
dispatcher.getEventHandler().handle(
new ContainerEvent(containerId,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
} }
LOG.info("Container " + containerIdStr + " succeeded ");
dispatcher.getEventHandler().handle(
new ContainerEvent(containerID,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
return 0;
} }
/** /**
@ -389,8 +442,8 @@ public class ContainerLaunch implements Callable<Integer> {
* @param diagnosticInfo * @param diagnosticInfo
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void handleContainerExitWithFailure(ContainerId containerID, int ret, protected void handleContainerExitWithFailure(ContainerId containerID,
Path containerLogDir, StringBuilder diagnosticInfo) { int ret, Path containerLogDir, StringBuilder diagnosticInfo) {
LOG.warn(diagnosticInfo); LOG.warn(diagnosticInfo);
String errorFileNamePattern = String errorFileNamePattern =
@ -689,7 +742,8 @@ public class ContainerLaunch implements Callable<Integer> {
return appIdStr + Path.SEPARATOR + containerIdStr; return appIdStr + Path.SEPARATOR + containerIdStr;
} }
private String getContainerPrivateDir(String appIdStr, String containerIdStr) { protected String getContainerPrivateDir(String appIdStr,
String containerIdStr) {
return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr
+ Path.SEPARATOR; + Path.SEPARATOR;
} }
@ -1106,4 +1160,20 @@ public class ContainerLaunch implements Callable<Integer> {
public static String getExitCodeFile(String pidFile) { public static String getExitCodeFile(String pidFile) {
return pidFile + EXIT_CODE_FILE_SUFFIX; return pidFile + EXIT_CODE_FILE_SUFFIX;
} }
private void recordContainerLogDir(ContainerId containerId,
String logDir) throws IOException{
if (container.isRetryContextSet()) {
container.setLogDir(logDir);
context.getNMStateStore().storeContainerLogDir(containerId, logDir);
}
}
private void recordContainerWorkDir(ContainerId containerId,
String workDir) throws IOException{
if (container.isRetryContextSet()) {
container.setWorkDir(workDir);
context.getNMStateStore().storeContainerWorkDir(containerId, workDir);
}
}
} }

View File

@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
import org.apache.hadoop.yarn.util.ConverterUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* Relaunch container.
*/
public class ContainerRelaunch extends ContainerLaunch {
private static final Log LOG = LogFactory.getLog(ContainerRelaunch.class);
public ContainerRelaunch(Context context, Configuration configuration,
Dispatcher dispatcher, ContainerExecutor exec, Application app,
Container container, LocalDirsHandlerService dirsHandler,
ContainerManagerImpl containerManager) {
super(context, configuration, dispatcher, exec, app, container, dirsHandler,
containerManager);
}
@Override
@SuppressWarnings("unchecked")
public Integer call() {
if (!validateContainerState()) {
return 0;
}
ContainerId containerId = container.getContainerId();
String containerIdStr = ConverterUtils.toString(containerId);
int ret = -1;
Path containerLogDir;
try {
Path containerWorkDir = getContainerWorkDir();
cleanupPreviousContainerFiles(containerWorkDir);
containerLogDir = getContainerLogDir();
Map<Path, List<String>> localResources = getLocalizedResources();
String appIdStr = app.getAppId().toString();
Path nmPrivateContainerScriptPath =
getNmPrivateContainerScriptPath(appIdStr, containerIdStr);
Path nmPrivateTokensPath =
getNmPrivateTokensPath(appIdStr, containerIdStr);
pidFilePath = getPidFilePath(appIdStr, containerIdStr);
LOG.info("Relaunch container with "
+ "workDir = " + containerWorkDir.toString()
+ ", logDir = " + containerLogDir.toString()
+ ", nmPrivateContainerScriptPath = "
+ nmPrivateContainerScriptPath.toString()
+ ", nmPrivateTokensPath = " + nmPrivateTokensPath.toString()
+ ", pidFilePath = " + pidFilePath.toString());
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
List<String> containerLocalDirs = getContainerLocalDirs(localDirs);
List<String> containerLogDirs = getContainerLogDirs(logDirs);
if (!dirsHandler.areDisksHealthy()) {
ret = ContainerExitStatus.DISKS_FAILED;
throw new IOException("Most of the disks failed. "
+ dirsHandler.getDisksHealthReport(false));
}
ret = launchContainer(new ContainerStartContext.Builder()
.setContainer(container)
.setLocalizedResources(localResources)
.setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath)
.setNmPrivateTokensPath(nmPrivateTokensPath)
.setUser(container.getUser())
.setAppId(appIdStr)
.setContainerWorkDir(containerWorkDir)
.setLocalDirs(localDirs)
.setLogDirs(logDirs)
.setContainerLocalDirs(containerLocalDirs)
.setContainerLogDirs(containerLogDirs)
.build());
} catch (Throwable e) {
LOG.warn("Failed to relaunch container.", e);
dispatcher.getEventHandler().handle(new ContainerExitEvent(
containerId, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
e.getMessage()));
return ret;
} finally {
setContainerCompletedStatus(ret);
}
handleContainerExitCode(ret, containerLogDir);
return ret;
}
private Path getContainerWorkDir() throws IOException {
String containerWorkDir = container.getWorkDir();
if (containerWorkDir == null
|| !dirsHandler.isGoodLocalDir(containerWorkDir)) {
throw new IOException(
"Could not find a good work dir " + containerWorkDir
+ " for container " + container);
}
return new Path(containerWorkDir);
}
private Path getContainerLogDir() throws IOException {
String containerLogDir = container.getLogDir();
if (containerLogDir == null || !dirsHandler.isGoodLogDir(containerLogDir)) {
throw new IOException("Could not find a good log dir " + containerLogDir
+ " for container " + container);
}
return new Path(containerLogDir);
}
private Path getNmPrivateContainerScriptPath(String appIdStr,
String containerIdStr) throws IOException {
return dirsHandler.getLocalPathForRead(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ CONTAINER_SCRIPT);
}
private Path getNmPrivateTokensPath(String appIdStr,
String containerIdStr) throws IOException {
return dirsHandler.getLocalPathForRead(
getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
containerIdStr));
}
private Path getPidFilePath(String appIdStr,
String containerIdStr) throws IOException {
return dirsHandler.getLocalPathForRead(
getPidFileSubpath(appIdStr, containerIdStr));
}
/**
* Clean up container's previous files for container relaunch.
*/
private void cleanupPreviousContainerFiles(Path containerWorkDir) {
// delete ContainerScriptPath
deleteAsUser(new Path(containerWorkDir, CONTAINER_SCRIPT));
// delete TokensPath
deleteAsUser(new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE));
}
private void deleteAsUser(Path path) {
try {
exec.deleteAsUser(new DeletionAsUserContext.Builder()
.setUser(container.getUser())
.setSubDir(path)
.build());
} catch (Exception e) {
LOG.warn("Failed to delete " + path, e);
}
}
}

View File

@ -118,6 +118,16 @@ public class ContainersLauncher extends AbstractService
containerLauncher.submit(launch); containerLauncher.submit(launch);
running.put(containerId, launch); running.put(containerId, launch);
break; break;
case RELAUNCH_CONTAINER:
app = context.getApplications().get(
containerId.getApplicationAttemptId().getApplicationId());
ContainerRelaunch relaunch =
new ContainerRelaunch(context, getConfig(), dispatcher, exec, app,
event.getContainer(), dirsHandler, containerManager);
containerLauncher.submit(relaunch);
running.put(containerId, relaunch);
break;
case RECOVER_CONTAINER: case RECOVER_CONTAINER:
app = context.getApplications().get( app = context.getApplications().get(
containerId.getApplicationAttemptId().getApplicationId()); containerId.getApplicationAttemptId().getApplicationId());

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
public enum ContainersLauncherEventType { public enum ContainersLauncherEventType {
LAUNCH_CONTAINER, LAUNCH_CONTAINER,
RELAUNCH_CONTAINER,
RECOVER_CONTAINER, RECOVER_CONTAINER,
CLEANUP_CONTAINER, // The process(grp) itself. CLEANUP_CONTAINER, // The process(grp) itself.
SIGNAL_CONTAINER, SIGNAL_CONTAINER,

View File

@ -110,6 +110,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
"/resourceChanged"; "/resourceChanged";
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX =
"/remainingRetryAttempts";
private static final String CONTAINER_WORK_DIR_KEY_SUFFIX = "/workdir";
private static final String CONTAINER_LOG_DIR_KEY_SUFFIX = "/logdir";
private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey";
private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey";
@ -247,6 +251,13 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) { } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) {
rcs.capability = new ResourcePBImpl( rcs.capability = new ResourcePBImpl(
ResourceProto.parseFrom(entry.getValue())); ResourceProto.parseFrom(entry.getValue()));
} else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) {
rcs.setRemainingRetryAttempts(
Integer.parseInt(asString(entry.getValue())));
} else if (suffix.equals(CONTAINER_WORK_DIR_KEY_SUFFIX)) {
rcs.setWorkDir(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) {
rcs.setLogDir(asString(entry.getValue()));
} else { } else {
throw new IOException("Unexpected container state key: " + key); throw new IOException("Unexpected container state key: " + key);
} }
@ -356,6 +367,42 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} }
} }
@Override
public void storeContainerRemainingRetryAttempts(ContainerId containerId,
int remainingRetryAttempts) throws IOException {
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ CONTAINER_REMAIN_RETRIES_KEY_SUFFIX;
try {
db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts)));
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ CONTAINER_WORK_DIR_KEY_SUFFIX;
try {
db.put(bytes(key), bytes(workDir));
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void storeContainerLogDir(ContainerId containerId,
String logDir) throws IOException {
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ CONTAINER_LOG_DIR_KEY_SUFFIX;
try {
db.put(bytes(key), bytes(logDir));
} catch (DBException e) {
throw new IOException(e);
}
}
@Override @Override
public void removeContainer(ContainerId containerId) public void removeContainer(ContainerId containerId)
throws IOException { throws IOException {

View File

@ -99,6 +99,21 @@ public class NMNullStateStoreService extends NMStateStoreService {
throws IOException { throws IOException {
} }
@Override
public void storeContainerRemainingRetryAttempts(ContainerId containerId,
int remainingRetryAttempts) throws IOException {
}
@Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
}
@Override
public void storeContainerLogDir(ContainerId containerId,
String logDir) throws IOException {
}
@Override @Override
public void removeContainer(ContainerId containerId) throws IOException { public void removeContainer(ContainerId containerId) throws IOException {
} }

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@ -72,6 +73,9 @@ public abstract class NMStateStoreService extends AbstractService {
String diagnostics = ""; String diagnostics = "";
StartContainerRequest startRequest; StartContainerRequest startRequest;
Resource capability; Resource capability;
private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID;
private String workDir;
private String logDir;
public RecoveredContainerStatus getStatus() { public RecoveredContainerStatus getStatus() {
return status; return status;
@ -97,6 +101,30 @@ public abstract class NMStateStoreService extends AbstractService {
return capability; return capability;
} }
public int getRemainingRetryAttempts() {
return remainingRetryAttempts;
}
public void setRemainingRetryAttempts(int retryAttempts) {
this.remainingRetryAttempts = retryAttempts;
}
public String getWorkDir() {
return workDir;
}
public void setWorkDir(String workDir) {
this.workDir = workDir;
}
public String getLogDir() {
return logDir;
}
public void setLogDir(String logDir) {
this.logDir = logDir;
}
@Override @Override
public String toString() { public String toString() {
return new StringBuffer("Status: ").append(getStatus()) return new StringBuffer("Status: ").append(getStatus())
@ -105,6 +133,9 @@ public abstract class NMStateStoreService extends AbstractService {
.append(", Diagnostics: ").append(getDiagnostics()) .append(", Diagnostics: ").append(getDiagnostics())
.append(", Capability: ").append(getCapability()) .append(", Capability: ").append(getCapability())
.append(", StartRequest: ").append(getStartRequest()) .append(", StartRequest: ").append(getStartRequest())
.append(", RemainingRetryAttempts: ").append(remainingRetryAttempts)
.append(", WorkDir: ").append(workDir)
.append(", LogDir: ").append(logDir)
.toString(); .toString();
} }
} }
@ -323,6 +354,34 @@ public abstract class NMStateStoreService extends AbstractService {
public abstract void storeContainerDiagnostics(ContainerId containerId, public abstract void storeContainerDiagnostics(ContainerId containerId,
StringBuilder diagnostics) throws IOException; StringBuilder diagnostics) throws IOException;
/**
* Record remaining retry attempts for a container.
* @param containerId the container ID
* @param remainingRetryAttempts the remain retry times when container
* fails to run
* @throws IOException
*/
public abstract void storeContainerRemainingRetryAttempts(
ContainerId containerId, int remainingRetryAttempts) throws IOException;
/**
* Record working directory for a container.
* @param containerId the container ID
* @param workDir the working directory
* @throws IOException
*/
public abstract void storeContainerWorkDir(
ContainerId containerId, String workDir) throws IOException;
/**
* Record log directory for a container.
* @param containerId the container ID
* @param logDir the log directory
* @throws IOException
*/
public abstract void storeContainerLogDir(
ContainerId containerId, String logDir) throws IOException;
/** /**
* Remove records corresponding to a container * Remove records corresponding to a container
* @param containerId the container ID * @param containerId the container ID

View File

@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceType;
@ -660,6 +662,69 @@ public class TestContainer {
} }
} }
@Test
public void testContainerRetry() throws Exception{
ContainerRetryContext containerRetryContext1 = ContainerRetryContext
.newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 3, 0);
testContainerRetry(containerRetryContext1, 2, 0);
ContainerRetryContext containerRetryContext2 = ContainerRetryContext
.newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 3, 0);
testContainerRetry(containerRetryContext2, 2, 3);
ContainerRetryContext containerRetryContext3 = ContainerRetryContext
.newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 3, 0);
// If exit code is 0, it will not retry
testContainerRetry(containerRetryContext3, 0, 0);
ContainerRetryContext containerRetryContext4 = ContainerRetryContext
.newInstance(
ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES, null, 3, 0);
testContainerRetry(containerRetryContext4, 2, 0);
HashSet<Integer> errorCodes = new HashSet<>();
errorCodes.add(2);
errorCodes.add(6);
ContainerRetryContext containerRetryContext5 = ContainerRetryContext
.newInstance(ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
errorCodes, 3, 0);
testContainerRetry(containerRetryContext5, 2, 3);
HashSet<Integer> errorCodes2 = new HashSet<>();
errorCodes.add(143);
ContainerRetryContext containerRetryContext6 = ContainerRetryContext
.newInstance(ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES,
errorCodes2, 3, 0);
// If exit code is 143(SIGTERM), it will not retry even it is in errorCodes.
testContainerRetry(containerRetryContext6, 143, 0);
}
private void testContainerRetry(ContainerRetryContext containerRetryContext,
int exitCode, int expectedRetries) throws Exception{
WrappedContainer wc = null;
try {
int retryTimes = 0;
wc = new WrappedContainer(24, 314159265358979L, 4344, "yak",
containerRetryContext);
wc.initContainer();
wc.localizeResources();
wc.launchContainer();
while (true) {
wc.containerFailed(exitCode);
if (wc.c.getContainerState() == ContainerState.RUNNING) {
retryTimes ++;
} else {
break;
}
}
Assert.assertEquals(expectedRetries, retryTimes);
} finally {
if (wc != null) {
wc.finished();
}
}
}
private void verifyCleanupCall(WrappedContainer wc) throws Exception { private void verifyCleanupCall(WrappedContainer wc) throws Exception {
ResourcesReleasedMatcher matchesReq = ResourcesReleasedMatcher matchesReq =
new ResourcesReleasedMatcher(wc.localResources, EnumSet.of( new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
@ -802,12 +867,23 @@ public class TestContainer {
WrappedContainer(int appId, long timestamp, int id, String user) WrappedContainer(int appId, long timestamp, int id, String user)
throws IOException { throws IOException {
this(appId, timestamp, id, user, true, false); this(appId, timestamp, id, user, null);
}
WrappedContainer(int appId, long timestamp, int id, String user,
ContainerRetryContext containerRetryContext) throws IOException {
this(appId, timestamp, id, user, true, false, containerRetryContext);
}
WrappedContainer(int appId, long timestamp, int id, String user,
boolean withLocalRes, boolean withServiceData) throws IOException {
this(appId, timestamp, id, user, withLocalRes, withServiceData, null);
} }
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
WrappedContainer(int appId, long timestamp, int id, String user, WrappedContainer(int appId, long timestamp, int id, String user,
boolean withLocalRes, boolean withServiceData) throws IOException { boolean withLocalRes, boolean withServiceData,
ContainerRetryContext containerRetryContext) throws IOException {
dispatcher = new DrainDispatcher(); dispatcher = new DrainDispatcher();
dispatcher.init(new Configuration()); dispatcher.init(new Configuration());
@ -884,6 +960,7 @@ public class TestContainer {
serviceData = Collections.<String, ByteBuffer> emptyMap(); serviceData = Collections.<String, ByteBuffer> emptyMap();
} }
when(ctxt.getServiceData()).thenReturn(serviceData); when(ctxt.getServiceData()).thenReturn(serviceData);
when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext);
c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier, c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier,
context); context);
@ -1005,6 +1082,10 @@ public class TestContainer {
assert containerStatus.getDiagnostics().contains(diagnosticMsg); assert containerStatus.getDiagnostics().contains(diagnosticMsg);
assert containerStatus.getExitStatus() == exitCode; assert containerStatus.getExitStatus() == exitCode;
drainDispatcherEvents(); drainDispatcherEvents();
// If container needs retry, relaunch it
if (c.getContainerState() == ContainerState.RELAUNCHING) {
launchContainer();
}
} }
public void killContainer() { public void killContainer() {

View File

@ -115,6 +115,9 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
rcsCopy.diagnostics = rcs.diagnostics; rcsCopy.diagnostics = rcs.diagnostics;
rcsCopy.startRequest = rcs.startRequest; rcsCopy.startRequest = rcs.startRequest;
rcsCopy.capability = rcs.capability; rcsCopy.capability = rcs.capability;
rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts());
rcsCopy.setWorkDir(rcs.getWorkDir());
rcsCopy.setLogDir(rcs.getLogDir());
result.add(rcsCopy); result.add(rcsCopy);
} }
return result; return result;
@ -167,6 +170,27 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
rcs.exitCode = exitCode; rcs.exitCode = exitCode;
} }
@Override
public void storeContainerRemainingRetryAttempts(ContainerId containerId,
int remainingRetryAttempts) throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
rcs.setRemainingRetryAttempts(remainingRetryAttempts);
}
@Override
public void storeContainerWorkDir(ContainerId containerId,
String workDir) throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
rcs.setWorkDir(workDir);
}
@Override
public void storeContainerLogDir(ContainerId containerId,
String logDir) throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
rcs.setLogDir(logDir);
}
@Override @Override
public synchronized void removeContainer(ContainerId containerId) public synchronized void removeContainer(ContainerId containerId)
throws IOException { throws IOException {

View File

@ -334,6 +334,18 @@ public class TestNMLeveldbStateStoreService {
assertEquals(containerReq, rcs.getStartRequest()); assertEquals(containerReq, rcs.getStartRequest());
assertEquals(diags.toString(), rcs.getDiagnostics()); assertEquals(diags.toString(), rcs.getDiagnostics());
// store remainingRetryAttempts, workDir and logDir
stateStore.storeContainerRemainingRetryAttempts(containerId, 6);
stateStore.storeContainerWorkDir(containerId, "/test/workdir");
stateStore.storeContainerLogDir(containerId, "/test/logdir");
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(6, rcs.getRemainingRetryAttempts());
assertEquals("/test/workdir", rcs.getWorkDir());
assertEquals("/test/logdir", rcs.getLogDir());
// remove the container and verify not recovered // remove the container and verify not recovered
stateStore.removeContainer(containerId); stateStore.removeContainer(containerId);
restartStateStore(); restartStateStore();

View File

@ -144,4 +144,32 @@ public class MockContainer implements Container {
public NMContainerStatus getNMContainerStatus() { public NMContainerStatus getNMContainerStatus() {
return null; return null;
} }
@Override
public boolean isRetryContextSet() {
return false;
}
@Override
public boolean shouldRetry(int errorCode) {
return false;
}
@Override
public String getWorkDir() {
return null;
}
@Override
public void setWorkDir(String workDir) {
}
@Override
public String getLogDir() {
return null;
}
@Override
public void setLogDir(String logDir) {
}
} }