YARN-8880. Add configurations for pluggable plugin framework. Contributed by Zhankun Tang.

This commit is contained in:
Weiwei Yang 2018-11-08 12:23:00 +08:00
parent c96cbe8659
commit f8c72d7b3a
4 changed files with 170 additions and 14 deletions

View File

@ -1605,6 +1605,28 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String NM_RESOURCE_PLUGINS = public static final String NM_RESOURCE_PLUGINS =
NM_PREFIX + "resource-plugins"; NM_PREFIX + "resource-plugins";
/**
* This setting controls if pluggable device plugin framework is enabled.
* */
@Private
public static final String NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED =
NM_PREFIX + "pluggable-device-framework.enabled";
/**
* The pluggable device plugin framework is disabled by default
* */
@Private
public static final boolean DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED =
false;
/**
* This setting contains vendor plugin class names for
* device plugin framework to load. Split by comma
* */
@Private
public static final String NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES =
NM_PREFIX + "pluggable-device-framework.device-classes";
/** /**
* Prefix for gpu configurations. Work in progress: This configuration * Prefix for gpu configurations. Work in progress: This configuration
* parameter may be changed/removed in the future. * parameter may be changed/removed in the future.
@ -1647,7 +1669,7 @@ public static boolean isAclEnabled(Configuration conf) {
NVIDIA_DOCKER_V1; NVIDIA_DOCKER_V1;
/** /**
* This setting controls end point of nvidia-docker-v1 plugin * This setting controls end point of nvidia-docker-v1 plugin.
*/ */
@Private @Private
public static final String NVIDIA_DOCKER_PLUGIN_V1_ENDPOINT = public static final String NVIDIA_DOCKER_PLUGIN_V1_ENDPOINT =

View File

@ -3770,6 +3770,25 @@
<value>false</value> <value>false</value>
</property> </property>
<property>
<description>
This setting controls if pluggable device framework is enabled.
Disabled by default
</description>
<name>yarn.nodemanager.pluggable-device-framework.enabled</name>
<value>false</value>
</property>
<property>
<description>
Configure vendor device plugin class name here. Comma separated.
The class must be found in CLASSPATH. The pluggable device framework will
load these classes.
</description>
<name>yarn.nodemanager.pluggable-device-framework.device-classes</name>
<value></value>
</property>
<property> <property>
<description> <description>
When yarn.nodemanager.resource.gpu.allowed-gpu-devices=auto specified, When yarn.nodemanager.resource.gpu.allowed-gpu-devices=auto specified,

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.FpgaResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuResourcePlugin;
@ -52,11 +53,10 @@ public class ResourcePluginManager {
public synchronized void initialize(Context context) public synchronized void initialize(Context context)
throws YarnException { throws YarnException {
Configuration conf = context.getConf(); Configuration conf = context.getConf();
Map<String, ResourcePlugin> pluginMap = new HashMap<>();
String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS); String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS);
if (plugins != null) { if (plugins != null) {
Map<String, ResourcePlugin> pluginMap = new HashMap<>();
// Initialize each plugins // Initialize each plugins
for (String resourceName : plugins) { for (String resourceName : plugins) {
resourceName = resourceName.trim(); resourceName = resourceName.trim();
@ -92,8 +92,34 @@ public synchronized void initialize(Context context)
plugin.initialize(context); plugin.initialize(context);
pluginMap.put(resourceName, plugin); pluginMap.put(resourceName, plugin);
} }
}
// Try to load pluggable device plugins
boolean puggableDeviceFrameworkEnabled = conf.getBoolean(
YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
configuredPlugins = Collections.unmodifiableMap(pluginMap); if (puggableDeviceFrameworkEnabled) {
initializePluggableDevicePlugins(context, conf, pluginMap);
} else {
LOG.info("The pluggable device framework is not enabled."
+ " If you want, please set true to {}",
YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
}
configuredPlugins = Collections.unmodifiableMap(pluginMap);
}
public void initializePluggableDevicePlugins(Context context,
Configuration configuration,
Map<String, ResourcePlugin> pluginMap)
throws YarnRuntimeException {
LOG.info("The pluggable device framework enabled," +
"trying to load the vendor plugins");
String[] pluginClassNames = configuration.getStrings(
YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES);
if (null == pluginClassNames) {
throw new YarnRuntimeException("Null value found in configuration: "
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES);
} }
} }

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@ -41,12 +42,10 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.HashMap; import java.util.HashMap;
@ -58,10 +57,18 @@
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
public class TestResourcePluginManager extends NodeManagerTestBase { public class TestResourcePluginManager extends NodeManagerTestBase {
private NodeManager nm; private NodeManager nm;
private YarnConfiguration conf;
@Before
public void setup() throws Exception {
this.conf = createNMConfig();
}
ResourcePluginManager stubResourcePluginmanager() { ResourcePluginManager stubResourcePluginmanager() {
// Stub ResourcePluginManager // Stub ResourcePluginManager
final ResourcePluginManager rpm = mock(ResourcePluginManager.class); final ResourcePluginManager rpm = mock(ResourcePluginManager.class);
@ -183,7 +190,6 @@ public void testResourcePluginManagerInitialization() throws Exception {
final ResourcePluginManager rpm = stubResourcePluginmanager(); final ResourcePluginManager rpm = stubResourcePluginmanager();
nm = new MyMockNM(rpm); nm = new MyMockNM(rpm);
YarnConfiguration conf = createNMConfig();
nm.init(conf); nm.init(conf);
verify(rpm, times(1)).initialize( verify(rpm, times(1)).initialize(
any(Context.class)); any(Context.class));
@ -198,7 +204,6 @@ public void testNodeStatusUpdaterWithResourcePluginsEnabled() throws Exception {
nm = new MyMockNM(rpm); nm = new MyMockNM(rpm);
YarnConfiguration conf = createNMConfig();
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
@ -238,15 +243,14 @@ protected ContainerManagerImpl createContainerManager(Context context,
} }
@Override @Override
protected ContainerExecutor createContainerExecutor(Configuration conf) { protected ContainerExecutor createContainerExecutor(
Configuration configuration) {
((NMContext)this.getNMContext()).setResourcePluginManager(rpm); ((NMContext)this.getNMContext()).setResourcePluginManager(rpm);
lce.setConf(conf); lce.setConf(configuration);
return lce; return lce;
} }
}; };
YarnConfiguration conf = createNMConfig();
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
@ -264,4 +268,89 @@ protected ContainerExecutor createContainerExecutor(Configuration conf) {
} }
Assert.assertTrue("New ResourceHandler should be added", newHandlerAdded); Assert.assertTrue("New ResourceHandler should be added", newHandlerAdded);
} }
// Disabled pluggable framework in configuration.
// We use spy object of real rpm to verify "initializePluggableDevicePlugins"
// because use mock rpm will not working
@Test(timeout = 30000)
public void testInitializationWithPluggableDeviceFrameworkDisabled()
throws Exception {
ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm);
nm = new MyMockNM(rpmSpy);
conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
false);
nm.init(conf);
nm.start();
verify(rpmSpy, times(1)).initialize(
any(Context.class));
verify(rpmSpy, times(0)).initializePluggableDevicePlugins(
any(Context.class), any(Configuration.class), any(Map.class));
}
// No related configuration set.
@Test(timeout = 30000)
public void testInitializationWithPluggableDeviceFrameworkDisabled2()
throws Exception {
ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm);
nm = new MyMockNM(rpmSpy);
nm.init(conf);
nm.start();
verify(rpmSpy, times(1)).initialize(
any(Context.class));
verify(rpmSpy, times(0)).initializePluggableDevicePlugins(
any(Context.class), any(Configuration.class), any(Map.class));
}
// Enable framework and configure pluggable device classes
@Test(timeout = 30000)
public void testInitializationWithPluggableDeviceFrameworkEnabled()
throws Exception {
ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm);
nm = new MyMockNM(rpmSpy);
conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
true);
conf.setStrings(
YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_DEVICE_CLASSES,
"com.cmp1.hdw1plugin");
nm.init(conf);
nm.start();
verify(rpmSpy, times(1)).initialize(
any(Context.class));
verify(rpmSpy, times(1)).initializePluggableDevicePlugins(
any(Context.class), any(Configuration.class), any(Map.class));
}
// Enable pluggable framework, but leave device classes un-configured
// initializePluggableDevicePlugins invoked but it should throw an exception
@Test(timeout = 30000)
public void testInitializationWithPluggableDeviceFrameworkEnabled2() {
ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm);
nm = new MyMockNM(rpmSpy);
Boolean fail = false;
try {
conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
true);
nm.init(conf);
nm.start();
} catch (YarnRuntimeException e) {
fail = true;
} catch (Exception e) {
}
verify(rpmSpy, times(1)).initializePluggableDevicePlugins(
any(Context.class), any(Configuration.class), any(Map.class));
Assert.assertTrue(fail);
}
} }