YARN-8881. [YARN-8851] Add basic pluggable device plugin framework. (Zhankun Tang via wangda)

Change-Id: If9a2f68cd4713b4ec932cdeda68106f17437c3d3
This commit is contained in:
Wangda Tan 2018-11-19 08:54:31 -08:00
parent 93666087bc
commit 6357803645
21 changed files with 1773 additions and 11 deletions

View File

@ -0,0 +1,233 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
import java.io.Serializable;
import java.util.Objects;
/**
* Represent one "device" resource.
* */
public final class Device implements Serializable, Comparable {
private static final long serialVersionUID = -7270474563684671656L;
/**
* An plugin specified index number.
* Must set. Recommend starting from 0
* */
private final int id;
/**
* The device node like "/dev/devname".
* Optional
* */
private final String devPath;
/**
* The major device number.
* Optional
* */
private final int majorNumber;
/**
* The minor device number.
* Optional
* */
private final int minorNumber;
/**
* PCI Bus ID in format [[[[<domain>]:]<bus>]:][<slot>][.[<func>]].
* Optional. Can get from "lspci -D" in Linux
* */
private final String busID;
/**
* Is healthy or not.
* false by default
* */
private boolean isHealthy;
/**
* Plugin customized status info.
* Optional
* */
private String status;
/**
* Private constructor.
* @param builder
*/
private Device(Builder builder) {
if (builder.id == -1) {
throw new IllegalArgumentException("Please set the id for Device");
}
this.id = builder.id;
this.devPath = builder.devPath;
this.majorNumber = builder.majorNumber;
this.minorNumber = builder.minorNumber;
this.busID = builder.busID;
this.isHealthy = builder.isHealthy;
this.status = builder.status;
}
public int getId() {
return id;
}
public String getDevPath() {
return devPath;
}
public int getMajorNumber() {
return majorNumber;
}
public int getMinorNumber() {
return minorNumber;
}
public String getBusID() {
return busID;
}
public boolean isHealthy() {
return isHealthy;
}
public String getStatus() {
return status;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Device device = (Device) o;
return id == device.getId()
&& Objects.equals(devPath, device.getDevPath())
&& majorNumber == device.getMajorNumber()
&& minorNumber == device.getMinorNumber()
&& Objects.equals(busID, device.getBusID());
}
@Override
public int hashCode() {
return Objects.hash(id, devPath, majorNumber, minorNumber, busID);
}
@Override
public int compareTo(Object o) {
if (o == null || (!(o instanceof Device))) {
return -1;
}
Device other = (Device) o;
int result = Integer.compare(id, other.getId());
if (0 != result) {
return result;
}
result = Integer.compare(majorNumber, other.getMajorNumber());
if (0 != result) {
return result;
}
result = Integer.compare(minorNumber, other.getMinorNumber());
if (0 != result) {
return result;
}
result = devPath.compareTo(other.getDevPath());
if (0 != result) {
return result;
}
return busID.compareTo(other.getBusID());
}
@Override
public String toString() {
return "(" + getId() + ", " + getDevPath() + ", "
+ getMajorNumber() + ":" + getMinorNumber() + ")";
}
/**
* Builder for Device.
* */
public final static class Builder {
// default -1 representing the value is not set
private int id = -1;
private String devPath = "";
private int majorNumber;
private int minorNumber;
private String busID = "";
private boolean isHealthy;
private String status = "";
public static Builder newInstance() {
return new Builder();
}
public Device build() {
return new Device(this);
}
public Builder setId(int i) {
this.id = i;
return this;
}
public Builder setDevPath(String dp) {
this.devPath = dp;
return this;
}
public Builder setMajorNumber(int maN) {
this.majorNumber = maN;
return this;
}
public Builder setMinorNumber(int miN) {
this.minorNumber = miN;
return this;
}
public Builder setBusID(String bI) {
this.busID = bI;
return this;
}
public Builder setHealthy(boolean healthy) {
isHealthy = healthy;
return this;
}
public Builder setStatus(String s) {
this.status = s;
return this;
}
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
import java.util.Set;
/**
 * The interface every vendor device plugin must implement.
 * The framework calls these methods to register the plugin, to discover
 * its devices and to learn how allocated devices should be prepared for
 * a container.
 */
public interface DevicePlugin {
/**
 * Called first when device plugin framework wants to register.
 * @return DeviceRegisterRequest {@link DeviceRegisterRequest}
 * @throws Exception if the plugin cannot provide its register info
 */
DeviceRegisterRequest getRegisterRequestInfo()
throws Exception;
/**
 * Called when update node resource.
 * @return a set of {@link Device}, {@link java.util.TreeSet} recommended
 * @throws Exception if the plugin cannot report its devices
 */
Set<Device> getDevices() throws Exception;
/**
 * Asking how these devices should be prepared/used
 * before/when container launch. A plugin can do some tasks in its own or
 * define it in DeviceRuntimeSpec to let the framework do it.
 * For instance, define {@code VolumeSpec} to let the
 * framework to create volume before running container.
 *
 * @param allocatedDevices A set of allocated {@link Device}.
 * @param yarnRuntime Indicate which runtime YARN will use.
 * Could be {@code RUNTIME_DEFAULT} or {@code RUNTIME_DOCKER}
 * of {@link YarnRuntimeType}. The default means YARN's
 * non-docker container runtime is used. The docker means YARN's
 * docker container runtime is used.
 * @return a {@link DeviceRuntimeSpec} description about environment,
 * {@link VolumeSpec}, {@link MountVolumeSpec}. etc
 * @throws Exception if the plugin cannot build the runtime spec
 */
DeviceRuntimeSpec onDevicesAllocated(Set<Device> allocatedDevices,
YarnRuntimeType yarnRuntime) throws Exception;
/**
 * Called after device released.
 * @param releasedDevices A set of released devices
 * @throws Exception if the plugin fails while handling the release
 */
void onDevicesReleased(Set<Device> releasedDevices)
throws Exception;
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
import java.util.Objects;
/**
 * Registration info a device plugin hands over to the framework:
 * the resource name it manages (mandatory) and the plugin's own
 * version string (optional).
 */
public final class DeviceRegisterRequest {

  private final String resourceName;

  // plugin's own version
  private final String pluginVersion;

  /**
   * Copies the values out of the builder.
   *
   * @param source the builder to read from
   * @throws NullPointerException if no resource name was set
   */
  private DeviceRegisterRequest(Builder source) {
    String name = source.resourceName;
    Objects.requireNonNull(name);
    resourceName = name;
    pluginVersion = source.pluginVersion;
  }

  public String getPluginVersion() {
    return pluginVersion;
  }

  public String getResourceName() {
    return resourceName;
  }

  /**
   * Builder used to construct a {@link DeviceRegisterRequest}.
   */
  public final static class Builder {

    private String resourceName;
    private String pluginVersion;

    private Builder() {
    }

    public static Builder newInstance() {
      return new Builder();
    }

    public Builder setPluginVersion(String plVersion) {
      pluginVersion = plVersion;
      return this;
    }

    public Builder setResourceName(String resName) {
      resourceName = resName;
      return this;
    }

    /**
     * @return the request built from the values set so far
     * @throws NullPointerException if the resource name is missing
     */
    public DeviceRegisterRequest build() {
      return new DeviceRegisterRequest(this);
    }
  }
}

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
/**
 * This is a spec used to prepare and run a container.
 * It is the return value of {@code onDevicesAllocated} invoked by the
 * framework.
 * For preparation, if volumeSpecs is populated then the framework will
 * create the volume before using the device.
 * When running the container, the envs indicate the environment variables
 * needed.
 * The containerRuntime indicates what Docker runtime to use.
 * The volume and device mounts describe key isolation requirements.
 */
public final class DeviceRuntimeSpec implements Serializable {

  private static final long serialVersionUID = 554704120015467660L;

  /**
   * The containerRuntime gives device framework a hint (not forced to).
   * On which containerRuntime be used
   * (if empty then default "runc" is used).
   * For instance, it could be "nvidia" in Nvidia GPU Docker v2.
   * The "nvidia" will be passed as a parameter to docker run
   * with --runtime "nvidia"
   */
  private final String containerRuntime;
  private final Map<String, String> envs;
  private final Set<MountVolumeSpec> volumeMounts;
  private final Set<MountDeviceSpec> deviceMounts;
  private final Set<VolumeSpec> volumeSpecs;

  /**
   * Snapshots the builder's state. The collections are copied so that
   * further calls on the same builder cannot alter a spec that has
   * already been built.
   *
   * @param builder the builder to read from
   */
  private DeviceRuntimeSpec(Builder builder) {
    this.containerRuntime = builder.containerRuntime;
    this.deviceMounts = new TreeSet<>(builder.deviceMounts);
    this.envs = new HashMap<>(builder.envs);
    this.volumeSpecs = new TreeSet<>(builder.volumeSpecs);
    this.volumeMounts = new TreeSet<>(builder.volumeMounts);
  }

  public String getContainerRuntime() {
    return containerRuntime;
  }

  public Map<String, String> getEnvs() {
    return envs;
  }

  public Set<MountVolumeSpec> getVolumeMounts() {
    return volumeMounts;
  }

  public Set<MountDeviceSpec> getDeviceMounts() {
    return deviceMounts;
  }

  public Set<VolumeSpec> getVolumeSpecs() {
    return volumeSpecs;
  }

  /**
   * Builder for DeviceRuntimeSpec.
   * Starts with an empty container runtime, no environment variables and
   * no volume/device entries. The sets are sorted sets, so the elements
   * added must be mutually comparable.
   */
  public final static class Builder {

    private String containerRuntime;
    private Map<String, String> envs;
    private Set<MountVolumeSpec> volumeMounts;
    private Set<MountDeviceSpec> deviceMounts;
    private Set<VolumeSpec> volumeSpecs;

    private Builder() {
      containerRuntime = "";
      envs = new HashMap<>();
      volumeSpecs = new TreeSet<>();
      deviceMounts = new TreeSet<>();
      volumeMounts = new TreeSet<>();
    }

    public static Builder newInstance() {
      return new Builder();
    }

    /**
     * @return a spec holding a copy of everything added so far
     */
    public DeviceRuntimeSpec build() {
      return new DeviceRuntimeSpec(this);
    }

    public Builder setContainerRuntime(String cRuntime) {
      this.containerRuntime = cRuntime;
      return this;
    }

    public Builder addVolumeSpec(VolumeSpec spec) {
      this.volumeSpecs.add(spec);
      return this;
    }

    public Builder addMountVolumeSpec(MountVolumeSpec spec) {
      this.volumeMounts.add(spec);
      return this;
    }

    public Builder addMountDeviceSpec(MountDeviceSpec spec) {
      this.deviceMounts.add(spec);
      return this;
    }

    /**
     * Adds one environment variable.
     *
     * @param key variable name, must not be null
     * @param value variable value, must not be null
     * @return this builder
     * @throws NullPointerException if key or value is null
     */
    public Builder addEnv(String key, String value) {
      this.envs.put(Objects.requireNonNull(key),
          Objects.requireNonNull(value));
      return this;
    }
  }
}

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
import java.io.Serializable;
import java.util.Objects;
/**
 * Describes one device mount: the device path on the host, the path it
 * gets inside the container and the access permission.
 *
 * <p>All three values are optional in the builder and may therefore be
 * {@code null}; equality and ordering both tolerate that.
 */
public final class MountDeviceSpec implements Serializable, Comparable {

  private static final long serialVersionUID = -160806358136943052L;

  private final String devicePathInHost;
  private final String devicePathInContainer;

  // r for only read, rw can do read and write
  private final String devicePermission;

  public final static String RO = "r";
  public final static String RW = "rw";

  private MountDeviceSpec(Builder builder) {
    this.devicePathInContainer = builder.devicePathInContainer;
    this.devicePathInHost = builder.devicePathInHost;
    this.devicePermission = builder.devicePermission;
  }

  public String getDevicePathInHost() {
    return devicePathInHost;
  }

  public String getDevicePathInContainer() {
    return devicePathInContainer;
  }

  public String getDevicePermission() {
    return devicePermission;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    MountDeviceSpec other = (MountDeviceSpec) o;
    return Objects.equals(devicePathInHost, other.getDevicePathInHost())
        && Objects.equals(devicePathInContainer,
        other.getDevicePathInContainer())
        && Objects.equals(devicePermission, other.getDevicePermission());
  }

  @Override
  public int hashCode() {
    return Objects.hash(devicePathInContainer,
        devicePathInHost, devicePermission);
  }

  /**
   * Orders by path in container, then path in host, then permission.
   * A {@code null} value sorts before any non-null value, so a spec with
   * unset fields can be stored in a sorted set without failing.
   *
   * @param o the object to compare with
   * @return a negative, zero or positive value; -1 when {@code o} is
   *         {@code null} or not a {@link MountDeviceSpec}
   */
  @Override
  public int compareTo(Object o) {
    // instanceof is false for null, so this also covers the null case.
    if (!(o instanceof MountDeviceSpec)) {
      return -1;
    }
    MountDeviceSpec other = (MountDeviceSpec) o;
    int result = compareNullable(devicePathInContainer,
        other.devicePathInContainer);
    if (result != 0) {
      return result;
    }
    result = compareNullable(devicePathInHost, other.devicePathInHost);
    if (result != 0) {
      return result;
    }
    return compareNullable(devicePermission, other.devicePermission);
  }

  /** Compares two strings, treating null as smaller than any string. */
  private static int compareNullable(String left, String right) {
    if (left == null) {
      return right == null ? 0 : -1;
    }
    if (right == null) {
      return 1;
    }
    return left.compareTo(right);
  }

  /**
   * Builder for MountDeviceSpec.
   */
  public final static class Builder {

    private String devicePathInHost;
    private String devicePathInContainer;
    private String devicePermission;

    private Builder() {
    }

    public static Builder newInstance() {
      return new Builder();
    }

    public MountDeviceSpec build() {
      return new MountDeviceSpec(this);
    }

    public Builder setDevicePermission(String permission) {
      this.devicePermission = permission;
      return this;
    }

    public Builder setDevicePathInContainer(String pathInContainer) {
      this.devicePathInContainer = pathInContainer;
      return this;
    }

    public Builder setDevicePathInHost(String pathInHost) {
      this.devicePathInHost = pathInHost;
      return this;
    }
  }
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
import java.io.Serializable;
import java.util.Objects;
/**
* Describe one volume mount.
* */
public final class MountVolumeSpec implements Serializable, Comparable {
private static final long serialVersionUID = 2479676805545997492L;
// host path or volume name
private final String hostPath;
// path in the container
private final String mountPath;
// if true, data in mountPath can only be read
// "-v hostPath:mountPath:ro"
private final Boolean isReadOnly;
public final static String READONLYOPTION = "ro";
private MountVolumeSpec(Builder builder) {
this.hostPath = builder.hostPath;
this.mountPath = builder.mountPath;
this.isReadOnly = builder.isReadOnly;
}
public String getHostPath() {
return hostPath;
}
public String getMountPath() {
return mountPath;
}
public Boolean getReadOnly() {
return isReadOnly;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()){
return false;
}
MountVolumeSpec other = (MountVolumeSpec) o;
return Objects.equals(hostPath, other.getHostPath())
&& Objects.equals(mountPath, other.getMountPath())
&& Objects.equals(isReadOnly, other.getReadOnly());
}
@Override
public int hashCode() {
return Objects.hash(hostPath, mountPath, isReadOnly);
}
@Override
public int compareTo(Object o) {
return 0;
}
/**
* Builder for MountVolumeSpec.
* */
public final static class Builder {
private String hostPath;
private String mountPath;
private Boolean isReadOnly;
private Builder() {}
public static Builder newInstance() {
return new Builder();
}
public MountVolumeSpec build() {
return new MountVolumeSpec(this);
}
public Builder setHostPath(String hPath) {
this.hostPath = hPath;
return this;
}
public Builder setMountPath(String mPath) {
this.mountPath = mPath;
return this;
}
public Builder setReadOnly(Boolean readOnly) {
isReadOnly = readOnly;
return this;
}
}
}

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
import java.io.Serializable;
import java.util.Objects;
/**
 * Describes one volume creation or deletion: the volume driver, the
 * volume name and the operation ({@link #CREATE} or {@link #DELETE}).
 *
 * <p>All three values are optional in the builder and may therefore be
 * {@code null}; equality and ordering both tolerate that.
 */
public final class VolumeSpec implements Serializable, Comparable {

  private static final long serialVersionUID = 3483619025106416736L;

  private final String volumeDriver;
  private final String volumeName;
  private final String volumeOperation;

  public final static String CREATE = "create";
  public final static String DELETE = "delete";

  private VolumeSpec(Builder builder) {
    this.volumeDriver = builder.volumeDriver;
    this.volumeName = builder.volumeName;
    this.volumeOperation = builder.volumeOperation;
  }

  public String getVolumeDriver() {
    return volumeDriver;
  }

  public String getVolumeName() {
    return volumeName;
  }

  public String getVolumeOperation() {
    return volumeOperation;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    VolumeSpec other = (VolumeSpec) o;
    return Objects.equals(volumeDriver, other.getVolumeDriver())
        && Objects.equals(volumeName, other.getVolumeName())
        && Objects.equals(volumeOperation, other.getVolumeOperation());
  }

  @Override
  public int hashCode() {
    return Objects.hash(volumeDriver, volumeName, volumeOperation);
  }

  /**
   * Orders by volume driver, then volume name, then operation.
   * A {@code null} value sorts before any non-null value. Two specs
   * compare as 0 exactly when {@link #equals(Object)} holds, so distinct
   * specs are all kept when stored in a sorted set.
   *
   * @param o the object to compare with
   * @return a negative, zero or positive value; -1 when {@code o} is
   *         {@code null} or not a {@link VolumeSpec}
   */
  @Override
  public int compareTo(Object o) {
    // instanceof is false for null, so this also covers the null case.
    if (!(o instanceof VolumeSpec)) {
      return -1;
    }
    VolumeSpec other = (VolumeSpec) o;
    int result = compareNullable(volumeDriver, other.volumeDriver);
    if (result != 0) {
      return result;
    }
    result = compareNullable(volumeName, other.volumeName);
    if (result != 0) {
      return result;
    }
    return compareNullable(volumeOperation, other.volumeOperation);
  }

  /** Compares two strings, treating null as smaller than any string. */
  private static int compareNullable(String left, String right) {
    if (left == null) {
      return right == null ? 0 : -1;
    }
    if (right == null) {
      return 1;
    }
    return left.compareTo(right);
  }

  /**
   * Builder for VolumeSpec.
   */
  public final static class Builder {

    private String volumeDriver;
    private String volumeName;
    private String volumeOperation;

    private Builder() {
    }

    public static Builder newInstance() {
      return new Builder();
    }

    public VolumeSpec build() {
      return new VolumeSpec(this);
    }

    public Builder setVolumeDriver(String volDriver) {
      this.volumeDriver = volDriver;
      return this;
    }

    public Builder setVolumeName(String volName) {
      this.volumeName = volName;
      return this;
    }

    public Builder setVolumeOperation(String volOperation) {
      this.volumeOperation = volOperation;
      return this;
    }
  }
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;
/**
 * The YARN container runtime kinds handed to a {@link DevicePlugin}
 * through {@code onDevicesAllocated}.
 * A plugin can fill in its {@link DeviceRuntimeSpec} differently
 * depending on which runtime YARN is going to use.
 */
public enum YarnRuntimeType {

  RUNTIME_DEFAULT("default"),

  RUNTIME_DOCKER("docker");

  /** The short textual name given to the constant. */
  private final String name;

  YarnRuntimeType(String runtimeName) {
    name = runtimeName;
  }

  /**
   * @return the short textual name of this runtime type
   */
  public String getName() {
    return this.name;
  }
}

View File

@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin;

View File

@ -18,18 +18,26 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin; package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.DevicePluginAdapter;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -50,8 +58,9 @@ public class ResourcePluginManager {
private Map<String, ResourcePlugin> configuredPlugins = private Map<String, ResourcePlugin> configuredPlugins =
Collections.emptyMap(); Collections.emptyMap();
public synchronized void initialize(Context context) public synchronized void initialize(Context context)
throws YarnException { throws YarnException, ClassNotFoundException {
Configuration conf = context.getConf(); Configuration conf = context.getConf();
Map<String, ResourcePlugin> pluginMap = new HashMap<>(); Map<String, ResourcePlugin> pluginMap = new HashMap<>();
@ -111,7 +120,7 @@ public synchronized void initialize(Context context)
public void initializePluggableDevicePlugins(Context context, public void initializePluggableDevicePlugins(Context context,
Configuration configuration, Configuration configuration,
Map<String, ResourcePlugin> pluginMap) Map<String, ResourcePlugin> pluginMap)
throws YarnRuntimeException { throws YarnRuntimeException, ClassNotFoundException {
LOG.info("The pluggable device framework enabled," + LOG.info("The pluggable device framework enabled," +
"trying to load the vendor plugins"); "trying to load the vendor plugins");
@ -121,6 +130,109 @@ public void initializePluggableDevicePlugins(Context context,
throw new YarnRuntimeException("Null value found in configuration: " throw new YarnRuntimeException("Null value found in configuration: "
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES); + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES);
} }
// Load, validate, register and initialize every configured vendor plugin.
// Any failure aborts the whole loop with an exception.
for (String pluginClassName : pluginClassNames) {
  Class<?> pluginClazz = Class.forName(pluginClassName);
  if (!DevicePlugin.class.isAssignableFrom(pluginClazz)) {
    throw new YarnRuntimeException("Class: " + pluginClassName
        + " not instance of " + DevicePlugin.class.getCanonicalName());
  }
  // sanity-check before initialization
  checkInterfaceCompatibility(DevicePlugin.class, pluginClazz);
  DevicePlugin dpInstance =
      (DevicePlugin) ReflectionUtils.newInstance(
          pluginClazz, configuration);

  // Try to register plugin
  // TODO: handle the plugin method timeout issue
  DeviceRegisterRequest request;
  try {
    request = dpInstance.getRegisterRequestInfo();
  } catch (Exception e) {
    // Keep the original exception as the cause so the plugin's stack
    // trace is not lost.
    throw new YarnRuntimeException("Exception thrown from plugin's"
        + " getRegisterRequestInfo:"
        + e.getMessage(), e);
  }
  if (request == null) {
    // Fail with a clear message instead of a bare NullPointerException.
    throw new YarnRuntimeException("Null register request returned by "
        + pluginClassName);
  }
  String resourceName = request.getResourceName();
  // check if someone has already registered this resource type name
  if (pluginMap.containsKey(resourceName)) {
    throw new YarnRuntimeException(resourceName
        + " already registered! Please change resource type name"
        + " or configure correct resource type name"
        + " in resource-types.xml for "
        + pluginClassName);
  }
  // check resource name is valid and configured in resource-types.xml
  if (!isConfiguredResourceName(resourceName)) {
    throw new YarnRuntimeException(resourceName
        + " is not configured inside "
        + YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE
        + " , please configure it first");
  }
  LOG.info("New resource type: {} registered successfully by {}",
      resourceName,
      pluginClassName);
  DevicePluginAdapter pluginAdapter = new DevicePluginAdapter(
      resourceName, dpInstance);
  LOG.info("Adapter of {} created. Initializing..", pluginClassName);
  try {
    pluginAdapter.initialize(context);
  } catch (YarnException e) {
    throw new YarnRuntimeException("Adapter of "
        + pluginClassName + " init failed!", e);
  }
  LOG.info("Adapter of {} init success!", pluginClassName);
  // Store plugin as adapter instance
  pluginMap.put(resourceName, pluginAdapter);
} // end for
}
/**
 * Checks that the plugin class declares a method for every public method
 * of the expected interface.
 *
 * <p>The check matches methods by name only and looks only at the methods
 * declared directly by {@code actualClass}.
 *
 * @param expectedClass the interface whose methods are required
 * @param actualClass the plugin class to inspect
 * @throws YarnRuntimeException if a required method name is not declared
 *         by {@code actualClass}
 */
@VisibleForTesting
public void checkInterfaceCompatibility(Class<?> expectedClass,
    Class<?> actualClass) throws YarnRuntimeException {
  LOG.debug("Checking implemented interface's compatibility: {}",
      expectedClass.getSimpleName());
  Method[] expectedDevicePluginMethods = expectedClass.getMethods();

  // Check method compatibility
  boolean found;
  for (Method method : expectedDevicePluginMethods) {
    found = false;
    LOG.debug("Try to find method: {}",
        method.getName());
    for (Method m : actualClass.getDeclaredMethods()) {
      if (m.getName().equals(method.getName())) {
        // Argument order follows the placeholders: method, then class.
        LOG.debug("Method {} found in class {}",
            m.getName(),
            actualClass.getSimpleName());
        found = true;
        break;
      }
    }
    if (!found) {
      LOG.error("Method {} is not found in plugin",
          method.getName());
      throw new YarnRuntimeException(
          "Method " + method.getName()
              + " is expected but not implemented in "
              + actualClass.getCanonicalName());
    }
  } // end for
  LOG.info("{} compatibility is ok.",
      expectedClass.getSimpleName());
}
@VisibleForTesting
public boolean isConfiguredResourceName(String resourceName) {
// check configured
Map<String, ResourceInformation> configuredResourceTypes =
ResourceUtils.getResourceTypes();
if (!configuredResourceTypes.containsKey(resourceName)) {
return false;
}
return true;
} }
public synchronized void cleanup() throws YarnException { public synchronized void cleanup() throws YarnException {

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.DockerCommandPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo;
/**
 * The {@link DevicePluginAdapter} adapts the existing
 * {@link ResourcePlugin} hooks to a vendor {@link DevicePlugin}.
 * It decouples the vendor plugin from YARN's device framework.
 *
 * Only the node resource updater hook is wired up here; the resource
 * handler, Docker command plugin and NM resource info hooks return null.
 * */
public class DevicePluginAdapter implements ResourcePlugin {
  private final static Log LOG = LogFactory.getLog(DevicePluginAdapter.class);

  /** Resource name this adapter was created with. */
  private final String resourceName;

  /** Vendor plugin handed to the resource updater. */
  private final DevicePlugin devicePlugin;

  /** Created by {@link #initialize(Context)}; null until then. */
  private DeviceResourceUpdaterImpl deviceResourceUpdater;

  /**
   * @param name resource name the plugin is registered under
   * @param dp vendor plugin to adapt
   */
  public DevicePluginAdapter(String name, DevicePlugin dp) {
    this.devicePlugin = dp;
    this.resourceName = name;
  }

  /**
   * Creates the node resource updater for the wrapped plugin.
   * The passed context is not used.
   */
  @Override
  public void initialize(Context context) throws YarnException {
    this.deviceResourceUpdater =
        new DeviceResourceUpdaterImpl(this.resourceName, this.devicePlugin);
    LOG.info(resourceName + " plugin adapter initialized");
  }

  /** No resource handler is provided; always returns null. */
  @Override
  public ResourceHandler createResourceHandler(Context nmContext,
      CGroupsHandler cGroupsHandler,
      PrivilegedOperationExecutor privilegedOperationExecutor) {
    return null;
  }

  /** Returns the updater created in {@link #initialize(Context)}. */
  @Override
  public NodeResourceUpdaterPlugin getNodeResourceHandlerInstance() {
    return this.deviceResourceUpdater;
  }

  /** Nothing to clean up. */
  @Override
  public void cleanup() {
  }

  /** No Docker command plugin is provided; always returns null. */
  @Override
  public DockerCommandPlugin getDockerCommandPluginInstance() {
    return null;
  }

  /** No resource info is provided; always returns null. */
  @Override
  public NMResourceInfo getNMResourceInfo() throws YarnException {
    return null;
  }
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
import java.util.Set;
/**
 * Hooks into NodeStatusUpdater to update resource.
 * The value reported for the resource is the number of devices
 * returned by the vendor plugin's {@code getDevices()}.
 * */
public class DeviceResourceUpdaterImpl extends NodeResourceUpdaterPlugin {
  final static Log LOG = LogFactory.getLog(DeviceResourceUpdaterImpl.class);

  /** Name of the resource whose value is updated. */
  private final String resourceName;

  /** Vendor plugin queried for its devices. */
  private final DevicePlugin devicePlugin;

  /**
   * @param resourceName name of the resource to update
   * @param devicePlugin vendor plugin that discovers the devices
   */
  public DeviceResourceUpdaterImpl(String resourceName,
      DevicePlugin devicePlugin) {
    this.devicePlugin = devicePlugin;
    this.resourceName = resourceName;
  }

  /**
   * Sets the value of this updater's resource in {@code res} to the number
   * of devices returned by the plugin. If the plugin returns null, a warning
   * is logged and {@code res} is left untouched.
   *
   * @param res resource record to update
   * @throws YarnException if the plugin's {@code getDevices()} throws; the
   *         original exception is attached as the cause
   */
  @Override
  public void updateConfiguredResource(Resource res)
      throws YarnException {
    LOG.info(resourceName + " plugin update resource ");
    Set<Device> devices;
    try {
      devices = devicePlugin.getDevices();
    } catch (Exception e) {
      // Keep the plugin's exception as the cause so its stack trace is not
      // lost, and separate its message from ours.
      throw new YarnException(
          "Exception thrown from plugin's getDevices: " + e.getMessage(), e);
    }
    if (null == devices) {
      LOG.warn(resourceName
          + " plugin failed to discover resource ( null value got).");
      return;
    }
    res.setResourceValue(resourceName, devices.size());
  }
}

View File

@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;

View File

@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin;

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.NodeManagerTestBase; import org.apache.hadoop.yarn.server.nodemanager.NodeManagerTestBase;
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
@ -42,7 +43,10 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.*;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -51,6 +55,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.io.File;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -64,9 +69,16 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
private YarnConfiguration conf; private YarnConfiguration conf;
private String tempResourceTypesFile;
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
this.conf = createNMConfig(); this.conf = createNMConfig();
// setup resource-types.xml
ResourceUtils.resetResourceTypes();
String resourceTypesFile = "resource-types-pluggable-devices.xml";
this.tempResourceTypesFile =
TestResourceUtils.setupResourceTypes(this.conf, resourceTypesFile);
} }
ResourcePluginManager stubResourcePluginmanager() { ResourcePluginManager stubResourcePluginmanager() {
@ -101,6 +113,11 @@ public void tearDown() {
// ignore // ignore
} }
} }
// cleanup resource-types.xml
File dest = new File(this.tempResourceTypesFile);
if (dest.exists()) {
dest.delete();
}
} }
private class CustomizedResourceHandler implements ResourceHandler { private class CustomizedResourceHandler implements ResourceHandler {
@ -153,8 +170,8 @@ public MyMockNM(ResourcePluginManager rpm) {
protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
((NodeManager.NMContext)context).setResourcePluginManager(rpm); ((NodeManager.NMContext)context).setResourcePluginManager(rpm);
return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker, return new BaseNodeStatusUpdaterForTest(context, dispatcher,
metrics, new BaseResourceTrackerForTest()); healthChecker, metrics, new BaseResourceTrackerForTest());
} }
@Override @Override
@ -174,7 +191,8 @@ protected ResourcePluginManager createResourcePluginManager() {
} }
public class MyLCE extends LinuxContainerExecutor { public class MyLCE extends LinuxContainerExecutor {
private PrivilegedOperationExecutor poe = mock(PrivilegedOperationExecutor.class); private PrivilegedOperationExecutor poe =
mock(PrivilegedOperationExecutor.class);
@Override @Override
protected PrivilegedOperationExecutor getPrivilegedOperationExecutor() { protected PrivilegedOperationExecutor getPrivilegedOperationExecutor() {
@ -199,7 +217,8 @@ public void testResourcePluginManagerInitialization() throws Exception {
* Make sure ResourcePluginManager is invoked during NM update. * Make sure ResourcePluginManager is invoked during NM update.
*/ */
@Test(timeout = 30000) @Test(timeout = 30000)
public void testNodeStatusUpdaterWithResourcePluginsEnabled() throws Exception { public void testNodeStatusUpdaterWithResourcePluginsEnabled()
throws Exception {
final ResourcePluginManager rpm = stubResourcePluginmanager(); final ResourcePluginManager rpm = stubResourcePluginmanager();
nm = new MyMockNM(rpm); nm = new MyMockNM(rpm);
@ -211,15 +230,16 @@ public void testNodeStatusUpdaterWithResourcePluginsEnabled() throws Exception {
rpm.getNameToPlugins().get("resource1") rpm.getNameToPlugins().get("resource1")
.getNodeResourceHandlerInstance(); .getNodeResourceHandlerInstance();
verify(nodeResourceUpdaterPlugin, times(1)).updateConfiguredResource( verify(nodeResourceUpdaterPlugin, times(1))
any(Resource.class)); .updateConfiguredResource(any(Resource.class));
} }
/* /*
* Make sure ResourcePluginManager is used to initialize ResourceHandlerChain * Make sure ResourcePluginManager is used to initialize ResourceHandlerChain
*/ */
@Test(timeout = 30000) @Test(timeout = 30000)
public void testLinuxContainerExecutorWithResourcePluginsEnabled() throws Exception { public void testLinuxContainerExecutorWithResourcePluginsEnabled()
throws Exception {
final ResourcePluginManager rpm = stubResourcePluginmanager(); final ResourcePluginManager rpm = stubResourcePluginmanager();
final LinuxContainerExecutor lce = new MyLCE(); final LinuxContainerExecutor lce = new MyLCE();
@ -261,6 +281,9 @@ protected ContainerExecutor createContainerExecutor(
boolean newHandlerAdded = false; boolean newHandlerAdded = false;
for (ResourceHandler h : ((ResourceHandlerChain) handler) for (ResourceHandler h : ((ResourceHandlerChain) handler)
.getResourceHandlerList()) { .getResourceHandlerList()) {
if (h instanceof DevicePluginAdapter) {
Assert.assertTrue(false);
}
if (h instanceof CustomizedResourceHandler) { if (h instanceof CustomizedResourceHandler) {
newHandlerAdded = true; newHandlerAdded = true;
break; break;
@ -320,7 +343,7 @@ public void testInitializationWithPluggableDeviceFrameworkEnabled()
true); true);
conf.setStrings( conf.setStrings(
YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES, YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
"com.cmp1.hdw1plugin"); FakeTestDevicePlugin1.class.getCanonicalName());
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
verify(rpmSpy, times(1)).initialize( verify(rpmSpy, times(1)).initialize(
@ -332,7 +355,8 @@ public void testInitializationWithPluggableDeviceFrameworkEnabled()
// Enable pluggable framework, but leave device classes un-configured // Enable pluggable framework, but leave device classes un-configured
// initializePluggableDevicePlugins invoked but it should throw an exception // initializePluggableDevicePlugins invoked but it should throw an exception
@Test(timeout = 30000) @Test(timeout = 30000)
public void testInitializationWithPluggableDeviceFrameworkEnabled2() { public void testInitializationWithPluggableDeviceFrameworkEnabled2()
throws ClassNotFoundException {
ResourcePluginManager rpm = new ResourcePluginManager(); ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm); ResourcePluginManager rpmSpy = spy(rpm);
@ -353,4 +377,131 @@ public void testInitializationWithPluggableDeviceFrameworkEnabled2() {
any(Context.class), any(Configuration.class), any(Map.class)); any(Context.class), any(Configuration.class), any(Map.class));
Assert.assertTrue(fail); Assert.assertTrue(fail);
} }
/**
 * Starts the NM with one valid plugin class configured and expects exactly
 * one plugin registered, a DevicePluginAdapter under "cmpA.com/hdwA",
 * and a single interface compatibility check for the plugin class.
 */
@Test(timeout = 30000)
public void testNormalInitializationOfPluggableDeviceClasses()
    throws Exception {
  ResourcePluginManager pluginManagerSpy = spy(new ResourcePluginManager());
  nm = new MyMockNM(pluginManagerSpy);

  conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
      true);
  conf.setStrings(
      YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
      FakeTestDevicePlugin1.class.getCanonicalName());
  nm.init(conf);
  nm.start();

  Map<String, ResourcePlugin> registeredPlugins =
      pluginManagerSpy.getNameToPlugins();
  Assert.assertEquals(1, registeredPlugins.size());
  Assert.assertTrue(
      registeredPlugins.get("cmpA.com/hdwA") instanceof DevicePluginAdapter);
  verify(pluginManagerSpy, times(1)).checkInterfaceCompatibility(
      DevicePlugin.class, FakeTestDevicePlugin1.class);
}
/**
 * Fail to load a class which doesn't implement interface DevicePlugin.
 * The NM start-up is expected to throw a YarnRuntimeException whose message
 * names both the offending class and the DevicePlugin interface.
 */
@Test(timeout = 30000)
public void testLoadInvalidPluggableDeviceClasses()
    throws Exception{
  final String pluginClassName =
      FakeTestDevicePlugin2.class.getCanonicalName();
  final String wantedMessage = "Class: "
      + pluginClassName
      + " not instance of " + DevicePlugin.class.getCanonicalName();

  nm = new MyMockNM(spy(new ResourcePluginManager()));
  conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
      true);
  conf.setStrings(
      YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
      pluginClassName);

  // Stays empty when nothing is thrown, which fails the comparison below.
  String thrownMessage = "";
  try {
    nm.init(conf);
    nm.start();
  } catch (YarnRuntimeException e) {
    thrownMessage = e.getMessage();
  }
  Assert.assertEquals(wantedMessage, thrownMessage);
}
/**
 * Fail to register duplicated resource name.
 * FakeTestDevicePlugin1 and FakeTestDevicePlugin3 both register
 * "cmpA.com/hdwA"; the error is expected to name the second plugin.
 */
@Test(timeout = 30000)
public void testLoadDuplicateResourceNameDevicePlugin()
    throws Exception{
  final String firstPlugin = FakeTestDevicePlugin1.class.getCanonicalName();
  final String secondPlugin = FakeTestDevicePlugin3.class.getCanonicalName();
  final String wantedMessage = "cmpA.com/hdwA" +
      " already registered! Please change resource type name"
      + " or configure correct resource type name"
      + " in resource-types.xml for "
      + secondPlugin;

  nm = new MyMockNM(spy(new ResourcePluginManager()));
  conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
      true);
  conf.setStrings(
      YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
      firstPlugin + "," + secondPlugin);

  // Stays empty when nothing is thrown, which fails the comparison below.
  String thrownMessage = "";
  try {
    nm.init(conf);
    nm.start();
  } catch (YarnRuntimeException e) {
    thrownMessage = e.getMessage();
  }
  Assert.assertEquals(wantedMessage, thrownMessage);
}
/**
 * Fail a plugin due to incompatible interface implemented.
 * FakeTestDevicePlugin4 doesn't implement "getRegisterRequestInfo", so the
 * NM start-up is expected to report that method as missing.
 */
@Test(timeout = 30000)
public void testIncompatibleDevicePlugin()
    throws Exception {
  final String pluginClassName =
      FakeTestDevicePlugin4.class.getCanonicalName();
  final String wantedMessage = "Method getRegisterRequestInfo" +
      " is expected but not implemented in "
      + pluginClassName;

  nm = new MyMockNM(spy(new ResourcePluginManager()));
  conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
      true);
  conf.setStrings(
      YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
      pluginClassName);

  // Stays empty when nothing is thrown, which fails the comparison below.
  String thrownMessage = "";
  try {
    nm.init(conf);
    nm.start();
  } catch (YarnRuntimeException e) {
    thrownMessage = e.getMessage();
  }
  Assert.assertEquals(wantedMessage, thrownMessage);
}
/**
 * "cmp.com/cmp" is listed in the resource types file installed by setup(),
 * while "a.com/a" is not.
 */
@Test(timeout = 30000)
public void testRequestedResourceNameIsConfigured()
    throws Exception{
  ResourcePluginManager pluginManager = new ResourcePluginManager();
  Assert.assertFalse(pluginManager.isConfiguredResourceName("a.com/a"));
  Assert.assertTrue(pluginManager.isConfiguredResourceName("cmp.com/cmp"));
}
} }

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*;
import java.util.Set;
import java.util.TreeSet;
/**
 * Used only for testing.
 * A fake normal vendor plugin: it registers the resource name
 * "cmpA.com/hdwA" and reports a single healthy device.
 * */
public class FakeTestDevicePlugin1 implements DevicePlugin {

  /** Registers under the resource name "cmpA.com/hdwA". */
  @Override
  public DeviceRegisterRequest getRegisterRequestInfo() {
    DeviceRegisterRequest request = DeviceRegisterRequest.Builder.newInstance()
        .setResourceName("cmpA.com/hdwA").build();
    return request;
  }

  /** Returns a set holding one device, "/dev/hdwA0". */
  @Override
  public Set<Device> getDevices() {
    Device onlyDevice = Device.Builder.newInstance()
        .setId(0)
        .setDevPath("/dev/hdwA0")
        .setMajorNumber(243)
        .setMinorNumber(0)
        .setBusID("0000:65:00.0")
        .setHealthy(true)
        .build();
    Set<Device> devices = new TreeSet<>();
    devices.add(onlyDevice);
    return devices;
  }

  /** No runtime spec is produced; always returns null. */
  @Override
  public DeviceRuntimeSpec onDevicesAllocated(Set<Device> allocatedDevices,
      YarnRuntimeType yarnRuntime) throws Exception {
    return null;
  }

  /** Nothing to do on release. */
  @Override
  public void onDevicesReleased(Set<Device> allocatedDevices) {
  }
}

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
/**
 * Only used for testing.
 * This isn't an implementation of DevicePlugin: the class is deliberately
 * empty and implements no interface.
 * */
public class FakeTestDevicePlugin2 {
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.YarnRuntimeType;
import java.util.Set;
import java.util.TreeSet;
/**
 * Only used for testing.
 * This plugin registers the same resource name as FakeTestDevicePlugin1
 * ("cmpA.com/hdwA") and reports a single healthy device.
 * */
public class FakeTestDevicePlugin3 implements DevicePlugin {

  /** Registers under the resource name "cmpA.com/hdwA". */
  @Override
  public DeviceRegisterRequest getRegisterRequestInfo() {
    DeviceRegisterRequest request = DeviceRegisterRequest.Builder.newInstance()
        .setResourceName("cmpA.com/hdwA").build();
    return request;
  }

  /** Returns a set holding one device, "/dev/hdwA0". */
  @Override
  public Set<Device> getDevices() {
    Device onlyDevice = Device.Builder.newInstance()
        .setId(0)
        .setDevPath("/dev/hdwA0")
        .setMajorNumber(243)
        .setMinorNumber(0)
        .setBusID("0000:65:00.0")
        .setHealthy(true)
        .build();
    Set<Device> devices = new TreeSet<>();
    devices.add(onlyDevice);
    return devices;
  }

  /** No runtime spec is produced; always returns null. */
  @Override
  public DeviceRuntimeSpec onDevicesAllocated(Set<Device> allocatedDevices,
      YarnRuntimeType yarnRuntime) throws Exception {
    return null;
  }

  /** Nothing to do on release. */
  @Override
  public void onDevicesReleased(Set<Device> allocatedDevices) {
  }
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.Device;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DevicePlugin;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRegisterRequest;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.DeviceRuntimeSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.YarnRuntimeType;
import java.util.Set;
/**
 * Only used for testing.
 * Implements the DevicePlugin interface but leaves out the
 * getRegisterRequestInfo method, which is why the class is abstract.
 * This is equivalent to implementing an old version of DevicePlugin.
 */
public abstract class FakeTestDevicePlugin4 implements DevicePlugin {
// Differently named stand-in; it does not override getRegisterRequestInfo.
public DeviceRegisterRequest getOldRegisterRequestInfo() {
return null;
}
// Stub: no devices are reported.
@Override
public Set<Device> getDevices() {
return null;
}
// Stub: no runtime spec is produced.
@Override
public DeviceRuntimeSpec onDevicesAllocated(Set<Device> allocatedDevices,
YarnRuntimeType yarnRuntime) throws Exception {
return null;
}
// Stub: nothing to do on release.
@Override
public void onDevicesReleased(Set<Device> releasedDevices) {
}
}

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.nodemanager.*;
import org.apache.hadoop.yarn.server.nodemanager.api.deviceplugin.*;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
/**
 * Unit tests for DevicePluginAdapter.
 * Covers the adapter's interaction with a vendor plugin.
 * */
public class TestDevicePluginAdapter {

  protected static final Logger LOG =
      LoggerFactory.getLogger(TestDevicePluginAdapter.class);

  private YarnConfiguration conf;
  private String tempResourceTypesFile;

  /** Installs the resource types fixture used by the tests. */
  @Before
  public void setup() throws Exception {
    this.conf = new YarnConfiguration();
    // setup resource-types.xml
    ResourceUtils.resetResourceTypes();
    this.tempResourceTypesFile = TestResourceUtils.setupResourceTypes(
        this.conf, "resource-types-pluggable-devices.xml");
  }

  /** Deletes the resource types file created by {@link #setup()}. */
  @After
  public void tearDown() throws IOException {
    // cleanup resource-types.xml
    File tempFile = new File(this.tempResourceTypesFile);
    if (tempFile.exists()) {
      tempFile.delete();
    }
  }

  /**
   * The updater obtained from the adapter must query the plugin's devices
   * once and set the resource value to the number of devices (3).
   */
  @Test
  public void testDeviceResourceUpdaterImpl() throws YarnException {
    Resource nodeResource = mock(Resource.class);
    // Init a plugin
    MyPlugin spyPlugin = spy(new MyPlugin());
    // Init an adapter for the plugin
    DevicePluginAdapter adapter =
        new DevicePluginAdapter(MyPlugin.RESOURCE_NAME, spyPlugin);
    adapter.initialize(mock(Context.class));

    adapter.getNodeResourceHandlerInstance()
        .updateConfiguredResource(nodeResource);

    verify(spyPlugin, times(1)).getDevices();
    verify(nodeResource, times(1)).setResourceValue(
        MyPlugin.RESOURCE_NAME, 3);
  }

  /**
   * Builds a healthy fake device with major number 256 and minor number 0.
   */
  private static Device newFakeDevice(int id, String devPath, String busId) {
    return Device.Builder.newInstance()
        .setId(id)
        .setDevPath(devPath)
        .setMajorNumber(256)
        .setMinorNumber(0)
        .setBusID(busId)
        .setHealthy(true)
        .build();
  }

  /** Fake vendor plugin reporting three devices. */
  private class MyPlugin implements DevicePlugin {
    private final static String RESOURCE_NAME = "cmpA.com/hdwA";

    @Override
    public DeviceRegisterRequest getRegisterRequestInfo() {
      return DeviceRegisterRequest.Builder.newInstance()
          .setResourceName(RESOURCE_NAME)
          .setPluginVersion("v1.0").build();
    }

    @Override
    public Set<Device> getDevices() {
      Set<Device> devices = new TreeSet<>();
      devices.add(newFakeDevice(0, "/dev/hdwA0", "0000:80:00.0"));
      devices.add(newFakeDevice(1, "/dev/hdwA1", "0000:80:01.0"));
      devices.add(newFakeDevice(2, "/dev/hdwA2", "0000:80:02.0"));
      return devices;
    }

    @Override
    public DeviceRuntimeSpec onDevicesAllocated(Set<Device> allocatedDevices,
        YarnRuntimeType yarnRuntime) throws Exception {
      return null;
    }

    @Override
    public void onDevicesReleased(Set<Device> releasedDevices) {
    }
  } // MyPlugin
}

View File

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<configuration>
<property>
<name>yarn.resource-types</name>
<value>cmp.com/cmp,cmpA.com/hdwA</value>
</property>
</configuration>