YARN-7173. Container update RM-NM communication fix for backward compatibility. (Arun Suresh via wangda)

Change-Id: I1c39ed5c59dee739ba5044b61b3ef5ed203b79c1
(cherry picked from commit e74d1be04b)
This commit is contained in:
Wangda Tan 2017-09-11 20:46:41 -07:00 committed by Arun Suresh
parent 13310f91a5
commit d2e3818823
6 changed files with 97 additions and 9 deletions

View File

@@ -113,4 +113,9 @@ public abstract class NodeHeartbeatResponse {
/**
 * Sets the queuing limit the RM wants the NM to enforce for
 * opportunistic containers.
 */
public abstract void setContainerQueuingLimit(
ContainerQueuingLimit containerQueuingLimit);
/**
 * Returns the containers whose resources are to be decreased.
 * NOTE: kept for backward compatibility — newer code paths use the
 * generic containers-to-update list instead.
 */
public abstract List<Container> getContainersToDecrease();
/**
 * Adds all of the given containers to the to-be-decreased list.
 * Implementations may treat a null collection as a no-op.
 */
public abstract void addAllContainersToDecrease(
Collection<Container> containersToDecrease);
}

View File

@@ -80,6 +80,8 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
private MasterKey nmTokenMasterKey = null;
private ContainerQueuingLimit containerQueuingLimit = null;
// Containers the RM wants the NM to update; lazily materialized from the
// underlying proto on first access.
private List<Container> containersToUpdate = null;
// NOTE: This is required for backward compatibility.
// Older NMs only understand the decrease list, so it is kept alongside
// containersToUpdate; also lazily materialized from the proto.
private List<Container> containersToDecrease = null;
private List<SignalContainerRequest> containersToSignal = null;
public NodeHeartbeatResponsePBImpl() {
@@ -126,6 +128,9 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
if (this.containersToUpdate != null) {
addContainersToUpdateToProto();
}
if (this.containersToDecrease != null) {
addContainersToDecreaseToProto();
}
if (this.containersToSignal != null) {
addContainersToSignalToProto();
}
@@ -572,6 +577,66 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
builder.addAllContainersToUpdate(iterable);
}
/**
 * Lazily builds the local containers-to-decrease list from the
 * underlying proto. No-op if the list has already been materialized.
 */
private void initContainersToDecrease() {
  if (containersToDecrease != null) {
    return;
  }
  NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
  containersToDecrease = new ArrayList<>();
  for (ContainerProto containerProto : p.getContainersToDecreaseList()) {
    containersToDecrease.add(convertFromProtoFormat(containerProto));
  }
}
/**
 * @return the (lazily initialized) list of containers whose resources
 *         are to be decreased; never null.
 */
@Override
public List<Container> getContainersToDecrease() {
  initContainersToDecrease();
  return containersToDecrease;
}
/**
 * Appends the given containers to the to-be-decreased list.
 * A null collection is silently ignored.
 */
@Override
public void addAllContainersToDecrease(
    final Collection<Container> containersToDecrease) {
  if (containersToDecrease != null) {
    initContainersToDecrease();
    this.containersToDecrease.addAll(containersToDecrease);
  }
}
/**
 * Serializes the local containers-to-decrease list into the proto
 * builder, converting each record to its protobuf form. Previously
 * serialized entries are cleared first so the builder mirrors the
 * local list exactly; a null local list leaves the field empty.
 */
private void addContainersToDecreaseToProto() {
  maybeInitBuilder();
  builder.clearContainersToDecrease();
  if (this.containersToDecrease == null) {
    return;
  }
  // Appending one converted entry at a time yields the same repeated
  // field contents as addAll over a converting iterable.
  for (Container container : this.containersToDecrease) {
    builder.addContainersToDecrease(convertToProtoFormat(container));
  }
}
@Override
public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
if (this.systemCredentials != null) {

View File

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -641,7 +642,8 @@ public class RMContainerImpl implements RMContainer {
new AllocationExpirationInfo(event.getContainerId()));
container.eventHandler.handle(new RMNodeUpdateContainerEvent(
container.nodeId,
Collections.singletonList(container.getContainer())));
Collections.singletonMap(container.getContainer(),
ContainerUpdateType.DECREASE_RESOURCE)));
} else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) {
// If nmContainerResource < rmContainerResource, this is caused by the
// following sequence:

View File

@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -174,6 +175,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// Containers the RM wants this node's NM to update, keyed by container
// id; drained into the next heartbeat response.
private final Map<ContainerId, Container> toBeUpdatedContainers =
new HashMap<>();
// NOTE: This is required for backward compatibility.
// Decrease-only subset of the updates, sent on a separate heartbeat
// field so NMs that predate the generic update list still see decreases.
private final Map<ContainerId, Container> toBeDecreasedContainers =
new HashMap<>();
// Container resource increases as reported by the NM —
// NOTE(review): producer not visible here; confirm against heartbeat
// handling before relying on this description.
private final Map<ContainerId, Container> nmReportedIncreasedContainers =
new HashMap<>();
@@ -626,6 +631,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
try {
response.addAllContainersToUpdate(toBeUpdatedContainers.values());
toBeUpdatedContainers.clear();
// NOTE: This is required for backward compatibility.
response.addAllContainersToDecrease(toBeDecreasedContainers.values());
toBeDecreasedContainers.clear();
} finally {
this.writeLock.unlock();
}
@@ -1043,8 +1052,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/**
 * Records the containers carried by an RMNodeUpdateContainerEvent so
 * they are pushed to the NM in the next heartbeat response. Decrease
 * updates are additionally mirrored into the legacy decrease map.
 */
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
  RMNodeUpdateContainerEvent de = (RMNodeUpdateContainerEvent) event;
  for (Map.Entry<Container, ContainerUpdateType> e :
      de.getToBeUpdatedContainers().entrySet()) {
    // NOTE: This is required for backward compatibility.
    if (ContainerUpdateType.DECREASE_RESOURCE == e.getValue()) {
      rmNode.toBeDecreasedContainers.put(e.getKey().getId(), e.getKey());
    }
    rmNode.toBeUpdatedContainers.put(e.getKey().getId(), e.getKey());
  }
}

View File

@@ -19,8 +19,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.NodeId;
/**
@@ -29,16 +31,15 @@ import org.apache.hadoop.yarn.api.records.NodeId;
*
*/
public class RMNodeUpdateContainerEvent extends RMNodeEvent {
private List<Container> toBeUpdatedContainers;
private Map<Container, ContainerUpdateType> toBeUpdatedContainers;
public RMNodeUpdateContainerEvent(NodeId nodeId,
List<Container> toBeUpdatedContainers) {
Map<Container, ContainerUpdateType> toBeUpdatedContainers) {
super(nodeId, RMNodeEventType.UPDATE_CONTAINER);
this.toBeUpdatedContainers = toBeUpdatedContainers;
}
public List<Container> getToBeUpdatedContainers() {
public Map<Container, ContainerUpdateType> getToBeUpdatedContainers() {
return toBeUpdatedContainers;
}
}

View File

@@ -690,7 +690,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
if (autoUpdate) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeUpdateContainerEvent(rmContainer.getNodeId(),
Collections.singletonList(rmContainer.getContainer())));
Collections.singletonMap(
rmContainer.getContainer(), updateType)));
} else {
rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
rmContainer.getContainerId(),