diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 872f16e75c2..924bfa693d6 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -203,6 +203,9 @@ Release 2.7.0 - UNRELEASED YARN-3024. LocalizerRunner should give DIE action when all resources are localized. (Chengbing Liu via xgong) + YARN-3092. Created a common ResourceUsage class to track labeled resource + usages in Capacity Scheduler. (Wangda Tan via jianhe) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java new file mode 100644 index 00000000000..5a4ccedc3bd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * Resource Usage by Labels for following fields by label - AM resource (to + * enforce max-am-resource-by-label after YARN-2637) - Used resource (includes + * AM resource usage) - Reserved resource - Pending resource - Headroom + * + * This class can be used to track resource usage in queue/user/app. + * + * And it is thread-safe + */ +public class ResourceUsage { + private ReadLock readLock; + private WriteLock writeLock; + private Map usages; + // short for no-label :) + private static final String NL = CommonNodeLabelsManager.NO_LABEL; + + public ResourceUsage() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + + usages = new HashMap(); + } + + // Usage enum here to make implement cleaner + private enum ResourceType { + USED(0), PENDING(1), AMUSED(2), RESERVED(3), HEADROOM(4); + + private int value; + + private ResourceType(int value) { + this.value = value; + } + + public int getValue() { + return this.value; + } + } + + private static class UsageByLabel { + // usage by label, contains all UsageType + private Resource[] resArr; + + public UsageByLabel(String label) { + resArr = new Resource[ResourceType.values().length]; + for (int i = 0; i < resArr.length; i++) { + resArr[i] = Resource.newInstance(0, 0); + } + } + + public Resource get(ResourceType type) { + return resArr[type.getValue()]; + } + + public void set(ResourceType type, Resource res) { + resArr[type.getValue()] = res; + } + + public void inc(ResourceType type, Resource res) { + Resources.addTo(resArr[type.getValue()], res); + } + + public void dec(ResourceType type, Resource res) { + Resources.subtractFrom(resArr[type.getValue()], res); + } + } + + /* + * Used + */ + public Resource getUsed() { + return getUsed(NL); + } + + public Resource getUsed(String label) { + return internalGet(label, ResourceType.USED); + } + + public void incUsed(String label, Resource res) { + internalInc(label, ResourceType.USED, res); + } + + public void incUsed(Resource res) { + incUsed(NL, res); + } + + public void decUsed(Resource res) { + decUsed(NL, res); + } + + public void decUsed(String label, Resource res) { + internalDec(label, ResourceType.USED, res); + } + + public void setUsed(Resource res) { + setUsed(NL, res); + } + + public void setUsed(String label, Resource res) { + internalSet(label, ResourceType.USED, res); + } + + /* + * Pending + */ + public Resource getPending() { + return getPending(NL); + } + + public Resource getPending(String label) { + return internalGet(label, ResourceType.PENDING); + } + + public void incPending(String label, Resource res) { + internalInc(label, ResourceType.PENDING, res); + } + + public void incPending(Resource res) { + incPending(NL, res); + } + + public void decPending(Resource res) { + decPending(NL, res); + } + + public void decPending(String label, Resource res) { + internalDec(label, ResourceType.PENDING, res); + } + + public void setPending(Resource res) { + setPending(NL, res); + } + + public void setPending(String label, Resource res) { + internalSet(label, ResourceType.PENDING, res); + } + + /* + * Reserved + */ + public Resource getReserved() { + return getReserved(NL); + } + + public Resource getReserved(String label) { + return internalGet(label, ResourceType.RESERVED); + } + + public void incReserved(String label, Resource res) { + internalInc(label, ResourceType.RESERVED, res); + } + + public void incReserved(Resource res) { + incReserved(NL, res); + } + + public void decReserved(Resource res) { + decReserved(NL, res); + } + + public void decReserved(String label, Resource res) { + internalDec(label, ResourceType.RESERVED, res); + } + + public void setReserved(Resource res) { + setReserved(NL, res); + } + + public void setReserved(String label, Resource res) { + internalSet(label, ResourceType.RESERVED, res); + } + + /* + * Headroom + */ + public Resource getHeadroom() { + return getHeadroom(NL); + } + + public Resource getHeadroom(String label) { + return internalGet(label, ResourceType.HEADROOM); + } + + public void incHeadroom(String label, Resource res) { + internalInc(label, ResourceType.HEADROOM, res); + } + + public void incHeadroom(Resource res) { + incHeadroom(NL, res); + } + + public void decHeadroom(Resource res) { + decHeadroom(NL, res); + } + + public void decHeadroom(String label, Resource res) { + internalDec(label, ResourceType.HEADROOM, res); + } + + public void setHeadroom(Resource res) { + setHeadroom(NL, res); + } + + public void setHeadroom(String label, Resource res) { + internalSet(label, ResourceType.HEADROOM, res); + } + + /* + * AM-Used + */ + public Resource getAMUsed() { + return getAMUsed(NL); + } + + public Resource getAMUsed(String label) { + return internalGet(label, ResourceType.AMUSED); + } + + public void incAMUsed(String label, Resource res) { + internalInc(label, ResourceType.AMUSED, res); + } + + public void incAMUsed(Resource res) { + incAMUsed(NL, res); + } + + public void decAMUsed(Resource res) { + decAMUsed(NL, res); + } + + public void decAMUsed(String label, Resource res) { + internalDec(label, ResourceType.AMUSED, res); + } + + public void setAMUsed(Resource res) { + setAMUsed(NL, res); + } + + public void setAMUsed(String label, Resource res) { + internalSet(label, ResourceType.AMUSED, res); + } + + private static Resource normalize(Resource res) { + if (res == null) { + return Resources.none(); + } + return res; + } + + private Resource internalGet(String label, ResourceType type) { + try { + readLock.lock(); + UsageByLabel usage = usages.get(label); + if (null == usage) { + return Resources.none(); + } + return normalize(usage.get(type)); + } finally { + readLock.unlock(); + } + } + + private UsageByLabel getAndAddIfMissing(String label) { + if (!usages.containsKey(label)) { + UsageByLabel u = new UsageByLabel(label); + usages.put(label, u); + return u; + } + + return usages.get(label); + } + + private void internalSet(String label, ResourceType type, Resource res) { + try { + writeLock.lock(); + UsageByLabel usage = getAndAddIfMissing(label); + usage.set(type, res); + } finally { + writeLock.unlock(); + } + } + + private void internalInc(String label, ResourceType type, Resource res) { + try { + writeLock.lock(); + UsageByLabel usage = getAndAddIfMissing(label); + usage.inc(type, res); + } finally { + writeLock.unlock(); + } + } + + private void internalDec(String label, ResourceType type, Resource res) { + try { + writeLock.lock(); + UsageByLabel usage = getAndAddIfMissing(label); + usage.dec(type, res); + } finally { + writeLock.unlock(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java new file mode 100644 index 00000000000..b6dfacb7028 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestResourceUsage.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Resource; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestResourceUsage { + private static final Log LOG = LogFactory.getLog(TestResourceUsage.class); + private String suffix; + + @Parameterized.Parameters + public static Collection getParameters() { + return Arrays.asList(new String[][] { { "Pending" }, { "Used" }, + { "Headroom" }, { "Reserved" }, { "AMUsed" } }); + } + + public TestResourceUsage(String suffix) { + this.suffix = suffix; + } + + private static void dec(ResourceUsage obj, String suffix, Resource res, + String label) throws Exception { + executeByName(obj, "dec" + suffix, res, label); + } + + private static void inc(ResourceUsage obj, String suffix, Resource res, + String label) throws Exception { + executeByName(obj, "inc" + suffix, res, label); + } + + private static void set(ResourceUsage obj, String suffix, Resource res, + String label) throws Exception { + executeByName(obj, "set" + suffix, res, label); + } + + private static Resource get(ResourceUsage obj, String suffix, String label) + throws Exception { + return executeByName(obj, "get" + suffix, null, label); + } + + // Use reflection to avoid too much avoid code + private static Resource executeByName(ResourceUsage obj, String methodName, + Resource arg, String label) throws Exception { + // We have 4 kinds of method + // 1. getXXX() : Resource + // 2. getXXX(label) : Resource + // 3. set/inc/decXXX(res) : void + // 4. set/inc/decXXX(label, res) : void + if (methodName.startsWith("get")) { + Resource result; + if (label == null) { + // 1. + Method method = ResourceUsage.class.getDeclaredMethod(methodName); + result = (Resource) method.invoke(obj); + } else { + // 2. + Method method = + ResourceUsage.class.getDeclaredMethod(methodName, String.class); + result = (Resource) method.invoke(obj, label); + } + return result; + } else { + if (label == null) { + // 3. + Method method = + ResourceUsage.class.getDeclaredMethod(methodName, Resource.class); + method.invoke(obj, arg); + } else { + // 4. + Method method = + ResourceUsage.class.getDeclaredMethod(methodName, String.class, + Resource.class); + method.invoke(obj, label, arg); + } + return null; + } + } + + private void internalTestModifyAndRead(String label) throws Exception { + ResourceUsage usage = new ResourceUsage(); + Resource res; + + // First get returns 0 always + res = get(usage, suffix, label); + check(0, 0, res); + + // Add 1,1 should returns 1,1 + inc(usage, suffix, Resource.newInstance(1, 1), label); + check(1, 1, get(usage, suffix, label)); + + // Set 2,2 + set(usage, suffix, Resource.newInstance(2, 2), label); + check(2, 2, get(usage, suffix, label)); + + // dec 2,2 + dec(usage, suffix, Resource.newInstance(2, 2), label); + check(0, 0, get(usage, suffix, label)); + } + + void check(int mem, int cpu, Resource res) { + Assert.assertEquals(mem, res.getMemory()); + Assert.assertEquals(cpu, res.getVirtualCores()); + } + + @Test + public void testModifyAndRead() throws Exception { + LOG.info("Test - " + suffix); + internalTestModifyAndRead(null); + internalTestModifyAndRead("label"); + } +}