YARN-7468. Provide means for container network policy control. (Xuan Gong via wangda)

Change-Id: I73678c343f663412917758feef35d8308c216e76
This commit is contained in:
Wangda Tan 2018-01-12 16:14:10 -08:00
parent 53f2768926
commit edcc3a95d5
9 changed files with 1122 additions and 3 deletions

View File

@ -1538,6 +1538,24 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_NM_FPGA_VENDOR_PLUGIN =
"org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin";
public static final String NM_NETWORK_TAG_PREFIX = NM_PREFIX
+ "network-tagging";
public static final String NM_NETWORK_TAG_HANDLER_ENABLED =
NM_NETWORK_TAG_PREFIX + "-handler.enabled";
public static final boolean DEFAULT_NM_NETWORK_TAG_HANDLER_ENABLED =
false;
public static final String NM_NETWORK_TAG_MAPPING_MANAGER =
NM_NETWORK_TAG_PREFIX + ".mapping-mamager.class";
public static final String NM_NETWORK_TAG_MAPPING_FILE_PATH =
NM_NETWORK_TAG_PREFIX + ".mapping-file.path";
public static final String DEFAULT_NM_NETWORK_RESOURCE_TAG_MAPPING_FILE_PATH =
"";
/** NM Webapp address.**/
public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address";
public static final int DEFAULT_NM_WEBAPP_PORT = 8042;

View File

@ -162,6 +162,12 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.NM_MEMORY_RESOURCE_PREFIX);
configurationPrefixToSkipCompare
.add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
configurationPrefixToSkipCompare.add(
YarnConfiguration.NM_NETWORK_TAG_MAPPING_MANAGER);
configurationPrefixToSkipCompare.add(
YarnConfiguration.NM_NETWORK_TAG_MAPPING_FILE_PATH);
configurationPrefixToSkipCompare.add(
YarnConfiguration.NM_NETWORK_TAG_PREFIX);
// Ignore all Router Federation variables

View File

@ -0,0 +1,163 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
/**
* The network packet tagging handler implementation.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NetworkPacketTaggingHandlerImpl
implements ResourceHandler {
private static final Logger LOG =
LoggerFactory.getLogger(NetworkPacketTaggingHandlerImpl.class);
private final CGroupsHandler cGroupsHandler;
private Configuration conf;
private NetworkTagMappingManager tagMappingManager;
public NetworkPacketTaggingHandlerImpl(
PrivilegedOperationExecutor privilegedOperationExecutor,
CGroupsHandler cGroupsHandler) {
this.cGroupsHandler = cGroupsHandler;
}
/**
* Bootstrapping network-tagging-handler - mounts net_cls
* controller.
* @param configuration yarn configuration in use
* @return (potentially empty) list of privileged operations to execute.
* @throws ResourceHandlerException
*/
@Override
public List<PrivilegedOperation> bootstrap(Configuration configuration)
throws ResourceHandlerException {
conf = configuration;
cGroupsHandler
.initializeCGroupController(CGroupsHandler.CGroupController.NET_CLS);
this.tagMappingManager = createNetworkTagMappingManager(conf);
this.tagMappingManager.initialize(conf);
return null;
}
/**
* Pre-start hook for network-tagging-handler. A cgroup is created
* and a net_cls classid is generated and written to a cgroup file.
*
* @param container Container being launched
* @return privileged operations for some cgroups operations.
* @throws ResourceHandlerException
*/
@Override
public List<PrivilegedOperation> preStart(Container container)
throws ResourceHandlerException {
String containerIdStr = container.getContainerId().toString();
String classIdStr = this.tagMappingManager.getNetworkTagHexID(
container);
cGroupsHandler.createCGroup(CGroupsHandler.CGroupController
.NET_CLS, containerIdStr);
cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.NET_CLS,
containerIdStr, CGroupsHandler.CGROUP_PARAM_CLASSID, classIdStr);
//Now create a privileged operation in order to update the tasks file with
//the pid of the running container process (root of process tree). This can
//only be done at the time of launching the container, in a privileged
//executable.
String tasksFile = cGroupsHandler.getPathForCGroupTasks(
CGroupsHandler.CGroupController.NET_CLS, containerIdStr);
String opArg = new StringBuffer(PrivilegedOperation.CGROUP_ARG_PREFIX)
.append(tasksFile).toString();
List<PrivilegedOperation> ops = new ArrayList<>();
ops.add(new PrivilegedOperation(
PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, opArg));
return ops;
}
/**
* Reacquires state for a container - reads the classid from the cgroup
* being used for the container being reacquired.
* @param containerId if of the container being reacquired.
* @return (potentially empty) list of privileged operations
* @throws ResourceHandlerException
*/
@Override
public List<PrivilegedOperation> reacquireContainer(ContainerId containerId)
throws ResourceHandlerException {
return null;
}
/**
* Cleanup operation once container is completed - deletes cgroup.
*
* @param containerId of the container that was completed.
* @return a list of PrivilegedOperations.
* @throws ResourceHandlerException
*/
@Override
public List<PrivilegedOperation> postComplete(ContainerId containerId)
throws ResourceHandlerException {
LOG.info("postComplete for container: " + containerId.toString());
cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.NET_CLS,
containerId.toString());
return null;
}
@Override
public List<PrivilegedOperation> teardown()
throws ResourceHandlerException {
if (LOG.isDebugEnabled()) {
LOG.debug("teardown(): Nothing to do");
}
return null;
}
@Private
@VisibleForTesting
public NetworkTagMappingManager createNetworkTagMappingManager(
Configuration conf) {
return NetworkTagMappingManagerFactory.getManager(conf);
}
}

View File

@ -0,0 +1,317 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
/**
* The NetworkTagMapping JsonManager implementation.
*/
public class NetworkTagMappingJsonManager implements NetworkTagMappingManager {
/** Format of the classid that is to be used with the net_cls cgroup. Needs
* to be of the form 0xAAAABBBB */
private static final String FORMAT_NET_CLS_CLASS_ID = "0x[0-9]{8}";
private NetworkTagMapping networkTagMapping = null;
@Override
public void initialize(Configuration conf) {
String mappingJsonFile = conf.get(
YarnConfiguration.NM_NETWORK_TAG_MAPPING_FILE_PATH,
YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_TAG_MAPPING_FILE_PATH);
if (mappingJsonFile == null || mappingJsonFile.isEmpty()) {
throw new YarnRuntimeException("To use NetworkTagMappingJsonManager,"
+ " we have to set the configuration:" +
YarnConfiguration.NM_NETWORK_TAG_MAPPING_FILE_PATH);
}
ObjectMapper mapper = new ObjectMapper();
try {
networkTagMapping = mapper.readValue(new File(mappingJsonFile),
NetworkTagMapping.class);
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
if (networkTagMapping == null) {
throw new YarnRuntimeException("Fail to load the specific JSON file: "
+ mappingJsonFile);
}
networkTagMapping.validateUsers();
networkTagMapping.validateGroups();
networkTagMapping.validateDefaultClass();
}
@Override
public String getNetworkTagHexID(Container container) {
String userNetworkTagID = this.networkTagMapping.getUserNetworkTagID(
container.getUser());
if (userNetworkTagID != null) {
return userNetworkTagID;
}
UserGroupInformation userUGI = UserGroupInformation.createRemoteUser(
container.getUser());
List<Group> groups = this.networkTagMapping.getGroups();
for(Group group : groups) {
if (userUGI.getGroups().contains(group.getGroupName())) {
return group.getNetworkTagID();
}
}
return this.networkTagMapping.getDefaultNetworkTagID();
}
/**
* The NetworkTagMapping object.
*
*/
@VisibleForTesting
@Private
public static class NetworkTagMapping {
@JsonProperty("users")
private List<User> users = new LinkedList<>();
@JsonProperty("groups")
private List<Group> groups = new LinkedList<>();
@JsonProperty("default-network-tag-id")
private String defaultNetworkTagID;
@JsonIgnore
private final Pattern pattern = Pattern.compile(FORMAT_NET_CLS_CLASS_ID);
public NetworkTagMapping() {}
public List<User> getUsers() {
return this.users;
}
public void setUsers(List<User> users) {
this.users = users;
}
public void addUser(User user) {
this.users.add(user);
}
public String getUserNetworkTagID(String userName) {
for (User user : users) {
if (userName.equals(user.getUserName())) {
return user.getNetworkTagID();
}
}
return null;
}
public List<Group> getGroups() {
return this.groups;
}
public void setGroups(List<Group> groups) {
this.groups = groups;
}
public void addGroup(Group group) {
this.groups.add(group);
}
public String getDefaultNetworkTagID() {
return this.defaultNetworkTagID;
}
public void setDefaultNetworkTagID(String defaultNetworkTagID) {
this.defaultNetworkTagID = defaultNetworkTagID;
}
private boolean containsUser(String user, List<User> userList) {
for (User existing : userList) {
if (user.equals(existing.getUserName())) {
return true;
}
}
return false;
}
private boolean containsGroup(String group, List<Group> groupList) {
for (Group existing : groupList) {
if (group.equals(existing.getGroupName())) {
return true;
}
}
return false;
}
// Make sure that we do not have the duplicate user names.
// If it exists, we would only keep the user name which is
// set first.
// Also, make sure the class_id set for the user match the
// 0xAAAABBBB format
public void validateUsers() {
List<User> validateUsers = new LinkedList<>();
for(User user : this.users) {
Matcher m = pattern.matcher(user.getNetworkTagID());
if (!m.matches()) {
throw new YarnRuntimeException(
"User-network-tag-id mapping configuraton error. "
+ "The user:" + user.getUserName()
+ " 's configured network-tag-id:" + user.getNetworkTagID()
+ " does not match the '0xAAAABBBB' format.");
}
String[] userSplits = user.getUserName().split(",");
if (userSplits.length > 1) {
String networkTagID = user.getNetworkTagID();
for(String split : userSplits) {
if (!containsUser(split.trim(), validateUsers)) {
User addUsers = new User(split.trim(), networkTagID);
validateUsers.add(addUsers);
}
}
} else {
if (!containsUser(user.getUserName(), validateUsers)) {
validateUsers.add(user);
}
}
}
this.users = validateUsers;
}
// Make sure that we do not have the duplicate group names.
// If it exists, we would only keep the group name which is
// set first.
// Also, make sure the class_id set for the group match the
// 0xAAAABBBB format
public void validateGroups() {
List<Group> validateGroups = new LinkedList<>();
for(Group group : this.groups) {
if (!containsGroup(group.getGroupName(), validateGroups)) {
Matcher m = pattern.matcher(group.getNetworkTagID());
if (!m.matches()) {
throw new YarnRuntimeException(
"Group-network-tag-id mapping configuraton error. "
+ "The group:" + group.getGroupName()
+ " 's configured network-tag-id:" + group.getNetworkTagID()
+ " does not match the '0xAAAABBBB' format.");
}
validateGroups.add(group);
}
}
this.groups = validateGroups;
}
// make sure that we set the value for default-network-tag-id.
// Also, make sure the default class id match the
// 0xAAAABBBB format
public void validateDefaultClass() {
if (getDefaultNetworkTagID() == null ||
getDefaultNetworkTagID().isEmpty()) {
throw new YarnRuntimeException("Missing value for defaultNetworkTagID."
+ " We have to set non-empty value for defaultNetworkTagID");
}
Matcher m = pattern.matcher(getDefaultNetworkTagID());
if (!m.matches()) {
throw new YarnRuntimeException("Configuration error on "
+ "default-network-tag-id. The configured default-network-tag-id:"
+ getDefaultNetworkTagID()
+ " does not match the '0xAAAABBBB' format.");
}
}
}
/**
* The user object.
*
*/
@VisibleForTesting
@Private
public static class User {
@JsonProperty("name")
private String userName;
@JsonProperty("network-tag-id")
private String networkTagID;
public User() {}
public User(String userName, String networkTagID) {
this.setUserName(userName);
this.setNetworkTagID(networkTagID);
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getNetworkTagID() {
return networkTagID;
}
public void setNetworkTagID(String networkTagID) {
this.networkTagID = networkTagID;
}
}
/**
* The group object.
*
*/
@VisibleForTesting
@Private
public static class Group {
@JsonProperty("name")
private String groupName;
@JsonProperty("network-tag-id")
private String networkTagID;
public Group() {}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getNetworkTagID() {
return networkTagID;
}
public void setNetworkTagID(String networkTagID) {
this.networkTagID = networkTagID;
}
}
@Private
@VisibleForTesting
public NetworkTagMapping getNetworkTagMapping() {
return this.networkTagMapping;
}
}

View File

@ -0,0 +1,41 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
/**
* Base interface for network tag mapping manager.
*/
public interface NetworkTagMappingManager {
/**
* Initialize the networkTagMapping manager.
*/
void initialize(Configuration conf);
/**
* Get networkTagHexID for the given container.
* @param container
* @return the networkTagID.
*/
String getNetworkTagHexID(Container container);
}

View File

@ -0,0 +1,49 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* Use {@code NetworkTagMappingManagerFactory} to get the correct
* {@link NetworkTagMappingManager}.
*
*/
public final class NetworkTagMappingManagerFactory {
private static final Log LOG = LogFactory.getLog(
NetworkTagMappingManagerFactory.class);
private NetworkTagMappingManagerFactory() {}
public static NetworkTagMappingManager getManager(Configuration conf) {
Class<? extends NetworkTagMappingManager> managerClass =
conf.getClass(YarnConfiguration.NM_NETWORK_TAG_MAPPING_MANAGER,
NetworkTagMappingJsonManager.class,
NetworkTagMappingManager.class);
LOG.info("Using NetworkTagMappingManager implementation - "
+ managerClass);
return ReflectionUtils.newInstance(managerClass, conf);
}
}

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
@ -63,6 +64,8 @@ public class ResourceHandlerModule {
*/
private static volatile TrafficControlBandwidthHandlerImpl
trafficControlBandwidthHandler;
private static volatile NetworkPacketTaggingHandlerImpl
networkPacketTaggingHandlerImpl;
private static volatile CGroupsHandler cGroupsHandler;
private static volatile CGroupsBlkioResourceHandlerImpl
cGroupsBlkioResourceHandler;
@ -131,7 +134,7 @@ public class ResourceHandlerModule {
if (trafficControlBandwidthHandler == null) {
synchronized (OutboundBandwidthResourceHandler.class) {
if (trafficControlBandwidthHandler == null) {
LOG.debug("Creating new traffic control bandwidth handler");
LOG.info("Creating new traffic control bandwidth handler.");
trafficControlBandwidthHandler = new
TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor
.getInstance(conf), getInitializedCGroupsHandler(conf),
@ -147,9 +150,39 @@ public class ResourceHandlerModule {
}
}
public static ResourceHandler getNetworkResourceHandler(Configuration conf)
throws ResourceHandlerException {
boolean useNetworkTagHandler = conf.getBoolean(
YarnConfiguration.NM_NETWORK_TAG_HANDLER_ENABLED,
YarnConfiguration.DEFAULT_NM_NETWORK_TAG_HANDLER_ENABLED);
if (useNetworkTagHandler) {
LOG.info("Using network-tagging-handler.");
return getNetworkTaggingHandler(conf);
} else {
LOG.info("Using traffic control bandwidth handler");
return getTrafficControlBandwidthHandler(conf);
}
}
public static ResourceHandler getNetworkTaggingHandler(Configuration conf)
throws ResourceHandlerException {
if (networkPacketTaggingHandlerImpl == null) {
synchronized (OutboundBandwidthResourceHandler.class) {
if (networkPacketTaggingHandlerImpl == null) {
LOG.info("Creating new network-tagging-handler.");
networkPacketTaggingHandlerImpl =
new NetworkPacketTaggingHandlerImpl(
PrivilegedOperationExecutor.getInstance(conf),
getInitializedCGroupsHandler(conf));
}
}
}
return networkPacketTaggingHandlerImpl;
}
public static OutboundBandwidthResourceHandler
getOutboundBandwidthResourceHandler(Configuration conf)
throws ResourceHandlerException {
throws ResourceHandlerException {
return getTrafficControlBandwidthHandler(conf);
}
@ -213,7 +246,7 @@ public class ResourceHandlerModule {
throws ResourceHandlerException {
ArrayList<ResourceHandler> handlerList = new ArrayList<>();
addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf));
addHandlerIfNotNull(handlerList, getNetworkResourceHandler(conf));
addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf));
addHandlerIfNotNull(handlerList, getMemoryResourceHandler(conf));
addHandlerIfNotNull(handlerList, getCGroupsCpuResourceHandler(conf));

View File

@ -0,0 +1,310 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.NetworkTagMappingJsonManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.NetworkTagMappingJsonManager.Group;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.NetworkTagMappingJsonManager.NetworkTagMapping;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.NetworkTagMappingJsonManager.User;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test NetworkTagMapping Json Manager.
*
*/
public class TestNetworkTagMappingJsonManager {
private Path jsonDirDirPath = new Path("target/json");
private Configuration conf = new YarnConfiguration();
private FileSystem fs;
@Before
public void setUp() throws IOException {
fs = FileSystem.get(conf);
if (fs.exists(jsonDirDirPath)) {
fs.delete(jsonDirDirPath, true);
}
assertTrue(fs.mkdirs(jsonDirDirPath));
}
@After
public void tearDown() throws IOException {
if (fs.exists(jsonDirDirPath)) {
fs.delete(jsonDirDirPath, true);
}
}
@Test (timeout=10000)
public void testNetworkMappingJsonManager() throws Exception {
Path jsonFilePath = new Path(jsonDirDirPath, "test.json");
File jsonFile = new File(jsonFilePath.toString());
NetworkTagMappingJsonManager manager = new NetworkTagMappingJsonManager();
JSONObject json = new JSONObject();
JSONArray userArray = new JSONArray();
// add users
Map<String, String> createdUsers = createUserNetworkTagIDMapping();
for(Entry<String, String> user : createdUsers.entrySet()) {
JSONObject userJson = new JSONObject();
userJson.put("name", user.getKey());
userJson.put("network-tag-id", user.getValue());
userArray.put(userJson);
}
// add duplicate user1
JSONObject duplicateUser1 = new JSONObject();
duplicateUser1.put("name", "user1");
duplicateUser1.put("network-tag-id", "0x88888888");
userArray.put(duplicateUser1);
json.put("users", userArray);
JSONArray groupArray = new JSONArray();
// add groups
Map<String, String> createdGroups = createGroupNetworkTagIDMapping();
for(Entry<String, String> group : createdGroups.entrySet()) {
JSONObject groupJson = new JSONObject();
groupJson.put("name", group.getKey());
groupJson.put("network-tag-id", group.getValue());
groupArray.put(groupJson);
}
// add duplicate group1
JSONObject duplicateGroup1 = new JSONObject();
duplicateGroup1.put("name", "team1");
duplicateGroup1.put("network-tag-id", "0x20002003");
groupArray.put(duplicateGroup1);
json.put("groups", groupArray);
writeJson(jsonFile, json.toString());
conf.set(YarnConfiguration.NM_NETWORK_TAG_MAPPING_FILE_PATH,
jsonFile.getAbsolutePath());
try {
manager.initialize(conf);
fail("Should get an exception. Becase we did not "
+ "set default-network-tag-id");
} catch (Exception ex) {
// Do Nothing
}
// add default-network-tag-id
json.put("default-network-tag-id", "0x99999999");
// remove previous json file
if (fs.exists(jsonFilePath)) {
fs.delete(jsonFilePath, false);
}
assertFalse(fs.exists(jsonFilePath));
writeJson(jsonFile, json.toString());
manager.initialize(conf);
NetworkTagMapping networkTagMapping = manager.getNetworkTagMapping();
// Verify the default-network-tag-id
assertTrue(networkTagMapping != null);
assertTrue("0x99999999".equals(networkTagMapping
.getDefaultNetworkTagID()));
// Verify the users
List<User> users = networkTagMapping.getUsers();
// The number of users should be 4 which is user1, user2, user3 and user4.
assertTrue(users.size() == 4);
for (int index = 0; index < users.size(); index++) {
String userName = users.get(index).getUserName();
String classId = users.get(index).getNetworkTagID();
assertTrue(createdUsers.containsValue(classId));
String createdUserName = getUserName(createdUsers, classId);
assertTrue(createdUserName.contains(userName));
}
// Verify the groups
List<Group> groups = networkTagMapping.getGroups();
// The number of groups should be 2 which is team1 and team2.
assertTrue(groups.size() == 2);
for (int index = 0; index < groups.size(); index++) {
String groupName = groups.get(index).getGroupName();
String classId = groups.get(index).getNetworkTagID();
assertTrue(createdGroups.containsKey(groupName));
assertTrue(classId.equals(createdGroups.get(groupName)));
}
}
@Test (timeout=10000)
public void testNetworkTagIDMatchPattern() throws Exception {
Path jsonFilePath = new Path(jsonDirDirPath, "test.json");
File jsonFile = new File(jsonFilePath.toString());
NetworkTagMappingJsonManager manager = new NetworkTagMappingJsonManager();
JSONObject json = new JSONObject();
JSONArray userArray = new JSONArray();
JSONObject user1 = new JSONObject();
user1.put("name", "user1");
user1.put("network-tag-id", "1x88888888");
userArray.put(user1);
json.put("users", userArray);
writeJson(jsonFile, json.toString());
conf.set(YarnConfiguration.NM_NETWORK_TAG_MAPPING_FILE_PATH,
jsonFile.getAbsolutePath());
try {
manager.initialize(conf);
fail("Should get an exception. "
+ "Becase we did not set network-tag-id for user1 correctly");
} catch(Exception ex) {
// should catch exception here
assertTrue(ex.getMessage().contains(
"User-network-tag-id mapping configuraton error."));
}
json.remove("users");
userArray = new JSONArray();
user1 = new JSONObject();
user1.put("name", "user1");
user1.put("network-tag-id", "0x88888888");
userArray.put(user1);
json.put("users", userArray);
JSONArray groupArray = new JSONArray();
JSONObject group1 = new JSONObject();
group1.put("name", "team1");
group1.put("network-tag-id", "0x2000003");
groupArray.put(group1);
json.put("groups", groupArray);
// remove previous json file
if (fs.exists(jsonFilePath)) {
fs.delete(jsonFilePath, false);
}
assertFalse(fs.exists(jsonFilePath));
writeJson(jsonFile, json.toString());
try {
manager.initialize(conf);
fail("Should get an exception. "
+ "Becase we did not set network-tag-id for group1 correctly");
} catch(Exception ex) {
// should catch exception here
assertTrue(ex.getMessage().contains(
"Group-network-tag-id mapping configuraton error."));
}
json.remove("groups");
groupArray = new JSONArray();
group1 = new JSONObject();
group1.put("name", "team1");
group1.put("network-tag-id", "0x20002003");
groupArray.put(group1);
json.put("groups", groupArray);
json.put("default-network-tag-id", "0x99");
// remove previous json file
if (fs.exists(jsonFilePath)) {
fs.delete(jsonFilePath, false);
}
assertFalse(fs.exists(jsonFilePath));
writeJson(jsonFile, json.toString());
try {
manager.initialize(conf);
fail("Should get an exception. "
+ "Becase we did not set default-network-tag-id correctly");
} catch(Exception ex) {
// should catch exception here
assertTrue(ex.getMessage().contains(
"Configuration error on default-network-tag-id."));
}
json.remove("default-network-tag-id");
json.put("default-network-tag-id", "0x99999999");
// remove previous json file
if (fs.exists(jsonFilePath)) {
fs.delete(jsonFilePath, false);
}
assertFalse(fs.exists(jsonFilePath));
writeJson(jsonFile, json.toString());
manager.initialize(conf);
}
private void writeJson(File jsonFile, String jsonStr) throws IOException {
Writer writer = null;
try {
writer = new FileWriter(jsonFile);
writer.write(jsonStr);
} finally {
if (writer != null) {
writer.close();
}
}
}
private Map<String, String> createUserNetworkTagIDMapping() {
Map<String, String> classIdMap = new LinkedHashMap<>();
classIdMap.put("user1", "0x10001001");
classIdMap.put("user2", "0x10001002");
classIdMap.put("user3,user4", "0x10001003");
return classIdMap;
}
private Map<String, String> createGroupNetworkTagIDMapping() {
Map<String, String> classIdMap = new LinkedHashMap<>();
classIdMap.put("team1", "0x20002001");
classIdMap.put("team2", "0x20002002");
return classIdMap;
}
private String getUserName(Map<String, String> userMapping, String classId) {
for (Entry<String, String> o : userMapping.entrySet()) {
if (o.getValue().equals(classId)) {
return o.getKey();
}
}
return null;
}
}

View File

@ -0,0 +1,182 @@
/*
* *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* /
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.List;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Matchers.any;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test NetworkPacketTagging Handler.
*
*/
public class TestNetworkPacketTaggingHandlerImpl {
private static final Logger LOG =
LoggerFactory.getLogger(TestNetworkPacketTaggingHandlerImpl.class);
private static final String TEST_CLASSID = "0x100001";
private static final String TEST_CONTAINER_ID_STR = "container_01";
private static final String TEST_TASKS_FILE = "testTasksFile";
private NetworkTagMappingManager mockManager;
private PrivilegedOperationExecutor privilegedOperationExecutorMock;
private CGroupsHandler cGroupsHandlerMock;
private Configuration conf;
private String tmpPath;
private ContainerId containerIdMock;
private Container containerMock;
@Before
public void setup() {
privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class);
cGroupsHandlerMock = mock(CGroupsHandler.class);
conf = new YarnConfiguration();
tmpPath = new StringBuffer(System.getProperty("test.build.data"))
.append('/').append("hadoop.tmp.dir").toString();
containerIdMock = mock(ContainerId.class);
containerMock = mock(Container.class);
when(containerIdMock.toString()).thenReturn(TEST_CONTAINER_ID_STR);
//mock returning a mock - an angel died somewhere.
when(containerMock.getContainerId()).thenReturn(containerIdMock);
conf.set("hadoop.tmp.dir", tmpPath);
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
mockManager = mock(NetworkTagMappingManager.class);
doNothing().when(mockManager).initialize(any(Configuration.class));
when(mockManager.getNetworkTagHexID(any(Container.class)))
.thenReturn(TEST_CLASSID);
}
@Test
public void testBootstrap() {
NetworkPacketTaggingHandlerImpl handlerImpl =
createNetworkPacketTaggingHandlerImpl();
try {
handlerImpl.bootstrap(conf);
verify(cGroupsHandlerMock).initializeCGroupController(
eq(CGroupsHandler.CGroupController.NET_CLS));
verifyNoMoreInteractions(cGroupsHandlerMock);
} catch (ResourceHandlerException e) {
LOG.error("Unexpected exception: " + e);
Assert.fail("Caught unexpected ResourceHandlerException!");
}
}
@Test
public void testLifeCycle() {
NetworkPacketTaggingHandlerImpl handlerImpl =
createNetworkPacketTaggingHandlerImpl();
try {
handlerImpl.bootstrap(conf);
testPreStart(handlerImpl);
testPostComplete(handlerImpl);
} catch (ResourceHandlerException e) {
LOG.error("Unexpected exception: " + e);
Assert.fail("Caught unexpected ResourceHandlerException!");
}
}
private void testPreStart(NetworkPacketTaggingHandlerImpl handlerImpl) throws
ResourceHandlerException {
reset(privilegedOperationExecutorMock);
when(cGroupsHandlerMock.getPathForCGroupTasks(CGroupsHandler
.CGroupController.NET_CLS, TEST_CONTAINER_ID_STR)).thenReturn(
TEST_TASKS_FILE);
List<PrivilegedOperation> ops = handlerImpl.preStart(containerMock);
//Ensure that cgroups is created and updated as expected
verify(cGroupsHandlerMock).createCGroup(
eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR));
verify(cGroupsHandlerMock).updateCGroupParam(
eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR),
eq(CGroupsHandler.CGROUP_PARAM_CLASSID), eq(TEST_CLASSID));
//Now check the privileged operations being returned
//We expect one operations - for adding pid to tasks file
Assert.assertEquals(1, ops.size());
//Verify that the add pid op is correct
PrivilegedOperation addPidOp = ops.get(0);
String expectedAddPidOpArg = PrivilegedOperation.CGROUP_ARG_PREFIX +
TEST_TASKS_FILE;
List<String> addPidOpArgs = addPidOp.getArguments();
Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP,
addPidOp.getOperationType());
Assert.assertEquals(1, addPidOpArgs.size());
Assert.assertEquals(expectedAddPidOpArg, addPidOpArgs.get(0));
}
private void testPostComplete(NetworkPacketTaggingHandlerImpl handlerImpl)
throws ResourceHandlerException {
reset(privilegedOperationExecutorMock);
List<PrivilegedOperation> ops = handlerImpl.postComplete(containerIdMock);
verify(cGroupsHandlerMock).deleteCGroup(
eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR));
//We don't expect any operations to be returned here
Assert.assertNull(ops);
}
private NetworkPacketTaggingHandlerImpl
createNetworkPacketTaggingHandlerImpl() {
return new NetworkPacketTaggingHandlerImpl(
privilegedOperationExecutorMock, cGroupsHandlerMock) {
@Override
public NetworkTagMappingManager createNetworkTagMappingManager(
Configuration conf) {
return mockManager;
}
};
}
@After
public void teardown() {
FileUtil.fullyDelete(new File(tmpPath));
}
}