YARN-1645. ContainerManager implementation to support container resizing. Contributed by Meng Ding & Wangda Tan
(cherry picked from commit ffd820c27a
)
This commit is contained in:
parent
c19e8ffdc7
commit
afe4afd0c9
|
@ -151,6 +151,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-1449. AM-NM protocol changes to support container resizing.
|
||||
(Meng Ding & Wangda Tan via jianhe)
|
||||
|
||||
YARN-1645. ContainerManager implementation to support container resizing.
|
||||
(Meng Ding & Wangda Tan via jianhe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import java.util.List;
|
||||
|
||||
public class CMgrDecreaseContainersResourceEvent extends ContainerManagerEvent {
|
||||
|
||||
private final List<Container> containersToDecrease;
|
||||
|
||||
public CMgrDecreaseContainersResourceEvent(List<Container>
|
||||
containersToDecrease) {
|
||||
super(ContainerManagerEventType.DECREASE_CONTAINERS_RESOURCE);
|
||||
this.containersToDecrease = containersToDecrease;
|
||||
}
|
||||
|
||||
public List<Container> getContainersToDecrease() {
|
||||
return this.containersToDecrease;
|
||||
}
|
||||
}
|
|
@ -21,4 +21,5 @@ package org.apache.hadoop.yarn.server.nodemanager;
|
|||
public enum ContainerManagerEventType {
|
||||
FINISH_APPS,
|
||||
FINISH_CONTAINERS,
|
||||
DECREASE_CONTAINERS_RESOURCE
|
||||
}
|
||||
|
|
|
@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
|
||||
|
@ -95,6 +96,7 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
|||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
|
@ -113,6 +115,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ChangeContainerResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
|
@ -141,6 +144,7 @@ import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.protobuf.ByteString;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
public class ContainerManagerImpl extends CompositeService implements
|
||||
ServiceStateChangeListener, ContainerManagementProtocol,
|
||||
|
@ -681,33 +685,45 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
|
||||
/**
|
||||
* @param containerTokenIdentifier
|
||||
* of the container to be started
|
||||
* of the container whose resource is to be started or increased
|
||||
* @throws YarnException
|
||||
*/
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
|
||||
ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
|
||||
protected void authorizeStartAndResourceIncreaseRequest(
|
||||
NMTokenIdentifier nmTokenIdentifier,
|
||||
ContainerTokenIdentifier containerTokenIdentifier,
|
||||
boolean startRequest)
|
||||
throws YarnException {
|
||||
if (nmTokenIdentifier == null) {
|
||||
throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG);
|
||||
}
|
||||
if (containerTokenIdentifier == null) {
|
||||
throw RPCUtil.getRemoteException(INVALID_CONTAINERTOKEN_MSG);
|
||||
}
|
||||
/*
|
||||
* Check the following:
|
||||
* 1. The request comes from the same application attempt
|
||||
* 2. The request possess a container token that has not expired
|
||||
* 3. The request possess a container token that is granted by a known RM
|
||||
*/
|
||||
ContainerId containerId = containerTokenIdentifier.getContainerID();
|
||||
String containerIDStr = containerId.toString();
|
||||
boolean unauthorized = false;
|
||||
StringBuilder messageBuilder =
|
||||
new StringBuilder("Unauthorized request to start container. ");
|
||||
new StringBuilder("Unauthorized request to " + (startRequest ?
|
||||
"start container." : "increase container resource."));
|
||||
if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().
|
||||
equals(containerId.getApplicationAttemptId().getApplicationId())) {
|
||||
unauthorized = true;
|
||||
messageBuilder.append("\nNMToken for application attempt : ")
|
||||
.append(nmTokenIdentifier.getApplicationAttemptId())
|
||||
.append(" was used for starting container with container token")
|
||||
.append(" was used for "
|
||||
+ (startRequest ? "starting " : "increasing resource of ")
|
||||
+ "container with container token")
|
||||
.append(" issued for application attempt : ")
|
||||
.append(containerId.getApplicationAttemptId());
|
||||
} else if (!this.context.getContainerTokenSecretManager()
|
||||
} else if (startRequest && !this.context.getContainerTokenSecretManager()
|
||||
.isValidStartContainerRequest(containerTokenIdentifier)) {
|
||||
// Is the container being relaunched? Or RPC layer let startCall with
|
||||
// tokens generated off old-secret through?
|
||||
|
@ -729,6 +745,14 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
LOG.error(msg);
|
||||
throw RPCUtil.getRemoteException(msg);
|
||||
}
|
||||
if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater
|
||||
.getRMIdentifier()) {
|
||||
// Is the container coming from unknown RM
|
||||
StringBuilder sb = new StringBuilder("\nContainer ");
|
||||
sb.append(containerTokenIdentifier.getContainerID().toString())
|
||||
.append(" rejected as it is allocated by a previous RM");
|
||||
throw new InvalidContainerException(sb.toString());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -745,7 +769,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
}
|
||||
UserGroupInformation remoteUgi = getRemoteUgi();
|
||||
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
|
||||
authorizeUser(remoteUgi,nmTokenIdentifier);
|
||||
authorizeUser(remoteUgi, nmTokenIdentifier);
|
||||
List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
|
||||
Map<ContainerId, SerializedException> failedContainers =
|
||||
new HashMap<ContainerId, SerializedException>();
|
||||
|
@ -844,16 +868,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
* belongs to correct Node Manager (part of retrieve password). c) It has
|
||||
* correct RMIdentifier. d) It is not expired.
|
||||
*/
|
||||
authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier);
|
||||
|
||||
if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater
|
||||
.getRMIdentifier()) {
|
||||
// Is the container coming from unknown RM
|
||||
StringBuilder sb = new StringBuilder("\nContainer ");
|
||||
sb.append(containerTokenIdentifier.getContainerID().toString())
|
||||
.append(" rejected as it is allocated by a previous RM");
|
||||
throw new InvalidContainerException(sb.toString());
|
||||
}
|
||||
authorizeStartAndResourceIncreaseRequest(
|
||||
nmTokenIdentifier, containerTokenIdentifier, true);
|
||||
// update NMToken
|
||||
updateNMTokenIdentifier(nmTokenIdentifier);
|
||||
|
||||
|
@ -960,9 +976,118 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
@Override
|
||||
public IncreaseContainersResourceResponse increaseContainersResource(
|
||||
IncreaseContainersResourceRequest requests)
|
||||
throws YarnException, IOException {
|
||||
// To be implemented in YARN-1645
|
||||
return null;
|
||||
throws YarnException, IOException {
|
||||
if (blockNewContainerRequests.get()) {
|
||||
throw new NMNotYetReadyException(
|
||||
"Rejecting container resource increase as NodeManager has not"
|
||||
+ " yet connected with ResourceManager");
|
||||
}
|
||||
UserGroupInformation remoteUgi = getRemoteUgi();
|
||||
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
|
||||
authorizeUser(remoteUgi, nmTokenIdentifier);
|
||||
List<ContainerId> successfullyIncreasedContainers
|
||||
= new ArrayList<ContainerId>();
|
||||
Map<ContainerId, SerializedException> failedContainers =
|
||||
new HashMap<ContainerId, SerializedException>();
|
||||
// Process container resource increase requests
|
||||
for (org.apache.hadoop.yarn.api.records.Token token :
|
||||
requests.getContainersToIncrease()) {
|
||||
ContainerId containerId = null;
|
||||
try {
|
||||
if (token.getIdentifier() == null) {
|
||||
throw new IOException(INVALID_CONTAINERTOKEN_MSG);
|
||||
}
|
||||
ContainerTokenIdentifier containerTokenIdentifier =
|
||||
BuilderUtils.newContainerTokenIdentifier(token);
|
||||
verifyAndGetContainerTokenIdentifier(token,
|
||||
containerTokenIdentifier);
|
||||
authorizeStartAndResourceIncreaseRequest(
|
||||
nmTokenIdentifier, containerTokenIdentifier, false);
|
||||
containerId = containerTokenIdentifier.getContainerID();
|
||||
// Reuse the startContainer logic to update NMToken,
|
||||
// as container resource increase request will have come with
|
||||
// an updated NMToken.
|
||||
updateNMTokenIdentifier(nmTokenIdentifier);
|
||||
Resource resource = containerTokenIdentifier.getResource();
|
||||
changeContainerResourceInternal(containerId, resource, true);
|
||||
successfullyIncreasedContainers.add(containerId);
|
||||
} catch (YarnException | InvalidToken e) {
|
||||
failedContainers.put(containerId, SerializedException.newInstance(e));
|
||||
} catch (IOException e) {
|
||||
throw RPCUtil.getRemoteException(e);
|
||||
}
|
||||
}
|
||||
return IncreaseContainersResourceResponse.newInstance(
|
||||
successfullyIncreasedContainers, failedContainers);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void changeContainerResourceInternal(
|
||||
ContainerId containerId, Resource targetResource, boolean increase)
|
||||
throws YarnException, IOException {
|
||||
Container container = context.getContainers().get(containerId);
|
||||
// Check container existence
|
||||
if (container == null) {
|
||||
if (nodeStatusUpdater.isContainerRecentlyStopped(containerId)) {
|
||||
throw RPCUtil.getRemoteException("Container " + containerId.toString()
|
||||
+ " was recently stopped on node manager.");
|
||||
} else {
|
||||
throw RPCUtil.getRemoteException("Container " + containerId.toString()
|
||||
+ " is not handled by this NodeManager");
|
||||
}
|
||||
}
|
||||
// Check container state
|
||||
org.apache.hadoop.yarn.server.nodemanager.
|
||||
containermanager.container.ContainerState currentState =
|
||||
container.getContainerState();
|
||||
if (currentState != org.apache.hadoop.yarn.server.
|
||||
nodemanager.containermanager.container.ContainerState.RUNNING) {
|
||||
throw RPCUtil.getRemoteException("Container " + containerId.toString()
|
||||
+ " is in " + currentState.name() + " state."
|
||||
+ " Resource can only be changed when a container is in"
|
||||
+ " RUNNING state");
|
||||
}
|
||||
// Check validity of the target resource.
|
||||
Resource currentResource = container.getResource();
|
||||
if (currentResource.equals(targetResource)) {
|
||||
LOG.warn("Unable to change resource for container "
|
||||
+ containerId.toString()
|
||||
+ ". The target resource "
|
||||
+ targetResource.toString()
|
||||
+ " is the same as the current resource");
|
||||
return;
|
||||
}
|
||||
if (increase && !Resources.fitsIn(currentResource, targetResource)) {
|
||||
throw RPCUtil.getRemoteException("Unable to increase resource for "
|
||||
+ "container " + containerId.toString()
|
||||
+ ". The target resource "
|
||||
+ targetResource.toString()
|
||||
+ " is smaller than the current resource "
|
||||
+ currentResource.toString());
|
||||
}
|
||||
if (!increase &&
|
||||
(!Resources.fitsIn(Resources.none(), targetResource)
|
||||
|| !Resources.fitsIn(targetResource, currentResource))) {
|
||||
throw RPCUtil.getRemoteException("Unable to decrease resource for "
|
||||
+ "container " + containerId.toString()
|
||||
+ ". The target resource "
|
||||
+ targetResource.toString()
|
||||
+ " is not smaller than the current resource "
|
||||
+ currentResource.toString());
|
||||
}
|
||||
this.readLock.lock();
|
||||
try {
|
||||
if (!serviceStopped) {
|
||||
dispatcher.getEventHandler().handle(new ChangeContainerResourceEvent(
|
||||
containerId, targetResource));
|
||||
} else {
|
||||
throw new YarnException(
|
||||
"Unable to change container resource as the NodeManager is "
|
||||
+ "in the process of shutting down");
|
||||
}
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
|
@ -1182,6 +1307,21 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
"Container Killed by ResourceManager"));
|
||||
}
|
||||
break;
|
||||
case DECREASE_CONTAINERS_RESOURCE:
|
||||
CMgrDecreaseContainersResourceEvent containersDecreasedEvent =
|
||||
(CMgrDecreaseContainersResourceEvent) event;
|
||||
for (org.apache.hadoop.yarn.api.records.Container container
|
||||
: containersDecreasedEvent.getContainersToDecrease()) {
|
||||
try {
|
||||
changeContainerResourceInternal(container.getId(),
|
||||
container.getResource(), false);
|
||||
} catch (YarnException e) {
|
||||
LOG.error("Unable to decrease container resource", e);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to update container resource in store", e);
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new YarnRuntimeException(
|
||||
"Got an unknown ContainerManagerEvent type: " + event.getType());
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
||||
public class ChangeContainerResourceEvent extends ContainerEvent {
|
||||
|
||||
private Resource resource;
|
||||
|
||||
public ChangeContainerResourceEvent(ContainerId c, Resource resource) {
|
||||
super(c, ContainerEventType.CHANGE_CONTAINER_RESOURCE);
|
||||
this.resource = resource;
|
||||
}
|
||||
|
||||
public Resource getResource() {
|
||||
return this.resource;
|
||||
}
|
||||
}
|
|
@ -25,6 +25,10 @@ public enum ContainerEventType {
|
|||
KILL_CONTAINER,
|
||||
UPDATE_DIAGNOSTICS_MSG,
|
||||
CONTAINER_DONE,
|
||||
CHANGE_CONTAINER_RESOURCE,
|
||||
|
||||
// Producer: ContainerMonitor
|
||||
CONTAINER_RESOURCE_CHANGED,
|
||||
|
||||
// DownloadManager
|
||||
CONTAINER_INITED,
|
||||
|
|
|
@ -191,8 +191,10 @@ public class DummyContainerManager extends ContainerManagerImpl {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
|
||||
ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
|
||||
protected void authorizeStartAndResourceIncreaseRequest(
|
||||
NMTokenIdentifier nmTokenIdentifier,
|
||||
ContainerTokenIdentifier containerTokenIdentifier,
|
||||
boolean startRequest) throws YarnException {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
|
|
|
@ -189,6 +189,28 @@ public class TestContainerManagerWithLCE extends TestContainerManager {
|
|||
super.testStartContainerFailureWithUnknownAuxService();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testIncreaseContainerResourceWithInvalidRequests() throws Exception {
|
||||
// Don't run the test if the binary is not available.
|
||||
if (!shouldRunTest()) {
|
||||
LOG.info("LCE binary path is not passed. Not running the test");
|
||||
return;
|
||||
}
|
||||
LOG.info("Running testIncreaseContainerResourceWithInvalidRequests");
|
||||
super.testIncreaseContainerResourceWithInvalidRequests();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void testIncreaseContainerResourceWithInvalidResource() throws Exception {
|
||||
// Don't run the test if the binary is not available.
|
||||
if (!shouldRunTest()) {
|
||||
LOG.info("LCE binary path is not passed. Not running the test");
|
||||
return;
|
||||
}
|
||||
LOG.info("Running testIncreaseContainerResourceWithInvalidResource");
|
||||
super.testIncreaseContainerResourceWithInvalidResource();
|
||||
}
|
||||
|
||||
private boolean shouldRunTest() {
|
||||
return System
|
||||
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
|
||||
|
|
|
@ -209,12 +209,13 @@ public abstract class BaseContainerManagerTest {
|
|||
// do nothing
|
||||
}
|
||||
@Override
|
||||
protected void authorizeStartRequest(
|
||||
NMTokenIdentifier nmTokenIdentifier,
|
||||
ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
protected void authorizeStartAndResourceIncreaseRequest(
|
||||
NMTokenIdentifier nmTokenIdentifier,
|
||||
ContainerTokenIdentifier containerTokenIdentifier,
|
||||
boolean startRequest) throws YarnException {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateNMTokenIdentifier(
|
||||
NMTokenIdentifier nmTokenIdentifier) throws InvalidToken {
|
||||
|
@ -310,4 +311,34 @@ public abstract class BaseContainerManagerTest {
|
|||
app.getApplicationState().equals(finalState));
|
||||
}
|
||||
|
||||
public static void waitForNMContainerState(ContainerManagerImpl
|
||||
containerManager, ContainerId containerID,
|
||||
org.apache.hadoop.yarn.server.nodemanager.containermanager
|
||||
.container.ContainerState finalState)
|
||||
throws InterruptedException, YarnException, IOException {
|
||||
waitForNMContainerState(containerManager, containerID, finalState, 20);
|
||||
}
|
||||
|
||||
public static void waitForNMContainerState(ContainerManagerImpl
|
||||
containerManager, ContainerId containerID,
|
||||
org.apache.hadoop.yarn.server.nodemanager.containermanager
|
||||
.container.ContainerState finalState, int timeOutMax)
|
||||
throws InterruptedException, YarnException, IOException {
|
||||
Container container =
|
||||
containerManager.getContext().getContainers().get(containerID);
|
||||
org.apache.hadoop.yarn.server.nodemanager
|
||||
.containermanager.container.ContainerState currentState =
|
||||
container.getContainerState();
|
||||
int timeoutSecs = 0;
|
||||
while (!currentState.equals(finalState)
|
||||
&& timeoutSecs++ < timeOutMax) {
|
||||
Thread.sleep(1000);
|
||||
LOG.info("Waiting for NM container to get into state " + finalState
|
||||
+ ". Current state is " + currentState);
|
||||
currentState = container.getContainerState();
|
||||
}
|
||||
LOG.info("Container state is " + currentState);
|
||||
Assert.assertEquals("ContainerState is not correct (timedout)",
|
||||
finalState, currentState);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,6 +38,8 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
|
@ -72,6 +74,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
|
||||
|
@ -87,6 +90,8 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class TestContainerManager extends BaseContainerManagerTest {
|
||||
|
||||
public TestContainerManager() throws UnsupportedFileSystemException {
|
||||
|
@ -803,7 +808,8 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
metrics, dirsHandler);
|
||||
String strExceptionMsg = "";
|
||||
try {
|
||||
cMgrImpl.authorizeStartRequest(null, new ContainerTokenIdentifier());
|
||||
cMgrImpl.authorizeStartAndResourceIncreaseRequest(
|
||||
null, new ContainerTokenIdentifier(), true);
|
||||
} catch(YarnException ye) {
|
||||
strExceptionMsg = ye.getMessage();
|
||||
}
|
||||
|
@ -812,7 +818,8 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
|
||||
strExceptionMsg = "";
|
||||
try {
|
||||
cMgrImpl.authorizeStartRequest(new NMTokenIdentifier(), null);
|
||||
cMgrImpl.authorizeStartAndResourceIncreaseRequest(
|
||||
new NMTokenIdentifier(), null, true);
|
||||
} catch(YarnException ye) {
|
||||
strExceptionMsg = ye.getMessage();
|
||||
}
|
||||
|
@ -878,6 +885,167 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
ContainerManagerImpl.INVALID_CONTAINERTOKEN_MSG);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncreaseContainerResourceWithInvalidRequests() throws Exception {
|
||||
containerManager.start();
|
||||
// Start 4 containers 0..4 with default resource (1024, 1)
|
||||
List<StartContainerRequest> list = new ArrayList<>();
|
||||
ContainerLaunchContext containerLaunchContext = recordFactory
|
||||
.newRecordInstance(ContainerLaunchContext.class);
|
||||
for (int i = 0; i < 4; i++) {
|
||||
ContainerId cId = createContainerId(i);
|
||||
long identifier = DUMMY_RM_IDENTIFIER;
|
||||
Token containerToken = createContainerToken(cId, identifier,
|
||||
context.getNodeId(), user, context.getContainerTokenSecretManager());
|
||||
StartContainerRequest request = StartContainerRequest.newInstance(
|
||||
containerLaunchContext, containerToken);
|
||||
list.add(request);
|
||||
}
|
||||
StartContainersRequest requestList = StartContainersRequest
|
||||
.newInstance(list);
|
||||
StartContainersResponse response = containerManager
|
||||
.startContainers(requestList);
|
||||
|
||||
Assert.assertEquals(4, response.getSuccessfullyStartedContainers().size());
|
||||
int i = 0;
|
||||
for (ContainerId id : response.getSuccessfullyStartedContainers()) {
|
||||
Assert.assertEquals(i, id.getContainerId());
|
||||
i++;
|
||||
}
|
||||
|
||||
Thread.sleep(2000);
|
||||
// Construct container resource increase request,
|
||||
List<Token> increaseTokens = new ArrayList<Token>();
|
||||
// Add increase request for container-0, the request will fail as the
|
||||
// container will have exited, and won't be in RUNNING state
|
||||
ContainerId cId0 = createContainerId(0);
|
||||
Token containerToken =
|
||||
createContainerToken(cId0, DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(), user,
|
||||
Resource.newInstance(1234, 3),
|
||||
context.getContainerTokenSecretManager(), null);
|
||||
increaseTokens.add(containerToken);
|
||||
// Add increase request for container-7, the request will fail as the
|
||||
// container does not exist
|
||||
ContainerId cId7 = createContainerId(7);
|
||||
containerToken =
|
||||
createContainerToken(cId7, DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(), user,
|
||||
Resource.newInstance(1234, 3),
|
||||
context.getContainerTokenSecretManager(), null);
|
||||
increaseTokens.add(containerToken);
|
||||
|
||||
IncreaseContainersResourceRequest increaseRequest =
|
||||
IncreaseContainersResourceRequest
|
||||
.newInstance(increaseTokens);
|
||||
IncreaseContainersResourceResponse increaseResponse =
|
||||
containerManager.increaseContainersResource(increaseRequest);
|
||||
// Check response
|
||||
Assert.assertEquals(
|
||||
0, increaseResponse.getSuccessfullyIncreasedContainers().size());
|
||||
Assert.assertEquals(2, increaseResponse.getFailedRequests().size());
|
||||
for (Map.Entry<ContainerId, SerializedException> entry : increaseResponse
|
||||
.getFailedRequests().entrySet()) {
|
||||
Assert.assertNotNull("Failed message", entry.getValue().getMessage());
|
||||
if (cId0.equals(entry.getKey())) {
|
||||
Assert.assertTrue(entry.getValue().getMessage()
|
||||
.contains("Resource can only be changed when a "
|
||||
+ "container is in RUNNING state"));
|
||||
} else if (cId7.equals(entry.getKey())) {
|
||||
Assert.assertTrue(entry.getValue().getMessage()
|
||||
.contains("Container " + cId7.toString()
|
||||
+ " is not handled by this NodeManager"));
|
||||
} else {
|
||||
throw new YarnException("Received failed request from wrong"
|
||||
+ " container: " + entry.getKey().toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncreaseContainerResourceWithInvalidResource() throws Exception {
|
||||
containerManager.start();
|
||||
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
|
||||
PrintWriter fileWriter = new PrintWriter(scriptFile);
|
||||
// Construct the Container-id
|
||||
ContainerId cId = createContainerId(0);
|
||||
if (Shell.WINDOWS) {
|
||||
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
|
||||
} else {
|
||||
fileWriter.write("\numask 0");
|
||||
fileWriter.write("\nexec sleep 100");
|
||||
}
|
||||
fileWriter.close();
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
URL resource_alpha =
|
||||
ConverterUtils.getYarnUrlFromPath(localFS
|
||||
.makeQualified(new Path(scriptFile.getAbsolutePath())));
|
||||
LocalResource rsrc_alpha =
|
||||
recordFactory.newRecordInstance(LocalResource.class);
|
||||
rsrc_alpha.setResource(resource_alpha);
|
||||
rsrc_alpha.setSize(-1);
|
||||
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||
rsrc_alpha.setType(LocalResourceType.FILE);
|
||||
rsrc_alpha.setTimestamp(scriptFile.lastModified());
|
||||
String destinationFile = "dest_file";
|
||||
Map<String, LocalResource> localResources =
|
||||
new HashMap<String, LocalResource>();
|
||||
localResources.put(destinationFile, rsrc_alpha);
|
||||
containerLaunchContext.setLocalResources(localResources);
|
||||
List<String> commands =
|
||||
Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
||||
containerLaunchContext.setCommands(commands);
|
||||
|
||||
StartContainerRequest scRequest =
|
||||
StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
|
||||
user, context.getContainerTokenSecretManager()));
|
||||
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
|
||||
list.add(scRequest);
|
||||
StartContainersRequest allRequests =
|
||||
StartContainersRequest.newInstance(list);
|
||||
containerManager.startContainers(allRequests);
|
||||
// Make sure the container reaches RUNNING state
|
||||
BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
|
||||
org.apache.hadoop.yarn.server.nodemanager.
|
||||
containermanager.container.ContainerState.RUNNING);
|
||||
// Construct container resource increase request,
|
||||
List<Token> increaseTokens = new ArrayList<Token>();
|
||||
// Add increase request. The increase request should fail
|
||||
// as the current resource does not fit in the target resource
|
||||
Token containerToken =
|
||||
createContainerToken(cId, DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(), user,
|
||||
Resource.newInstance(512, 1),
|
||||
context.getContainerTokenSecretManager(), null);
|
||||
increaseTokens.add(containerToken);
|
||||
IncreaseContainersResourceRequest increaseRequest =
|
||||
IncreaseContainersResourceRequest
|
||||
.newInstance(increaseTokens);
|
||||
IncreaseContainersResourceResponse increaseResponse =
|
||||
containerManager.increaseContainersResource(increaseRequest);
|
||||
// Check response
|
||||
Assert.assertEquals(
|
||||
0, increaseResponse.getSuccessfullyIncreasedContainers().size());
|
||||
Assert.assertEquals(1, increaseResponse.getFailedRequests().size());
|
||||
for (Map.Entry<ContainerId, SerializedException> entry : increaseResponse
|
||||
.getFailedRequests().entrySet()) {
|
||||
if (cId.equals(entry.getKey())) {
|
||||
Assert.assertNotNull("Failed message", entry.getValue().getMessage());
|
||||
Assert.assertTrue(entry.getValue().getMessage()
|
||||
.contains("The target resource "
|
||||
+ Resource.newInstance(512, 1).toString()
|
||||
+ " is smaller than the current resource "
|
||||
+ Resource.newInstance(1024, 1)));
|
||||
} else {
|
||||
throw new YarnException("Received failed request from wrong"
|
||||
+ " container: " + entry.getKey().toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||
NodeId nodeId, String user,
|
||||
NMContainerTokenSecretManager containerTokenSecretManager)
|
||||
|
@ -892,15 +1060,21 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|||
LogAggregationContext logAggregationContext)
|
||||
throws IOException {
|
||||
Resource r = BuilderUtils.newResource(1024, 1);
|
||||
return createContainerToken(cId, rmIdentifier, nodeId, user, r,
|
||||
containerTokenSecretManager, logAggregationContext);
|
||||
}
|
||||
|
||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||
NodeId nodeId, String user, Resource resource,
|
||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||
LogAggregationContext logAggregationContext)
|
||||
throws IOException {
|
||||
ContainerTokenIdentifier containerTokenIdentifier =
|
||||
new ContainerTokenIdentifier(cId, nodeId.toString(), user, r,
|
||||
new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
|
||||
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
|
||||
Priority.newInstance(0), 0, logAggregationContext, null);
|
||||
Token containerToken =
|
||||
BuilderUtils
|
||||
.newContainerToken(nodeId, containerTokenSecretManager
|
||||
.retrievePassword(containerTokenIdentifier),
|
||||
return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
|
||||
.retrievePassword(containerTokenIdentifier),
|
||||
containerTokenIdentifier);
|
||||
return containerToken;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue