YARN-10911. AbstractCSQueue: Create a separate class for usernames and weights that are travelling in a Map. Contributed by Szilard Nemeth

This commit is contained in:
Szilard Nemeth 2021-09-20 16:47:46 +02:00 committed by GitHub
parent f93e8fbf2d
commit 4df4389325
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 113 additions and 56 deletions

View File

@ -1126,7 +1126,7 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
* @throws IllegalArgumentException when more than * @throws IllegalArgumentException when more than
* {@link Configuration#MAX_SUBST} replacements are required * {@link Configuration#MAX_SUBST} replacements are required
*/ */
protected String substituteVars(String expr) { public String substituteVars(String expr) {
if (expr == null) { if (expr == null) {
return null; return null;
} }

View File

@ -146,7 +146,7 @@ public abstract class AbstractCSQueue implements CSQueue {
protected ReentrantReadWriteLock.WriteLock writeLock; protected ReentrantReadWriteLock.WriteLock writeLock;
volatile Priority priority = Priority.newInstance(0); volatile Priority priority = Priority.newInstance(0);
private Map<String, Float> userWeights = new HashMap<String, Float>(); private UserWeights userWeights = UserWeights.createEmpty();
private int maxParallelApps; private int maxParallelApps;
// is it a dynamic queue? // is it a dynamic queue?
@ -561,18 +561,17 @@ public abstract class AbstractCSQueue implements CSQueue {
} }
} }
private Map<String, Float> getUserWeightsFromHierarchy private UserWeights getUserWeightsFromHierarchy(
(CapacitySchedulerConfiguration configuration) throws CapacitySchedulerConfiguration configuration) {
IOException { UserWeights unionInheritedWeights = UserWeights.createEmpty();
Map<String, Float> unionInheritedWeights = new HashMap<String, Float>();
CSQueue parentQ = getParent(); CSQueue parentQ = getParent();
if (parentQ != null) { if (parentQ != null) {
// Inherit all of parent's user's weights // Inherit all of parent's userWeights
unionInheritedWeights.putAll(parentQ.getUserWeights()); unionInheritedWeights.addFrom(parentQ.getUserWeights());
} }
// Insert this queue's user's weights, overriding parent's user's weights if // Insert this queue's userWeights, overriding parent's userWeights if
// there is overlap. // there is an overlap.
unionInheritedWeights.putAll( unionInheritedWeights.addFrom(
configuration.getAllUserWeightsForQueue(getQueuePath())); configuration.getAllUserWeightsForQueue(getQueuePath()));
return unionInheritedWeights; return unionInheritedWeights;
} }
@ -1375,7 +1374,7 @@ public abstract class AbstractCSQueue implements CSQueue {
} }
@Override @Override
public Map<String, Float> getUserWeights() { public UserWeights getUserWeights() {
return userWeights; return userWeights;
} }

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -393,10 +392,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
Priority getPriority(); Priority getPriority();
/** /**
* Get a map of usernames and weights * Get the UserWeights object that wraps a map of usernames and weights
* @return map of usernames and corresponding weight * @return The UserWeights object.
*/ */
Map<String, Float> getUserWeights(); UserWeights getUserWeights();
/** /**
* Get QueueResourceQuotas associated with each queue. * Get QueueResourceQuotas associated with each queue.

View File

@ -2048,30 +2048,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
* Get the weights of all users at this queue level from the configuration. * Get the weights of all users at this queue level from the configuration.
* Used in computing user-specific user limit, relative to other users. * Used in computing user-specific user limit, relative to other users.
* @param queuePath full queue path * @param queuePath full queue path
* @return map of user weights, if they exists. Otherwise, return empty map. * @return map of user weights, if they exist. Otherwise, return empty map.
*/ */
public Map<String, Float> getAllUserWeightsForQueue(String queuePath) { public UserWeights getAllUserWeightsForQueue(String queuePath) {
Map <String, Float> userWeights = new HashMap <>(); return UserWeights.createByConfig(this, getConfigurationProperties(), queuePath);
String qPathPlusPrefix = getQueuePrefix(queuePath) + USER_SETTINGS;
Map<String, String> props = getConfigurationProperties()
.getPropertiesWithPrefix(qPathPlusPrefix);
Map<String, String> result = new HashMap<>();
for(Map.Entry<String, String> item: props.entrySet()) {
Matcher m = USER_WEIGHT_PATTERN.matcher(item.getKey());
if(m.find()) {
result.put(item.getKey(), substituteVars(item.getValue()));
}
}
for (Entry<String, String> e : result.entrySet()) {
String userName =
e.getKey().replaceFirst("\\." + USER_WEIGHT, "");
if (!userName.isEmpty()) {
userWeights.put(userName, new Float(e.getValue()));
}
}
return userWeights;
} }
public boolean getAssignMultipleEnabled() { public boolean getAssignMultipleEnabled() {

View File

@ -259,17 +259,8 @@ public class LeafQueue extends AbstractCSQueue {
conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath())); conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
// Validate leaf queue's user's weights. // Validate leaf queue's user's weights.
float queueUL = Math.min(100.0f, conf.getUserLimit(getQueuePath())); float queueUserLimit = Math.min(100.0f, conf.getUserLimit(getQueuePath()));
for (Entry<String, Float> e : getUserWeights().entrySet()) { getUserWeights().validateForLeafQueue(queueUserLimit, getQueuePath());
float val = e.getValue().floatValue();
if (val < 0.0f || val > (100.0f / queueUL)) {
throw new IOException("Weight (" + val + ") for user \"" + e.getKey()
+ "\" must be between 0 and" + " 100 / " + queueUL + " (= " +
100.0f/queueUL + ", the number of concurrent active users in "
+ getQueuePath() + ")");
}
}
usersManager.updateUserWeights(); usersManager.updateUserWeights();
LOG.info( LOG.info(

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.USER_SETTINGS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.USER_WEIGHT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.USER_WEIGHT_PATTERN;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getQueuePrefix;
public final class UserWeights {
public static final float DEFAULT_WEIGHT = 1.0F;
/**
* Key: Username,
* Value: Weight as float.
*/
private final Map<String, Float> data = new HashMap<>();
private UserWeights() {}
public static UserWeights createEmpty() {
return new UserWeights();
}
public static UserWeights createByConfig(
CapacitySchedulerConfiguration conf,
ConfigurationProperties configurationProperties,
String queuePath) {
String queuePathPlusPrefix = getQueuePrefix(queuePath) + USER_SETTINGS;
Map<String, String> props = configurationProperties
.getPropertiesWithPrefix(queuePathPlusPrefix);
UserWeights userWeights = new UserWeights();
for (Map.Entry<String, String> item: props.entrySet()) {
Matcher m = USER_WEIGHT_PATTERN.matcher(item.getKey());
if (m.find()) {
String userName = item.getKey().replaceFirst("\\." + USER_WEIGHT, "");
if (!userName.isEmpty()) {
String value = conf.substituteVars(item.getValue());
userWeights.data.put(userName, new Float(value));
}
}
}
return userWeights;
}
public float getByUser(String userName) {
Float weight = data.get(userName);
if (weight == null) {
return DEFAULT_WEIGHT;
}
return weight;
}
public void validateForLeafQueue(float queueUserLimit, String queuePath) throws IOException {
for (Map.Entry<String, Float> e : data.entrySet()) {
String userName = e.getKey();
float weight = e.getValue();
if (weight < 0.0F || weight > (100.0F / queueUserLimit)) {
throw new IOException("Weight (" + weight + ") for user \"" + userName
+ "\" must be between 0 and" + " 100 / " + queueUserLimit + " (= " +
100.0f / queueUserLimit + ", the number of concurrent active users in "
+ queuePath + ")");
}
}
}
public void addFrom(UserWeights addFrom) {
data.putAll(addFrom.data);
}
}

View File

@ -471,8 +471,7 @@ public class UsersManager implements AbstractUsersManager {
} }
private float getUserWeightFromQueue(String userName) { private float getUserWeightFromQueue(String userName) {
Float weight = lQueue.getUserWeights().get(userName); return lQueue.getUserWeights().getByUser(userName);
return (weight == null) ? 1.0f : weight.floatValue();
} }
/** /**

View File

@ -1902,7 +1902,7 @@ public class TestLeafQueue {
csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath() csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath()
+ ".user-settings.user_0." + CapacitySchedulerConfiguration.USER_WEIGHT, + ".user-settings.user_0." + CapacitySchedulerConfiguration.USER_WEIGHT,
1.5f); 1.5f);
// Set weight for "firstname.lastname" to be 0.7f for the a queue // Set weight for "firstname.lastname" to be 0.7f for the "a" queue
// in the configs. Notice the user contains a dot. This is to test // in the configs. Notice the user contains a dot. This is to test
// that weights are accepted for a username that contains dots. // that weights are accepted for a username that contains dots.
csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath() csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath()
@ -1914,10 +1914,10 @@ public class TestLeafQueue {
when(csContext.getClusterResource()) when(csContext.getClusterResource())
.thenReturn(Resources.createResource(16 * GB, 32)); .thenReturn(Resources.createResource(16 * GB, 32));
// Verify that configs were updated and parsed correctly. // Verify that configs were updated and parsed correctly.
Assert.assertNull(a.getUserWeights().get("user_0")); Assert.assertEquals(UserWeights.DEFAULT_WEIGHT, a.getUserWeights().getByUser("user_0"), 0.0f);
a.reinitialize(a, csContext.getClusterResource()); a.reinitialize(a, csContext.getClusterResource());
assertEquals(1.5f, a.getUserWeights().get("user_0"), 0.0f); assertEquals(1.5f, a.getUserWeights().getByUser("user_0"), 0.0f);
assertEquals(0.7f, a.getUserWeights().get("firstname.lastname"), 0.0f); assertEquals(0.7f, a.getUserWeights().getByUser("firstname.lastname"), 0.0f);
// set maxCapacity // set maxCapacity
a.setMaxCapacity(1.0f); a.setMaxCapacity(1.0f);