YARN-9140. Code cleanup in ResourcePluginManager.initialize and in TestResourcePluginManager. Contributed by Peter Bacsko
This commit is contained in:
parent
9a87e74e54
commit
4dc477b606
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -33,7 +34,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -56,6 +56,17 @@ public class ResourcePluginManager {
|
|||
throws YarnException {
|
||||
Configuration conf = context.getConf();
|
||||
|
||||
String[] plugins = getPluginsFromConfig(conf);
|
||||
|
||||
Map<String, ResourcePlugin> pluginMap = Maps.newHashMap();
|
||||
if (plugins != null) {
|
||||
pluginMap = initializePlugins(context, plugins);
|
||||
}
|
||||
|
||||
configuredPlugins = Collections.unmodifiableMap(pluginMap);
|
||||
}
|
||||
|
||||
private String[] getPluginsFromConfig(Configuration conf) {
|
||||
String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS);
|
||||
if (plugins == null || plugins.length == 0) {
|
||||
LOG.info("No Resource plugins found from configuration!");
|
||||
|
@ -63,27 +74,19 @@ public class ResourcePluginManager {
|
|||
LOG.info("Found Resource plugins from configuration: "
|
||||
+ Arrays.toString(plugins));
|
||||
|
||||
if (plugins != null) {
|
||||
Map<String, ResourcePlugin> pluginMap = new HashMap<>();
|
||||
return plugins;
|
||||
}
|
||||
|
||||
// Initialize each plugins
|
||||
for (String resourceName : plugins) {
|
||||
resourceName = resourceName.trim();
|
||||
if (!SUPPORTED_RESOURCE_PLUGINS.contains(resourceName)) {
|
||||
String msg =
|
||||
"Trying to initialize resource plugin with name=" + resourceName
|
||||
+ ", it is not supported, list of supported plugins:"
|
||||
+ StringUtils.join(",", SUPPORTED_RESOURCE_PLUGINS);
|
||||
LOG.error(msg);
|
||||
throw new YarnException(msg);
|
||||
}
|
||||
|
||||
if (pluginMap.containsKey(resourceName)) {
|
||||
LOG.warn("Ignoring duplicate Resource plugin definition: " +
|
||||
resourceName);
|
||||
continue;
|
||||
}
|
||||
private Map<String, ResourcePlugin> initializePlugins(
|
||||
Context context, String[] plugins) throws YarnException {
|
||||
Map<String, ResourcePlugin> pluginMap = Maps.newHashMap();
|
||||
|
||||
for (String resourceName : plugins) {
|
||||
resourceName = resourceName.trim();
|
||||
ensurePluginIsSupported(resourceName);
|
||||
|
||||
if (!isPluginDuplicate(pluginMap, resourceName)) {
|
||||
ResourcePlugin plugin = null;
|
||||
if (resourceName.equals(GPU_URI)) {
|
||||
final GpuDiscoverer gpuDiscoverer = new GpuDiscoverer();
|
||||
|
@ -103,11 +106,33 @@ public class ResourcePluginManager {
|
|||
LOG.info("Initialized plugin {}", plugin);
|
||||
pluginMap.put(resourceName, plugin);
|
||||
}
|
||||
}
|
||||
return pluginMap;
|
||||
}
|
||||
|
||||
configuredPlugins = Collections.unmodifiableMap(pluginMap);
|
||||
private void ensurePluginIsSupported(String resourceName)
|
||||
throws YarnException {
|
||||
if (!SUPPORTED_RESOURCE_PLUGINS.contains(resourceName)) {
|
||||
String msg =
|
||||
"Trying to initialize resource plugin with name=" + resourceName
|
||||
+ ", it is not supported, list of supported plugins:"
|
||||
+ StringUtils.join(",", SUPPORTED_RESOURCE_PLUGINS);
|
||||
LOG.error(msg);
|
||||
throw new YarnException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isPluginDuplicate(Map<String, ResourcePlugin> pluginMap,
|
||||
String resourceName) {
|
||||
if (pluginMap.containsKey(resourceName)) {
|
||||
LOG.warn("Ignoring duplicate Resource plugin definition: " +
|
||||
resourceName);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
public void cleanup() throws YarnException {
|
||||
for (ResourcePlugin plugin : configuredPlugins.values()) {
|
||||
plugin.cleanup();
|
||||
|
|
|
@ -40,10 +40,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileg
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.NodeResourceUpdaterPlugin;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -55,10 +51,11 @@ import java.util.Map;
|
|||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class TestResourcePluginManager extends NodeManagerTestBase {
|
||||
private NodeManager nm;
|
||||
|
||||
|
@ -99,38 +96,33 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
|
|||
private class CustomizedResourceHandler implements ResourceHandler {
|
||||
|
||||
@Override
|
||||
public List<PrivilegedOperation> bootstrap(Configuration configuration)
|
||||
throws ResourceHandlerException {
|
||||
public List<PrivilegedOperation> bootstrap(Configuration configuration) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PrivilegedOperation> preStart(Container container)
|
||||
throws ResourceHandlerException {
|
||||
public List<PrivilegedOperation> preStart(Container container) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
|
||||
throws ResourceHandlerException {
|
||||
public List<PrivilegedOperation> reacquireContainer(
|
||||
ContainerId containerId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PrivilegedOperation> updateContainer(Container container)
|
||||
throws ResourceHandlerException {
|
||||
public List<PrivilegedOperation> updateContainer(Container container) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PrivilegedOperation> postComplete(ContainerId containerId)
|
||||
throws ResourceHandlerException {
|
||||
public List<PrivilegedOperation> postComplete(ContainerId containerId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<PrivilegedOperation> teardown()
|
||||
throws ResourceHandlerException {
|
||||
public List<PrivilegedOperation> teardown() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -155,9 +147,9 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
|
|||
ContainerExecutor exec, DeletionService del,
|
||||
NodeStatusUpdater nodeStatusUpdater,
|
||||
ApplicationACLsManager aclsManager,
|
||||
LocalDirsHandlerService diskhandler) {
|
||||
LocalDirsHandlerService dirsHandler) {
|
||||
return new MyContainerManager(context, exec, del, nodeStatusUpdater,
|
||||
metrics, diskhandler);
|
||||
metrics, dirsHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -185,7 +177,7 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
|
|||
|
||||
YarnConfiguration conf = createNMConfig();
|
||||
nm.init(conf);
|
||||
verify(rpm, times(1)).initialize(
|
||||
verify(rpm).initialize(
|
||||
any(Context.class));
|
||||
}
|
||||
|
||||
|
@ -206,15 +198,17 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
|
|||
rpm.getNameToPlugins().get("resource1")
|
||||
.getNodeResourceHandlerInstance();
|
||||
|
||||
verify(nodeResourceUpdaterPlugin, times(1)).updateConfiguredResource(
|
||||
any(Resource.class));
|
||||
|
||||
verify(nodeResourceUpdaterPlugin)
|
||||
.updateConfiguredResource(any(Resource.class));
|
||||
}
|
||||
|
||||
/*
|
||||
* Make sure ResourcePluginManager is used to initialize ResourceHandlerChain
|
||||
*/
|
||||
@Test(timeout = 30000)
|
||||
public void testLinuxContainerExecutorWithResourcePluginsEnabled() throws Exception {
|
||||
public void testLinuxContainerExecutorWithResourcePluginsEnabled()
|
||||
throws IOException {
|
||||
final ResourcePluginManager rpm = stubResourcePluginmanager();
|
||||
final LinuxContainerExecutor lce = new MyLCE();
|
||||
|
||||
|
@ -223,8 +217,8 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
|
|||
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||
((NMContext)context).setResourcePluginManager(rpm);
|
||||
return new BaseNodeStatusUpdaterForTest(context, dispatcher, healthChecker,
|
||||
metrics, new BaseResourceTrackerForTest());
|
||||
return new BaseNodeStatusUpdaterForTest(context, dispatcher,
|
||||
healthChecker, metrics, new BaseResourceTrackerForTest());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -232,9 +226,9 @@ public class TestResourcePluginManager extends NodeManagerTestBase {
|
|||
ContainerExecutor exec, DeletionService del,
|
||||
NodeStatusUpdater nodeStatusUpdater,
|
||||
ApplicationACLsManager aclsManager,
|
||||
LocalDirsHandlerService diskhandler) {
|
||||
LocalDirsHandlerService dirsHandler) {
|
||||
return new MyContainerManager(context, exec, del, nodeStatusUpdater,
|
||||
metrics, diskhandler);
|
||||
metrics, dirsHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue