diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index fdb58598abd..c892cfb1510 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1538,6 +1538,24 @@ public class YarnConfiguration extends Configuration { public static final String DEFAULT_NM_FPGA_VENDOR_PLUGIN = "org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.fpga.IntelFpgaOpenclPlugin"; + public static final String NM_NETWORK_TAG_PREFIX = NM_PREFIX + + "network-tagging"; + + public static final String NM_NETWORK_TAG_HANDLER_ENABLED = + NM_NETWORK_TAG_PREFIX + "-handler.enabled"; + + public static final boolean DEFAULT_NM_NETWORK_TAG_HANDLER_ENABLED = + false; + + public static final String NM_NETWORK_TAG_MAPPING_MANAGER = + NM_NETWORK_TAG_PREFIX + ".mapping-mamager.class"; + + public static final String NM_NETWORK_TAG_MAPPING_FILE_PATH = + NM_NETWORK_TAG_PREFIX + ".mapping-file.path"; + + public static final String DEFAULT_NM_NETWORK_RESOURCE_TAG_MAPPING_FILE_PATH = + ""; + /** NM Webapp address.**/ public static final String NM_WEBAPP_ADDRESS = NM_PREFIX + "webapp.address"; public static final int DEFAULT_NM_WEBAPP_PORT = 8042; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index b8fbca66936..3976d2df49e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -162,6 +162,12 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { .add(YarnConfiguration.NM_MEMORY_RESOURCE_PREFIX); configurationPrefixToSkipCompare .add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED); + configurationPrefixToSkipCompare.add( + YarnConfiguration.NM_NETWORK_TAG_MAPPING_MANAGER); + configurationPrefixToSkipCompare.add( + YarnConfiguration.NM_NETWORK_TAG_MAPPING_FILE_PATH); + configurationPrefixToSkipCompare.add( + YarnConfiguration.NM_NETWORK_TAG_PREFIX); // Ignore all Router Federation variables diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkPacketTaggingHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkPacketTaggingHandlerImpl.java new file mode 100644 index 00000000000..1580e2c4ed7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkPacketTaggingHandlerImpl.java @@ -0,0 +1,163 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.List; + +/** + * The network packet tagging handler implementation. + * + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class NetworkPacketTaggingHandlerImpl + implements ResourceHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(NetworkPacketTaggingHandlerImpl.class); + + private final CGroupsHandler cGroupsHandler; + + private Configuration conf; + private NetworkTagMappingManager tagMappingManager; + + public NetworkPacketTaggingHandlerImpl( + PrivilegedOperationExecutor privilegedOperationExecutor, + CGroupsHandler cGroupsHandler) { + this.cGroupsHandler = cGroupsHandler; + } + + /** + * Bootstrapping network-tagging-handler - mounts net_cls + * controller. + * @param configuration yarn configuration in use + * @return (potentially empty) list of privileged operations to execute. + * @throws ResourceHandlerException + */ + + @Override + public List bootstrap(Configuration configuration) + throws ResourceHandlerException { + conf = configuration; + + cGroupsHandler + .initializeCGroupController(CGroupsHandler.CGroupController.NET_CLS); + + this.tagMappingManager = createNetworkTagMappingManager(conf); + this.tagMappingManager.initialize(conf); + return null; + } + + /** + * Pre-start hook for network-tagging-handler. A cgroup is created + * and a net_cls classid is generated and written to a cgroup file. + * + * @param container Container being launched + * @return privileged operations for some cgroups operations. + * @throws ResourceHandlerException + */ + @Override + public List preStart(Container container) + throws ResourceHandlerException { + String containerIdStr = container.getContainerId().toString(); + String classIdStr = this.tagMappingManager.getNetworkTagHexID( + container); + + cGroupsHandler.createCGroup(CGroupsHandler.CGroupController + .NET_CLS, containerIdStr); + cGroupsHandler.updateCGroupParam(CGroupsHandler.CGroupController.NET_CLS, + containerIdStr, CGroupsHandler.CGROUP_PARAM_CLASSID, classIdStr); + + //Now create a privileged operation in order to update the tasks file with + //the pid of the running container process (root of process tree). This can + //only be done at the time of launching the container, in a privileged + //executable. + String tasksFile = cGroupsHandler.getPathForCGroupTasks( + CGroupsHandler.CGroupController.NET_CLS, containerIdStr); + String opArg = new StringBuffer(PrivilegedOperation.CGROUP_ARG_PREFIX) + .append(tasksFile).toString(); + List ops = new ArrayList<>(); + + ops.add(new PrivilegedOperation( + PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, opArg)); + + return ops; + } + + /** + * Reacquires state for a container - reads the classid from the cgroup + * being used for the container being reacquired. + * @param containerId if of the container being reacquired. + * @return (potentially empty) list of privileged operations + * @throws ResourceHandlerException + */ + + @Override + public List reacquireContainer(ContainerId containerId) + throws ResourceHandlerException { + return null; + } + + /** + * Cleanup operation once container is completed - deletes cgroup. + * + * @param containerId of the container that was completed. + * @return a list of PrivilegedOperations. + * @throws ResourceHandlerException + */ + @Override + public List postComplete(ContainerId containerId) + throws ResourceHandlerException { + LOG.info("postComplete for container: " + containerId.toString()); + cGroupsHandler.deleteCGroup(CGroupsHandler.CGroupController.NET_CLS, + containerId.toString()); + return null; + } + + @Override + public List teardown() + throws ResourceHandlerException { + if (LOG.isDebugEnabled()) { + LOG.debug("teardown(): Nothing to do"); + } + + return null; + } + + @Private + @VisibleForTesting + public NetworkTagMappingManager createNetworkTagMappingManager( + Configuration conf) { + return NetworkTagMappingManagerFactory.getManager(conf); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkTagMappingJsonManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkTagMappingJsonManager.java new file mode 100644 index 00000000000..eba0ce1deeb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkTagMappingJsonManager.java @@ -0,0 +1,317 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.util.LinkedList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.codehaus.jackson.annotate.JsonIgnore; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * The NetworkTagMapping JsonManager implementation. + */ +public class NetworkTagMappingJsonManager implements NetworkTagMappingManager { + + /** Format of the classid that is to be used with the net_cls cgroup. Needs + * to be of the form 0xAAAABBBB */ + private static final String FORMAT_NET_CLS_CLASS_ID = "0x[0-9]{8}"; + + private NetworkTagMapping networkTagMapping = null; + + @Override + public void initialize(Configuration conf) { + String mappingJsonFile = conf.get( + YarnConfiguration.NM_NETWORK_TAG_MAPPING_FILE_PATH, + YarnConfiguration.DEFAULT_NM_NETWORK_RESOURCE_TAG_MAPPING_FILE_PATH); + if (mappingJsonFile == null || mappingJsonFile.isEmpty()) { + throw new YarnRuntimeException("To use NetworkTagMappingJsonManager," + + " we have to set the configuration:" + + YarnConfiguration.NM_NETWORK_TAG_MAPPING_FILE_PATH); + } + ObjectMapper mapper = new ObjectMapper(); + try { + networkTagMapping = mapper.readValue(new File(mappingJsonFile), + NetworkTagMapping.class); + } catch (Exception e) { + throw new YarnRuntimeException(e); + } + + if (networkTagMapping == null) { + throw new YarnRuntimeException("Fail to load the specific JSON file: " + + mappingJsonFile); + } + + networkTagMapping.validateUsers(); + networkTagMapping.validateGroups(); + networkTagMapping.validateDefaultClass(); + } + + @Override + public String getNetworkTagHexID(Container container) { + String userNetworkTagID = this.networkTagMapping.getUserNetworkTagID( + container.getUser()); + if (userNetworkTagID != null) { + return userNetworkTagID; + } + + UserGroupInformation userUGI = UserGroupInformation.createRemoteUser( + container.getUser()); + List groups = this.networkTagMapping.getGroups(); + for(Group group : groups) { + if (userUGI.getGroups().contains(group.getGroupName())) { + return group.getNetworkTagID(); + } + } + + return this.networkTagMapping.getDefaultNetworkTagID(); + } + + /** + * The NetworkTagMapping object. + * + */ + @VisibleForTesting + @Private + public static class NetworkTagMapping { + @JsonProperty("users") + private List users = new LinkedList<>(); + @JsonProperty("groups") + private List groups = new LinkedList<>(); + @JsonProperty("default-network-tag-id") + private String defaultNetworkTagID; + @JsonIgnore + private final Pattern pattern = Pattern.compile(FORMAT_NET_CLS_CLASS_ID); + + public NetworkTagMapping() {} + + public List getUsers() { + return this.users; + } + + public void setUsers(List users) { + this.users = users; + } + + public void addUser(User user) { + this.users.add(user); + } + + public String getUserNetworkTagID(String userName) { + for (User user : users) { + if (userName.equals(user.getUserName())) { + return user.getNetworkTagID(); + } + } + return null; + } + + public List getGroups() { + return this.groups; + } + + public void setGroups(List groups) { + this.groups = groups; + } + + public void addGroup(Group group) { + this.groups.add(group); + } + + public String getDefaultNetworkTagID() { + return this.defaultNetworkTagID; + } + + public void setDefaultNetworkTagID(String defaultNetworkTagID) { + this.defaultNetworkTagID = defaultNetworkTagID; + } + + private boolean containsUser(String user, List userList) { + for (User existing : userList) { + if (user.equals(existing.getUserName())) { + return true; + } + } + return false; + } + + private boolean containsGroup(String group, List groupList) { + for (Group existing : groupList) { + if (group.equals(existing.getGroupName())) { + return true; + } + } + return false; + } + + // Make sure that we do not have the duplicate user names. + // If it exists, we would only keep the user name which is + // set first. + // Also, make sure the class_id set for the user match the + // 0xAAAABBBB format + public void validateUsers() { + List validateUsers = new LinkedList<>(); + for(User user : this.users) { + Matcher m = pattern.matcher(user.getNetworkTagID()); + if (!m.matches()) { + throw new YarnRuntimeException( + "User-network-tag-id mapping configuraton error. " + + "The user:" + user.getUserName() + + " 's configured network-tag-id:" + user.getNetworkTagID() + + " does not match the '0xAAAABBBB' format."); + } + String[] userSplits = user.getUserName().split(","); + if (userSplits.length > 1) { + String networkTagID = user.getNetworkTagID(); + for(String split : userSplits) { + if (!containsUser(split.trim(), validateUsers)) { + User addUsers = new User(split.trim(), networkTagID); + validateUsers.add(addUsers); + } + } + } else { + if (!containsUser(user.getUserName(), validateUsers)) { + validateUsers.add(user); + } + } + } + this.users = validateUsers; + } + + // Make sure that we do not have the duplicate group names. + // If it exists, we would only keep the group name which is + // set first. + // Also, make sure the class_id set for the group match the + // 0xAAAABBBB format + public void validateGroups() { + List validateGroups = new LinkedList<>(); + for(Group group : this.groups) { + if (!containsGroup(group.getGroupName(), validateGroups)) { + Matcher m = pattern.matcher(group.getNetworkTagID()); + if (!m.matches()) { + throw new YarnRuntimeException( + "Group-network-tag-id mapping configuraton error. " + + "The group:" + group.getGroupName() + + " 's configured network-tag-id:" + group.getNetworkTagID() + + " does not match the '0xAAAABBBB' format."); + } + validateGroups.add(group); + } + } + this.groups = validateGroups; + } + + // make sure that we set the value for default-network-tag-id. + // Also, make sure the default class id match the + // 0xAAAABBBB format + public void validateDefaultClass() { + if (getDefaultNetworkTagID() == null || + getDefaultNetworkTagID().isEmpty()) { + throw new YarnRuntimeException("Missing value for defaultNetworkTagID." + + " We have to set non-empty value for defaultNetworkTagID"); + } + Matcher m = pattern.matcher(getDefaultNetworkTagID()); + if (!m.matches()) { + throw new YarnRuntimeException("Configuration error on " + + "default-network-tag-id. The configured default-network-tag-id:" + + getDefaultNetworkTagID() + + " does not match the '0xAAAABBBB' format."); + } + } + } + + /** + * The user object. + * + */ + @VisibleForTesting + @Private + public static class User { + @JsonProperty("name") + private String userName; + @JsonProperty("network-tag-id") + private String networkTagID; + + public User() {} + + public User(String userName, String networkTagID) { + this.setUserName(userName); + this.setNetworkTagID(networkTagID); + } + + public String getUserName() { + return userName; + } + public void setUserName(String userName) { + this.userName = userName; + } + public String getNetworkTagID() { + return networkTagID; + } + public void setNetworkTagID(String networkTagID) { + this.networkTagID = networkTagID; + } + } + + /** + * The group object. + * + */ + @VisibleForTesting + @Private + public static class Group { + @JsonProperty("name") + private String groupName; + @JsonProperty("network-tag-id") + private String networkTagID; + + public Group() {} + + public String getGroupName() { + return groupName; + } + public void setGroupName(String groupName) { + this.groupName = groupName; + } + + public String getNetworkTagID() { + return networkTagID; + } + public void setNetworkTagID(String networkTagID) { + this.networkTagID = networkTagID; + } + } + + @Private + @VisibleForTesting + public NetworkTagMapping getNetworkTagMapping() { + return this.networkTagMapping; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkTagMappingManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkTagMappingManager.java new file mode 100644 index 00000000000..124abde0e50 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkTagMappingManager.java @@ -0,0 +1,41 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; + +/** + * Base interface for network tag mapping manager. + */ +public interface NetworkTagMappingManager { + + /** + * Initialize the networkTagMapping manager. + */ + void initialize(Configuration conf); + + /** + * Get networkTagHexID for the given container. + * @param container + * @return the networkTagID. + */ + String getNetworkTagHexID(Container container); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkTagMappingManagerFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkTagMappingManagerFactory.java new file mode 100644 index 00000000000..17e2e21744d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/NetworkTagMappingManagerFactory.java @@ -0,0 +1,49 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Use {@code NetworkTagMappingManagerFactory} to get the correct + * {@link NetworkTagMappingManager}. + * + */ +public final class NetworkTagMappingManagerFactory { + private static final Log LOG = LogFactory.getLog( + NetworkTagMappingManagerFactory.class); + + private NetworkTagMappingManagerFactory() {} + + public static NetworkTagMappingManager getManager(Configuration conf) { + Class managerClass = + conf.getClass(YarnConfiguration.NM_NETWORK_TAG_MAPPING_MANAGER, + NetworkTagMappingJsonManager.class, + NetworkTagMappingManager.class); + LOG.info("Using NetworkTagMappingManager implementation - " + + managerClass); + return ReflectionUtils.newInstance(managerClass, conf); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java index ce850ab3b7c..921f92090b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/ResourceHandlerModule.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin; @@ -63,6 +64,8 @@ public class ResourceHandlerModule { */ private static volatile TrafficControlBandwidthHandlerImpl trafficControlBandwidthHandler; + private static volatile NetworkPacketTaggingHandlerImpl + networkPacketTaggingHandlerImpl; private static volatile CGroupsHandler cGroupsHandler; private static volatile CGroupsBlkioResourceHandlerImpl cGroupsBlkioResourceHandler; @@ -131,7 +134,7 @@ public class ResourceHandlerModule { if (trafficControlBandwidthHandler == null) { synchronized (OutboundBandwidthResourceHandler.class) { if (trafficControlBandwidthHandler == null) { - LOG.debug("Creating new traffic control bandwidth handler"); + LOG.info("Creating new traffic control bandwidth handler."); trafficControlBandwidthHandler = new TrafficControlBandwidthHandlerImpl(PrivilegedOperationExecutor .getInstance(conf), getInitializedCGroupsHandler(conf), @@ -147,9 +150,39 @@ public class ResourceHandlerModule { } } + public static ResourceHandler getNetworkResourceHandler(Configuration conf) + throws ResourceHandlerException { + boolean useNetworkTagHandler = conf.getBoolean( + YarnConfiguration.NM_NETWORK_TAG_HANDLER_ENABLED, + YarnConfiguration.DEFAULT_NM_NETWORK_TAG_HANDLER_ENABLED); + if (useNetworkTagHandler) { + LOG.info("Using network-tagging-handler."); + return getNetworkTaggingHandler(conf); + } else { + LOG.info("Using traffic control bandwidth handler"); + return getTrafficControlBandwidthHandler(conf); + } + } + + public static ResourceHandler getNetworkTaggingHandler(Configuration conf) + throws ResourceHandlerException { + if (networkPacketTaggingHandlerImpl == null) { + synchronized (OutboundBandwidthResourceHandler.class) { + if (networkPacketTaggingHandlerImpl == null) { + LOG.info("Creating new network-tagging-handler."); + networkPacketTaggingHandlerImpl = + new NetworkPacketTaggingHandlerImpl( + PrivilegedOperationExecutor.getInstance(conf), + getInitializedCGroupsHandler(conf)); + } + } + } + return networkPacketTaggingHandlerImpl; + } + public static OutboundBandwidthResourceHandler getOutboundBandwidthResourceHandler(Configuration conf) - throws ResourceHandlerException { + throws ResourceHandlerException { return getTrafficControlBandwidthHandler(conf); } @@ -213,7 +246,7 @@ public class ResourceHandlerModule { throws ResourceHandlerException { ArrayList handlerList = new ArrayList<>(); - addHandlerIfNotNull(handlerList, getOutboundBandwidthResourceHandler(conf)); + addHandlerIfNotNull(handlerList, getNetworkResourceHandler(conf)); addHandlerIfNotNull(handlerList, getDiskResourceHandler(conf)); addHandlerIfNotNull(handlerList, getMemoryResourceHandler(conf)); addHandlerIfNotNull(handlerList, getCGroupsCpuResourceHandler(conf)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNetworkTagMappingJsonManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNetworkTagMappingJsonManager.java new file mode 100644 index 00000000000..bcd565644d1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNetworkTagMappingJsonManager.java @@ -0,0 +1,310 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.NetworkTagMappingJsonManager; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.NetworkTagMappingJsonManager.Group; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.NetworkTagMappingJsonManager.NetworkTagMapping; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.NetworkTagMappingJsonManager.User; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test NetworkTagMapping Json Manager. + * + */ +public class TestNetworkTagMappingJsonManager { + private Path jsonDirDirPath = new Path("target/json"); + private Configuration conf = new YarnConfiguration(); + private FileSystem fs; + + @Before + public void setUp() throws IOException { + fs = FileSystem.get(conf); + if (fs.exists(jsonDirDirPath)) { + fs.delete(jsonDirDirPath, true); + } + assertTrue(fs.mkdirs(jsonDirDirPath)); + } + + @After + public void tearDown() throws IOException { + if (fs.exists(jsonDirDirPath)) { + fs.delete(jsonDirDirPath, true); + } + } + + @Test (timeout=10000) + public void testNetworkMappingJsonManager() throws Exception { + Path jsonFilePath = new Path(jsonDirDirPath, "test.json"); + File jsonFile = new File(jsonFilePath.toString()); + + NetworkTagMappingJsonManager manager = new NetworkTagMappingJsonManager(); + + JSONObject json = new JSONObject(); + + JSONArray userArray = new JSONArray(); + // add users + Map createdUsers = createUserNetworkTagIDMapping(); + for(Entry user : createdUsers.entrySet()) { + JSONObject userJson = new JSONObject(); + userJson.put("name", user.getKey()); + userJson.put("network-tag-id", user.getValue()); + userArray.put(userJson); + } + // add duplicate user1 + JSONObject duplicateUser1 = new JSONObject(); + duplicateUser1.put("name", "user1"); + duplicateUser1.put("network-tag-id", "0x88888888"); + userArray.put(duplicateUser1); + json.put("users", userArray); + + JSONArray groupArray = new JSONArray(); + // add groups + Map createdGroups = createGroupNetworkTagIDMapping(); + for(Entry group : createdGroups.entrySet()) { + JSONObject groupJson = new JSONObject(); + groupJson.put("name", group.getKey()); + groupJson.put("network-tag-id", group.getValue()); + groupArray.put(groupJson); + } + // add duplicate group1 + JSONObject duplicateGroup1 = new JSONObject(); + duplicateGroup1.put("name", "team1"); + duplicateGroup1.put("network-tag-id", "0x20002003"); + groupArray.put(duplicateGroup1); + json.put("groups", groupArray); + + writeJson(jsonFile, json.toString()); + + conf.set(YarnConfiguration.NM_NETWORK_TAG_MAPPING_FILE_PATH, + jsonFile.getAbsolutePath()); + + try { + manager.initialize(conf); + fail("Should get an exception. Becase we did not " + + "set default-network-tag-id"); + } catch (Exception ex) { + // Do Nothing + } + + // add default-network-tag-id + json.put("default-network-tag-id", "0x99999999"); + + // remove previous json file + if (fs.exists(jsonFilePath)) { + fs.delete(jsonFilePath, false); + } + assertFalse(fs.exists(jsonFilePath)); + + writeJson(jsonFile, json.toString()); + + manager.initialize(conf); + NetworkTagMapping networkTagMapping = manager.getNetworkTagMapping(); + // Verify the default-network-tag-id + assertTrue(networkTagMapping != null); + assertTrue("0x99999999".equals(networkTagMapping + .getDefaultNetworkTagID())); + // Verify the users + List users = networkTagMapping.getUsers(); + // The number of users should be 4 which is user1, user2, user3 and user4. + assertTrue(users.size() == 4); + for (int index = 0; index < users.size(); index++) { + String userName = users.get(index).getUserName(); + String classId = users.get(index).getNetworkTagID(); + assertTrue(createdUsers.containsValue(classId)); + String createdUserName = getUserName(createdUsers, classId); + assertTrue(createdUserName.contains(userName)); + } + + // Verify the groups + List groups = networkTagMapping.getGroups(); + // The number of groups should be 2 which is team1 and team2. + assertTrue(groups.size() == 2); + for (int index = 0; index < groups.size(); index++) { + String groupName = groups.get(index).getGroupName(); + String classId = groups.get(index).getNetworkTagID(); + assertTrue(createdGroups.containsKey(groupName)); + assertTrue(classId.equals(createdGroups.get(groupName))); + } + } + + @Test (timeout=10000) + public void testNetworkTagIDMatchPattern() throws Exception { + Path jsonFilePath = new Path(jsonDirDirPath, "test.json"); + File jsonFile = new File(jsonFilePath.toString()); + + NetworkTagMappingJsonManager manager = new NetworkTagMappingJsonManager(); + + JSONObject json = new JSONObject(); + + JSONArray userArray = new JSONArray(); + + JSONObject user1 = new JSONObject(); + user1.put("name", "user1"); + user1.put("network-tag-id", "1x88888888"); + userArray.put(user1); + json.put("users", userArray); + + writeJson(jsonFile, json.toString()); + + conf.set(YarnConfiguration.NM_NETWORK_TAG_MAPPING_FILE_PATH, + jsonFile.getAbsolutePath()); + + try { + manager.initialize(conf); + fail("Should get an exception. " + + "Becase we did not set network-tag-id for user1 correctly"); + } catch(Exception ex) { + // should catch exception here + assertTrue(ex.getMessage().contains( + "User-network-tag-id mapping configuraton error.")); + } + + json.remove("users"); + userArray = new JSONArray(); + + user1 = new JSONObject(); + user1.put("name", "user1"); + user1.put("network-tag-id", "0x88888888"); + userArray.put(user1); + json.put("users", userArray); + + JSONArray groupArray = new JSONArray(); + JSONObject group1 = new JSONObject(); + group1.put("name", "team1"); + group1.put("network-tag-id", "0x2000003"); + groupArray.put(group1); + json.put("groups", groupArray); + + // remove previous json file + if (fs.exists(jsonFilePath)) { + fs.delete(jsonFilePath, false); + } + assertFalse(fs.exists(jsonFilePath)); + + writeJson(jsonFile, json.toString()); + + try { + manager.initialize(conf); + fail("Should get an exception. " + + "Becase we did not set network-tag-id for group1 correctly"); + } catch(Exception ex) { + // should catch exception here + assertTrue(ex.getMessage().contains( + "Group-network-tag-id mapping configuraton error.")); + } + + json.remove("groups"); + groupArray = new JSONArray(); + group1 = new JSONObject(); + group1.put("name", "team1"); + group1.put("network-tag-id", "0x20002003"); + groupArray.put(group1); + json.put("groups", groupArray); + + json.put("default-network-tag-id", "0x99"); + // remove previous json file + if (fs.exists(jsonFilePath)) { + fs.delete(jsonFilePath, false); + } + assertFalse(fs.exists(jsonFilePath)); + + writeJson(jsonFile, json.toString()); + + try { + manager.initialize(conf); + fail("Should get an exception. " + + "Becase we did not set default-network-tag-id correctly"); + } catch(Exception ex) { + // should catch exception here + assertTrue(ex.getMessage().contains( + "Configuration error on default-network-tag-id.")); + } + + json.remove("default-network-tag-id"); + json.put("default-network-tag-id", "0x99999999"); + // remove previous json file + if (fs.exists(jsonFilePath)) { + fs.delete(jsonFilePath, false); + } + assertFalse(fs.exists(jsonFilePath)); + + writeJson(jsonFile, json.toString()); + + manager.initialize(conf); + } + + private void writeJson(File jsonFile, String jsonStr) throws IOException { + Writer writer = null; + try { + writer = new FileWriter(jsonFile); + writer.write(jsonStr); + } finally { + if (writer != null) { + writer.close(); + } + } + } + + private Map createUserNetworkTagIDMapping() { + Map classIdMap = new LinkedHashMap<>(); + classIdMap.put("user1", "0x10001001"); + classIdMap.put("user2", "0x10001002"); + classIdMap.put("user3,user4", "0x10001003"); + return classIdMap; + } + + private Map createGroupNetworkTagIDMapping() { + Map classIdMap = new LinkedHashMap<>(); + classIdMap.put("team1", "0x20002001"); + classIdMap.put("team2", "0x20002002"); + return classIdMap; + } + + private String getUserName(Map userMapping, String classId) { + for (Entry o : userMapping.entrySet()) { + if (o.getValue().equals(classId)) { + return o.getKey(); + } + } + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestNetworkPacketTaggingHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestNetworkPacketTaggingHandlerImpl.java new file mode 100644 index 00000000000..541be21f597 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/TestNetworkPacketTaggingHandlerImpl.java @@ -0,0 +1,182 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.util.List; + +import static org.mockito.Mockito.doNothing; +import static org.mockito.Matchers.any; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperation; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileged.PrivilegedOperationExecutor; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test NetworkPacketTagging Handler. + * + */ +public class TestNetworkPacketTaggingHandlerImpl { + private static final Logger LOG = + LoggerFactory.getLogger(TestNetworkPacketTaggingHandlerImpl.class); + private static final String TEST_CLASSID = "0x100001"; + private static final String TEST_CONTAINER_ID_STR = "container_01"; + private static final String TEST_TASKS_FILE = "testTasksFile"; + + private NetworkTagMappingManager mockManager; + private PrivilegedOperationExecutor privilegedOperationExecutorMock; + private CGroupsHandler cGroupsHandlerMock; + private Configuration conf; + private String tmpPath; + private ContainerId containerIdMock; + private Container containerMock; + + @Before + public void setup() { + privilegedOperationExecutorMock = mock(PrivilegedOperationExecutor.class); + cGroupsHandlerMock = mock(CGroupsHandler.class); + conf = new YarnConfiguration(); + tmpPath = new StringBuffer(System.getProperty("test.build.data")) + .append('/').append("hadoop.tmp.dir").toString(); + containerIdMock = mock(ContainerId.class); + containerMock = mock(Container.class); + when(containerIdMock.toString()).thenReturn(TEST_CONTAINER_ID_STR); + //mock returning a mock - an angel died somewhere. + when(containerMock.getContainerId()).thenReturn(containerIdMock); + + conf.set("hadoop.tmp.dir", tmpPath); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false); + + mockManager = mock(NetworkTagMappingManager.class); + doNothing().when(mockManager).initialize(any(Configuration.class)); + when(mockManager.getNetworkTagHexID(any(Container.class))) + .thenReturn(TEST_CLASSID); + } + + @Test + public void testBootstrap() { + NetworkPacketTaggingHandlerImpl handlerImpl = + createNetworkPacketTaggingHandlerImpl(); + + try { + handlerImpl.bootstrap(conf); + verify(cGroupsHandlerMock).initializeCGroupController( + eq(CGroupsHandler.CGroupController.NET_CLS)); + verifyNoMoreInteractions(cGroupsHandlerMock); + } catch (ResourceHandlerException e) { + LOG.error("Unexpected exception: " + e); + Assert.fail("Caught unexpected ResourceHandlerException!"); + } + } + + @Test + public void testLifeCycle() { + NetworkPacketTaggingHandlerImpl handlerImpl = + createNetworkPacketTaggingHandlerImpl(); + try { + handlerImpl.bootstrap(conf); + testPreStart(handlerImpl); + testPostComplete(handlerImpl); + } catch (ResourceHandlerException e) { + LOG.error("Unexpected exception: " + e); + Assert.fail("Caught unexpected ResourceHandlerException!"); + } + } + + private void testPreStart(NetworkPacketTaggingHandlerImpl handlerImpl) throws + ResourceHandlerException { + reset(privilegedOperationExecutorMock); + + when(cGroupsHandlerMock.getPathForCGroupTasks(CGroupsHandler + .CGroupController.NET_CLS, TEST_CONTAINER_ID_STR)).thenReturn( + TEST_TASKS_FILE); + + List ops = handlerImpl.preStart(containerMock); + + //Ensure that cgroups is created and updated as expected + verify(cGroupsHandlerMock).createCGroup( + eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR)); + verify(cGroupsHandlerMock).updateCGroupParam( + eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR), + eq(CGroupsHandler.CGROUP_PARAM_CLASSID), eq(TEST_CLASSID)); + + //Now check the privileged operations being returned + //We expect one operations - for adding pid to tasks file + Assert.assertEquals(1, ops.size()); + + //Verify that the add pid op is correct + PrivilegedOperation addPidOp = ops.get(0); + String expectedAddPidOpArg = PrivilegedOperation.CGROUP_ARG_PREFIX + + TEST_TASKS_FILE; + List addPidOpArgs = addPidOp.getArguments(); + + Assert.assertEquals(PrivilegedOperation.OperationType.ADD_PID_TO_CGROUP, + addPidOp.getOperationType()); + Assert.assertEquals(1, addPidOpArgs.size()); + Assert.assertEquals(expectedAddPidOpArg, addPidOpArgs.get(0)); + } + + private void testPostComplete(NetworkPacketTaggingHandlerImpl handlerImpl) + throws ResourceHandlerException { + reset(privilegedOperationExecutorMock); + + List ops = handlerImpl.postComplete(containerIdMock); + + verify(cGroupsHandlerMock).deleteCGroup( + eq(CGroupsHandler.CGroupController.NET_CLS), eq(TEST_CONTAINER_ID_STR)); + + //We don't expect any operations to be returned here + Assert.assertNull(ops); + } + + private NetworkPacketTaggingHandlerImpl + createNetworkPacketTaggingHandlerImpl() { + return new NetworkPacketTaggingHandlerImpl( + privilegedOperationExecutorMock, cGroupsHandlerMock) { + @Override + public NetworkTagMappingManager createNetworkTagMappingManager( + Configuration conf) { + return mockManager; + } + }; + } + + @After + public void teardown() { + FileUtil.fullyDelete(new File(tmpPath)); + } +}