YARN-9174. Backport YARN-7224 for refactoring of GpuDevice class

This commit is contained in:
Jonathan Hung 2019-02-06 16:44:26 -08:00
parent 7ec4d7c6ce
commit 16faceb0da
13 changed files with 384 additions and 178 deletions

View File

@ -26,12 +26,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
@ -54,8 +53,8 @@ import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
public class GpuResourceAllocator { public class GpuResourceAllocator {
final static Log LOG = LogFactory.getLog(GpuResourceAllocator.class); final static Log LOG = LogFactory.getLog(GpuResourceAllocator.class);
private Set<Integer> allowedGpuDevices = new TreeSet<>(); private Set<GpuDevice> allowedGpuDevices = new TreeSet<>();
private Map<Integer, ContainerId> usedDevices = new TreeMap<>(); private Map<GpuDevice, ContainerId> usedDevices = new TreeMap<>();
private Context nmContext; private Context nmContext;
public GpuResourceAllocator(Context ctx) { public GpuResourceAllocator(Context ctx) {
@ -63,14 +62,14 @@ public class GpuResourceAllocator {
} }
/** /**
* Contains allowed and denied devices with minor number. * Contains allowed and denied devices
* Denied devices will be useful for cgroups devices module to do blacklisting * Denied devices will be useful for cgroups devices module to do blacklisting
*/ */
static class GpuAllocation { static class GpuAllocation {
private Set<Integer> allowed = Collections.emptySet(); private Set<GpuDevice> allowed = Collections.emptySet();
private Set<Integer> denied = Collections.emptySet(); private Set<GpuDevice> denied = Collections.emptySet();
GpuAllocation(Set<Integer> allowed, Set<Integer> denied) { GpuAllocation(Set<GpuDevice> allowed, Set<GpuDevice> denied) {
if (allowed != null) { if (allowed != null) {
this.allowed = ImmutableSet.copyOf(allowed); this.allowed = ImmutableSet.copyOf(allowed);
} }
@ -79,21 +78,21 @@ public class GpuResourceAllocator {
} }
} }
public Set<Integer> getAllowedGPUs() { public Set<GpuDevice> getAllowedGPUs() {
return allowed; return allowed;
} }
public Set<Integer> getDeniedGPUs() { public Set<GpuDevice> getDeniedGPUs() {
return denied; return denied;
} }
} }
/** /**
* Add GPU to allowed list * Add GPU to allowed list
* @param minorNumber minor number of the GPU device. * @param gpuDevice gpu device
*/ */
public synchronized void addGpu(int minorNumber) { public synchronized void addGpu(GpuDevice gpuDevice) {
allowedGpuDevices.add(minorNumber); allowedGpuDevices.add(gpuDevice);
} }
private String getResourceHandlerExceptionMessage(int numRequestedGpuDevices, private String getResourceHandlerExceptionMessage(int numRequestedGpuDevices,
@ -117,42 +116,42 @@ public class GpuResourceAllocator {
+ containerId); + containerId);
} }
for (Serializable deviceId : c.getResourceMappings().getAssignedResources( for (Serializable gpuDeviceSerializable : c.getResourceMappings()
GPU_URI)){ .getAssignedResources(GPU_URI)) {
if (!(deviceId instanceof String)) { if (!(gpuDeviceSerializable instanceof GpuDevice)) {
throw new ResourceHandlerException( throw new ResourceHandlerException(
"Trying to recover device id, however it" "Trying to recover device id, however it"
+ " is not String, this shouldn't happen"); + " is not GpuDevice, this shouldn't happen");
} }
GpuDevice gpuDevice = (GpuDevice) gpuDeviceSerializable;
int devId;
try {
devId = Integer.parseInt((String)deviceId);
} catch (NumberFormatException e) {
throw new ResourceHandlerException("Failed to recover device id because"
+ "it is not a valid integer, devId:" + deviceId);
}
// Make sure it is in allowed GPU device. // Make sure it is in allowed GPU device.
if (!allowedGpuDevices.contains(devId)) { if (!allowedGpuDevices.contains(gpuDevice)) {
throw new ResourceHandlerException("Try to recover device id = " + devId throw new ResourceHandlerException(
+ " however it is not in allowed device list:" + StringUtils "Try to recover device = " + gpuDevice
.join(",", allowedGpuDevices)); + " however it is not in allowed device list:" + StringUtils
.join(",", allowedGpuDevices));
} }
// Make sure it is not occupied by anybody else // Make sure it is not occupied by anybody else
if (usedDevices.containsKey(devId)) { if (usedDevices.containsKey(gpuDevice)) {
throw new ResourceHandlerException("Try to recover device id = " + devId throw new ResourceHandlerException(
+ " however it is already assigned to container=" + usedDevices "Try to recover device id = " + gpuDevice
.get(devId) + ", please double check what happened."); + " however it is already assigned to container=" + usedDevices
.get(gpuDevice) + ", please double check what happened.");
} }
usedDevices.put(devId, containerId); usedDevices.put(gpuDevice, containerId);
} }
} }
private int getRequestedGpus(Resource requestedResource) { /**
* Get number of requested GPUs from resource.
* @param requestedResource requested resource
* @return #gpus.
*/
public static int getRequestedGpus(Resource requestedResource) {
try { try {
return Long.valueOf(requestedResource.getResourceValue( return Long.valueOf(requestedResource.getResourceValue(
GPU_URI)).intValue(); GPU_URI)).intValue();
@ -164,8 +163,8 @@ public class GpuResourceAllocator {
/** /**
* Assign GPU to requestor * Assign GPU to requestor
* @param container container to allocate * @param container container to allocate
* @return List of denied Gpus with minor numbers * @return allocation results.
* @throws ResourceHandlerException When failed to * @throws ResourceHandlerException When failed to assign GPUs.
*/ */
public synchronized GpuAllocation assignGpus(Container container) public synchronized GpuAllocation assignGpus(Container container)
throws ResourceHandlerException { throws ResourceHandlerException {
@ -180,12 +179,12 @@ public class GpuResourceAllocator {
containerId)); containerId));
} }
Set<Integer> assignedGpus = new HashSet<>(); Set<GpuDevice> assignedGpus = new TreeSet<>();
for (int deviceNum : allowedGpuDevices) { for (GpuDevice gpu : allowedGpuDevices) {
if (!usedDevices.containsKey(deviceNum)) { if (!usedDevices.containsKey(gpu)) {
usedDevices.put(deviceNum, containerId); usedDevices.put(gpu, containerId);
assignedGpus.add(deviceNum); assignedGpus.add(gpu);
if (assignedGpus.size() == numRequestedGpuDevices) { if (assignedGpus.size() == numRequestedGpuDevices) {
break; break;
} }
@ -194,21 +193,10 @@ public class GpuResourceAllocator {
// Record in state store if we allocated anything // Record in state store if we allocated anything
if (!assignedGpus.isEmpty()) { if (!assignedGpus.isEmpty()) {
List<Serializable> allocatedDevices = new ArrayList<>();
for (int gpu : assignedGpus) {
allocatedDevices.add(String.valueOf(gpu));
}
try { try {
// Update Container#getResourceMapping.
ResourceMappings.AssignedResources assignedResources =
new ResourceMappings.AssignedResources();
assignedResources.updateAssignedResources(allocatedDevices);
container.getResourceMappings().addAssignedResources(GPU_URI,
assignedResources);
// Update state store. // Update state store.
nmContext.getNMStateStore().storeAssignedResources(containerId, nmContext.getNMStateStore().storeAssignedResources(container, GPU_URI,
GPU_URI, allocatedDevices); new ArrayList<Serializable>(assignedGpus));
} catch (IOException e) { } catch (IOException e) {
cleanupAssignGpus(containerId); cleanupAssignGpus(containerId);
throw new ResourceHandlerException(e); throw new ResourceHandlerException(e);
@ -226,7 +214,7 @@ public class GpuResourceAllocator {
* @param containerId containerId * @param containerId containerId
*/ */
public synchronized void cleanupAssignGpus(ContainerId containerId) { public synchronized void cleanupAssignGpus(ContainerId containerId) {
Iterator<Map.Entry<Integer, ContainerId>> iter = Iterator<Map.Entry<GpuDevice, ContainerId>> iter =
usedDevices.entrySet().iterator(); usedDevices.entrySet().iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
if (iter.next().getValue().equals(containerId)) { if (iter.next().getValue().equals(containerId)) {
@ -236,7 +224,7 @@ public class GpuResourceAllocator {
} }
@VisibleForTesting @VisibleForTesting
public synchronized Map<Integer, ContainerId> getDeviceAllocationMapping() { public synchronized Map<GpuDevice, ContainerId> getDeviceAllocationMapping() {
return new HashMap<>(usedDevices); return new HashMap<>(usedDevices);
} }
} }

View File

@ -24,8 +24,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@ -35,6 +33,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileg
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDiscoverer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDiscoverer;
import java.util.ArrayList; import java.util.ArrayList;
@ -64,17 +63,23 @@ public class GpuResourceHandlerImpl implements ResourceHandler {
@Override @Override
public List<PrivilegedOperation> bootstrap(Configuration configuration) public List<PrivilegedOperation> bootstrap(Configuration configuration)
throws ResourceHandlerException { throws ResourceHandlerException {
List<Integer> minorNumbersOfUsableGpus; List<GpuDevice> usableGpus;
try { try {
minorNumbersOfUsableGpus = GpuDiscoverer.getInstance() usableGpus = GpuDiscoverer.getInstance()
.getMinorNumbersOfGpusUsableByYarn(); .getGpusUsableByYarn();
if (usableGpus == null || usableGpus.isEmpty()) {
String message = "GPU is enabled on the NodeManager, but couldn't find "
+ "any usable GPU devices, please double check configuration.";
LOG.error(message);
throw new ResourceHandlerException(message);
}
} catch (YarnException e) { } catch (YarnException e) {
LOG.error("Exception when trying to get usable GPU device", e); LOG.error("Exception when trying to get usable GPU device", e);
throw new ResourceHandlerException(e); throw new ResourceHandlerException(e);
} }
for (int minorNumber : minorNumbersOfUsableGpus) { for (GpuDevice gpu : usableGpus) {
gpuAllocator.addGpu(minorNumber); gpuAllocator.addGpu(gpu);
} }
// And initialize cgroups // And initialize cgroups
@ -102,10 +107,13 @@ public class GpuResourceHandlerImpl implements ResourceHandler {
PrivilegedOperation.OperationType.GPU, Arrays PrivilegedOperation.OperationType.GPU, Arrays
.asList(CONTAINER_ID_CLI_OPTION, containerIdStr)); .asList(CONTAINER_ID_CLI_OPTION, containerIdStr));
if (!allocation.getDeniedGPUs().isEmpty()) { if (!allocation.getDeniedGPUs().isEmpty()) {
List<Integer> minorNumbers = new ArrayList<>();
for (GpuDevice deniedGpu : allocation.getDeniedGPUs()) {
minorNumbers.add(deniedGpu.getMinorNumber());
}
privilegedOperation.appendArgs(Arrays.asList(EXCLUDED_GPUS_CLI_OPTION, privilegedOperation.appendArgs(Arrays.asList(EXCLUDED_GPUS_CLI_OPTION,
StringUtils.join(",", allocation.getDeniedGPUs()))); StringUtils.join(",", minorNumbers)));
} }
privilegedOperationExecutor.executePrivilegedOperation( privilegedOperationExecutor.executePrivilegedOperation(
privilegedOperation, true); privilegedOperation, true);
} catch (PrivilegedOperationException e) { } catch (PrivilegedOperationException e) {

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu;
import java.io.Serializable;
/**
* This class is used to represent GPU device while allocation.
*/
public class GpuDevice implements Serializable, Comparable {
private int index;
private int minorNumber;
private static final long serialVersionUID = -6812314470754667710L;
public GpuDevice(int index, int minorNumber) {
this.index = index;
this.minorNumber = minorNumber;
}
public int getIndex() {
return index;
}
public int getMinorNumber() {
return minorNumber;
}
@Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof GpuDevice)) {
return false;
}
GpuDevice other = (GpuDevice) obj;
return index == other.index && minorNumber == other.minorNumber;
}
@Override
public int compareTo(Object obj) {
if (obj == null || (!(obj instanceof GpuDevice))) {
return -1;
}
GpuDevice other = (GpuDevice) obj;
int result = Integer.compare(index, other.index);
if (0 != result) {
return result;
}
return Integer.compare(minorNumber, other.minorNumber);
}
@Override
public int hashCode() {
final int prime = 47;
return prime * index + minorNumber;
}
@Override
public String toString() {
return "(index=" + index + ",minor_number=" + minorNumber + ")";
}
}

View File

@ -136,12 +136,12 @@ public class GpuDiscoverer {
} }
/** /**
* Get list of minor device numbers of Gpu devices usable by YARN. * Get list of GPU devices usable by YARN.
* *
* @return List of minor device numbers of Gpu devices. * @return List of GPU devices
* @throws YarnException when any issue happens * @throws YarnException when any issue happens
*/ */
public synchronized List<Integer> getMinorNumbersOfGpusUsableByYarn() public synchronized List<GpuDevice> getGpusUsableByYarn()
throws YarnException { throws YarnException {
validateConfOrThrowException(); validateConfOrThrowException();
@ -149,7 +149,7 @@ public class GpuDiscoverer {
YarnConfiguration.NM_GPU_ALLOWED_DEVICES, YarnConfiguration.NM_GPU_ALLOWED_DEVICES,
YarnConfiguration.AUTOMATICALLY_DISCOVER_GPU_DEVICES); YarnConfiguration.AUTOMATICALLY_DISCOVER_GPU_DEVICES);
List<Integer> minorNumbers = new ArrayList<>(); List<GpuDevice> gpuDevices = new ArrayList<>();
if (allowedDevicesStr.equals( if (allowedDevicesStr.equals(
YarnConfiguration.AUTOMATICALLY_DISCOVER_GPU_DEVICES)) { YarnConfiguration.AUTOMATICALLY_DISCOVER_GPU_DEVICES)) {
@ -167,21 +167,31 @@ public class GpuDiscoverer {
} }
if (lastDiscoveredGpuInformation.getGpus() != null) { if (lastDiscoveredGpuInformation.getGpus() != null) {
for (PerGpuDeviceInformation gpu : lastDiscoveredGpuInformation for (int i = 0; i < lastDiscoveredGpuInformation.getGpus().size();
.getGpus()) { i++) {
minorNumbers.add(gpu.getMinorNumber()); List<PerGpuDeviceInformation> gpuInfos =
lastDiscoveredGpuInformation.getGpus();
gpuDevices.add(new GpuDevice(i, gpuInfos.get(i).getMinorNumber()));
} }
} }
} else{ } else{
for (String s : allowedDevicesStr.split(",")) { for (String s : allowedDevicesStr.split(",")) {
if (s.trim().length() > 0) { if (s.trim().length() > 0) {
minorNumbers.add(Integer.valueOf(s.trim())); String[] kv = s.trim().split(":");
if (kv.length != 2) {
throw new YarnException(
"Illegal format, it should be index:minor_number format, now it="
+ s);
}
gpuDevices.add(
new GpuDevice(Integer.parseInt(kv[0]), Integer.parseInt(kv[1])));
} }
} }
LOG.info("Allowed GPU devices with minor numbers:" + allowedDevicesStr); LOG.info("Allowed GPU devices:" + gpuDevices);
} }
return minorNumbers; return gpuDevices;
} }
public synchronized void initialize(Configuration conf) throws YarnException { public synchronized void initialize(Configuration conf) throws YarnException {

View File

@ -40,12 +40,14 @@ public class GpuNodeResourceUpdateHandler extends NodeResourceUpdaterPlugin {
public void updateConfiguredResource(Resource res) throws YarnException { public void updateConfiguredResource(Resource res) throws YarnException {
LOG.info("Initializing configured GPU resources for the NodeManager."); LOG.info("Initializing configured GPU resources for the NodeManager.");
List<Integer> usableGpus = List<GpuDevice> usableGpus =
GpuDiscoverer.getInstance().getMinorNumbersOfGpusUsableByYarn(); GpuDiscoverer.getInstance().getGpusUsableByYarn();
if (null == usableGpus || usableGpus.isEmpty()) { if (null == usableGpus || usableGpus.isEmpty()) {
LOG.info("Didn't find any usable GPUs on the NodeManager."); String message = "GPU is enabled, but couldn't find any usable GPUs on the "
+ "NodeManager.";
LOG.error(message);
// No gpu can be used by YARN. // No gpu can be used by YARN.
return; throw new YarnException(message);
} }
long nUsableGpus = usableGpus.size(); long nUsableGpus = usableGpus.size();

View File

@ -18,28 +18,9 @@
package org.apache.hadoop.yarn.server.nodemanager.recovery; package org.apache.hadoop.yarn.server.nodemanager.recovery;
import static org.fusesource.leveldbjni.JniDBFactory.asString; import com.google.common.annotations.VisibleForTesting;
import static org.fusesource.leveldbjni.JniDBFactory.bytes; import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import org.slf4j.Logger;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Set;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -51,9 +32,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestP
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@ -61,9 +44,10 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Deletion
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
@ -76,10 +60,26 @@ import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException; import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Options; import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch; import org.iq80.leveldb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import static org.fusesource.leveldbjni.JniDBFactory.asString;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
public class NMLeveldbStateStoreService extends NMStateStoreService { public class NMLeveldbStateStoreService extends NMStateStoreService {
@ -1180,15 +1180,18 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} }
@Override @Override
public void storeAssignedResources(ContainerId containerId, public void storeAssignedResources(Container container,
String resourceType, List<Serializable> assignedResources) String resourceType, List<Serializable> assignedResources)
throws IOException { throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("storeAssignedResources: containerId=" + containerId LOG.debug(
+ ", assignedResources=" + StringUtils.join(",", assignedResources)); "storeAssignedResources: containerId=" + container.getContainerId()
+ ", assignedResources=" + StringUtils
.join(",", assignedResources));
} }
String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString() String keyResChng = CONTAINERS_KEY_PREFIX + container.getContainerId().toString()
+ CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType; + CONTAINER_ASSIGNED_RESOURCES_KEY_SUFFIX + resourceType;
try { try {
WriteBatch batch = db.createWriteBatch(); WriteBatch batch = db.createWriteBatch();
@ -1206,6 +1209,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} catch (DBException e) { } catch (DBException e) {
throw new IOException(e); throw new IOException(e);
} }
// update container resource mapping.
updateContainerResourceMapping(container, resourceType, assignedResources);
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
// The state store to use when state isn't being stored // The state store to use when state isn't being stored
public class NMNullStateStoreService extends NMStateStoreService { public class NMNullStateStoreService extends NMStateStoreService {
@ -268,7 +269,7 @@ public class NMNullStateStoreService extends NMStateStoreService {
} }
@Override @Override
public void storeAssignedResources(ContainerId containerId, public void storeAssignedResources(Container container,
String resourceType, List<Serializable> assignedResources) String resourceType, List<Serializable> assignedResources)
throws IOException { throws IOException {
} }

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Localize
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
@Private @Private
@ -732,12 +733,12 @@ public abstract class NMStateStoreService extends AbstractService {
/** /**
* Store the assigned resources to a container. * Store the assigned resources to a container.
* *
* @param containerId Container Id * @param container NMContainer
* @param resourceType Resource Type * @param resourceType Resource Type
* @param assignedResources Assigned resources * @param assignedResources Assigned resources
* @throws IOException if fails * @throws IOException if fails
*/ */
public abstract void storeAssignedResources(ContainerId containerId, public abstract void storeAssignedResources(Container container,
String resourceType, List<Serializable> assignedResources) String resourceType, List<Serializable> assignedResources)
throws IOException; throws IOException;
@ -746,4 +747,14 @@ public abstract class NMStateStoreService extends AbstractService {
protected abstract void startStorage() throws IOException; protected abstract void startStorage() throws IOException;
protected abstract void closeStorage() throws IOException; protected abstract void closeStorage() throws IOException;
protected void updateContainerResourceMapping(Container container,
String resourceType, List<Serializable> assignedResources) {
// Update Container#getResourceMapping.
ResourceMappings.AssignedResources newAssigned =
new ResourceMappings.AssignedResources();
newAssigned.updateAssignedResources(assignedResources);
container.getResourceMappings().addAssignedResources(resourceType,
newAssigned);
}
} }

View File

@ -519,18 +519,20 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
commonLaunchContainer(appId, cid, cm); commonLaunchContainer(appId, cid, cm);
Container nmContainer = context.getContainers().get(cid);
Application app = context.getApplications().get(appId); Application app = context.getApplications().get(appId);
assertNotNull(app); assertNotNull(app);
// store resource mapping of the container // store resource mapping of the container
List<Serializable> gpuResources = List<Serializable> gpuResources =
Arrays.<Serializable>asList("1", "2", "3"); Arrays.<Serializable>asList("1", "2", "3");
stateStore.storeAssignedResources(cid, "gpu", gpuResources); stateStore.storeAssignedResources(nmContainer, "gpu", gpuResources);
List<Serializable> numaResources = Arrays.<Serializable>asList("numa1"); List<Serializable> numaResources = Arrays.<Serializable>asList("numa1");
stateStore.storeAssignedResources(cid, "numa", numaResources); stateStore.storeAssignedResources(nmContainer, "numa", numaResources);
List<Serializable> fpgaResources = List<Serializable> fpgaResources =
Arrays.<Serializable>asList("fpga1", "fpga2"); Arrays.<Serializable>asList("fpga1", "fpga2");
stateStore.storeAssignedResources(cid, "fpga", fpgaResources); stateStore.storeAssignedResources(nmContainer, "fpga", fpgaResources);
cm.stop(); cm.stop();
context = createContext(conf, stateStore); context = createContext(conf, stateStore);
@ -542,7 +544,6 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
app = context.getApplications().get(appId); app = context.getApplications().get(appId);
assertNotNull(app); assertNotNull(app);
Container nmContainer = context.getContainers().get(cid);
Assert.assertNotNull(nmContainer); Assert.assertNotNull(nmContainer);
ResourceMappings resourceMappings = nmContainer.getResourceMappings(); ResourceMappings resourceMappings = nmContainer.getResourceMappings();
List<Serializable> assignedResource = resourceMappings List<Serializable> assignedResource = resourceMappings

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resourc
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -36,9 +35,10 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileg
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDiscoverer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDiscoverer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerRuntimeConstants;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.TestResourceUtils; import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -46,6 +46,7 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -92,7 +93,7 @@ public class TestGpuResourceHandler {
@Test @Test
public void testBootStrap() throws Exception { public void testBootStrap() throws Exception {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0"); conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0:0");
GpuDiscoverer.getInstance().initialize(conf); GpuDiscoverer.getInstance().initialize(conf);
@ -106,8 +107,8 @@ public class TestGpuResourceHandler {
.newInstance(ApplicationId.newInstance(1234L, 1), 1), id); .newInstance(ApplicationId.newInstance(1234L, 1), 1), id);
} }
private static Container mockContainerWithGpuRequest(int id, private static Container mockContainerWithGpuRequest(int id, int numGpuRequest,
int numGpuRequest) { boolean dockerContainerEnabled) {
Container c = mock(Container.class); Container c = mock(Container.class);
when(c.getContainerId()).thenReturn(getContainerId(id)); when(c.getContainerId()).thenReturn(getContainerId(id));
@ -117,29 +118,46 @@ public class TestGpuResourceHandler {
res.setResourceValue(ResourceInformation.GPU_URI, numGpuRequest); res.setResourceValue(ResourceInformation.GPU_URI, numGpuRequest);
when(c.getResource()).thenReturn(res); when(c.getResource()).thenReturn(res);
when(c.getResourceMappings()).thenReturn(resMapping); when(c.getResourceMappings()).thenReturn(resMapping);
ContainerLaunchContext clc = mock(ContainerLaunchContext.class);
Map<String, String> env = new HashMap<>();
if (dockerContainerEnabled) {
env.put(ContainerRuntimeConstants.ENV_CONTAINER_TYPE, "docker");
}
when(clc.getEnvironment()).thenReturn(env);
when(c.getLaunchContext()).thenReturn(clc);
return c; return c;
} }
private static Container mockContainerWithGpuRequest(int id,
int numGpuRequest) {
return mockContainerWithGpuRequest(id, numGpuRequest, false);
}
private void verifyDeniedDevices(ContainerId containerId, private void verifyDeniedDevices(ContainerId containerId,
List<Integer> deniedDevices) List<GpuDevice> deniedDevices)
throws ResourceHandlerException, PrivilegedOperationException { throws ResourceHandlerException, PrivilegedOperationException {
verify(mockCGroupsHandler, times(1)).createCGroup( verify(mockCGroupsHandler, times(1)).createCGroup(
CGroupsHandler.CGroupController.DEVICES, containerId.toString()); CGroupsHandler.CGroupController.DEVICES, containerId.toString());
if (null != deniedDevices && !deniedDevices.isEmpty()) { if (null != deniedDevices && !deniedDevices.isEmpty()) {
List<Integer> deniedDevicesMinorNumber = new ArrayList<>();
for (GpuDevice deniedDevice : deniedDevices) {
deniedDevicesMinorNumber.add(deniedDevice.getMinorNumber());
}
verify(mockPrivilegedExecutor, times(1)).executePrivilegedOperation( verify(mockPrivilegedExecutor, times(1)).executePrivilegedOperation(
new PrivilegedOperation(PrivilegedOperation.OperationType.GPU, Arrays new PrivilegedOperation(PrivilegedOperation.OperationType.GPU, Arrays
.asList(GpuResourceHandlerImpl.CONTAINER_ID_CLI_OPTION, .asList(GpuResourceHandlerImpl.CONTAINER_ID_CLI_OPTION,
containerId.toString(), containerId.toString(),
GpuResourceHandlerImpl.EXCLUDED_GPUS_CLI_OPTION, GpuResourceHandlerImpl.EXCLUDED_GPUS_CLI_OPTION,
StringUtils.join(",", deniedDevices))), true); StringUtils.join(",", deniedDevicesMinorNumber))), true);
} }
} }
@Test private void commonTestAllocation(boolean dockerContainerEnabled)
public void testAllocation() throws Exception { throws Exception {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0,1,3,4"); conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0:0,1:1,2:3,3:4");
GpuDiscoverer.getInstance().initialize(conf); GpuDiscoverer.getInstance().initialize(conf);
gpuResourceHandler.bootstrap(conf); gpuResourceHandler.bootstrap(conf);
@ -147,31 +165,55 @@ public class TestGpuResourceHandler {
gpuResourceHandler.getGpuAllocator().getAvailableGpus()); gpuResourceHandler.getGpuAllocator().getAvailableGpus());
/* Start container 1, asks 3 containers */ /* Start container 1, asks 3 containers */
gpuResourceHandler.preStart(mockContainerWithGpuRequest(1, 3)); gpuResourceHandler.preStart(
mockContainerWithGpuRequest(1, 3, dockerContainerEnabled));
// Only device=4 will be blocked. // Only device=4 will be blocked.
verifyDeniedDevices(getContainerId(1), Arrays.asList(4)); if (dockerContainerEnabled) {
verifyDeniedDevices(getContainerId(1),
Collections.<GpuDevice>emptyList());
} else{
verifyDeniedDevices(getContainerId(1), Arrays.asList(new GpuDevice(3,4)));
}
/* Start container 2, asks 2 containers. Excepted to fail */ /* Start container 2, asks 2 containers. Excepted to fail */
boolean failedToAllocate = false; boolean failedToAllocate = false;
try { try {
gpuResourceHandler.preStart(mockContainerWithGpuRequest(2, 2)); gpuResourceHandler.preStart(
mockContainerWithGpuRequest(2, 2, dockerContainerEnabled));
} catch (ResourceHandlerException e) { } catch (ResourceHandlerException e) {
failedToAllocate = true; failedToAllocate = true;
} }
Assert.assertTrue(failedToAllocate); Assert.assertTrue(failedToAllocate);
/* Start container 3, ask 1 container, succeeded */ /* Start container 3, ask 1 container, succeeded */
gpuResourceHandler.preStart(mockContainerWithGpuRequest(3, 1)); gpuResourceHandler.preStart(
mockContainerWithGpuRequest(3, 1, dockerContainerEnabled));
// devices = 0/1/3 will be blocked // devices = 0/1/3 will be blocked
verifyDeniedDevices(getContainerId(3), Arrays.asList(0, 1, 3)); if (dockerContainerEnabled) {
verifyDeniedDevices(getContainerId(3),
Collections.<GpuDevice>emptyList());
} else {
verifyDeniedDevices(getContainerId(3), Arrays
.asList(new GpuDevice(0, 0), new GpuDevice(1, 1),
new GpuDevice(2, 3)));
}
/* Start container 4, ask 0 container, succeeded */ /* Start container 4, ask 0 container, succeeded */
gpuResourceHandler.preStart(mockContainerWithGpuRequest(4, 0)); gpuResourceHandler.preStart(
mockContainerWithGpuRequest(4, 0, dockerContainerEnabled));
// All devices will be blocked if (dockerContainerEnabled) {
verifyDeniedDevices(getContainerId(4), Arrays.asList(0, 1, 3, 4)); verifyDeniedDevices(getContainerId(4),
Collections.<GpuDevice>emptyList());
} else{
// All devices will be blocked
verifyDeniedDevices(getContainerId(4), Arrays
.asList(new GpuDevice(0, 0), new GpuDevice(1, 1), new GpuDevice(2, 3),
new GpuDevice(3, 4)));
}
/* Release container-1, expect cgroups deleted */ /* Release container-1, expect cgroups deleted */
gpuResourceHandler.postComplete(getContainerId(1)); gpuResourceHandler.postComplete(getContainerId(1));
@ -190,12 +232,24 @@ public class TestGpuResourceHandler {
gpuResourceHandler.getGpuAllocator().getAvailableGpus()); gpuResourceHandler.getGpuAllocator().getAvailableGpus());
} }
@Test
public void testAllocationWhenDockerContainerEnabled() throws Exception {
// When docker container is enabled, no devices should be written to
// devices.deny.
commonTestAllocation(true);
}
@Test
public void testAllocation() throws Exception {
commonTestAllocation(false);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void testAssignedGpuWillBeCleanedupWhenStoreOpFails() public void testAssignedGpuWillBeCleanedupWhenStoreOpFails()
throws Exception { throws Exception {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0,1,3,4"); conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0:0,1:1,2:3,3:4");
GpuDiscoverer.getInstance().initialize(conf); GpuDiscoverer.getInstance().initialize(conf);
gpuResourceHandler.bootstrap(conf); gpuResourceHandler.bootstrap(conf);
@ -204,7 +258,7 @@ public class TestGpuResourceHandler {
doThrow(new IOException("Exception ...")).when(mockNMStateStore) doThrow(new IOException("Exception ...")).when(mockNMStateStore)
.storeAssignedResources( .storeAssignedResources(
any(ContainerId.class), anyString(), anyList()); any(Container.class), anyString(), anyList());
boolean exception = false; boolean exception = false;
/* Start container 1, asks 3 containers */ /* Start container 1, asks 3 containers */
@ -227,13 +281,16 @@ public class TestGpuResourceHandler {
conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, " "); conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, " ");
GpuDiscoverer.getInstance().initialize(conf); GpuDiscoverer.getInstance().initialize(conf);
gpuResourceHandler.bootstrap(conf); try {
Assert.assertEquals(0, gpuResourceHandler.bootstrap(conf);
gpuResourceHandler.getGpuAllocator().getAvailableGpus()); Assert.fail("Should fail because no GPU available");
} catch (ResourceHandlerException e) {
// Expected because of no resource available
}
/* Start container 1, asks 0 containers */ /* Start container 1, asks 0 containers */
gpuResourceHandler.preStart(mockContainerWithGpuRequest(1, 0)); gpuResourceHandler.preStart(mockContainerWithGpuRequest(1, 0));
verifyDeniedDevices(getContainerId(1), Collections.<Integer>emptyList()); verifyDeniedDevices(getContainerId(1), Collections.<GpuDevice>emptyList());
/* Start container 2, asks 1 containers. Excepted to fail */ /* Start container 2, asks 1 containers. Excepted to fail */
boolean failedToAllocate = false; boolean failedToAllocate = false;
@ -256,7 +313,7 @@ public class TestGpuResourceHandler {
@Test @Test
public void testAllocationStored() throws Exception { public void testAllocationStored() throws Exception {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0,1,3,4"); conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0:0,1:1,2:3,3:4");
GpuDiscoverer.getInstance().initialize(conf); GpuDiscoverer.getInstance().initialize(conf);
gpuResourceHandler.bootstrap(conf); gpuResourceHandler.bootstrap(conf);
@ -267,34 +324,34 @@ public class TestGpuResourceHandler {
Container container = mockContainerWithGpuRequest(1, 3); Container container = mockContainerWithGpuRequest(1, 3);
gpuResourceHandler.preStart(container); gpuResourceHandler.preStart(container);
verify(mockNMStateStore).storeAssignedResources(getContainerId(1), verify(mockNMStateStore).storeAssignedResources(container,
ResourceInformation.GPU_URI, ResourceInformation.GPU_URI, Arrays
Arrays.<Serializable>asList("0", "1", "3")); .<Serializable>asList(new GpuDevice(0, 0), new GpuDevice(1, 1),
new GpuDevice(2, 3)));
Assert.assertEquals(3, container.getResourceMappings()
.getAssignedResources(ResourceInformation.GPU_URI).size());
// Only device=4 will be blocked. // Only device=4 will be blocked.
verifyDeniedDevices(getContainerId(1), Arrays.asList(4)); verifyDeniedDevices(getContainerId(1), Arrays.asList(new GpuDevice(3, 4)));
/* Start container 2, ask 0 container, succeeded */ /* Start container 2, ask 0 container, succeeded */
container = mockContainerWithGpuRequest(2, 0); container = mockContainerWithGpuRequest(2, 0);
gpuResourceHandler.preStart(container); gpuResourceHandler.preStart(container);
verifyDeniedDevices(getContainerId(2), Arrays.asList(0, 1, 3, 4)); verifyDeniedDevices(getContainerId(2), Arrays
.asList(new GpuDevice(0, 0), new GpuDevice(1, 1), new GpuDevice(2, 3),
new GpuDevice(3, 4)));
Assert.assertEquals(0, container.getResourceMappings() Assert.assertEquals(0, container.getResourceMappings()
.getAssignedResources(ResourceInformation.GPU_URI).size()); .getAssignedResources(ResourceInformation.GPU_URI).size());
// Store assigned resource will not be invoked. // Store assigned resource will not be invoked.
verify(mockNMStateStore, never()).storeAssignedResources( verify(mockNMStateStore, never()).storeAssignedResources(
eq(getContainerId(2)), eq(ResourceInformation.GPU_URI), eq(container), eq(ResourceInformation.GPU_URI),
anyListOf(Serializable.class)); anyListOf(Serializable.class));
} }
@Test @Test
public void testRecoverResourceAllocation() throws Exception { public void testRecoverResourceAllocation() throws Exception {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0,1,3,4"); conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0:0,1:1,2:3,3:4");
GpuDiscoverer.getInstance().initialize(conf); GpuDiscoverer.getInstance().initialize(conf);
gpuResourceHandler.bootstrap(conf); gpuResourceHandler.bootstrap(conf);
@ -305,7 +362,8 @@ public class TestGpuResourceHandler {
ResourceMappings rmap = new ResourceMappings(); ResourceMappings rmap = new ResourceMappings();
ResourceMappings.AssignedResources ar = ResourceMappings.AssignedResources ar =
new ResourceMappings.AssignedResources(); new ResourceMappings.AssignedResources();
ar.updateAssignedResources(Arrays.<Serializable>asList("1", "3")); ar.updateAssignedResources(
Arrays.<Serializable>asList(new GpuDevice(1, 1), new GpuDevice(2, 3)));
rmap.addAssignedResources(ResourceInformation.GPU_URI, ar); rmap.addAssignedResources(ResourceInformation.GPU_URI, ar);
when(nmContainer.getResourceMappings()).thenReturn(rmap); when(nmContainer.getResourceMappings()).thenReturn(rmap);
@ -315,12 +373,15 @@ public class TestGpuResourceHandler {
// Reacquire container restore state of GPU Resource Allocator. // Reacquire container restore state of GPU Resource Allocator.
gpuResourceHandler.reacquireContainer(getContainerId(1)); gpuResourceHandler.reacquireContainer(getContainerId(1));
Map<Integer, ContainerId> deviceAllocationMapping = Map<GpuDevice, ContainerId> deviceAllocationMapping =
gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping(); gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping();
Assert.assertEquals(2, deviceAllocationMapping.size()); Assert.assertEquals(2, deviceAllocationMapping.size());
Assert.assertTrue( Assert.assertTrue(
deviceAllocationMapping.keySet().containsAll(Arrays.asList(1, 3))); deviceAllocationMapping.keySet().contains(new GpuDevice(1, 1)));
Assert.assertEquals(deviceAllocationMapping.get(1), getContainerId(1)); Assert.assertTrue(
deviceAllocationMapping.keySet().contains(new GpuDevice(2, 3)));
Assert.assertEquals(deviceAllocationMapping.get(new GpuDevice(1, 1)),
getContainerId(1));
// TEST CASE // TEST CASE
// Try to reacquire a container but requested device is not in allowed list. // Try to reacquire a container but requested device is not in allowed list.
@ -328,7 +389,8 @@ public class TestGpuResourceHandler {
rmap = new ResourceMappings(); rmap = new ResourceMappings();
ar = new ResourceMappings.AssignedResources(); ar = new ResourceMappings.AssignedResources();
// id=5 is not in allowed list. // id=5 is not in allowed list.
ar.updateAssignedResources(Arrays.<Serializable>asList("4", "5")); ar.updateAssignedResources(
Arrays.<Serializable>asList(new GpuDevice(3, 4), new GpuDevice(4, 5)));
rmap.addAssignedResources(ResourceInformation.GPU_URI, ar); rmap.addAssignedResources(ResourceInformation.GPU_URI, ar);
when(nmContainer.getResourceMappings()).thenReturn(rmap); when(nmContainer.getResourceMappings()).thenReturn(rmap);
@ -348,9 +410,10 @@ public class TestGpuResourceHandler {
deviceAllocationMapping = deviceAllocationMapping =
gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping(); gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping();
Assert.assertEquals(2, deviceAllocationMapping.size()); Assert.assertEquals(2, deviceAllocationMapping.size());
Assert.assertTrue( Assert.assertTrue(deviceAllocationMapping.keySet()
deviceAllocationMapping.keySet().containsAll(Arrays.asList(1, 3))); .containsAll(Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 3))));
Assert.assertEquals(deviceAllocationMapping.get(1), getContainerId(1)); Assert.assertEquals(deviceAllocationMapping.get(new GpuDevice(1, 1)),
getContainerId(1));
// TEST CASE // TEST CASE
// Try to reacquire a container but requested device is already assigned. // Try to reacquire a container but requested device is already assigned.
@ -358,7 +421,8 @@ public class TestGpuResourceHandler {
rmap = new ResourceMappings(); rmap = new ResourceMappings();
ar = new ResourceMappings.AssignedResources(); ar = new ResourceMappings.AssignedResources();
// id=3 is already assigned // id=3 is already assigned
ar.updateAssignedResources(Arrays.<Serializable>asList("4", "3")); ar.updateAssignedResources(
Arrays.<Serializable>asList(new GpuDevice(3, 4), new GpuDevice(2, 3)));
rmap.addAssignedResources("gpu", ar); rmap.addAssignedResources("gpu", ar);
when(nmContainer.getResourceMappings()).thenReturn(rmap); when(nmContainer.getResourceMappings()).thenReturn(rmap);
@ -378,8 +442,9 @@ public class TestGpuResourceHandler {
deviceAllocationMapping = deviceAllocationMapping =
gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping(); gpuResourceHandler.getGpuAllocator().getDeviceAllocationMapping();
Assert.assertEquals(2, deviceAllocationMapping.size()); Assert.assertEquals(2, deviceAllocationMapping.size());
Assert.assertTrue( Assert.assertTrue(deviceAllocationMapping.keySet()
deviceAllocationMapping.keySet().containsAll(Arrays.asList(1, 3))); .containsAll(Arrays.asList(new GpuDevice(1, 1), new GpuDevice(2, 3))));
Assert.assertEquals(deviceAllocationMapping.get(1), getContainerId(1)); Assert.assertEquals(deviceAllocationMapping.get(new GpuDevice(1, 1)),
getContainerId(1));
} }
} }

View File

@ -101,23 +101,41 @@ public class TestGpuDiscoverer {
GpuDeviceInformation info = plugin.getGpuDeviceInformation(); GpuDeviceInformation info = plugin.getGpuDeviceInformation();
Assert.assertTrue(info.getGpus().size() > 0); Assert.assertTrue(info.getGpus().size() > 0);
Assert.assertEquals(plugin.getMinorNumbersOfGpusUsableByYarn().size(), Assert.assertEquals(plugin.getGpusUsableByYarn().size(),
info.getGpus().size()); info.getGpus().size());
} }
@Test @Test
public void getNumberOfUsableGpusFromConfig() throws YarnException { public void getNumberOfUsableGpusFromConfig() throws YarnException {
Configuration conf = new Configuration(false); Configuration conf = new Configuration(false);
conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0,1,2,4");
// Illegal format
conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0:0,1:1,2:2,3");
GpuDiscoverer plugin = new GpuDiscoverer(); GpuDiscoverer plugin = new GpuDiscoverer();
try {
plugin.initialize(conf);
plugin.getGpusUsableByYarn();
Assert.fail("Illegal format, should fail.");
} catch (YarnException e) {
// Expected
}
// Valid format
conf.set(YarnConfiguration.NM_GPU_ALLOWED_DEVICES, "0:0,1:1,2:2,3:4");
plugin = new GpuDiscoverer();
plugin.initialize(conf); plugin.initialize(conf);
List<Integer> minorNumbers = plugin.getMinorNumbersOfGpusUsableByYarn(); List<GpuDevice> usableGpuDevices = plugin.getGpusUsableByYarn();
Assert.assertEquals(4, minorNumbers.size()); Assert.assertEquals(4, usableGpuDevices.size());
Assert.assertTrue(0 == minorNumbers.get(0)); Assert.assertTrue(0 == usableGpuDevices.get(0).getIndex());
Assert.assertTrue(1 == minorNumbers.get(1)); Assert.assertTrue(1 == usableGpuDevices.get(1).getIndex());
Assert.assertTrue(2 == minorNumbers.get(2)); Assert.assertTrue(2 == usableGpuDevices.get(2).getIndex());
Assert.assertTrue(4 == minorNumbers.get(3)); Assert.assertTrue(3 == usableGpuDevices.get(3).getIndex());
Assert.assertTrue(0 == usableGpuDevices.get(0).getMinorNumber());
Assert.assertTrue(1 == usableGpuDevices.get(1).getMinorNumber());
Assert.assertTrue(2 == usableGpuDevices.get(2).getMinorNumber());
Assert.assertTrue(4 == usableGpuDevices.get(3).getMinorNumber());
} }
} }

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDelet
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
@ -515,14 +516,17 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
} }
@Override @Override
public void storeAssignedResources(ContainerId containerId, public void storeAssignedResources(Container container,
String resourceType, List<Serializable> assignedResources) String resourceType, List<Serializable> assignedResources)
throws IOException { throws IOException {
ResourceMappings.AssignedResources ar = ResourceMappings.AssignedResources ar =
new ResourceMappings.AssignedResources(); new ResourceMappings.AssignedResources();
ar.updateAssignedResources(assignedResources); ar.updateAssignedResources(assignedResources);
containerStates.get(containerId).getResourceMappings() containerStates.get(container.getContainerId()).getResourceMappings()
.addAssignedResources(resourceType, ar); .addAssignedResources(resourceType, ar);
// update container resource mapping.
updateContainerResourceMapping(container, resourceType, assignedResources);
} }
private static class TrackerState { private static class TrackerState {

View File

@ -29,6 +29,7 @@ import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -69,6 +70,8 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDelet
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@ -1143,17 +1146,22 @@ public class TestNMLeveldbStateStoreService {
ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5); ContainerId containerId = ContainerId.newContainerId(appAttemptId, 5);
storeMockContainer(containerId); storeMockContainer(containerId);
Container container = mock(Container.class);
when(container.getContainerId()).thenReturn(containerId);
ResourceMappings resourceMappings = new ResourceMappings();
when(container.getResourceMappings()).thenReturn(resourceMappings);
// Store ResourceMapping // Store ResourceMapping
stateStore.storeAssignedResources(containerId, "gpu", stateStore.storeAssignedResources(container, "gpu",
Arrays.<Serializable>asList("1", "2", "3")); Arrays.<Serializable>asList("1", "2", "3"));
// This will overwrite above // This will overwrite above
List<Serializable> gpuRes1 = Arrays.<Serializable>asList("1", "2", "4"); List<Serializable> gpuRes1 = Arrays.<Serializable>asList("1", "2", "4");
stateStore.storeAssignedResources(containerId, "gpu", gpuRes1); stateStore.storeAssignedResources(container, "gpu", gpuRes1);
List<Serializable> fpgaRes = List<Serializable> fpgaRes =
Arrays.<Serializable>asList("3", "4", "5", "6"); Arrays.<Serializable>asList("3", "4", "5", "6");
stateStore.storeAssignedResources(containerId, "fpga", fpgaRes); stateStore.storeAssignedResources(container, "fpga", fpgaRes);
List<Serializable> numaRes = Arrays.<Serializable>asList("numa1"); List<Serializable> numaRes = Arrays.<Serializable>asList("numa1");
stateStore.storeAssignedResources(containerId, "numa", numaRes); stateStore.storeAssignedResources(container, "numa", numaRes);
// add a invalid key // add a invalid key
restartStateStore(); restartStateStore();
@ -1163,12 +1171,18 @@ public class TestNMLeveldbStateStoreService {
List<Serializable> res = rcs.getResourceMappings() List<Serializable> res = rcs.getResourceMappings()
.getAssignedResources("gpu"); .getAssignedResources("gpu");
Assert.assertTrue(res.equals(gpuRes1)); Assert.assertTrue(res.equals(gpuRes1));
Assert.assertTrue(
resourceMappings.getAssignedResources("gpu").equals(gpuRes1));
res = rcs.getResourceMappings().getAssignedResources("fpga"); res = rcs.getResourceMappings().getAssignedResources("fpga");
Assert.assertTrue(res.equals(fpgaRes)); Assert.assertTrue(res.equals(fpgaRes));
Assert.assertTrue(
resourceMappings.getAssignedResources("fpga").equals(fpgaRes));
res = rcs.getResourceMappings().getAssignedResources("numa"); res = rcs.getResourceMappings().getAssignedResources("numa");
Assert.assertTrue(res.equals(numaRes)); Assert.assertTrue(res.equals(numaRes));
Assert.assertTrue(
resourceMappings.getAssignedResources("numa").equals(numaRes));
} }
private StartContainerRequest storeMockContainer(ContainerId containerId) private StartContainerRequest storeMockContainer(ContainerId containerId)