YARN-3867. ContainerImpl changes to support container resizing. Contributed by Meng Ding
(cherry picked from commit 5f5a968d65
)
This commit is contained in:
parent
afe4afd0c9
commit
fa0a554ae0
|
@ -154,6 +154,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-1645. ContainerManager implementation to support container resizing.
|
||||
(Meng Ding & Wangda Tan via jianhe)
|
||||
|
||||
YARN-3867. ContainerImpl changes to support container resizing. (Meng Ding
|
||||
via jianhe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
|
|
@ -200,13 +200,15 @@ public class BuilderUtils {
|
|||
}
|
||||
|
||||
public static ContainerStatus newContainerStatus(ContainerId containerId,
|
||||
ContainerState containerState, String diagnostics, int exitStatus) {
|
||||
ContainerState containerState, String diagnostics, int exitStatus,
|
||||
Resource capability) {
|
||||
ContainerStatus containerStatus = recordFactory
|
||||
.newRecordInstance(ContainerStatus.class);
|
||||
containerStatus.setState(containerState);
|
||||
containerStatus.setContainerId(containerId);
|
||||
containerStatus.setDiagnostics(diagnostics);
|
||||
containerStatus.setExitStatus(exitStatus);
|
||||
containerStatus.setCapability(capability);
|
||||
return containerStatus;
|
||||
}
|
||||
|
||||
|
|
|
@ -115,7 +115,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ChangeContainerResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
|
@ -130,6 +129,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ChangeMonitoringContainerResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
||||
|
@ -1078,7 +1078,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
this.readLock.lock();
|
||||
try {
|
||||
if (!serviceStopped) {
|
||||
dispatcher.getEventHandler().handle(new ChangeContainerResourceEvent(
|
||||
getContainersMonitor().handle(
|
||||
new ChangeMonitoringContainerResourceEvent(
|
||||
containerId, targetResource));
|
||||
} else {
|
||||
throw new YarnException(
|
||||
|
|
|
@ -37,6 +37,8 @@ public interface Container extends EventHandler<ContainerEvent> {
|
|||
|
||||
Resource getResource();
|
||||
|
||||
void setResource(Resource targetResource);
|
||||
|
||||
ContainerTokenIdentifier getContainerTokenIdentifier();
|
||||
|
||||
String getUser();
|
||||
|
|
|
@ -25,10 +25,6 @@ public enum ContainerEventType {
|
|||
KILL_CONTAINER,
|
||||
UPDATE_DIAGNOSTICS_MSG,
|
||||
CONTAINER_DONE,
|
||||
CHANGE_CONTAINER_RESOURCE,
|
||||
|
||||
// Producer: ContainerMonitor
|
||||
CONTAINER_RESOURCE_CHANGED,
|
||||
|
||||
// DownloadManager
|
||||
CONTAINER_INITED,
|
||||
|
|
|
@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
|
|||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
public class ContainerImpl implements Container {
|
||||
|
||||
|
@ -91,7 +92,7 @@ public class ContainerImpl implements Container {
|
|||
private final ContainerLaunchContext launchContext;
|
||||
private final ContainerTokenIdentifier containerTokenIdentifier;
|
||||
private final ContainerId containerId;
|
||||
private final Resource resource;
|
||||
private volatile Resource resource;
|
||||
private final String user;
|
||||
private int exitCode = ContainerExitStatus.INVALID;
|
||||
private final StringBuilder diagnostics;
|
||||
|
@ -424,7 +425,7 @@ public class ContainerImpl implements Container {
|
|||
this.readLock.lock();
|
||||
try {
|
||||
return BuilderUtils.newContainerStatus(this.containerId,
|
||||
getCurrentState(), diagnostics.toString(), exitCode);
|
||||
getCurrentState(), diagnostics.toString(), exitCode, getResource());
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
|
@ -451,7 +452,14 @@ public class ContainerImpl implements Container {
|
|||
|
||||
@Override
|
||||
public Resource getResource() {
|
||||
return this.resource;
|
||||
return Resources.clone(this.resource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResource(Resource targetResource) {
|
||||
Resource currentResource = getResource();
|
||||
this.resource = Resources.clone(targetResource);
|
||||
this.metrics.changeContainer(currentResource, targetResource);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -16,17 +16,18 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
||||
public class ChangeContainerResourceEvent extends ContainerEvent {
|
||||
public class ChangeMonitoringContainerResourceEvent extends ContainersMonitorEvent {
|
||||
private final Resource resource;
|
||||
|
||||
private Resource resource;
|
||||
|
||||
public ChangeContainerResourceEvent(ContainerId c, Resource resource) {
|
||||
super(c, ContainerEventType.CHANGE_CONTAINER_RESOURCE);
|
||||
public ChangeMonitoringContainerResourceEvent(ContainerId containerId,
|
||||
Resource resource) {
|
||||
super(containerId,
|
||||
ContainersMonitorEventType.CHANGE_MONITORING_CONTAINER_RESOURCE);
|
||||
this.resource = resource;
|
||||
}
|
||||
|
|
@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
|
|||
|
||||
public enum ContainersMonitorEventType {
|
||||
START_MONITORING_CONTAINER,
|
||||
STOP_MONITORING_CONTAINER
|
||||
STOP_MONITORING_CONTAINER,
|
||||
CHANGE_MONITORING_CONTAINER_RESOURCE
|
||||
}
|
||||
|
|
|
@ -133,6 +133,17 @@ public class NodeManagerMetrics {
|
|||
availableVCores.incr(res.getVirtualCores());
|
||||
}
|
||||
|
||||
public void changeContainer(Resource before, Resource now) {
|
||||
int deltaMB = now.getMemory() - before.getMemory();
|
||||
int deltaVCores = now.getVirtualCores() - before.getVirtualCores();
|
||||
allocatedMB = allocatedMB + deltaMB;
|
||||
allocatedGB.set((int)Math.ceil(allocatedMB/1024d));
|
||||
availableMB = availableMB - deltaMB;
|
||||
availableGB.set((int)Math.floor(availableMB/1024d));
|
||||
allocatedVCores.incr(deltaVCores);
|
||||
availableVCores.decr(deltaVCores);
|
||||
}
|
||||
|
||||
public void addResource(Resource res) {
|
||||
availableMB = availableMB + res.getMemory();
|
||||
availableGB.incr((int)Math.floor(availableMB/1024d));
|
||||
|
|
|
@ -1662,7 +1662,7 @@ public class TestNodeStatusUpdater {
|
|||
ContainerStatus containerStatus =
|
||||
BuilderUtils.newContainerStatus(contaierId, containerState,
|
||||
"test_containerStatus: id=" + id + ", containerState: "
|
||||
+ containerState, 0);
|
||||
+ containerState, 0, Resource.newInstance(1024, 1));
|
||||
return containerStatus;
|
||||
}
|
||||
|
||||
|
|
|
@ -38,7 +38,12 @@ public class TestNodeManagerMetrics {
|
|||
Resource resource = Records.newRecord(Resource.class);
|
||||
resource.setMemory(512); //512MiB
|
||||
resource.setVirtualCores(2);
|
||||
|
||||
Resource largerResource = Records.newRecord(Resource.class);
|
||||
largerResource.setMemory(1024);
|
||||
largerResource.setVirtualCores(2);
|
||||
Resource smallerResource = Records.newRecord(Resource.class);
|
||||
smallerResource.setMemory(256);
|
||||
smallerResource.setVirtualCores(1);
|
||||
|
||||
metrics.addResource(total);
|
||||
|
||||
|
@ -65,15 +70,20 @@ public class TestNodeManagerMetrics {
|
|||
metrics.initingContainer();
|
||||
metrics.runningContainer();
|
||||
|
||||
// Increase resource for a container
|
||||
metrics.changeContainer(resource, largerResource);
|
||||
// Decrease resource for a container
|
||||
metrics.changeContainer(resource, smallerResource);
|
||||
|
||||
Assert.assertTrue(!metrics.containerLaunchDuration.changed());
|
||||
metrics.addContainerLaunchDuration(1);
|
||||
Assert.assertTrue(metrics.containerLaunchDuration.changed());
|
||||
|
||||
// availableGB is expected to be floored,
|
||||
// while allocatedGB is expected to be ceiled.
|
||||
// allocatedGB: 3.5GB allocated memory is shown as 4GB
|
||||
// availableGB: 4.5GB available memory is shown as 4GB
|
||||
checkMetrics(10, 1, 1, 1, 1, 1, 4, 7, 4, 14, 2);
|
||||
// allocatedGB: 3.75GB allocated memory is shown as 4GB
|
||||
// availableGB: 4.25GB available memory is shown as 4GB
|
||||
checkMetrics(10, 1, 1, 1, 1, 1, 4, 7, 4, 13, 3);
|
||||
}
|
||||
|
||||
private void checkMetrics(int launched, int completed, int failed, int killed,
|
||||
|
|
|
@ -131,6 +131,10 @@ public class MockContainer implements Container {
|
|||
return this.containerTokenIdentifier.getResource();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResource(Resource targetResource) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerTokenIdentifier getContainerTokenIdentifier() {
|
||||
return this.containerTokenIdentifier;
|
||||
|
|
|
@ -143,7 +143,7 @@ public class MockNM {
|
|||
new HashMap<ApplicationId, List<ContainerStatus>>(1);
|
||||
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
|
||||
BuilderUtils.newContainerId(attemptId, containerId), containerState,
|
||||
"Success", 0);
|
||||
"Success", 0, BuilderUtils.newResource(memory, vCores));
|
||||
ArrayList<ContainerStatus> containerStatusList =
|
||||
new ArrayList<ContainerStatus>(1);
|
||||
containerStatusList.add(containerStatus);
|
||||
|
|
|
@ -193,7 +193,7 @@ public class NodeManager implements ContainerManagementProtocol {
|
|||
|
||||
ContainerStatus containerStatus =
|
||||
BuilderUtils.newContainerStatus(container.getId(),
|
||||
ContainerState.NEW, "", -1000);
|
||||
ContainerState.NEW, "", -1000, container.getResource());
|
||||
applicationContainers.add(container);
|
||||
containerStatusMap.put(container, containerStatus);
|
||||
Resources.subtractFrom(available, tokenId.getResource());
|
||||
|
|
|
@ -231,7 +231,8 @@ public class TestApplicationCleanup {
|
|||
ArrayList<ContainerStatus> containerStatusList =
|
||||
new ArrayList<ContainerStatus>();
|
||||
containerStatusList.add(BuilderUtils.newContainerStatus(conts.get(0)
|
||||
.getId(), ContainerState.RUNNING, "nothing", 0));
|
||||
.getId(), ContainerState.RUNNING, "nothing", 0,
|
||||
conts.get(0).getResource()));
|
||||
containerStatuses.put(app.getApplicationId(), containerStatusList);
|
||||
|
||||
NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
|
||||
|
@ -244,7 +245,8 @@ public class TestApplicationCleanup {
|
|||
containerStatuses.clear();
|
||||
containerStatusList.clear();
|
||||
containerStatusList.add(BuilderUtils.newContainerStatus(conts.get(0)
|
||||
.getId(), ContainerState.RUNNING, "nothing", 0));
|
||||
.getId(), ContainerState.RUNNING, "nothing", 0,
|
||||
conts.get(0).getResource()));
|
||||
containerStatuses.put(app.getApplicationId(), containerStatusList);
|
||||
|
||||
resp = nm1.nodeHeartbeat(containerStatuses, true);
|
||||
|
|
|
@ -956,7 +956,8 @@ public class TestRMAppAttemptTransitions {
|
|||
int exitCode = 123;
|
||||
ContainerStatus cs =
|
||||
BuilderUtils.newContainerStatus(amContainer.getId(),
|
||||
ContainerState.COMPLETE, containerDiagMsg, exitCode);
|
||||
ContainerState.COMPLETE, containerDiagMsg, exitCode,
|
||||
amContainer.getResource());
|
||||
NodeId anyNodeId = NodeId.newInstance("host", 1234);
|
||||
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
||||
applicationAttempt.getAppAttemptId(), cs, anyNodeId));
|
||||
|
@ -980,7 +981,8 @@ public class TestRMAppAttemptTransitions {
|
|||
String containerDiagMsg = "some error";
|
||||
int exitCode = 123;
|
||||
ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(),
|
||||
ContainerState.COMPLETE, containerDiagMsg, exitCode);
|
||||
ContainerState.COMPLETE, containerDiagMsg, exitCode,
|
||||
amContainer.getResource());
|
||||
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
|
||||
NodeId anyNodeId = NodeId.newInstance("host", 1234);
|
||||
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
||||
|
@ -992,7 +994,8 @@ public class TestRMAppAttemptTransitions {
|
|||
applicationAttempt.getAppAttemptState());
|
||||
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
||||
applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
|
||||
amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
|
||||
amContainer.getId(), ContainerState.COMPLETE, "", 0,
|
||||
amContainer.getResource()), anyNodeId));
|
||||
applicationAttempt.handle(new RMAppAttemptEvent(
|
||||
applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
|
||||
assertEquals(RMAppAttemptState.FINAL_SAVING,
|
||||
|
@ -1030,7 +1033,8 @@ public class TestRMAppAttemptTransitions {
|
|||
NodeId anyNodeId = NodeId.newInstance("host", 1234);
|
||||
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
||||
applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
|
||||
amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
|
||||
amContainer.getId(), ContainerState.COMPLETE, "", 0,
|
||||
amContainer.getResource()), anyNodeId));
|
||||
applicationAttempt.handle(new RMAppAttemptEvent(
|
||||
applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
|
||||
assertEquals(RMAppAttemptState.FINAL_SAVING,
|
||||
|
@ -1207,7 +1211,8 @@ public class TestRMAppAttemptTransitions {
|
|||
BuilderUtils.newContainerStatus(
|
||||
BuilderUtils.newContainerId(
|
||||
applicationAttempt.getAppAttemptId(), 42),
|
||||
ContainerState.COMPLETE, "", 0), anyNodeId));
|
||||
ContainerState.COMPLETE, "", 0,
|
||||
amContainer.getResource()), anyNodeId));
|
||||
testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl,
|
||||
diagnostics);
|
||||
}
|
||||
|
@ -1227,7 +1232,8 @@ public class TestRMAppAttemptTransitions {
|
|||
new RMAppAttemptContainerFinishedEvent(
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
BuilderUtils.newContainerStatus(amContainer.getId(),
|
||||
ContainerState.COMPLETE, "", 0), anyNodeId));
|
||||
ContainerState.COMPLETE, "", 0,
|
||||
amContainer.getResource()), anyNodeId));
|
||||
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
||||
diagnostics, 0, false);
|
||||
}
|
||||
|
@ -1256,7 +1262,8 @@ public class TestRMAppAttemptTransitions {
|
|||
NodeId anyNodeId = NodeId.newInstance("host", 1234);
|
||||
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
||||
applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
|
||||
amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
|
||||
amContainer.getId(), ContainerState.COMPLETE, "", 0,
|
||||
amContainer.getResource()), anyNodeId));
|
||||
assertEquals(RMAppAttemptState.FINAL_SAVING,
|
||||
applicationAttempt.getAppAttemptState());
|
||||
// send attempt_saved
|
||||
|
|
|
@ -870,7 +870,7 @@ public class TestCapacityScheduler {
|
|||
|
||||
// Check container can complete successfully in case of resource over-commitment.
|
||||
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
|
||||
c1.getId(), ContainerState.COMPLETE, "", 0);
|
||||
c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
|
||||
nm1.containerStatus(containerStatus);
|
||||
int waitCount = 0;
|
||||
while (attempt1.getJustFinishedContainers().size() < 1
|
||||
|
|
|
@ -746,7 +746,7 @@ public class TestFifoScheduler {
|
|||
Assert.assertEquals(GB, c1.getResource().getMemory());
|
||||
ContainerStatus containerStatus =
|
||||
BuilderUtils.newContainerStatus(c1.getId(), ContainerState.COMPLETE,
|
||||
"", 0);
|
||||
"", 0, c1.getResource());
|
||||
nm1.containerStatus(containerStatus);
|
||||
int waitCount = 0;
|
||||
while (attempt1.getJustFinishedContainers().size() < 1 && waitCount++ != 20) {
|
||||
|
@ -1141,7 +1141,7 @@ public class TestFifoScheduler {
|
|||
// over-commitment.
|
||||
ContainerStatus containerStatus =
|
||||
BuilderUtils.newContainerStatus(c1.getId(), ContainerState.COMPLETE,
|
||||
"", 0);
|
||||
"", 0, c1.getResource());
|
||||
nm1.containerStatus(containerStatus);
|
||||
int waitCount = 0;
|
||||
while (attempt1.getJustFinishedContainers().size() < 1 && waitCount++ != 20) {
|
||||
|
|
|
@ -171,7 +171,8 @@ public class TestAMRMTokens {
|
|||
ContainerStatus containerStatus =
|
||||
BuilderUtils.newContainerStatus(attempt.getMasterContainer().getId(),
|
||||
ContainerState.COMPLETE,
|
||||
"AM Container Finished", 0);
|
||||
"AM Container Finished", 0,
|
||||
attempt.getMasterContainer().getResource());
|
||||
rm.getRMContext()
|
||||
.getDispatcher()
|
||||
.getEventHandler()
|
||||
|
|
Loading…
Reference in New Issue