YARN-3318. Create Initial OrderingPolicy Framework and FifoOrderingPolicy. (Craig Welch via wangda)
This commit is contained in:
parent
fddd55279d
commit
5004e75332
@ -75,6 +75,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
YARN-3361. CapacityScheduler side changes to support non-exclusive node
|
YARN-3361. CapacityScheduler side changes to support non-exclusive node
|
||||||
labels. (Wangda Tan via jianhe)
|
labels. (Wangda Tan via jianhe)
|
||||||
|
|
||||||
|
YARN-3318. Create Initial OrderingPolicy Framework and FifoOrderingPolicy.
|
||||||
|
(Craig Welch via wangda)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
|
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
|
||||||
|
@ -140,6 +140,10 @@
|
|||||||
<Match>
|
<Match>
|
||||||
<Class name="org.apache.hadoop.yarn.server.resourcemanager.resource.Priority$Comparator" />
|
<Class name="org.apache.hadoop.yarn.server.resourcemanager.resource.Priority$Comparator" />
|
||||||
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
|
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
|
||||||
|
</Match>
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoComparator" />
|
||||||
|
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
|
||||||
</Match>
|
</Match>
|
||||||
<!-- Ignore some irrelevant class name warning -->
|
<!-- Ignore some irrelevant class name warning -->
|
||||||
<Match>
|
<Match>
|
||||||
|
@ -57,7 +57,10 @@ public ResourceUsage() {
|
|||||||
|
|
||||||
// Usage enum here to make implement cleaner
|
// Usage enum here to make implement cleaner
|
||||||
private enum ResourceType {
|
private enum ResourceType {
|
||||||
USED(0), PENDING(1), AMUSED(2), RESERVED(3);
|
//CACHED_USED and CACHED_PENDING may be read by anyone, but must only
|
||||||
|
//be written by ordering policies
|
||||||
|
USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4),
|
||||||
|
CACHED_PENDING(5);
|
||||||
|
|
||||||
private int idx;
|
private int idx;
|
||||||
|
|
||||||
@ -103,6 +106,14 @@ public Resource getUsed(String label) {
|
|||||||
return _get(label, ResourceType.USED);
|
return _get(label, ResourceType.USED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Resource getCachedUsed(String label) {
|
||||||
|
return _get(label, ResourceType.CACHED_USED);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getCachedPending(String label) {
|
||||||
|
return _get(label, ResourceType.CACHED_PENDING);
|
||||||
|
}
|
||||||
|
|
||||||
public void incUsed(String label, Resource res) {
|
public void incUsed(String label, Resource res) {
|
||||||
_inc(label, ResourceType.USED, res);
|
_inc(label, ResourceType.USED, res);
|
||||||
}
|
}
|
||||||
@ -138,6 +149,14 @@ public void setUsed(String label, Resource res) {
|
|||||||
_set(label, ResourceType.USED, res);
|
_set(label, ResourceType.USED, res);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setCachedUsed(String label, Resource res) {
|
||||||
|
_set(label, ResourceType.CACHED_USED, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCachedPending(String label, Resource res) {
|
||||||
|
_set(label, ResourceType.CACHED_PENDING, res);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Pending
|
* Pending
|
||||||
*/
|
*/
|
||||||
@ -267,6 +286,28 @@ private Resource _get(String label, ResourceType type) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Resource _getAll(ResourceType type) {
|
||||||
|
try {
|
||||||
|
readLock.lock();
|
||||||
|
Resource allOfType = Resources.createResource(0);
|
||||||
|
for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
|
||||||
|
//all usages types are initialized
|
||||||
|
Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]);
|
||||||
|
}
|
||||||
|
return allOfType;
|
||||||
|
} finally {
|
||||||
|
readLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getAllPending() {
|
||||||
|
return _getAll(ResourceType.PENDING);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getAllUsed() {
|
||||||
|
return _getAll(ResourceType.USED);
|
||||||
|
}
|
||||||
|
|
||||||
private UsageByLabel getAndAddIfMissing(String label) {
|
private UsageByLabel getAndAddIfMissing(String label) {
|
||||||
if (label == null) {
|
if (label == null) {
|
||||||
label = RMNodeLabelsManager.NO_LABEL;
|
label = RMNodeLabelsManager.NO_LABEL;
|
||||||
@ -310,6 +351,18 @@ private void _dec(String label, ResourceType type, Resource res) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Resource getCachedDemand(String label) {
|
||||||
|
try {
|
||||||
|
readLock.lock();
|
||||||
|
Resource demand = Resources.createResource(0);
|
||||||
|
Resources.addTo(demand, getCachedUsed(label));
|
||||||
|
Resources.addTo(demand, getCachedPending(label));
|
||||||
|
return demand;
|
||||||
|
} finally {
|
||||||
|
readLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
try {
|
try {
|
||||||
|
@ -0,0 +1,119 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
|
||||||
|
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An OrderingPolicy which can serve as a baseclass for policies which can be
|
||||||
|
* expressed in terms of comparators
|
||||||
|
*/
|
||||||
|
public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEntity> implements OrderingPolicy<S> {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(OrderingPolicy.class);
|
||||||
|
|
||||||
|
protected TreeSet<S> schedulableEntities;
|
||||||
|
protected Comparator<SchedulableEntity> comparator;
|
||||||
|
|
||||||
|
public AbstractComparatorOrderingPolicy() { }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<S> getSchedulableEntities() {
|
||||||
|
return schedulableEntities;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<S> getAssignmentIterator() {
|
||||||
|
return schedulableEntities.iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<S> getPreemptionIterator() {
|
||||||
|
return schedulableEntities.descendingIterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void updateSchedulingResourceUsage(ResourceUsage ru) {
|
||||||
|
ru.setCachedUsed(CommonNodeLabelsManager.ANY, ru.getAllUsed());
|
||||||
|
ru.setCachedPending(CommonNodeLabelsManager.ANY, ru.getAllPending());
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void reorderSchedulableEntity(S schedulableEntity) {
|
||||||
|
//remove, update comparable data, and reinsert to update position in order
|
||||||
|
schedulableEntities.remove(schedulableEntity);
|
||||||
|
updateSchedulingResourceUsage(
|
||||||
|
schedulableEntity.getSchedulingResourceUsage());
|
||||||
|
schedulableEntities.add(schedulableEntity);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setComparator(Comparator<SchedulableEntity> comparator) {
|
||||||
|
this.comparator = comparator;
|
||||||
|
TreeSet<S> schedulableEntities = new TreeSet<S>(comparator);
|
||||||
|
if (this.schedulableEntities != null) {
|
||||||
|
schedulableEntities.addAll(this.schedulableEntities);
|
||||||
|
}
|
||||||
|
this.schedulableEntities = schedulableEntities;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public Comparator<SchedulableEntity> getComparator() {
|
||||||
|
return comparator;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addSchedulableEntity(S s) {
|
||||||
|
schedulableEntities.add(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean removeSchedulableEntity(S s) {
|
||||||
|
return schedulableEntities.remove(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addAllSchedulableEntities(Collection<S> sc) {
|
||||||
|
schedulableEntities.addAll(sc);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getNumSchedulableEntities() {
|
||||||
|
return schedulableEntities.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public abstract void configure(String conf);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public abstract void containerAllocated(S schedulableEntity,
|
||||||
|
RMContainer r);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public abstract void containerReleased(S schedulableEntity,
|
||||||
|
RMContainer r);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public abstract String getStatusMessage();
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,37 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Comparator which orders SchedulableEntities by input order
|
||||||
|
*/
|
||||||
|
public class FifoComparator
|
||||||
|
implements Comparator<SchedulableEntity> {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(SchedulableEntity r1, SchedulableEntity r2) {
|
||||||
|
int res = r1.compareInputOrderTo(r2);
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,54 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An OrderingPolicy which orders SchedulableEntities by input order
|
||||||
|
*/
|
||||||
|
public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractComparatorOrderingPolicy<S> {
|
||||||
|
|
||||||
|
public FifoOrderingPolicy() {
|
||||||
|
setComparator(new FifoComparator());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(String conf) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void containerAllocated(S schedulableEntity,
|
||||||
|
RMContainer r) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void containerReleased(S schedulableEntity,
|
||||||
|
RMContainer r) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getStatusMessage() {
|
||||||
|
return "FifoOrderingPolicy";
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,109 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* OrderingPolicy is used by the scheduler to order SchedulableEntities for
|
||||||
|
* container assignment and preemption
|
||||||
|
*/
|
||||||
|
public interface OrderingPolicy<S extends SchedulableEntity> {
|
||||||
|
/*
|
||||||
|
* Note: OrderingPolicy depends upon external
|
||||||
|
* synchronization of all use of the SchedulableEntity Collection and
|
||||||
|
* Iterators for correctness and to avoid concurrent modification issues
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the collection of SchedulableEntities which are managed by this
|
||||||
|
* OrderingPolicy - should include processes returned by the Assignment and
|
||||||
|
* Preemption iterator with no guarantees regarding order
|
||||||
|
*/
|
||||||
|
public Collection<S> getSchedulableEntities();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return an iterator over the collection of SchedulableEntities which orders
|
||||||
|
* them for container assignment
|
||||||
|
*/
|
||||||
|
public Iterator<S> getAssignmentIterator();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return an iterator over the collection of SchedulableEntities which orders
|
||||||
|
* them for preemption
|
||||||
|
*/
|
||||||
|
public Iterator<S> getPreemptionIterator();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a SchedulableEntity to be managed for allocation and preemption
|
||||||
|
* ordering
|
||||||
|
*/
|
||||||
|
public void addSchedulableEntity(S s);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove a SchedulableEntity from management for allocation and preemption
|
||||||
|
* ordering
|
||||||
|
*/
|
||||||
|
public boolean removeSchedulableEntity(S s);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a collection of SchedulableEntities to be managed for allocation
|
||||||
|
* and preemption ordering
|
||||||
|
*/
|
||||||
|
public void addAllSchedulableEntities(Collection<S> sc);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of SchedulableEntities managed for allocation and
|
||||||
|
* preemption ordering
|
||||||
|
*/
|
||||||
|
public int getNumSchedulableEntities();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides configuration information for the policy from the scheduler
|
||||||
|
* configuration
|
||||||
|
*/
|
||||||
|
public void configure(String conf);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The passed SchedulableEntity has been allocated the passed Container,
|
||||||
|
* take appropriate action (depending on comparator, a reordering of the
|
||||||
|
* SchedulableEntity may be required)
|
||||||
|
*/
|
||||||
|
public void containerAllocated(S schedulableEntity,
|
||||||
|
RMContainer r);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The passed SchedulableEntity has released the passed Container,
|
||||||
|
* take appropriate action (depending on comparator, a reordering of the
|
||||||
|
* SchedulableEntity may be required)
|
||||||
|
*/
|
||||||
|
public void containerReleased(S schedulableEntity,
|
||||||
|
RMContainer r);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Display information regarding configuration & status
|
||||||
|
*/
|
||||||
|
public String getStatusMessage();
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,51 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A SchedulableEntity is a process to be scheduled,
|
||||||
|
* for example, an application / application attempt
|
||||||
|
*/
|
||||||
|
public interface SchedulableEntity {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Id - each entity must have a unique id
|
||||||
|
*/
|
||||||
|
public String getId();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compare the passed SchedulableEntity to this one for input order.
|
||||||
|
* Input order is implementation defined and should reflect the
|
||||||
|
* correct ordering for first-in first-out processing
|
||||||
|
*/
|
||||||
|
public int compareInputOrderTo(SchedulableEntity other);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* View of Resources wanted and consumed by the entity
|
||||||
|
*/
|
||||||
|
public ResourceUsage getSchedulingResourceUsage();
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,78 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
|
||||||
|
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||||
|
|
||||||
|
|
||||||
|
public class MockSchedulableEntity implements SchedulableEntity {
|
||||||
|
|
||||||
|
private String id;
|
||||||
|
private long serial = 0;
|
||||||
|
|
||||||
|
public MockSchedulableEntity() { }
|
||||||
|
|
||||||
|
public void setId(String id) {
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSerial(long serial) {
|
||||||
|
this.serial = serial;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getSerial() {
|
||||||
|
return serial;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setUsed(Resource value) {
|
||||||
|
schedulingResourceUsage.setUsed(CommonNodeLabelsManager.ANY, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setPending(Resource value) {
|
||||||
|
schedulingResourceUsage.setPending(CommonNodeLabelsManager.ANY, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ResourceUsage schedulingResourceUsage = new ResourceUsage();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceUsage getSchedulingResourceUsage() {
|
||||||
|
return schedulingResourceUsage;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareInputOrderTo(SchedulableEntity other) {
|
||||||
|
if (other instanceof MockSchedulableEntity) {
|
||||||
|
MockSchedulableEntity r2 = (MockSchedulableEntity) other;
|
||||||
|
int res = (int) Math.signum(getSerial() - r2.getSerial());
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
return 1;//let other types go before this, if any
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,83 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
|
||||||
|
public class TestFifoOrderingPolicy {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFifoOrderingPolicy() {
|
||||||
|
FifoOrderingPolicy<MockSchedulableEntity> policy =
|
||||||
|
new FifoOrderingPolicy<MockSchedulableEntity>();
|
||||||
|
MockSchedulableEntity r1 = new MockSchedulableEntity();
|
||||||
|
MockSchedulableEntity r2 = new MockSchedulableEntity();
|
||||||
|
|
||||||
|
Assert.assertEquals(policy.getComparator().compare(r1, r2), 0);
|
||||||
|
|
||||||
|
r1.setSerial(1);
|
||||||
|
Assert.assertEquals(policy.getComparator().compare(r1, r2), 1);
|
||||||
|
|
||||||
|
r2.setSerial(2);
|
||||||
|
Assert.assertEquals(policy.getComparator().compare(r1, r2), -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIterators() {
|
||||||
|
OrderingPolicy<MockSchedulableEntity> schedOrder =
|
||||||
|
new FifoOrderingPolicy<MockSchedulableEntity>();
|
||||||
|
|
||||||
|
MockSchedulableEntity msp1 = new MockSchedulableEntity();
|
||||||
|
MockSchedulableEntity msp2 = new MockSchedulableEntity();
|
||||||
|
MockSchedulableEntity msp3 = new MockSchedulableEntity();
|
||||||
|
|
||||||
|
msp1.setSerial(3);
|
||||||
|
msp2.setSerial(2);
|
||||||
|
msp3.setSerial(1);
|
||||||
|
|
||||||
|
schedOrder.addSchedulableEntity(msp1);
|
||||||
|
schedOrder.addSchedulableEntity(msp2);
|
||||||
|
schedOrder.addSchedulableEntity(msp3);
|
||||||
|
|
||||||
|
//Assignment, oldest to youngest
|
||||||
|
checkSerials(schedOrder.getAssignmentIterator(), new long[]{1, 2, 3});
|
||||||
|
|
||||||
|
//Preemption, youngest to oldest
|
||||||
|
checkSerials(schedOrder.getPreemptionIterator(), new long[]{3, 2, 1});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void checkSerials(Iterator<MockSchedulableEntity> si,
|
||||||
|
long[] serials) {
|
||||||
|
for (int i = 0;i < serials.length;i++) {
|
||||||
|
Assert.assertEquals(si.next().getSerial(),
|
||||||
|
serials[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user