YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to support container resizing. Contributed by Meng Ding

(cherry picked from commit c3dc1af072)
Jian He 2015-08-20 21:04:14 -07:00 committed by Wangda Tan
parent b8955d81c5
commit b6c594c6ea
17 changed files with 627 additions and 89 deletions

View File

@ -160,6 +160,9 @@ Release 2.8.0 - UNRELEASED
YARN-1643. Make ContainersMonitor support changing monitoring size of an
allocated container. (Meng Ding and Wangda Tan)
YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to
support container resizing. (Meng Ding via jianhe)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -68,7 +68,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
failoverThread = createAndStartFailoverThread();
NodeStatus status =
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
null, null, null, null, null);
NodeHeartbeatRequest request2 =
NodeHeartbeatRequest.newInstance(status, null, null,null);
resourceTracker.nodeHeartbeat(request2);

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -70,4 +71,7 @@ public interface NodeHeartbeatResponse {
boolean getAreNodeLabelsAcceptedByRM();
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
List<Container> getContainersToDecrease();
void addAllContainersToDecrease(List<Container> containersToDecrease);
}
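Note: as a quick orientation, the round trip through the two new accessors looks roughly like the following sketch (illustrative only, not part of this patch; the ContainerId and Resource values are made up and mirror the unit test added further down):

  // Illustrative sketch: populate and read back containers_to_decrease.
  ContainerId cId = ContainerId.newContainerId(
      ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1), 1);
  Container toDecrease = Container.newInstance(
      cId, null, null, Resource.newInstance(1024, 1), null, null);
  NodeHeartbeatResponsePBImpl response = new NodeHeartbeatResponsePBImpl();
  response.addAllContainersToDecrease(Arrays.asList(toDecrease));
  List<Container> decreaseTargets = response.getContainersToDecrease();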

View File

@ -27,12 +27,15 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
@ -58,7 +61,9 @@ public class NodeHeartbeatResponsePBImpl extends
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
private List<Container> containersToDecrease = null;
public NodeHeartbeatResponsePBImpl() {
builder = NodeHeartbeatResponseProto.newBuilder();
}
@ -96,6 +101,9 @@ public class NodeHeartbeatResponsePBImpl extends
if (this.systemCredentials != null) {
addSystemCredentialsToProto();
}
if (this.containersToDecrease != null) {
addContainersToDecreaseToProto();
}
}
private void addSystemCredentialsToProto() {
@ -408,6 +416,64 @@ public class NodeHeartbeatResponsePBImpl extends
builder.addAllApplicationsToCleanup(iterable);
}
private void initContainersToDecrease() {
if (this.containersToDecrease != null) {
return;
}
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getContainersToDecreaseList();
this.containersToDecrease = new ArrayList<>();
for (ContainerProto c : list) {
this.containersToDecrease.add(convertFromProtoFormat(c));
}
}
@Override
public List<Container> getContainersToDecrease() {
initContainersToDecrease();
return this.containersToDecrease;
}
@Override
public void addAllContainersToDecrease(
final List<Container> containersToDecrease) {
if (containersToDecrease == null) {
return;
}
initContainersToDecrease();
this.containersToDecrease.addAll(containersToDecrease);
}
private void addContainersToDecreaseToProto() {
maybeInitBuilder();
builder.clearContainersToDecrease();
if (this.containersToDecrease == null) {
return;
}
Iterable<ContainerProto> iterable = new
Iterable<ContainerProto>() {
@Override
public Iterator<ContainerProto> iterator() {
return new Iterator<ContainerProto>() {
private Iterator<Container> iter = containersToDecrease.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainersToDecrease(iterable);
}
@Override
public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
@ -484,6 +550,14 @@ public class NodeHeartbeatResponsePBImpl extends
return ((MasterKeyPBImpl) t).getProto();
}
private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
return new ContainerPBImpl(p);
}
private ContainerProto convertToProtoFormat(Container t) {
return ((ContainerPBImpl) t).getProto();
}
@Override
public boolean getAreNodeLabelsAcceptedByRM() {
NodeHeartbeatResponseProtoOrBuilder p =

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
@ -48,6 +49,7 @@ public abstract class NodeStatus {
* @param nodeHealthStatus Health status of the node.
* @param containersUtilization Utilization of the containers in this node.
* @param nodeUtilization Utilization of the node.
* @param increasedContainers Containers whose resource has been increased.
* @return New {@code NodeStatus} with the provided information.
*/
public static NodeStatus newInstance(NodeId nodeId, int responseId,
@ -55,7 +57,8 @@ public abstract class NodeStatus {
List<ApplicationId> keepAliveApplications,
NodeHealthStatus nodeHealthStatus,
ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization,
List<Container> increasedContainers) {
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
nodeStatus.setResponseId(responseId);
nodeStatus.setNodeId(nodeId);
@ -64,6 +67,7 @@ public abstract class NodeStatus {
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
nodeStatus.setContainersUtilization(containersUtilization);
nodeStatus.setNodeUtilization(nodeUtilization);
nodeStatus.setIncreasedContainers(increasedContainers);
return nodeStatus;
}
@ -108,4 +112,13 @@ public abstract class NodeStatus {
@Unstable
public abstract void setNodeUtilization(
ResourceUtilization nodeUtilization);
@Public
@Unstable
public abstract List<Container> getIncreasedContainers();
@Private
@Unstable
public abstract void setIncreasedContainers(
List<Container> increasedContainers);
}
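Note: the widened newInstance signature can be exercised as in this small sketch (illustrative only; the identifiers are placeholders, and the trailing nulls stand in for the health status and utilization records):

  // Illustrative sketch: a heartbeat status carrying recently increased containers.
  ContainerId cId = ContainerId.newContainerId(
      ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1), 1);
  List<Container> increased = Arrays.asList(Container.newInstance(
      cId, null, null, Resource.newInstance(2048, 2), null, null));
  NodeStatus status = NodeStatus.newInstance(
      NodeId.newInstance("localhost", 0), 0,
      new ArrayList<ContainerStatus>(), new ArrayList<ApplicationId>(),
      null, null, null, increased);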

View File

@ -24,13 +24,16 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
@ -49,7 +52,8 @@ public class NodeStatusPBImpl extends NodeStatus {
private List<ContainerStatus> containers = null;
private NodeHealthStatus nodeHealthStatus = null;
private List<ApplicationId> keepAliveApplications = null;
private List<Container> increasedContainers = null;
public NodeStatusPBImpl() {
builder = NodeStatusProto.newBuilder();
}
@ -79,6 +83,9 @@ public class NodeStatusPBImpl extends NodeStatus {
if (this.keepAliveApplications != null) {
addKeepAliveApplicationsToProto();
}
if (this.increasedContainers != null) {
addIncreasedContainersToProto();
}
}
private synchronized void mergeLocalToProto() {
@ -165,6 +172,37 @@ public class NodeStatusPBImpl extends NodeStatus {
builder.addAllKeepAliveApplications(iterable);
}
private synchronized void addIncreasedContainersToProto() {
maybeInitBuilder();
builder.clearIncreasedContainers();
if (increasedContainers == null) {
return;
}
Iterable<ContainerProto> iterable = new
Iterable<ContainerProto>() {
@Override
public Iterator<ContainerProto> iterator() {
return new Iterator<ContainerProto>() {
private Iterator<Container> iter =
increasedContainers.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllIncreasedContainers(iterable);
}
@Override
public int hashCode() {
return getProto().hashCode();
@ -336,6 +374,31 @@ public class NodeStatusPBImpl extends NodeStatus {
.setNodeUtilization(convertToProtoFormat(nodeUtilization));
}
@Override
public synchronized List<Container> getIncreasedContainers() {
if (increasedContainers != null) {
return increasedContainers;
}
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getIncreasedContainersList();
this.increasedContainers = new ArrayList<>();
for (ContainerProto c : list) {
this.increasedContainers.add(convertFromProtoFormat(c));
}
return this.increasedContainers;
}
@Override
public synchronized void setIncreasedContainers(
List<Container> increasedContainers) {
maybeInitBuilder();
if (increasedContainers == null) {
builder.clearIncreasedContainers();
return;
}
this.increasedContainers = increasedContainers;
}
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl)nodeId).getProto();
}
@ -377,4 +440,14 @@ public class NodeStatusPBImpl extends NodeStatus {
ResourceUtilizationProto p) {
return new ResourceUtilizationPBImpl(p);
}
private ContainerPBImpl convertFromProtoFormat(
ContainerProto c) {
return new ContainerPBImpl(c);
}
private ContainerProto convertToProtoFormat(
Container c) {
return ((ContainerPBImpl)c).getProto();
}
}

View File

@ -38,6 +38,7 @@ message NodeStatusProto {
repeated ApplicationIdProto keep_alive_applications = 5;
optional ResourceUtilizationProto containers_utilization = 6;
optional ResourceUtilizationProto node_utilization = 7;
repeated ContainerProto increased_containers = 8;
}
message MasterKeyProto {
@ -60,4 +61,4 @@ message ResourceUtilizationProto {
optional int32 pmem = 1;
optional int32 vmem = 2;
optional float cpu = 3;
}

View File

@ -82,6 +82,7 @@ message NodeHeartbeatResponseProto {
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
repeated ContainerProto containers_to_decrease = 12;
}
message SystemCredentialsForAppsProto {

View File

@ -29,6 +29,7 @@ import java.util.HashSet;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -168,6 +169,20 @@ public class TestYarnServerApiClasses {
assertTrue(copy.getAreNodeLabelsAcceptedByRM());
}
@Test
public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() {
NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
original.addAllContainersToDecrease(
Arrays.asList(getDecreasedContainer(1, 2, 2048, 2),
getDecreasedContainer(2, 3, 1024, 1)));
NodeHeartbeatResponsePBImpl copy =
new NodeHeartbeatResponsePBImpl(original.getProto());
assertEquals(1, copy.getContainersToDecrease().get(0)
.getId().getContainerId());
assertEquals(1024, copy.getContainersToDecrease().get(1)
.getResource().getMemory());
}
/**
* Test RegisterNodeManagerRequestPBImpl.
*/
@ -244,6 +259,9 @@ public class TestYarnServerApiClasses {
original.setNodeHealthStatus(getNodeHealthStatus());
original.setNodeId(getNodeId());
original.setResponseId(1);
original.setIncreasedContainers(
Arrays.asList(getIncreasedContainer(1, 2, 2048, 2),
getIncreasedContainer(2, 3, 4096, 3)));
NodeStatusPBImpl copy = new NodeStatusPBImpl(original.getProto());
assertEquals(3L, copy.getContainersStatuses().get(1).getContainerId()
@ -252,7 +270,10 @@ public class TestYarnServerApiClasses {
assertEquals(1000, copy.getNodeHealthStatus().getLastHealthReportTime());
assertEquals(9090, copy.getNodeId().getPort());
assertEquals(1, copy.getResponseId());
assertEquals(1, copy.getIncreasedContainers().get(0)
.getId().getContainerId());
assertEquals(4096, copy.getIncreasedContainers().get(1)
.getResource().getMemory());
}
@Test
@ -347,6 +368,22 @@ public class TestYarnServerApiClasses {
return new ApplicationIdPBImpl(appId.getProto());
}
private Container getDecreasedContainer(int containerID,
int appAttemptId, int memory, int vCores) {
ContainerId containerId = getContainerId(containerID, appAttemptId);
Resource capability = Resource.newInstance(memory, vCores);
return Container.newInstance(
containerId, null, null, capability, null, null);
}
private Container getIncreasedContainer(int containerID,
int appAttemptId, int memory, int vCores) {
ContainerId containerId = getContainerId(containerID, appAttemptId);
Resource capability = Resource.newInstance(memory, vCores);
return Container.newInstance(
containerId, null, null, capability, null, null);
}
private NodeStatus getNodeStatus() {
NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class);
status.setContainersStatuses(new ArrayList<ContainerStatus>());

View File

@ -62,6 +62,9 @@ public interface Context {
ConcurrentMap<ContainerId, Container> getContainers();
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container>
getIncreasedContainers();
NMContainerTokenSecretManager getContainerTokenSecretManager();
NMTokenSecretManagerInNM getNMTokenSecretManager();

View File

@ -439,6 +439,10 @@ public class NodeManager extends CompositeService
protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
protected final ConcurrentMap<ContainerId,
org.apache.hadoop.yarn.api.records.Container> increasedContainers =
new ConcurrentHashMap<>();
private final NMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInNM nmTokenSecretManager;
private ContainerManagementProtocol containerManager;
@ -492,6 +496,12 @@ public class NodeManager extends CompositeService
return this.containers;
}
@Override
public ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container>
getIncreasedContainers() {
return this.increasedContainers;
}
@Override
public NMContainerTokenSecretManager getContainerTokenSecretManager() {
return this.containerTokenSecretManager;

View File

@ -310,18 +310,28 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@VisibleForTesting
protected void registerWithRM()
throws YarnException, IOException {
RegisterNodeManagerResponse regNMResponse;
Set<NodeLabel> nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration();
// Synchronize NM-RM registration with
// ContainerManagerImpl#increaseContainersResource and
// ContainerManagerImpl#startContainers to avoid race condition
// during RM recovery
synchronized (this.context) {
List<NMContainerStatus> containerReports = getNMContainerStatuses();
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports, getRunningApplications(),
nodeLabels);
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
}
regNMResponse =
resourceTracker.registerNodeManager(request);
// Make sure rmIdentifier is set before we release the lock
this.rmIdentifier = regNMResponse.getRMIdentifier();
}
// if the Resource Manager instructs NM to shutdown.
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
String message =
@ -418,10 +428,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
List<ContainerStatus> containersStatuses = getContainerStatuses();
ResourceUtilization containersUtilization = getContainersUtilization();
ResourceUtilization nodeUtilization = getNodeUtilization();
List<org.apache.hadoop.yarn.api.records.Container> increasedContainers
= getIncreasedContainers();
NodeStatus nodeStatus =
NodeStatus.newInstance(nodeId, responseId, containersStatuses,
createKeepAliveApplicationList(), nodeHealthStatus,
containersUtilization, nodeUtilization, increasedContainers);
return nodeStatus;
}
@ -448,6 +460,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
return nodeResourceMonitor.getUtilization();
}
/* Get the containers whose resource has been increased since last
* NM-RM heartbeat.
*/
private List<org.apache.hadoop.yarn.api.records.Container>
getIncreasedContainers() {
List<org.apache.hadoop.yarn.api.records.Container>
increasedContainers = new ArrayList<>(
this.context.getIncreasedContainers().values());
for (org.apache.hadoop.yarn.api.records.Container
container : increasedContainers) {
this.context.getIncreasedContainers().remove(container.getId());
}
return increasedContainers;
}
// Iterate through the NMContext and clone and get all the containers'
// statuses. If it's a completed container, add into the
// recentlyStoppedContainers collections.
@ -765,6 +792,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
((NMContext) context)
.setSystemCrendentialsForApps(parseCredentials(systemCredentials));
}
List<org.apache.hadoop.yarn.api.records.Container>
containersToDecrease = response.getContainersToDecrease();
if (!containersToDecrease.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrDecreaseContainersResourceEvent(containersToDecrease)
);
}
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
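Note: the two halves of this handshake meet in the shared NMContext map; a rough sketch of the producer side (variables assumed in scope, mirroring the ContainerManagerImpl change below):

  // Illustrative sketch: after applying an increase, ContainerManagerImpl records
  // the new allocation; getIncreasedContainers() above drains the map, so each
  // increase is reported to the RM exactly once on the next heartbeat.
  Container increasedContainer = Container.newInstance(
      containerId, null, null, targetResource, null, null);
  context.getIncreasedContainers().putIfAbsent(containerId, increasedContainer);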

View File

@ -563,8 +563,7 @@ public class ContainerManagerImpl extends CompositeService implements
List<ApplicationId> appIds =
new ArrayList<ApplicationId>(applications.keySet());
this.handle(new CMgrCompletedAppsEvent(appIds,
CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));
LOG.info("Waiting for Applications to be Finished");
@ -584,8 +583,8 @@ public class ContainerManagerImpl extends CompositeService implements
if (applications.isEmpty()) {
LOG.info("All applications in FINISHED state");
} else {
LOG.info("Done waiting for Applications to be Finished. Still alive: "
+ applications.keySet());
}
}
@ -759,13 +758,12 @@ public class ContainerManagerImpl extends CompositeService implements
* Start a list of containers on this NodeManager.
*/
@Override
public StartContainersResponse startContainers(
StartContainersRequest requests) throws YarnException, IOException {
if (blockNewContainerRequests.get()) {
throw new NMNotYetReadyException(
"Rejecting new containers as NodeManager has not"
+ " yet connected with ResourceManager");
}
UserGroupInformation remoteUgi = getRemoteUgi();
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
@ -773,42 +771,50 @@ public class ContainerManagerImpl extends CompositeService implements
List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
Map<ContainerId, SerializedException> failedContainers =
new HashMap<ContainerId, SerializedException>();
// Synchronize with NodeStatusUpdaterImpl#registerWithRM
// to avoid race condition during NM-RM resync (due to RM restart) while a
// container is being started, in particular when the container has not yet
// been added to the containers map in NMContext.
synchronized (this.context) {
for (StartContainerRequest request : requests
.getStartContainerRequests()) {
ContainerId containerId = null;
try {
if (request.getContainerToken() == null
|| request.getContainerToken().getIdentifier() == null) {
throw new IOException(INVALID_CONTAINERTOKEN_MSG);
}
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(request.getContainerToken());
verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
containerTokenIdentifier);
containerId = containerTokenIdentifier.getContainerID();
// Initialize the AMRMProxy service instance only if the container is of
// type AM and if the AMRMProxy service is enabled
if (isARMRMProxyEnabled() && containerTokenIdentifier
.getContainerType().equals(ContainerType.APPLICATION_MASTER)) {
this.amrmProxyService.processApplicationStartRequest(request);
}
startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
request);
succeededContainers.add(containerId);
} catch (YarnException e) {
failedContainers.put(containerId, SerializedException.newInstance(e));
} catch (InvalidToken ie) {
failedContainers
.put(containerId, SerializedException.newInstance(ie));
throw ie;
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
}
return StartContainersResponse
.newInstance(getAuxServiceMetaData(), succeededContainers,
failedContainers);
}
}
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
@ -959,7 +965,7 @@ public class ContainerManagerImpl extends CompositeService implements
InvalidToken {
byte[] password =
context.getContainerTokenSecretManager().retrievePassword(
containerTokenIdentifier);
byte[] tokenPass = token.getPassword().array();
if (password == null || tokenPass == null
|| !Arrays.equals(password, tokenPass)) {
@ -989,32 +995,39 @@ public class ContainerManagerImpl extends CompositeService implements
= new ArrayList<ContainerId>();
Map<ContainerId, SerializedException> failedContainers =
new HashMap<ContainerId, SerializedException>();
// Synchronize with NodeStatusUpdaterImpl#registerWithRM
// to avoid race condition during NM-RM resync (due to RM restart) while a
// container resource is being increased in NM, in particular when the
// increased container has not yet been added to the increasedContainers
// map in NMContext.
synchronized (this.context) {
// Process container resource increase requests
for (org.apache.hadoop.yarn.api.records.Token token :
requests.getContainersToIncrease()) {
ContainerId containerId = null;
try {
if (token.getIdentifier() == null) {
throw new IOException(INVALID_CONTAINERTOKEN_MSG);
}
ContainerTokenIdentifier containerTokenIdentifier =
BuilderUtils.newContainerTokenIdentifier(token);
verifyAndGetContainerTokenIdentifier(token,
containerTokenIdentifier);
authorizeStartAndResourceIncreaseRequest(
nmTokenIdentifier, containerTokenIdentifier, false);
containerId = containerTokenIdentifier.getContainerID();
// Reuse the startContainer logic to update NMToken,
// as container resource increase request will have come with
// an updated NMToken.
updateNMTokenIdentifier(nmTokenIdentifier);
Resource resource = containerTokenIdentifier.getResource();
changeContainerResourceInternal(containerId, resource, true);
successfullyIncreasedContainers.add(containerId);
} catch (YarnException | InvalidToken e) {
failedContainers.put(containerId, SerializedException.newInstance(e));
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
}
}
return IncreaseContainersResourceResponse.newInstance(
@ -1075,6 +1088,16 @@ public class ContainerManagerImpl extends CompositeService implements
+ " is not smaller than the current resource " + " is not smaller than the current resource "
+ currentResource.toString()); + currentResource.toString());
} }
if (increase) {
org.apache.hadoop.yarn.api.records.Container increasedContainer =
org.apache.hadoop.yarn.api.records.Container.newInstance(
containerId, null, null, targetResource, null, null);
if (context.getIncreasedContainers().putIfAbsent(containerId,
increasedContainer) != null){
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " resource is being increased.");
}
}
this.readLock.lock();
try {
if (!serviceStopped) {
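Note: the putIfAbsent guard added above rejects a second in-flight increase for the same container instead of racing with the first; a standalone sketch of that semantics (illustrative only, cId assumed to be a ContainerId in scope):

  // Illustrative sketch of the duplicate-increase guard.
  ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container>
      inFlight = new ConcurrentHashMap<>();
  org.apache.hadoop.yarn.api.records.Container first =
      org.apache.hadoop.yarn.api.records.Container.newInstance(
          cId, null, null, Resource.newInstance(4096, 2), null, null);
  // First request wins; it will be reported on the next NM-RM heartbeat.
  boolean accepted = (inFlight.putIfAbsent(cId, first) == null);
  // A second request for the same ContainerId sees a non-null prior value
  // and fails fast until the heartbeat drains the entry.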

View File

@ -18,21 +18,35 @@
package org.apache.hadoop.yarn.server.nodemanager;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -41,8 +55,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@ -50,6 +69,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@ -57,12 +78,15 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -87,7 +111,10 @@ public class TestNodeManagerResync {
private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
private final NodeManagerEvent resyncEvent =
new NodeManagerEvent(NodeManagerEventType.RESYNC);
private final long DUMMY_RM_IDENTIFIER = 1234;
protected static Log LOG = LogFactory
.getLog(TestNodeManagerResync.class);
@Before
public void setup() throws UnsupportedFileSystemException {
@ -209,6 +236,32 @@ public class TestNodeManagerResync {
nm.stop();
}
@SuppressWarnings("unchecked")
@Test(timeout=60000)
public void testContainerResourceIncreaseIsSynchronizedWithRMResync()
throws IOException, InterruptedException, YarnException {
NodeManager nm = new TestNodeManager4();
YarnConfiguration conf = createNMConfig();
conf.setBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
nm.init(conf);
nm.start();
// Start a container and make sure it is in RUNNING state
((TestNodeManager4)nm).startContainer();
// Simulate a container resource increase in a separate thread
((TestNodeManager4)nm).increaseContainersResource();
// Simulate RM restart by sending a RESYNC event
LOG.info("Sending out RESYNC event");
nm.getNMDispatcher().getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.RESYNC));
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
Assert.assertFalse(assertionFailedInThread.get());
nm.stop();
}
// This is to test when NM gets the resync response from last heart beat, it
// should be able to send the already-sent-via-last-heart-beat container
@ -588,6 +641,211 @@ public class TestNodeManagerResync {
}
}}
class TestNodeManager4 extends NodeManager {
private Thread increaseContainerResourceThread = null;
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new TestNodeStatusUpdaterImpl4(context, dispatcher,
healthChecker, metrics);
}
@Override
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater,
ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, dirsHandler){
@Override
public void
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
// do nothing
}
@Override
protected void authorizeGetAndStopContainerRequest(
ContainerId containerId, Container container,
boolean stopRequest, NMTokenIdentifier identifier)
throws YarnException {
// do nothing
}
@Override
protected void authorizeUser(UserGroupInformation remoteUgi,
NMTokenIdentifier nmTokenIdentifier) {
// do nothing
}
@Override
protected void authorizeStartAndResourceIncreaseRequest(
NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier,
boolean startRequest) throws YarnException {
try {
// Sleep 2 seconds to simulate a pro-longed increase action.
// If during this time a RESYNC event is sent by RM, the
// resync action should block until the increase action is
// completed.
// See testContainerResourceIncreaseIsSynchronizedWithRMResync()
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
protected void updateNMTokenIdentifier(
NMTokenIdentifier nmTokenIdentifier)
throws SecretManager.InvalidToken {
// Do nothing
}
@Override
public Map<String, ByteBuffer> getAuxServiceMetaData() {
return new HashMap<>();
}
@Override
protected NMTokenIdentifier selectNMTokenIdentifier(
UserGroupInformation remoteUgi) {
return new NMTokenIdentifier();
}
};
}
// Start a container in NM
public void startContainer()
throws IOException, InterruptedException, YarnException {
LOG.info("Start a container and wait until it is in RUNNING state");
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
PrintWriter fileWriter = new PrintWriter(scriptFile);
if (Shell.WINDOWS) {
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
} else {
fileWriter.write("\numask 0");
fileWriter.write("\nexec sleep 100");
}
fileWriter.close();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
URL resource_alpha =
ConverterUtils.getYarnUrlFromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
LocalResource rsrc_alpha =
recordFactory.newRecordInstance(LocalResource.class);
rsrc_alpha.setResource(resource_alpha);
rsrc_alpha.setSize(-1);
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
rsrc_alpha.setType(LocalResourceType.FILE);
rsrc_alpha.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
List<String> commands =
Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
Resource resource = Resource.newInstance(1024, 1);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(
containerLaunchContext,
getContainerToken(resource));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
getContainerManager().startContainers(allRequests);
// Make sure the container reaches RUNNING state
ContainerId cId = TestContainerManager.createContainerId(0);
BaseContainerManagerTest.waitForNMContainerState(
getContainerManager(), cId,
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING);
}
// Increase container resource in a thread
public void increaseContainersResource()
throws InterruptedException {
LOG.info("Increase a container resource in a separate thread");
increaseContainerResourceThread = new IncreaseContainersResourceThread();
increaseContainerResourceThread.start();
}
class TestNodeStatusUpdaterImpl4 extends MockNodeStatusUpdater {
public TestNodeStatusUpdaterImpl4(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics);
}
@Override
protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
try {
try {
// Check status before registerWithRM
List<ContainerId> containerIds = new ArrayList<>();
ContainerId cId = TestContainerManager.createContainerId(0);
containerIds.add(cId);
GetContainerStatusesRequest gcsRequest =
GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus = getContainerManager()
.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
assertEquals(Resource.newInstance(1024, 1),
containerStatus.getCapability());
// Call the actual rebootNodeStatusUpdaterAndRegisterWithRM().
// This function should be synchronized with
// increaseContainersResource().
super.rebootNodeStatusUpdaterAndRegisterWithRM();
// Check status after registerWithRM
containerStatus = getContainerManager()
.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
assertEquals(Resource.newInstance(4096, 2),
containerStatus.getCapability());
} catch (AssertionError ae) {
ae.printStackTrace();
assertionFailedInThread.set(true);
} finally {
syncBarrier.await();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class IncreaseContainersResourceThread extends Thread {
@Override
public void run() {
// Construct container resource increase request
List<Token> increaseTokens = new ArrayList<Token>();
// Add increase request.
Resource targetResource = Resource.newInstance(4096, 2);
try {
increaseTokens.add(getContainerToken(targetResource));
IncreaseContainersResourceRequest increaseRequest =
IncreaseContainersResourceRequest.newInstance(increaseTokens);
IncreaseContainersResourceResponse increaseResponse =
getContainerManager()
.increaseContainersResource(increaseRequest);
Assert.assertEquals(
1, increaseResponse.getSuccessfullyIncreasedContainers()
.size());
Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty());
} catch (Exception e) {
e.printStackTrace();
}
}
}
private Token getContainerToken(Resource resource) throws IOException {
ContainerId cId = TestContainerManager.createContainerId(0);
return TestContainerManager.createContainerToken(
cId, DUMMY_RM_IDENTIFIER,
getNMContext().getNodeId(), user, resource,
getNMContext().getContainerTokenSecretManager(), null);
}
}
public static NMContainerStatus createNMContainerStatus(int id,
ContainerState containerState) {
ApplicationId applicationId = ApplicationId.newInstance(0, 1);

View File

@ -619,6 +619,11 @@ public abstract class BaseAMRMProxyTest {
return null;
}
@Override
public ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container> getIncreasedContainers() {
return null;
}
@Override
public NMContainerTokenSecretManager getContainerTokenSecretManager() {
return null;

View File

@ -93,8 +93,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
@ -292,8 +290,8 @@ public class MockResourceManagerFacade implements
new ArrayList<ContainerStatus>(), containerList,
new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
new ArrayList<NMToken>(),
new ArrayList<Container>(),
new ArrayList<Container>());
}
@Override

View File

@ -108,7 +108,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
super.setup();
}
public static ContainerId createContainerId(int id) {
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);