YARN-5982. Simplify opportunistic container parameters and metrics. (Konstantinos Karanasos via asuresh)

This commit is contained in:
Arun Suresh 2016-12-09 16:41:25 -08:00
parent 55f5886ea2
commit b0aace21b1
9 changed files with 68 additions and 197 deletions

View File

@ -323,47 +323,6 @@ public class YarnConfiguration extends Configuration {
public static final boolean
OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT = false;
/** Minimum memory (in MB) used for allocating an opportunistic container. */
public static final String OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB =
YARN_PREFIX + "opportunistic-containers.min-memory-mb";
public static final int OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT = 512;
/** Minimum virtual CPU cores used for allocating an opportunistic container.
* */
public static final String OPPORTUNISTIC_CONTAINERS_MIN_VCORES =
YARN_PREFIX + "opportunistic-containers.min-vcores";
public static final int OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT = 1;
/** Maximum memory (in MB) used for allocating an opportunistic container. */
public static final String OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB =
YARN_PREFIX + "opportunistic-containers.max-memory-mb";
public static final int OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT = 2048;
/** Maximum virtual CPU cores used for allocating an opportunistic container.
* */
public static final String OPPORTUNISTIC_CONTAINERS_MAX_VCORES =
YARN_PREFIX + "opportunistic-containers.max-vcores";
public static final int OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT = 4;
/** Incremental memory (in MB) used for allocating an opportunistic container.
* */
public static final String OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB =
YARN_PREFIX + "opportunistic-containers.incr-memory-mb";
public static final int OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT =
512;
/** Incremental virtual CPU cores used for allocating an opportunistic
* container. */
public static final String OPPORTUNISTIC_CONTAINERS_INCR_VCORES =
YARN_PREFIX + "opportunistic-containers.incr-vcores";
public static final int OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT = 1;
/** Container token expiry for opportunistic containers. */
public static final String OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS =
YARN_PREFIX + "opportunistic-containers.container-token-expiry-ms";
public static final int OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT =
600000;
/** Number of nodes to be used by the Opportunistic Container allocator for
* dispatching containers during container allocation. */
public static final String OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED =

View File

@ -2764,63 +2764,6 @@
<value>false</value>
</property>
<property>
<description>
Minimum memory (in MB) used for allocating an opportunistic container.
</description>
<name>yarn.opportunistic-containers.min-memory-mb</name>
<value>512</value>
</property>
<property>
<description>
Minimum virtual CPU cores used for allocating an opportunistic container.
</description>
<name>yarn.opportunistic-containers.min-vcores</name>
<value>1</value>
</property>
<property>
<description>
Maximum memory (in MB) used for allocating an opportunistic container.
</description>
<name>yarn.opportunistic-containers.max-memory-mb</name>
<value>2048</value>
</property>
<property>
<description>
Maximum virtual CPU cores used for allocating an opportunistic container.
</description>
<name>yarn.opportunistic-containers.max-vcores</name>
<value>4</value>
</property>
<property>
<description>
Incremental memory (in MB) used for allocating an opportunistic container.
</description>
<name>yarn.opportunistic-containers.incr-memory-mb</name>
<value>512</value>
</property>
<property>
<description>
Incremental virtual CPU cores used for allocating an opportunistic
container.
</description>
<name>yarn.opportunistic-containers.incr-vcores</name>
<value>1</value>
</property>
<property>
<description>
Container token expiry for opportunistic containers.
</description>
<name>yarn.opportunistic-containers.container-token-expiry-ms</name>
<value>600000</value>
</property>
<property>
<description>
Number of nodes to be used by the Opportunistic Container Allocator for

View File

@ -170,11 +170,11 @@ public class ContainerScheduler extends AbstractService implements
this.opportunisticContainersStatus.setWaitQueueLength(
getNumQueuedContainers());
this.opportunisticContainersStatus.setOpportMemoryUsed(
metrics.getOpportMemoryUsed());
metrics.getAllocatedOpportunisticGB());
this.opportunisticContainersStatus.setOpportCoresUsed(
metrics.getOpportCoresUsed());
metrics.getAllocatedOpportunisticVCores());
this.opportunisticContainersStatus.setRunningOpportContainers(
metrics.getRunningOpportContainers());
metrics.getRunningOpportunisticContainers());
return this.opportunisticContainersStatus;
}
@ -196,7 +196,7 @@ public class ContainerScheduler extends AbstractService implements
this.utilizationTracker.subtractContainerResource(container);
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
this.metrics.opportunisticContainerCompleted(container);
this.metrics.completeOpportunisticContainer(container.getResource());
}
startPendingContainers();
}
@ -298,7 +298,7 @@ public class ContainerScheduler extends AbstractService implements
this.utilizationTracker.addContainerResources(container);
if (container.getContainerTokenIdentifier().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
this.metrics.opportunisticContainerStarted(container);
this.metrics.startOpportunisticContainer(container.getResource());
}
container.sendLaunchEvent();
}

View File

@ -29,8 +29,6 @@ import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.yarn.api.records.Resource;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
.Container;
@Metrics(about="Metrics for node manager", context="yarn")
public class NodeManagerMetrics {
@ -64,12 +62,12 @@ public class NodeManagerMetrics {
@Metric("Disk utilization % on good log dirs")
MutableGaugeInt goodLogDirsDiskUtilizationPerc;
@Metric("Memory used by Opportunistic Containers in MB")
MutableGaugeLong opportMemoryUsed;
@Metric("# of Virtual Cores used by opportunistic containers")
MutableGaugeInt opportCoresUsed;
@Metric("Current allocated memory by opportunistic containers in GB")
MutableGaugeLong allocatedOpportunisticGB;
@Metric("Current allocated Virtual Cores by opportunistic containers")
MutableGaugeInt allocatedOpportunisticVCores;
@Metric("# of running opportunistic containers")
MutableGaugeInt runningOpportContainers;
MutableGaugeInt runningOpportunisticContainers;
// CHECKSTYLE:ON:VisibilityModifier
@ -77,6 +75,7 @@ public class NodeManagerMetrics {
private long allocatedMB;
private long availableMB;
private long allocatedOpportunisticMB;
public NodeManagerMetrics(JvmMetrics jvmMetrics) {
this.jvmMetrics = jvmMetrics;
@ -141,30 +140,6 @@ public class NodeManagerMetrics {
containersReIniting.decr();
}
public long getOpportMemoryUsed() {
return opportMemoryUsed.value();
}
public int getOpportCoresUsed() {
return opportCoresUsed.value();
}
public int getRunningOpportContainers() {
return runningOpportContainers.value();
}
public void opportunisticContainerCompleted(Container container) {
opportMemoryUsed.decr(container.getResource().getMemorySize());
opportCoresUsed.decr(container.getResource().getVirtualCores());
runningOpportContainers.decr();
}
public void opportunisticContainerStarted(Container container) {
opportMemoryUsed.incr(container.getResource().getMemorySize());
opportCoresUsed.incr(container.getResource().getVirtualCores());
runningOpportContainers.incr();
}
public void allocateContainer(Resource res) {
allocatedContainers.incr();
allocatedMB = allocatedMB + res.getMemorySize();
@ -196,6 +171,22 @@ public class NodeManagerMetrics {
availableVCores.decr(deltaVCores);
}
public void startOpportunisticContainer(Resource res) {
runningOpportunisticContainers.incr();
allocatedOpportunisticMB = allocatedOpportunisticMB + res.getMemorySize();
allocatedOpportunisticGB
.set((int) Math.ceil(allocatedOpportunisticMB / 1024d));
allocatedOpportunisticVCores.incr(res.getVirtualCores());
}
public void completeOpportunisticContainer(Resource res) {
runningOpportunisticContainers.decr();
allocatedOpportunisticMB = allocatedOpportunisticMB - res.getMemorySize();
allocatedOpportunisticGB
.set((int) Math.ceil(allocatedOpportunisticMB / 1024d));
allocatedOpportunisticVCores.decr(res.getVirtualCores());
}
public void addResource(Resource res) {
availableMB = availableMB + res.getMemorySize();
availableGB.incr((int)Math.floor(availableMB/1024d));
@ -272,4 +263,16 @@ public class NodeManagerMetrics {
public int getContainersRolledbackOnFailure() {
return containersRolledBackOnFailure.value();
}
public long getAllocatedOpportunisticGB() {
return allocatedOpportunisticGB.value();
}
public int getAllocatedOpportunisticVCores() {
return allocatedOpportunisticVCores.value();
}
public int getRunningOpportunisticContainers() {
return runningOpportunisticContainers.value();
}
}

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -199,11 +198,12 @@ public class OpportunisticContainerAllocatorAMService
}
});
int tokenExpiryInterval = getConfig()
.getInt(YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
YarnConfiguration.
OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT);
opCtx.updateAllocationParams(createMinContainerResource(),
createMaxContainerResource(), createIncrContainerResource(),
.getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
opCtx.updateAllocationParams(
rmContext.getScheduler().getMinimumResourceCapability(),
rmContext.getScheduler().getMaximumResourceCapability(),
rmContext.getScheduler().getMinimumResourceCapability(),
tokenExpiryInterval);
appAttempt.setOpportunisticContainerContext(opCtx);
}
@ -273,14 +273,14 @@ public class OpportunisticContainerAllocatorAMService
RegisterDistributedSchedulingAMResponse dsResp = recordFactory
.newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
dsResp.setRegisterResponse(response);
dsResp.setMinContainerResource(createMinContainerResource());
dsResp.setMaxContainerResource(createMaxContainerResource());
dsResp.setIncrContainerResource(createIncrContainerResource());
dsResp.setMinContainerResource(
rmContext.getScheduler().getMinimumResourceCapability());
dsResp.setMaxContainerResource(
rmContext.getScheduler().getMaximumResourceCapability());
dsResp.setContainerTokenExpiryInterval(
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS,
YarnConfiguration.
OPPORTUNISTIC_CONTAINERS_TOKEN_EXPIRY_MS_DEFAULT));
YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS));
dsResp.setContainerIdStart(
this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
@ -384,18 +384,6 @@ public class OpportunisticContainerAllocatorAMService
return nodeMonitor.getThresholdCalculator();
}
private Resource createIncrContainerResource() {
return Resource.newInstance(
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB,
YarnConfiguration.
OPPORTUNISTIC_CONTAINERS_INCR_MEMORY_MB_DEFAULT),
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES,
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_INCR_VCORES_DEFAULT)
);
}
private synchronized List<RemoteNode> getLeastLoadedNodes() {
long currTime = System.currentTimeMillis();
if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
@ -425,30 +413,6 @@ public class OpportunisticContainerAllocatorAMService
: null;
}
private Resource createMaxContainerResource() {
return Resource.newInstance(
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB,
YarnConfiguration
.OPPORTUNISTIC_CONTAINERS_MAX_MEMORY_MB_DEFAULT),
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES,
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MAX_VCORES_DEFAULT)
);
}
private Resource createMinContainerResource() {
return Resource.newInstance(
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB,
YarnConfiguration.
OPPORTUNISTIC_CONTAINERS_MIN_MEMORY_MB_DEFAULT),
getConfig().getInt(
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES,
YarnConfiguration.OPPORTUNISTIC_CONTAINERS_MIN_VCORES_DEFAULT)
);
}
private static ApplicationAttemptId getAppAttemptId() throws YarnException {
AMRMTokenIdentifier amrmTokenIdentifier =
YarnServerSecurityUtils.authorizeRequest();

View File

@ -149,10 +149,10 @@ public abstract class SchedulerNode {
*/
public synchronized void allocateContainer(RMContainer rmContainer) {
Container container = rmContainer.getContainer();
if (rmContainer.getExecutionType() != ExecutionType.OPPORTUNISTIC) {
if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
deductUnallocatedResource(container.getResource());
++numContainers;
}
++numContainers;
launchedContainers.put(container.getId(), rmContainer);
@ -251,8 +251,8 @@ public abstract class SchedulerNode {
Container container) {
if (container.getExecutionType() == ExecutionType.GUARANTEED) {
addUnallocatedResource(container.getResource());
--numContainers;
}
--numContainers;
}
/**

View File

@ -49,6 +49,7 @@ class NodesPage extends RmView {
static class NodesBlock extends HtmlBlock {
final ResourceManager rm;
private static final long BYTES_IN_MB = 1024 * 1024;
private static final long BYTES_IN_GB = 1024 * 1024 * 1024;
private static boolean opportunisticContainersEnabled;
@Inject
@ -181,8 +182,9 @@ class NodesPage extends RmView {
nodeTableData
.append(String.valueOf(info.getNumRunningOpportContainers()))
.append("\",\"").append("<br title='")
.append(String.valueOf(info.getUsedMemoryOpport())).append("'>")
.append(StringUtils.byteDesc(info.getUsedMemoryOpport()))
.append(String.valueOf(info.getUsedMemoryOpportGB())).append("'>")
.append(StringUtils.byteDesc(
info.getUsedMemoryOpportGB() * BYTES_IN_GB))
.append("\",\"")
.append(String.valueOf(info.getUsedVirtualCoresOpport()))
.append("\",\"")

View File

@ -51,7 +51,7 @@ public class NodeInfo {
protected long usedVirtualCores;
protected long availableVirtualCores;
private int numRunningOpportContainers;
private long usedMemoryOpport; // Memory in bytes.
private long usedMemoryOpportGB;
private long usedVirtualCoresOpport;
private int numQueuedContainers;
protected ArrayList<String> nodeLabels = new ArrayList<String>();
@ -85,7 +85,7 @@ public class NodeInfo {
// Status of opportunistic containers.
this.numRunningOpportContainers = 0;
this.usedMemoryOpport = 0;
this.usedMemoryOpportGB = 0;
this.usedVirtualCoresOpport = 0;
this.numQueuedContainers = 0;
OpportunisticContainersStatus opportStatus =
@ -93,7 +93,7 @@ public class NodeInfo {
if (opportStatus != null) {
this.numRunningOpportContainers =
opportStatus.getRunningOpportContainers();
this.usedMemoryOpport = opportStatus.getOpportMemoryUsed();
this.usedMemoryOpportGB = opportStatus.getOpportMemoryUsed();
this.usedVirtualCoresOpport = opportStatus.getOpportCoresUsed();
this.numQueuedContainers = opportStatus.getQueuedOpportContainers();
}
@ -165,8 +165,8 @@ public class NodeInfo {
return numRunningOpportContainers;
}
public long getUsedMemoryOpport() {
return usedMemoryOpport;
public long getUsedMemoryOpportGB() {
return usedMemoryOpportGB;
}
public long getUsedVirtualCoresOpport() {

View File

@ -726,7 +726,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
"aggregatedContainersVirtualMemoryMB"),
WebServicesTestUtils.getXmlFloat(element, "containersCPUUsage"),
WebServicesTestUtils.getXmlInt(element, "numRunningOpportContainers"),
WebServicesTestUtils.getXmlLong(element, "usedMemoryOpport"),
WebServicesTestUtils.getXmlLong(element, "usedMemoryOpportGB"),
WebServicesTestUtils.getXmlInt(element, "usedVirtualCoresOpport"),
WebServicesTestUtils.getXmlInt(element, "numQueuedContainers"));
}
@ -753,7 +753,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
resourceInfo.getInt("aggregatedContainersVirtualMemoryMB"),
resourceInfo.getDouble("containersCPUUsage"),
nodeInfo.getInt("numRunningOpportContainers"),
nodeInfo.getLong("usedMemoryOpport"),
nodeInfo.getLong("usedMemoryOpportGB"),
nodeInfo.getInt("usedVirtualCoresOpport"),
nodeInfo.getInt("numQueuedContainers"));
}
@ -766,7 +766,7 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
int nodePhysicalMemoryMB, int nodeVirtualMemoryMB, double nodeCPUUsage,
int containersPhysicalMemoryMB, int containersVirtualMemoryMB,
double containersCPUUsage, int numRunningOpportContainers,
long usedMemoryOpport, int usedVirtualCoresOpport,
long usedMemoryOpportGB, int usedVirtualCoresOpport,
int numQueuedContainers)
throws JSONException, Exception {
@ -827,8 +827,8 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
numRunningOpportContainers,
opportunisticStatus.getRunningOpportContainers(),
numRunningOpportContainers);
assertEquals("usedMemoryOpport doesn't match: " + usedMemoryOpport,
opportunisticStatus.getOpportMemoryUsed(), usedMemoryOpport);
assertEquals("usedMemoryOpportGB doesn't match: " + usedMemoryOpportGB,
opportunisticStatus.getOpportMemoryUsed(), usedMemoryOpportGB);
assertEquals(
"usedVirtualCoresOpport doesn't match: " + usedVirtualCoresOpport,
opportunisticStatus.getOpportCoresUsed(), usedVirtualCoresOpport);