YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to support container resizing. Contributed by Meng Ding

This commit is contained in:
Jian He 2015-08-20 21:04:14 -07:00 committed by Wangda Tan
parent c59ae4eeb1
commit c3dc1af072
17 changed files with 627 additions and 89 deletions

View File

@ -215,6 +215,9 @@ Release 2.8.0 - UNRELEASED
YARN-1643. Make ContainersMonitor support changing monitoring size of an
allocated container. (Meng Ding and Wangda Tan)
YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to
support container resizing. (Meng Ding via jianhe)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -68,7 +68,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
failoverThread = createAndStartFailoverThread();
NodeStatus status =
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
null, null, null, null);
null, null, null, null, null);
NodeHeartbeatRequest request2 =
NodeHeartbeatRequest.newInstance(status, null, null,null);
resourceTracker.nodeHeartbeat(request2);

View File

@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -70,4 +71,7 @@ public interface NodeHeartbeatResponse {
boolean getAreNodeLabelsAcceptedByRM();
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
List<Container> getContainersToDecrease();
void addAllContainersToDecrease(List<Container> containersToDecrease);
}

View File

@ -27,12 +27,15 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
@ -59,6 +62,8 @@ public class NodeHeartbeatResponsePBImpl extends
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
private List<Container> containersToDecrease = null;
public NodeHeartbeatResponsePBImpl() {
builder = NodeHeartbeatResponseProto.newBuilder();
}
@ -96,6 +101,9 @@ public class NodeHeartbeatResponsePBImpl extends
if (this.systemCredentials != null) {
addSystemCredentialsToProto();
}
if (this.containersToDecrease != null) {
addContainersToDecreaseToProto();
}
}
private void addSystemCredentialsToProto() {
@ -408,6 +416,64 @@ public class NodeHeartbeatResponsePBImpl extends
builder.addAllApplicationsToCleanup(iterable);
}
private void initContainersToDecrease() {
if (this.containersToDecrease != null) {
return;
}
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getContainersToDecreaseList();
this.containersToDecrease = new ArrayList<>();
for (ContainerProto c : list) {
this.containersToDecrease.add(convertFromProtoFormat(c));
}
}
@Override
public List<Container> getContainersToDecrease() {
initContainersToDecrease();
return this.containersToDecrease;
}
@Override
public void addAllContainersToDecrease(
final List<Container> containersToDecrease) {
if (containersToDecrease == null) {
return;
}
initContainersToDecrease();
this.containersToDecrease.addAll(containersToDecrease);
}
private void addContainersToDecreaseToProto() {
maybeInitBuilder();
builder.clearContainersToDecrease();
if (this.containersToDecrease == null) {
return;
}
Iterable<ContainerProto> iterable = new
Iterable<ContainerProto>() {
@Override
public Iterator<ContainerProto> iterator() {
return new Iterator<ContainerProto>() {
private Iterator<Container> iter = containersToDecrease.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainersToDecrease(iterable);
}
@Override
public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
@ -484,6 +550,14 @@ public class NodeHeartbeatResponsePBImpl extends
return ((MasterKeyPBImpl) t).getProto();
}
private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
return new ContainerPBImpl(p);
}
private ContainerProto convertToProtoFormat(Container t) {
return ((ContainerPBImpl) t).getProto();
}
@Override
public boolean getAreNodeLabelsAcceptedByRM() {
NodeHeartbeatResponseProtoOrBuilder p =

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
@ -48,6 +49,7 @@ public abstract class NodeStatus {
* @param nodeHealthStatus Health status of the node.
* @param containersUtilization Utilization of the containers in this node.
* @param nodeUtilization Utilization of the node.
* @param increasedContainers Containers whose resource has been increased.
* @return New {@code NodeStatus} with the provided information.
*/
public static NodeStatus newInstance(NodeId nodeId, int responseId,
@ -55,7 +57,8 @@ public abstract class NodeStatus {
List<ApplicationId> keepAliveApplications,
NodeHealthStatus nodeHealthStatus,
ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization) {
ResourceUtilization nodeUtilization,
List<Container> increasedContainers) {
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
nodeStatus.setResponseId(responseId);
nodeStatus.setNodeId(nodeId);
@ -64,6 +67,7 @@ public abstract class NodeStatus {
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
nodeStatus.setContainersUtilization(containersUtilization);
nodeStatus.setNodeUtilization(nodeUtilization);
nodeStatus.setIncreasedContainers(increasedContainers);
return nodeStatus;
}
@ -108,4 +112,13 @@ public abstract class NodeStatus {
@Unstable
public abstract void setNodeUtilization(
ResourceUtilization nodeUtilization);
@Public
@Unstable
public abstract List<Container> getIncreasedContainers();
@Private
@Unstable
public abstract void setIncreasedContainers(
List<Container> increasedContainers);
}

View File

@ -24,13 +24,16 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
@ -49,6 +52,7 @@ public class NodeStatusPBImpl extends NodeStatus {
private List<ContainerStatus> containers = null;
private NodeHealthStatus nodeHealthStatus = null;
private List<ApplicationId> keepAliveApplications = null;
private List<Container> increasedContainers = null;
public NodeStatusPBImpl() {
builder = NodeStatusProto.newBuilder();
@ -79,6 +83,9 @@ public class NodeStatusPBImpl extends NodeStatus {
if (this.keepAliveApplications != null) {
addKeepAliveApplicationsToProto();
}
if (this.increasedContainers != null) {
addIncreasedContainersToProto();
}
}
private synchronized void mergeLocalToProto() {
@ -165,6 +172,37 @@ public class NodeStatusPBImpl extends NodeStatus {
builder.addAllKeepAliveApplications(iterable);
}
private synchronized void addIncreasedContainersToProto() {
maybeInitBuilder();
builder.clearIncreasedContainers();
if (increasedContainers == null) {
return;
}
Iterable<ContainerProto> iterable = new
Iterable<ContainerProto>() {
@Override
public Iterator<ContainerProto> iterator() {
return new Iterator<ContainerProto>() {
private Iterator<Container> iter =
increasedContainers.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllIncreasedContainers(iterable);
}
@Override
public int hashCode() {
return getProto().hashCode();
@ -336,6 +374,31 @@ public class NodeStatusPBImpl extends NodeStatus {
.setNodeUtilization(convertToProtoFormat(nodeUtilization));
}
@Override
public synchronized List<Container> getIncreasedContainers() {
if (increasedContainers != null) {
return increasedContainers;
}
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getIncreasedContainersList();
this.increasedContainers = new ArrayList<>();
for (ContainerProto c : list) {
this.increasedContainers.add(convertFromProtoFormat(c));
}
return this.increasedContainers;
}
@Override
public synchronized void setIncreasedContainers(
List<Container> increasedContainers) {
maybeInitBuilder();
if (increasedContainers == null) {
builder.clearIncreasedContainers();
return;
}
this.increasedContainers = increasedContainers;
}
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl)nodeId).getProto();
}
@ -377,4 +440,14 @@ public class NodeStatusPBImpl extends NodeStatus {
ResourceUtilizationProto p) {
return new ResourceUtilizationPBImpl(p);
}
private ContainerPBImpl convertFromProtoFormat(
ContainerProto c) {
return new ContainerPBImpl(c);
}
private ContainerProto convertToProtoFormat(
Container c) {
return ((ContainerPBImpl)c).getProto();
}
}

View File

@ -38,6 +38,7 @@ message NodeStatusProto {
repeated ApplicationIdProto keep_alive_applications = 5;
optional ResourceUtilizationProto containers_utilization = 6;
optional ResourceUtilizationProto node_utilization = 7;
repeated ContainerProto increased_containers = 8;
}
message MasterKeyProto {

View File

@ -82,6 +82,7 @@ message NodeHeartbeatResponseProto {
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
repeated ContainerProto containers_to_decrease = 12;
}
message SystemCredentialsForAppsProto {

View File

@ -29,6 +29,7 @@ import java.util.HashSet;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -168,6 +169,20 @@ public class TestYarnServerApiClasses {
assertTrue(copy.getAreNodeLabelsAcceptedByRM());
}
@Test
public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() {
NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
original.addAllContainersToDecrease(
Arrays.asList(getDecreasedContainer(1, 2, 2048, 2),
getDecreasedContainer(2, 3, 1024, 1)));
NodeHeartbeatResponsePBImpl copy =
new NodeHeartbeatResponsePBImpl(original.getProto());
assertEquals(1, copy.getContainersToDecrease().get(0)
.getId().getContainerId());
assertEquals(1024, copy.getContainersToDecrease().get(1)
.getResource().getMemory());
}
/**
* Test RegisterNodeManagerRequestPBImpl.
*/
@ -244,6 +259,9 @@ public class TestYarnServerApiClasses {
original.setNodeHealthStatus(getNodeHealthStatus());
original.setNodeId(getNodeId());
original.setResponseId(1);
original.setIncreasedContainers(
Arrays.asList(getIncreasedContainer(1, 2, 2048, 2),
getIncreasedContainer(2, 3, 4096, 3)));
NodeStatusPBImpl copy = new NodeStatusPBImpl(original.getProto());
assertEquals(3L, copy.getContainersStatuses().get(1).getContainerId()
@ -252,7 +270,10 @@ public class TestYarnServerApiClasses {
assertEquals(1000, copy.getNodeHealthStatus().getLastHealthReportTime());
assertEquals(9090, copy.getNodeId().getPort());
assertEquals(1, copy.getResponseId());
assertEquals(1, copy.getIncreasedContainers().get(0)
.getId().getContainerId());
assertEquals(4096, copy.getIncreasedContainers().get(1)
.getResource().getMemory());
}
@Test
@ -347,6 +368,22 @@ public class TestYarnServerApiClasses {
return new ApplicationIdPBImpl(appId.getProto());
}
private Container getDecreasedContainer(int containerID,
int appAttemptId, int memory, int vCores) {
ContainerId containerId = getContainerId(containerID, appAttemptId);
Resource capability = Resource.newInstance(memory, vCores);
return Container.newInstance(
containerId, null, null, capability, null, null);
}
private Container getIncreasedContainer(int containerID,
int appAttemptId, int memory, int vCores) {
ContainerId containerId = getContainerId(containerID, appAttemptId);
Resource capability = Resource.newInstance(memory, vCores);
return Container.newInstance(
containerId, null, null, capability, null, null);
}
private NodeStatus getNodeStatus() {
NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class);
status.setContainersStatuses(new ArrayList<ContainerStatus>());

View File

@ -62,6 +62,9 @@ public interface Context {
ConcurrentMap<ContainerId, Container> getContainers();
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container>
getIncreasedContainers();
NMContainerTokenSecretManager getContainerTokenSecretManager();
NMTokenSecretManagerInNM getNMTokenSecretManager();

View File

@ -439,6 +439,10 @@ public class NodeManager extends CompositeService
protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
protected final ConcurrentMap<ContainerId,
org.apache.hadoop.yarn.api.records.Container> increasedContainers =
new ConcurrentHashMap<>();
private final NMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInNM nmTokenSecretManager;
private ContainerManagementProtocol containerManager;
@ -492,6 +496,12 @@ public class NodeManager extends CompositeService
return this.containers;
}
@Override
public ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container>
getIncreasedContainers() {
return this.increasedContainers;
}
@Override
public NMContainerTokenSecretManager getContainerTokenSecretManager() {
return this.containerTokenSecretManager;

View File

@ -310,8 +310,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@VisibleForTesting
protected void registerWithRM()
throws YarnException, IOException {
List<NMContainerStatus> containerReports = getNMContainerStatuses();
RegisterNodeManagerResponse regNMResponse;
Set<NodeLabel> nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration();
// Synchronize NM-RM registration with
// ContainerManagerImpl#increaseContainersResource and
// ContainerManagerImpl#startContainers to avoid race condition
// during RM recovery
synchronized (this.context) {
List<NMContainerStatus> containerReports = getNMContainerStatuses();
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports, getRunningApplications(),
@ -319,9 +326,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
}
RegisterNodeManagerResponse regNMResponse =
regNMResponse =
resourceTracker.registerNodeManager(request);
// Make sure rmIdentifier is set before we release the lock
this.rmIdentifier = regNMResponse.getRMIdentifier();
}
// if the Resource Manager instructs NM to shutdown.
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
String message =
@ -418,10 +428,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
List<ContainerStatus> containersStatuses = getContainerStatuses();
ResourceUtilization containersUtilization = getContainersUtilization();
ResourceUtilization nodeUtilization = getNodeUtilization();
List<org.apache.hadoop.yarn.api.records.Container> increasedContainers
= getIncreasedContainers();
NodeStatus nodeStatus =
NodeStatus.newInstance(nodeId, responseId, containersStatuses,
createKeepAliveApplicationList(), nodeHealthStatus,
containersUtilization, nodeUtilization);
containersUtilization, nodeUtilization, increasedContainers);
return nodeStatus;
}
@ -448,6 +460,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
return nodeResourceMonitor.getUtilization();
}
/* Get the containers whose resource has been increased since last
* NM-RM heartbeat.
*/
private List<org.apache.hadoop.yarn.api.records.Container>
getIncreasedContainers() {
List<org.apache.hadoop.yarn.api.records.Container>
increasedContainers = new ArrayList<>(
this.context.getIncreasedContainers().values());
for (org.apache.hadoop.yarn.api.records.Container
container : increasedContainers) {
this.context.getIncreasedContainers().remove(container.getId());
}
return increasedContainers;
}
// Iterate through the NMContext and clone and get all the containers'
// statuses. If it's a completed container, add into the
// recentlyStoppedContainers collections.
@ -765,6 +792,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
((NMContext) context)
.setSystemCrendentialsForApps(parseCredentials(systemCredentials));
}
List<org.apache.hadoop.yarn.api.records.Container>
containersToDecrease = response.getContainersToDecrease();
if (!containersToDecrease.isEmpty()) {
dispatcher.getEventHandler().handle(
new CMgrDecreaseContainersResourceEvent(containersToDecrease)
);
}
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(

View File

@ -563,8 +563,7 @@ public class ContainerManagerImpl extends CompositeService implements
List<ApplicationId> appIds =
new ArrayList<ApplicationId>(applications.keySet());
this.handle(
new CMgrCompletedAppsEvent(appIds,
this.handle(new CMgrCompletedAppsEvent(appIds,
CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));
LOG.info("Waiting for Applications to be Finished");
@ -584,8 +583,8 @@ public class ContainerManagerImpl extends CompositeService implements
if (applications.isEmpty()) {
LOG.info("All applications in FINISHED state");
} else {
LOG.info("Done waiting for Applications to be Finished. Still alive: " +
applications.keySet());
LOG.info("Done waiting for Applications to be Finished. Still alive: "
+ applications.keySet());
}
}
@ -759,9 +758,8 @@ public class ContainerManagerImpl extends CompositeService implements
* Start a list of containers on this NodeManager.
*/
@Override
public StartContainersResponse
startContainers(StartContainersRequest requests) throws YarnException,
IOException {
public StartContainersResponse startContainers(
StartContainersRequest requests) throws YarnException, IOException {
if (blockNewContainerRequests.get()) {
throw new NMNotYetReadyException(
"Rejecting new containers as NodeManager has not"
@ -773,42 +771,50 @@ public class ContainerManagerImpl extends CompositeService implements
List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
Map<ContainerId, SerializedException> failedContainers =
new HashMap<ContainerId, SerializedException>();
for (StartContainerRequest request : requests.getStartContainerRequests()) {
// Synchronize with NodeStatusUpdaterImpl#registerWithRM
// to avoid race condition during NM-RM resync (due to RM restart) while a
// container is being started, in particular when the container has not yet
// been added to the containers map in NMContext.
synchronized (this.context) {
for (StartContainerRequest request : requests
.getStartContainerRequests()) {
ContainerId containerId = null;
try {
if (request.getContainerToken() == null ||
request.getContainerToken().getIdentifier() == null) {
if (request.getContainerToken() == null
|| request.getContainerToken().getIdentifier() == null) {
throw new IOException(INVALID_CONTAINERTOKEN_MSG);
}
ContainerTokenIdentifier containerTokenIdentifier =
BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
.newContainerTokenIdentifier(request.getContainerToken());
verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
containerTokenIdentifier);
containerId = containerTokenIdentifier.getContainerID();
// Initialize the AMRMProxy service instance only if the container is of
// type AM and if the AMRMProxy service is enabled
if (isARMRMProxyEnabled()
&& containerTokenIdentifier.getContainerType().equals(
ContainerType.APPLICATION_MASTER)) {
if (isARMRMProxyEnabled() && containerTokenIdentifier
.getContainerType().equals(ContainerType.APPLICATION_MASTER)) {
this.amrmProxyService.processApplicationStartRequest(request);
}
startContainerInternal(nmTokenIdentifier,
containerTokenIdentifier, request);
startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
request);
succeededContainers.add(containerId);
} catch (YarnException e) {
failedContainers.put(containerId, SerializedException.newInstance(e));
} catch (InvalidToken ie) {
failedContainers.put(containerId, SerializedException.newInstance(ie));
failedContainers
.put(containerId, SerializedException.newInstance(ie));
throw ie;
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
}
return StartContainersResponse.newInstance(getAuxServiceMetaData(),
succeededContainers, failedContainers);
return StartContainersResponse
.newInstance(getAuxServiceMetaData(), succeededContainers,
failedContainers);
}
}
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
@ -989,6 +995,12 @@ public class ContainerManagerImpl extends CompositeService implements
= new ArrayList<ContainerId>();
Map<ContainerId, SerializedException> failedContainers =
new HashMap<ContainerId, SerializedException>();
// Synchronize with NodeStatusUpdaterImpl#registerWithRM
// to avoid race condition during NM-RM resync (due to RM restart) while a
// container resource is being increased in NM, in particular when the
// increased container has not yet been added to the increasedContainers
// map in NMContext.
synchronized (this.context) {
// Process container resource increase requests
for (org.apache.hadoop.yarn.api.records.Token token :
requests.getContainersToIncrease()) {
@ -1017,6 +1029,7 @@ public class ContainerManagerImpl extends CompositeService implements
throw RPCUtil.getRemoteException(e);
}
}
}
return IncreaseContainersResourceResponse.newInstance(
successfullyIncreasedContainers, failedContainers);
}
@ -1075,6 +1088,16 @@ public class ContainerManagerImpl extends CompositeService implements
+ " is not smaller than the current resource "
+ currentResource.toString());
}
if (increase) {
org.apache.hadoop.yarn.api.records.Container increasedContainer =
org.apache.hadoop.yarn.api.records.Container.newInstance(
containerId, null, null, targetResource, null, null);
if (context.getIncreasedContainers().putIfAbsent(containerId,
increasedContainer) != null){
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " resource is being increased.");
}
}
this.readLock.lock();
try {
if (!serviceStopped) {

View File

@ -18,21 +18,35 @@
package org.apache.hadoop.yarn.server.nodemanager;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -41,8 +55,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@ -50,6 +69,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@ -57,12 +78,15 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -87,7 +111,10 @@ public class TestNodeManagerResync {
private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
private final NodeManagerEvent resyncEvent =
new NodeManagerEvent(NodeManagerEventType.RESYNC);
private final long DUMMY_RM_IDENTIFIER = 1234;
protected static Log LOG = LogFactory
.getLog(TestNodeManagerResync.class);
@Before
public void setup() throws UnsupportedFileSystemException {
@ -209,6 +236,32 @@ public class TestNodeManagerResync {
nm.stop();
}
@SuppressWarnings("unchecked")
@Test(timeout=60000)
public void testContainerResourceIncreaseIsSynchronizedWithRMResync()
throws IOException, InterruptedException, YarnException {
NodeManager nm = new TestNodeManager4();
YarnConfiguration conf = createNMConfig();
conf.setBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
nm.init(conf);
nm.start();
// Start a container and make sure it is in RUNNING state
((TestNodeManager4)nm).startContainer();
// Simulate a container resource increase in a separate thread
((TestNodeManager4)nm).increaseContainersResource();
// Simulate RM restart by sending a RESYNC event
LOG.info("Sending out RESYNC event");
nm.getNMDispatcher().getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.RESYNC));
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
Assert.assertFalse(assertionFailedInThread.get());
nm.stop();
}
// This is to test when NM gets the resync response from last heart beat, it
// should be able to send the already-sent-via-last-heart-beat container
@ -588,6 +641,211 @@ public class TestNodeManagerResync {
}
}}
class TestNodeManager4 extends NodeManager {
private Thread increaseContainerResourceThread = null;
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new TestNodeStatusUpdaterImpl4(context, dispatcher,
healthChecker, metrics);
}
@Override
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater,
ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
metrics, dirsHandler){
@Override
public void
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
// do nothing
}
@Override
protected void authorizeGetAndStopContainerRequest(
ContainerId containerId, Container container,
boolean stopRequest, NMTokenIdentifier identifier)
throws YarnException {
// do nothing
}
@Override
protected void authorizeUser(UserGroupInformation remoteUgi,
NMTokenIdentifier nmTokenIdentifier) {
// do nothing
}
@Override
protected void authorizeStartAndResourceIncreaseRequest(
NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier,
boolean startRequest) throws YarnException {
try {
// Sleep 2 seconds to simulate a pro-longed increase action.
// If during this time a RESYNC event is sent by RM, the
// resync action should block until the increase action is
// completed.
// See testContainerResourceIncreaseIsSynchronizedWithRMResync()
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
protected void updateNMTokenIdentifier(
NMTokenIdentifier nmTokenIdentifier)
throws SecretManager.InvalidToken {
// Do nothing
}
@Override
public Map<String, ByteBuffer> getAuxServiceMetaData() {
return new HashMap<>();
}
@Override
protected NMTokenIdentifier selectNMTokenIdentifier(
UserGroupInformation remoteUgi) {
return new NMTokenIdentifier();
}
};
}
// Start a container in NM
public void startContainer()
throws IOException, InterruptedException, YarnException {
LOG.info("Start a container and wait until it is in RUNNING state");
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
PrintWriter fileWriter = new PrintWriter(scriptFile);
if (Shell.WINDOWS) {
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
} else {
fileWriter.write("\numask 0");
fileWriter.write("\nexec sleep 100");
}
fileWriter.close();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
URL resource_alpha =
ConverterUtils.getYarnUrlFromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
LocalResource rsrc_alpha =
recordFactory.newRecordInstance(LocalResource.class);
rsrc_alpha.setResource(resource_alpha);
rsrc_alpha.setSize(-1);
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
rsrc_alpha.setType(LocalResourceType.FILE);
rsrc_alpha.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put(destinationFile, rsrc_alpha);
containerLaunchContext.setLocalResources(localResources);
List<String> commands =
Arrays.asList(Shell.getRunScriptCommand(scriptFile));
containerLaunchContext.setCommands(commands);
Resource resource = Resource.newInstance(1024, 1);
StartContainerRequest scRequest =
StartContainerRequest.newInstance(
containerLaunchContext,
getContainerToken(resource));
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(scRequest);
StartContainersRequest allRequests =
StartContainersRequest.newInstance(list);
getContainerManager().startContainers(allRequests);
// Make sure the container reaches RUNNING state
ContainerId cId = TestContainerManager.createContainerId(0);
BaseContainerManagerTest.waitForNMContainerState(
getContainerManager(), cId,
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState.RUNNING);
}
// Increase container resource in a thread
public void increaseContainersResource()
throws InterruptedException {
LOG.info("Increase a container resource in a separate thread");
increaseContainerResourceThread = new IncreaseContainersResourceThread();
increaseContainerResourceThread.start();
}
class TestNodeStatusUpdaterImpl4 extends MockNodeStatusUpdater {
public TestNodeStatusUpdaterImpl4(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics);
}
@Override
protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
try {
try {
// Check status before registerWithRM
List<ContainerId> containerIds = new ArrayList<>();
ContainerId cId = TestContainerManager.createContainerId(0);
containerIds.add(cId);
GetContainerStatusesRequest gcsRequest =
GetContainerStatusesRequest.newInstance(containerIds);
ContainerStatus containerStatus = getContainerManager()
.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
assertEquals(Resource.newInstance(1024, 1),
containerStatus.getCapability());
// Call the actual rebootNodeStatusUpdaterAndRegisterWithRM().
// This function should be synchronized with
// increaseContainersResource().
super.rebootNodeStatusUpdaterAndRegisterWithRM();
// Check status after registerWithRM
containerStatus = getContainerManager()
.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
assertEquals(Resource.newInstance(4096, 2),
containerStatus.getCapability());
} catch (AssertionError ae) {
ae.printStackTrace();
assertionFailedInThread.set(true);
} finally {
syncBarrier.await();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class IncreaseContainersResourceThread extends Thread {
@Override
public void run() {
// Construct container resource increase request
List<Token> increaseTokens = new ArrayList<Token>();
// Add increase request.
Resource targetResource = Resource.newInstance(4096, 2);
try {
increaseTokens.add(getContainerToken(targetResource));
IncreaseContainersResourceRequest increaseRequest =
IncreaseContainersResourceRequest.newInstance(increaseTokens);
IncreaseContainersResourceResponse increaseResponse =
getContainerManager()
.increaseContainersResource(increaseRequest);
Assert.assertEquals(
1, increaseResponse.getSuccessfullyIncreasedContainers()
.size());
Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty());
} catch (Exception e) {
e.printStackTrace();
}
}
}
private Token getContainerToken(Resource resource) throws IOException {
ContainerId cId = TestContainerManager.createContainerId(0);
return TestContainerManager.createContainerToken(
cId, DUMMY_RM_IDENTIFIER,
getNMContext().getNodeId(), user, resource,
getNMContext().getContainerTokenSecretManager(), null);
}
}
public static NMContainerStatus createNMContainerStatus(int id,
ContainerState containerState) {
ApplicationId applicationId = ApplicationId.newInstance(0, 1);

View File

@ -619,6 +619,11 @@ public abstract class BaseAMRMProxyTest {
return null;
}
@Override
public ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container> getIncreasedContainers() {
return null;
}
@Override
public NMContainerTokenSecretManager getContainerTokenSecretManager() {
return null;

View File

@ -93,8 +93,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
@ -292,8 +290,8 @@ public class MockResourceManagerFacade implements
new ArrayList<ContainerStatus>(), containerList,
new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
new ArrayList<NMToken>(),
new ArrayList<ContainerResourceIncrease>(),
new ArrayList<ContainerResourceDecrease>());
new ArrayList<Container>(),
new ArrayList<Container>());
}
@Override

View File

@ -108,7 +108,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
super.setup();
}
private ContainerId createContainerId(int id) {
public static ContainerId createContainerId(int id) {
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);