YARN-7248. NM returns new SCHEDULED container status to older clients. Contributed by Arun Suresh

This commit is contained in:
Jason Lowe 2017-09-28 14:10:15 -05:00
parent a530e7ab3b
commit 85d81ae58e
13 changed files with 229 additions and 71 deletions

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Stable;
/**
@ -35,12 +34,5 @@ public enum ContainerState {
RUNNING,
/** Completed container */
COMPLETE,
/** Scheduled (awaiting resources) at the NM. */
@InterfaceStability.Unstable
SCHEDULED,
/** Paused at the NM. */
PAUSED
COMPLETE
}

View File

@ -201,4 +201,26 @@ public abstract class ContainerStatus {
throw new UnsupportedOperationException(
"subclass must implement this method");
}
/**
* Add Extra state information of the container (SCHEDULED, LOCALIZING etc.).
* @param subState Extra State Information.
*/
@Private
@Unstable
public void setContainerSubState(ContainerSubState subState) {
throw new UnsupportedOperationException(
"subclass must implement this method");
}
/**
* Get Extra state information of the container (SCHEDULED, LOCALIZING etc.).
* @return Extra State information.
*/
@Private
@Unstable
public ContainerSubState getContainerSubState() {
throw new UnsupportedOperationException(
"subclass must implement this method");
}
}

View File

@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Container Sub-State.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public enum ContainerSubState {
/*
* NEW, LOCALIZING, SCHEDULED,
* REINITIALIZING_AWAITING_KILL, RELAUNCHING,
*/
SCHEDULED,
/*
* RUNNING, REINITIALIZING, PAUSING, KILLING
*/
RUNNING,
/*
* PAUSED, RESUMING
*/
PAUSED,
/*
* LOCALIZATION_FAILED, EXITED_WITH_SUCCESS,
* EXITED_WITH_FAILURE,
* CONTAINER_CLEANEDUP_AFTER_KILL,
* CONTAINER_RESOURCES_CLEANINGUP
*/
COMPLETING,
/*
* DONE
*/
DONE
}

View File

@ -109,8 +109,37 @@ enum ContainerStateProto {
C_NEW = 1;
C_RUNNING = 2;
C_COMPLETE = 3;
C_SCHEDULED = 4;
C_PAUSED = 5;
}
enum ContainerSubStateProto {
/**
* NEW, LOCALIZING, SCHEDULED,
* REINITIALIZING_AWAITING_KILL, RELAUNCHING,
*/
CSS_SCHEDULED = 1;
/**
* RUNNING, REINITIALIZING, PAUSING, KILLING
*/
CSS_RUNNING = 2;
/**
* PAUSED, RESUMING
*/
CSS_PAUSED = 3;
/**
* LOCALIZATION_FAILED, EXITED_WITH_SUCCESS,
* EXITED_WITH_FAILURE,
* CONTAINER_CLEANEDUP_AFTER_KILL,
* CONTAINER_RESOURCES_CLEANINGUP
*/
CSS_COMPLETING = 4;
/**
* DONE
*/
CSS_DONE = 5;
}
message ContainerProto {
@ -616,6 +645,7 @@ message ContainerStatusProto {
optional ResourceProto capability = 5;
optional ExecutionTypeProto executionType = 6 [default = GUARANTEED];
repeated StringStringMapProto container_attributes = 7;
optional ContainerSubStateProto container_sub_state = 8;
}
enum ContainerExitStatusProto {

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos;
@ -96,7 +97,8 @@ public class ContainerStatusPBImpl extends ContainerStatus {
sb.append("Diagnostics: ").append(getDiagnostics()).append(", ");
sb.append("ExitStatus: ").append(getExitStatus()).append(", ");
sb.append("IP: ").append(getIPs()).append(", ");
sb.append("Host: ").append(getHost());
sb.append("Host: ").append(getHost()).append(", ");
sb.append("ContainerSubState: ").append(getContainerSubState());
sb.append("]");
return sb.toString();
}
@ -213,6 +215,26 @@ public class ContainerStatusPBImpl extends ContainerStatus {
}
builder.setState(convertToProtoFormat(state));
}
@Override
public synchronized ContainerSubState getContainerSubState() {
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasContainerSubState()) {
return null;
}
return ProtoUtils.convertFromProtoFormat(p.getContainerSubState());
}
@Override
public synchronized void setContainerSubState(ContainerSubState subState) {
maybeInitBuilder();
if (subState == null) {
builder.clearContainerSubState();
return;
}
builder.setContainerSubState(ProtoUtils.convertToProtoFormat(subState));
}
@Override
public synchronized ContainerId getContainerId() {
ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportPro
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerSubStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto;
@ -92,7 +94,7 @@ public class ProtoUtils {
/*
* ContainerState
*/
private static String CONTAINER_STATE_PREFIX = "C_";
private final static String CONTAINER_STATE_PREFIX = "C_";
public static ContainerStateProto convertToProtoFormat(ContainerState e) {
return ContainerStateProto.valueOf(CONTAINER_STATE_PREFIX + e.name());
}
@ -100,10 +102,24 @@ public class ProtoUtils {
return ContainerState.valueOf(e.name().replace(CONTAINER_STATE_PREFIX, ""));
}
/*
* Container SubState
*/
private final static String CONTAINER_SUB_STATE_PREFIX = "CSS_";
public static ContainerSubStateProto convertToProtoFormat(
ContainerSubState e) {
return ContainerSubStateProto.valueOf(
CONTAINER_SUB_STATE_PREFIX + e.name());
}
public static ContainerSubState convertFromProtoFormat(
ContainerSubStateProto e) {
return ContainerSubState.valueOf(
e.name().substring(CONTAINER_SUB_STATE_PREFIX.length()));
}
/*
* NodeState
*/
private static String NODE_STATE_PREFIX = "NS_";
private final static String NODE_STATE_PREFIX = "NS_";
public static NodeStateProto convertToProtoFormat(NodeState e) {
return NodeStateProto.valueOf(NODE_STATE_PREFIX + e.name());
}

View File

@ -34,6 +34,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -714,7 +715,6 @@ public class ContainerImpl implements Container {
case SCHEDULED:
case PAUSED:
case RESUMING:
return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED;
case RUNNING:
case RELAUNCHING:
case REINITIALIZING:
@ -732,6 +732,36 @@ public class ContainerImpl implements Container {
}
}
// NOTE: Please update the doc in the ContainerSubState class as
// well as the yarn_protos.proto file if this mapping is ever modified.
private ContainerSubState getContainerSubState() {
switch (stateMachine.getCurrentState()) {
case NEW:
case LOCALIZING:
case SCHEDULED:
case REINITIALIZING_AWAITING_KILL:
case RELAUNCHING:
return ContainerSubState.SCHEDULED;
case REINITIALIZING:
case PAUSING:
case KILLING:
case RUNNING:
return ContainerSubState.RUNNING;
case PAUSED:
case RESUMING:
return ContainerSubState.PAUSED;
case LOCALIZATION_FAILED:
case EXITED_WITH_SUCCESS:
case EXITED_WITH_FAILURE:
case CONTAINER_CLEANEDUP_AFTER_KILL:
case CONTAINER_RESOURCES_CLEANINGUP:
return ContainerSubState.COMPLETING;
case DONE:
default:
return ContainerSubState.DONE;
}
}
public NMTimelinePublisher getNMTimelinePublisher() {
return context.getNMTimelinePublisher();
}
@ -800,6 +830,7 @@ public class ContainerImpl implements Container {
this.containerTokenIdentifier.getExecutionType());
status.setIPs(ips == null ? null : Arrays.asList(ips.split(",")));
status.setHost(host);
status.setContainerSubState(getContainerSubState());
return status;
} finally {
this.readLock.unlock();

View File

@ -18,7 +18,16 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
/**
* States used by the container state machine.
*/
public enum ContainerState {
// NOTE: In case of future additions / deletions / modifications to this
// enum, please ensure that the following are also correspondingly
// updated:
// 1. ContainerImpl::getContainerSubState().
// 2. the doc in the ContainerSubState class.
// 3. the doc in the yarn_protos.proto file.
NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
REINITIALIZING, REINITIALIZING_AWAITING_KILL,
EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,

View File

@ -159,7 +159,7 @@ public class TestEventFlow {
containerManager.startContainers(allRequests);
BaseContainerManagerTest.waitForContainerState(containerManager, cID,
Arrays.asList(ContainerState.RUNNING, ContainerState.SCHEDULED), 20);
Arrays.asList(ContainerState.RUNNING), 20);
List<ContainerId> containerIds = new ArrayList<ContainerId>();
containerIds.add(cID);

View File

@ -256,8 +256,7 @@ public class TestNodeManagerShutdown {
GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus =
containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
Assert.assertTrue(
EnumSet.of(ContainerState.RUNNING, ContainerState.SCHEDULED)
Assert.assertTrue(EnumSet.of(ContainerState.RUNNING)
.contains(containerStatus.getState()));
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerSubState;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -311,9 +312,8 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
List<ContainerStatus> containerStatuses = containerManager
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
status.getState());
Assert.assertEquals(ContainerSubState.SCHEDULED,
status.getContainerSubState());
}
ContainerScheduler containerScheduler =
@ -378,13 +378,11 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
if (status.getContainerId().equals(createContainerId(0))) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
status.getState());
Assert.assertEquals(ContainerSubState.RUNNING,
status.getContainerSubState());
} else {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
status.getState());
Assert.assertEquals(ContainerSubState.SCHEDULED,
status.getContainerSubState());
}
}
@ -458,17 +456,15 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
if (status.getContainerId().equals(createContainerId(0))) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
status.getState());
Assert.assertEquals(ContainerSubState.RUNNING,
status.getContainerSubState());
} else if (status.getContainerId().equals(createContainerId(
maxOppQueueLength + 1))) {
Assert.assertTrue(status.getDiagnostics().contains(
"Opportunistic container queue is full"));
} else {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
status.getState());
Assert.assertEquals(ContainerSubState.SCHEDULED,
status.getContainerSubState());
}
System.out.println("\nStatus : [" + status + "]\n");
}
@ -545,13 +541,11 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
Assert.assertTrue(status.getDiagnostics().contains(
"Container Killed to make room for Guaranteed Container"));
} else if (status.getContainerId().equals(createContainerId(1))) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
status.getState());
Assert.assertEquals(ContainerSubState.SCHEDULED,
status.getContainerSubState());
} else if (status.getContainerId().equals(createContainerId(2))) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
status.getState());
Assert.assertEquals(ContainerSubState.RUNNING,
status.getContainerSubState());
}
System.out.println("\nStatus : [" + status + "]\n");
}
@ -799,8 +793,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
"Container De-queued to meet NM queuing limits")) {
deQueuedContainers++;
}
if (status.getState() ==
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) {
if (ContainerSubState.SCHEDULED == status.getContainerSubState()) {
numQueuedOppContainers++;
}
}
@ -1079,11 +1072,9 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
List<ContainerStatus> containerStatuses = containerManager
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
if (status.getState() ==
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) {
if (ContainerSubState.RUNNING == status.getContainerSubState()) {
runningContainersNo++;
} else if (status.getState() ==
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) {
} else if (ContainerSubState.SCHEDULED == status.getContainerSubState()) {
queuedContainersNo++;
}
System.out.println("\nStatus : [" + status + "]\n");
@ -1106,34 +1097,27 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
}
statRequest = GetContainerStatusesRequest.newInstance(statList);
HashMap<org.apache.hadoop.yarn.api.records.ContainerState, ContainerStatus>
map = new HashMap<>();
HashMap<ContainerSubState, ContainerStatus> map = new HashMap<>();
for (int i=0; i < 10; i++) {
containerStatuses = containerManager.getContainerStatuses(statRequest)
.getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
System.out.println("\nStatus : [" + status + "]\n");
map.put(status.getState(), status);
if (map.containsKey(
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) &&
map.containsKey(
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) &&
map.containsKey(
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE)) {
map.put(status.getContainerSubState(), status);
if (map.containsKey(ContainerSubState.RUNNING) &&
map.containsKey(ContainerSubState.SCHEDULED) &&
map.containsKey(ContainerSubState.DONE)) {
break;
}
Thread.sleep(1000);
}
}
Assert.assertEquals(createContainerId(0),
map.get(org.apache.hadoop.yarn.api.records.ContainerState.RUNNING)
.getContainerId());
map.get(ContainerSubState.RUNNING).getContainerId());
Assert.assertEquals(createContainerId(1),
map.get(org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE)
.getContainerId());
map.get(ContainerSubState.DONE).getContainerId());
Assert.assertEquals(createContainerId(2),
map.get(org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED)
.getContainerId());
map.get(ContainerSubState.SCHEDULED).getContainerId());
}
/**
@ -1186,13 +1170,11 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
.getContainerStatuses(statRequest).getContainerStatuses();
for (ContainerStatus status : containerStatuses) {
if (status.getContainerId().equals(createContainerId(0))) {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
status.getState());
Assert.assertEquals(ContainerSubState.RUNNING,
status.getContainerSubState());
} else {
Assert.assertEquals(
org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
status.getState());
Assert.assertEquals(ContainerSubState.SCHEDULED,
status.getContainerSubState());
}
}

View File

@ -856,8 +856,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
containers = startEvent.getNMContainerStatuses();
if (containers != null && !containers.isEmpty()) {
for (NMContainerStatus container : containers) {
if (container.getContainerState() == ContainerState.RUNNING ||
container.getContainerState() == ContainerState.SCHEDULED) {
if (container.getContainerState() == ContainerState.RUNNING) {
rmNode.launchedContainers.add(container.getContainerId());
}
}
@ -1412,8 +1411,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
// Process running containers
if (remoteContainer.getState() == ContainerState.RUNNING ||
remoteContainer.getState() == ContainerState.SCHEDULED) {
if (remoteContainer.getState() == ContainerState.RUNNING) {
++numRemoteRunningContainers;
if (!launchedContainers.contains(containerId)) {
// Just launched container. RM knows about it the first time.

View File

@ -2085,7 +2085,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2);
ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3);
NMContainerStatus queuedOpp =
NMContainerStatus.newInstance(c1, 1, ContainerState.SCHEDULED,
NMContainerStatus.newInstance(c1, 1, ContainerState.RUNNING,
Resource.newInstance(1024, 1), "Dummy Queued OC",
ContainerExitStatus.INVALID, Priority.newInstance(5), 1234, "",
ExecutionType.OPPORTUNISTIC);