From 4df4389325254465b52557d6fa99bcd470d64409 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth <954799+szilard-nemeth@users.noreply.github.com> Date: Mon, 20 Sep 2021 16:47:46 +0200 Subject: [PATCH] YARN-10911. AbstractCSQueue: Create a separate class for usernames and weights that are travelling in a Map. Contributed by Szilard Nemeth --- .../org/apache/hadoop/conf/Configuration.java | 2 +- .../scheduler/capacity/AbstractCSQueue.java | 21 +++-- .../scheduler/capacity/CSQueue.java | 7 +- .../CapacitySchedulerConfiguration.java | 26 +----- .../scheduler/capacity/LeafQueue.java | 13 +-- .../scheduler/capacity/UserWeights.java | 89 +++++++++++++++++++ .../scheduler/capacity/UsersManager.java | 3 +- .../scheduler/capacity/TestLeafQueue.java | 8 +- 8 files changed, 113 insertions(+), 56 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserWeights.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java index 1d69a34eee1..c8d968f71fc 100755 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java @@ -1126,7 +1126,7 @@ public class Configuration implements Iterable>, * @throws IllegalArgumentException when more than * {@link Configuration#MAX_SUBST} replacements are required */ - protected String substituteVars(String expr) { + public String substituteVars(String expr) { if (expr == null) { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index f0d06392404..7473aa30c11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -146,7 +146,7 @@ public abstract class AbstractCSQueue implements CSQueue { protected ReentrantReadWriteLock.WriteLock writeLock; volatile Priority priority = Priority.newInstance(0); - private Map userWeights = new HashMap(); + private UserWeights userWeights = UserWeights.createEmpty(); private int maxParallelApps; // is it a dynamic queue? @@ -561,18 +561,17 @@ public abstract class AbstractCSQueue implements CSQueue { } } - private Map getUserWeightsFromHierarchy - (CapacitySchedulerConfiguration configuration) throws - IOException { - Map unionInheritedWeights = new HashMap(); + private UserWeights getUserWeightsFromHierarchy( + CapacitySchedulerConfiguration configuration) { + UserWeights unionInheritedWeights = UserWeights.createEmpty(); CSQueue parentQ = getParent(); if (parentQ != null) { - // Inherit all of parent's user's weights - unionInheritedWeights.putAll(parentQ.getUserWeights()); + // Inherit all of parent's userWeights + unionInheritedWeights.addFrom(parentQ.getUserWeights()); } - // Insert this queue's user's weights, overriding parent's user's weights if - // there is overlap. - unionInheritedWeights.putAll( + // Insert this queue's userWeights, overriding parent's userWeights if + // there is an overlap. + unionInheritedWeights.addFrom( configuration.getAllUserWeightsForQueue(getQueuePath())); return unionInheritedWeights; } @@ -1375,7 +1374,7 @@ public abstract class AbstractCSQueue implements CSQueue { } @Override - public Map getUserWeights() { + public UserWeights getUserWeights() { return userWeights; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 6bf5feaeb4f..52e8f93502e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -393,10 +392,10 @@ public interface CSQueue extends SchedulerQueue { Priority getPriority(); /** - * Get a map of usernames and weights - * @return map of usernames and corresponding weight + * Get the UserWeights object that wraps a map of usernames and weights + * @return The UserWeights object. */ - Map getUserWeights(); + UserWeights getUserWeights(); /** * Get QueueResourceQuotas associated with each queue. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 6e1b72feac0..60e55275e2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -2048,30 +2048,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur * Get the weights of all users at this queue level from the configuration. * Used in computing user-specific user limit, relative to other users. * @param queuePath full queue path - * @return map of user weights, if they exists. Otherwise, return empty map. + * @return map of user weights, if they exist. Otherwise, return empty map. */ - public Map getAllUserWeightsForQueue(String queuePath) { - Map userWeights = new HashMap <>(); - String qPathPlusPrefix = getQueuePrefix(queuePath) + USER_SETTINGS; - Map props = getConfigurationProperties() - .getPropertiesWithPrefix(qPathPlusPrefix); - - Map result = new HashMap<>(); - for(Map.Entry item: props.entrySet()) { - Matcher m = USER_WEIGHT_PATTERN.matcher(item.getKey()); - if(m.find()) { - result.put(item.getKey(), substituteVars(item.getValue())); - } - } - - for (Entry e : result.entrySet()) { - String userName = - e.getKey().replaceFirst("\\." + USER_WEIGHT, ""); - if (!userName.isEmpty()) { - userWeights.put(userName, new Float(e.getValue())); - } - } - return userWeights; + public UserWeights getAllUserWeightsForQueue(String queuePath) { + return UserWeights.createByConfig(this, getConfigurationProperties(), queuePath); } public boolean getAssignMultipleEnabled() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 168d2a37a6f..2c0593d0fc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -259,17 +259,8 @@ public class LeafQueue extends AbstractCSQueue { conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath())); // Validate leaf queue's user's weights. - float queueUL = Math.min(100.0f, conf.getUserLimit(getQueuePath())); - for (Entry e : getUserWeights().entrySet()) { - float val = e.getValue().floatValue(); - if (val < 0.0f || val > (100.0f / queueUL)) { - throw new IOException("Weight (" + val + ") for user \"" + e.getKey() - + "\" must be between 0 and" + " 100 / " + queueUL + " (= " + - 100.0f/queueUL + ", the number of concurrent active users in " - + getQueuePath() + ")"); - } - } - + float queueUserLimit = Math.min(100.0f, conf.getUserLimit(getQueuePath())); + getUserWeights().validateForLeafQueue(queueUserLimit, getQueuePath()); usersManager.updateUserWeights(); LOG.info( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserWeights.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserWeights.java new file mode 100644 index 00000000000..7bdc1bc8045 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserWeights.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.USER_SETTINGS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.USER_WEIGHT; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.USER_WEIGHT_PATTERN; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getQueuePrefix; + +public final class UserWeights { + public static final float DEFAULT_WEIGHT = 1.0F; + /** + * Key: Username, + * Value: Weight as float. + */ + private final Map data = new HashMap<>(); + + private UserWeights() {} + + public static UserWeights createEmpty() { + return new UserWeights(); + } + + public static UserWeights createByConfig( + CapacitySchedulerConfiguration conf, + ConfigurationProperties configurationProperties, + String queuePath) { + String queuePathPlusPrefix = getQueuePrefix(queuePath) + USER_SETTINGS; + Map props = configurationProperties + .getPropertiesWithPrefix(queuePathPlusPrefix); + + UserWeights userWeights = new UserWeights(); + for (Map.Entry item: props.entrySet()) { + Matcher m = USER_WEIGHT_PATTERN.matcher(item.getKey()); + if (m.find()) { + String userName = item.getKey().replaceFirst("\\." + USER_WEIGHT, ""); + if (!userName.isEmpty()) { + String value = conf.substituteVars(item.getValue()); + userWeights.data.put(userName, new Float(value)); + } + } + } + return userWeights; + } + + public float getByUser(String userName) { + Float weight = data.get(userName); + if (weight == null) { + return DEFAULT_WEIGHT; + } + return weight; + } + + public void validateForLeafQueue(float queueUserLimit, String queuePath) throws IOException { + for (Map.Entry e : data.entrySet()) { + String userName = e.getKey(); + float weight = e.getValue(); + if (weight < 0.0F || weight > (100.0F / queueUserLimit)) { + throw new IOException("Weight (" + weight + ") for user \"" + userName + + "\" must be between 0 and" + " 100 / " + queueUserLimit + " (= " + + 100.0f / queueUserLimit + ", the number of concurrent active users in " + + queuePath + ")"); + } + } + } + + public void addFrom(UserWeights addFrom) { + data.putAll(addFrom.data); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index 853535d5af5..5897f2645aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -471,8 +471,7 @@ public class UsersManager implements AbstractUsersManager { } private float getUserWeightFromQueue(String userName) { - Float weight = lQueue.getUserWeights().get(userName); - return (weight == null) ? 1.0f : weight.floatValue(); + return lQueue.getUserWeights().getByUser(userName); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index aa342af0dfd..c918a29b1a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -1902,7 +1902,7 @@ public class TestLeafQueue { csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath() + ".user-settings.user_0." + CapacitySchedulerConfiguration.USER_WEIGHT, 1.5f); - // Set weight for "firstname.lastname" to be 0.7f for the a queue + // Set weight for "firstname.lastname" to be 0.7f for the "a" queue // in the configs. Notice the user contains a dot. This is to test // that weights are accepted for a username that contains dots. csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath() @@ -1914,10 +1914,10 @@ public class TestLeafQueue { when(csContext.getClusterResource()) .thenReturn(Resources.createResource(16 * GB, 32)); // Verify that configs were updated and parsed correctly. - Assert.assertNull(a.getUserWeights().get("user_0")); + Assert.assertEquals(UserWeights.DEFAULT_WEIGHT, a.getUserWeights().getByUser("user_0"), 0.0f); a.reinitialize(a, csContext.getClusterResource()); - assertEquals(1.5f, a.getUserWeights().get("user_0"), 0.0f); - assertEquals(0.7f, a.getUserWeights().get("firstname.lastname"), 0.0f); + assertEquals(1.5f, a.getUserWeights().getByUser("user_0"), 0.0f); + assertEquals(0.7f, a.getUserWeights().getByUser("firstname.lastname"), 0.0f); // set maxCapacity a.setMaxCapacity(1.0f);