YARN-9140. Code cleanup in ResourcePluginManager.initialize and in TestResourcePluginManager. Contributed by Peter Bacsko

This commit is contained in:
Szilard Nemeth 2019-08-14 16:58:22 +02:00
parent 3c0382f1b9
commit e5e609384f
2 changed files with 91 additions and 79 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugi
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
@ -44,7 +45,6 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -68,8 +68,29 @@ public class ResourcePluginManager {
public void initialize(Context context) public void initialize(Context context)
throws YarnException, ClassNotFoundException { throws YarnException, ClassNotFoundException {
Configuration conf = context.getConf(); Configuration conf = context.getConf();
Map<String, ResourcePlugin> pluginMap = new HashMap<>(); String[] plugins = getPluginsFromConfig(conf);
Map<String, ResourcePlugin> pluginMap = Maps.newHashMap();
if (plugins != null) {
pluginMap = initializePlugins(context, plugins);
}
// Try to load pluggable device plugins
boolean pluggableDeviceFrameworkEnabled = conf.getBoolean(
YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
if (pluggableDeviceFrameworkEnabled) {
initializePluggableDevicePlugins(context, conf, pluginMap);
} else {
LOG.info("The pluggable device framework is not enabled."
+ " If you want, please set true to {}",
YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
}
configuredPlugins = Collections.unmodifiableMap(pluginMap);
}
private String[] getPluginsFromConfig(Configuration conf) {
String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS); String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS);
if (plugins == null || plugins.length == 0) { if (plugins == null || plugins.length == 0) {
LOG.info("No Resource plugins found from configuration!"); LOG.info("No Resource plugins found from configuration!");
@ -77,25 +98,18 @@ public class ResourcePluginManager {
LOG.info("Found Resource plugins from configuration: " LOG.info("Found Resource plugins from configuration: "
+ Arrays.toString(plugins)); + Arrays.toString(plugins));
if (plugins != null) { return plugins;
// Initialize each plugins }
for (String resourceName : plugins) {
resourceName = resourceName.trim();
if (!SUPPORTED_RESOURCE_PLUGINS.contains(resourceName)) {
String msg =
"Trying to initialize resource plugin with name=" + resourceName
+ ", it is not supported, list of supported plugins:"
+ StringUtils.join(",", SUPPORTED_RESOURCE_PLUGINS);
LOG.error(msg);
throw new YarnException(msg);
}
if (pluginMap.containsKey(resourceName)) { private Map<String, ResourcePlugin> initializePlugins(
LOG.warn("Ignoring duplicate Resource plugin definition: " + Context context, String[] plugins) throws YarnException {
resourceName); Map<String, ResourcePlugin> pluginMap = Maps.newHashMap();
continue;
}
for (String resourceName : plugins) {
resourceName = resourceName.trim();
ensurePluginIsSupported(resourceName);
if (!isPluginDuplicate(pluginMap, resourceName)) {
ResourcePlugin plugin = null; ResourcePlugin plugin = null;
if (resourceName.equals(GPU_URI)) { if (resourceName.equals(GPU_URI)) {
final GpuDiscoverer gpuDiscoverer = new GpuDiscoverer(); final GpuDiscoverer gpuDiscoverer = new GpuDiscoverer();
@ -116,19 +130,29 @@ public class ResourcePluginManager {
pluginMap.put(resourceName, plugin); pluginMap.put(resourceName, plugin);
} }
} }
// Try to load pluggable device plugins return pluginMap;
boolean puggableDeviceFrameworkEnabled = conf.getBoolean( }
YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
if (puggableDeviceFrameworkEnabled) { private void ensurePluginIsSupported(String resourceName)
initializePluggableDevicePlugins(context, conf, pluginMap); throws YarnException {
} else { if (!SUPPORTED_RESOURCE_PLUGINS.contains(resourceName)) {
LOG.info("The pluggable device framework is not enabled." String msg =
+ " If you want, please set true to {}", "Trying to initialize resource plugin with name=" + resourceName
YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED); + ", it is not supported, list of supported plugins:"
+ StringUtils.join(",", SUPPORTED_RESOURCE_PLUGINS);
LOG.error(msg);
throw new YarnException(msg);
} }
configuredPlugins = Collections.unmodifiableMap(pluginMap); }
private boolean isPluginDuplicate(Map<String, ResourcePlugin> pluginMap,
String resourceName) {
if (pluginMap.containsKey(resourceName)) {
LOG.warn("Ignoring duplicate Resource plugin definition: " +
resourceName);
return true;
}
return false;
} }
public void initializePluggableDevicePlugins(Context context, public void initializePluggableDevicePlugins(Context context,

View File

@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileg
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.*; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.*;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@ -59,6 +58,7 @@ import java.util.Map;
import java.io.File; import java.io.File;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -124,38 +124,33 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
private class CustomizedResourceHandler implements ResourceHandler { private class CustomizedResourceHandler implements ResourceHandler {
@Override @Override
public List<PrivilegedOperation> bootstrap(Configuration configuration) public List<PrivilegedOperation> bootstrap(Configuration configuration) {
throws ResourceHandlerException {
return null; return null;
} }
@Override @Override
public List<PrivilegedOperation> preStart(Container container) public List<PrivilegedOperation> preStart(Container container) {
throws ResourceHandlerException {
return null; return null;
} }
@Override @Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId) public List<PrivilegedOperation> reacquireContainer(
throws ResourceHandlerException { ContainerId containerId) {
return null; return null;
} }
@Override @Override
public List<PrivilegedOperation> updateContainer(Container container) public List<PrivilegedOperation> updateContainer(Container container) {
throws ResourceHandlerException {
return null; return null;
} }
@Override @Override
public List<PrivilegedOperation> postComplete(ContainerId containerId) public List<PrivilegedOperation> postComplete(ContainerId containerId) {
throws ResourceHandlerException {
return null; return null;
} }
@Override @Override
public List<PrivilegedOperation> teardown() public List<PrivilegedOperation> teardown() {
throws ResourceHandlerException {
return null; return null;
} }
} }
@ -180,9 +175,9 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
ContainerExecutor exec, DeletionService del, ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, NodeStatusUpdater nodeStatusUpdater,
ApplicationACLsManager aclsManager, ApplicationACLsManager aclsManager,
LocalDirsHandlerService diskhandler) { LocalDirsHandlerService dirsHandler) {
return new MyContainerManager(context, exec, del, nodeStatusUpdater, return new MyContainerManager(context, exec, del, nodeStatusUpdater,
metrics, diskhandler); metrics, dirsHandler);
} }
@Override @Override
@ -210,7 +205,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
nm = new MyMockNM(rpm); nm = new MyMockNM(rpm);
nm.init(conf); nm.init(conf);
verify(rpm, times(1)).initialize( verify(rpm).initialize(
any(Context.class)); any(Context.class));
} }
@ -231,7 +226,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
rpm.getNameToPlugins().get("resource1") rpm.getNameToPlugins().get("resource1")
.getNodeResourceHandlerInstance(); .getNodeResourceHandlerInstance();
verify(nodeResourceUpdaterPlugin, times(1)) verify(nodeResourceUpdaterPlugin)
.updateConfiguredResource(any(Resource.class)); .updateConfiguredResource(any(Resource.class));
} }
@ -239,8 +234,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
* Make sure ResourcePluginManager is used to initialize ResourceHandlerChain * Make sure ResourcePluginManager is used to initialize ResourceHandlerChain
*/ */
@Test(timeout = 30000) @Test(timeout = 30000)
public void testLinuxContainerExecutorWithResourcePluginsEnabled() public void testLinuxContainerExecutorWithResourcePluginsEnabled() {
throws Exception {
final ResourcePluginManager rpm = stubResourcePluginmanager(); final ResourcePluginManager rpm = stubResourcePluginmanager();
final LinuxContainerExecutor lce = new MyLCE(); final LinuxContainerExecutor lce = new MyLCE();
@ -249,8 +243,8 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
((NMContext)context).setResourcePluginManager(rpm); ((NMContext)context).setResourcePluginManager(rpm);
return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker, return new BaseNodeStatusUpdaterForTest(context, dispatcher,
metrics, new BaseResourceTrackerForTest()); healthChecker, metrics, new BaseResourceTrackerForTest());
} }
@Override @Override
@ -258,9 +252,9 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
ContainerExecutor exec, DeletionService del, ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, NodeStatusUpdater nodeStatusUpdater,
ApplicationACLsManager aclsManager, ApplicationACLsManager aclsManager,
LocalDirsHandlerService diskhandler) { LocalDirsHandlerService dirsHandler) {
return new MyContainerManager(context, exec, del, nodeStatusUpdater, return new MyContainerManager(context, exec, del, nodeStatusUpdater,
metrics, diskhandler); metrics, dirsHandler);
} }
@Override @Override
@ -308,10 +302,10 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
false); false);
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
verify(rpmSpy, times(1)).initialize( verify(rpmSpy).initialize(
any(Context.class)); any(Context.class));
verify(rpmSpy, times(0)).initializePluggableDevicePlugins( verify(rpmSpy, times(0)).initializePluggableDevicePlugins(
any(Context.class), any(Configuration.class), any(Map.class)); any(Context.class), any(Configuration.class), anyMap());
} }
// No related configuration set. // No related configuration set.
@ -325,10 +319,10 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
verify(rpmSpy, times(1)).initialize( verify(rpmSpy).initialize(
any(Context.class)); any(Context.class));
verify(rpmSpy, times(0)).initializePluggableDevicePlugins( verify(rpmSpy, times(0)).initializePluggableDevicePlugins(
any(Context.class), any(Configuration.class), any(Map.class)); any(Context.class), any(Configuration.class), anyMap());
} }
// Enable framework and configure pluggable device classes // Enable framework and configure pluggable device classes
@ -347,10 +341,10 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
FakeTestDevicePlugin1.class.getCanonicalName()); FakeTestDevicePlugin1.class.getCanonicalName());
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
verify(rpmSpy, times(1)).initialize( verify(rpmSpy).initialize(
any(Context.class)); any(Context.class));
verify(rpmSpy, times(1)).initializePluggableDevicePlugins( verify(rpmSpy).initializePluggableDevicePlugins(
any(Context.class), any(Configuration.class), any(Map.class)); any(Context.class), any(Configuration.class), anyMap());
} }
// Enable pluggable framework, but leave device classes un-configured // Enable pluggable framework, but leave device classes un-configured
@ -362,7 +356,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
ResourcePluginManager rpmSpy = spy(rpm); ResourcePluginManager rpmSpy = spy(rpm);
nm = new MyMockNM(rpmSpy); nm = new MyMockNM(rpmSpy);
Boolean fail = false; boolean fail = false;
try { try {
conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
true); true);
@ -371,18 +365,16 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
nm.start(); nm.start();
} catch (YarnRuntimeException e) { } catch (YarnRuntimeException e) {
fail = true; fail = true;
} catch (Exception e) { } catch (Exception ignored) {
} }
verify(rpmSpy, times(1)).initializePluggableDevicePlugins( verify(rpmSpy).initializePluggableDevicePlugins(
any(Context.class), any(Configuration.class), any(Map.class)); any(Context.class), any(Configuration.class), anyMap());
Assert.assertTrue(fail); Assert.assertTrue(fail);
} }
@Test(timeout = 30000) @Test(timeout = 30000)
public void testNormalInitializationOfPluggableDeviceClasses() public void testNormalInitializationOfPluggableDeviceClasses() {
throws Exception {
ResourcePluginManager rpm = new ResourcePluginManager(); ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm); ResourcePluginManager rpmSpy = spy(rpm);
@ -399,16 +391,15 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
Assert.assertEquals(1, pluginMap.size()); Assert.assertEquals(1, pluginMap.size());
ResourcePlugin rp = pluginMap.get("cmpA.com/hdwA"); ResourcePlugin rp = pluginMap.get("cmpA.com/hdwA");
if (!(rp instanceof DevicePluginAdapter)) { if (!(rp instanceof DevicePluginAdapter)) {
Assert.assertTrue(false); Assert.fail();
} }
verify(rpmSpy, times(1)).checkInterfaceCompatibility( verify(rpmSpy).checkInterfaceCompatibility(
DevicePlugin.class, FakeTestDevicePlugin1.class); DevicePlugin.class, FakeTestDevicePlugin1.class);
} }
// Fail to load a class which doesn't implement interface DevicePlugin // Fail to load a class which doesn't implement interface DevicePlugin
@Test(timeout = 30000) @Test(timeout = 30000)
public void testLoadInvalidPluggableDeviceClasses() public void testLoadInvalidPluggableDeviceClasses() {
throws Exception{
ResourcePluginManager rpm = new ResourcePluginManager(); ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm); ResourcePluginManager rpmSpy = spy(rpm);
@ -435,8 +426,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
// Fail to register duplicated resource name. // Fail to register duplicated resource name.
@Test(timeout = 30000) @Test(timeout = 30000)
public void testLoadDuplicateResourceNameDevicePlugin() public void testLoadDuplicateResourceNameDevicePlugin() {
throws Exception{
ResourcePluginManager rpm = new ResourcePluginManager(); ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm); ResourcePluginManager rpmSpy = spy(rpm);
@ -469,8 +459,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
* It doesn't implement the "getRegisterRequestInfo" * It doesn't implement the "getRegisterRequestInfo"
*/ */
@Test(timeout = 30000) @Test(timeout = 30000)
public void testIncompatibleDevicePlugin() public void testIncompatibleDevicePlugin() {
throws Exception {
ResourcePluginManager rpm = new ResourcePluginManager(); ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm); ResourcePluginManager rpmSpy = spy(rpm);
@ -515,16 +504,15 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
nm.init(conf); nm.init(conf);
nm.start(); nm.start();
// only 1 plugin has the customized scheduler // only 1 plugin has the customized scheduler
verify(rpmSpy, times(1)).checkInterfaceCompatibility( verify(rpmSpy).checkInterfaceCompatibility(
DevicePlugin.class, FakeTestDevicePlugin1.class); DevicePlugin.class, FakeTestDevicePlugin1.class);
verify(dmmSpy, times(1)).addDevicePluginScheduler( verify(dmmSpy).addDevicePluginScheduler(
any(String.class), any(DevicePluginScheduler.class)); any(String.class), any(DevicePluginScheduler.class));
Assert.assertEquals(1, dmm.getDevicePluginSchedulers().size()); Assert.assertEquals(1, dmm.getDevicePluginSchedulers().size());
} }
@Test(timeout = 30000) @Test(timeout = 30000)
public void testRequestedResourceNameIsConfigured() public void testRequestedResourceNameIsConfigured() {
throws Exception{
ResourcePluginManager rpm = new ResourcePluginManager(); ResourcePluginManager rpm = new ResourcePluginManager();
String resourceName = "a.com/a"; String resourceName = "a.com/a";
Assert.assertFalse(rpm.isConfiguredResourceName(resourceName)); Assert.assertFalse(rpm.isConfiguredResourceName(resourceName));