YARN-7248. NM returns new SCHEDULED container status to older clients. Contributed by Arun Suresh
(cherry picked from commit 85d81ae58e)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
parent 53c1115908
commit 913a64e4c9
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.api.records;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 
 /**
@@ -35,12 +34,5 @@ public enum ContainerState {
   RUNNING,
 
   /** Completed container */
-  COMPLETE,
-
-  /** Scheduled (awaiting resources) at the NM. */
-  @InterfaceStability.Unstable
-  SCHEDULED,
-
-  /** Paused at the NM. */
-  PAUSED
+  COMPLETE
 }
@@ -201,4 +201,26 @@ public abstract class ContainerStatus {
     throw new UnsupportedOperationException(
         "subclass must implement this method");
   }
+
+  /**
+   * Add Extra state information of the container (SCHEDULED, LOCALIZING etc.).
+   * @param subState Extra State Information.
+   */
+  @Private
+  @Unstable
+  public void setContainerSubState(ContainerSubState subState) {
+    throw new UnsupportedOperationException(
+        "subclass must implement this method");
+  }
+
+  /**
+   * Get Extra state information of the container (SCHEDULED, LOCALIZING etc.).
+   * @return Extra State information.
+   */
+  @Private
+  @Unstable
+  public ContainerSubState getContainerSubState() {
+    throw new UnsupportedOperationException(
+        "subclass must implement this method");
+  }
 }
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Container Sub-State.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public enum ContainerSubState {
+  /*
+   * NEW, LOCALIZING, SCHEDULED,
+   * REINITIALIZING_AWAITING_KILL, RELAUNCHING,
+   */
+  SCHEDULED,
+
+  /*
+   * RUNNING, REINITIALIZING, PAUSING, KILLING
+   */
+  RUNNING,
+
+  /*
+   * PAUSED, RESUMING
+   */
+  PAUSED,
+
+  /*
+   * LOCALIZATION_FAILED, EXITED_WITH_SUCCESS,
+   * EXITED_WITH_FAILURE,
+   * CONTAINER_CLEANEDUP_AFTER_KILL,
+   * CONTAINER_RESOURCES_CLEANINGUP
+   */
+  COMPLETING,
+
+  /*
+   * DONE
+   */
+  DONE
+}
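Note (editor's annotation, not part of the patch): the new ContainerSubState enum carries the finer-grained lifecycle detail, while the public ContainerState keeps only the long-standing NEW/RUNNING/COMPLETE values so that older clients never see an unknown constant. A minimal Java sketch of how a sub-state-aware client might read it; the class and method names below are hypothetical, and getContainerSubState() can be null when the field is not set (see the PBImpl hunk further down).

    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.api.records.ContainerSubState;

    final class QueuedContainerCheck {
      // Hypothetical helper: true if the NM reports the container as queued
      // via the new sub-state; a null sub-state (e.g. an older NM) yields false.
      static boolean isQueuedAtNM(ContainerStatus status) {
        ContainerSubState subState = status.getContainerSubState();
        return subState == ContainerSubState.SCHEDULED;
      }
    }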
@@ -82,8 +82,37 @@ enum ContainerStateProto {
   C_NEW = 1;
   C_RUNNING = 2;
   C_COMPLETE = 3;
-  C_SCHEDULED = 4;
-  C_PAUSED = 5;
 }
 
+enum ContainerSubStateProto {
+  /**
+   * NEW, LOCALIZING, SCHEDULED,
+   * REINITIALIZING_AWAITING_KILL, RELAUNCHING,
+   */
+  CSS_SCHEDULED = 1;
+
+  /**
+   * RUNNING, REINITIALIZING, PAUSING, KILLING
+   */
+  CSS_RUNNING = 2;
+
+  /**
+   * PAUSED, RESUMING
+   */
+  CSS_PAUSED = 3;
+
+  /**
+   * LOCALIZATION_FAILED, EXITED_WITH_SUCCESS,
+   * EXITED_WITH_FAILURE,
+   * CONTAINER_CLEANEDUP_AFTER_KILL,
+   * CONTAINER_RESOURCES_CLEANINGUP
+   */
+  CSS_COMPLETING = 4;
+
+  /**
+   * DONE
+   */
+  CSS_DONE = 5;
+}
+
 message ContainerProto {
@@ -580,6 +609,7 @@ message ContainerStatusProto {
   optional ResourceProto capability = 5;
   optional ExecutionTypeProto executionType = 6 [default = GUARANTEED];
   repeated StringStringMapProto container_attributes = 7;
+  optional ContainerSubStateProto container_sub_state = 8;
 }
 
 enum ContainerExitStatusProto {
@@ -25,6 +25,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerSubState;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.proto.YarnProtos;
@@ -96,7 +97,8 @@ public class ContainerStatusPBImpl extends ContainerStatus {
     sb.append("Diagnostics: ").append(getDiagnostics()).append(", ");
     sb.append("ExitStatus: ").append(getExitStatus()).append(", ");
     sb.append("IP: ").append(getIPs()).append(", ");
-    sb.append("Host: ").append(getHost());
+    sb.append("Host: ").append(getHost()).append(", ");
+    sb.append("ContainerSubState: ").append(getContainerSubState());
     sb.append("]");
     return sb.toString();
   }
@@ -213,6 +215,26 @@ public class ContainerStatusPBImpl extends ContainerStatus {
     }
     builder.setState(convertToProtoFormat(state));
   }
+
+  @Override
+  public synchronized ContainerSubState getContainerSubState() {
+    ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasContainerSubState()) {
+      return null;
+    }
+    return ProtoUtils.convertFromProtoFormat(p.getContainerSubState());
+  }
+
+  @Override
+  public synchronized void setContainerSubState(ContainerSubState subState) {
+    maybeInitBuilder();
+    if (subState == null) {
+      builder.clearContainerSubState();
+      return;
+    }
+    builder.setContainerSubState(ProtoUtils.convertToProtoFormat(subState));
+  }
+
   @Override
   public synchronized ContainerId getContainerId() {
     ContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerSubState;
 import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
@@ -55,6 +56,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportPro
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationTimeoutTypeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerSubStateProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.FinalApplicationStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto;
@@ -85,7 +87,7 @@ public class ProtoUtils {
   /*
    * ContainerState
    */
-  private static String CONTAINER_STATE_PREFIX = "C_";
+  private final static String CONTAINER_STATE_PREFIX = "C_";
   public static ContainerStateProto convertToProtoFormat(ContainerState e) {
     return ContainerStateProto.valueOf(CONTAINER_STATE_PREFIX + e.name());
   }
@@ -93,10 +95,24 @@ public class ProtoUtils {
     return ContainerState.valueOf(e.name().replace(CONTAINER_STATE_PREFIX, ""));
   }
 
+  /*
+   * Container SubState
+   */
+  private final static String CONTAINER_SUB_STATE_PREFIX = "CSS_";
+  public static ContainerSubStateProto convertToProtoFormat(
+      ContainerSubState e) {
+    return ContainerSubStateProto.valueOf(
+        CONTAINER_SUB_STATE_PREFIX + e.name());
+  }
+  public static ContainerSubState convertFromProtoFormat(
+      ContainerSubStateProto e) {
+    return ContainerSubState.valueOf(
+        e.name().substring(CONTAINER_SUB_STATE_PREFIX.length()));
+  }
   /*
    * NodeState
    */
-  private static String NODE_STATE_PREFIX = "NS_";
+  private final static String NODE_STATE_PREFIX = "NS_";
   public static NodeStateProto convertToProtoFormat(NodeState e) {
     return NodeStateProto.valueOf(NODE_STATE_PREFIX + e.name());
   }
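Note (editor's annotation, not part of the patch): the new ProtoUtils helpers map between the Java enum and the proto enum by adding or stripping the "CSS_" prefix; unlike the ContainerState converter, the reverse direction uses substring() on the prefix length rather than replace(), so only the leading prefix is removed. A quick round-trip sketch using the two methods added above (ProtoUtils's package is omitted on purpose; assume the same imports as the hunk):

    // RUNNING -> CSS_RUNNING -> RUNNING
    static void roundTripExample() {
      ContainerSubStateProto proto =
          ProtoUtils.convertToProtoFormat(ContainerSubState.RUNNING);   // CSS_RUNNING
      ContainerSubState back = ProtoUtils.convertFromProtoFormat(proto); // RUNNING
      assert back == ContainerSubState.RUNNING;
    }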
@@ -34,6 +34,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.yarn.api.records.ContainerSubState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -699,7 +700,6 @@ public class ContainerImpl implements Container {
     case SCHEDULED:
     case PAUSED:
     case RESUMING:
-      return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED;
     case RUNNING:
     case RELAUNCHING:
     case REINITIALIZING:
@@ -717,6 +717,36 @@ public class ContainerImpl implements Container {
     }
   }
 
+  // NOTE: Please update the doc in the ContainerSubState class as
+  // well as the yarn_protos.proto file if this mapping is ever modified.
+  private ContainerSubState getContainerSubState() {
+    switch (stateMachine.getCurrentState()) {
+    case NEW:
+    case LOCALIZING:
+    case SCHEDULED:
+    case REINITIALIZING_AWAITING_KILL:
+    case RELAUNCHING:
+      return ContainerSubState.SCHEDULED;
+    case REINITIALIZING:
+    case PAUSING:
+    case KILLING:
+    case RUNNING:
+      return ContainerSubState.RUNNING;
+    case PAUSED:
+    case RESUMING:
+      return ContainerSubState.PAUSED;
+    case LOCALIZATION_FAILED:
+    case EXITED_WITH_SUCCESS:
+    case EXITED_WITH_FAILURE:
+    case CONTAINER_CLEANEDUP_AFTER_KILL:
+    case CONTAINER_RESOURCES_CLEANINGUP:
+      return ContainerSubState.COMPLETING;
+    case DONE:
+    default:
+      return ContainerSubState.DONE;
+    }
+  }
+
   @Override
   public String getUser() {
     this.readLock.lock();
@@ -781,6 +811,7 @@ public class ContainerImpl implements Container {
           this.containerTokenIdentifier.getExecutionType());
       status.setIPs(ips == null ? null : Arrays.asList(ips.split(",")));
       status.setHost(host);
+      status.setContainerSubState(getContainerSubState());
       return status;
     } finally {
       this.readLock.unlock();
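Note (editor's annotation, not part of the patch): these ContainerImpl hunks are the heart of the fix. With the early return removed, the SCHEDULED/PAUSED/RESUMING cases fall through to the RUNNING group of the same switch, so the coarse state reported to clients presumably becomes RUNNING instead of the SCHEDULED value older clients cannot parse, and the queued/paused detail is instead carried in the sub-state set on the status built here. An illustrative assertion pair for a container queued at the NM, mirroring the test updates further down; the class and method names are hypothetical.

    import org.apache.hadoop.yarn.api.records.ContainerState;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.api.records.ContainerSubState;
    import org.junit.Assert;

    final class QueuedReportAssertions {
      // Hypothetical check, inferred from the fall-through above and the
      // RMNodeImpl/test changes below.
      static void assertQueuedReport(ContainerStatus status) {
        // Old clients only ever see a long-standing ContainerState constant.
        Assert.assertEquals(ContainerState.RUNNING, status.getState());
        // New clients read the finer-grained detail from the sub-state.
        Assert.assertEquals(ContainerSubState.SCHEDULED,
            status.getContainerSubState());
      }
    }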
@@ -18,7 +18,16 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
+/**
+ * States used by the container state machine.
+ */
 public enum ContainerState {
+  // NOTE: In case of future additions / deletions / modifications to this
+  // enum, please ensure that the following are also correspondingly
+  // updated:
+  // 1. ContainerImpl::getContainerSubState().
+  // 2. the doc in the ContainerSubState class.
+  // 3. the doc in the yarn_protos.proto file.
   NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
   REINITIALIZING, REINITIALIZING_AWAITING_KILL,
   EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
@@ -159,7 +159,7 @@ public class TestEventFlow {
     containerManager.startContainers(allRequests);
 
     BaseContainerManagerTest.waitForContainerState(containerManager, cID,
-        Arrays.asList(ContainerState.RUNNING, ContainerState.SCHEDULED), 20);
+        Arrays.asList(ContainerState.RUNNING), 20);
 
     List<ContainerId> containerIds = new ArrayList<ContainerId>();
     containerIds.add(cID);
@@ -256,8 +256,7 @@ public class TestNodeManagerShutdown {
         GetContainerStatusesRequest.newInstance(containerIds);
     ContainerStatus containerStatus =
         containerManager.getContainerStatuses(request).getContainerStatuses().get(0);
-    Assert.assertTrue(
-        EnumSet.of(ContainerState.RUNNING, ContainerState.SCHEDULED)
+    Assert.assertTrue(EnumSet.of(ContainerState.RUNNING)
         .contains(containerStatus.getState()));
   }
 
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerSubState;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -311,9 +312,8 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
     List<ContainerStatus> containerStatuses = containerManager
         .getContainerStatuses(statRequest).getContainerStatuses();
     for (ContainerStatus status : containerStatuses) {
-      Assert.assertEquals(
-          org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
-          status.getState());
+      Assert.assertEquals(ContainerSubState.SCHEDULED,
+          status.getContainerSubState());
     }
 
     ContainerScheduler containerScheduler =
@@ -378,13 +378,11 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
         .getContainerStatuses(statRequest).getContainerStatuses();
     for (ContainerStatus status : containerStatuses) {
       if (status.getContainerId().equals(createContainerId(0))) {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
-            status.getState());
+        Assert.assertEquals(ContainerSubState.RUNNING,
+            status.getContainerSubState());
       } else {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
-            status.getState());
+        Assert.assertEquals(ContainerSubState.SCHEDULED,
+            status.getContainerSubState());
       }
     }
 
@@ -458,17 +456,15 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
         .getContainerStatuses(statRequest).getContainerStatuses();
     for (ContainerStatus status : containerStatuses) {
       if (status.getContainerId().equals(createContainerId(0))) {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
-            status.getState());
+        Assert.assertEquals(ContainerSubState.RUNNING,
+            status.getContainerSubState());
       } else if (status.getContainerId().equals(createContainerId(
           maxOppQueueLength + 1))) {
         Assert.assertTrue(status.getDiagnostics().contains(
             "Opportunistic container queue is full"));
       } else {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
-            status.getState());
+        Assert.assertEquals(ContainerSubState.SCHEDULED,
+            status.getContainerSubState());
       }
       System.out.println("\nStatus : [" + status + "]\n");
     }
@@ -545,13 +541,11 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
         Assert.assertTrue(status.getDiagnostics().contains(
             "Container Killed to make room for Guaranteed Container"));
       } else if (status.getContainerId().equals(createContainerId(1))) {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
-            status.getState());
+        Assert.assertEquals(ContainerSubState.SCHEDULED,
+            status.getContainerSubState());
       } else if (status.getContainerId().equals(createContainerId(2))) {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
-            status.getState());
+        Assert.assertEquals(ContainerSubState.RUNNING,
+            status.getContainerSubState());
       }
       System.out.println("\nStatus : [" + status + "]\n");
     }
@@ -799,8 +793,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
           "Container De-queued to meet NM queuing limits")) {
         deQueuedContainers++;
       }
-      if (status.getState() ==
-          org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) {
+      if (ContainerSubState.SCHEDULED == status.getContainerSubState()) {
         numQueuedOppContainers++;
       }
     }
@@ -1079,11 +1072,9 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
     List<ContainerStatus> containerStatuses = containerManager
         .getContainerStatuses(statRequest).getContainerStatuses();
     for (ContainerStatus status : containerStatuses) {
-      if (status.getState() ==
-          org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) {
+      if (ContainerSubState.RUNNING == status.getContainerSubState()) {
        runningContainersNo++;
-      } else if (status.getState() ==
-          org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) {
+      } else if (ContainerSubState.SCHEDULED == status.getContainerSubState()) {
        queuedContainersNo++;
       }
       System.out.println("\nStatus : [" + status + "]\n");
@@ -1106,34 +1097,27 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
     }
 
     statRequest = GetContainerStatusesRequest.newInstance(statList);
-    HashMap<org.apache.hadoop.yarn.api.records.ContainerState, ContainerStatus>
-        map = new HashMap<>();
+    HashMap<ContainerSubState, ContainerStatus> map = new HashMap<>();
     for (int i=0; i < 10; i++) {
       containerStatuses = containerManager.getContainerStatuses(statRequest)
           .getContainerStatuses();
       for (ContainerStatus status : containerStatuses) {
         System.out.println("\nStatus : [" + status + "]\n");
-        map.put(status.getState(), status);
-        if (map.containsKey(
-            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) &&
-            map.containsKey(
-                org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED) &&
-            map.containsKey(
-                org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE)) {
+        map.put(status.getContainerSubState(), status);
+        if (map.containsKey(ContainerSubState.RUNNING) &&
+            map.containsKey(ContainerSubState.SCHEDULED) &&
+            map.containsKey(ContainerSubState.DONE)) {
           break;
         }
         Thread.sleep(1000);
       }
     }
     Assert.assertEquals(createContainerId(0),
-        map.get(org.apache.hadoop.yarn.api.records.ContainerState.RUNNING)
-            .getContainerId());
+        map.get(ContainerSubState.RUNNING).getContainerId());
     Assert.assertEquals(createContainerId(1),
-        map.get(org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE)
-            .getContainerId());
+        map.get(ContainerSubState.DONE).getContainerId());
     Assert.assertEquals(createContainerId(2),
-        map.get(org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED)
-            .getContainerId());
+        map.get(ContainerSubState.SCHEDULED).getContainerId());
   }
 
   /**
@@ -1186,13 +1170,11 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
         .getContainerStatuses(statRequest).getContainerStatuses();
     for (ContainerStatus status : containerStatuses) {
      if (status.getContainerId().equals(createContainerId(0))) {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
-            status.getState());
+        Assert.assertEquals(ContainerSubState.RUNNING,
+            status.getContainerSubState());
      } else {
-        Assert.assertEquals(
-            org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
-            status.getState());
+        Assert.assertEquals(ContainerSubState.SCHEDULED,
+            status.getContainerSubState());
      }
     }
 
@@ -856,8 +856,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       containers = startEvent.getNMContainerStatuses();
       if (containers != null && !containers.isEmpty()) {
         for (NMContainerStatus container : containers) {
-          if (container.getContainerState() == ContainerState.RUNNING ||
-              container.getContainerState() == ContainerState.SCHEDULED) {
+          if (container.getContainerState() == ContainerState.RUNNING) {
             rmNode.launchedContainers.add(container.getContainerId());
           }
         }
@@ -1412,8 +1411,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       }
 
       // Process running containers
-      if (remoteContainer.getState() == ContainerState.RUNNING ||
-          remoteContainer.getState() == ContainerState.SCHEDULED) {
+      if (remoteContainer.getState() == ContainerState.RUNNING) {
         ++numRemoteRunningContainers;
         if (!launchedContainers.contains(containerId)) {
           // Just launched container. RM knows about it the first time.
@@ -1982,7 +1982,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
     ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2);
     ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3);
     NMContainerStatus queuedOpp =
-        NMContainerStatus.newInstance(c1, 1, ContainerState.SCHEDULED,
+        NMContainerStatus.newInstance(c1, 1, ContainerState.RUNNING,
             Resource.newInstance(1024, 1), "Dummy Queued OC",
             ContainerExitStatus.INVALID, Priority.newInstance(5), 1234, "",
             ExecutionType.OPPORTUNISTIC);