From e5e609384f68cc45b0c2bfbde0a49426c90017d3 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Wed, 14 Aug 2019 16:58:22 +0200 Subject: [PATCH] YARN-9140. Code cleanup in ResourcePluginManager.initialize and in TestResourcePluginManager. Contributed by Peter Bacsko --- .../resourceplugin/ResourcePluginManager.java | 84 +++++++++++------- .../TestResourcePluginManager.java | 86 ++++++++----------- 2 files changed, 91 insertions(+), 79 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java index 1274b64f4f6..84cdd7a4755 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugi import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; @@ -44,7 +45,6 @@ import org.slf4j.LoggerFactory; import java.lang.reflect.Method; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -68,8 +68,29 @@ public class ResourcePluginManager { public void initialize(Context context) throws YarnException, ClassNotFoundException { Configuration conf = context.getConf(); - Map pluginMap = new HashMap<>(); + String[] plugins = getPluginsFromConfig(conf); + Map pluginMap = Maps.newHashMap(); + if (plugins != null) { + pluginMap = initializePlugins(context, plugins); + } + + // Try to load pluggable device plugins + boolean pluggableDeviceFrameworkEnabled = conf.getBoolean( + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, + YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED); + + if (pluggableDeviceFrameworkEnabled) { + initializePluggableDevicePlugins(context, conf, pluginMap); + } else { + LOG.info("The pluggable device framework is not enabled." + + " If you want, please set true to {}", + YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED); + } + configuredPlugins = Collections.unmodifiableMap(pluginMap); + } + + private String[] getPluginsFromConfig(Configuration conf) { String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS); if (plugins == null || plugins.length == 0) { LOG.info("No Resource plugins found from configuration!"); @@ -77,25 +98,18 @@ public class ResourcePluginManager { LOG.info("Found Resource plugins from configuration: " + Arrays.toString(plugins)); - if (plugins != null) { - // Initialize each plugins - for (String resourceName : plugins) { - resourceName = resourceName.trim(); - if (!SUPPORTED_RESOURCE_PLUGINS.contains(resourceName)) { - String msg = - "Trying to initialize resource plugin with name=" + resourceName - + ", it is not supported, list of supported plugins:" - + StringUtils.join(",", SUPPORTED_RESOURCE_PLUGINS); - LOG.error(msg); - throw new YarnException(msg); - } + return plugins; + } - if (pluginMap.containsKey(resourceName)) { - LOG.warn("Ignoring duplicate Resource plugin definition: " + - resourceName); - continue; - } + private Map initializePlugins( + Context context, String[] plugins) throws YarnException { + Map pluginMap = Maps.newHashMap(); + for (String resourceName : plugins) { + resourceName = resourceName.trim(); + ensurePluginIsSupported(resourceName); + + if (!isPluginDuplicate(pluginMap, resourceName)) { ResourcePlugin plugin = null; if (resourceName.equals(GPU_URI)) { final GpuDiscoverer gpuDiscoverer = new GpuDiscoverer(); @@ -116,19 +130,29 @@ public class ResourcePluginManager { pluginMap.put(resourceName, plugin); } } - // Try to load pluggable device plugins - boolean puggableDeviceFrameworkEnabled = conf.getBoolean( - YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, - YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED); + return pluginMap; + } - if (puggableDeviceFrameworkEnabled) { - initializePluggableDevicePlugins(context, conf, pluginMap); - } else { - LOG.info("The pluggable device framework is not enabled." - + " If you want, please set true to {}", - YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED); + private void ensurePluginIsSupported(String resourceName) + throws YarnException { + if (!SUPPORTED_RESOURCE_PLUGINS.contains(resourceName)) { + String msg = + "Trying to initialize resource plugin with name=" + resourceName + + ", it is not supported, list of supported plugins:" + + StringUtils.join(",", SUPPORTED_RESOURCE_PLUGINS); + LOG.error(msg); + throw new YarnException(msg); } - configuredPlugins = Collections.unmodifiableMap(pluginMap); + } + + private boolean isPluginDuplicate(Map pluginMap, + String resourceName) { + if (pluginMap.containsKey(resourceName)) { + LOG.warn("Ignoring duplicate Resource plugin definition: " + + resourceName); + return true; + } + return false; } public void initializePluggableDevicePlugins(Context context, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java index 87ce575f0e1..a41edbad598 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java @@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileg import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.*; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.util.resource.ResourceUtils; @@ -59,6 +58,7 @@ import java.util.Map; import java.io.File; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -124,38 +124,33 @@ public class TestResourcePluginManager extends NodeManagerTestBase { private class CustomizedResourceHandler implements ResourceHandler { @Override - public List bootstrap(Configuration configuration) - throws ResourceHandlerException { + public List bootstrap(Configuration configuration) { return null; } @Override - public List preStart(Container container) - throws ResourceHandlerException { + public List preStart(Container container) { return null; } @Override - public List reacquireContainer(ContainerId containerId) - throws ResourceHandlerException { + public List reacquireContainer( + ContainerId containerId) { return null; } @Override - public List updateContainer(Container container) - throws ResourceHandlerException { + public List updateContainer(Container container) { return null; } @Override - public List postComplete(ContainerId containerId) - throws ResourceHandlerException { + public List postComplete(ContainerId containerId) { return null; } @Override - public List teardown() - throws ResourceHandlerException { + public List teardown() { return null; } } @@ -180,9 +175,9 @@ public class TestResourcePluginManager extends NodeManagerTestBase { ContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, - LocalDirsHandlerService diskhandler) { + LocalDirsHandlerService dirsHandler) { return new MyContainerManager(context, exec, del, nodeStatusUpdater, - metrics, diskhandler); + metrics, dirsHandler); } @Override @@ -210,7 +205,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase { nm = new MyMockNM(rpm); nm.init(conf); - verify(rpm, times(1)).initialize( + verify(rpm).initialize( any(Context.class)); } @@ -231,7 +226,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase { rpm.getNameToPlugins().get("resource1") .getNodeResourceHandlerInstance(); - verify(nodeResourceUpdaterPlugin, times(1)) + verify(nodeResourceUpdaterPlugin) .updateConfiguredResource(any(Resource.class)); } @@ -239,8 +234,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase { * Make sure ResourcePluginManager is used to initialize ResourceHandlerChain */ @Test(timeout = 30000) - public void testLinuxContainerExecutorWithResourcePluginsEnabled() - throws Exception { + public void testLinuxContainerExecutorWithResourcePluginsEnabled() { final ResourcePluginManager rpm = stubResourcePluginmanager(); final LinuxContainerExecutor lce = new MyLCE(); @@ -249,8 +243,8 @@ public class TestResourcePluginManager extends NodeManagerTestBase { protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { ((NMContext)context).setResourcePluginManager(rpm); - return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker, - metrics, new BaseResourceTrackerForTest()); + return new BaseNodeStatusUpdaterForTest(context, dispatcher, + healthChecker, metrics, new BaseResourceTrackerForTest()); } @Override @@ -258,9 +252,9 @@ public class TestResourcePluginManager extends NodeManagerTestBase { ContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, - LocalDirsHandlerService diskhandler) { + LocalDirsHandlerService dirsHandler) { return new MyContainerManager(context, exec, del, nodeStatusUpdater, - metrics, diskhandler); + metrics, dirsHandler); } @Override @@ -308,10 +302,10 @@ public class TestResourcePluginManager extends NodeManagerTestBase { false); nm.init(conf); nm.start(); - verify(rpmSpy, times(1)).initialize( + verify(rpmSpy).initialize( any(Context.class)); verify(rpmSpy, times(0)).initializePluggableDevicePlugins( - any(Context.class), any(Configuration.class), any(Map.class)); + any(Context.class), any(Configuration.class), anyMap()); } // No related configuration set. @@ -325,10 +319,10 @@ public class TestResourcePluginManager extends NodeManagerTestBase { nm.init(conf); nm.start(); - verify(rpmSpy, times(1)).initialize( + verify(rpmSpy).initialize( any(Context.class)); verify(rpmSpy, times(0)).initializePluggableDevicePlugins( - any(Context.class), any(Configuration.class), any(Map.class)); + any(Context.class), any(Configuration.class), anyMap()); } // Enable framework and configure pluggable device classes @@ -347,10 +341,10 @@ public class TestResourcePluginManager extends NodeManagerTestBase { FakeTestDevicePlugin1.class.getCanonicalName()); nm.init(conf); nm.start(); - verify(rpmSpy, times(1)).initialize( + verify(rpmSpy).initialize( any(Context.class)); - verify(rpmSpy, times(1)).initializePluggableDevicePlugins( - any(Context.class), any(Configuration.class), any(Map.class)); + verify(rpmSpy).initializePluggableDevicePlugins( + any(Context.class), any(Configuration.class), anyMap()); } // Enable pluggable framework, but leave device classes un-configured @@ -362,7 +356,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase { ResourcePluginManager rpmSpy = spy(rpm); nm = new MyMockNM(rpmSpy); - Boolean fail = false; + boolean fail = false; try { conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED, true); @@ -371,18 +365,16 @@ public class TestResourcePluginManager extends NodeManagerTestBase { nm.start(); } catch (YarnRuntimeException e) { fail = true; - } catch (Exception e) { + } catch (Exception ignored) { } - verify(rpmSpy, times(1)).initializePluggableDevicePlugins( - any(Context.class), any(Configuration.class), any(Map.class)); + verify(rpmSpy).initializePluggableDevicePlugins( + any(Context.class), any(Configuration.class), anyMap()); Assert.assertTrue(fail); } @Test(timeout = 30000) - public void testNormalInitializationOfPluggableDeviceClasses() - throws Exception { - + public void testNormalInitializationOfPluggableDeviceClasses() { ResourcePluginManager rpm = new ResourcePluginManager(); ResourcePluginManager rpmSpy = spy(rpm); @@ -399,16 +391,15 @@ public class TestResourcePluginManager extends NodeManagerTestBase { Assert.assertEquals(1, pluginMap.size()); ResourcePlugin rp = pluginMap.get("cmpA.com/hdwA"); if (!(rp instanceof DevicePluginAdapter)) { - Assert.assertTrue(false); + Assert.fail(); } - verify(rpmSpy, times(1)).checkInterfaceCompatibility( + verify(rpmSpy).checkInterfaceCompatibility( DevicePlugin.class, FakeTestDevicePlugin1.class); } // Fail to load a class which doesn't implement interface DevicePlugin @Test(timeout = 30000) - public void testLoadInvalidPluggableDeviceClasses() - throws Exception{ + public void testLoadInvalidPluggableDeviceClasses() { ResourcePluginManager rpm = new ResourcePluginManager(); ResourcePluginManager rpmSpy = spy(rpm); @@ -435,8 +426,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase { // Fail to register duplicated resource name. @Test(timeout = 30000) - public void testLoadDuplicateResourceNameDevicePlugin() - throws Exception{ + public void testLoadDuplicateResourceNameDevicePlugin() { ResourcePluginManager rpm = new ResourcePluginManager(); ResourcePluginManager rpmSpy = spy(rpm); @@ -469,8 +459,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase { * It doesn't implement the "getRegisterRequestInfo" */ @Test(timeout = 30000) - public void testIncompatibleDevicePlugin() - throws Exception { + public void testIncompatibleDevicePlugin() { ResourcePluginManager rpm = new ResourcePluginManager(); ResourcePluginManager rpmSpy = spy(rpm); @@ -515,16 +504,15 @@ public class TestResourcePluginManager extends NodeManagerTestBase { nm.init(conf); nm.start(); // only 1 plugin has the customized scheduler - verify(rpmSpy, times(1)).checkInterfaceCompatibility( + verify(rpmSpy).checkInterfaceCompatibility( DevicePlugin.class, FakeTestDevicePlugin1.class); - verify(dmmSpy, times(1)).addDevicePluginScheduler( + verify(dmmSpy).addDevicePluginScheduler( any(String.class), any(DevicePluginScheduler.class)); Assert.assertEquals(1, dmm.getDevicePluginSchedulers().size()); } @Test(timeout = 30000) - public void testRequestedResourceNameIsConfigured() - throws Exception{ + public void testRequestedResourceNameIsConfigured() { ResourcePluginManager rpm = new ResourcePluginManager(); String resourceName = "a.com/a"; Assert.assertFalse(rpm.isConfiguredResourceName(resourceName));