diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java index 932945bdd10..6d4bccd80c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerLaunchContext.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; import org.apache.hadoop.yarn.server.api.AuxiliaryService; @@ -46,6 +47,7 @@ *
  • Optional, application-specific binary service data.
  • *
  • Environment variables for the launched process.
  • *
  • Command to launch the container.
  • + *
  • Retry strategy when container exits with failure.
  • * * * @see ContainerManagementProtocol#startContainers(org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest) @@ -61,6 +63,18 @@ public static ContainerLaunchContext newInstance( Map environment, List commands, Map serviceData, ByteBuffer tokens, Map acls) { + return newInstance(localResources, environment, commands, serviceData, + tokens, acls, null); + } + + @Public + @Unstable + public static ContainerLaunchContext newInstance( + Map localResources, + Map environment, List commands, + Map serviceData, ByteBuffer tokens, + Map acls, + ContainerRetryContext containerRetryContext) { ContainerLaunchContext container = Records.newRecord(ContainerLaunchContext.class); container.setLocalResources(localResources); @@ -69,6 +83,7 @@ public static ContainerLaunchContext newInstance( container.setServiceData(serviceData); container.setTokens(tokens); container.setApplicationACLs(acls); + container.setContainerRetryContext(containerRetryContext); return container; } @@ -195,4 +210,22 @@ public static ContainerLaunchContext newInstance( @Public @Stable public abstract void setApplicationACLs(Map acls); + + /** + * Get the ContainerRetryContext to relaunch container. + * @return ContainerRetryContext to relaunch container. + */ + @Public + @Unstable + public abstract ContainerRetryContext getContainerRetryContext(); + + /** + * Set the ContainerRetryContext to relaunch container. + * @param containerRetryContext ContainerRetryContext to + * relaunch container. + */ + @Public + @Unstable + public abstract void setContainerRetryContext( + ContainerRetryContext containerRetryContext); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java new file mode 100644 index 00000000000..ef8bd1763e8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryContext.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +import java.util.Set; + +/** + * {@code ContainerRetryContext} indicates how container retry after it fails + * to run. + *

    + * It provides details such as: + *

      + *
    • + * {@link ContainerRetryPolicy} : + * - NEVER_RETRY(DEFAULT value): no matter what error code is when container + * fails to run, just do not retry. + * - RETRY_ON_ALL_ERRORS: no matter what error code is, when container fails + * to run, just retry. + * - RETRY_ON_SPECIFIC_ERROR_CODES: when container fails to run, do retry if + * the error code is one of errorCodes, otherwise do not retry. + * + * Note: if error code is 137(SIGKILL) or 143(SIGTERM), it will not retry + * because it is usually killed on purpose. + *
    • + *
    • + * maxRetries specifies how many times to retry if need to retry. + * If the value is -1, it means retry forever. + *
    • + *
    • retryInterval specifies delaying some time before relaunch + * container, the unit is millisecond.
    • + *
    + */ +@Public +@Unstable +public abstract class ContainerRetryContext { + public static final int RETRY_FOREVER = -1; + public static final int RETRY_INVALID = -1000; + public static final ContainerRetryContext NEVER_RETRY_CONTEXT = + newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 0, 0); + + @Private + @Unstable + public static ContainerRetryContext newInstance( + ContainerRetryPolicy retryPolicy, Set errorCodes, + int maxRetries, int retryInterval) { + ContainerRetryContext containerRetryContext = + Records.newRecord(ContainerRetryContext.class); + containerRetryContext.setRetryPolicy(retryPolicy); + containerRetryContext.setErrorCodes(errorCodes); + containerRetryContext.setMaxRetries(maxRetries); + containerRetryContext.setRetryInterval(retryInterval); + return containerRetryContext; + } + + public abstract ContainerRetryPolicy getRetryPolicy(); + public abstract void setRetryPolicy(ContainerRetryPolicy retryPolicy); + public abstract Set getErrorCodes(); + public abstract void setErrorCodes(Set errorCodes); + public abstract int getMaxRetries(); + public abstract void setMaxRetries(int maxRetries); + public abstract int getRetryInterval(); + public abstract void setRetryInterval(int retryInterval); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java new file mode 100644 index 00000000000..75c9d105afd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerRetryPolicy.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +/** + *

    Retry policy for relaunching a Container.

    + */ +@Public +@Unstable +public enum ContainerRetryPolicy { + /** Never retry. */ + NEVER_RETRY, + /** Retry for all error codes. */ + RETRY_ON_ALL_ERRORS, + /** Retry for specific error codes. */ + RETRY_ON_SPECIFIC_ERROR_CODES +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 75965dd379b..a4213cef44e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -846,6 +846,14 @@ public static boolean isAclEnabled(Configuration conf) { NM_PREFIX + "resourcemanager.minimum.version"; public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE"; + /** + * Maximum size of contain's diagnostics to keep for relaunching container + * case. + **/ + public static final String NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE = + NM_PREFIX + "container-diagnostics-maximum-size"; + public static final int DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE = 10000; + /** Interval at which the delayed token removal thread runs */ public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS = RM_PREFIX + "delayed.delegation-token.removal-interval-ms"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 635f2f061ba..60cdfd155e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -511,6 +511,7 @@ message ContainerLaunchContextProto { repeated StringStringMapProto environment = 4; repeated string command = 5; repeated ApplicationACLMapProto application_ACLs = 6; + optional ContainerRetryContextProto container_retry_context = 7; } message ContainerStatusProto { @@ -534,6 +535,19 @@ message ContainerResourceChangeRequestProto { optional ResourceProto capability = 2; } +message ContainerRetryContextProto { + optional ContainerRetryPolicyProto retry_policy = 1 [default = NEVER_RETRY]; + repeated int32 error_codes = 2; + optional int32 max_retries = 3 [default = 0]; + optional int32 retry_interval = 4 [default = 0]; +} + +enum ContainerRetryPolicyProto { + NEVER_RETRY = 0; + RETRY_ON_ALL_ERRORS = 1; + RETRY_ON_SPECIFIC_ERROR_CODES = 2; +} + //////////////////////////////////////////////////////////////////////// ////// From common////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 2b85ba8dc81..297397421df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -76,6 +77,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -256,6 +259,13 @@ public static enum DSEntity { // File length needed for local resource private long shellScriptPathLen = 0; + // Container retry options + private ContainerRetryPolicy containerRetryPolicy = + ContainerRetryPolicy.NEVER_RETRY; + private Set containerRetryErrorCodes = null; + private int containerMaxRetries = 0; + private int containrRetryInterval = 0; + // Timeline domain ID private String domainId = null; @@ -378,6 +388,18 @@ public boolean init(String[] args) throws ParseException, IOException { opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed"); opts.addOption("priority", true, "Application Priority. Default 0"); + opts.addOption("container_retry_policy", true, + "Retry policy when container fails to run, " + + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, " + + "2: RETRY_ON_SPECIFIC_ERROR_CODES"); + opts.addOption("container_retry_error_codes", true, + "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error " + + "codes is specified with this option, " + + "e.g. --container_retry_error_codes 1,2,3"); + opts.addOption("container_max_retries", true, + "If container could retry, it specifies max retires"); + opts.addOption("container_retry_interval", true, + "Interval between each retry, unit is milliseconds"); opts.addOption("debug", false, "Dump out debug information"); opts.addOption("help", false, "Print usage"); @@ -515,6 +537,21 @@ public boolean init(String[] args) throws ParseException, IOException { } requestPriority = Integer.parseInt(cliParser .getOptionValue("priority", "0")); + + containerRetryPolicy = ContainerRetryPolicy.values()[ + Integer.parseInt(cliParser.getOptionValue( + "container_retry_policy", "0"))]; + if (cliParser.hasOption("container_retry_error_codes")) { + containerRetryErrorCodes = new HashSet<>(); + for (String errorCode : + cliParser.getOptionValue("container_retry_error_codes").split(",")) { + containerRetryErrorCodes.add(Integer.parseInt(errorCode)); + } + } + containerMaxRetries = Integer.parseInt( + cliParser.getOptionValue("container_max_retries", "0")); + containrRetryInterval = Integer.parseInt(cliParser.getOptionValue( + "container_retry_interval", "0")); return true; } @@ -1069,9 +1106,13 @@ public void run() { // "hadoop dfs" command inside the distributed shell. Map myShellEnv = new HashMap(shellEnv); myShellEnv.put(YARN_SHELL_ID, shellId); + ContainerRetryContext containerRetryContext = + ContainerRetryContext.newInstance( + containerRetryPolicy, containerRetryErrorCodes, + containerMaxRetries, containrRetryInterval); ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, myShellEnv, commands, null, allTokens.duplicate(), - null); + null, containerRetryContext); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index e864ad23005..9139b08e530 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -169,6 +169,8 @@ public class Client { private long attemptFailuresValidityInterval = -1; + private Vector containerRetryOptions = new Vector<>(5); + // Debug flag boolean debugFlag = false; @@ -288,6 +290,18 @@ public Client(Configuration conf) throws Exception { + " will be allocated, \"\" means containers" + " can be allocated anywhere, if you don't specify the option," + " default node_label_expression of queue will be used."); + opts.addOption("container_retry_policy", true, + "Retry policy when container fails to run, " + + "0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, " + + "2: RETRY_ON_SPECIFIC_ERROR_CODES"); + opts.addOption("container_retry_error_codes", true, + "When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error " + + "codes is specified with this option, " + + "e.g. --container_retry_error_codes 1,2,3"); + opts.addOption("container_max_retries", true, + "If container could retry, it specifies max retires"); + opts.addOption("container_retry_interval", true, + "Interval between each retry, unit is milliseconds"); } /** @@ -430,6 +444,24 @@ public boolean init(String[] args) throws ParseException { } } + // Get container retry options + if (cliParser.hasOption("container_retry_policy")) { + containerRetryOptions.add("--container_retry_policy " + + cliParser.getOptionValue("container_retry_policy")); + } + if (cliParser.hasOption("container_retry_error_codes")) { + containerRetryOptions.add("--container_retry_error_codes " + + cliParser.getOptionValue("container_retry_error_codes")); + } + if (cliParser.hasOption("container_max_retries")) { + containerRetryOptions.add("--container_max_retries " + + cliParser.getOptionValue("container_max_retries")); + } + if (cliParser.hasOption("container_retry_interval")) { + containerRetryOptions.add("--container_retry_interval " + + cliParser.getOptionValue("container_retry_interval")); + } + return true; } @@ -639,6 +671,8 @@ public boolean run() throws IOException, YarnException { vargs.add("--debug"); } + vargs.addAll(containerRetryOptions); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java index 30403caa971..1efe5417103 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerLaunchContextPBImpl.java @@ -29,10 +29,12 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerLaunchContextProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringBytesMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.StringLocalResourceMapProto; @@ -56,7 +58,8 @@ public class ContainerLaunchContextPBImpl private Map environment = null; private List commands = null; private Map applicationACLS = null; - + private ContainerRetryContext containerRetryContext = null; + public ContainerLaunchContextPBImpl() { builder = ContainerLaunchContextProto.newBuilder(); } @@ -120,6 +123,10 @@ private void mergeLocalToBuilder() { if (this.applicationACLS != null) { addApplicationACLs(); } + if (this.containerRetryContext != null) { + builder.setContainerRetryContext( + convertToProtoFormat(this.containerRetryContext)); + } } private void mergeLocalToProto() { @@ -462,6 +469,27 @@ public void setApplicationACLs( this.applicationACLS.putAll(appACLs); } + public ContainerRetryContext getContainerRetryContext() { + ContainerLaunchContextProtoOrBuilder p = viaProto ? proto : builder; + if (this.containerRetryContext != null) { + return this.containerRetryContext; + } + if (!p.hasContainerRetryContext()) { + return null; + } + this.containerRetryContext = convertFromProtoFormat( + p.getContainerRetryContext()); + return this.containerRetryContext; + } + + public void setContainerRetryContext(ContainerRetryContext retryContext) { + maybeInitBuilder(); + if (retryContext == null) { + builder.clearContainerRetryContext(); + } + this.containerRetryContext = retryContext; + } + private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { return new LocalResourcePBImpl(p); } @@ -469,4 +497,14 @@ private LocalResourcePBImpl convertFromProtoFormat(LocalResourceProto p) { private LocalResourceProto convertToProtoFormat(LocalResource t) { return ((LocalResourcePBImpl)t).getProto(); } -} + + private ContainerRetryContextPBImpl convertFromProtoFormat( + ContainerRetryContextProto p) { + return new ContainerRetryContextPBImpl(p); + } + + private ContainerRetryContextProto convertToProtoFormat( + ContainerRetryContext t) { + return ((ContainerRetryContextPBImpl)t).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java new file mode 100644 index 00000000000..a5ef70de2f6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerRetryContextPBImpl.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + + +import com.google.protobuf.TextFormat; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProtoOrBuilder; + +import java.util.HashSet; +import java.util.Set; + +/** + * Implementation of ContainerRetryContext. + */ +public class ContainerRetryContextPBImpl extends ContainerRetryContext { + private ContainerRetryContextProto proto = + ContainerRetryContextProto.getDefaultInstance(); + private ContainerRetryContextProto.Builder builder = null; + private boolean viaProto = false; + + private Set errorCodes = null; + + public ContainerRetryContextPBImpl() { + builder = ContainerRetryContextProto.newBuilder(); + } + + public ContainerRetryContextPBImpl(ContainerRetryContextProto proto) { + this.proto = proto; + viaProto = true; + } + + public ContainerRetryContextProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + if (this.errorCodes != null) { + builder.clearErrorCodes(); + builder.addAllErrorCodes(this.errorCodes); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerRetryContextProto.newBuilder(proto); + } + viaProto = false; + } + + public ContainerRetryPolicy getRetryPolicy() { + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRetryPolicy()) { + return ContainerRetryPolicy.NEVER_RETRY; + } + return convertFromProtoFormat(p.getRetryPolicy()); + } + + public void setRetryPolicy(ContainerRetryPolicy containerRetryPolicy) { + maybeInitBuilder(); + if (containerRetryPolicy == null) { + builder.clearRetryPolicy(); + return; + } + builder.setRetryPolicy(convertToProtoFormat(containerRetryPolicy)); + } + + private void initErrorCodes() { + if (this.errorCodes != null) { + return; + } + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + this.errorCodes = new HashSet<>(); + this.errorCodes.addAll(p.getErrorCodesList()); + } + + public Set getErrorCodes() { + initErrorCodes(); + return this.errorCodes; + } + + public void setErrorCodes(Set errCodes) { + maybeInitBuilder(); + if (errCodes == null || errCodes.isEmpty()) { + builder.clearErrorCodes(); + } + this.errorCodes = errCodes; + } + + public int getMaxRetries() { + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasMaxRetries()) { + return 0; + } + return p.getMaxRetries(); + } + + public void setMaxRetries(int maxRetries) { + maybeInitBuilder(); + builder.setMaxRetries(maxRetries); + } + + public int getRetryInterval() { + ContainerRetryContextProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRetryInterval()) { + return 0; + } + return p.getRetryInterval(); + } + + public void setRetryInterval(int retryInterval) { + maybeInitBuilder(); + builder.setRetryInterval(retryInterval); + } + + private ContainerRetryPolicyProto convertToProtoFormat( + ContainerRetryPolicy containerRetryPolicy) { + return ProtoUtils.convertToProtoFormat(containerRetryPolicy); + } + + private ContainerRetryPolicy convertFromProtoFormat( + ContainerRetryPolicyProto containerRetryPolicyProto) { + return ProtoUtils.convertFromProtoFormat(containerRetryPolicyProto); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 9d683f1924d..236df90d927 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos; @@ -309,4 +311,17 @@ public static Resource convertFromProtoFormat( YarnProtos.ResourceProto resource) { return new ResourcePBImpl(resource); } + + /* + * ContainerRetryPolicy + */ + public static ContainerRetryPolicyProto convertToProtoFormat( + ContainerRetryPolicy e) { + return ContainerRetryPolicyProto.valueOf(e.name()); + } + + public static ContainerRetryPolicy convertFromProtoFormat( + ContainerRetryPolicyProto e) { + return ContainerRetryPolicy.valueOf(e.name()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 1c5ee9535a2..2be402ab6f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1581,6 +1581,13 @@ NONE + + Maximum size of contain's diagnostics to keep for relaunching + container case. + yarn.nodemanager.container-diagnostics-maximum-size + 10000 + + Max number of threads in NMClientAsync to process container management events diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index 07b06fa8fe4..14f61b736eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -121,6 +121,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LogAggregationContext; @@ -165,6 +166,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceChangeRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; @@ -199,6 +201,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceChangeRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryContextProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; @@ -354,7 +357,7 @@ private static Object genTypeValue(Type type) { return rand.nextBoolean(); } else if (type.equals(byte.class)) { return bytes[rand.nextInt(4)]; - } else if (type.equals(int.class)) { + } else if (type.equals(int.class) || type.equals(Integer.class)) { return rand.nextInt(1000000); } else if (type.equals(long.class)) { return Long.valueOf(rand.nextInt(1000000)); @@ -478,6 +481,7 @@ public static void setup() throws Exception { generateByNewInstance(ApplicationResourceUsageReport.class); generateByNewInstance(ApplicationReport.class); generateByNewInstance(Container.class); + generateByNewInstance(ContainerRetryContext.class); generateByNewInstance(ContainerLaunchContext.class); generateByNewInstance(ApplicationSubmissionContext.class); generateByNewInstance(ContainerReport.class); @@ -968,6 +972,12 @@ public void testContainerIdPBImpl() throws Exception { validatePBImplRecord(ContainerIdPBImpl.class, ContainerIdProto.class); } + @Test + public void testContainerRetryPBImpl() throws Exception { + validatePBImplRecord(ContainerRetryContextPBImpl.class, + ContainerRetryContextProto.class); + } + @Test public void testContainerLaunchContextPBImpl() throws Exception { validatePBImplRecord(ContainerLaunchContextPBImpl.class, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 8c74bf5831f..d08ee67311a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -308,6 +308,7 @@ public void writeLaunchEnv(OutputStream out, } public enum ExitCode { + SUCCESS(0), FORCE_KILLED(137), TERMINATED(143), LOST(154); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java index 5cc4e192870..f8cb4eee709 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java @@ -408,6 +408,24 @@ public long getLastDisksCheckTime() { return lastDisksCheckTime; } + public boolean isGoodLocalDir(String path) { + return isInGoodDirs(getLocalDirs(), path); + } + + public boolean isGoodLogDir(String path) { + return isInGoodDirs(getLogDirs(), path); + } + + private boolean isInGoodDirs(List goodDirs, String path) { + for (String goodDir : goodDirs) { + if (path.startsWith(goodDir)) { + return true; + } + } + + return false; + } + /** * Set good local dirs and good log dirs in the configuration so that the * LocalDirAllocator objects will use this updated configuration only. @@ -551,6 +569,10 @@ public Path getLocalPathForWrite(String pathStr, long size, checkWrite); } + public Path getLocalPathForRead(String pathStr) throws IOException { + return getPathToRead(pathStr, getLocalDirsForRead()); + } + public Path getLogPathForWrite(String pathStr, boolean checkWrite) throws IOException { return logDirsAllocator.getLocalPathForWrite(pathStr, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 29ab7f997a5..162823c9cdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -354,8 +354,7 @@ private void recoverContainer(RecoveredContainerState rcs) YarnServerSecurityUtils.parseCredentials(launchContext); Container container = new ContainerImpl(getConfig(), dispatcher, req.getContainerLaunchContext(), - credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), - rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability(), context); + credentials, metrics, token, context, rcs); context.getContainers().put(containerId, container); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 1d2ec5687b8..7571964d93f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -55,6 +55,18 @@ public interface Container extends EventHandler { NMContainerStatus getNMContainerStatus(); + boolean isRetryContextSet(); + + boolean shouldRetry(int errorCode); + + String getWorkDir(); + + void setWorkDir(String workDir); + + String getLogDir(); + + void setLogDir(String logDir); + String toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 676721435c1..b1ddc2ef952 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; @@ -71,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -98,11 +102,17 @@ public class ContainerImpl implements Container { private final String user; private int exitCode = ContainerExitStatus.INVALID; private final StringBuilder diagnostics; + private final int diagnosticsMaxSize; private boolean wasLaunched; private long containerLocalizationStartTime; private long containerLaunchStartTime; private ContainerMetrics containerMetrics; private static Clock clock = SystemClock.getInstance(); + private final ContainerRetryContext containerRetryContext; + // remaining retries to relaunch container if needed + private int remainingRetryAttempts; + private String workDir; + private String logDir; /** The NM-wide configuration - not specific to this container */ private final Configuration daemonConf; @@ -138,6 +148,16 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, this.dispatcher = dispatcher; this.stateStore = context.getNMStateStore(); this.launchContext = launchContext; + if (launchContext != null + && launchContext.getContainerRetryContext() != null) { + this.containerRetryContext = launchContext.getContainerRetryContext(); + } else { + this.containerRetryContext = ContainerRetryContext.NEVER_RETRY_CONTEXT; + } + this.remainingRetryAttempts = containerRetryContext.getMaxRetries(); + this.diagnosticsMaxSize = conf.getInt( + YarnConfiguration.NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE, + YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE); this.containerTokenIdentifier = containerTokenIdentifier; this.containerId = containerTokenIdentifier.getContainerID(); this.resource = containerTokenIdentifier.getResource(); @@ -172,22 +192,24 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, public ContainerImpl(Configuration conf, Dispatcher dispatcher, ContainerLaunchContext launchContext, Credentials creds, NodeManagerMetrics metrics, - ContainerTokenIdentifier containerTokenIdentifier, - RecoveredContainerStatus recoveredStatus, int exitCode, - String diagnostics, boolean wasKilled, Resource recoveredCapability, - Context context) { + ContainerTokenIdentifier containerTokenIdentifier, Context context, + RecoveredContainerState rcs) { this(conf, dispatcher, launchContext, creds, metrics, containerTokenIdentifier, context); - this.recoveredStatus = recoveredStatus; - this.exitCode = exitCode; - this.recoveredAsKilled = wasKilled; - this.diagnostics.append(diagnostics); + this.recoveredStatus = rcs.getStatus(); + this.exitCode = rcs.getExitCode(); + this.recoveredAsKilled = rcs.getKilled(); + this.diagnostics.append(rcs.getDiagnostics()); + Resource recoveredCapability = rcs.getCapability(); if (recoveredCapability != null && !this.resource.equals(recoveredCapability)) { // resource capability had been updated before NM was down this.resource = Resource.newInstance(recoveredCapability.getMemory(), recoveredCapability.getVirtualCores()); } + this.remainingRetryAttempts = rcs.getRemainingRetryAttempts(); + this.workDir = rcs.getWorkDir(); + this.logDir = rcs.getLogDir(); } private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = @@ -267,9 +289,10 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, new ExitedWithSuccessTransition(true)) .addTransition(ContainerState.RUNNING, - ContainerState.EXITED_WITH_FAILURE, + EnumSet.of(ContainerState.RELAUNCHING, + ContainerState.EXITED_WITH_FAILURE), ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, - new ExitedWithFailureTransition(true)) + new RetryFailureTransition()) .addTransition(ContainerState.RUNNING, ContainerState.RUNNING, ContainerEventType.UPDATE_DIAGNOSTICS_MSG, UPDATE_DIAGNOSTICS_TRANSITION) @@ -279,6 +302,19 @@ ContainerEventType.KILL_CONTAINER, new KillTransition()) ContainerEventType.CONTAINER_KILLED_ON_REQUEST, new KilledExternallyTransition()) + // From RELAUNCHING State + .addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING, + ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) + .addTransition(ContainerState.RELAUNCHING, + ContainerState.EXITED_WITH_FAILURE, + ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, + new ExitedWithFailureTransition(true)) + .addTransition(ContainerState.RELAUNCHING, ContainerState.RELAUNCHING, + ContainerEventType.UPDATE_DIAGNOSTICS_MSG, + UPDATE_DIAGNOSTICS_TRANSITION) + .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING, + ContainerEventType.KILL_CONTAINER, new KillTransition()) + // From CONTAINER_EXITED_WITH_SUCCESS State .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE, ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP, @@ -382,6 +418,7 @@ public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() { case LOCALIZATION_FAILED: case LOCALIZED: case RUNNING: + case RELAUNCHING: case EXITED_WITH_SUCCESS: case EXITED_WITH_FAILURE: case KILLING: @@ -408,7 +445,8 @@ public String getUser() { public Map> getLocalizedResources() { this.readLock.lock(); try { - if (ContainerState.LOCALIZED == getContainerState()) { + if (ContainerState.LOCALIZED == getContainerState() + || ContainerState.RELAUNCHING == getContainerState()) { return localizedResources; } else { return null; @@ -501,6 +539,26 @@ public ContainerTokenIdentifier getContainerTokenIdentifier() { } } + @Override + public String getWorkDir() { + return workDir; + } + + @Override + public void setWorkDir(String workDir) { + this.workDir = workDir; + } + + @Override + public String getLogDir() { + return logDir; + } + + @Override + public void setLogDir(String logDir) { + this.logDir = logDir; + } + @SuppressWarnings("unchecked") private void sendFinishedEvents() { // Inform the application @@ -527,6 +585,14 @@ private void sendLaunchEvent() { new ContainersLauncherEvent(this, launcherEvent)); } + @SuppressWarnings("unchecked") // dispatcher not typed + private void sendRelaunchEvent() { + ContainersLauncherEventType launcherEvent = + ContainersLauncherEventType.RELAUNCH_CONTAINER; + dispatcher.getEventHandler().handle( + new ContainersLauncherEvent(this, launcherEvent)); + } + // Inform the ContainersMonitor to start monitoring the container's // resource usage. @SuppressWarnings("unchecked") // dispatcher not typed @@ -552,6 +618,9 @@ private void addDiagnostics(String... diags) { for (String s : diags) { this.diagnostics.append(s); } + if (isRetryContextSet() && diagnostics.length() > diagnosticsMaxSize) { + diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize); + } try { stateStore.storeContainerDiagnostics(containerId, diagnostics); } catch (IOException e) { @@ -876,6 +945,100 @@ public void transition(ContainerImpl container, ContainerEvent event) { } } + /** + * Transition to EXITED_WITH_FAILURE or LOCALIZED state upon + * CONTAINER_EXITED_WITH_FAILURE state. + **/ + @SuppressWarnings("unchecked") // dispatcher not typed + static class RetryFailureTransition implements + MultipleArcTransition { + + @Override + public ContainerState transition(final ContainerImpl container, + ContainerEvent event) { + ContainerExitEvent exitEvent = (ContainerExitEvent) event; + container.exitCode = exitEvent.getExitCode(); + if (exitEvent.getDiagnosticInfo() != null) { + if (container.containerRetryContext.getRetryPolicy() + != ContainerRetryPolicy.NEVER_RETRY) { + int n = container.containerRetryContext.getMaxRetries() + - container.remainingRetryAttempts; + container.addDiagnostics("Diagnostic message from attempt " + + n + " : ", "\n"); + } + container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n"); + } + + if (container.shouldRetry(container.exitCode)) { + if (container.remainingRetryAttempts > 0) { + container.remainingRetryAttempts--; + try { + container.stateStore.storeContainerRemainingRetryAttempts( + container.getContainerId(), container.remainingRetryAttempts); + } catch (IOException e) { + LOG.warn( + "Unable to update remainingRetryAttempts in state store for " + + container.getContainerId(), e); + } + } + LOG.info("Relaunching Container " + container.getContainerId() + + ". Remaining retry attempts(after relaunch) : " + + container.remainingRetryAttempts + + ". Interval between retries is " + + container.containerRetryContext.getRetryInterval() + "ms"); + container.wasLaunched = false; + container.metrics.endRunningContainer(); + if (container.containerRetryContext.getRetryInterval() == 0) { + container.sendRelaunchEvent(); + } else { + // wait for some time, then send launch event + new Thread() { + @Override + public void run() { + try { + Thread.sleep( + container.containerRetryContext.getRetryInterval()); + container.sendRelaunchEvent(); + } catch (InterruptedException e) { + return; + } + } + }.start(); + } + return ContainerState.RELAUNCHING; + } else { + new ExitedWithFailureTransition(true).transition(container, event); + return ContainerState.EXITED_WITH_FAILURE; + } + } + } + + @Override + public boolean isRetryContextSet() { + return containerRetryContext.getRetryPolicy() + != ContainerRetryPolicy.NEVER_RETRY; + } + + @Override + public boolean shouldRetry(int errorCode) { + if (errorCode == ExitCode.SUCCESS.getExitCode() + || errorCode == ExitCode.FORCE_KILLED.getExitCode() + || errorCode == ExitCode.TERMINATED.getExitCode()) { + return false; + } + + ContainerRetryPolicy retryPolicy = containerRetryContext.getRetryPolicy(); + if (retryPolicy == ContainerRetryPolicy.RETRY_ON_ALL_ERRORS + || (retryPolicy == ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES + && containerRetryContext.getErrorCodes() != null + && containerRetryContext.getErrorCodes().contains(errorCode))) { + return remainingRetryAttempts > 0 + || remainingRetryAttempts == ContainerRetryContext.RETRY_FOREVER; + } + + return false; + } + /** * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java index a43df892874..6b96204a9e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java @@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container; public enum ContainerState { - NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, EXITED_WITH_SUCCESS, - EXITED_WITH_FAILURE, KILLING, CONTAINER_CLEANEDUP_AFTER_KILL, - CONTAINER_RESOURCES_CLEANINGUP, DONE + NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, RELAUNCHING, + EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, + CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 76ee90e576c..a3b53e35e46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller; @@ -98,7 +99,7 @@ public class ContainerLaunch implements Callable { protected final Dispatcher dispatcher; protected final ContainerExecutor exec; - private final Application app; + protected final Application app; protected final Container container; private final Configuration conf; private final Context context; @@ -112,7 +113,7 @@ public class ContainerLaunch implements Callable { protected Path pidFilePath = null; - private final LocalDirsHandlerService dirsHandler; + protected final LocalDirsHandlerService dirsHandler; public ContainerLaunch(Context context, Configuration configuration, Dispatcher dispatcher, ContainerExecutor exec, Application app, @@ -156,33 +157,19 @@ public static String expandEnvironment(String var, @Override @SuppressWarnings("unchecked") // dispatcher not typed public Integer call() { + if (!validateContainerState()) { + return 0; + } + final ContainerLaunchContext launchContext = container.getLaunchContext(); - Map> localResources = null; ContainerId containerID = container.getContainerId(); String containerIdStr = ConverterUtils.toString(containerID); final List command = launchContext.getCommands(); int ret = -1; - // CONTAINER_KILLED_ON_REQUEST should not be missed if the container - // is already at KILLING - if (container.getContainerState() == ContainerState.KILLING) { - dispatcher.getEventHandler().handle( - new ContainerExitEvent(containerID, - ContainerEventType.CONTAINER_KILLED_ON_REQUEST, - Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() : - ExitCode.TERMINATED.getExitCode(), - "Container terminated before launch.")); - return 0; - } - Path containerLogDir; try { - localResources = container.getLocalizedResources(); - if (localResources == null) { - throw RPCUtil.getRemoteException( - "Unable to get local resources when Container " + containerID + - " is at " + container.getContainerState()); - } + Map> localResources = getLocalizedResources(); final String user = container.getUser(); // /////////////////////////// Variable expansion @@ -193,6 +180,7 @@ public Integer call() { .getRelativeContainerLogDir(appIdStr, containerIdStr); containerLogDir = dirsHandler.getLogPathForWrite(relativeContainerLogDir, false); + recordContainerLogDir(containerID, containerLogDir.toString()); for (String str : command) { // TODO: Should we instead work via symlinks without this grammar? newCmds.add(expandEnvironment(str, containerLogDir)); @@ -233,6 +221,7 @@ public Integer call() { + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr + Path.SEPARATOR + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, false); + recordContainerWorkDir(containerID, containerWorkDir.toString()); String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr); @@ -241,11 +230,8 @@ public Integer call() { pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath); List localDirs = dirsHandler.getLocalDirs(); List logDirs = dirsHandler.getLogDirs(); - - List containerLogDirs = new ArrayList(); - for( String logDir : logDirs) { - containerLogDirs.add(logDir + Path.SEPARATOR + relativeContainerLogDir); - } + List containerLocalDirs = getContainerLocalDirs(localDirs); + List containerLogDirs = getContainerLogDirs(logDirs); if (!dirsHandler.areDisksHealthy()) { ret = ContainerExitStatus.DISKS_FAILED; @@ -253,7 +239,6 @@ public Integer call() { + dirsHandler.getDisksHealthReport(false)); } - List containerLocalDirs = new ArrayList<>(localDirs.size()); try { // /////////// Write out the container-script in the nmPrivate space. List appDirs = new ArrayList(localDirs.size()); @@ -262,14 +247,6 @@ public Integer call() { Path userdir = new Path(usersdir, user); Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE); appDirs.add(new Path(appsdir, appIdStr)); - - String containerLocalDir = localDir + Path.SEPARATOR + - ContainerLocalizer.USERCACHE + Path.SEPARATOR + user - + Path.SEPARATOR - + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr - + Path.SEPARATOR; - - containerLocalDirs.add(containerLocalDir); } containerScriptOutStream = lfs.create(nmPrivateContainerScriptPath, @@ -301,35 +278,19 @@ public Integer call() { IOUtils.cleanup(LOG, containerScriptOutStream, tokensOutStream); } - // LaunchContainer is a blocking call. We are here almost means the - // container is launched, so send out the event. - dispatcher.getEventHandler().handle(new ContainerEvent( - containerID, - ContainerEventType.CONTAINER_LAUNCHED)); - context.getNMStateStore().storeContainerLaunched(containerID); - - // Check if the container is signalled to be killed. - if (!shouldLaunchContainer.compareAndSet(false, true)) { - LOG.info("Container " + containerIdStr + " not launched as " - + "cleanup already called"); - ret = ExitCode.TERMINATED.getExitCode(); - } - else { - exec.activateContainer(containerID, pidFilePath); - ret = exec.launchContainer(new ContainerStartContext.Builder() - .setContainer(container) - .setLocalizedResources(localResources) - .setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath) - .setNmPrivateTokensPath(nmPrivateTokensPath) - .setUser(user) - .setAppId(appIdStr) - .setContainerWorkDir(containerWorkDir) - .setLocalDirs(localDirs) - .setLogDirs(logDirs) - .setContainerLocalDirs(containerLocalDirs) - .setContainerLogDirs(containerLogDirs) - .build()); - } + ret = launchContainer(new ContainerStartContext.Builder() + .setContainer(container) + .setLocalizedResources(localResources) + .setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath) + .setNmPrivateTokensPath(nmPrivateTokensPath) + .setUser(user) + .setAppId(appIdStr) + .setContainerWorkDir(containerWorkDir) + .setLocalDirs(localDirs) + .setLogDirs(logDirs) + .setContainerLocalDirs(containerLocalDirs) + .setContainerLogDirs(containerLogDirs) + .build()); } catch (Throwable e) { LOG.warn("Failed to launch container.", e); dispatcher.getEventHandler().handle(new ContainerExitEvent( @@ -337,46 +298,138 @@ public Integer call() { e.getMessage())); return ret; } finally { - completed.set(true); - exec.deactivateContainer(containerID); - try { - context.getNMStateStore().storeContainerCompleted(containerID, ret); - } catch (IOException e) { - LOG.error("Unable to set exit code for container " + containerID); - } + setContainerCompletedStatus(ret); } + handleContainerExitCode(ret, containerLogDir); + + return ret; + } + + @SuppressWarnings("unchecked") + protected boolean validateContainerState() { + // CONTAINER_KILLED_ON_REQUEST should not be missed if the container + // is already at KILLING + if (container.getContainerState() == ContainerState.KILLING) { + dispatcher.getEventHandler().handle( + new ContainerExitEvent(container.getContainerId(), + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, + Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() : + ExitCode.TERMINATED.getExitCode(), + "Container terminated before launch.")); + return false; + } + + return true; + } + + protected List getContainerLogDirs(List logDirs) { + List containerLogDirs = new ArrayList<>(logDirs.size()); + String appIdStr = app.getAppId().toString(); + String containerIdStr = ConverterUtils.toString(container.getContainerId()); + String relativeContainerLogDir = ContainerLaunch + .getRelativeContainerLogDir(appIdStr, containerIdStr); + + for(String logDir : logDirs) { + containerLogDirs.add(logDir + Path.SEPARATOR + relativeContainerLogDir); + } + + return containerLogDirs; + } + + protected List getContainerLocalDirs(List localDirs) { + List containerLocalDirs = new ArrayList<>(localDirs.size()); + String user = container.getUser(); + String appIdStr = app.getAppId().toString(); + String relativeContainerLocalDir = ContainerLocalizer.USERCACHE + + Path.SEPARATOR + user + Path.SEPARATOR + ContainerLocalizer.APPCACHE + + Path.SEPARATOR + appIdStr + Path.SEPARATOR; + + for (String localDir : localDirs) { + containerLocalDirs.add(localDir + Path.SEPARATOR + + relativeContainerLocalDir); + } + + return containerLocalDirs; + } + + protected Map> getLocalizedResources() + throws YarnException { + Map> localResources = container.getLocalizedResources(); + if (localResources == null) { + throw RPCUtil.getRemoteException( + "Unable to get local resources when Container " + container + + " is at " + container.getContainerState()); + } + return localResources; + } + + @SuppressWarnings("unchecked") + protected int launchContainer(ContainerStartContext ctx) throws IOException { + ContainerId containerId = container.getContainerId(); + + // LaunchContainer is a blocking call. We are here almost means the + // container is launched, so send out the event. + dispatcher.getEventHandler().handle(new ContainerEvent( + containerId, + ContainerEventType.CONTAINER_LAUNCHED)); + context.getNMStateStore().storeContainerLaunched(containerId); + + // Check if the container is signalled to be killed. + if (!shouldLaunchContainer.compareAndSet(false, true)) { + LOG.info("Container " + containerId + " not launched as " + + "cleanup already called"); + return ExitCode.TERMINATED.getExitCode(); + } else { + exec.activateContainer(containerId, pidFilePath); + return exec.launchContainer(ctx); + } + } + + protected void setContainerCompletedStatus(int exitCode) { + ContainerId containerId = container.getContainerId(); + completed.set(true); + exec.deactivateContainer(containerId); + try { + if (!container.shouldRetry(exitCode)) { + context.getNMStateStore().storeContainerCompleted(containerId, + exitCode); + } + } catch (IOException e) { + LOG.error("Unable to set exit code for container " + containerId); + } + } + + @SuppressWarnings("unchecked") + protected void handleContainerExitCode(int exitCode, Path containerLogDir) { + ContainerId containerId = container.getContainerId(); + if (LOG.isDebugEnabled()) { - LOG.debug("Container " + containerIdStr + " completed with exit code " - + ret); + LOG.debug("Container " + containerId + " completed with exit code " + + exitCode); } StringBuilder diagnosticInfo = new StringBuilder("Container exited with a non-zero exit code "); - diagnosticInfo.append(ret); + diagnosticInfo.append(exitCode); diagnosticInfo.append(". "); - if (ret == ExitCode.FORCE_KILLED.getExitCode() - || ret == ExitCode.TERMINATED.getExitCode()) { + if (exitCode == ExitCode.FORCE_KILLED.getExitCode() + || exitCode == ExitCode.TERMINATED.getExitCode()) { // If the process was killed, Send container_cleanedup_after_kill and // just break out of this method. dispatcher.getEventHandler().handle( - new ContainerExitEvent(containerID, - ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ret, - diagnosticInfo.toString())); - return ret; - } - - if (ret != 0) { - handleContainerExitWithFailure(containerID, ret, containerLogDir, + new ContainerExitEvent(containerId, + ContainerEventType.CONTAINER_KILLED_ON_REQUEST, exitCode, + diagnosticInfo.toString())); + } else if (exitCode != 0) { + handleContainerExitWithFailure(containerId, exitCode, containerLogDir, diagnosticInfo); - return ret; + } else { + LOG.info("Container " + containerId + " succeeded "); + dispatcher.getEventHandler().handle( + new ContainerEvent(containerId, + ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)); } - - LOG.info("Container " + containerIdStr + " succeeded "); - dispatcher.getEventHandler().handle( - new ContainerEvent(containerID, - ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)); - return 0; } /** @@ -389,8 +442,8 @@ public Integer call() { * @param diagnosticInfo */ @SuppressWarnings("unchecked") - private void handleContainerExitWithFailure(ContainerId containerID, int ret, - Path containerLogDir, StringBuilder diagnosticInfo) { + protected void handleContainerExitWithFailure(ContainerId containerID, + int ret, Path containerLogDir, StringBuilder diagnosticInfo) { LOG.warn(diagnosticInfo); String errorFileNamePattern = @@ -689,7 +742,8 @@ public static String getRelativeContainerLogDir(String appIdStr, return appIdStr + Path.SEPARATOR + containerIdStr; } - private String getContainerPrivateDir(String appIdStr, String containerIdStr) { + protected String getContainerPrivateDir(String appIdStr, + String containerIdStr) { return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr + Path.SEPARATOR; } @@ -1106,4 +1160,20 @@ public void sanitizeEnv(Map environment, Path pwd, public static String getExitCodeFile(String pidFile) { return pidFile + EXIT_CODE_FILE_SUFFIX; } + + private void recordContainerLogDir(ContainerId containerId, + String logDir) throws IOException{ + if (container.isRetryContextSet()) { + container.setLogDir(logDir); + context.getNMStateStore().storeContainerLogDir(containerId, logDir); + } + } + + private void recordContainerWorkDir(ContainerId containerId, + String workDir) throws IOException{ + if (container.isRetryContextSet()) { + container.setWorkDir(workDir); + context.getNMStateStore().storeContainerWorkDir(containerId, workDir); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerRelaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerRelaunch.java new file mode 100644 index 00000000000..711d5cdc261 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerRelaunch.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext; +import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Relaunch container. + */ +public class ContainerRelaunch extends ContainerLaunch { + + private static final Log LOG = LogFactory.getLog(ContainerRelaunch.class); + + public ContainerRelaunch(Context context, Configuration configuration, + Dispatcher dispatcher, ContainerExecutor exec, Application app, + Container container, LocalDirsHandlerService dirsHandler, + ContainerManagerImpl containerManager) { + super(context, configuration, dispatcher, exec, app, container, dirsHandler, + containerManager); + } + + @Override + @SuppressWarnings("unchecked") + public Integer call() { + if (!validateContainerState()) { + return 0; + } + + ContainerId containerId = container.getContainerId(); + String containerIdStr = ConverterUtils.toString(containerId); + int ret = -1; + Path containerLogDir; + try { + Path containerWorkDir = getContainerWorkDir(); + cleanupPreviousContainerFiles(containerWorkDir); + + containerLogDir = getContainerLogDir(); + + Map> localResources = getLocalizedResources(); + + String appIdStr = app.getAppId().toString(); + Path nmPrivateContainerScriptPath = + getNmPrivateContainerScriptPath(appIdStr, containerIdStr); + Path nmPrivateTokensPath = + getNmPrivateTokensPath(appIdStr, containerIdStr); + pidFilePath = getPidFilePath(appIdStr, containerIdStr); + + LOG.info("Relaunch container with " + + "workDir = " + containerWorkDir.toString() + + ", logDir = " + containerLogDir.toString() + + ", nmPrivateContainerScriptPath = " + + nmPrivateContainerScriptPath.toString() + + ", nmPrivateTokensPath = " + nmPrivateTokensPath.toString() + + ", pidFilePath = " + pidFilePath.toString()); + + List localDirs = dirsHandler.getLocalDirs(); + List logDirs = dirsHandler.getLogDirs(); + List containerLocalDirs = getContainerLocalDirs(localDirs); + List containerLogDirs = getContainerLogDirs(logDirs); + + if (!dirsHandler.areDisksHealthy()) { + ret = ContainerExitStatus.DISKS_FAILED; + throw new IOException("Most of the disks failed. " + + dirsHandler.getDisksHealthReport(false)); + } + + ret = launchContainer(new ContainerStartContext.Builder() + .setContainer(container) + .setLocalizedResources(localResources) + .setNmPrivateContainerScriptPath(nmPrivateContainerScriptPath) + .setNmPrivateTokensPath(nmPrivateTokensPath) + .setUser(container.getUser()) + .setAppId(appIdStr) + .setContainerWorkDir(containerWorkDir) + .setLocalDirs(localDirs) + .setLogDirs(logDirs) + .setContainerLocalDirs(containerLocalDirs) + .setContainerLogDirs(containerLogDirs) + .build()); + } catch (Throwable e) { + LOG.warn("Failed to relaunch container.", e); + dispatcher.getEventHandler().handle(new ContainerExitEvent( + containerId, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret, + e.getMessage())); + return ret; + } finally { + setContainerCompletedStatus(ret); + } + + handleContainerExitCode(ret, containerLogDir); + + return ret; + } + + private Path getContainerWorkDir() throws IOException { + String containerWorkDir = container.getWorkDir(); + if (containerWorkDir == null + || !dirsHandler.isGoodLocalDir(containerWorkDir)) { + throw new IOException( + "Could not find a good work dir " + containerWorkDir + + " for container " + container); + } + + return new Path(containerWorkDir); + } + + private Path getContainerLogDir() throws IOException { + String containerLogDir = container.getLogDir(); + if (containerLogDir == null || !dirsHandler.isGoodLogDir(containerLogDir)) { + throw new IOException("Could not find a good log dir " + containerLogDir + + " for container " + container); + } + + return new Path(containerLogDir); + } + + private Path getNmPrivateContainerScriptPath(String appIdStr, + String containerIdStr) throws IOException { + return dirsHandler.getLocalPathForRead( + getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR + + CONTAINER_SCRIPT); + } + + private Path getNmPrivateTokensPath(String appIdStr, + String containerIdStr) throws IOException { + return dirsHandler.getLocalPathForRead( + getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR + + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, + containerIdStr)); + } + + private Path getPidFilePath(String appIdStr, + String containerIdStr) throws IOException { + return dirsHandler.getLocalPathForRead( + getPidFileSubpath(appIdStr, containerIdStr)); + } + + /** + * Clean up container's previous files for container relaunch. + */ + private void cleanupPreviousContainerFiles(Path containerWorkDir) { + // delete ContainerScriptPath + deleteAsUser(new Path(containerWorkDir, CONTAINER_SCRIPT)); + // delete TokensPath + deleteAsUser(new Path(containerWorkDir, FINAL_CONTAINER_TOKENS_FILE)); + } + + private void deleteAsUser(Path path) { + try { + exec.deleteAsUser(new DeletionAsUserContext.Builder() + .setUser(container.getUser()) + .setSubDir(path) + .build()); + } catch (Exception e) { + LOG.warn("Failed to delete " + path, e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index a34051ce44a..e5fff00fbe7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -118,6 +118,16 @@ public void handle(ContainersLauncherEvent event) { containerLauncher.submit(launch); running.put(containerId, launch); break; + case RELAUNCH_CONTAINER: + app = context.getApplications().get( + containerId.getApplicationAttemptId().getApplicationId()); + + ContainerRelaunch relaunch = + new ContainerRelaunch(context, getConfig(), dispatcher, exec, app, + event.getContainer(), dirsHandler, containerManager); + containerLauncher.submit(relaunch); + running.put(containerId, relaunch); + break; case RECOVER_CONTAINER: app = context.getApplications().get( containerId.getApplicationAttemptId().getApplicationId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java index a88564db85b..2d7bc743021 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java @@ -20,6 +20,7 @@ public enum ContainersLauncherEventType { LAUNCH_CONTAINER, + RELAUNCH_CONTAINER, RECOVER_CONTAINER, CLEANUP_CONTAINER, // The process(grp) itself. SIGNAL_CONTAINER, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index d74c6a8a6c0..6e9efe123c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -110,6 +110,10 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { "/resourceChanged"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; + private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX = + "/remainingRetryAttempts"; + private static final String CONTAINER_WORK_DIR_KEY_SUFFIX = "/workdir"; + private static final String CONTAINER_LOG_DIR_KEY_SUFFIX = "/logdir"; private static final String CURRENT_MASTER_KEY_SUFFIX = "CurrentMasterKey"; private static final String PREV_MASTER_KEY_SUFFIX = "PreviousMasterKey"; @@ -247,6 +251,13 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) { rcs.capability = new ResourcePBImpl( ResourceProto.parseFrom(entry.getValue())); + } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) { + rcs.setRemainingRetryAttempts( + Integer.parseInt(asString(entry.getValue()))); + } else if (suffix.equals(CONTAINER_WORK_DIR_KEY_SUFFIX)) { + rcs.setWorkDir(asString(entry.getValue())); + } else if (suffix.equals(CONTAINER_LOG_DIR_KEY_SUFFIX)) { + rcs.setLogDir(asString(entry.getValue())); } else { throw new IOException("Unexpected container state key: " + key); } @@ -356,6 +367,42 @@ public void storeContainerCompleted(ContainerId containerId, } } + @Override + public void storeContainerRemainingRetryAttempts(ContainerId containerId, + int remainingRetryAttempts) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_REMAIN_RETRIES_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(Integer.toString(remainingRetryAttempts))); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerWorkDir(ContainerId containerId, + String workDir) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_WORK_DIR_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(workDir)); + } catch (DBException e) { + throw new IOException(e); + } + } + + @Override + public void storeContainerLogDir(ContainerId containerId, + String logDir) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_LOG_DIR_KEY_SUFFIX; + try { + db.put(bytes(key), bytes(logDir)); + } catch (DBException e) { + throw new IOException(e); + } + } + @Override public void removeContainer(ContainerId containerId) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index a887e71e9e1..08b80e961a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -99,6 +99,21 @@ public void storeContainerCompleted(ContainerId containerId, int exitCode) throws IOException { } + @Override + public void storeContainerRemainingRetryAttempts(ContainerId containerId, + int remainingRetryAttempts) throws IOException { + } + + @Override + public void storeContainerWorkDir(ContainerId containerId, + String workDir) throws IOException { + } + + @Override + public void storeContainerLogDir(ContainerId containerId, + String logDir) throws IOException { + } + @Override public void removeContainer(ContainerId containerId) throws IOException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 463815ec9c1..ccf1e709d99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; @@ -72,6 +73,9 @@ public static class RecoveredContainerState { String diagnostics = ""; StartContainerRequest startRequest; Resource capability; + private int remainingRetryAttempts = ContainerRetryContext.RETRY_INVALID; + private String workDir; + private String logDir; public RecoveredContainerStatus getStatus() { return status; @@ -97,6 +101,30 @@ public Resource getCapability() { return capability; } + public int getRemainingRetryAttempts() { + return remainingRetryAttempts; + } + + public void setRemainingRetryAttempts(int retryAttempts) { + this.remainingRetryAttempts = retryAttempts; + } + + public String getWorkDir() { + return workDir; + } + + public void setWorkDir(String workDir) { + this.workDir = workDir; + } + + public String getLogDir() { + return logDir; + } + + public void setLogDir(String logDir) { + this.logDir = logDir; + } + @Override public String toString() { return new StringBuffer("Status: ").append(getStatus()) @@ -105,6 +133,9 @@ public String toString() { .append(", Diagnostics: ").append(getDiagnostics()) .append(", Capability: ").append(getCapability()) .append(", StartRequest: ").append(getStartRequest()) + .append(", RemainingRetryAttempts: ").append(remainingRetryAttempts) + .append(", WorkDir: ").append(workDir) + .append(", LogDir: ").append(logDir) .toString(); } } @@ -323,6 +354,34 @@ public abstract void storeContainerKilled(ContainerId containerId) public abstract void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException; + /** + * Record remaining retry attempts for a container. + * @param containerId the container ID + * @param remainingRetryAttempts the remain retry times when container + * fails to run + * @throws IOException + */ + public abstract void storeContainerRemainingRetryAttempts( + ContainerId containerId, int remainingRetryAttempts) throws IOException; + + /** + * Record working directory for a container. + * @param containerId the container ID + * @param workDir the working directory + * @throws IOException + */ + public abstract void storeContainerWorkDir( + ContainerId containerId, String workDir) throws IOException; + + /** + * Record log directory for a container. + * @param containerId the container ID + * @param logDir the log directory + * @throws IOException + */ + public abstract void storeContainerLogDir( + ContainerId containerId, String logDir) throws IOException; + /** * Remove records corresponding to a container * @param containerId the container ID diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java index cc98bdc54d8..118bc42bcb0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java @@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryContext; +import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -660,6 +662,69 @@ public void testLaunchAfterKillRequest() throws Exception { } } + @Test + public void testContainerRetry() throws Exception{ + ContainerRetryContext containerRetryContext1 = ContainerRetryContext + .newInstance(ContainerRetryPolicy.NEVER_RETRY, null, 3, 0); + testContainerRetry(containerRetryContext1, 2, 0); + + ContainerRetryContext containerRetryContext2 = ContainerRetryContext + .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 3, 0); + testContainerRetry(containerRetryContext2, 2, 3); + + ContainerRetryContext containerRetryContext3 = ContainerRetryContext + .newInstance(ContainerRetryPolicy.RETRY_ON_ALL_ERRORS, null, 3, 0); + // If exit code is 0, it will not retry + testContainerRetry(containerRetryContext3, 0, 0); + + ContainerRetryContext containerRetryContext4 = ContainerRetryContext + .newInstance( + ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES, null, 3, 0); + testContainerRetry(containerRetryContext4, 2, 0); + + HashSet errorCodes = new HashSet<>(); + errorCodes.add(2); + errorCodes.add(6); + ContainerRetryContext containerRetryContext5 = ContainerRetryContext + .newInstance(ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES, + errorCodes, 3, 0); + testContainerRetry(containerRetryContext5, 2, 3); + + HashSet errorCodes2 = new HashSet<>(); + errorCodes.add(143); + ContainerRetryContext containerRetryContext6 = ContainerRetryContext + .newInstance(ContainerRetryPolicy.RETRY_ON_SPECIFIC_ERROR_CODES, + errorCodes2, 3, 0); + // If exit code is 143(SIGTERM), it will not retry even it is in errorCodes. + testContainerRetry(containerRetryContext6, 143, 0); + } + + private void testContainerRetry(ContainerRetryContext containerRetryContext, + int exitCode, int expectedRetries) throws Exception{ + WrappedContainer wc = null; + try { + int retryTimes = 0; + wc = new WrappedContainer(24, 314159265358979L, 4344, "yak", + containerRetryContext); + wc.initContainer(); + wc.localizeResources(); + wc.launchContainer(); + while (true) { + wc.containerFailed(exitCode); + if (wc.c.getContainerState() == ContainerState.RUNNING) { + retryTimes ++; + } else { + break; + } + } + Assert.assertEquals(expectedRetries, retryTimes); + } finally { + if (wc != null) { + wc.finished(); + } + } + } + private void verifyCleanupCall(WrappedContainer wc) throws Exception { ResourcesReleasedMatcher matchesReq = new ResourcesReleasedMatcher(wc.localResources, EnumSet.of( @@ -802,12 +867,23 @@ private class WrappedContainer { WrappedContainer(int appId, long timestamp, int id, String user) throws IOException { - this(appId, timestamp, id, user, true, false); + this(appId, timestamp, id, user, null); + } + + WrappedContainer(int appId, long timestamp, int id, String user, + ContainerRetryContext containerRetryContext) throws IOException { + this(appId, timestamp, id, user, true, false, containerRetryContext); + } + + WrappedContainer(int appId, long timestamp, int id, String user, + boolean withLocalRes, boolean withServiceData) throws IOException { + this(appId, timestamp, id, user, withLocalRes, withServiceData, null); } @SuppressWarnings("rawtypes") WrappedContainer(int appId, long timestamp, int id, String user, - boolean withLocalRes, boolean withServiceData) throws IOException { + boolean withLocalRes, boolean withServiceData, + ContainerRetryContext containerRetryContext) throws IOException { dispatcher = new DrainDispatcher(); dispatcher.init(new Configuration()); @@ -884,6 +960,7 @@ private class WrappedContainer { serviceData = Collections. emptyMap(); } when(ctxt.getServiceData()).thenReturn(serviceData); + when(ctxt.getContainerRetryContext()).thenReturn(containerRetryContext); c = new ContainerImpl(conf, dispatcher, ctxt, null, metrics, identifier, context); @@ -1005,6 +1082,10 @@ public void containerFailed(int exitCode) { assert containerStatus.getDiagnostics().contains(diagnosticMsg); assert containerStatus.getExitStatus() == exitCode; drainDispatcherEvents(); + // If container needs retry, relaunch it + if (c.getContainerState() == ContainerState.RELAUNCHING) { + launchContainer(); + } } public void killContainer() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 12798963390..46522453ff2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -115,6 +115,9 @@ public synchronized List loadContainersState() rcsCopy.diagnostics = rcs.diagnostics; rcsCopy.startRequest = rcs.startRequest; rcsCopy.capability = rcs.capability; + rcsCopy.setRemainingRetryAttempts(rcs.getRemainingRetryAttempts()); + rcsCopy.setWorkDir(rcs.getWorkDir()); + rcsCopy.setLogDir(rcs.getLogDir()); result.add(rcsCopy); } return result; @@ -167,6 +170,27 @@ public synchronized void storeContainerCompleted(ContainerId containerId, rcs.exitCode = exitCode; } + @Override + public void storeContainerRemainingRetryAttempts(ContainerId containerId, + int remainingRetryAttempts) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.setRemainingRetryAttempts(remainingRetryAttempts); + } + + @Override + public void storeContainerWorkDir(ContainerId containerId, + String workDir) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.setWorkDir(workDir); + } + + @Override + public void storeContainerLogDir(ContainerId containerId, + String logDir) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.setLogDir(logDir); + } + @Override public synchronized void removeContainer(ContainerId containerId) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index e44e5e51e64..ccc9254afc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -334,6 +334,18 @@ public void testContainerStorage() throws IOException { assertEquals(containerReq, rcs.getStartRequest()); assertEquals(diags.toString(), rcs.getDiagnostics()); + // store remainingRetryAttempts, workDir and logDir + stateStore.storeContainerRemainingRetryAttempts(containerId, 6); + stateStore.storeContainerWorkDir(containerId, "/test/workdir"); + stateStore.storeContainerLogDir(containerId, "/test/logdir"); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(6, rcs.getRemainingRetryAttempts()); + assertEquals("/test/workdir", rcs.getWorkDir()); + assertEquals("/test/logdir", rcs.getLogDir()); + // remove the container and verify not recovered stateStore.removeContainer(containerId); restartStateStore(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index 394a92cb197..0b95dba4651 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -144,4 +144,32 @@ public ContainerTokenIdentifier getContainerTokenIdentifier() { public NMContainerStatus getNMContainerStatus() { return null; } + + @Override + public boolean isRetryContextSet() { + return false; + } + + @Override + public boolean shouldRetry(int errorCode) { + return false; + } + + @Override + public String getWorkDir() { + return null; + } + + @Override + public void setWorkDir(String workDir) { + } + + @Override + public String getLogDir() { + return null; + } + + @Override + public void setLogDir(String logDir) { + } }