YARN-5176. More test cases for queuing of containers at the NM. (Konstantinos Karanasos via asuresh)
(cherry picked from commit 76f0800c21
)
This commit is contained in:
parent
10f0c0475e
commit
154c7c343b
|
@ -160,6 +160,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
||||||
containerTokenId.getExecutionType());
|
containerTokenId.getExecutionType());
|
||||||
|
|
||||||
if (foundInQueue) {
|
if (foundInQueue) {
|
||||||
|
LOG.info("Removing queued container with ID " + containerID);
|
||||||
this.context.getQueuingContext().getKilledQueuedContainers().put(
|
this.context.getQueuingContext().getKilledQueuedContainers().put(
|
||||||
containerTokenId,
|
containerTokenId,
|
||||||
"Queued container request removed by ApplicationMaster.");
|
"Queued container request removed by ApplicationMaster.");
|
||||||
|
@ -502,6 +503,16 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
||||||
return allocatedOpportunisticContainers.size();
|
return allocatedOpportunisticContainers.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public int getNumQueuedGuaranteedContainers() {
|
||||||
|
return queuedGuaranteedContainers.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public int getNumQueuedOpportunisticContainers() {
|
||||||
|
return queuedOpportunisticContainers.size();
|
||||||
|
}
|
||||||
|
|
||||||
class QueuingApplicationEventDispatcher implements
|
class QueuingApplicationEventDispatcher implements
|
||||||
EventHandler<ApplicationEvent> {
|
EventHandler<ApplicationEvent> {
|
||||||
private EventHandler<ApplicationEvent> applicationEventDispatcher;
|
private EventHandler<ApplicationEvent> applicationEventDispatcher;
|
||||||
|
|
|
@ -40,10 +40,17 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
@ -51,6 +58,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ContainerType;
|
||||||
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
|
@ -71,6 +79,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreServic
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
|
@ -354,4 +363,59 @@ public abstract class BaseContainerManagerTest {
|
||||||
Assert.assertEquals("ContainerState is not correct (timedout)",
|
Assert.assertEquals("ContainerState is not correct (timedout)",
|
||||||
finalState, currentState);
|
finalState, currentState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||||
|
NodeId nodeId, String user,
|
||||||
|
NMContainerTokenSecretManager containerTokenSecretManager)
|
||||||
|
throws IOException {
|
||||||
|
return createContainerToken(cId, rmIdentifier, nodeId, user,
|
||||||
|
containerTokenSecretManager, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||||
|
NodeId nodeId, String user,
|
||||||
|
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||||
|
LogAggregationContext logAggregationContext)
|
||||||
|
throws IOException {
|
||||||
|
Resource r = BuilderUtils.newResource(1024, 1);
|
||||||
|
return createContainerToken(cId, rmIdentifier, nodeId, user, r,
|
||||||
|
containerTokenSecretManager, logAggregationContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||||
|
NodeId nodeId, String user, Resource resource,
|
||||||
|
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||||
|
LogAggregationContext logAggregationContext)
|
||||||
|
throws IOException {
|
||||||
|
ContainerTokenIdentifier containerTokenIdentifier =
|
||||||
|
new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
|
||||||
|
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
|
||||||
|
Priority.newInstance(0), 0, logAggregationContext, null);
|
||||||
|
return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
|
||||||
|
.retrievePassword(containerTokenIdentifier),
|
||||||
|
containerTokenIdentifier);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
||||||
|
NodeId nodeId, String user, Resource resource,
|
||||||
|
NMContainerTokenSecretManager containerTokenSecretManager,
|
||||||
|
LogAggregationContext logAggregationContext, ExecutionType executionType)
|
||||||
|
throws IOException {
|
||||||
|
ContainerTokenIdentifier containerTokenIdentifier =
|
||||||
|
new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
|
||||||
|
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
|
||||||
|
Priority.newInstance(0), 0, logAggregationContext, null,
|
||||||
|
ContainerType.TASK, executionType);
|
||||||
|
return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
|
||||||
|
.retrievePassword(containerTokenIdentifier),
|
||||||
|
containerTokenIdentifier);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ContainerId createContainerId(int id) {
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
||||||
|
ApplicationAttemptId appAttemptId =
|
||||||
|
ApplicationAttemptId.newInstance(appId, 1);
|
||||||
|
ContainerId containerId = ContainerId.newContainerId(appAttemptId, id);
|
||||||
|
return containerId;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.timeout;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileReader;
|
import java.io.FileReader;
|
||||||
|
@ -38,10 +43,10 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.Service;
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||||
|
@ -58,13 +63,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.SerializedException;
|
import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||||
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
||||||
|
@ -75,7 +76,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.ContainerType;
|
|
||||||
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
|
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
||||||
|
@ -90,8 +90,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -99,11 +97,6 @@ import org.junit.Test;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.mockito.Mockito.never;
|
|
||||||
import static org.mockito.Mockito.timeout;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
|
|
||||||
public class TestContainerManager extends BaseContainerManagerTest {
|
public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
|
|
||||||
public TestContainerManager() throws UnsupportedFileSystemException {
|
public TestContainerManager() throws UnsupportedFileSystemException {
|
||||||
|
@ -120,14 +113,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
super.setup();
|
super.setup();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ContainerId createContainerId(int id) {
|
|
||||||
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
|
||||||
ApplicationAttemptId appAttemptId =
|
|
||||||
ApplicationAttemptId.newInstance(appId, 1);
|
|
||||||
ContainerId containerId = ContainerId.newContainerId(appAttemptId, id);
|
|
||||||
return containerId;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ContainerManagerImpl
|
protected ContainerManagerImpl
|
||||||
createContainerManager(DeletionService delSrvc) {
|
createContainerManager(DeletionService delSrvc) {
|
||||||
|
@ -1153,53 +1138,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
||||||
assertEquals(targetResource, containerStatus.getCapability());
|
assertEquals(targetResource, containerStatus.getCapability());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
|
||||||
NodeId nodeId, String user,
|
|
||||||
NMContainerTokenSecretManager containerTokenSecretManager)
|
|
||||||
throws IOException {
|
|
||||||
return createContainerToken(cId, rmIdentifier, nodeId, user,
|
|
||||||
containerTokenSecretManager, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
|
||||||
NodeId nodeId, String user,
|
|
||||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
|
||||||
LogAggregationContext logAggregationContext)
|
|
||||||
throws IOException {
|
|
||||||
Resource r = BuilderUtils.newResource(1024, 1);
|
|
||||||
return createContainerToken(cId, rmIdentifier, nodeId, user, r,
|
|
||||||
containerTokenSecretManager, logAggregationContext);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
|
||||||
NodeId nodeId, String user, Resource resource,
|
|
||||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
|
||||||
LogAggregationContext logAggregationContext)
|
|
||||||
throws IOException {
|
|
||||||
ContainerTokenIdentifier containerTokenIdentifier =
|
|
||||||
new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
|
|
||||||
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
|
|
||||||
Priority.newInstance(0), 0, logAggregationContext, null);
|
|
||||||
return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
|
|
||||||
.retrievePassword(containerTokenIdentifier),
|
|
||||||
containerTokenIdentifier);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
|
||||||
NodeId nodeId, String user, Resource resource,
|
|
||||||
NMContainerTokenSecretManager containerTokenSecretManager,
|
|
||||||
LogAggregationContext logAggregationContext, ExecutionType executionType)
|
|
||||||
throws IOException {
|
|
||||||
ContainerTokenIdentifier containerTokenIdentifier =
|
|
||||||
new ContainerTokenIdentifier(cId, nodeId.toString(), user, resource,
|
|
||||||
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
|
|
||||||
Priority.newInstance(0), 0, logAggregationContext, null,
|
|
||||||
ContainerType.TASK, executionType);
|
|
||||||
return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
|
|
||||||
.retrievePassword(containerTokenIdentifier),
|
|
||||||
containerTokenIdentifier);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOutputThreadDumpSignal() throws IOException,
|
public void testOutputThreadDumpSignal() throws IOException,
|
||||||
InterruptedException, YarnException {
|
InterruptedException, YarnException {
|
||||||
|
|
|
@ -18,6 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -32,42 +37,27 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
|
|
||||||
.ContainersMonitorImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorPlugin;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorPlugin;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorProcessTree;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorProcessTree;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
import java.io.File;
|
* Class for testing the {@link QueuingContainerManagerImpl}.
|
||||||
import java.io.IOException;
|
*/
|
||||||
import java.io.PrintWriter;
|
public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public class TestQueuingContainerManager extends TestContainerManager {
|
|
||||||
|
|
||||||
interface HasResources {
|
interface HasResources {
|
||||||
boolean decide(Context context, ContainerId cId);
|
boolean decide(Context context, ContainerId cId);
|
||||||
|
@ -119,15 +109,6 @@ public class TestQueuingContainerManager extends TestContainerManager {
|
||||||
return ugi;
|
return ugi;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void authorizeGetAndStopContainerRequest(
|
|
||||||
ContainerId containerId, Container container, boolean stopRequest,
|
|
||||||
NMTokenIdentifier identifier) throws YarnException {
|
|
||||||
if (container == null || container.getUser().equals("Fail")) {
|
|
||||||
throw new YarnException("Reject this container");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ContainersMonitor createContainersMonitor(
|
protected ContainersMonitor createContainersMonitor(
|
||||||
ContainerExecutor exec) {
|
ContainerExecutor exec) {
|
||||||
|
@ -148,7 +129,7 @@ public class TestQueuingContainerManager extends TestContainerManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getVCoresAllocatedForContainers() {
|
public long getVCoresAllocatedForContainers() {
|
||||||
return 2;
|
return 4;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -186,54 +167,17 @@ public class TestQueuingContainerManager extends TestContainerManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test to verify that an OPPORTUNISTIC container is killed when
|
* Starting one GUARANTEED and one OPPORTUNISTIC container.
|
||||||
* a GUARANTEED container arrives and all the Node Resources are used up
|
|
||||||
*
|
|
||||||
* For this specific test case, 4 containers are requested (last one being
|
|
||||||
* guaranteed). Assumptions :
|
|
||||||
* 1) The first OPPORTUNISTIC Container will start running
|
|
||||||
* 2) The second and third OPP containers will be queued
|
|
||||||
* 3) When the GUARANTEED container comes in, the running OPP container
|
|
||||||
* will be killed to make room
|
|
||||||
* 4) After the GUARANTEED container finishes, the remaining 2 OPP
|
|
||||||
* containers will be dequeued and run.
|
|
||||||
* 5) Only the first OPP container will be killed.
|
|
||||||
*
|
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testSimpleOpportunisticContainer() throws Exception {
|
public void testStartMultipleContainers() throws Exception {
|
||||||
shouldDeleteWait = true;
|
shouldDeleteWait = true;
|
||||||
containerManager.start();
|
containerManager.start();
|
||||||
|
|
||||||
// ////// Create the resources for the container
|
|
||||||
File dir = new File(tmpDir, "dir");
|
|
||||||
dir.mkdirs();
|
|
||||||
File file = new File(dir, "file");
|
|
||||||
PrintWriter fileWriter = new PrintWriter(file);
|
|
||||||
fileWriter.write("Hello World!");
|
|
||||||
fileWriter.close();
|
|
||||||
|
|
||||||
// ////// Construct the container-spec.
|
|
||||||
ContainerLaunchContext containerLaunchContext =
|
ContainerLaunchContext containerLaunchContext =
|
||||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
URL resource_alpha =
|
|
||||||
ConverterUtils.getYarnUrlFromPath(localFS
|
|
||||||
.makeQualified(new Path(file.getAbsolutePath())));
|
|
||||||
LocalResource rsrc_alpha =
|
|
||||||
recordFactory.newRecordInstance(LocalResource.class);
|
|
||||||
rsrc_alpha.setResource(resource_alpha);
|
|
||||||
rsrc_alpha.setSize(-1);
|
|
||||||
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
|
|
||||||
rsrc_alpha.setType(LocalResourceType.FILE);
|
|
||||||
rsrc_alpha.setTimestamp(file.lastModified());
|
|
||||||
String destinationFile = "dest_file";
|
|
||||||
Map<String, LocalResource> localResources =
|
|
||||||
new HashMap<String, LocalResource>();
|
|
||||||
localResources.put(destinationFile, rsrc_alpha);
|
|
||||||
containerLaunchContext.setLocalResources(localResources);
|
|
||||||
|
|
||||||
// Start 3 OPPORTUNISTIC containers and 1 GUARANTEED container
|
|
||||||
List<StartContainerRequest> list = new ArrayList<>();
|
List<StartContainerRequest> list = new ArrayList<>();
|
||||||
list.add(StartContainerRequest.newInstance(
|
list.add(StartContainerRequest.newInstance(
|
||||||
containerLaunchContext,
|
containerLaunchContext,
|
||||||
|
@ -241,6 +185,122 @@ public class TestQueuingContainerManager extends TestContainerManager {
|
||||||
context.getNodeId(),
|
context.getNodeId(),
|
||||||
user, BuilderUtils.newResource(1024, 1),
|
user, BuilderUtils.newResource(1024, 1),
|
||||||
context.getContainerTokenSecretManager(), null,
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.GUARANTEED)));
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(1024, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.OPPORTUNISTIC)));
|
||||||
|
|
||||||
|
StartContainersRequest allRequests =
|
||||||
|
StartContainersRequest.newInstance(list);
|
||||||
|
containerManager.startContainers(allRequests);
|
||||||
|
|
||||||
|
BaseContainerManagerTest.waitForContainerState(containerManager,
|
||||||
|
createContainerId(0),
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
|
||||||
|
BaseContainerManagerTest.waitForContainerState(containerManager,
|
||||||
|
createContainerId(1),
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
|
||||||
|
|
||||||
|
// Ensure all containers are running.
|
||||||
|
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
statList.add(createContainerId(i));
|
||||||
|
}
|
||||||
|
GetContainerStatusesRequest statRequest =
|
||||||
|
GetContainerStatusesRequest.newInstance(statList);
|
||||||
|
List<ContainerStatus> containerStatuses = containerManager
|
||||||
|
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||||
|
for (ContainerStatus status : containerStatuses) {
|
||||||
|
Assert.assertEquals(
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||||
|
status.getState());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Submit both a GUARANTEED and an OPPORTUNISTIC container, each of which
|
||||||
|
* requires more resources than available at the node, and make sure they
|
||||||
|
* are both queued.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testQueueMultipleContainers() throws Exception {
|
||||||
|
shouldDeleteWait = true;
|
||||||
|
containerManager.start();
|
||||||
|
|
||||||
|
ContainerLaunchContext containerLaunchContext =
|
||||||
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
|
||||||
|
List<StartContainerRequest> list = new ArrayList<>();
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(3072, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.GUARANTEED)));
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(3072, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.OPPORTUNISTIC)));
|
||||||
|
|
||||||
|
StartContainersRequest allRequests =
|
||||||
|
StartContainersRequest.newInstance(list);
|
||||||
|
containerManager.startContainers(allRequests);
|
||||||
|
|
||||||
|
Thread.sleep(5000);
|
||||||
|
|
||||||
|
// Ensure both containers are queued.
|
||||||
|
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
statList.add(createContainerId(i));
|
||||||
|
}
|
||||||
|
GetContainerStatusesRequest statRequest =
|
||||||
|
GetContainerStatusesRequest.newInstance(statList);
|
||||||
|
List<ContainerStatus> containerStatuses = containerManager
|
||||||
|
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||||
|
for (ContainerStatus status : containerStatuses) {
|
||||||
|
Assert.assertEquals(
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
|
||||||
|
status.getState());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure both containers are properly queued.
|
||||||
|
Assert.assertEquals(2, containerManager.getContext().getQueuingContext()
|
||||||
|
.getQueuedContainers().size());
|
||||||
|
Assert.assertEquals(1, ((QueuingContainerManagerImpl) containerManager)
|
||||||
|
.getNumQueuedGuaranteedContainers());
|
||||||
|
Assert.assertEquals(1, ((QueuingContainerManagerImpl) containerManager)
|
||||||
|
.getNumQueuedOpportunisticContainers());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Starts one OPPORTUNISTIC container that takes up the whole node's
|
||||||
|
* resources, and submit two more that will be queued.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testStartAndQueueMultipleContainers() throws Exception {
|
||||||
|
shouldDeleteWait = true;
|
||||||
|
containerManager.start();
|
||||||
|
|
||||||
|
ContainerLaunchContext containerLaunchContext =
|
||||||
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
|
||||||
|
List<StartContainerRequest> list = new ArrayList<>();
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(2048, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
ExecutionType.OPPORTUNISTIC)));
|
ExecutionType.OPPORTUNISTIC)));
|
||||||
list.add(StartContainerRequest.newInstance(
|
list.add(StartContainerRequest.newInstance(
|
||||||
containerLaunchContext,
|
containerLaunchContext,
|
||||||
|
@ -256,21 +316,185 @@ public class TestQueuingContainerManager extends TestContainerManager {
|
||||||
user, BuilderUtils.newResource(1024, 1),
|
user, BuilderUtils.newResource(1024, 1),
|
||||||
context.getContainerTokenSecretManager(), null,
|
context.getContainerTokenSecretManager(), null,
|
||||||
ExecutionType.OPPORTUNISTIC)));
|
ExecutionType.OPPORTUNISTIC)));
|
||||||
// GUARANTEED
|
|
||||||
|
StartContainersRequest allRequests =
|
||||||
|
StartContainersRequest.newInstance(list);
|
||||||
|
containerManager.startContainers(allRequests);
|
||||||
|
|
||||||
|
Thread.sleep(5000);
|
||||||
|
|
||||||
|
// Ensure first container is running and others are queued.
|
||||||
|
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
statList.add(createContainerId(i));
|
||||||
|
}
|
||||||
|
GetContainerStatusesRequest statRequest = GetContainerStatusesRequest
|
||||||
|
.newInstance(Arrays.asList(createContainerId(0)));
|
||||||
|
List<ContainerStatus> containerStatuses = containerManager
|
||||||
|
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||||
|
for (ContainerStatus status : containerStatuses) {
|
||||||
|
if (status.getContainerId().equals(createContainerId(0))) {
|
||||||
|
Assert.assertEquals(
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||||
|
status.getState());
|
||||||
|
} else {
|
||||||
|
Assert.assertEquals(
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
|
||||||
|
status.getState());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure two containers are properly queued.
|
||||||
|
Assert.assertEquals(2, containerManager.getContext().getQueuingContext()
|
||||||
|
.getQueuedContainers().size());
|
||||||
|
Assert.assertEquals(0, ((QueuingContainerManagerImpl) containerManager)
|
||||||
|
.getNumQueuedGuaranteedContainers());
|
||||||
|
Assert.assertEquals(2, ((QueuingContainerManagerImpl) containerManager)
|
||||||
|
.getNumQueuedOpportunisticContainers());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources
|
||||||
|
* requests by each container as such that only one can run in parallel.
|
||||||
|
* Thus, the OPPORTUNISTIC container that started running, will be
|
||||||
|
* killed for the GUARANTEED container to start.
|
||||||
|
* Once the GUARANTEED container finishes its execution, the remaining
|
||||||
|
* OPPORTUNISTIC container will be executed.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testKillOpportunisticForGuaranteedContainer() throws Exception {
|
||||||
|
shouldDeleteWait = true;
|
||||||
|
containerManager.start();
|
||||||
|
|
||||||
|
ContainerLaunchContext containerLaunchContext =
|
||||||
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
|
||||||
|
List<StartContainerRequest> list = new ArrayList<>();
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(2048, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.OPPORTUNISTIC)));
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(2048, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.OPPORTUNISTIC)));
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(2048, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.GUARANTEED)));
|
||||||
|
|
||||||
|
StartContainersRequest allRequests =
|
||||||
|
StartContainersRequest.newInstance(list);
|
||||||
|
containerManager.startContainers(allRequests);
|
||||||
|
|
||||||
|
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||||
|
createContainerId(0), ContainerState.DONE, 30);
|
||||||
|
Thread.sleep(5000);
|
||||||
|
|
||||||
|
// Get container statuses. Container 0 should be killed, container 1
|
||||||
|
// should be queued and container 2 should be running.
|
||||||
|
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
statList.add(createContainerId(i));
|
||||||
|
}
|
||||||
|
GetContainerStatusesRequest statRequest =
|
||||||
|
GetContainerStatusesRequest.newInstance(statList);
|
||||||
|
List<ContainerStatus> containerStatuses = containerManager
|
||||||
|
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||||
|
for (ContainerStatus status : containerStatuses) {
|
||||||
|
if (status.getContainerId().equals(createContainerId(0))) {
|
||||||
|
Assert.assertTrue(status.getDiagnostics()
|
||||||
|
.contains("Container killed by the ApplicationMaster"));
|
||||||
|
} else if (status.getContainerId().equals(createContainerId(1))) {
|
||||||
|
Assert.assertEquals(
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
|
||||||
|
status.getState());
|
||||||
|
} else if (status.getContainerId().equals(createContainerId(2))) {
|
||||||
|
Assert.assertEquals(
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||||
|
status.getState());
|
||||||
|
}
|
||||||
|
System.out.println("\nStatus : [" + status + "]\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the remaining OPPORTUNISTIC container starts its execution.
|
||||||
|
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||||
|
createContainerId(2), ContainerState.DONE, 30);
|
||||||
|
Thread.sleep(5000);
|
||||||
|
statRequest = GetContainerStatusesRequest.newInstance(Arrays.asList(
|
||||||
|
createContainerId(1)));
|
||||||
|
ContainerStatus contStatus1 = containerManager.getContainerStatuses(
|
||||||
|
statRequest).getContainerStatuses().get(0);
|
||||||
|
Assert.assertEquals(
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||||
|
contStatus1.getState());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Submit three OPPORTUNISTIC containers that can run concurrently, and one
|
||||||
|
* GUARANTEED that needs to kill two of the OPPORTUNISTIC for it to run.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testKillMultipleOpportunisticContainers() throws Exception {
|
||||||
|
shouldDeleteWait = true;
|
||||||
|
containerManager.start();
|
||||||
|
|
||||||
|
ContainerLaunchContext containerLaunchContext =
|
||||||
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
|
||||||
|
List<StartContainerRequest> list = new ArrayList<>();
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(512, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.OPPORTUNISTIC)));
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(512, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.OPPORTUNISTIC)));
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(512, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.OPPORTUNISTIC)));
|
||||||
list.add(StartContainerRequest.newInstance(
|
list.add(StartContainerRequest.newInstance(
|
||||||
containerLaunchContext,
|
containerLaunchContext,
|
||||||
createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
|
createContainerToken(createContainerId(3), DUMMY_RM_IDENTIFIER,
|
||||||
context.getNodeId(),
|
context.getNodeId(),
|
||||||
user, BuilderUtils.newResource(1024, 1),
|
user, BuilderUtils.newResource(1500, 1),
|
||||||
context.getContainerTokenSecretManager(), null,
|
context.getContainerTokenSecretManager(), null,
|
||||||
ExecutionType.GUARANTEED)));
|
ExecutionType.GUARANTEED)));
|
||||||
|
|
||||||
StartContainersRequest allRequests =
|
StartContainersRequest allRequests =
|
||||||
StartContainersRequest.newInstance(list);
|
StartContainersRequest.newInstance(list);
|
||||||
|
|
||||||
containerManager.startContainers(allRequests);
|
containerManager.startContainers(allRequests);
|
||||||
|
|
||||||
Thread.sleep(10000);
|
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||||
|
createContainerId(0), ContainerState.DONE, 30);
|
||||||
|
Thread.sleep(5000);
|
||||||
|
|
||||||
|
// Get container statuses. Container 0 should be killed, container 1
|
||||||
|
// should be queued and container 2 should be running.
|
||||||
|
int killedContainers = 0;
|
||||||
|
int runningContainers = 0;
|
||||||
List<ContainerId> statList = new ArrayList<ContainerId>();
|
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
statList.add(createContainerId(i));
|
statList.add(createContainerId(i));
|
||||||
|
@ -280,12 +504,18 @@ public class TestQueuingContainerManager extends TestContainerManager {
|
||||||
List<ContainerStatus> containerStatuses = containerManager
|
List<ContainerStatus> containerStatuses = containerManager
|
||||||
.getContainerStatuses(statRequest).getContainerStatuses();
|
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||||
for (ContainerStatus status : containerStatuses) {
|
for (ContainerStatus status : containerStatuses) {
|
||||||
// Ensure that the first opportunistic container is killed
|
if (status.getDiagnostics().contains(
|
||||||
if (status.getContainerId().equals(createContainerId(0))) {
|
"Container killed by the ApplicationMaster")) {
|
||||||
Assert.assertTrue(status.getDiagnostics()
|
killedContainers++;
|
||||||
.contains("Container killed by the ApplicationMaster"));
|
}
|
||||||
|
if (status.getState() ==
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) {
|
||||||
|
runningContainers++;
|
||||||
}
|
}
|
||||||
System.out.println("\nStatus : [" + status + "]\n");
|
System.out.println("\nStatus : [" + status + "]\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(2, killedContainers);
|
||||||
|
Assert.assertEquals(2, runningContainers);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue