YARN-8002. Support NOT_SELF and ALL namespace types for allocation tag. (Weiwei Yang via wangda)
Change-Id: I63b4e4192a95bf7ded98c54e46a2871c72869700
This commit is contained in:
parent
b89dc00302
commit
78832eca8a
|
@ -18,12 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api.records;
|
package org.apache.hadoop.yarn.api.records;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class to describe all supported forms of namespaces for an allocation tag.
|
* Class to describe all supported forms of namespaces for an allocation tag.
|
||||||
*/
|
*/
|
||||||
|
@ -44,29 +38,6 @@ public enum AllocationTagNamespaceType {
|
||||||
return this.typeKeyword;
|
return this.typeKeyword;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Parses the namespace type from a given string.
|
|
||||||
* @param prefix namespace prefix.
|
|
||||||
* @return namespace type.
|
|
||||||
* @throws InvalidAllocationTagException
|
|
||||||
*/
|
|
||||||
public static AllocationTagNamespaceType fromString(String prefix) throws
|
|
||||||
InvalidAllocationTagException {
|
|
||||||
for (AllocationTagNamespaceType type :
|
|
||||||
AllocationTagNamespaceType.values()) {
|
|
||||||
if(type.getTypeKeyword().equals(prefix)) {
|
|
||||||
return type;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Set<String> values = Arrays.stream(AllocationTagNamespaceType.values())
|
|
||||||
.map(AllocationTagNamespaceType::toString)
|
|
||||||
.collect(Collectors.toSet());
|
|
||||||
throw new InvalidAllocationTagException(
|
|
||||||
"Invalid namespace prefix: " + prefix
|
|
||||||
+ ", valid values are: " + String.join(",", values));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return this.getTypeKeyword();
|
return this.getTypeKeyword();
|
||||||
|
|
|
@ -1,50 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api.records;
|
|
||||||
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Allocation tags under same namespace.
|
|
||||||
*/
|
|
||||||
public class AllocationTags {
|
|
||||||
|
|
||||||
private AllocationTagNamespace ns;
|
|
||||||
private Set<String> tags;
|
|
||||||
|
|
||||||
public AllocationTags(AllocationTagNamespace namespace,
|
|
||||||
Set<String> allocationTags) {
|
|
||||||
this.ns = namespace;
|
|
||||||
this.tags = allocationTags;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return the namespace of these tags.
|
|
||||||
*/
|
|
||||||
public AllocationTagNamespace getNamespace() {
|
|
||||||
return this.ns;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return the allocation tags.
|
|
||||||
*/
|
|
||||||
public Set<String> getTags() {
|
|
||||||
return this.tags;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
|
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr;
|
||||||
|
@ -107,6 +107,25 @@ public final class PlacementConstraints {
|
||||||
PlacementTargets.allocationTag(allocationTags));
|
PlacementTargets.allocationTag(allocationTags));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to {@link #cardinality(String, int, int, String...)}, but let you
|
||||||
|
* attach a namespace to the given allocation tags.
|
||||||
|
*
|
||||||
|
* @param scope the scope of the constraint
|
||||||
|
* @param namespace the namespace of the allocation tags
|
||||||
|
* @param minCardinality determines the minimum number of allocations within
|
||||||
|
* the scope
|
||||||
|
* @param maxCardinality determines the maximum number of allocations within
|
||||||
|
* the scope
|
||||||
|
* @param allocationTags allocation tags
|
||||||
|
* @return the resulting placement constraint
|
||||||
|
*/
|
||||||
|
public static AbstractConstraint cardinality(String scope, String namespace,
|
||||||
|
int minCardinality, int maxCardinality, String... allocationTags) {
|
||||||
|
return new SingleConstraint(scope, minCardinality, maxCardinality,
|
||||||
|
PlacementTargets.allocationTagWithNamespace(namespace, allocationTags));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Similar to {@link #cardinality(String, int, int, String...)}, but
|
* Similar to {@link #cardinality(String, int, int, String...)}, but
|
||||||
* determines only the minimum cardinality (the maximum cardinality is
|
* determines only the minimum cardinality (the maximum cardinality is
|
||||||
|
@ -124,6 +143,23 @@ public final class PlacementConstraints {
|
||||||
allocationTags);
|
allocationTags);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to {@link #minCardinality(String, int, String...)}, but let you
|
||||||
|
* attach a namespace to the allocation tags.
|
||||||
|
*
|
||||||
|
* @param scope the scope of the constraint
|
||||||
|
* @param namespace the namespace of these tags
|
||||||
|
* @param minCardinality determines the minimum number of allocations within
|
||||||
|
* the scope
|
||||||
|
* @param allocationTags the constraint targets allocations with these tags
|
||||||
|
* @return the resulting placement constraint
|
||||||
|
*/
|
||||||
|
public static AbstractConstraint minCardinality(String scope,
|
||||||
|
String namespace, int minCardinality, String... allocationTags) {
|
||||||
|
return cardinality(scope, namespace, minCardinality, Integer.MAX_VALUE,
|
||||||
|
allocationTags);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Similar to {@link #cardinality(String, int, int, String...)}, but
|
* Similar to {@link #cardinality(String, int, int, String...)}, but
|
||||||
* determines only the maximum cardinality (the minimum cardinality is 0).
|
* determines only the maximum cardinality (the minimum cardinality is 0).
|
||||||
|
@ -139,6 +175,23 @@ public final class PlacementConstraints {
|
||||||
return cardinality(scope, 0, maxCardinality, allocationTags);
|
return cardinality(scope, 0, maxCardinality, allocationTags);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to {@link #maxCardinality(String, int, String...)}, but let you
|
||||||
|
* specify a namespace for the tags, see supported namespaces in
|
||||||
|
* {@link AllocationTagNamespaceType}.
|
||||||
|
*
|
||||||
|
* @param scope the scope of the constraint
|
||||||
|
* @param tagNamespace the namespace of these tags
|
||||||
|
* @param maxCardinality determines the maximum number of allocations within
|
||||||
|
* the scope
|
||||||
|
* @param allocationTags allocation tags
|
||||||
|
* @return the resulting placement constraint
|
||||||
|
*/
|
||||||
|
public static AbstractConstraint maxCardinality(String scope,
|
||||||
|
String tagNamespace, int maxCardinality, String... allocationTags) {
|
||||||
|
return cardinality(scope, tagNamespace, 0, maxCardinality, allocationTags);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This constraint generalizes the cardinality and target constraints.
|
* This constraint generalizes the cardinality and target constraints.
|
||||||
*
|
*
|
||||||
|
@ -242,9 +295,8 @@ public final class PlacementConstraints {
|
||||||
*/
|
*/
|
||||||
public static TargetExpression allocationTagToIntraApp(
|
public static TargetExpression allocationTagToIntraApp(
|
||||||
String... allocationTags) {
|
String... allocationTags) {
|
||||||
AllocationTagNamespace selfNs = new AllocationTagNamespace.Self();
|
|
||||||
return new TargetExpression(TargetType.ALLOCATION_TAG,
|
return new TargetExpression(TargetType.ALLOCATION_TAG,
|
||||||
selfNs.toString(), allocationTags);
|
AllocationTagNamespaceType.SELF.toString(), allocationTags);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,34 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.exceptions;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This exception is thrown by
|
|
||||||
* {@link
|
|
||||||
* org.apache.hadoop.yarn.api.records.AllocationTagNamespace#parse(String)}
|
|
||||||
* when it fails to parse a namespace.
|
|
||||||
*/
|
|
||||||
public class InvalidAllocationTagException extends YarnException {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
|
||||||
|
|
||||||
public InvalidAllocationTagException(String message) {
|
|
||||||
super(message);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -16,22 +16,24 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api.records;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
|
||||||
|
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
|
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF;
|
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF;
|
||||||
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF;
|
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF;
|
||||||
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL;
|
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL;
|
||||||
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID;
|
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID;
|
||||||
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL;
|
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL;
|
||||||
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.fromString;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class to describe the namespace of an allocation tag.
|
* Class to describe the namespace of an allocation tag.
|
||||||
|
@ -69,8 +71,6 @@ public abstract class AllocationTagNamespace implements
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the scope of the namespace, in form of a set of applications.
|
* Get the scope of the namespace, in form of a set of applications.
|
||||||
* Before calling this method, {@link #evaluate(TargetApplications)}
|
|
||||||
* must be called in prior to ensure the scope is proper evaluated.
|
|
||||||
*
|
*
|
||||||
* @return a set of applications.
|
* @return a set of applications.
|
||||||
*/
|
*/
|
||||||
|
@ -83,51 +83,20 @@ public abstract class AllocationTagNamespace implements
|
||||||
return this.nsScope;
|
return this.nsScope;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Evaluate the namespace against given target applications
|
||||||
|
* if it is necessary. Only self/not-self/app-label namespace types
|
||||||
|
* require this evaluation step, because they are not binding to a
|
||||||
|
* specific scope during initiating. So we do lazy binding for them
|
||||||
|
* in this method.
|
||||||
|
*
|
||||||
|
* @param target a generic type target that impacts this evaluation.
|
||||||
|
* @throws InvalidAllocationTagsQueryException
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public abstract void evaluate(TargetApplications target)
|
public void evaluate(TargetApplications target)
|
||||||
throws InvalidAllocationTagException;
|
throws InvalidAllocationTagsQueryException {
|
||||||
|
// Sub-class needs to override this when it requires the eval step.
|
||||||
/**
|
|
||||||
* @return true if the namespace is effective in all applications
|
|
||||||
* in this cluster. Specifically the namespace prefix should be
|
|
||||||
* "all".
|
|
||||||
*/
|
|
||||||
public boolean isGlobal() {
|
|
||||||
return AllocationTagNamespaceType.ALL.equals(getNamespaceType());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return true if the namespace is effective within a single application
|
|
||||||
* by its application ID, the namespace prefix should be "app-id";
|
|
||||||
* false otherwise.
|
|
||||||
*/
|
|
||||||
public boolean isSingleInterApp() {
|
|
||||||
return AllocationTagNamespaceType.APP_ID.equals(getNamespaceType());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return true if the namespace is effective to the application itself,
|
|
||||||
* the namespace prefix should be "self"; false otherwise.
|
|
||||||
*/
|
|
||||||
public boolean isIntraApp() {
|
|
||||||
return AllocationTagNamespaceType.SELF.equals(getNamespaceType());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return true if the namespace is effective to all applications except
|
|
||||||
* itself, the namespace prefix should be "not-self"; false otherwise.
|
|
||||||
*/
|
|
||||||
public boolean isNotSelf() {
|
|
||||||
return AllocationTagNamespaceType.NOT_SELF.equals(getNamespaceType());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return true if the namespace is effective to a group of applications
|
|
||||||
* identified by a application label, the namespace prefix should be
|
|
||||||
* "app-label"; false otherwise.
|
|
||||||
*/
|
|
||||||
public boolean isAppLabel() {
|
|
||||||
return AllocationTagNamespaceType.APP_LABEL.equals(getNamespaceType());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -146,9 +115,9 @@ public abstract class AllocationTagNamespace implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void evaluate(TargetApplications target)
|
public void evaluate(TargetApplications target)
|
||||||
throws InvalidAllocationTagException {
|
throws InvalidAllocationTagsQueryException {
|
||||||
if (target == null || target.getCurrentApplicationId() == null) {
|
if (target == null || target.getCurrentApplicationId() == null) {
|
||||||
throw new InvalidAllocationTagException("Namespace Self must"
|
throw new InvalidAllocationTagsQueryException("Namespace Self must"
|
||||||
+ " be evaluated against a single application ID.");
|
+ " be evaluated against a single application ID.");
|
||||||
}
|
}
|
||||||
ApplicationId applicationId = target.getCurrentApplicationId();
|
ApplicationId applicationId = target.getCurrentApplicationId();
|
||||||
|
@ -196,12 +165,6 @@ public abstract class AllocationTagNamespace implements
|
||||||
public All() {
|
public All() {
|
||||||
super(ALL);
|
super(ALL);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void evaluate(TargetApplications target) {
|
|
||||||
Set<ApplicationId> allAppIds = target.getAllApplicationIds();
|
|
||||||
setScopeIfNotNull(allAppIds);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -229,10 +192,6 @@ public abstract class AllocationTagNamespace implements
|
||||||
public AppID(ApplicationId applicationId) {
|
public AppID(ApplicationId applicationId) {
|
||||||
super(APP_ID);
|
super(APP_ID);
|
||||||
this.targetAppId = applicationId;
|
this.targetAppId = applicationId;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void evaluate(TargetApplications target) {
|
|
||||||
setScopeIfNotNull(ImmutableSet.of(targetAppId));
|
setScopeIfNotNull(ImmutableSet.of(targetAppId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,11 +207,11 @@ public abstract class AllocationTagNamespace implements
|
||||||
*
|
*
|
||||||
* @param namespaceStr namespace string.
|
* @param namespaceStr namespace string.
|
||||||
* @return an instance of {@link AllocationTagNamespace}.
|
* @return an instance of {@link AllocationTagNamespace}.
|
||||||
* @throws InvalidAllocationTagException
|
* @throws InvalidAllocationTagsQueryException
|
||||||
* if given string is not in valid format
|
* if given string is not in valid format
|
||||||
*/
|
*/
|
||||||
public static AllocationTagNamespace parse(String namespaceStr)
|
public static AllocationTagNamespace parse(String namespaceStr)
|
||||||
throws InvalidAllocationTagException {
|
throws InvalidAllocationTagsQueryException {
|
||||||
// Return the default namespace if no valid string is given.
|
// Return the default namespace if no valid string is given.
|
||||||
if (Strings.isNullOrEmpty(namespaceStr)) {
|
if (Strings.isNullOrEmpty(namespaceStr)) {
|
||||||
return new Self();
|
return new Self();
|
||||||
|
@ -273,7 +232,7 @@ public abstract class AllocationTagNamespace implements
|
||||||
return new All();
|
return new All();
|
||||||
case APP_ID:
|
case APP_ID:
|
||||||
if (nsValues.size() != 2) {
|
if (nsValues.size() != 2) {
|
||||||
throw new InvalidAllocationTagException(
|
throw new InvalidAllocationTagsQueryException(
|
||||||
"Missing the application ID in the namespace string: "
|
"Missing the application ID in the namespace string: "
|
||||||
+ namespaceStr);
|
+ namespaceStr);
|
||||||
}
|
}
|
||||||
|
@ -282,18 +241,35 @@ public abstract class AllocationTagNamespace implements
|
||||||
case APP_LABEL:
|
case APP_LABEL:
|
||||||
return new AppLabel();
|
return new AppLabel();
|
||||||
default:
|
default:
|
||||||
throw new InvalidAllocationTagException(
|
throw new InvalidAllocationTagsQueryException(
|
||||||
"Invalid namespace string " + namespaceStr);
|
"Invalid namespace string " + namespaceStr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static AllocationTagNamespaceType fromString(String prefix) throws
|
||||||
|
InvalidAllocationTagsQueryException {
|
||||||
|
for (AllocationTagNamespaceType type :
|
||||||
|
AllocationTagNamespaceType.values()) {
|
||||||
|
if(type.getTypeKeyword().equals(prefix)) {
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<String> values = Arrays.stream(AllocationTagNamespaceType.values())
|
||||||
|
.map(AllocationTagNamespaceType::toString)
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
throw new InvalidAllocationTagsQueryException(
|
||||||
|
"Invalid namespace prefix: " + prefix
|
||||||
|
+ ", valid values are: " + String.join(",", values));
|
||||||
|
}
|
||||||
|
|
||||||
private static AllocationTagNamespace parseAppID(String appIDStr)
|
private static AllocationTagNamespace parseAppID(String appIDStr)
|
||||||
throws InvalidAllocationTagException {
|
throws InvalidAllocationTagsQueryException {
|
||||||
try {
|
try {
|
||||||
ApplicationId applicationId = ApplicationId.fromString(appIDStr);
|
ApplicationId applicationId = ApplicationId.fromString(appIDStr);
|
||||||
return new AppID(applicationId);
|
return new AppID(applicationId);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
throw new InvalidAllocationTagException(
|
throw new InvalidAllocationTagsQueryException(
|
||||||
"Invalid application ID for "
|
"Invalid application ID for "
|
||||||
+ APP_ID.getTypeKeyword() + ": " + appIDStr);
|
+ APP_ID.getTypeKeyword() + ": " + appIDStr);
|
||||||
}
|
}
|
||||||
|
@ -307,11 +283,11 @@ public abstract class AllocationTagNamespace implements
|
||||||
*
|
*
|
||||||
* @param namespaceStr namespace string.
|
* @param namespaceStr namespace string.
|
||||||
* @return a list of parsed strings.
|
* @return a list of parsed strings.
|
||||||
* @throws InvalidAllocationTagException
|
* @throws InvalidAllocationTagsQueryException
|
||||||
* if namespace format is unexpected.
|
* if namespace format is unexpected.
|
||||||
*/
|
*/
|
||||||
private static List<String> normalize(String namespaceStr)
|
private static List<String> normalize(String namespaceStr)
|
||||||
throws InvalidAllocationTagException {
|
throws InvalidAllocationTagsQueryException {
|
||||||
List<String> result = new ArrayList<>();
|
List<String> result = new ArrayList<>();
|
||||||
if (namespaceStr == null) {
|
if (namespaceStr == null) {
|
||||||
return result;
|
return result;
|
||||||
|
@ -326,7 +302,7 @@ public abstract class AllocationTagNamespace implements
|
||||||
|
|
||||||
// Currently we only allow 1 or 2 values for a namespace string
|
// Currently we only allow 1 or 2 values for a namespace string
|
||||||
if (result.size() == 0 || result.size() > 2) {
|
if (result.size() == 0 || result.size() > 2) {
|
||||||
throw new InvalidAllocationTagException("Invalid namespace string: "
|
throw new InvalidAllocationTagsQueryException("Invalid namespace string: "
|
||||||
+ namespaceStr + ", the syntax is <namespace_prefix> or"
|
+ namespaceStr + ", the syntax is <namespace_prefix> or"
|
||||||
+ " <namespace_prefix>/<namespace_value>");
|
+ " <namespace_prefix>/<namespace_value>");
|
||||||
}
|
}
|
|
@ -0,0 +1,82 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocation tags under same namespace.
|
||||||
|
*/
|
||||||
|
public final class AllocationTags {
|
||||||
|
|
||||||
|
private AllocationTagNamespace ns;
|
||||||
|
private Set<String> tags;
|
||||||
|
|
||||||
|
private AllocationTags(AllocationTagNamespace namespace,
|
||||||
|
Set<String> allocationTags) {
|
||||||
|
this.ns = namespace;
|
||||||
|
this.tags = allocationTags;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the namespace of these tags.
|
||||||
|
*/
|
||||||
|
public AllocationTagNamespace getNamespace() {
|
||||||
|
return this.ns;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the allocation tags.
|
||||||
|
*/
|
||||||
|
public Set<String> getTags() {
|
||||||
|
return this.tags;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static AllocationTags createSingleAppAllocationTags(
|
||||||
|
ApplicationId appId, Set<String> tags) {
|
||||||
|
AllocationTagNamespace namespace = new AllocationTagNamespace.AppID(appId);
|
||||||
|
return new AllocationTags(namespace, tags);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static AllocationTags createGlobalAllocationTags(Set<String> tags) {
|
||||||
|
AllocationTagNamespace namespace = new AllocationTagNamespace.All();
|
||||||
|
return new AllocationTags(namespace, tags);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static AllocationTags createOtherAppAllocationTags(
|
||||||
|
ApplicationId currentApp, Set<ApplicationId> allIds, Set<String> tags)
|
||||||
|
throws InvalidAllocationTagsQueryException {
|
||||||
|
AllocationTagNamespace namespace = new AllocationTagNamespace.NotSelf();
|
||||||
|
TargetApplications ta = new TargetApplications(currentApp, allIds);
|
||||||
|
namespace.evaluate(ta);
|
||||||
|
return new AllocationTags(namespace, tags);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static AllocationTags newAllocationTags(
|
||||||
|
AllocationTagNamespace namespace, Set<String> tags) {
|
||||||
|
return new AllocationTags(namespace, tags);
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,9 +22,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -75,6 +78,12 @@ public class AllocationTagsManager {
|
||||||
// Map<Type, Map<Tag, Count>>
|
// Map<Type, Map<Tag, Count>>
|
||||||
private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>();
|
private Map<T, Map<String, Long>> typeToTagsWithCount = new HashMap<>();
|
||||||
|
|
||||||
|
public TypeToCountedTags() {}
|
||||||
|
|
||||||
|
private TypeToCountedTags(Map<T, Map<String, Long>> tags) {
|
||||||
|
this.typeToTagsWithCount = tags;
|
||||||
|
}
|
||||||
|
|
||||||
// protected by external locks
|
// protected by external locks
|
||||||
private void addTags(T type, Set<String> tags) {
|
private void addTags(T type, Set<String> tags) {
|
||||||
Map<String, Long> innerMap =
|
Map<String, Long> innerMap =
|
||||||
|
@ -206,6 +215,52 @@ public class AllocationTagsManager {
|
||||||
public Map<T, Map<String, Long>> getTypeToTagsWithCount() {
|
public Map<T, Map<String, Long>> getTypeToTagsWithCount() {
|
||||||
return typeToTagsWithCount;
|
return typeToTagsWithCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Absorbs the given {@link TypeToCountedTags} to current mapping,
|
||||||
|
* this will aggregate the count of the tags with same name.
|
||||||
|
*
|
||||||
|
* @param target a {@link TypeToCountedTags} to merge with.
|
||||||
|
*/
|
||||||
|
protected void absorb(final TypeToCountedTags<T> target) {
|
||||||
|
// No opt if the given target is null.
|
||||||
|
if (target == null || target.getTypeToTagsWithCount() == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge the target.
|
||||||
|
Map<T, Map<String, Long>> targetMap = target.getTypeToTagsWithCount();
|
||||||
|
for (Map.Entry<T, Map<String, Long>> targetEntry :
|
||||||
|
targetMap.entrySet()) {
|
||||||
|
// Get a mutable copy, do not modify the target reference.
|
||||||
|
Map<String, Long> copy = Maps.newHashMap(targetEntry.getValue());
|
||||||
|
|
||||||
|
// If the target type doesn't exist in the current mapping,
|
||||||
|
// add as a new entry.
|
||||||
|
Map<String, Long> existingMapping =
|
||||||
|
this.typeToTagsWithCount.putIfAbsent(targetEntry.getKey(), copy);
|
||||||
|
// There was a mapping for this target type,
|
||||||
|
// do proper merging on the operator.
|
||||||
|
if (existingMapping != null) {
|
||||||
|
Map<String, Long> localMap =
|
||||||
|
this.typeToTagsWithCount.get(targetEntry.getKey());
|
||||||
|
// Merge the target map to the inner map.
|
||||||
|
Map<String, Long> targetValue = targetEntry.getValue();
|
||||||
|
for (Map.Entry<String, Long> entry : targetValue.entrySet()) {
|
||||||
|
localMap.merge(entry.getKey(), entry.getValue(),
|
||||||
|
(a, b) -> Long.sum(a, b));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return an immutable copy of current instance.
|
||||||
|
*/
|
||||||
|
protected TypeToCountedTags immutableCopy() {
|
||||||
|
return new TypeToCountedTags(
|
||||||
|
Collections.unmodifiableMap(this.typeToTagsWithCount));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -235,6 +290,34 @@ public class AllocationTagsManager {
|
||||||
rmContext = context;
|
rmContext = context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Aggregates multiple {@link TypeToCountedTags} to a single one based on
|
||||||
|
* a given set of application IDs, the values are properly merged.
|
||||||
|
*
|
||||||
|
* @param appIds a set of application IDs.
|
||||||
|
* @return an aggregated {@link TypeToCountedTags}.
|
||||||
|
*/
|
||||||
|
private TypeToCountedTags aggregateAllocationTags(Set<ApplicationId> appIds,
|
||||||
|
Map<ApplicationId, TypeToCountedTags> mapping) {
|
||||||
|
TypeToCountedTags result = new TypeToCountedTags();
|
||||||
|
if (appIds != null) {
|
||||||
|
if (appIds.size() == 1) {
|
||||||
|
// If there is only one app, we simply return the mapping
|
||||||
|
// without any extra computation.
|
||||||
|
return mapping.get(appIds.iterator().next());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (ApplicationId applicationId : appIds) {
|
||||||
|
TypeToCountedTags appIdTags = mapping.get(applicationId);
|
||||||
|
if (appIdTags != null) {
|
||||||
|
// Make sure ATM state won't be changed.
|
||||||
|
result.absorb(appIdTags.immutableCopy());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify container allocated on a node.
|
* Notify container allocated on a node.
|
||||||
*
|
*
|
||||||
|
@ -458,9 +541,8 @@ public class AllocationTagsManager {
|
||||||
* to implement customized logic.
|
* to implement customized logic.
|
||||||
*
|
*
|
||||||
* @param nodeId nodeId, required.
|
* @param nodeId nodeId, required.
|
||||||
* @param applicationId applicationId. When null is specified, return
|
* @param tags {@link AllocationTags}, allocation tags under a
|
||||||
* aggregated cardinality among all applications.
|
* specific namespace. See
|
||||||
* @param tags allocation tags, see
|
|
||||||
* {@link SchedulingRequest#getAllocationTags()},
|
* {@link SchedulingRequest#getAllocationTags()},
|
||||||
* When multiple tags specified. Returns cardinality
|
* When multiple tags specified. Returns cardinality
|
||||||
* depends on op. If a specified tag doesn't exist, 0
|
* depends on op. If a specified tag doesn't exist, 0
|
||||||
|
@ -474,29 +556,28 @@ public class AllocationTagsManager {
|
||||||
* @throws InvalidAllocationTagsQueryException when illegal query
|
* @throws InvalidAllocationTagsQueryException when illegal query
|
||||||
* parameter specified
|
* parameter specified
|
||||||
*/
|
*/
|
||||||
public long getNodeCardinalityByOp(NodeId nodeId, ApplicationId applicationId,
|
public long getNodeCardinalityByOp(NodeId nodeId, AllocationTags tags,
|
||||||
Set<String> tags, LongBinaryOperator op)
|
LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
|
||||||
throws InvalidAllocationTagsQueryException {
|
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (nodeId == null || op == null) {
|
if (nodeId == null || op == null || tags == null) {
|
||||||
throw new InvalidAllocationTagsQueryException(
|
throw new InvalidAllocationTagsQueryException(
|
||||||
"Must specify nodeId/tags/op to query cardinality");
|
"Must specify nodeId/tags/op to query cardinality");
|
||||||
}
|
}
|
||||||
|
|
||||||
TypeToCountedTags mapping;
|
TypeToCountedTags mapping;
|
||||||
if (applicationId != null) {
|
if (AllocationTagNamespaceType.ALL.equals(
|
||||||
mapping = perAppNodeMappings.get(applicationId);
|
tags.getNamespace().getNamespaceType())) {
|
||||||
} else {
|
|
||||||
mapping = globalNodeMapping;
|
mapping = globalNodeMapping;
|
||||||
|
} else {
|
||||||
|
// Aggregate app tags cardinality by applications.
|
||||||
|
mapping = aggregateAllocationTags(
|
||||||
|
tags.getNamespace().getNamespaceScope(),
|
||||||
|
perAppNodeMappings);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mapping == null) {
|
return mapping == null ? 0 :
|
||||||
return 0;
|
mapping.getCardinality(nodeId, tags.getTags(), op);
|
||||||
}
|
|
||||||
|
|
||||||
return mapping.getCardinality(nodeId, tags, op);
|
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -507,9 +588,8 @@ public class AllocationTagsManager {
|
||||||
* to implement customized logic.
|
* to implement customized logic.
|
||||||
*
|
*
|
||||||
* @param rack rack, required.
|
* @param rack rack, required.
|
||||||
* @param applicationId applicationId. When null is specified, return
|
* @param tags {@link AllocationTags}, allocation tags under a
|
||||||
* aggregated cardinality among all applications.
|
* specific namespace. See
|
||||||
* @param tags allocation tags, see
|
|
||||||
* {@link SchedulingRequest#getAllocationTags()},
|
* {@link SchedulingRequest#getAllocationTags()},
|
||||||
* When multiple tags specified. Returns cardinality
|
* When multiple tags specified. Returns cardinality
|
||||||
* depends on op. If a specified tag doesn't exist, 0
|
* depends on op. If a specified tag doesn't exist, 0
|
||||||
|
@ -523,30 +603,28 @@ public class AllocationTagsManager {
|
||||||
* @throws InvalidAllocationTagsQueryException when illegal query
|
* @throws InvalidAllocationTagsQueryException when illegal query
|
||||||
* parameter specified
|
* parameter specified
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
public long getRackCardinalityByOp(String rack, AllocationTags tags,
|
||||||
public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
|
LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
|
||||||
Set<String> tags, LongBinaryOperator op)
|
|
||||||
throws InvalidAllocationTagsQueryException {
|
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (rack == null || op == null) {
|
if (rack == null || op == null || tags == null) {
|
||||||
throw new InvalidAllocationTagsQueryException(
|
throw new InvalidAllocationTagsQueryException(
|
||||||
"Must specify rack/tags/op to query cardinality");
|
"Must specify nodeId/tags/op to query cardinality");
|
||||||
}
|
}
|
||||||
|
|
||||||
TypeToCountedTags mapping;
|
TypeToCountedTags mapping;
|
||||||
if (applicationId != null) {
|
if (AllocationTagNamespaceType.ALL.equals(
|
||||||
mapping = perAppRackMappings.get(applicationId);
|
tags.getNamespace().getNamespaceType())) {
|
||||||
} else {
|
|
||||||
mapping = globalRackMapping;
|
mapping = globalRackMapping;
|
||||||
|
} else {
|
||||||
|
// Aggregates cardinality by rack.
|
||||||
|
mapping = aggregateAllocationTags(
|
||||||
|
tags.getNamespace().getNamespaceScope(),
|
||||||
|
perAppRackMappings);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mapping == null) {
|
return mapping == null ? 0 :
|
||||||
return 0;
|
mapping.getCardinality(rack, tags.getTags(), op);
|
||||||
}
|
|
||||||
|
|
||||||
return mapping.getCardinality(rack, tags, op);
|
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api.records;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
|
|
@ -24,11 +24,9 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
|
|
||||||
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
|
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.TargetApplications;
|
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
|
||||||
|
@ -38,7 +36,6 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
|
||||||
|
@ -70,43 +67,25 @@ public final class PlacementConstraintsUtil {
|
||||||
*/
|
*/
|
||||||
private static AllocationTagNamespace getAllocationTagNamespace(
|
private static AllocationTagNamespace getAllocationTagNamespace(
|
||||||
ApplicationId currentAppId, String targetKey, AllocationTagsManager atm)
|
ApplicationId currentAppId, String targetKey, AllocationTagsManager atm)
|
||||||
throws InvalidAllocationTagException{
|
throws InvalidAllocationTagsQueryException {
|
||||||
// Parse to a valid namespace.
|
// Parse to a valid namespace.
|
||||||
AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey);
|
AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey);
|
||||||
|
|
||||||
// TODO remove such check once we support all forms of namespaces
|
// TODO Complete remove this check once we support app-label.
|
||||||
if (!namespace.isIntraApp() && !namespace.isSingleInterApp()) {
|
if (AllocationTagNamespaceType.APP_LABEL
|
||||||
throw new InvalidAllocationTagException(
|
.equals(namespace.getNamespaceType())) {
|
||||||
"Only support " + AllocationTagNamespaceType.SELF.toString()
|
throw new InvalidAllocationTagsQueryException(
|
||||||
+ " and "+ AllocationTagNamespaceType.APP_ID + " now,"
|
namespace.toString() + " is not supported yet!");
|
||||||
+ namespace.toString() + " is not supported yet!");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Evaluate the namespace according to the given target
|
// Evaluate the namespace according to the given target
|
||||||
// before it can be consumed.
|
// before it can be consumed.
|
||||||
TargetApplications ta = new TargetApplications(currentAppId,
|
TargetApplications ta =
|
||||||
atm.getAllApplicationIds());
|
new TargetApplications(currentAppId, atm.getAllApplicationIds());
|
||||||
namespace.evaluate(ta);
|
namespace.evaluate(ta);
|
||||||
return namespace;
|
return namespace;
|
||||||
}
|
}
|
||||||
|
|
||||||
// We return a single app Id now, because at present,
|
|
||||||
// only self and app-id namespace is supported. But moving on,
|
|
||||||
// this will return a set of application IDs.
|
|
||||||
// TODO support other forms of namespaces
|
|
||||||
private static ApplicationId getNamespaceScope(
|
|
||||||
AllocationTagNamespace namespace)
|
|
||||||
throws InvalidAllocationTagException {
|
|
||||||
if (namespace.getNamespaceScope() == null
|
|
||||||
|| namespace.getNamespaceScope().size() != 1) {
|
|
||||||
throw new InvalidAllocationTagException(
|
|
||||||
"Invalid allocation tag namespace " + namespace.toString()
|
|
||||||
+ ", expecting it is not null and only 1 application"
|
|
||||||
+ " ID in the scope.");
|
|
||||||
}
|
|
||||||
return namespace.getNamespaceScope().iterator().next();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true if <b>single</b> placement constraint with associated
|
* Returns true if <b>single</b> placement constraint with associated
|
||||||
* allocationTags and scope is satisfied by a specific scheduler Node.
|
* allocationTags and scope is satisfied by a specific scheduler Node.
|
||||||
|
@ -128,14 +107,10 @@ public final class PlacementConstraintsUtil {
|
||||||
// Parse the allocation tag's namespace from the given target key,
|
// Parse the allocation tag's namespace from the given target key,
|
||||||
// then evaluate the namespace and get its scope,
|
// then evaluate the namespace and get its scope,
|
||||||
// which is represented by one or more application IDs.
|
// which is represented by one or more application IDs.
|
||||||
ApplicationId effectiveAppID;
|
|
||||||
try {
|
|
||||||
AllocationTagNamespace namespace = getAllocationTagNamespace(
|
AllocationTagNamespace namespace = getAllocationTagNamespace(
|
||||||
targetApplicationId, te.getTargetKey(), tm);
|
targetApplicationId, te.getTargetKey(), tm);
|
||||||
effectiveAppID = getNamespaceScope(namespace);
|
AllocationTags allocationTags = AllocationTags
|
||||||
} catch (InvalidAllocationTagException e) {
|
.newAllocationTags(namespace, te.getTargetValues());
|
||||||
throw new InvalidAllocationTagsQueryException(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
long minScopeCardinality = 0;
|
long minScopeCardinality = 0;
|
||||||
long maxScopeCardinality = 0;
|
long maxScopeCardinality = 0;
|
||||||
|
@ -149,20 +124,20 @@ public final class PlacementConstraintsUtil {
|
||||||
if (sc.getScope().equals(PlacementConstraints.NODE)) {
|
if (sc.getScope().equals(PlacementConstraints.NODE)) {
|
||||||
if (checkMinCardinality) {
|
if (checkMinCardinality) {
|
||||||
minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
|
minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
|
||||||
effectiveAppID, te.getTargetValues(), Long::max);
|
allocationTags, Long::max);
|
||||||
}
|
}
|
||||||
if (checkMaxCardinality) {
|
if (checkMaxCardinality) {
|
||||||
maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
|
maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
|
||||||
effectiveAppID, te.getTargetValues(), Long::min);
|
allocationTags, Long::min);
|
||||||
}
|
}
|
||||||
} else if (sc.getScope().equals(PlacementConstraints.RACK)) {
|
} else if (sc.getScope().equals(PlacementConstraints.RACK)) {
|
||||||
if (checkMinCardinality) {
|
if (checkMinCardinality) {
|
||||||
minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
|
minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
|
||||||
effectiveAppID, te.getTargetValues(), Long::max);
|
allocationTags, Long::max);
|
||||||
}
|
}
|
||||||
if (checkMaxCardinality) {
|
if (checkMaxCardinality) {
|
||||||
maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
|
maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
|
||||||
effectiveAppID, te.getTargetValues(), Long::min);
|
allocationTags, Long::min);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,9 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.api.records;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -37,10 +39,6 @@ public class TargetApplications {
|
||||||
this.allAppIds = allApplicationIds;
|
this.allAppIds = allApplicationIds;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<ApplicationId> getAllApplicationIds() {
|
|
||||||
return this.allAppIds;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ApplicationId getCurrentApplicationId() {
|
public ApplicationId getCurrentApplicationId() {
|
||||||
return this.currentAppId;
|
return this.currentAppId;
|
||||||
}
|
}
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
|
||||||
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -138,30 +139,28 @@ class LocalAllocationTagsManager extends AllocationTagsManager {
|
||||||
return tagsManager.getNodeCardinality(nodeId, applicationId, tag);
|
return tagsManager.getNodeCardinality(nodeId, applicationId, tag);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getNodeCardinalityByOp(NodeId nodeId, AllocationTags tags,
|
||||||
|
LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
|
||||||
|
return tagsManager.getNodeCardinalityByOp(nodeId, tags, op);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getRackCardinality(String rack, ApplicationId applicationId,
|
public long getRackCardinality(String rack, ApplicationId applicationId,
|
||||||
String tag) throws InvalidAllocationTagsQueryException {
|
String tag) throws InvalidAllocationTagsQueryException {
|
||||||
return tagsManager.getRackCardinality(rack, applicationId, tag);
|
return tagsManager.getRackCardinality(rack, applicationId, tag);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getRackCardinalityByOp(String rack, AllocationTags tags,
|
||||||
|
LongBinaryOperator op) throws InvalidAllocationTagsQueryException {
|
||||||
|
return tagsManager.getRackCardinalityByOp(rack, tags, op);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean allocationTagExistsOnNode(NodeId nodeId,
|
public boolean allocationTagExistsOnNode(NodeId nodeId,
|
||||||
ApplicationId applicationId, String tag)
|
ApplicationId applicationId, String tag)
|
||||||
throws InvalidAllocationTagsQueryException {
|
throws InvalidAllocationTagsQueryException {
|
||||||
return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag);
|
return tagsManager.allocationTagExistsOnNode(nodeId, applicationId, tag);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getNodeCardinalityByOp(NodeId nodeId,
|
|
||||||
ApplicationId applicationId, Set<String> tags, LongBinaryOperator op)
|
|
||||||
throws InvalidAllocationTagsQueryException {
|
|
||||||
return tagsManager.getNodeCardinalityByOp(nodeId, applicationId, tags, op);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
|
|
||||||
Set<String> tags, LongBinaryOperator op)
|
|
||||||
throws InvalidAllocationTagsQueryException {
|
|
||||||
return tagsManager.getRackCardinalityByOp(rack, applicationId, tags, op);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ import org.apache.commons.collections.IteratorUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace;
|
||||||
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
|
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
||||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
|
|
||||||
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
|
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
@ -339,18 +338,18 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
|
||||||
try {
|
try {
|
||||||
AllocationTagNamespace tagNS =
|
AllocationTagNamespace tagNS =
|
||||||
AllocationTagNamespace.parse(targetExpression.getTargetKey());
|
AllocationTagNamespace.parse(targetExpression.getTargetKey());
|
||||||
if (!AllocationTagNamespaceType.SELF
|
if (AllocationTagNamespaceType.APP_LABEL
|
||||||
.equals(tagNS.getNamespaceType())) {
|
.equals(tagNS.getNamespaceType())) {
|
||||||
throwExceptionWithMetaInfo(
|
throwExceptionWithMetaInfo(
|
||||||
"As of now, the only accepted target key for targetKey of "
|
"As of now, allocation tag namespace ["
|
||||||
+ "allocation_tag target expression is: ["
|
+ AllocationTagNamespaceType.APP_LABEL.toString()
|
||||||
+ AllocationTagNamespaceType.SELF.toString()
|
+ "] is not supported. Please make changes to placement "
|
||||||
+ "]. Please make changes to placement constraints "
|
+ "constraints accordingly. If this is null, it will be "
|
||||||
+ "accordingly. If this is null, it will be set to "
|
+ "set to "
|
||||||
+ AllocationTagNamespaceType.SELF.toString()
|
+ AllocationTagNamespaceType.SELF.toString()
|
||||||
+ " by default.");
|
+ " by default.");
|
||||||
}
|
}
|
||||||
} catch (InvalidAllocationTagException e) {
|
} catch (InvalidAllocationTagsQueryException e) {
|
||||||
throwExceptionWithMetaInfo(
|
throwExceptionWithMetaInfo(
|
||||||
"Invalid allocation tag namespace, message: " + e.getMessage());
|
"Invalid allocation tag namespace, message: " + e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
|
@ -305,6 +305,14 @@ public class MockAM {
|
||||||
public AllocateResponse allocateIntraAppAntiAffinity(
|
public AllocateResponse allocateIntraAppAntiAffinity(
|
||||||
ResourceSizing resourceSizing, Priority priority, long allocationId,
|
ResourceSizing resourceSizing, Priority priority, long allocationId,
|
||||||
Set<String> allocationTags, String... targetTags) throws Exception {
|
Set<String> allocationTags, String... targetTags) throws Exception {
|
||||||
|
return allocateAppAntiAffinity(resourceSizing, priority, allocationId,
|
||||||
|
null, allocationTags, targetTags);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AllocateResponse allocateAppAntiAffinity(
|
||||||
|
ResourceSizing resourceSizing, Priority priority, long allocationId,
|
||||||
|
String namespace, Set<String> allocationTags, String... targetTags)
|
||||||
|
throws Exception {
|
||||||
return this.allocate(null,
|
return this.allocate(null,
|
||||||
Arrays.asList(SchedulingRequest.newBuilder().executionType(
|
Arrays.asList(SchedulingRequest.newBuilder().executionType(
|
||||||
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
|
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
|
||||||
|
@ -313,7 +321,8 @@ public class MockAM {
|
||||||
PlacementConstraints
|
PlacementConstraints
|
||||||
.targetNotIn(PlacementConstraints.NODE,
|
.targetNotIn(PlacementConstraints.NODE,
|
||||||
PlacementConstraints.PlacementTargets
|
PlacementConstraints.PlacementTargets
|
||||||
.allocationTagToIntraApp(targetTags)).build())
|
.allocationTagWithNamespace(namespace, targetTags))
|
||||||
|
.build())
|
||||||
.resourceSizing(resourceSizing).build()), null);
|
.resourceSizing(resourceSizing).build()), null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
|
||||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
@ -428,20 +430,27 @@ public class TestRMContainerImpl {
|
||||||
rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
|
rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
|
||||||
|
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
tagsManager.getNodeCardinalityByOp(nodeId,
|
||||||
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1), null),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
rmContainer.handle(new RMContainerEvent(containerId,
|
rmContainer.handle(new RMContainerEvent(containerId,
|
||||||
RMContainerEventType.START));
|
RMContainerEventType.START));
|
||||||
|
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
tagsManager.getNodeCardinalityByOp(nodeId,
|
||||||
|
AllocationTags.createSingleAppAllocationTags(appId, null),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
|
rmContainer.handle(new RMContainerFinishedEvent(containerId, ContainerStatus
|
||||||
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
|
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
|
||||||
RMContainerEventType.KILL));
|
RMContainerEventType.KILL));
|
||||||
|
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
tagsManager.getNodeCardinalityByOp(nodeId,
|
||||||
|
AllocationTags.createSingleAppAllocationTags(appId, null),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
/* Second container: ACQUIRED -> FINISHED */
|
/* Second container: ACQUIRED -> FINISHED */
|
||||||
rmContainer = new RMContainerImpl(container,
|
rmContainer = new RMContainerImpl(container,
|
||||||
|
@ -449,14 +458,18 @@ public class TestRMContainerImpl {
|
||||||
nodeId, "user", rmContext);
|
nodeId, "user", rmContext);
|
||||||
|
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
tagsManager.getNodeCardinalityByOp(nodeId,
|
||||||
|
AllocationTags.createSingleAppAllocationTags(appId, null),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
|
rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
|
||||||
rmContainer.handle(new RMContainerEvent(containerId,
|
rmContainer.handle(new RMContainerEvent(containerId,
|
||||||
RMContainerEventType.START));
|
RMContainerEventType.START));
|
||||||
|
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
tagsManager.getNodeCardinalityByOp(nodeId,
|
||||||
|
AllocationTags.createSingleAppAllocationTags(appId, null),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
rmContainer.handle(
|
rmContainer.handle(
|
||||||
new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
|
new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
|
||||||
|
@ -466,7 +479,9 @@ public class TestRMContainerImpl {
|
||||||
RMContainerEventType.FINISHED));
|
RMContainerEventType.FINISHED));
|
||||||
|
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
tagsManager.getNodeCardinalityByOp(nodeId,
|
||||||
|
AllocationTags.createSingleAppAllocationTags(appId, null),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
/* Third container: RUNNING -> FINISHED */
|
/* Third container: RUNNING -> FINISHED */
|
||||||
rmContainer = new RMContainerImpl(container,
|
rmContainer = new RMContainerImpl(container,
|
||||||
|
@ -475,13 +490,17 @@ public class TestRMContainerImpl {
|
||||||
rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
|
rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
|
||||||
|
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
tagsManager.getNodeCardinalityByOp(nodeId,
|
||||||
|
AllocationTags.createSingleAppAllocationTags(appId, null),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
rmContainer.handle(new RMContainerEvent(containerId,
|
rmContainer.handle(new RMContainerEvent(containerId,
|
||||||
RMContainerEventType.START));
|
RMContainerEventType.START));
|
||||||
|
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
tagsManager.getNodeCardinalityByOp(nodeId,
|
||||||
|
AllocationTags.createSingleAppAllocationTags(appId, null),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
rmContainer.handle(
|
rmContainer.handle(
|
||||||
new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
|
new RMContainerEvent(containerId, RMContainerEventType.ACQUIRED));
|
||||||
|
@ -494,7 +513,9 @@ public class TestRMContainerImpl {
|
||||||
RMContainerEventType.FINISHED));
|
RMContainerEventType.FINISHED));
|
||||||
|
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
tagsManager.getNodeCardinalityByOp(nodeId,
|
||||||
|
AllocationTags.createSingleAppAllocationTags(appId, null),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
/* Fourth container: NEW -> RECOVERED */
|
/* Fourth container: NEW -> RECOVERED */
|
||||||
rmContainer = new RMContainerImpl(container,
|
rmContainer = new RMContainerImpl(container,
|
||||||
|
@ -503,7 +524,9 @@ public class TestRMContainerImpl {
|
||||||
rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
|
rmContainer.setAllocationTags(ImmutableSet.of("mapper"));
|
||||||
|
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
tagsManager.getNodeCardinalityByOp(nodeId,
|
||||||
|
AllocationTags.createSingleAppAllocationTags(appId, null),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
NMContainerStatus containerStatus = NMContainerStatus
|
NMContainerStatus containerStatus = NMContainerStatus
|
||||||
.newInstance(containerId, 0, ContainerState.NEW,
|
.newInstance(containerId, 0, ContainerState.NEW,
|
||||||
|
@ -514,6 +537,8 @@ public class TestRMContainerImpl {
|
||||||
.handle(new RMContainerRecoverEvent(containerId, containerStatus));
|
.handle(new RMContainerRecoverEvent(containerId, containerStatus));
|
||||||
|
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
tagsManager.getNodeCardinalityByOp(nodeId, appId, null, Long::max));
|
tagsManager.getNodeCardinalityByOp(nodeId,
|
||||||
|
AllocationTags.createSingleAppAllocationTags(appId, null),
|
||||||
|
Long::max));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceSizing;
|
import org.apache.hadoop.yarn.api.records.ResourceSizing;
|
||||||
|
@ -224,6 +225,131 @@ public class TestSchedulingRequestContainerAllocation {
|
||||||
rm1.close();
|
rm1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This UT covers some basic end-to-end inter-app anti-affinity
|
||||||
|
* constraint tests. For comprehensive tests over different namespace
|
||||||
|
* types, see more in TestPlacementConstraintsUtil.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testInterAppAntiAffinity() throws Exception {
|
||||||
|
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
|
||||||
|
new Configuration());
|
||||||
|
csConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
||||||
|
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
||||||
|
|
||||||
|
// inject node label manager
|
||||||
|
MockRM rm1 = new MockRM(csConf) {
|
||||||
|
@Override
|
||||||
|
public RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
rm1.getRMContext().setNodeLabelManager(mgr);
|
||||||
|
rm1.start();
|
||||||
|
|
||||||
|
// 4 NMs.
|
||||||
|
MockNM[] nms = new MockNM[4];
|
||||||
|
RMNode[] rmNodes = new RMNode[4];
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
nms[i] = rm1.registerNode("192.168.0." + i + ":1234", 10 * GB);
|
||||||
|
rmNodes[i] = rm1.getRMContext().getRMNodes().get(nms[i].getNodeId());
|
||||||
|
}
|
||||||
|
|
||||||
|
// app1 -> c
|
||||||
|
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms[0]);
|
||||||
|
|
||||||
|
// app1 asks for 3 anti-affinity containers for the same app. It should
|
||||||
|
// only get 3 containers allocated to 3 different nodes..
|
||||||
|
am1.allocateIntraAppAntiAffinity(
|
||||||
|
ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
|
||||||
|
Priority.newInstance(1), 1L, ImmutableSet.of("mapper"), "mapper");
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
for (int j = 0; j < 4; j++) {
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.println("Mappers on HOST0: "
|
||||||
|
+ rmNodes[0].getAllocationTagsWithCount().get("mapper"));
|
||||||
|
System.out.println("Mappers on HOST1: "
|
||||||
|
+ rmNodes[1].getAllocationTagsWithCount().get("mapper"));
|
||||||
|
System.out.println("Mappers on HOST2: "
|
||||||
|
+ rmNodes[2].getAllocationTagsWithCount().get("mapper"));
|
||||||
|
|
||||||
|
// App1 should get 4 containers allocated (1 AM + 3 mappers).
|
||||||
|
FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
|
||||||
|
am1.getApplicationAttemptId());
|
||||||
|
Assert.assertEquals(4, schedulerApp.getLiveContainers().size());
|
||||||
|
|
||||||
|
// app2 -> c
|
||||||
|
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
|
||||||
|
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms[0]);
|
||||||
|
|
||||||
|
// App2 asks for 3 containers that anti-affinity with any mapper,
|
||||||
|
// since 3 out of 4 nodes already have mapper containers, all 3
|
||||||
|
// containers will be allocated on the other node.
|
||||||
|
AllocationTagNamespace.All allNs = new AllocationTagNamespace.All();
|
||||||
|
am2.allocateAppAntiAffinity(
|
||||||
|
ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
|
||||||
|
Priority.newInstance(1), 1L, allNs.toString(),
|
||||||
|
ImmutableSet.of("foo"), "mapper");
|
||||||
|
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
for (int j = 0; j < 4; j++) {
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
|
||||||
|
am2.getApplicationAttemptId());
|
||||||
|
|
||||||
|
// App2 should get 4 containers allocated (1 AM + 3 container).
|
||||||
|
Assert.assertEquals(4, schedulerApp2.getLiveContainers().size());
|
||||||
|
|
||||||
|
// The allocated node should not have mapper tag.
|
||||||
|
Assert.assertTrue(schedulerApp2.getLiveContainers()
|
||||||
|
.stream().allMatch(rmContainer -> {
|
||||||
|
// except the nm host
|
||||||
|
if (!rmContainer.getContainer().getNodeId().equals(rmNodes[0])) {
|
||||||
|
return !rmContainer.getAllocationTags().contains("mapper");
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}));
|
||||||
|
|
||||||
|
// app3 -> c
|
||||||
|
RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "c");
|
||||||
|
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nms[0]);
|
||||||
|
|
||||||
|
// App3 asks for 3 containers that anti-affinity with any mapper.
|
||||||
|
// Unlike the former case, since app3 source tags are also mapper,
|
||||||
|
// it will anti-affinity with itself too. So there will be only 1
|
||||||
|
// container be allocated.
|
||||||
|
am3.allocateAppAntiAffinity(
|
||||||
|
ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
|
||||||
|
Priority.newInstance(1), 1L, allNs.toString(),
|
||||||
|
ImmutableSet.of("mapper"), "mapper");
|
||||||
|
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
for (int j = 0; j < 4; j++) {
|
||||||
|
cs.handle(new NodeUpdateSchedulerEvent(rmNodes[j]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt(
|
||||||
|
am3.getApplicationAttemptId());
|
||||||
|
|
||||||
|
// App3 should get 2 containers allocated (1 AM + 1 container).
|
||||||
|
Assert.assertEquals(2, schedulerApp3.getLiveContainers().size());
|
||||||
|
|
||||||
|
rm1.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSchedulingRequestDisabledByDefault() throws Exception {
|
public void testSchedulingRequestDisabledByDefault() throws Exception {
|
||||||
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
|
Configuration csConf = TestUtils.getConfigurationWithMultipleQueues(
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
|
@ -96,7 +97,9 @@ public class TestAllocationTagsManager {
|
||||||
// Get Node Cardinality of app1 on node1, with tag "mapper"
|
// Get Node Cardinality of app1 on node1, with tag "mapper"
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1),
|
||||||
|
ImmutableSet.of("mapper")),
|
||||||
Long::max));
|
Long::max));
|
||||||
|
|
||||||
// Get Rack Cardinality of app1 on rack0, with tag "mapper"
|
// Get Rack Cardinality of app1 on rack0, with tag "mapper"
|
||||||
|
@ -106,20 +109,26 @@ public class TestAllocationTagsManager {
|
||||||
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min
|
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
TestUtils.getMockApplicationId(1),
|
TestUtils.getMockApplicationId(1),
|
||||||
ImmutableSet.of("mapper", "reducer"), Long::min));
|
ImmutableSet.of("mapper", "reducer")),
|
||||||
|
Long::min));
|
||||||
|
|
||||||
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max
|
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max
|
||||||
Assert.assertEquals(2,
|
Assert.assertEquals(2,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
TestUtils.getMockApplicationId(1),
|
TestUtils.getMockApplicationId(1),
|
||||||
ImmutableSet.of("mapper", "reducer"), Long::max));
|
ImmutableSet.of("mapper", "reducer")),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
|
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
|
||||||
Assert.assertEquals(3,
|
Assert.assertEquals(3,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
TestUtils.getMockApplicationId(1),
|
TestUtils.getMockApplicationId(1),
|
||||||
ImmutableSet.of("mapper", "reducer"), Long::sum));
|
ImmutableSet.of("mapper", "reducer")),
|
||||||
|
Long::sum));
|
||||||
|
|
||||||
// Get Node Cardinality by passing single tag.
|
// Get Node Cardinality by passing single tag.
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
|
@ -134,38 +143,52 @@ public class TestAllocationTagsManager {
|
||||||
// op=min
|
// op=min
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
TestUtils.getMockApplicationId(1),
|
TestUtils.getMockApplicationId(1),
|
||||||
ImmutableSet.of("no_existed", "reducer"), Long::min));
|
ImmutableSet.of("no_existed", "reducer")),
|
||||||
|
Long::min));
|
||||||
|
|
||||||
// Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max
|
// Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max
|
||||||
// (Expect this returns #containers from app1 on node2)
|
// (Expect this returns #containers from app1 on node2)
|
||||||
Assert.assertEquals(2,
|
Assert.assertEquals(2,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
TestUtils.getMockApplicationId(1), null, Long::max));
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1), null),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
// Get Node Cardinality of app1 on node2, with empty tag set, op=max
|
// Get Node Cardinality of app1 on node2, with empty tag set, op=max
|
||||||
Assert.assertEquals(2,
|
Assert.assertEquals(2,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
TestUtils.getMockApplicationId(1), null, Long::max));
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1), null),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
// Get Cardinality of app1 on node2, with empty tag set, op=max
|
// Get Cardinality of app1 on node2, with empty tag set, op=max
|
||||||
Assert.assertEquals(2,
|
Assert.assertEquals(2,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1), ImmutableSet.of()),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
// Get Node Cardinality of all apps on node2, with empty tag set, op=sum
|
// Get Node Cardinality of all apps on node2, with empty tag set, op=sum
|
||||||
Assert.assertEquals(4, atm.getNodeCardinalityByOp(
|
Assert.assertEquals(4, atm.getNodeCardinalityByOp(
|
||||||
NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum));
|
NodeId.fromString("host2:123"),
|
||||||
|
AllocationTags.createGlobalAllocationTags(ImmutableSet.of()),
|
||||||
|
Long::sum));
|
||||||
|
|
||||||
// Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
|
// Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
|
||||||
Assert.assertEquals(3,
|
Assert.assertEquals(3,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1), ImmutableSet.of()),
|
||||||
|
Long::sum));
|
||||||
|
|
||||||
// Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
|
// Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(2), ImmutableSet.of()),
|
||||||
|
Long::sum));
|
||||||
|
|
||||||
// Finish all containers:
|
// Finish all containers:
|
||||||
atm.removeContainer(NodeId.fromString("host1:123"),
|
atm.removeContainer(NodeId.fromString("host1:123"),
|
||||||
|
@ -189,33 +212,42 @@ public class TestAllocationTagsManager {
|
||||||
// Get Cardinality of app1 on node1, with tag "mapper"
|
// Get Cardinality of app1 on node1, with tag "mapper"
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1),
|
||||||
|
ImmutableSet.of("mapper")),
|
||||||
Long::max));
|
Long::max));
|
||||||
|
|
||||||
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min
|
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=min
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
TestUtils.getMockApplicationId(1),
|
TestUtils.getMockApplicationId(1),
|
||||||
ImmutableSet.of("mapper", "reducer"), Long::min));
|
ImmutableSet.of("mapper", "reducer")),
|
||||||
|
Long::min));
|
||||||
|
|
||||||
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max
|
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=max
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
TestUtils.getMockApplicationId(1),
|
TestUtils.getMockApplicationId(1),
|
||||||
ImmutableSet.of("mapper", "reducer"), Long::max));
|
ImmutableSet.of("mapper", "reducer")),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
|
// Get Node Cardinality of app1 on node2, with tag "mapper/reducer", op=sum
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
TestUtils.getMockApplicationId(1),
|
TestUtils.getMockApplicationId(1),
|
||||||
ImmutableSet.of("mapper", "reducer"), Long::sum));
|
ImmutableSet.of("mapper", "reducer")),
|
||||||
|
Long::sum));
|
||||||
|
|
||||||
// Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max
|
// Get Node Cardinality of app1 on node2, with tag "<applicationId>", op=max
|
||||||
// (Expect this returns #containers from app1 on node2)
|
// (Expect this returns #containers from app1 on node2)
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
TestUtils.getMockApplicationId(1),
|
TestUtils.getMockApplicationId(1),
|
||||||
ImmutableSet.of(TestUtils.getMockApplicationId(1).toString()),
|
ImmutableSet.of(TestUtils.getMockApplicationId(1).toString())),
|
||||||
Long::max));
|
Long::max));
|
||||||
|
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
|
@ -226,21 +258,32 @@ public class TestAllocationTagsManager {
|
||||||
// Get Node Cardinality of app1 on node2, with empty tag set, op=max
|
// Get Node Cardinality of app1 on node2, with empty tag set, op=max
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1),
|
||||||
|
ImmutableSet.of()),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
// Get Node Cardinality of all apps on node2, with empty tag set, op=sum
|
// Get Node Cardinality of all apps on node2, with empty tag set, op=sum
|
||||||
Assert.assertEquals(0, atm.getNodeCardinalityByOp(
|
Assert.assertEquals(0, atm.getNodeCardinalityByOp(
|
||||||
NodeId.fromString("host2:123"), null, ImmutableSet.of(), Long::sum));
|
NodeId.fromString("host2:123"),
|
||||||
|
AllocationTags.createGlobalAllocationTags(ImmutableSet.of()),
|
||||||
|
Long::sum));
|
||||||
|
|
||||||
// Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
|
// Get Node Cardinality of app_1 on node2, with empty tag set, op=sum
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::sum));
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1),
|
||||||
|
ImmutableSet.of()),
|
||||||
|
Long::sum));
|
||||||
|
|
||||||
// Get Node Cardinality of app_2 on node2, with empty tag set, op=sum
|
// Get Node Cardinality of app_2 on node2, with empty tag set, op=sum
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
TestUtils.getMockApplicationId(2), ImmutableSet.of(), Long::sum));
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1),
|
||||||
|
ImmutableSet.of()),
|
||||||
|
Long::sum));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -296,20 +339,26 @@ public class TestAllocationTagsManager {
|
||||||
|
|
||||||
// Get Rack Cardinality of app_1 on rack0, with empty tag set, op=max
|
// Get Rack Cardinality of app_1 on rack0, with empty tag set, op=max
|
||||||
Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0",
|
Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0",
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max));
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1),
|
||||||
|
ImmutableSet.of()),
|
||||||
|
Long::max));
|
||||||
|
|
||||||
// Get Rack Cardinality of app_1 on rack0, with empty tag set, op=min
|
// Get Rack Cardinality of app_1 on rack0, with empty tag set, op=min
|
||||||
Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0",
|
Assert.assertEquals(1, atm.getRackCardinalityByOp("rack0",
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::min));
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1),
|
||||||
|
ImmutableSet.of()),
|
||||||
|
Long::min));
|
||||||
|
|
||||||
// Get Rack Cardinality of all apps on rack0, with empty tag set, op=min
|
// Get Rack Cardinality of all apps on rack0, with empty tag set, op=min
|
||||||
Assert.assertEquals(3, atm.getRackCardinalityByOp("rack0", null,
|
Assert.assertEquals(3, atm.getRackCardinalityByOp("rack0",
|
||||||
ImmutableSet.of(), Long::max));
|
AllocationTags.createGlobalAllocationTags(ImmutableSet.of()),
|
||||||
|
Long::max));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAllocationTagsManagerMemoryAfterCleanup()
|
public void testAllocationTagsManagerMemoryAfterCleanup() {
|
||||||
throws InvalidAllocationTagsQueryException {
|
|
||||||
/**
|
/**
|
||||||
* Make sure YARN cleans up all memory once container/app finishes.
|
* Make sure YARN cleans up all memory once container/app finishes.
|
||||||
*/
|
*/
|
||||||
|
@ -362,8 +411,7 @@ public class TestAllocationTagsManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueryCardinalityWithIllegalParameters()
|
public void testQueryCardinalityWithIllegalParameters() {
|
||||||
throws InvalidAllocationTagsQueryException {
|
|
||||||
/**
|
/**
|
||||||
* Make sure YARN cleans up all memory once container/app finishes.
|
* Make sure YARN cleans up all memory once container/app finishes.
|
||||||
*/
|
*/
|
||||||
|
@ -391,9 +439,12 @@ public class TestAllocationTagsManager {
|
||||||
// No node-id
|
// No node-id
|
||||||
boolean caughtException = false;
|
boolean caughtException = false;
|
||||||
try {
|
try {
|
||||||
atm.getNodeCardinalityByOp(null, TestUtils.getMockApplicationId(2),
|
atm.getNodeCardinalityByOp(null,
|
||||||
ImmutableSet.of("mapper"), Long::min);
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
} catch (InvalidAllocationTagsQueryException e) {
|
TestUtils.getMockApplicationId(2),
|
||||||
|
ImmutableSet.of("mapper")),
|
||||||
|
Long::min);
|
||||||
|
} catch (InvalidAllocationTagsQueryException e1) {
|
||||||
caughtException = true;
|
caughtException = true;
|
||||||
}
|
}
|
||||||
Assert.assertTrue("should fail because of nodeId specified",
|
Assert.assertTrue("should fail because of nodeId specified",
|
||||||
|
@ -403,11 +454,150 @@ public class TestAllocationTagsManager {
|
||||||
caughtException = false;
|
caughtException = false;
|
||||||
try {
|
try {
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
TestUtils.getMockApplicationId(2), ImmutableSet.of("mapper"), null);
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
} catch (InvalidAllocationTagsQueryException e) {
|
TestUtils.getMockApplicationId(2),
|
||||||
|
ImmutableSet.of("mapper")),
|
||||||
|
null);
|
||||||
|
} catch (InvalidAllocationTagsQueryException e1) {
|
||||||
caughtException = true;
|
caughtException = true;
|
||||||
}
|
}
|
||||||
Assert.assertTrue("should fail because of nodeId specified",
|
Assert.assertTrue("should fail because of nodeId specified",
|
||||||
caughtException);
|
caughtException);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeAllocationTagsAggregation()
|
||||||
|
throws InvalidAllocationTagsQueryException {
|
||||||
|
|
||||||
|
AllocationTagsManager atm = new AllocationTagsManager(rmContext);
|
||||||
|
ApplicationId app1 = TestUtils.getMockApplicationId(1);
|
||||||
|
ApplicationId app2 = TestUtils.getMockApplicationId(2);
|
||||||
|
ApplicationId app3 = TestUtils.getMockApplicationId(3);
|
||||||
|
NodeId host1 = NodeId.fromString("host1:123");
|
||||||
|
NodeId host2 = NodeId.fromString("host2:123");
|
||||||
|
NodeId host3 = NodeId.fromString("host3:123");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Node1 (rack0)
|
||||||
|
* app1/A(2)
|
||||||
|
* app1/B(1)
|
||||||
|
* app2/A(3)
|
||||||
|
* app3/A(1)
|
||||||
|
*
|
||||||
|
* Node2 (rack0)
|
||||||
|
* app2/A(1)
|
||||||
|
* app2/B(2)
|
||||||
|
* app1/C(1)
|
||||||
|
* app3/B(1)
|
||||||
|
*
|
||||||
|
* Node3 (rack1):
|
||||||
|
* app2/D(1)
|
||||||
|
* app3/D(1)
|
||||||
|
*/
|
||||||
|
atm.addContainer(host1, TestUtils.getMockContainerId(1, 1),
|
||||||
|
ImmutableSet.of("A", "B"));
|
||||||
|
atm.addContainer(host1, TestUtils.getMockContainerId(1, 2),
|
||||||
|
ImmutableSet.of("A"));
|
||||||
|
atm.addContainer(host1, TestUtils.getMockContainerId(2, 1),
|
||||||
|
ImmutableSet.of("A"));
|
||||||
|
atm.addContainer(host1, TestUtils.getMockContainerId(2, 2),
|
||||||
|
ImmutableSet.of("A"));
|
||||||
|
atm.addContainer(host1, TestUtils.getMockContainerId(2, 3),
|
||||||
|
ImmutableSet.of("A"));
|
||||||
|
atm.addContainer(host1, TestUtils.getMockContainerId(3, 1),
|
||||||
|
ImmutableSet.of("A"));
|
||||||
|
|
||||||
|
atm.addContainer(host2, TestUtils.getMockContainerId(1, 3),
|
||||||
|
ImmutableSet.of("C"));
|
||||||
|
atm.addContainer(host2, TestUtils.getMockContainerId(2, 4),
|
||||||
|
ImmutableSet.of("A"));
|
||||||
|
atm.addContainer(host2, TestUtils.getMockContainerId(2, 5),
|
||||||
|
ImmutableSet.of("B"));
|
||||||
|
atm.addContainer(host2, TestUtils.getMockContainerId(2, 6),
|
||||||
|
ImmutableSet.of("B"));
|
||||||
|
atm.addContainer(host2, TestUtils.getMockContainerId(3, 2),
|
||||||
|
ImmutableSet.of("B"));
|
||||||
|
|
||||||
|
atm.addContainer(host3, TestUtils.getMockContainerId(2, 7),
|
||||||
|
ImmutableSet.of("D"));
|
||||||
|
atm.addContainer(host3, TestUtils.getMockContainerId(3, 3),
|
||||||
|
ImmutableSet.of("D"));
|
||||||
|
|
||||||
|
// Target applications, current app: app1
|
||||||
|
// all apps: app1, app2, app3
|
||||||
|
TargetApplications ta = new TargetApplications(app1,
|
||||||
|
ImmutableSet.of(app1, app2, app3));
|
||||||
|
|
||||||
|
//********************************
|
||||||
|
// 1) self (app1)
|
||||||
|
//********************************
|
||||||
|
AllocationTags tags = AllocationTags
|
||||||
|
.createSingleAppAllocationTags(app1, ImmutableSet.of("A", "C"));
|
||||||
|
Assert.assertEquals(2, atm.getNodeCardinalityByOp(host1, tags, Long::max));
|
||||||
|
Assert.assertEquals(0, atm.getNodeCardinalityByOp(host1, tags, Long::min));
|
||||||
|
Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::max));
|
||||||
|
Assert.assertEquals(0, atm.getNodeCardinalityByOp(host2, tags, Long::min));
|
||||||
|
Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::max));
|
||||||
|
Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::min));
|
||||||
|
|
||||||
|
//********************************
|
||||||
|
// 2) not-self (app2, app3)
|
||||||
|
//********************************
|
||||||
|
/**
|
||||||
|
* Verify max/min cardinality of tag "A" on host1 from all applications
|
||||||
|
* other than app1. This returns the max/min cardinality of tag "A" of
|
||||||
|
* app2 or app3 on this node.
|
||||||
|
*
|
||||||
|
* Node1 (rack0)
|
||||||
|
* app1/A(1)
|
||||||
|
* app1/B(1)
|
||||||
|
* app2/A(3)
|
||||||
|
* app3/A(1)
|
||||||
|
*
|
||||||
|
* app2_app3/A(4)
|
||||||
|
* app2_app3/B(0)
|
||||||
|
*
|
||||||
|
* expecting to return max=3, min=1
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
tags = AllocationTags.createOtherAppAllocationTags(app1,
|
||||||
|
ImmutableSet.of(app1, app2, app3), ImmutableSet.of("A", "B"));
|
||||||
|
|
||||||
|
Assert.assertEquals(4, atm.getNodeCardinalityByOp(host1, tags, Long::max));
|
||||||
|
Assert.assertEquals(0, atm.getNodeCardinalityByOp(host1, tags, Long::min));
|
||||||
|
Assert.assertEquals(4, atm.getNodeCardinalityByOp(host1, tags, Long::sum));
|
||||||
|
|
||||||
|
//********************************
|
||||||
|
// 3) app-id/app2 (app2)
|
||||||
|
//********************************
|
||||||
|
tags = AllocationTags
|
||||||
|
.createSingleAppAllocationTags(app2, ImmutableSet.of("A", "B"));
|
||||||
|
Assert.assertEquals(3, atm.getNodeCardinalityByOp(host1, tags, Long::max));
|
||||||
|
Assert.assertEquals(0, atm.getNodeCardinalityByOp(host1, tags, Long::min));
|
||||||
|
Assert.assertEquals(2, atm.getNodeCardinalityByOp(host2, tags, Long::max));
|
||||||
|
Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::min));
|
||||||
|
Assert.assertEquals(3, atm.getNodeCardinalityByOp(host2, tags, Long::sum));
|
||||||
|
|
||||||
|
|
||||||
|
//********************************
|
||||||
|
// 4) all (app1, app2, app3)
|
||||||
|
//********************************
|
||||||
|
tags = AllocationTags
|
||||||
|
.createGlobalAllocationTags(ImmutableSet.of("A"));
|
||||||
|
Assert.assertEquals(6, atm.getNodeCardinalityByOp(host1, tags, Long::sum));
|
||||||
|
Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::sum));
|
||||||
|
Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::sum));
|
||||||
|
|
||||||
|
tags = AllocationTags
|
||||||
|
.createGlobalAllocationTags(ImmutableSet.of("A", "B"));
|
||||||
|
Assert.assertEquals(7, atm.getNodeCardinalityByOp(host1, tags, Long::sum));
|
||||||
|
Assert.assertEquals(4, atm.getNodeCardinalityByOp(host2, tags, Long::sum));
|
||||||
|
Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::sum));
|
||||||
|
Assert.assertEquals(6, atm.getNodeCardinalityByOp(host1, tags, Long::max));
|
||||||
|
Assert.assertEquals(3, atm.getNodeCardinalityByOp(host2, tags, Long::max));
|
||||||
|
Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::max));
|
||||||
|
Assert.assertEquals(1, atm.getNodeCardinalityByOp(host1, tags, Long::min));
|
||||||
|
Assert.assertEquals(1, atm.getNodeCardinalityByOp(host2, tags, Long::min));
|
||||||
|
Assert.assertEquals(0, atm.getNodeCardinalityByOp(host3, tags, Long::min));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,10 +16,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; /**
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
|
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.TargetApplications;
|
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -29,29 +27,34 @@ import org.junit.Test;
|
||||||
public class TestAllocationTagsNamespace {
|
public class TestAllocationTagsNamespace {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNamespaceParse() throws InvalidAllocationTagException {
|
public void testNamespaceParse() throws InvalidAllocationTagsQueryException {
|
||||||
AllocationTagNamespace namespace;
|
AllocationTagNamespace namespace;
|
||||||
|
|
||||||
String namespaceStr = "self";
|
String namespaceStr = "self";
|
||||||
namespace = AllocationTagNamespace.parse(namespaceStr);
|
namespace = AllocationTagNamespace.parse(namespaceStr);
|
||||||
Assert.assertTrue(namespace.isIntraApp());
|
Assert.assertEquals(AllocationTagNamespaceType.SELF,
|
||||||
|
namespace.getNamespaceType());
|
||||||
|
|
||||||
namespaceStr = "not-self";
|
namespaceStr = "not-self";
|
||||||
namespace = AllocationTagNamespace.parse(namespaceStr);
|
namespace = AllocationTagNamespace.parse(namespaceStr);
|
||||||
Assert.assertTrue(namespace.isNotSelf());
|
Assert.assertEquals(AllocationTagNamespaceType.NOT_SELF,
|
||||||
|
namespace.getNamespaceType());
|
||||||
|
|
||||||
namespaceStr = "all";
|
namespaceStr = "all";
|
||||||
namespace = AllocationTagNamespace.parse(namespaceStr);
|
namespace = AllocationTagNamespace.parse(namespaceStr);
|
||||||
Assert.assertTrue(namespace.isGlobal());
|
Assert.assertEquals(AllocationTagNamespaceType.ALL,
|
||||||
|
namespace.getNamespaceType());
|
||||||
|
|
||||||
namespaceStr = "app-label";
|
namespaceStr = "app-label";
|
||||||
namespace = AllocationTagNamespace.parse(namespaceStr);
|
namespace = AllocationTagNamespace.parse(namespaceStr);
|
||||||
Assert.assertTrue(namespace.isAppLabel());
|
Assert.assertEquals(AllocationTagNamespaceType.APP_LABEL,
|
||||||
|
namespace.getNamespaceType());
|
||||||
|
|
||||||
ApplicationId applicationId = ApplicationId.newInstance(12345, 1);
|
ApplicationId applicationId = ApplicationId.newInstance(12345, 1);
|
||||||
namespaceStr = "app-id/" + applicationId.toString();
|
namespaceStr = "app-id/" + applicationId.toString();
|
||||||
namespace = AllocationTagNamespace.parse(namespaceStr);
|
namespace = AllocationTagNamespace.parse(namespaceStr);
|
||||||
Assert.assertTrue(namespace.isSingleInterApp());
|
Assert.assertEquals(AllocationTagNamespaceType.APP_ID,
|
||||||
|
namespace.getNamespaceType());
|
||||||
|
|
||||||
// Invalid app-id namespace syntax, invalid app ID.
|
// Invalid app-id namespace syntax, invalid app ID.
|
||||||
try {
|
try {
|
||||||
|
@ -59,7 +62,7 @@ public class TestAllocationTagsNamespace {
|
||||||
AllocationTagNamespace.parse(namespaceStr);
|
AllocationTagNamespace.parse(namespaceStr);
|
||||||
Assert.fail("Parsing should fail as the given app ID is invalid");
|
Assert.fail("Parsing should fail as the given app ID is invalid");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Assert.assertTrue(e instanceof InvalidAllocationTagException);
|
Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
|
||||||
Assert.assertTrue(e.getMessage().startsWith(
|
Assert.assertTrue(e.getMessage().startsWith(
|
||||||
"Invalid application ID for app-id"));
|
"Invalid application ID for app-id"));
|
||||||
}
|
}
|
||||||
|
@ -71,7 +74,7 @@ public class TestAllocationTagsNamespace {
|
||||||
Assert.fail("Parsing should fail as the given namespace"
|
Assert.fail("Parsing should fail as the given namespace"
|
||||||
+ " is missing application ID");
|
+ " is missing application ID");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Assert.assertTrue(e instanceof InvalidAllocationTagException);
|
Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
|
||||||
Assert.assertTrue(e.getMessage().startsWith(
|
Assert.assertTrue(e.getMessage().startsWith(
|
||||||
"Missing the application ID in the namespace string"));
|
"Missing the application ID in the namespace string"));
|
||||||
}
|
}
|
||||||
|
@ -82,14 +85,15 @@ public class TestAllocationTagsNamespace {
|
||||||
AllocationTagNamespace.parse(namespaceStr);
|
AllocationTagNamespace.parse(namespaceStr);
|
||||||
Assert.fail("Parsing should fail as the giving type is not supported.");
|
Assert.fail("Parsing should fail as the giving type is not supported.");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Assert.assertTrue(e instanceof InvalidAllocationTagException);
|
Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
|
||||||
Assert.assertTrue(e.getMessage().startsWith(
|
Assert.assertTrue(e.getMessage().startsWith(
|
||||||
"Invalid namespace prefix"));
|
"Invalid namespace prefix"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNamespaceEvaluation() throws InvalidAllocationTagException {
|
public void testNamespaceEvaluation() throws
|
||||||
|
InvalidAllocationTagsQueryException {
|
||||||
AllocationTagNamespace namespace;
|
AllocationTagNamespace namespace;
|
||||||
TargetApplications targetApplications;
|
TargetApplications targetApplications;
|
||||||
ApplicationId app1 = ApplicationId.newInstance(10000, 1);
|
ApplicationId app1 = ApplicationId.newInstance(10000, 1);
|
||||||
|
@ -131,10 +135,8 @@ public class TestAllocationTagsNamespace {
|
||||||
|
|
||||||
namespaceStr = "all";
|
namespaceStr = "all";
|
||||||
namespace = AllocationTagNamespace.parse(namespaceStr);
|
namespace = AllocationTagNamespace.parse(namespaceStr);
|
||||||
targetApplications = new TargetApplications(null,
|
Assert.assertEquals(AllocationTagNamespaceType.ALL,
|
||||||
ImmutableSet.of(app1, app2));
|
namespace.getNamespaceType());
|
||||||
namespace.evaluate(targetApplications);
|
|
||||||
Assert.assertEquals(2, namespace.getNamespaceScope().size());
|
|
||||||
|
|
||||||
namespaceStr = "app-id/" + app2.toString();
|
namespaceStr = "app-id/" + app2.toString();
|
||||||
namespace = AllocationTagNamespace.parse(namespaceStr);
|
namespace = AllocationTagNamespace.parse(namespaceStr);
|
||||||
|
|
|
@ -41,7 +41,6 @@ import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -512,6 +511,252 @@ public class TestPlacementConstraintsUtil {
|
||||||
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
|
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGlobalAppConstraints()
|
||||||
|
throws InvalidAllocationTagsQueryException {
|
||||||
|
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
|
||||||
|
PlacementConstraintManagerService pcm =
|
||||||
|
new MemoryPlacementConstraintManager();
|
||||||
|
rmContext.setAllocationTagsManager(tm);
|
||||||
|
rmContext.setPlacementConstraintManager(pcm);
|
||||||
|
|
||||||
|
long ts = System.currentTimeMillis();
|
||||||
|
ApplicationId application1 = BuilderUtils.newApplicationId(ts, 100);
|
||||||
|
ApplicationId application2 = BuilderUtils.newApplicationId(ts, 101);
|
||||||
|
ApplicationId application3 = BuilderUtils.newApplicationId(ts, 102);
|
||||||
|
|
||||||
|
// Register App1 with anti-affinity constraint map.
|
||||||
|
RMNode n0r1 = rmNodes.get(0);
|
||||||
|
RMNode n1r1 = rmNodes.get(1);
|
||||||
|
RMNode n2r2 = rmNodes.get(2);
|
||||||
|
RMNode n3r2 = rmNodes.get(3);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Place container:
|
||||||
|
* n0: app1/A(1), app2/A(1)
|
||||||
|
* n1: app3/A(3)
|
||||||
|
* n2: app1/A(2)
|
||||||
|
* n3: ""
|
||||||
|
*/
|
||||||
|
tm.addContainer(n0r1.getNodeID(),
|
||||||
|
newContainerId(application1), ImmutableSet.of("A"));
|
||||||
|
tm.addContainer(n0r1.getNodeID(),
|
||||||
|
newContainerId(application2), ImmutableSet.of("A"));
|
||||||
|
tm.addContainer(n1r1.getNodeID(),
|
||||||
|
newContainerId(application3), ImmutableSet.of("A"));
|
||||||
|
tm.addContainer(n1r1.getNodeID(),
|
||||||
|
newContainerId(application3), ImmutableSet.of("A"));
|
||||||
|
tm.addContainer(n1r1.getNodeID(),
|
||||||
|
newContainerId(application3), ImmutableSet.of("A"));
|
||||||
|
tm.addContainer(n2r2.getNodeID(),
|
||||||
|
newContainerId(application1), ImmutableSet.of("A"));
|
||||||
|
tm.addContainer(n2r2.getNodeID(),
|
||||||
|
newContainerId(application1), ImmutableSet.of("A"));
|
||||||
|
|
||||||
|
SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
|
||||||
|
n0r1.getRackName(), n0r1.getNodeID());
|
||||||
|
SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(),
|
||||||
|
n1r1.getRackName(), n1r1.getNodeID());
|
||||||
|
SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(),
|
||||||
|
n2r2.getRackName(), n2r2.getNodeID());
|
||||||
|
SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(),
|
||||||
|
n3r2.getRackName(), n3r2.getNodeID());
|
||||||
|
|
||||||
|
AllocationTagNamespace namespaceAll =
|
||||||
|
new AllocationTagNamespace.All();
|
||||||
|
|
||||||
|
//***************************
|
||||||
|
// 1) all, anti-affinity
|
||||||
|
//***************************
|
||||||
|
// Anti-affinity with "A" from any application including itself.
|
||||||
|
PlacementConstraint constraint1 = PlacementConstraints.targetNotIn(
|
||||||
|
NODE, allocationTagWithNamespace(namespaceAll.toString(), "A"))
|
||||||
|
.build();
|
||||||
|
Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
|
||||||
|
Set<String> srcTags1 = ImmutableSet.of("A");
|
||||||
|
constraintMap.put(srcTags1, constraint1);
|
||||||
|
pcm.registerApplication(application1, constraintMap);
|
||||||
|
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application1, createSchedulingRequest(srcTags1),
|
||||||
|
schedulerNode0, pcm, tm));
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application1, createSchedulingRequest(srcTags1),
|
||||||
|
schedulerNode1, pcm, tm));
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application1, createSchedulingRequest(srcTags1),
|
||||||
|
schedulerNode2, pcm, tm));
|
||||||
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application1, createSchedulingRequest(srcTags1),
|
||||||
|
schedulerNode3, pcm, tm));
|
||||||
|
|
||||||
|
pcm.unregisterApplication(application1);
|
||||||
|
|
||||||
|
//***************************
|
||||||
|
// 2) all, max cardinality
|
||||||
|
//***************************
|
||||||
|
PlacementConstraint constraint2 = PlacementConstraints
|
||||||
|
.maxCardinality(NODE, namespaceAll.toString(), 2, "A")
|
||||||
|
.build();
|
||||||
|
constraintMap.clear();
|
||||||
|
Set<String> srcTags2 = ImmutableSet.of("foo");
|
||||||
|
constraintMap.put(srcTags2, constraint2);
|
||||||
|
pcm.registerApplication(application2, constraintMap);
|
||||||
|
|
||||||
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application2, createSchedulingRequest(srcTags2),
|
||||||
|
schedulerNode0, pcm, tm));
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application2, createSchedulingRequest(srcTags2),
|
||||||
|
schedulerNode1, pcm, tm));
|
||||||
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application2, createSchedulingRequest(srcTags2),
|
||||||
|
schedulerNode2, pcm, tm));
|
||||||
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application2, createSchedulingRequest(srcTags2),
|
||||||
|
schedulerNode3, pcm, tm));
|
||||||
|
|
||||||
|
pcm.unregisterApplication(application2);
|
||||||
|
|
||||||
|
//***************************
|
||||||
|
// 3) all, min cardinality
|
||||||
|
//***************************
|
||||||
|
PlacementConstraint constraint3 = PlacementConstraints
|
||||||
|
.minCardinality(NODE, namespaceAll.toString(), 3, "A")
|
||||||
|
.build();
|
||||||
|
constraintMap.clear();
|
||||||
|
Set<String> srcTags3 = ImmutableSet.of("foo");
|
||||||
|
constraintMap.put(srcTags3, constraint3);
|
||||||
|
pcm.registerApplication(application3, constraintMap);
|
||||||
|
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application3, createSchedulingRequest(srcTags3),
|
||||||
|
schedulerNode0, pcm, tm));
|
||||||
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application3, createSchedulingRequest(srcTags3),
|
||||||
|
schedulerNode1, pcm, tm));
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application3, createSchedulingRequest(srcTags3),
|
||||||
|
schedulerNode2, pcm, tm));
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application3, createSchedulingRequest(srcTags3),
|
||||||
|
schedulerNode3, pcm, tm));
|
||||||
|
|
||||||
|
pcm.unregisterApplication(application3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNotSelfAppConstraints()
|
||||||
|
throws InvalidAllocationTagsQueryException {
|
||||||
|
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
|
||||||
|
PlacementConstraintManagerService pcm =
|
||||||
|
new MemoryPlacementConstraintManager();
|
||||||
|
rmContext.setAllocationTagsManager(tm);
|
||||||
|
rmContext.setPlacementConstraintManager(pcm);
|
||||||
|
|
||||||
|
long ts = System.currentTimeMillis();
|
||||||
|
ApplicationId application1 = BuilderUtils.newApplicationId(ts, 100);
|
||||||
|
ApplicationId application2 = BuilderUtils.newApplicationId(ts, 101);
|
||||||
|
ApplicationId application3 = BuilderUtils.newApplicationId(ts, 102);
|
||||||
|
|
||||||
|
// Register App1 with anti-affinity constraint map.
|
||||||
|
RMNode n0r1 = rmNodes.get(0);
|
||||||
|
RMNode n1r1 = rmNodes.get(1);
|
||||||
|
RMNode n2r2 = rmNodes.get(2);
|
||||||
|
RMNode n3r2 = rmNodes.get(3);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Place container:
|
||||||
|
* n0: app1/A(1), app2/A(1)
|
||||||
|
* n1: app3/A(3)
|
||||||
|
* n2: app1/A(2)
|
||||||
|
* n3: ""
|
||||||
|
*/
|
||||||
|
tm.addContainer(n0r1.getNodeID(),
|
||||||
|
newContainerId(application1), ImmutableSet.of("A"));
|
||||||
|
tm.addContainer(n0r1.getNodeID(),
|
||||||
|
newContainerId(application2), ImmutableSet.of("A"));
|
||||||
|
tm.addContainer(n1r1.getNodeID(),
|
||||||
|
newContainerId(application3), ImmutableSet.of("A"));
|
||||||
|
tm.addContainer(n1r1.getNodeID(),
|
||||||
|
newContainerId(application3), ImmutableSet.of("A"));
|
||||||
|
tm.addContainer(n1r1.getNodeID(),
|
||||||
|
newContainerId(application3), ImmutableSet.of("A"));
|
||||||
|
tm.addContainer(n2r2.getNodeID(),
|
||||||
|
newContainerId(application1), ImmutableSet.of("A"));
|
||||||
|
tm.addContainer(n2r2.getNodeID(),
|
||||||
|
newContainerId(application1), ImmutableSet.of("A"));
|
||||||
|
|
||||||
|
SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
|
||||||
|
n0r1.getRackName(), n0r1.getNodeID());
|
||||||
|
SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(),
|
||||||
|
n1r1.getRackName(), n1r1.getNodeID());
|
||||||
|
SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(),
|
||||||
|
n2r2.getRackName(), n2r2.getNodeID());
|
||||||
|
SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(),
|
||||||
|
n3r2.getRackName(), n3r2.getNodeID());
|
||||||
|
|
||||||
|
AllocationTagNamespace notSelf =
|
||||||
|
new AllocationTagNamespace.NotSelf();
|
||||||
|
|
||||||
|
//***************************
|
||||||
|
// 1) not-self, app1
|
||||||
|
//***************************
|
||||||
|
// Anti-affinity with "A" from app2 and app3,
|
||||||
|
// n0 and n1 both have tag "A" from either app2 or app3, so they are
|
||||||
|
// not qualified for the placement.
|
||||||
|
PlacementConstraint constraint1 = PlacementConstraints.targetNotIn(
|
||||||
|
NODE, allocationTagWithNamespace(notSelf.toString(), "A"))
|
||||||
|
.build();
|
||||||
|
Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
|
||||||
|
Set<String> srcTags1 = ImmutableSet.of("A");
|
||||||
|
constraintMap.put(srcTags1, constraint1);
|
||||||
|
pcm.registerApplication(application1, constraintMap);
|
||||||
|
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application1, createSchedulingRequest(srcTags1),
|
||||||
|
schedulerNode0, pcm, tm));
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application1, createSchedulingRequest(srcTags1),
|
||||||
|
schedulerNode1, pcm, tm));
|
||||||
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application1, createSchedulingRequest(srcTags1),
|
||||||
|
schedulerNode2, pcm, tm));
|
||||||
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application1, createSchedulingRequest(srcTags1),
|
||||||
|
schedulerNode3, pcm, tm));
|
||||||
|
|
||||||
|
pcm.unregisterApplication(application1);
|
||||||
|
|
||||||
|
//***************************
|
||||||
|
// 2) not-self, app1
|
||||||
|
//***************************
|
||||||
|
// Affinity with "A" from app2 and app3,
|
||||||
|
// N0 and n1 are qualified for the placement.
|
||||||
|
PlacementConstraint constraint2 = PlacementConstraints.targetIn(
|
||||||
|
NODE, allocationTagWithNamespace(notSelf.toString(), "A"))
|
||||||
|
.build();
|
||||||
|
Map<Set<String>, PlacementConstraint> cm2 = new HashMap<>();
|
||||||
|
Set<String> srcTags2 = ImmutableSet.of("A");
|
||||||
|
cm2.put(srcTags2, constraint2);
|
||||||
|
pcm.registerApplication(application1, cm2);
|
||||||
|
|
||||||
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application1, createSchedulingRequest(srcTags2),
|
||||||
|
schedulerNode0, pcm, tm));
|
||||||
|
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application1, createSchedulingRequest(srcTags2),
|
||||||
|
schedulerNode1, pcm, tm));
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application1, createSchedulingRequest(srcTags2),
|
||||||
|
schedulerNode2, pcm, tm));
|
||||||
|
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
|
||||||
|
application1, createSchedulingRequest(srcTags2),
|
||||||
|
schedulerNode3, pcm, tm));
|
||||||
|
|
||||||
|
pcm.unregisterApplication(application1);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInterAppConstraintsByAppID()
|
public void testInterAppConstraintsByAppID()
|
||||||
throws InvalidAllocationTagsQueryException {
|
throws InvalidAllocationTagsQueryException {
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -85,46 +86,62 @@ public class TestLocalAllocationTagsManager {
|
||||||
// Expect tag mappings to be present including temp Tags
|
// Expect tag mappings to be present including temp Tags
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1),
|
||||||
|
ImmutableSet.of("mapper")),
|
||||||
Long::sum));
|
Long::sum));
|
||||||
|
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1),
|
||||||
|
ImmutableSet.of("service")),
|
||||||
Long::sum));
|
Long::sum));
|
||||||
|
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(2),
|
||||||
|
ImmutableSet.of("service")),
|
||||||
Long::sum));
|
Long::sum));
|
||||||
|
|
||||||
// Do a temp Tag cleanup on app2
|
// Do a temp Tag cleanup on app2
|
||||||
ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(2));
|
ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(2));
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(2),
|
||||||
|
ImmutableSet.of("service")),
|
||||||
Long::sum));
|
Long::sum));
|
||||||
// Expect app1 to be unaffected
|
// Expect app1 to be unaffected
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1),
|
||||||
|
ImmutableSet.of("mapper")),
|
||||||
Long::sum));
|
Long::sum));
|
||||||
// Do a cleanup on app1 as well
|
// Do a cleanup on app1 as well
|
||||||
ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(1));
|
ephAtm.cleanTempContainers(TestUtils.getMockApplicationId(1));
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of("mapper"),
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1),
|
||||||
|
ImmutableSet.of("mapper")),
|
||||||
Long::sum));
|
Long::sum));
|
||||||
|
|
||||||
// Non temp-tags should be unaffected
|
// Non temp-tags should be unaffected
|
||||||
Assert.assertEquals(1,
|
Assert.assertEquals(1,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host1:123"),
|
||||||
TestUtils.getMockApplicationId(1), ImmutableSet.of("service"),
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(1),
|
||||||
|
ImmutableSet.of("service")),
|
||||||
Long::sum));
|
Long::sum));
|
||||||
|
|
||||||
Assert.assertEquals(0,
|
Assert.assertEquals(0,
|
||||||
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"),
|
||||||
TestUtils.getMockApplicationId(2), ImmutableSet.of("service"),
|
AllocationTags.createSingleAppAllocationTags(
|
||||||
|
TestUtils.getMockApplicationId(2),
|
||||||
|
ImmutableSet.of("service")),
|
||||||
Long::sum));
|
Long::sum));
|
||||||
|
|
||||||
// Expect app2 with no containers, and app1 with 2 containers across 2 nodes
|
// Expect app2 with no containers, and app1 with 2 containers across 2 nodes
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -366,8 +367,7 @@ public class TestSingleConstraintAppPlacementAllocator {
|
||||||
allocator.canAllocate(NodeType.NODE_LOCAL,
|
allocator.canAllocate(NodeType.NODE_LOCAL,
|
||||||
TestUtils.getMockNode("host1", "/rack1", 123, 1024));
|
TestUtils.getMockNode("host1", "/rack1", 123, 1024));
|
||||||
verify(spyAllocationTagsManager, Mockito.times(1)).getNodeCardinalityByOp(
|
verify(spyAllocationTagsManager, Mockito.times(1)).getNodeCardinalityByOp(
|
||||||
eq(NodeId.fromString("host1:123")), eq(TestUtils.getMockApplicationId(1)),
|
eq(NodeId.fromString("host1:123")), any(AllocationTags.class),
|
||||||
eq(ImmutableSet.of("mapper", "reducer")),
|
|
||||||
any(LongBinaryOperator.class));
|
any(LongBinaryOperator.class));
|
||||||
|
|
||||||
allocator = new SingleConstraintAppPlacementAllocator();
|
allocator = new SingleConstraintAppPlacementAllocator();
|
||||||
|
@ -388,9 +388,8 @@ public class TestSingleConstraintAppPlacementAllocator {
|
||||||
allocator.canAllocate(NodeType.NODE_LOCAL,
|
allocator.canAllocate(NodeType.NODE_LOCAL,
|
||||||
TestUtils.getMockNode("host1", "/rack1", 123, 1024));
|
TestUtils.getMockNode("host1", "/rack1", 123, 1024));
|
||||||
verify(spyAllocationTagsManager, Mockito.atLeast(1)).getNodeCardinalityByOp(
|
verify(spyAllocationTagsManager, Mockito.atLeast(1)).getNodeCardinalityByOp(
|
||||||
eq(NodeId.fromString("host1:123")),
|
eq(NodeId.fromString("host1:123")), any(AllocationTags.class),
|
||||||
eq(TestUtils.getMockApplicationId(1)), eq(ImmutableSet
|
any(LongBinaryOperator.class));
|
||||||
.of("mapper", "reducer")), any(LongBinaryOperator.class));
|
|
||||||
|
|
||||||
SchedulerNode node1 = mock(SchedulerNode.class);
|
SchedulerNode node1 = mock(SchedulerNode.class);
|
||||||
when(node1.getPartition()).thenReturn("x");
|
when(node1.getPartition()).thenReturn("x");
|
||||||
|
|
Loading…
Reference in New Issue