YARN-9100. Add tests for GpuResourceAllocator and do minor code cleanup. Contributed by Peter Bacsko
This commit is contained in:
parent
2a05e0ff3b
commit
2216ec54e5
|
@ -19,6 +19,8 @@
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu;
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -38,34 +40,44 @@ import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
|
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocate GPU resources according to requirements
|
* Allocate GPU resources according to requirements.
|
||||||
*/
|
*/
|
||||||
public class GpuResourceAllocator {
|
public class GpuResourceAllocator {
|
||||||
final static Logger LOG = LoggerFactory.
|
final static Logger LOG = LoggerFactory.
|
||||||
getLogger(GpuResourceAllocator.class);
|
getLogger(GpuResourceAllocator.class);
|
||||||
|
|
||||||
private static final int WAIT_MS_PER_LOOP = 1000;
|
private static final int WAIT_MS_PER_LOOP = 1000;
|
||||||
|
|
||||||
private Set<GpuDevice> allowedGpuDevices = new TreeSet<>();
|
private Set<GpuDevice> allowedGpuDevices = new TreeSet<>();
|
||||||
private Map<GpuDevice, ContainerId> usedDevices = new TreeMap<>();
|
private Map<GpuDevice, ContainerId> usedDevices = new TreeMap<>();
|
||||||
private Context nmContext;
|
private Context nmContext;
|
||||||
|
private final int waitPeriodForResource;
|
||||||
|
|
||||||
public GpuResourceAllocator(Context ctx) {
|
public GpuResourceAllocator(Context ctx) {
|
||||||
this.nmContext = ctx;
|
this.nmContext = ctx;
|
||||||
|
// Wait for a maximum of 120 seconds if no available GPU are there which
|
||||||
|
// are yet to be released.
|
||||||
|
this.waitPeriodForResource = 120 * WAIT_MS_PER_LOOP;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
GpuResourceAllocator(Context ctx, int waitPeriodForResource) {
|
||||||
|
this.nmContext = ctx;
|
||||||
|
this.waitPeriodForResource = waitPeriodForResource;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Contains allowed and denied devices
|
* Contains allowed and denied devices.
|
||||||
* Denied devices will be useful for cgroups devices module to do blacklisting
|
* Denied devices will be useful for cgroups devices module to do blacklisting
|
||||||
*/
|
*/
|
||||||
static class GpuAllocation {
|
static class GpuAllocation {
|
||||||
|
@ -91,20 +103,13 @@ public class GpuResourceAllocator {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add GPU to allowed list
|
* Add GPU to the allowed list of GPUs.
|
||||||
* @param gpuDevice gpu device
|
* @param gpuDevice gpu device
|
||||||
*/
|
*/
|
||||||
public synchronized void addGpu(GpuDevice gpuDevice) {
|
public synchronized void addGpu(GpuDevice gpuDevice) {
|
||||||
allowedGpuDevices.add(gpuDevice);
|
allowedGpuDevices.add(gpuDevice);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getResourceHandlerExceptionMessage(int numRequestedGpuDevices,
|
|
||||||
ContainerId containerId) {
|
|
||||||
return "Failed to find enough GPUs, requestor=" + containerId
|
|
||||||
+ ", #RequestedGPUs=" + numRequestedGpuDevices + ", #availableGpus="
|
|
||||||
+ getAvailableGpus();
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public synchronized int getAvailableGpus() {
|
public synchronized int getAvailableGpus() {
|
||||||
return allowedGpuDevices.size() - usedDevices.size();
|
return allowedGpuDevices.size() - usedDevices.size();
|
||||||
|
@ -113,10 +118,10 @@ public class GpuResourceAllocator {
|
||||||
public synchronized void recoverAssignedGpus(ContainerId containerId)
|
public synchronized void recoverAssignedGpus(ContainerId containerId)
|
||||||
throws ResourceHandlerException {
|
throws ResourceHandlerException {
|
||||||
Container c = nmContext.getContainers().get(containerId);
|
Container c = nmContext.getContainers().get(containerId);
|
||||||
if (null == c) {
|
if (c == null) {
|
||||||
throw new ResourceHandlerException(
|
throw new ResourceHandlerException(
|
||||||
"This shouldn't happen, cannot find container with id="
|
"Cannot find container with id=" + containerId +
|
||||||
+ containerId);
|
", this should not occur under normal circumstances!");
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Starting recovery of GpuDevice for {}.", containerId);
|
LOG.info("Starting recovery of GpuDevice for {}.", containerId);
|
||||||
|
@ -125,7 +130,8 @@ public class GpuResourceAllocator {
|
||||||
if (!(gpuDeviceSerializable instanceof GpuDevice)) {
|
if (!(gpuDeviceSerializable instanceof GpuDevice)) {
|
||||||
throw new ResourceHandlerException(
|
throw new ResourceHandlerException(
|
||||||
"Trying to recover device id, however it"
|
"Trying to recover device id, however it"
|
||||||
+ " is not GpuDevice, this shouldn't happen");
|
+ " is not an instance of " + GpuDevice.class.getName()
|
||||||
|
+ ", this should not occur under normal circumstances!");
|
||||||
}
|
}
|
||||||
|
|
||||||
GpuDevice gpuDevice = (GpuDevice) gpuDeviceSerializable;
|
GpuDevice gpuDevice = (GpuDevice) gpuDeviceSerializable;
|
||||||
|
@ -134,8 +140,8 @@ public class GpuResourceAllocator {
|
||||||
if (!allowedGpuDevices.contains(gpuDevice)) {
|
if (!allowedGpuDevices.contains(gpuDevice)) {
|
||||||
throw new ResourceHandlerException(
|
throw new ResourceHandlerException(
|
||||||
"Try to recover device = " + gpuDevice
|
"Try to recover device = " + gpuDevice
|
||||||
+ " however it is not in allowed device list:" + StringUtils
|
+ " however it is not in the allowed device list:" +
|
||||||
.join(",", allowedGpuDevices));
|
StringUtils.join(",", allowedGpuDevices));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure it is not occupied by anybody else
|
// Make sure it is not occupied by anybody else
|
||||||
|
@ -168,7 +174,7 @@ public class GpuResourceAllocator {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assign GPU to requestor
|
* Assign GPU to the specified container.
|
||||||
* @param container container to allocate
|
* @param container container to allocate
|
||||||
* @return allocation results.
|
* @return allocation results.
|
||||||
* @throws ResourceHandlerException When failed to assign GPUs.
|
* @throws ResourceHandlerException When failed to assign GPUs.
|
||||||
|
@ -177,12 +183,11 @@ public class GpuResourceAllocator {
|
||||||
throws ResourceHandlerException {
|
throws ResourceHandlerException {
|
||||||
GpuAllocation allocation = internalAssignGpus(container);
|
GpuAllocation allocation = internalAssignGpus(container);
|
||||||
|
|
||||||
// Wait for a maximum of 120 seconds if no available GPU are there which
|
// Wait for a maximum of waitPeriodForResource milliseconds if no
|
||||||
// are yet to be released.
|
// available GPU are there which are yet to be released.
|
||||||
final int timeoutMsecs = 120 * WAIT_MS_PER_LOOP;
|
|
||||||
int timeWaiting = 0;
|
int timeWaiting = 0;
|
||||||
while (allocation == null) {
|
while (allocation == null) {
|
||||||
if (timeWaiting >= timeoutMsecs) {
|
if (timeWaiting >= waitPeriodForResource) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -196,6 +201,8 @@ public class GpuResourceAllocator {
|
||||||
allocation = internalAssignGpus(container);
|
allocation = internalAssignGpus(container);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
// On any interrupt, break the loop and continue execution.
|
// On any interrupt, break the loop and continue execution.
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
LOG.warn("Interrupted while waiting for available GPU");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -215,8 +222,15 @@ public class GpuResourceAllocator {
|
||||||
Resource requestedResource = container.getResource();
|
Resource requestedResource = container.getResource();
|
||||||
ContainerId containerId = container.getContainerId();
|
ContainerId containerId = container.getContainerId();
|
||||||
int numRequestedGpuDevices = getRequestedGpus(requestedResource);
|
int numRequestedGpuDevices = getRequestedGpus(requestedResource);
|
||||||
// Assign Gpus to container if requested some.
|
|
||||||
|
// Assign GPUs to container if requested some.
|
||||||
if (numRequestedGpuDevices > 0) {
|
if (numRequestedGpuDevices > 0) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(String.format("Trying to assign %d GPUs to container: %s" +
|
||||||
|
", #AvailableGPUs=%d, #ReleasingGPUs=%d",
|
||||||
|
numRequestedGpuDevices, containerId,
|
||||||
|
getAvailableGpus(), getReleasingGpus()));
|
||||||
|
}
|
||||||
if (numRequestedGpuDevices > getAvailableGpus()) {
|
if (numRequestedGpuDevices > getAvailableGpus()) {
|
||||||
// If there are some devices which are getting released, wait for few
|
// If there are some devices which are getting released, wait for few
|
||||||
// seconds to get it.
|
// seconds to get it.
|
||||||
|
@ -227,8 +241,9 @@ public class GpuResourceAllocator {
|
||||||
|
|
||||||
if (numRequestedGpuDevices > getAvailableGpus()) {
|
if (numRequestedGpuDevices > getAvailableGpus()) {
|
||||||
throw new ResourceHandlerException(
|
throw new ResourceHandlerException(
|
||||||
getResourceHandlerExceptionMessage(numRequestedGpuDevices,
|
"Failed to find enough GPUs, requestor=" + containerId +
|
||||||
containerId));
|
", #RequestedGPUs=" + numRequestedGpuDevices +
|
||||||
|
", #AvailableGPUs=" + getAvailableGpus());
|
||||||
}
|
}
|
||||||
|
|
||||||
Set<GpuDevice> assignedGpus = new TreeSet<>();
|
Set<GpuDevice> assignedGpus = new TreeSet<>();
|
||||||
|
@ -250,7 +265,7 @@ public class GpuResourceAllocator {
|
||||||
nmContext.getNMStateStore().storeAssignedResources(container, GPU_URI,
|
nmContext.getNMStateStore().storeAssignedResources(container, GPU_URI,
|
||||||
new ArrayList<>(assignedGpus));
|
new ArrayList<>(assignedGpus));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
cleanupAssignGpus(containerId);
|
unassignGpus(containerId);
|
||||||
throw new ResourceHandlerException(e);
|
throw new ResourceHandlerException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -276,35 +291,34 @@ public class GpuResourceAllocator {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Clean up all Gpus assigned to containerId
|
* Clean up all GPUs assigned to containerId.
|
||||||
* @param containerId containerId
|
* @param containerId containerId
|
||||||
*/
|
*/
|
||||||
public synchronized void cleanupAssignGpus(ContainerId containerId) {
|
public synchronized void unassignGpus(ContainerId containerId) {
|
||||||
Iterator<Map.Entry<GpuDevice, ContainerId>> iter =
|
if (LOG.isDebugEnabled()) {
|
||||||
usedDevices.entrySet().iterator();
|
LOG.debug("Trying to unassign GPU device from container " + containerId);
|
||||||
while (iter.hasNext()) {
|
|
||||||
if (iter.next().getValue().equals(containerId)) {
|
|
||||||
iter.remove();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
usedDevices.entrySet().removeIf(entry ->
|
||||||
|
entry.getValue().equals(containerId));
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public synchronized Map<GpuDevice, ContainerId> getDeviceAllocationMappingCopy() {
|
public synchronized Map<GpuDevice, ContainerId> getDeviceAllocationMapping() {
|
||||||
return new HashMap<>(usedDevices);
|
return ImmutableMap.copyOf(usedDevices);
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized List<GpuDevice> getAllowedGpusCopy() {
|
public synchronized List<GpuDevice> getAllowedGpus() {
|
||||||
return new ArrayList<>(allowedGpuDevices);
|
return ImmutableList.copyOf(allowedGpuDevices);
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized List<AssignedGpuDevice> getAssignedGpusCopy() {
|
public synchronized List<AssignedGpuDevice> getAssignedGpus() {
|
||||||
List<AssignedGpuDevice> assigns = new ArrayList<>();
|
return usedDevices.entrySet().stream()
|
||||||
for (Map.Entry<GpuDevice, ContainerId> entry : usedDevices.entrySet()) {
|
.map(e -> {
|
||||||
assigns.add(new AssignedGpuDevice(entry.getKey().getIndex(),
|
final GpuDevice gpu = e.getKey();
|
||||||
entry.getKey().getMinorNumber(), entry.getValue()));
|
ContainerId containerId = e.getValue();
|
||||||
}
|
return new AssignedGpuDevice(gpu.getIndex(), gpu.getMinorNumber(),
|
||||||
return assigns;
|
containerId);
|
||||||
|
}).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -177,7 +177,7 @@ public class GpuResourceHandlerImpl implements ResourceHandler {
|
||||||
@Override
|
@Override
|
||||||
public synchronized List<PrivilegedOperation> postComplete(
|
public synchronized List<PrivilegedOperation> postComplete(
|
||||||
ContainerId containerId) throws ResourceHandlerException {
|
ContainerId containerId) throws ResourceHandlerException {
|
||||||
gpuAllocator.cleanupAssignGpus(containerId);
|
gpuAllocator.unassignGpus(containerId);
|
||||||
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
|
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.DEVICES,
|
||||||
containerId.toString());
|
containerId.toString());
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -97,9 +97,9 @@ public class GpuResourcePlugin implements ResourcePlugin {
|
||||||
|
|
||||||
GpuResourceAllocator gpuResourceAllocator =
|
GpuResourceAllocator gpuResourceAllocator =
|
||||||
gpuResourceHandler.getGpuAllocator();
|
gpuResourceHandler.getGpuAllocator();
|
||||||
List<GpuDevice> totalGpus = gpuResourceAllocator.getAllowedGpusCopy();
|
List<GpuDevice> totalGpus = gpuResourceAllocator.getAllowedGpus();
|
||||||
List<AssignedGpuDevice> assignedGpuDevices =
|
List<AssignedGpuDevice> assignedGpuDevices =
|
||||||
gpuResourceAllocator.getAssignedGpusCopy();
|
gpuResourceAllocator.getAssignedGpus();
|
||||||
|
|
||||||
return new NMGpuResourceInfo(gpuDeviceInformation, totalGpus,
|
return new NMGpuResourceInfo(gpuDeviceInformation, totalGpus,
|
||||||
assignedGpuDevices);
|
assignedGpuDevices);
|
||||||
|
|
|
@ -0,0 +1,442 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
|
||||||
|
import static org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider.initResourceTypes;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyList;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyString;
|
||||||
|
import static org.mockito.ArgumentMatchers.argThat;
|
||||||
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.gpu.GpuResourceAllocator.GpuAllocation;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.mockito.ArgumentMatcher;
|
||||||
|
import org.mockito.Captor;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.MockitoAnnotations;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for GpuResourceAllocator.
|
||||||
|
*/
|
||||||
|
public class TestGpuResourceAllocator {
|
||||||
|
private static final int WAIT_PERIOD_FOR_RESOURCE = 100;
|
||||||
|
|
||||||
|
private static class ContainerMatcher implements ArgumentMatcher<Container> {
|
||||||
|
|
||||||
|
private Container container;
|
||||||
|
|
||||||
|
ContainerMatcher(Container container) {
|
||||||
|
this.container = container;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean matches(Container other) {
|
||||||
|
long expectedId = container.getContainerId().getContainerId();
|
||||||
|
long otherId = other.getContainerId().getContainerId();
|
||||||
|
return expectedId == otherId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Captor
|
||||||
|
private ArgumentCaptor<List<Serializable>> gpuCaptor;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private NMContext nmContext;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private NMStateStoreService nmStateStore;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public ExpectedException exception = ExpectedException.none();
|
||||||
|
|
||||||
|
private GpuResourceAllocator testSubject;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
initResourceTypes(ResourceInformation.GPU_URI);
|
||||||
|
MockitoAnnotations.initMocks(this);
|
||||||
|
testSubject = createTestSubject(WAIT_PERIOD_FOR_RESOURCE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private GpuResourceAllocator createTestSubject(int waitPeriodForResource) {
|
||||||
|
when(nmContext.getNMStateStore()).thenReturn(nmStateStore);
|
||||||
|
when(nmContext.getContainers()).thenReturn(new ConcurrentHashMap<>());
|
||||||
|
return new GpuResourceAllocator(nmContext, waitPeriodForResource);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Resource createGpuResourceRequest(int gpus) {
|
||||||
|
Resource res = Resource.newInstance(1024, 1);
|
||||||
|
|
||||||
|
if (gpus > 0) {
|
||||||
|
res.setResourceValue(ResourceInformation.GPU_URI, gpus);
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Container> createMockContainers(int gpus,
|
||||||
|
int numberOfContainers) {
|
||||||
|
final long id = 111L;
|
||||||
|
|
||||||
|
List<Container> containers = Lists.newArrayList();
|
||||||
|
for (int i = 0; i < numberOfContainers; i++) {
|
||||||
|
containers.add(createMockContainer(gpus, id + i));
|
||||||
|
}
|
||||||
|
return containers;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Container createMockContainer(int gpus, long id) {
|
||||||
|
Resource res = createGpuResourceRequest(gpus);
|
||||||
|
ContainerId containerId = mock(ContainerId.class);
|
||||||
|
when(containerId.getContainerId()).thenReturn(id);
|
||||||
|
|
||||||
|
Container container = mock(Container.class);
|
||||||
|
when(container.getResource()).thenReturn(res);
|
||||||
|
when(container.getContainerId()).thenReturn(containerId);
|
||||||
|
when(container.getContainerState()).thenReturn(ContainerState.RUNNING);
|
||||||
|
nmContext.getContainers().put(containerId, container);
|
||||||
|
|
||||||
|
return container;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createAndAddGpus(int numberOfGpus) {
|
||||||
|
for (int i = 0; i < numberOfGpus; i++) {
|
||||||
|
testSubject.addGpu(new GpuDevice(1, i));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(0, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(0, testSubject.getAssignedGpus().size());
|
||||||
|
assertEquals(numberOfGpus, testSubject.getAllowedGpus().size());
|
||||||
|
assertEquals(numberOfGpus, testSubject.getAvailableGpus());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addGpus(GpuDevice... gpus) {
|
||||||
|
for (GpuDevice gpu : gpus) {
|
||||||
|
testSubject.addGpu(gpu);
|
||||||
|
}
|
||||||
|
assertEquals(0, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(0, testSubject.getAssignedGpus().size());
|
||||||
|
assertEquals(gpus.length, testSubject.getAllowedGpus().size());
|
||||||
|
assertEquals(gpus.length, testSubject.getAvailableGpus());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addGpusAndDontVerify(GpuDevice... gpus) {
|
||||||
|
for (GpuDevice gpu : gpus) {
|
||||||
|
testSubject.addGpu(gpu);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupContainerAsReleasingGpus(Container... releasingContainers) {
|
||||||
|
ContainerState[] finalStates = new ContainerState[] {
|
||||||
|
ContainerState.KILLING, ContainerState.DONE,
|
||||||
|
ContainerState.LOCALIZATION_FAILED,
|
||||||
|
ContainerState.CONTAINER_RESOURCES_CLEANINGUP,
|
||||||
|
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
|
ContainerState.EXITED_WITH_FAILURE,
|
||||||
|
ContainerState.EXITED_WITH_SUCCESS
|
||||||
|
};
|
||||||
|
|
||||||
|
final Random random = new Random();
|
||||||
|
for (Container container : releasingContainers) {
|
||||||
|
ContainerState state = finalStates[random.nextInt(finalStates.length)];
|
||||||
|
when(container.getContainerState()).thenReturn(state);
|
||||||
|
when(container.isContainerInFinalStates()).thenReturn(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertAllocatedGpu(GpuDevice expectedGpu, Container container,
|
||||||
|
GpuAllocation allocation) throws IOException {
|
||||||
|
assertEquals(1, allocation.getAllowedGPUs().size());
|
||||||
|
assertEquals(0, allocation.getDeniedGPUs().size());
|
||||||
|
|
||||||
|
Set<GpuDevice> allowedGPUs = allocation.getAllowedGPUs();
|
||||||
|
|
||||||
|
GpuDevice allocatedGpu = allowedGPUs.iterator().next();
|
||||||
|
assertEquals(expectedGpu, allocatedGpu);
|
||||||
|
assertAssignmentInStateStore(expectedGpu, container);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertAllocatedGpus(int gpus, int deniedGpus,
|
||||||
|
Container container,
|
||||||
|
GpuAllocation allocation) throws IOException {
|
||||||
|
assertEquals(gpus, allocation.getAllowedGPUs().size());
|
||||||
|
assertEquals(deniedGpus, allocation.getDeniedGPUs().size());
|
||||||
|
assertAssignmentInStateStore(gpus, container);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertNoAllocation(GpuAllocation allocation) {
|
||||||
|
assertEquals(1, allocation.getDeniedGPUs().size());
|
||||||
|
assertEquals(0, allocation.getAllowedGPUs().size());
|
||||||
|
verifyZeroInteractions(nmStateStore);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertAssignmentInStateStore(GpuDevice expectedGpu,
|
||||||
|
Container container) throws IOException {
|
||||||
|
verify(nmStateStore).storeAssignedResources(
|
||||||
|
argThat(new ContainerMatcher(container)), eq(GPU_URI),
|
||||||
|
gpuCaptor.capture());
|
||||||
|
|
||||||
|
List<Serializable> gpuList = gpuCaptor.getValue();
|
||||||
|
assertEquals(1, gpuList.size());
|
||||||
|
assertEquals(expectedGpu, gpuList.get(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertAssignmentInStateStore(int gpus,
|
||||||
|
Container container) throws IOException {
|
||||||
|
verify(nmStateStore).storeAssignedResources(
|
||||||
|
argThat(new ContainerMatcher(container)), eq(GPU_URI),
|
||||||
|
gpuCaptor.capture());
|
||||||
|
|
||||||
|
List<Serializable> gpuList = gpuCaptor.getValue();
|
||||||
|
assertEquals(gpus, gpuList.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Set<GpuAllocation> findDuplicates(
|
||||||
|
List<GpuAllocation> allocations) {
|
||||||
|
final Set<GpuAllocation> result = new HashSet<>();
|
||||||
|
final Set<GpuAllocation> tmpSet = new HashSet<>();
|
||||||
|
|
||||||
|
for (GpuAllocation allocation : allocations) {
|
||||||
|
if (!tmpSet.add(allocation)) {
|
||||||
|
result.add(allocation);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNewGpuAllocatorHasEmptyCollectionOfDevices() {
|
||||||
|
assertEquals(0, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(0, testSubject.getAssignedGpus().size());
|
||||||
|
assertEquals(0, testSubject.getAllowedGpus().size());
|
||||||
|
assertEquals(0, testSubject.getAvailableGpus());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddOneDevice() {
|
||||||
|
addGpus(new GpuDevice(1, 1));
|
||||||
|
assertEquals(0, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(0, testSubject.getAssignedGpus().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddMoreDevices() {
|
||||||
|
addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2), new GpuDevice(1, 3));
|
||||||
|
assertEquals(0, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(0, testSubject.getAssignedGpus().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddMoreDevicesWithSameData() {
|
||||||
|
addGpusAndDontVerify(new GpuDevice(1, 1), new GpuDevice(1, 1));
|
||||||
|
assertEquals(0, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(0, testSubject.getAssignedGpus().size());
|
||||||
|
assertEquals(1, testSubject.getAllowedGpus().size());
|
||||||
|
assertEquals(1, testSubject.getAvailableGpus());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRequestZeroGpu() throws ResourceHandlerException {
|
||||||
|
addGpus(new GpuDevice(1, 1));
|
||||||
|
|
||||||
|
Container container = createMockContainer(0, 5L);
|
||||||
|
GpuAllocation allocation =
|
||||||
|
testSubject.assignGpus(container);
|
||||||
|
|
||||||
|
assertNoAllocation(allocation);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRequestOneGpu() throws ResourceHandlerException, IOException {
|
||||||
|
GpuDevice gpu = new GpuDevice(1, 1);
|
||||||
|
addGpus(gpu);
|
||||||
|
|
||||||
|
Container container = createMockContainer(1, 5L);
|
||||||
|
GpuAllocation allocation =
|
||||||
|
testSubject.assignGpus(container);
|
||||||
|
|
||||||
|
assertEquals(1, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(1, testSubject.getAssignedGpus().size());
|
||||||
|
assertEquals(1, testSubject.getAllowedGpus().size());
|
||||||
|
assertEquals(0, testSubject.getAvailableGpus());
|
||||||
|
|
||||||
|
assertAllocatedGpu(gpu, container, allocation);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRequestMoreThanAvailableGpu()
|
||||||
|
throws ResourceHandlerException {
|
||||||
|
addGpus(new GpuDevice(1, 1));
|
||||||
|
Container container = createMockContainer(2, 5L);
|
||||||
|
|
||||||
|
exception.expect(ResourceHandlerException.class);
|
||||||
|
exception.expectMessage("Failed to find enough GPUs");
|
||||||
|
testSubject.assignGpus(container);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRequestMoreThanAvailableGpuAndOneContainerIsReleasingGpus()
|
||||||
|
throws ResourceHandlerException, IOException {
|
||||||
|
addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2), new GpuDevice(1, 3));
|
||||||
|
Container container = createMockContainer(2, 5L);
|
||||||
|
GpuAllocation allocation = testSubject.assignGpus(container);
|
||||||
|
assertAllocatedGpus(2, 1, container, allocation);
|
||||||
|
|
||||||
|
assertEquals(2, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(2, testSubject.getAssignedGpus().size());
|
||||||
|
assertEquals(3, testSubject.getAllowedGpus().size());
|
||||||
|
assertEquals(1, testSubject.getAvailableGpus());
|
||||||
|
|
||||||
|
setupContainerAsReleasingGpus(container);
|
||||||
|
Container container2 = createMockContainer(2, 6L);
|
||||||
|
|
||||||
|
exception.expect(ResourceHandlerException.class);
|
||||||
|
exception.expectMessage("as some other containers might not " +
|
||||||
|
"releasing GPUs");
|
||||||
|
GpuAllocation allocation2 = testSubject.assignGpus(container2);
|
||||||
|
assertAllocatedGpus(2, 1, container, allocation2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testThreeContainersJustTwoOfThemSatisfied()
|
||||||
|
throws ResourceHandlerException, IOException {
|
||||||
|
addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2),
|
||||||
|
new GpuDevice(1, 3), new GpuDevice(1, 4),
|
||||||
|
new GpuDevice(1, 5), new GpuDevice(1, 6));
|
||||||
|
Container container = createMockContainer(3, 5L);
|
||||||
|
Container container2 = createMockContainer(2, 6L);
|
||||||
|
Container container3 = createMockContainer(2, 6L);
|
||||||
|
|
||||||
|
GpuAllocation allocation = testSubject.assignGpus(container);
|
||||||
|
assertAllocatedGpus(3, 3, container, allocation);
|
||||||
|
assertEquals(3, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(3, testSubject.getAssignedGpus().size());
|
||||||
|
assertEquals(6, testSubject.getAllowedGpus().size());
|
||||||
|
assertEquals(3, testSubject.getAvailableGpus());
|
||||||
|
|
||||||
|
GpuAllocation allocation2 = testSubject.assignGpus(container2);
|
||||||
|
assertAllocatedGpus(2, 4, container2, allocation2);
|
||||||
|
assertEquals(5, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(5, testSubject.getAssignedGpus().size());
|
||||||
|
assertEquals(6, testSubject.getAllowedGpus().size());
|
||||||
|
assertEquals(1, testSubject.getAvailableGpus());
|
||||||
|
|
||||||
|
exception.expect(ResourceHandlerException.class);
|
||||||
|
exception.expectMessage("Failed to find enough GPUs");
|
||||||
|
testSubject.assignGpus(container3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReleaseAndAssignGpus()
|
||||||
|
throws ResourceHandlerException, IOException {
|
||||||
|
addGpus(new GpuDevice(1, 1), new GpuDevice(1, 2), new GpuDevice(1, 3));
|
||||||
|
Container container = createMockContainer(2, 5L);
|
||||||
|
GpuAllocation allocation = testSubject.assignGpus(container);
|
||||||
|
assertAllocatedGpus(2, 1, container, allocation);
|
||||||
|
|
||||||
|
assertEquals(2, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(2, testSubject.getAssignedGpus().size());
|
||||||
|
assertEquals(3, testSubject.getAllowedGpus().size());
|
||||||
|
assertEquals(1, testSubject.getAvailableGpus());
|
||||||
|
|
||||||
|
setupContainerAsReleasingGpus(container);
|
||||||
|
Container container2 = createMockContainer(2, 6L);
|
||||||
|
try {
|
||||||
|
testSubject.assignGpus(container2);
|
||||||
|
} catch (ResourceHandlerException e) {
|
||||||
|
//intended as we have not enough GPUs available
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(2, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(2, testSubject.getAssignedGpus().size());
|
||||||
|
assertEquals(3, testSubject.getAllowedGpus().size());
|
||||||
|
assertEquals(1, testSubject.getAvailableGpus());
|
||||||
|
|
||||||
|
testSubject.unassignGpus(container.getContainerId());
|
||||||
|
GpuAllocation allocation2 = testSubject.assignGpus(container2);
|
||||||
|
assertAllocatedGpus(2, 1, container, allocation2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateLotsOfContainersVerifyGpuAssignmentsAreCorrect()
|
||||||
|
throws ResourceHandlerException, IOException {
|
||||||
|
createAndAddGpus(100);
|
||||||
|
|
||||||
|
List<Container> containers = createMockContainers(3, 33);
|
||||||
|
List<GpuAllocation> allocations = Lists.newArrayList();
|
||||||
|
for (Container container : containers) {
|
||||||
|
GpuAllocation allocation = testSubject.assignGpus(container);
|
||||||
|
allocations.add(allocation);
|
||||||
|
assertAllocatedGpus(3, 97, container, allocation);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(99, testSubject.getDeviceAllocationMapping().size());
|
||||||
|
assertEquals(99, testSubject.getAssignedGpus().size());
|
||||||
|
assertEquals(100, testSubject.getAllowedGpus().size());
|
||||||
|
assertEquals(1, testSubject.getAvailableGpus());
|
||||||
|
|
||||||
|
Set<GpuAllocation> duplicateAllocations = findDuplicates(allocations);
|
||||||
|
assertEquals(0, duplicateAllocations.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGpuGetsUnassignedWhenStateStoreThrowsException()
|
||||||
|
throws ResourceHandlerException, IOException {
|
||||||
|
doThrow(new IOException("Failed to save container mappings " +
|
||||||
|
"to NM state store!"))
|
||||||
|
.when(nmStateStore).storeAssignedResources(any(Container.class),
|
||||||
|
anyString(), anyList());
|
||||||
|
|
||||||
|
createAndAddGpus(1);
|
||||||
|
|
||||||
|
exception.expect(ResourceHandlerException.class);
|
||||||
|
exception.expectMessage("Failed to save container mappings " +
|
||||||
|
"to NM state store");
|
||||||
|
Container container = createMockContainer(1, 5L);
|
||||||
|
testSubject.assignGpus(container);
|
||||||
|
}
|
||||||
|
}
|
|
@ -158,7 +158,7 @@ public class TestGpuResourceHandlerImpl {
|
||||||
gpuResourceHandler.bootstrap(conf);
|
gpuResourceHandler.bootstrap(conf);
|
||||||
|
|
||||||
List<GpuDevice> allowedGpus =
|
List<GpuDevice> allowedGpus =
|
||||||
gpuResourceHandler.getGpuAllocator().getAllowedGpusCopy();
|
gpuResourceHandler.getGpuAllocator().getAllowedGpus();
|
||||||
assertEquals("Unexpected number of allowed GPU devices!", 1,
|
assertEquals("Unexpected number of allowed GPU devices!", 1,
|
||||||
allowedGpus.size());
|
allowedGpus.size());
|
||||||
assertEquals("Expected GPU device does not equal to found device!",
|
assertEquals("Expected GPU device does not equal to found device!",
|
||||||
|
@ -497,7 +497,7 @@ public class TestGpuResourceHandlerImpl {
|
||||||
gpuResourceHandler.reacquireContainer(getContainerId(1));
|
gpuResourceHandler.reacquireContainer(getContainerId(1));
|
||||||
|
|
||||||
Map<GpuDevice, ContainerId> deviceAllocationMapping =
|
Map<GpuDevice, ContainerId> deviceAllocationMapping =
|
||||||
gpuResourceHandler.getGpuAllocator().getDeviceAllocationMappingCopy();
|
gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping();
|
||||||
assertEquals("Unexpected number of allocated GPU devices!", 2,
|
assertEquals("Unexpected number of allocated GPU devices!", 2,
|
||||||
deviceAllocationMapping.size());
|
deviceAllocationMapping.size());
|
||||||
assertTrue("Expected GPU device is not found in allocations!",
|
assertTrue("Expected GPU device is not found in allocations!",
|
||||||
|
@ -533,7 +533,7 @@ public class TestGpuResourceHandlerImpl {
|
||||||
|
|
||||||
// Make sure internal state not changed.
|
// Make sure internal state not changed.
|
||||||
deviceAllocationMapping =
|
deviceAllocationMapping =
|
||||||
gpuResourceHandler.getGpuAllocator().getDeviceAllocationMappingCopy();
|
gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping();
|
||||||
assertEquals("Unexpected number of allocated GPU devices!",
|
assertEquals("Unexpected number of allocated GPU devices!",
|
||||||
2, deviceAllocationMapping.size());
|
2, deviceAllocationMapping.size());
|
||||||
assertTrue("Expected GPU devices are not found in allocations!",
|
assertTrue("Expected GPU devices are not found in allocations!",
|
||||||
|
@ -568,7 +568,7 @@ public class TestGpuResourceHandlerImpl {
|
||||||
|
|
||||||
// Make sure internal state not changed.
|
// Make sure internal state not changed.
|
||||||
deviceAllocationMapping =
|
deviceAllocationMapping =
|
||||||
gpuResourceHandler.getGpuAllocator().getDeviceAllocationMappingCopy();
|
gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping();
|
||||||
assertEquals("Unexpected number of allocated GPU devices!",
|
assertEquals("Unexpected number of allocated GPU devices!",
|
||||||
2, deviceAllocationMapping.size());
|
2, deviceAllocationMapping.size());
|
||||||
assertTrue("Expected GPU devices are not found in allocations!",
|
assertTrue("Expected GPU devices are not found in allocations!",
|
||||||
|
|
Loading…
Reference in New Issue