YARN-9505. Add container allocation latency for Opportunistic Scheduler. Contributed by Abhishek Modi.

This commit is contained in:
Giovanni Matteo Fumarola 2019-05-17 12:03:21 -07:00
parent 3e5e5b028a
commit 12c81610e0
4 changed files with 169 additions and 128 deletions

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -86,6 +87,9 @@ public class OpportunisticSchedulerMetrics {
@Metric("Aggregate # of allocated off-switch opportunistic containers") @Metric("Aggregate # of allocated off-switch opportunistic containers")
MutableCounterLong aggregateOffSwitchOContainersAllocated; MutableCounterLong aggregateOffSwitchOContainersAllocated;
@Metric("Aggregate latency for opportunistic container allocation")
MutableQuantiles allocateLatencyOQuantiles;
@VisibleForTesting @VisibleForTesting
public int getAllocatedContainers() { public int getAllocatedContainers() {
return allocatedOContainers.value(); return allocatedOContainers.value();
@ -138,4 +142,8 @@ public class OpportunisticSchedulerMetrics {
public void incrOffSwitchOppContainers() { public void incrOffSwitchOppContainers() {
aggregateOffSwitchOContainersAllocated.incr(); aggregateOffSwitchOContainersAllocated.incr();
} }
public void addAllocateOLatencyEntry(long latency) {
allocateLatencyOQuantiles.add(latency);
}
} }

View File

@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -241,9 +242,15 @@ public class OpportunisticContainerAllocator {
private final Map<String, AtomicInteger> nodeLocations = new HashMap<>(); private final Map<String, AtomicInteger> nodeLocations = new HashMap<>();
private final Map<String, AtomicInteger> rackLocations = new HashMap<>(); private final Map<String, AtomicInteger> rackLocations = new HashMap<>();
private final ResourceRequest request; private final ResourceRequest request;
private final long timestamp;
EnrichedResourceRequest(ResourceRequest request) { EnrichedResourceRequest(ResourceRequest request) {
this.request = request; this.request = request;
timestamp = Time.monotonicNow();
}
long getTimestamp() {
return timestamp;
} }
ResourceRequest getRequest() { ResourceRequest getRequest() {

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.yarn.server.scheduler; package org.apache.hadoop.yarn.server.scheduler;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -191,7 +194,14 @@ public class OpportunisticContainerContext {
err.removeLocation(allocation.getResourceName()); err.removeLocation(allocation.getResourceName());
} }
} }
getOppSchedulerMetrics().addAllocateOLatencyEntry(
Time.monotonicNow() - err.getTimestamp());
} }
} }
} }
@VisibleForTesting
OpportunisticSchedulerMetrics getOppSchedulerMetrics() {
return OpportunisticSchedulerMetrics.getMetrics();
}
} }

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
@ -43,10 +44,18 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestOpportunisticContainerAllocator { public class TestOpportunisticContainerAllocator {
private static final Logger LOG = private static final Logger LOG =
@ -54,6 +63,11 @@ public class TestOpportunisticContainerAllocator {
private static final int GB = 1024; private static final int GB = 1024;
private OpportunisticContainerAllocator allocator = null; private OpportunisticContainerAllocator allocator = null;
private OpportunisticContainerContext oppCntxt = null; private OpportunisticContainerContext oppCntxt = null;
private static final Priority PRIORITY_NORMAL = Priority.newInstance(1);
private static final Resource CAPABILITY_1GB =
Resources.createResource(1 * GB);
private static final ExecutionTypeRequest OPPORTUNISTIC_REQ =
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
@Before @Before
public void setup() { public void setup() {
@ -97,10 +111,8 @@ public class TestOpportunisticContainerAllocator {
ResourceBlacklistRequest.newInstance( ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>()); new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs = List<ResourceRequest> reqs =
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), Arrays.asList(ResourceRequest.newInstance(PRIORITY_NORMAL,
"*", Resources.createResource(1 * GB), 1, true, null, "*", CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ));
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1); ApplicationId.newInstance(0L, 1), 1);
@ -120,10 +132,8 @@ public class TestOpportunisticContainerAllocator {
ResourceBlacklistRequest.newInstance( ResourceBlacklistRequest.newInstance(
Arrays.asList("h1", "h2"), new ArrayList<>()); Arrays.asList("h1", "h2"), new ArrayList<>());
List<ResourceRequest> reqs = List<ResourceRequest> reqs =
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), Arrays.asList(ResourceRequest.newInstance(PRIORITY_NORMAL,
"*", Resources.createResource(1 * GB), 1, true, null, "*", CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ));
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1); ApplicationId.newInstance(0L, 1), 1);
@ -147,21 +157,21 @@ public class TestOpportunisticContainerAllocator {
List<ResourceRequest> reqs = List<ResourceRequest> reqs =
Arrays.asList( Arrays.asList(
ResourceRequest.newBuilder().allocationRequestId(1) ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName(ResourceRequest.ANY) .resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2) ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName(ResourceRequest.ANY) .resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(3) ResourceRequest.newBuilder().allocationRequestId(3)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName(ResourceRequest.ANY) .resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build()); .executionType(ExecutionType.OPPORTUNISTIC).build());
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
@ -197,45 +207,45 @@ public class TestOpportunisticContainerAllocator {
List<ResourceRequest> reqs = List<ResourceRequest> reqs =
Arrays.asList( Arrays.asList(
ResourceRequest.newBuilder().allocationRequestId(1) ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName(ResourceRequest.ANY) .resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2) ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("/r1") .resourceName("/r1")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2) ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("h1") .resourceName("h1")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2) ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName(ResourceRequest.ANY) .resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(3) ResourceRequest.newBuilder().allocationRequestId(3)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("/r1") .resourceName("/r1")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(3) ResourceRequest.newBuilder().allocationRequestId(3)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("h1") .resourceName("h1")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(3) ResourceRequest.newBuilder().allocationRequestId(3)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName(ResourceRequest.ANY) .resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build()); .executionType(ExecutionType.OPPORTUNISTIC).build());
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
@ -272,23 +282,23 @@ public class TestOpportunisticContainerAllocator {
Arrays.asList( Arrays.asList(
ResourceRequest.newBuilder().allocationRequestId(2) ResourceRequest.newBuilder().allocationRequestId(2)
.numContainers(2) .numContainers(2)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("/r1") .resourceName("/r1")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2) ResourceRequest.newBuilder().allocationRequestId(2)
.numContainers(2) .numContainers(2)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("h1") .resourceName("h1")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2) ResourceRequest.newBuilder().allocationRequestId(2)
.numContainers(2) .numContainers(2)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName(ResourceRequest.ANY) .resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build()); .executionType(ExecutionType.OPPORTUNISTIC).build());
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
@ -323,18 +333,12 @@ public class TestOpportunisticContainerAllocator {
new ArrayList<>(), new ArrayList<>()); new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs = List<ResourceRequest> reqs =
Arrays.asList( Arrays.asList(
ResourceRequest.newInstance(Priority.newInstance(1), "*", ResourceRequest.newInstance(PRIORITY_NORMAL, "*",
Resources.createResource(1 * GB), 1, true, null, CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ),
ExecutionTypeRequest.newInstance( ResourceRequest.newInstance(PRIORITY_NORMAL, "h1",
ExecutionType.OPPORTUNISTIC, true)), CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ),
ResourceRequest.newInstance(Priority.newInstance(1), "h1", ResourceRequest.newInstance(PRIORITY_NORMAL, "/r1",
Resources.createResource(1 * GB), 1, true, null, CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ));
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)),
ResourceRequest.newInstance(Priority.newInstance(1), "/r1",
Resources.createResource(1 * GB), 1, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1); ApplicationId.newInstance(0L, 1), 1);
@ -367,39 +371,39 @@ public class TestOpportunisticContainerAllocator {
List<ResourceRequest> reqs = List<ResourceRequest> reqs =
Arrays.asList( Arrays.asList(
ResourceRequest.newBuilder().allocationRequestId(1) ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("/r1") .resourceName("/r1")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(1) ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("h1") .resourceName("h1")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(1) ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName(ResourceRequest.ANY) .resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2) ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("/r1") .resourceName("/r1")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2) ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("h1") .resourceName("h1")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(), .executionType(ExecutionType.OPPORTUNISTIC).build(),
ResourceRequest.newBuilder().allocationRequestId(2) ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName(ResourceRequest.ANY) .resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build()); .executionType(ExecutionType.OPPORTUNISTIC).build());
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
@ -437,18 +441,12 @@ public class TestOpportunisticContainerAllocator {
new ArrayList<>(), new ArrayList<>()); new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs = List<ResourceRequest> reqs =
Arrays.asList( Arrays.asList(
ResourceRequest.newInstance(Priority.newInstance(1), "*", ResourceRequest.newInstance(PRIORITY_NORMAL, "*",
Resources.createResource(1 * GB), 2, true, null, CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ),
ExecutionTypeRequest.newInstance( ResourceRequest.newInstance(PRIORITY_NORMAL, "h1",
ExecutionType.OPPORTUNISTIC, true)), CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ),
ResourceRequest.newInstance(Priority.newInstance(1), "h1", ResourceRequest.newInstance(PRIORITY_NORMAL, "/r1",
Resources.createResource(1 * GB), 2, true, null, CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ));
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)),
ResourceRequest.newInstance(Priority.newInstance(1), "/r1",
Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1); ApplicationId.newInstance(0L, 1), 1);
@ -484,18 +482,12 @@ public class TestOpportunisticContainerAllocator {
new ArrayList<>(), new ArrayList<>()); new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs = List<ResourceRequest> reqs =
Arrays.asList( Arrays.asList(
ResourceRequest.newInstance(Priority.newInstance(1), "*", ResourceRequest.newInstance(PRIORITY_NORMAL, "*",
Resources.createResource(1 * GB), 2, true, null, CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ),
ExecutionTypeRequest.newInstance( ResourceRequest.newInstance(PRIORITY_NORMAL, "h6",
ExecutionType.OPPORTUNISTIC, true)), CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ),
ResourceRequest.newInstance(Priority.newInstance(1), "h6", ResourceRequest.newInstance(PRIORITY_NORMAL, "/r3",
Resources.createResource(1 * GB), 2, true, null, CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ));
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)),
ResourceRequest.newInstance(Priority.newInstance(1), "/r3",
Resources.createResource(1 * GB), 2, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1); ApplicationId.newInstance(0L, 1), 1);
@ -524,18 +516,12 @@ public class TestOpportunisticContainerAllocator {
new ArrayList<>(), new ArrayList<>()); new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs = List<ResourceRequest> reqs =
Arrays.asList( Arrays.asList(
ResourceRequest.newInstance(Priority.newInstance(1), "*", ResourceRequest.newInstance(PRIORITY_NORMAL, "*",
Resources.createResource(1 * GB), 1000, true, null, CAPABILITY_1GB, 1000, true, null, OPPORTUNISTIC_REQ),
ExecutionTypeRequest.newInstance( ResourceRequest.newInstance(PRIORITY_NORMAL, "h1",
ExecutionType.OPPORTUNISTIC, true)), CAPABILITY_1GB, 1000, true, null, OPPORTUNISTIC_REQ),
ResourceRequest.newInstance(Priority.newInstance(1), "h1", ResourceRequest.newInstance(PRIORITY_NORMAL, "/r1",
Resources.createResource(1 * GB), 1000, true, null, CAPABILITY_1GB, 1000, true, null, OPPORTUNISTIC_REQ));
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)),
ResourceRequest.newInstance(Priority.newInstance(1), "/r1",
Resources.createResource(1 * GB), 1000, true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1); ApplicationId.newInstance(0L, 1), 1);
@ -567,21 +553,21 @@ public class TestOpportunisticContainerAllocator {
List<ResourceRequest> reqs = new ArrayList<>(); List<ResourceRequest> reqs = new ArrayList<>();
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("*") .resourceName("*")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build()); .executionType(ExecutionType.OPPORTUNISTIC).build());
reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("h1") .resourceName("h1")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build()); .executionType(ExecutionType.OPPORTUNISTIC).build());
reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
.priority(Priority.newInstance(1)) .priority(PRIORITY_NORMAL)
.resourceName("/r1") .resourceName("/r1")
.capability(Resources.createResource(1 * GB)) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build()); .executionType(ExecutionType.OPPORTUNISTIC).build());
} }
@ -613,10 +599,8 @@ public class TestOpportunisticContainerAllocator {
ResourceBlacklistRequest.newInstance( ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>()); new ArrayList<>(), new ArrayList<>());
List<ResourceRequest> reqs = List<ResourceRequest> reqs =
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), Arrays.asList(ResourceRequest.newInstance(PRIORITY_NORMAL,
"*", Resources.createResource(1 * GB), 1, true, "label", "*", CAPABILITY_1GB, 1, true, "label", OPPORTUNISTIC_REQ));
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1); ApplicationId.newInstance(0L, 1), 1);
@ -655,18 +639,13 @@ public class TestOpportunisticContainerAllocator {
ResourceBlacklistRequest.newInstance( ResourceBlacklistRequest.newInstance(
new ArrayList<>(), new ArrayList<>()); new ArrayList<>(), new ArrayList<>());
allocator.setMaxAllocationsPerAMHeartbeat(2); allocator.setMaxAllocationsPerAMHeartbeat(2);
final Priority priority = Priority.newInstance(1); List<ResourceRequest> reqs = Arrays.asList(
final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance( ResourceRequest.newInstance(PRIORITY_NORMAL, "*", CAPABILITY_1GB, 3,
ExecutionType.OPPORTUNISTIC, true); true, null, OPPORTUNISTIC_REQ),
final Resource resource = Resources.createResource(1 * GB); ResourceRequest.newInstance(PRIORITY_NORMAL, "h6", CAPABILITY_1GB, 3,
List<ResourceRequest> reqs = true, null, OPPORTUNISTIC_REQ),
Arrays.asList( ResourceRequest.newInstance(PRIORITY_NORMAL, "/r3", CAPABILITY_1GB, 3,
ResourceRequest.newInstance(priority, "*", true, null, OPPORTUNISTIC_REQ));
resource, 3, true, null, oppRequest),
ResourceRequest.newInstance(priority, "h6",
resource, 3, true, null, oppRequest),
ResourceRequest.newInstance(priority, "/r3",
resource, 3, true, null, oppRequest));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1); ApplicationId.newInstance(0L, 1), 1);
@ -708,15 +687,14 @@ public class TestOpportunisticContainerAllocator {
allocator.setMaxAllocationsPerAMHeartbeat(2); allocator.setMaxAllocationsPerAMHeartbeat(2);
final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance( final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true); ExecutionType.OPPORTUNISTIC, true);
final Resource resource = Resources.createResource(1 * GB);
List<ResourceRequest> reqs = List<ResourceRequest> reqs =
Arrays.asList( Arrays.asList(
ResourceRequest.newInstance(Priority.newInstance(1), "*", ResourceRequest.newInstance(Priority.newInstance(1), "*",
resource, 1, true, null, oppRequest), CAPABILITY_1GB, 1, true, null, OPPORTUNISTIC_REQ),
ResourceRequest.newInstance(Priority.newInstance(2), "h6", ResourceRequest.newInstance(Priority.newInstance(2), "h6",
resource, 2, true, null, oppRequest), CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ),
ResourceRequest.newInstance(Priority.newInstance(3), "/r3", ResourceRequest.newInstance(Priority.newInstance(3), "/r3",
resource, 2, true, null, oppRequest)); CAPABILITY_1GB, 2, true, null, OPPORTUNISTIC_REQ));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1); ApplicationId.newInstance(0L, 1), 1);
@ -761,14 +739,12 @@ public class TestOpportunisticContainerAllocator {
new ArrayList<>(), new ArrayList<>()); new ArrayList<>(), new ArrayList<>());
allocator.setMaxAllocationsPerAMHeartbeat(-1); allocator.setMaxAllocationsPerAMHeartbeat(-1);
Priority priority = Priority.newInstance(1);
Resource capability = Resources.createResource(1 * GB);
List<ResourceRequest> reqs = new ArrayList<>(); List<ResourceRequest> reqs = new ArrayList<>();
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
.priority(priority) .priority(PRIORITY_NORMAL)
.resourceName("h1") .resourceName("h1")
.capability(capability) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build()); .executionType(ExecutionType.OPPORTUNISTIC).build());
} }
@ -802,14 +778,12 @@ public class TestOpportunisticContainerAllocator {
new ArrayList<>(), new ArrayList<>()); new ArrayList<>(), new ArrayList<>());
allocator.setMaxAllocationsPerAMHeartbeat(100); allocator.setMaxAllocationsPerAMHeartbeat(100);
Priority priority = Priority.newInstance(1);
Resource capability = Resources.createResource(1 * GB);
List<ResourceRequest> reqs = new ArrayList<>(); List<ResourceRequest> reqs = new ArrayList<>();
for (int i = 0; i < 20; i++) { for (int i = 0; i < 20; i++) {
reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
.priority(priority) .priority(PRIORITY_NORMAL)
.resourceName("h1") .resourceName("h1")
.capability(capability) .capability(CAPABILITY_1GB)
.relaxLocality(true) .relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build()); .executionType(ExecutionType.OPPORTUNISTIC).build());
} }
@ -829,4 +803,46 @@ public class TestOpportunisticContainerAllocator {
// all containers should be allocated in single heartbeat. // all containers should be allocated in single heartbeat.
Assert.assertEquals(20, containers.size()); Assert.assertEquals(20, containers.size());
} }
/**
* Test opportunistic container allocation latency metrics.
* @throws Exception
*/
@Test
public void testAllocationLatencyMetrics() throws Exception {
oppCntxt = spy(oppCntxt);
OpportunisticSchedulerMetrics metrics =
mock(OpportunisticSchedulerMetrics.class);
when(oppCntxt.getOppSchedulerMetrics()).thenReturn(metrics);
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(
Collections.emptyList(), Collections.emptyList());
List<ResourceRequest> reqs = Arrays.asList(
ResourceRequest.newInstance(PRIORITY_NORMAL, "*", CAPABILITY_1GB, 2,
true, null, OPPORTUNISTIC_REQ),
ResourceRequest.newInstance(PRIORITY_NORMAL, "h6", CAPABILITY_1GB, 2,
true, null, OPPORTUNISTIC_REQ),
ResourceRequest.newInstance(PRIORITY_NORMAL, "/r3", CAPABILITY_1GB, 2,
true, null, OPPORTUNISTIC_REQ));
ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0L, 1), 1);
oppCntxt.updateNodeList(
Arrays.asList(
RemoteNode.newInstance(
NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
RemoteNode.newInstance(
NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
RemoteNode.newInstance(
NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
LOG.info("Containers: {}", containers);
Assert.assertEquals(2, containers.size());
// for each allocated container, latency should be added.
verify(metrics, times(2)).addAllocateOLatencyEntry(anyLong());
}
} }