diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 3b9c30ff114..788ff90ff3e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.EnhancedHeadroom; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; @@ -147,6 +148,23 @@ public abstract class AllocateResponse { .collectorInfo(collectorInfo).build(); } + @Private + @Unstable + public static AllocateResponse newInstance(int responseId, + List<ContainerStatus> completedContainers, + List<Container> allocatedContainers, List<NodeReport> updatedNodes, + Resource availResources, AMCommand command, int numClusterNodes, + PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken, + List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo, + EnhancedHeadroom enhancedHeadroom) { + AllocateResponse response = + newInstance(responseId, completedContainers, allocatedContainers, + updatedNodes, availResources, command, numClusterNodes, preempt, + nmTokens, amRMToken, updatedContainers, collectorInfo); + response.setEnhancedHeadroom(enhancedHeadroom); + return response; + } + /** * If the ResourceManager needs the * ApplicationMaster to take some action then it will send an @@ -439,6 +457,14 @@ public abstract class AllocateResponse { return new
AllocateResponseBuilder(); } + @Public + @Unstable + public abstract EnhancedHeadroom getEnhancedHeadroom(); + + @Private + @Unstable + public abstract void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom); + /** * Class to construct instances of {@link AllocateResponse} with specific * options. @@ -666,6 +692,18 @@ public abstract class AllocateResponse { return this; } + @Public + @Unstable + public EnhancedHeadroom getEnhancedHeadroom() { + return allocateResponse.getEnhancedHeadroom(); + } + + @Private + @Unstable + public void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom){ + allocateResponse.setEnhancedHeadroom(enhancedHeadroom); + } + /** * Return generated {@link AllocateResponse} object. * @return {@link AllocateResponse} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/EnhancedHeadroom.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/EnhancedHeadroom.java new file mode 100644 index 00000000000..7a5ff6adcc5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/EnhancedHeadroom.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.yarn.util.Records; + +/** + * Enhanced head room in AllocateResponse. + * This provides a channel for RMs to return load information for AMRMProxy + * decision making when rerouting resource requests. + * + * Contains total pending container count and active cores for a cluster. + */ +public abstract class EnhancedHeadroom { + public static EnhancedHeadroom newInstance(int totalPendingCount, + int totalActiveCores) { + EnhancedHeadroom enhancedHeadroom = + Records.newRecord(EnhancedHeadroom.class); + enhancedHeadroom.setTotalPendingCount(totalPendingCount); + enhancedHeadroom.setTotalActiveCores(totalActiveCores); + return enhancedHeadroom; + } + + /** + * Set total pending container count. + * @param totalPendingCount the pending container count + */ + public abstract void setTotalPendingCount(int totalPendingCount); + + /** + * Get total pending container count. + * @return the pending container count + */ + public abstract int getTotalPendingCount(); + + /** + * Set total active cores for the cluster. + * @param totalActiveCores the total active cores for the cluster + */ + public abstract void setTotalActiveCores(int totalActiveCores); + + /** + * Get total active cores for the cluster. 
+ * @return totalActiveCores the total active cores for the cluster + */ + public abstract int getTotalActiveCores(); + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("<pendingCount:").append(this.getTotalPendingCount()); + sb.append(", activeCores:").append(this.getTotalActiveCores()); + sb.append(">"); + return sb.toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 8a0273d7a79..7dabea08a4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -106,6 +106,11 @@ message UpdatedContainerProto { required ContainerProto container = 2; } +message EnhancedHeadroomProto { + optional int32 total_pending_count = 1; + optional int32 total_active_cores = 2; +} + message AllocateResponseProto { optional AMCommandProto a_m_command = 1; optional int32 response_id = 2; @@ -123,6 +128,7 @@ message AllocateResponseProto { repeated UpdatedContainerProto updated_containers = 16; repeated ContainerProto containers_from_previous_attempts = 17; repeated RejectedSchedulingRequestProto rejected_scheduling_requests = 18; + optional EnhancedHeadroomProto enhanced_headroom = 19; } enum SchedulerResourceTypes { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index a5705d275fe..4dc3d462ccb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -31,6 +31,7 @@ import 
org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.EnhancedHeadroom; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; @@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.EnhancedHeadroomPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl; @@ -89,6 +91,7 @@ public class AllocateResponsePBImpl extends AllocateResponse { private Token amrmToken = null; private Priority appPriority = null; private CollectorInfo collectorInfo = null; + private EnhancedHeadroom enhancedHeadroom = null; public AllocateResponsePBImpl() { builder = AllocateResponseProto.newBuilder(); @@ -190,6 +193,9 @@ public class AllocateResponsePBImpl extends AllocateResponse { getContainerProtoIterable(this.containersFromPreviousAttempts); builder.addAllContainersFromPreviousAttempts(iterable); } + if (this.enhancedHeadroom != null) { + builder.setEnhancedHeadroom(convertToProtoFormat(this.enhancedHeadroom)); + } } private synchronized void mergeLocalToProto() { @@ -422,6 +428,28 @@ public class AllocateResponsePBImpl extends AllocateResponse { this.amrmToken = amRMToken; } + @Override + public synchronized EnhancedHeadroom getEnhancedHeadroom() { + AllocateResponseProtoOrBuilder p = viaProto ? 
proto : builder; + if (enhancedHeadroom != null) { + return enhancedHeadroom; + } + if (!p.hasEnhancedHeadroom()) { + return null; + } + this.enhancedHeadroom = convertFromProtoFormat(p.getEnhancedHeadroom()); + return enhancedHeadroom; + } + + @Override + public synchronized void setEnhancedHeadroom( + EnhancedHeadroom enhancedHeadroom) { + maybeInitBuilder(); + if (enhancedHeadroom == null) { + builder.clearEnhancedHeadroom(); + } + this.enhancedHeadroom = enhancedHeadroom; + } @Override public synchronized CollectorInfo getCollectorInfo() { @@ -933,4 +961,14 @@ public class AllocateResponsePBImpl extends AllocateResponse { private PriorityProto convertToProtoFormat(Priority t) { return ((PriorityPBImpl)t).getProto(); } + + private EnhancedHeadroomPBImpl convertFromProtoFormat( + YarnServiceProtos.EnhancedHeadroomProto p) { + return new EnhancedHeadroomPBImpl(p); + } + + private YarnServiceProtos.EnhancedHeadroomProto convertToProtoFormat( + EnhancedHeadroom t) { + return ((EnhancedHeadroomPBImpl) t).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/EnhancedHeadroomPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/EnhancedHeadroomPBImpl.java new file mode 100644 index 00000000000..65b5b9e11a9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/EnhancedHeadroomPBImpl.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.EnhancedHeadroom; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.EnhancedHeadroomProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.EnhancedHeadroomProtoOrBuilder; + +import org.apache.hadoop.thirdparty.protobuf.TextFormat; + +public class EnhancedHeadroomPBImpl extends EnhancedHeadroom { + + private EnhancedHeadroomProto proto = + EnhancedHeadroomProto.getDefaultInstance(); + private EnhancedHeadroomProto.Builder builder = null; + private boolean viaProto = false; + + public EnhancedHeadroomPBImpl() { + builder = EnhancedHeadroomProto.newBuilder(); + } + + public EnhancedHeadroomPBImpl(EnhancedHeadroomProto proto) { + this.proto = proto; + viaProto = true; + } + + public EnhancedHeadroomProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + private void mergeLocalToBuilder() { + // No local content yet + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = EnhancedHeadroomProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public void setTotalPendingCount(int totalPendingCount) { + maybeInitBuilder(); + if (totalPendingCount == 0) { + builder.clearTotalPendingCount(); + return; + } + builder.setTotalPendingCount(totalPendingCount); + } + + @Override + public int getTotalPendingCount() { + EnhancedHeadroomProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasTotalPendingCount()) ? p.getTotalPendingCount() : 0; + } + + @Override + public void setTotalActiveCores(int totalActiveCores) { + maybeInitBuilder(); + if (totalActiveCores == 0) { + builder.clearTotalActiveCores(); + return; + } + builder.setTotalActiveCores(totalActiveCores); + } + + @Override + public int getTotalActiveCores() { + EnhancedHeadroomProtoOrBuilder p = viaProto ? proto : builder; + return (p.hasTotalActiveCores()) ? 
p.getTotalActiveCores() : 0; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index ad3725278a1..ceebbdbc433 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -127,6 +127,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerRetryContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.EnhancedHeadroom; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LogAggregationContext; @@ -184,6 +185,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.EnhancedHeadroomPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ExecutionTypeRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; @@ -430,6 +432,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest { generateByNewInstance(UpdatedContainer.class); generateByNewInstance(ContainerUpdateRequest.class); generateByNewInstance(ContainerUpdateResponse.class); + generateByNewInstance(EnhancedHeadroom.class); // genByNewInstance does not 
apply to QueueInfo, cause // it is recursive(has sub queues) typeValueCache.put(QueueInfo.class, QueueInfo. @@ -1331,4 +1334,10 @@ public class TestPBImplRecords extends BasePBImplRecordsTest { validatePBImplRecord(GetNodesToAttributesResponsePBImpl.class, YarnServiceProtos.GetNodesToAttributesResponseProto.class); } + + @Test + public void testGetEnhancedHeadroomPBImpl() throws Exception { + validatePBImplRecord(EnhancedHeadroomPBImpl.class, + YarnServiceProtos.EnhancedHeadroomProto.class); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java index ae47d900368..c8f945896e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.EnhancedHeadroom; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -131,6 +132,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { private SubClusterResolver resolver; private Map<SubClusterId, Resource> headroom; + private Map<SubClusterId, EnhancedHeadroom> enhancedHeadroom; private float hrAlpha; private FederationStateStoreFacade federationFacade; 
private SubClusterId homeSubcluster; @@ -182,6 +184,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { if (headroom == null) { headroom = new ConcurrentHashMap<>(); + enhancedHeadroom = new ConcurrentHashMap<>(); } hrAlpha = policy.getHeadroomAlpha(); @@ -195,9 +198,14 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy { AllocateResponse response) throws YarnException { if (response.getAvailableResources() != null) { headroom.put(subClusterId, response.getAvailableResources()); - LOG.info("Subcluster {} updated with {} memory headroom", subClusterId, - response.getAvailableResources().getMemorySize()); } + if (response.getEnhancedHeadroom() != null) { + this.enhancedHeadroom.put(subClusterId, response.getEnhancedHeadroom()); + } + LOG.info( + "Subcluster {} updated with AvailableResource {}, EnhancedHeadRoom {}", + subClusterId, response.getAvailableResources(), + response.getEnhancedHeadroom()); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index c8777f579d5..12a3b21682a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.security.UserGroupInformation; 
@@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerUpdateType; +import org.apache.hadoop.yarn.api.records.EnhancedHeadroom; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -334,6 +336,17 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { .pullJustFinishedContainers()); response.setAvailableResources(allocation.getResourceLimit()); + QueueMetrics queueMetrics = + this.rmContext.getScheduler().getRootQueueMetrics(); + if (queueMetrics != null) { + int totalVirtualCores = + queueMetrics.getAllocatedVirtualCores() + queueMetrics + .getAvailableVirtualCores(); + int pendingContainers = queueMetrics.getPendingContainers(); + response.setEnhancedHeadroom( + EnhancedHeadroom.newInstance(pendingContainers, totalVirtualCores)); + } + addToContainerUpdates(response, allocation, ((AbstractYarnScheduler)getScheduler()) .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());