From c0b1311a93614becc4a255af48fb7b697d491b80 Mon Sep 17 00:00:00 2001 From: Jian He Date: Tue, 3 Feb 2015 11:43:12 -0800 Subject: [PATCH] YARN-3098. Created common QueueCapacities class in Capacity Scheduler to track capacities-by-labels of queues. Contributed by Wangda Tan (cherry picked from commit 21d80b3dd90a8e33e51701887c8d9369ed4ab17d) --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/ResourceUsage.java | 80 +++----- .../scheduler/capacity/QueueCapacities.java | 191 ++++++++++++++++++ .../capacity/TestQueueCapacities.java | 127 ++++++++++++ 4 files changed, 351 insertions(+), 50 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 96eb6651002..859061943f3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -197,6 +197,9 @@ Release 2.7.0 - UNRELEASED YARN-3022. Expose Container resource information from NodeManager for monitoring (adhoot via ranter) + YARN-3098. Created common QueueCapacities class in Capacity Scheduler to + track capacities-by-labels of queues. (Wangda Tan via jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index 5a4ccedc3bd..c651878a996 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -56,14 +56,10 @@ public class ResourceUsage { private enum ResourceType { USED(0), PENDING(1), AMUSED(2), RESERVED(3), HEADROOM(4); - private int value; + private int idx; private ResourceType(int value) { - this.value = value; - } - - public int getValue() { - return this.value; + this.idx = value; } } @@ -77,22 +73,6 @@ public class ResourceUsage { resArr[i] = Resource.newInstance(0, 0); } } - - public Resource get(ResourceType type) { - return resArr[type.getValue()]; - } - - public void set(ResourceType type, Resource res) { - resArr[type.getValue()] = res; - } - - public void inc(ResourceType type, Resource res) { - Resources.addTo(resArr[type.getValue()], res); - } - - public void dec(ResourceType type, Resource res) { - Resources.subtractFrom(resArr[type.getValue()], res); - } } /* @@ -103,11 +83,11 @@ public class ResourceUsage { } public Resource getUsed(String label) { - return internalGet(label, ResourceType.USED); + return _get(label, ResourceType.USED); } public void incUsed(String label, Resource res) { - internalInc(label, ResourceType.USED, res); + _inc(label, ResourceType.USED, res); } public void incUsed(Resource res) { @@ -119,7 +99,7 @@ public class ResourceUsage { } public void decUsed(String label, Resource res) { - internalDec(label, ResourceType.USED, res); + _dec(label, ResourceType.USED, res); } public void setUsed(Resource res) { @@ -127,7 +107,7 @@ public class ResourceUsage { } public void setUsed(String label, Resource res) { - internalSet(label, ResourceType.USED, res); + _set(label, ResourceType.USED, res); } /* @@ -138,11 +118,11 @@ public class ResourceUsage { } public Resource getPending(String label) { - return internalGet(label, ResourceType.PENDING); + return _get(label, ResourceType.PENDING); } public void incPending(String label, Resource res) { - internalInc(label, ResourceType.PENDING, res); + _inc(label, ResourceType.PENDING, res); } public void incPending(Resource res) { @@ -154,7 +134,7 @@ public class ResourceUsage { } public void decPending(String label, Resource res) { - internalDec(label, ResourceType.PENDING, res); + _dec(label, ResourceType.PENDING, res); } public void setPending(Resource res) { @@ -162,7 +142,7 @@ public class ResourceUsage { } public void setPending(String label, Resource res) { - internalSet(label, ResourceType.PENDING, res); + _set(label, ResourceType.PENDING, res); } /* @@ -173,11 +153,11 @@ public class ResourceUsage { } public Resource getReserved(String label) { - return internalGet(label, ResourceType.RESERVED); + return _get(label, ResourceType.RESERVED); } public void incReserved(String label, Resource res) { - internalInc(label, ResourceType.RESERVED, res); + _inc(label, ResourceType.RESERVED, res); } public void incReserved(Resource res) { @@ -189,7 +169,7 @@ public class ResourceUsage { } public void decReserved(String label, Resource res) { - internalDec(label, ResourceType.RESERVED, res); + _dec(label, ResourceType.RESERVED, res); } public void setReserved(Resource res) { @@ -197,7 +177,7 @@ public class ResourceUsage { } public void setReserved(String label, Resource res) { - internalSet(label, ResourceType.RESERVED, res); + _set(label, ResourceType.RESERVED, res); } /* @@ -208,11 +188,11 @@ public class ResourceUsage { } public Resource getHeadroom(String label) { - return internalGet(label, ResourceType.HEADROOM); + return _get(label, ResourceType.HEADROOM); } public void incHeadroom(String label, Resource res) { - internalInc(label, ResourceType.HEADROOM, res); + _inc(label, ResourceType.HEADROOM, res); } public void incHeadroom(Resource res) { @@ -224,7 +204,7 @@ public class ResourceUsage { } public void decHeadroom(String label, Resource res) { - internalDec(label, ResourceType.HEADROOM, res); + _dec(label, ResourceType.HEADROOM, res); } public void setHeadroom(Resource res) { @@ -232,7 +212,7 @@ public class ResourceUsage { } public void setHeadroom(String label, Resource res) { - internalSet(label, ResourceType.HEADROOM, res); + _set(label, ResourceType.HEADROOM, res); } /* @@ -243,11 +223,11 @@ public class ResourceUsage { } public Resource getAMUsed(String label) { - return internalGet(label, ResourceType.AMUSED); + return _get(label, ResourceType.AMUSED); } public void incAMUsed(String label, Resource res) { - internalInc(label, ResourceType.AMUSED, res); + _inc(label, ResourceType.AMUSED, res); } public void incAMUsed(Resource res) { @@ -259,7 +239,7 @@ public class ResourceUsage { } public void decAMUsed(String label, Resource res) { - internalDec(label, ResourceType.AMUSED, res); + _dec(label, ResourceType.AMUSED, res); } public void setAMUsed(Resource res) { @@ -267,7 +247,7 @@ public class ResourceUsage { } public void setAMUsed(String label, Resource res) { - internalSet(label, ResourceType.AMUSED, res); + _set(label, ResourceType.AMUSED, res); } private static Resource normalize(Resource res) { @@ -277,14 +257,14 @@ public class ResourceUsage { return res; } - private Resource internalGet(String label, ResourceType type) { + private Resource _get(String label, ResourceType type) { try { readLock.lock(); UsageByLabel usage = usages.get(label); if (null == usage) { return Resources.none(); } - return normalize(usage.get(type)); + return normalize(usage.resArr[type.idx]); } finally { readLock.unlock(); } @@ -300,31 +280,31 @@ public class ResourceUsage { return usages.get(label); } - private void internalSet(String label, ResourceType type, Resource res) { + private void _set(String label, ResourceType type, Resource res) { try { writeLock.lock(); UsageByLabel usage = getAndAddIfMissing(label); - usage.set(type, res); + usage.resArr[type.idx] = res; } finally { writeLock.unlock(); } } - private void internalInc(String label, ResourceType type, Resource res) { + private void _inc(String label, ResourceType type, Resource res) { try { writeLock.lock(); UsageByLabel usage = getAndAddIfMissing(label); - usage.inc(type, res); + Resources.addTo(usage.resArr[type.idx], res); } finally { writeLock.unlock(); } } - private void internalDec(String label, ResourceType type, Resource res) { + private void _dec(String label, ResourceType type, Resource res) { try { writeLock.lock(); UsageByLabel usage = getAndAddIfMissing(label); - usage.dec(type, res); + Resources.subtractFrom(usage.resArr[type.idx], res); } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java new file mode 100644 index 00000000000..a0e6d8c9b17 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; + +public class QueueCapacities { + private static final String NL = CommonNodeLabelsManager.NO_LABEL; + private static final float LABEL_DOESNT_EXIST_CAP = 0f; + private Map capacitiesMap; + private ReadLock readLock; + private WriteLock writeLock; + + public QueueCapacities() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + + capacitiesMap = new HashMap(); + } + + // Usage enum here to make implement cleaner + private enum CapacityType { + USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5); + + private int idx; + + private CapacityType(int idx) { + this.idx = idx; + } + } + + private static class Capacities { + private float[] capacitiesArr; + + public Capacities() { + capacitiesArr = new float[CapacityType.values().length]; + } + } + + private float _get(String label, CapacityType type) { + try { + readLock.lock(); + Capacities cap = capacitiesMap.get(label); + if (null == cap) { + return LABEL_DOESNT_EXIST_CAP; + } + return cap.capacitiesArr[type.idx]; + } finally { + readLock.unlock(); + } + } + + private void _set(String label, CapacityType type, float value) { + try { + writeLock.lock(); + Capacities cap = capacitiesMap.get(label); + if (null == cap) { + cap = new Capacities(); + capacitiesMap.put(label, cap); + } + cap.capacitiesArr[type.idx] = value; + } finally { + writeLock.unlock(); + } + } + + /* Used Capacity Getter and Setter */ + public float getUsedCapacity() { + return _get(NL, CapacityType.USED_CAP); + } + + public float getUsedCapacity(String label) { + return _get(label, CapacityType.USED_CAP); + } + + public void setUsedCapacity(float value) { + _set(NL, CapacityType.USED_CAP, value); + } + + public void setUsedCapacity(String label, float value) { + _set(label, CapacityType.USED_CAP, value); + } + + /* Absolute Used Capacity Getter and Setter */ + public float getAbsoluteUsedCapacity() { + return _get(NL, CapacityType.ABS_USED_CAP); + } + + public float getAbsoluteUsedCapacity(String label) { + return _get(label, CapacityType.ABS_USED_CAP); + } + + public void setAbsoluteUsedCapacity(float value) { + _set(NL, CapacityType.ABS_USED_CAP, value); + } + + public void setAbsoluteUsedCapacity(String label, float value) { + _set(label, CapacityType.ABS_USED_CAP, value); + } + + /* Capacity Getter and Setter */ + public float getCapacity() { + return _get(NL, CapacityType.CAP); + } + + public float getCapacity(String label) { + return _get(label, CapacityType.CAP); + } + + public void setCapacity(float value) { + _set(NL, CapacityType.CAP, value); + } + + public void setCapacity(String label, float value) { + _set(label, CapacityType.CAP, value); + } + + /* Absolute Capacity Getter and Setter */ + public float getAbsoluteCapacity() { + return _get(NL, CapacityType.ABS_CAP); + } + + public float getAbsoluteCapacity(String label) { + return _get(label, CapacityType.ABS_CAP); + } + + public void setAbsoluteCapacity(float value) { + _set(NL, CapacityType.ABS_CAP, value); + } + + public void setAbsoluteCapacity(String label, float value) { + _set(label, CapacityType.ABS_CAP, value); + } + + /* Maximum Capacity Getter and Setter */ + public float getMaximumCapacity() { + return _get(NL, CapacityType.MAX_CAP); + } + + public float getMaximumCapacity(String label) { + return _get(label, CapacityType.MAX_CAP); + } + + public void setMaximumCapacity(float value) { + _set(NL, CapacityType.MAX_CAP, value); + } + + public void setMaximumCapacity(String label, float value) { + _set(label, CapacityType.MAX_CAP, value); + } + + /* Absolute Maximum Capacity Getter and Setter */ + public float getAbsoluteMaximumCapacity() { + return _get(NL, CapacityType.ABS_MAX_CAP); + } + + public float getAbsoluteMaximumCapacity(String label) { + return _get(label, CapacityType.ABS_MAX_CAP); + } + + public void setAbsoluteMaximumCapacity(float value) { + _set(NL, CapacityType.ABS_MAX_CAP, value); + } + + public void setAbsoluteMaximumCapacity(String label, float value) { + _set(label, CapacityType.ABS_MAX_CAP, value); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java new file mode 100644 index 00000000000..6d2a42180f3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Resource; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestQueueCapacities { + private static final Log LOG = LogFactory.getLog(TestQueueCapacities.class); + private String suffix; + + @Parameterized.Parameters + public static Collection getParameters() { + return Arrays.asList(new String[][] { + { "Capacity" }, + { "AbsoluteCapacity" }, + { "UsedCapacity" }, + { "AbsoluteUsedCapacity" }, + { "MaximumCapacity" }, + { "AbsoluteMaximumCapacity" } }); + } + + public TestQueueCapacities(String suffix) { + this.suffix = suffix; + } + + private static float get(QueueCapacities obj, String suffix, + String label) throws Exception { + return executeByName(obj, "get" + suffix, label, -1f); + } + + private static void set(QueueCapacities obj, String suffix, + String label, float value) throws Exception { + executeByName(obj, "set" + suffix, label, value); + } + + // Use reflection to avoid too much avoid code + private static float executeByName(QueueCapacities obj, String methodName, + String label, float value) throws Exception { + // We have 4 kinds of method + // 1. getXXX() : float + // 2. getXXX(label) : float + // 3. setXXX(float) : void + // 4. setXXX(label, float) : void + if (methodName.startsWith("get")) { + float result; + if (label == null) { + // 1. + Method method = QueueCapacities.class.getDeclaredMethod(methodName); + result = (float) method.invoke(obj); + } else { + // 2. + Method method = + QueueCapacities.class.getDeclaredMethod(methodName, String.class); + result = (float) method.invoke(obj, label); + } + return result; + } else { + if (label == null) { + // 3. + Method method = + QueueCapacities.class.getDeclaredMethod(methodName, Float.TYPE); + method.invoke(obj, value); + } else { + // 4. + Method method = + QueueCapacities.class.getDeclaredMethod(methodName, String.class, + Float.TYPE); + method.invoke(obj, label, value); + } + return -1f; + } + } + + private void internalTestModifyAndRead(String label) throws Exception { + QueueCapacities qc = new QueueCapacities(); + + // First get returns 0 always + Assert.assertEquals(0f, get(qc, suffix, label), 1e-8); + + // Set to 1, and check + set(qc, suffix, label, 1f); + Assert.assertEquals(1f, get(qc, suffix, label), 1e-8); + + // Set to 2, and check + set(qc, suffix, label, 2f); + Assert.assertEquals(2f, get(qc, suffix, label), 1e-8); + } + + void check(int mem, int cpu, Resource res) { + Assert.assertEquals(mem, res.getMemory()); + Assert.assertEquals(cpu, res.getVirtualCores()); + } + + @Test + public void testModifyAndRead() throws Exception { + LOG.info("Test - " + suffix); + internalTestModifyAndRead(null); + internalTestModifyAndRead("label"); + } +} \ No newline at end of file