YARN-6164. Expose Queue Configurations per Node Label through YARN client api. Contributed by Benson Qiu.

This commit is contained in:
Sunil G 2017-04-24 12:16:18 +05:30
parent d686560b35
commit a9495b206b
9 changed files with 495 additions and 7 deletions

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* QueueConfigurations contain information about the configuration percentages
* of a queue.
* <p>
* It includes information such as:
* <ul>
* <li>Capacity of the queue.</li>
* <li>Absolute capacity of the queue.</li>
* <li>Maximum capacity of the queue.</li>
* <li>Absolute maximum capacity of the queue.</li>
* <li>Maximum ApplicationMaster resource percentage of the queue.</li>
* </ul>
*/
public abstract class QueueConfigurations {
@Public
@Unstable
public static QueueConfigurations newInstance(float capacity,
float absoluteCapacity, float maxCapacity, float absoluteMaxCapacity,
float maxAMPercentage) {
QueueConfigurations queueConfigurations =
Records.newRecord(QueueConfigurations.class);
queueConfigurations.setCapacity(capacity);
queueConfigurations.setAbsoluteCapacity(absoluteCapacity);
queueConfigurations.setMaxCapacity(maxCapacity);
queueConfigurations.setAbsoluteMaxCapacity(absoluteMaxCapacity);
queueConfigurations.setMaxAMPercentage(maxAMPercentage);
return queueConfigurations;
}
/**
* Get the queue capacity.
*
* @return the queue capacity
*/
@Public
@Unstable
public abstract float getCapacity();
/**
* Set the queue capacity.
*
* @param capacity
* the queue capacity.
*/
@Private
@Unstable
public abstract void setCapacity(float capacity);
/**
* Get the absolute capacity.
*
* @return the absolute capacity
*/
@Public
@Unstable
public abstract float getAbsoluteCapacity();
/**
* Set the absolute capacity.
*
* @param absoluteCapacity
* the absolute capacity
*/
@Private
@Unstable
public abstract void setAbsoluteCapacity(float absoluteCapacity);
/**
* Get the maximum capacity.
*
* @return the maximum capacity
*/
@Public
@Unstable
public abstract float getMaxCapacity();
/**
* Set the maximum capacity.
*
* @param maxCapacity
* the maximum capacity
*/
@Private
@Unstable
public abstract void setMaxCapacity(float maxCapacity);
/**
* Get the absolute maximum capacity.
*
* @return the absolute maximum capacity
*/
@Public
@Unstable
public abstract float getAbsoluteMaxCapacity();
/**
* Set the absolute maximum capacity.
*
* @param absoluteMaxCapacity
* the absolute maximum capacity
*/
@Private
@Unstable
public abstract void setAbsoluteMaxCapacity(float absoluteMaxCapacity);
/**
* Get the maximum AM resource percentage.
*
* @return the maximum AM resource percentage
*/
@Public
@Unstable
public abstract float getMaxAMPercentage();
/**
* Set the maximum AM resource percentage.
*
* @param maxAMPercentage
* the maximum AM resource percentage
*/
@Private
@Unstable
public abstract void setMaxAMPercentage(float maxAMPercentage);
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.records;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -40,9 +41,11 @@ import org.apache.hadoop.yarn.util.Records;
* <li>Child queues.</li>
* <li>Running applications.</li>
* <li>{@link QueueState} of the queue.</li>
* <li>{@link QueueConfigurations} of the queue.</li>
* </ul>
*
* @see QueueState
* @see QueueConfigurations
* @see ApplicationClientProtocol#getQueueInfo(org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest)
*/
@Public
@ -72,6 +75,25 @@ public abstract class QueueInfo {
return queueInfo;
}
@Private
@Unstable
public static QueueInfo newInstance(String queueName, float capacity,
float maximumCapacity, float currentCapacity,
List<QueueInfo> childQueues, List<ApplicationReport> applications,
QueueState queueState, Set<String> accessibleNodeLabels,
String defaultNodeLabelExpression, QueueStatistics queueStatistics,
boolean preemptionDisabled,
Map<String, QueueConfigurations> queueConfigurations) {
QueueInfo queueInfo = QueueInfo.newInstance(queueName, capacity,
maximumCapacity, currentCapacity,
childQueues, applications,
queueState, accessibleNodeLabels,
defaultNodeLabelExpression, queueStatistics,
preemptionDisabled);
queueInfo.setQueueConfigurations(queueConfigurations);
return queueInfo;
}
/**
* Get the <em>name</em> of the queue.
* @return <em>name</em> of the queue
@ -219,4 +241,24 @@ public abstract class QueueInfo {
@Private
@Unstable
public abstract void setPreemptionDisabled(boolean preemptionDisabled);
/**
* Get the per-node-label queue configurations of the queue.
*
* @return the per-node-label queue configurations of the queue.
*/
@Public
@Stable
public abstract Map<String, QueueConfigurations> getQueueConfigurations();
/**
* Set the per-node-label queue configurations for the queue.
*
* @param queueConfigurations
* the queue configurations
*/
@Private
@Unstable
public abstract void setQueueConfigurations(
Map<String, QueueConfigurations> queueConfigurations);
}

View File

@ -468,6 +468,20 @@ message QueueInfoProto {
optional string defaultNodeLabelExpression = 9;
optional QueueStatisticsProto queueStatistics = 10;
optional bool preemptionDisabled = 11;
repeated QueueConfigurationsMapProto queueConfigurationsMap = 12;
}
message QueueConfigurationsProto {
optional float capacity = 1;
optional float absoluteCapacity = 2;
optional float maxCapacity = 3;
optional float absoluteMaxCapacity = 4;
optional float maxAMPercentage = 5;
}
message QueueConfigurationsMapProto {
required string partitionName = 1;
optional QueueConfigurationsProto queueConfigurations = 2;
}
enum QueueACLProto {

View File

@ -1699,7 +1699,7 @@ public class TestYarnCLI {
nodeLabels.add("GPU");
nodeLabels.add("JDK_7");
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false);
null, null, QueueState.RUNNING, nodeLabels, "GPU", null, false, null);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);
@ -1800,7 +1800,7 @@ public class TestYarnCLI {
public void testGetQueueInfoWithEmptyNodeLabel() throws Exception {
QueueCLI cli = createAndGetQueueCLI();
QueueInfo queueInfo = QueueInfo.newInstance("queueA", 0.4f, 0.8f, 0.5f,
null, null, QueueState.RUNNING, null, null, null, true);
null, null, QueueState.RUNNING, null, null, null, true, null);
when(client.getQueueInfo(any(String.class))).thenReturn(queueInfo);
int result = cli.run(new String[] { "-status", "queueA" });
assertEquals(0, result);

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.QueueConfigurations;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueConfigurationsProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueConfigurationsProtoOrBuilder;
import com.google.protobuf.TextFormat;
public class QueueConfigurationsPBImpl extends QueueConfigurations {
QueueConfigurationsProto proto =
QueueConfigurationsProto.getDefaultInstance();
QueueConfigurationsProto.Builder builder = null;
boolean viaProto = false;
public QueueConfigurationsPBImpl() {
builder = QueueConfigurationsProto.newBuilder();
}
public QueueConfigurationsPBImpl(QueueConfigurationsProto proto) {
this.proto = proto;
viaProto = true;
}
public QueueConfigurationsProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public float getCapacity() {
QueueConfigurationsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasCapacity()) ? p.getCapacity() : 0f;
}
@Override
public void setCapacity(float capacity) {
maybeInitBuilder();
builder.setCapacity(capacity);
}
@Override
public float getAbsoluteCapacity() {
QueueConfigurationsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasAbsoluteCapacity()) ? p.getAbsoluteCapacity() : 0f;
}
@Override
public void setAbsoluteCapacity(float absoluteCapacity) {
maybeInitBuilder();
builder.setAbsoluteCapacity(absoluteCapacity);
}
@Override
public float getMaxCapacity() {
QueueConfigurationsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasMaxCapacity()) ? p.getMaxCapacity() : 0f;
}
@Override
public void setMaxCapacity(float maxCapacity) {
maybeInitBuilder();
builder.setMaxCapacity(maxCapacity);
}
@Override
public float getAbsoluteMaxCapacity() {
QueueConfigurationsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasAbsoluteMaxCapacity()) ? p.getAbsoluteMaxCapacity() : 0f;
}
@Override
public void setAbsoluteMaxCapacity(float absoluteMaxCapacity) {
maybeInitBuilder();
builder.setAbsoluteMaxCapacity(absoluteMaxCapacity);
}
@Override
public float getMaxAMPercentage() {
QueueConfigurationsProtoOrBuilder p = viaProto ? proto : builder;
return (p.hasMaxAMPercentage()) ? p.getMaxAMPercentage() : 0f;
}
@Override
public void setMaxAMPercentage(float maxAMPercentage) {
maybeInitBuilder();
builder.setMaxAMPercentage(maxAMPercentage);
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = QueueConfigurationsProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
}

View File

@ -19,18 +19,23 @@
package org.apache.hadoop.yarn.api.records.impl.pb;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.QueueConfigurations;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueConfigurationsMapProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueConfigurationsProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueInfoProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueInfoProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto;
@ -49,7 +54,8 @@ public class QueueInfoPBImpl extends QueueInfo {
List<ApplicationReport> applicationsList;
List<QueueInfo> childQueuesList;
Set<String> accessibleNodeLabels;
Map<String, QueueConfigurations> queueConfigurations;
public QueueInfoPBImpl() {
builder = QueueInfoProto.newBuilder();
}
@ -279,6 +285,46 @@ public class QueueInfoPBImpl extends QueueInfo {
builder.addAllChildQueues(iterable);
}
private void addQueueConfigurations() {
maybeInitBuilder();
builder.clearQueueConfigurationsMap();
if (queueConfigurations == null) {
return;
}
Iterable<? extends QueueConfigurationsMapProto> values =
new Iterable<QueueConfigurationsMapProto>() {
@Override
public Iterator<QueueConfigurationsMapProto> iterator() {
return new Iterator<QueueConfigurationsMapProto>() {
private Iterator<String> iterator =
queueConfigurations.keySet().iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public QueueConfigurationsMapProto next() {
String key = iterator.next();
return QueueConfigurationsMapProto.newBuilder()
.setPartitionName(key)
.setQueueConfigurations(
convertToProtoFormat(queueConfigurations.get(key)))
.build();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
this.builder.addAllQueueConfigurationsMap(values);
}
private void mergeLocalToBuilder() {
if (this.childQueuesList != null) {
addChildQueuesInfoToProto();
@ -290,6 +336,9 @@ public class QueueInfoPBImpl extends QueueInfo {
builder.clearAccessibleNodeLabels();
builder.addAllAccessibleNodeLabels(this.accessibleNodeLabels);
}
if (this.queueConfigurations != null) {
addQueueConfigurations();
}
}
private void mergeLocalToProto() {
@ -327,11 +376,21 @@ public class QueueInfoPBImpl extends QueueInfo {
private QueueState convertFromProtoFormat(QueueStateProto q) {
return ProtoUtils.convertFromProtoFormat(q);
}
private QueueStateProto convertToProtoFormat(QueueState queueState) {
return ProtoUtils.convertToProtoFormat(queueState);
}
private QueueConfigurationsPBImpl convertFromProtoFormat(
QueueConfigurationsProto q) {
return new QueueConfigurationsPBImpl(q);
}
private QueueConfigurationsProto convertToProtoFormat(
QueueConfigurations q) {
return ((QueueConfigurationsPBImpl)q).getProto();
}
@Override
public void setAccessibleNodeLabels(Set<String> nodeLabels) {
maybeInitBuilder();
@ -408,4 +467,37 @@ public class QueueInfoPBImpl extends QueueInfo {
maybeInitBuilder();
builder.setPreemptionDisabled(preemptionDisabled);
}
private void initQueueConfigurations() {
if (queueConfigurations != null) {
return;
}
QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
List<QueueConfigurationsMapProto> lists = p.getQueueConfigurationsMapList();
queueConfigurations =
new HashMap<String, QueueConfigurations>(lists.size());
for (QueueConfigurationsMapProto queueConfigurationsProto : lists) {
queueConfigurations.put(queueConfigurationsProto.getPartitionName(),
convertFromProtoFormat(
queueConfigurationsProto.getQueueConfigurations()));
}
}
@Override
public Map<String, QueueConfigurations> getQueueConfigurations() {
initQueueConfigurations();
return queueConfigurations;
}
@Override
public void setQueueConfigurations(
Map<String, QueueConfigurations> queueConfigurations) {
if (queueConfigurations == null) {
return;
}
initQueueConfigurations();
this.queueConfigurations.clear();
this.queueConfigurations.putAll(queueConfigurations);
}
}

View File

@ -17,7 +17,8 @@
*/
package org.apache.hadoop.yarn.api;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import org.apache.commons.lang.math.LongRange;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
@ -125,6 +126,7 @@ import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueConfigurations;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
@ -328,7 +330,7 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import com.google.common.collect.ImmutableSet;
/**
* Test class for YARN API protocol records.
@ -402,6 +404,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
generateByNewInstance(CommitResponse.class);
generateByNewInstance(ApplicationTimeout.class);
generateByNewInstance(ContainerResourceIncreaseRequest.class);
generateByNewInstance(QueueConfigurations.class);
}
@Test

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueConfigurations;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
@ -401,6 +402,7 @@ public abstract class AbstractCSQueue implements CSQueue {
queueInfo.setCurrentCapacity(getUsedCapacity());
queueInfo.setQueueStatistics(getQueueStatistics());
queueInfo.setPreemptionDisabled(preemptionDisabled);
queueInfo.setQueueConfigurations(getQueueConfigurations());
return queueInfo;
}
@ -432,6 +434,29 @@ public abstract class AbstractCSQueue implements CSQueue {
return stats;
}
public Map<String, QueueConfigurations> getQueueConfigurations() {
Map<String, QueueConfigurations> queueConfigurations = new HashMap<>();
Set<String> nodeLabels = getNodeLabelsForQueue();
for (String nodeLabel : nodeLabels) {
QueueConfigurations queueConfiguration =
recordFactory.newRecordInstance(QueueConfigurations.class);
float capacity = queueCapacities.getCapacity(nodeLabel);
float absoluteCapacity = queueCapacities.getAbsoluteCapacity(nodeLabel);
float maxCapacity = queueCapacities.getMaximumCapacity(nodeLabel);
float absMaxCapacity =
queueCapacities.getAbsoluteMaximumCapacity(nodeLabel);
float maxAMPercentage =
queueCapacities.getMaxAMResourcePercentage(nodeLabel);
queueConfiguration.setCapacity(capacity);
queueConfiguration.setAbsoluteCapacity(absoluteCapacity);
queueConfiguration.setMaxCapacity(maxCapacity);
queueConfiguration.setAbsoluteMaxCapacity(absMaxCapacity);
queueConfiguration.setMaxAMPercentage(maxAMPercentage);
queueConfigurations.put(nodeLabel, queueConfiguration);
}
return queueConfigurations;
}
@Private
public Resource getMaximumAllocation() {
return maximumAllocation;

View File

@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueConfigurations;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
@ -603,6 +604,17 @@ public class TestClientRMService {
List<ApplicationReport> applications = queueInfo.getQueueInfo()
.getApplications();
Assert.assertEquals(2, applications.size());
Map<String, QueueConfigurations> queueConfigsByPartition =
queueInfo.getQueueInfo().getQueueConfigurations();
Assert.assertEquals(1, queueConfigsByPartition.size());
Assert.assertTrue(queueConfigsByPartition.containsKey("*"));
QueueConfigurations queueConfigs = queueConfigsByPartition.get("*");
Assert.assertEquals(0.5f, queueConfigs.getCapacity(), 0.0001f);
Assert.assertEquals(0.1f, queueConfigs.getAbsoluteCapacity(), 0.0001f);
Assert.assertEquals(1.0f, queueConfigs.getMaxCapacity(), 0.0001f);
Assert.assertEquals(1.0f, queueConfigs.getAbsoluteMaxCapacity(), 0.0001f);
Assert.assertEquals(0.2f, queueConfigs.getMaxAMPercentage(), 0.0001f);
request.setQueueName("nonexistentqueue");
request.setIncludeApplications(true);
// should not throw exception on nonexistent queue
@ -974,8 +986,21 @@ public class TestClientRMService {
when(rmContext.getDispatcher()).thenReturn(dispatcher);
EventHandler eventHandler = mock(EventHandler.class);
when(dispatcher.getEventHandler()).thenReturn(eventHandler);
QueueInfo queInfo = recordFactory.newRecordInstance(QueueInfo.class);
queInfo.setQueueName("testqueue");
QueueConfigurations queueConfigs =
recordFactory.newRecordInstance(QueueConfigurations.class);
queueConfigs.setCapacity(0.5f);
queueConfigs.setAbsoluteCapacity(0.1f);
queueConfigs.setMaxCapacity(1.0f);
queueConfigs.setAbsoluteMaxCapacity(1.0f);
queueConfigs.setMaxAMPercentage(0.2f);
Map<String, QueueConfigurations> queueConfigsByPartition =
new HashMap<>();
queueConfigsByPartition.put("*", queueConfigs);
queInfo.setQueueConfigurations(queueConfigsByPartition);
when(yarnScheduler.getQueueInfo(eq("testqueue"), anyBoolean(), anyBoolean()))
.thenReturn(queInfo);
when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), anyBoolean()))