diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java index d6edfdd5546..4ace3ae05a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -33,7 +34,6 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -56,6 +56,17 @@ public class ResourcePluginManager { throws YarnException { Configuration conf = context.getConf(); + String[] plugins = getPluginsFromConfig(conf); + + Map pluginMap = Maps.newHashMap(); + if (plugins != null) { + pluginMap = initializePlugins(context, plugins); + } + + configuredPlugins = Collections.unmodifiableMap(pluginMap); + } + + private String[] getPluginsFromConfig(Configuration conf) { String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS); if (plugins == null || plugins.length == 0) { LOG.info("No Resource plugins found from configuration!"); @@ -63,27 +74,19 @@ public class ResourcePluginManager { LOG.info("Found Resource plugins from configuration: " + Arrays.toString(plugins)); - if (plugins != null) { - Map pluginMap = new HashMap<>(); + return plugins; + } - // Initialize each plugins - for (String resourceName : plugins) { - resourceName = resourceName.trim(); - if (!SUPPORTED_RESOURCE_PLUGINS.contains(resourceName)) { - String msg = - "Trying to initialize resource plugin with name=" + resourceName - + ", it is not supported, list of supported plugins:" - + StringUtils.join(",", SUPPORTED_RESOURCE_PLUGINS); - LOG.error(msg); - throw new YarnException(msg); - } - if (pluginMap.containsKey(resourceName)) { - LOG.warn("Ignoring duplicate Resource plugin definition: " + - resourceName); - continue; - } + private Map initializePlugins( + Context context, String[] plugins) throws YarnException { + Map pluginMap = Maps.newHashMap(); + for (String resourceName : plugins) { + resourceName = resourceName.trim(); + ensurePluginIsSupported(resourceName); + + if (!isPluginDuplicate(pluginMap, resourceName)) { ResourcePlugin plugin = null; if (resourceName.equals(GPU_URI)) { final GpuDiscoverer gpuDiscoverer = new GpuDiscoverer(); @@ -103,11 +106,33 @@ public class ResourcePluginManager { LOG.info("Initialized plugin {}", plugin); pluginMap.put(resourceName, plugin); } + } + return pluginMap; + } - configuredPlugins = Collections.unmodifiableMap(pluginMap); + private void ensurePluginIsSupported(String resourceName) + throws YarnException { + if (!SUPPORTED_RESOURCE_PLUGINS.contains(resourceName)) { + String msg = + "Trying to initialize resource plugin with name=" + resourceName + + ", it is not supported, list of supported plugins:" + + StringUtils.join(",", SUPPORTED_RESOURCE_PLUGINS); + LOG.error(msg); + throw new YarnException(msg); } } + private boolean isPluginDuplicate(Map pluginMap, + String resourceName) { + if (pluginMap.containsKey(resourceName)) { + LOG.warn("Ignoring duplicate Resource plugin definition: " + + resourceName); + return true; + } + return false; + } + + public void cleanup() throws YarnException { for (ResourcePlugin plugin : configuredPlugins.values()) { plugin.cleanup(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java index bcadf76e4bd..3f965b27b35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java @@ -40,10 +40,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileg import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.junit.After; import org.junit.Assert; @@ -55,10 +51,11 @@ import java.util.Map; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; + public class TestResourcePluginManager extends NodeManagerTestBase { private NodeManager nm; @@ -99,32 +96,28 @@ public class TestResourcePluginManager extends NodeManagerTestBase { private class CustomizedResourceHandler implements ResourceHandler { @Override - public List bootstrap(Configuration configuration) - throws ResourceHandlerException { + public List bootstrap(Configuration configuration) { return null; } @Override - public List preStart(Container container) - throws ResourceHandlerException { + public List preStart(Container container) { return null; } @Override - public List reacquireContainer(ContainerId containerId) - throws ResourceHandlerException { + public List reacquireContainer( + ContainerId containerId) { return null; } @Override - public List postComplete(ContainerId containerId) - throws ResourceHandlerException { + public List postComplete(ContainerId containerId) { return null; } @Override - public List teardown() - throws ResourceHandlerException { + public List teardown() { return null; } } @@ -149,9 +142,9 @@ public class TestResourcePluginManager extends NodeManagerTestBase { ContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, - LocalDirsHandlerService diskhandler) { + LocalDirsHandlerService dirsHandler) { return new MyContainerManager(context, exec, del, nodeStatusUpdater, - metrics, diskhandler); + metrics, dirsHandler); } @Override @@ -179,7 +172,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase { YarnConfiguration conf = createNMConfig(); nm.init(conf); - verify(rpm, times(1)).initialize( + verify(rpm).initialize( any(Context.class)); } @@ -200,15 +193,17 @@ public class TestResourcePluginManager extends NodeManagerTestBase { rpm.getNameToPlugins().get("resource1") .getNodeResourceHandlerInstance(); - verify(nodeResourceUpdaterPlugin, times(1)).updateConfiguredResource( - any(Resource.class)); + + verify(nodeResourceUpdaterPlugin) + .updateConfiguredResource(any(Resource.class)); } /* * Make sure ResourcePluginManager is used to initialize ResourceHandlerChain */ @Test(timeout = 30000) - public void testLinuxContainerExecutorWithResourcePluginsEnabled() throws Exception { + public void testLinuxContainerExecutorWithResourcePluginsEnabled() + throws IOException { final ResourcePluginManager rpm = stubResourcePluginmanager(); final LinuxContainerExecutor lce = new MyLCE(); @@ -217,8 +212,8 @@ public class TestResourcePluginManager extends NodeManagerTestBase { protected NodeStatusUpdater createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { ((NMContext)context).setResourcePluginManager(rpm); - return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker, - metrics, new BaseResourceTrackerForTest()); + return new BaseNodeStatusUpdaterForTest(context, dispatcher, + healthChecker, metrics, new BaseResourceTrackerForTest()); } @Override @@ -226,9 +221,9 @@ public class TestResourcePluginManager extends NodeManagerTestBase { ContainerExecutor exec, DeletionService del, NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager, - LocalDirsHandlerService diskhandler) { + LocalDirsHandlerService dirsHandler) { return new MyContainerManager(context, exec, del, nodeStatusUpdater, - metrics, diskhandler); + metrics, dirsHandler); } @Override