YARN-3318. Create Initial OrderingPolicy Framework and FifoOrderingPolicy. (Craig Welch via wangda)

(cherry picked from commit 5004e75332)
This commit is contained in:
Wangda Tan 2015-04-15 09:56:32 -07:00
parent 871bf6a765
commit 89a7c98436
10 changed files with 592 additions and 1 deletions

View File

@ -27,6 +27,9 @@ Release 2.8.0 - UNRELEASED
YARN-3361. CapacityScheduler side changes to support non-exclusive node
labels. (Wangda Tan via jianhe)
YARN-3318. Create Initial OrderingPolicy Framework and FifoOrderingPolicy.
(Craig Welch via wangda)
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA

View File

@ -140,6 +140,10 @@
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.resource.Priority$Comparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<!-- Ignore some irrelevant class name warning -->
<Match>

View File

@ -57,7 +57,10 @@ public class ResourceUsage {
// Usage enum here to make implement cleaner
private enum ResourceType {
USED(0), PENDING(1), AMUSED(2), RESERVED(3);
//CACHED_USED and CACHED_PENDING may be read by anyone, but must only
//be written by ordering policies
USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4),
CACHED_PENDING(5);
private int idx;
@ -102,6 +105,14 @@ public class ResourceUsage {
public Resource getUsed(String label) {
return _get(label, ResourceType.USED);
}
public Resource getCachedUsed(String label) {
return _get(label, ResourceType.CACHED_USED);
}
public Resource getCachedPending(String label) {
return _get(label, ResourceType.CACHED_PENDING);
}
public void incUsed(String label, Resource res) {
_inc(label, ResourceType.USED, res);
@ -137,6 +148,14 @@ public class ResourceUsage {
public void setUsed(String label, Resource res) {
_set(label, ResourceType.USED, res);
}
public void setCachedUsed(String label, Resource res) {
_set(label, ResourceType.CACHED_USED, res);
}
public void setCachedPending(String label, Resource res) {
_set(label, ResourceType.CACHED_PENDING, res);
}
/*
* Pending
@ -266,6 +285,28 @@ public class ResourceUsage {
readLock.unlock();
}
}
private Resource _getAll(ResourceType type) {
try {
readLock.lock();
Resource allOfType = Resources.createResource(0);
for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
//all usages types are initialized
Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]);
}
return allOfType;
} finally {
readLock.unlock();
}
}
public Resource getAllPending() {
return _getAll(ResourceType.PENDING);
}
public Resource getAllUsed() {
return _getAll(ResourceType.USED);
}
private UsageByLabel getAndAddIfMissing(String label) {
if (label == null) {
@ -309,6 +350,18 @@ public class ResourceUsage {
writeLock.unlock();
}
}
public Resource getCachedDemand(String label) {
try {
readLock.lock();
Resource demand = Resources.createResource(0);
Resources.addTo(demand, getCachedUsed(label));
Resources.addTo(demand, getCachedPending(label));
return demand;
} finally {
readLock.unlock();
}
}
@Override
public String toString() {

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import com.google.common.annotations.VisibleForTesting;
/**
* An OrderingPolicy which can serve as a baseclass for policies which can be
* expressed in terms of comparators
*/
public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEntity> implements OrderingPolicy<S> {
private static final Log LOG = LogFactory.getLog(OrderingPolicy.class);
protected TreeSet<S> schedulableEntities;
protected Comparator<SchedulableEntity> comparator;
public AbstractComparatorOrderingPolicy() { }
@Override
public Collection<S> getSchedulableEntities() {
return schedulableEntities;
}
@Override
public Iterator<S> getAssignmentIterator() {
return schedulableEntities.iterator();
}
@Override
public Iterator<S> getPreemptionIterator() {
return schedulableEntities.descendingIterator();
}
public static void updateSchedulingResourceUsage(ResourceUsage ru) {
ru.setCachedUsed(CommonNodeLabelsManager.ANY, ru.getAllUsed());
ru.setCachedPending(CommonNodeLabelsManager.ANY, ru.getAllPending());
}
protected void reorderSchedulableEntity(S schedulableEntity) {
//remove, update comparable data, and reinsert to update position in order
schedulableEntities.remove(schedulableEntity);
updateSchedulingResourceUsage(
schedulableEntity.getSchedulingResourceUsage());
schedulableEntities.add(schedulableEntity);
}
public void setComparator(Comparator<SchedulableEntity> comparator) {
this.comparator = comparator;
TreeSet<S> schedulableEntities = new TreeSet<S>(comparator);
if (this.schedulableEntities != null) {
schedulableEntities.addAll(this.schedulableEntities);
}
this.schedulableEntities = schedulableEntities;
}
@VisibleForTesting
public Comparator<SchedulableEntity> getComparator() {
return comparator;
}
@Override
public void addSchedulableEntity(S s) {
schedulableEntities.add(s);
}
@Override
public boolean removeSchedulableEntity(S s) {
return schedulableEntities.remove(s);
}
@Override
public void addAllSchedulableEntities(Collection<S> sc) {
schedulableEntities.addAll(sc);
}
@Override
public int getNumSchedulableEntities() {
return schedulableEntities.size();
}
@Override
public abstract void configure(String conf);
@Override
public abstract void containerAllocated(S schedulableEntity,
RMContainer r);
@Override
public abstract void containerReleased(S schedulableEntity,
RMContainer r);
@Override
public abstract String getStatusMessage();
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
/**
* A Comparator which orders SchedulableEntities by input order
*/
public class FifoComparator
implements Comparator<SchedulableEntity> {
@Override
public int compare(SchedulableEntity r1, SchedulableEntity r2) {
int res = r1.compareInputOrderTo(r2);
return res;
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
/**
* An OrderingPolicy which orders SchedulableEntities by input order
*/
public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractComparatorOrderingPolicy<S> {
public FifoOrderingPolicy() {
setComparator(new FifoComparator());
}
@Override
public void configure(String conf) {
}
@Override
public void containerAllocated(S schedulableEntity,
RMContainer r) {
}
@Override
public void containerReleased(S schedulableEntity,
RMContainer r) {
}
@Override
public String getStatusMessage() {
return "FifoOrderingPolicy";
}
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
/**
* OrderingPolicy is used by the scheduler to order SchedulableEntities for
* container assignment and preemption
*/
public interface OrderingPolicy<S extends SchedulableEntity> {
/*
* Note: OrderingPolicy depends upon external
* synchronization of all use of the SchedulableEntity Collection and
* Iterators for correctness and to avoid concurrent modification issues
*/
/**
* Get the collection of SchedulableEntities which are managed by this
* OrderingPolicy - should include processes returned by the Assignment and
* Preemption iterator with no guarantees regarding order
*/
public Collection<S> getSchedulableEntities();
/**
* Return an iterator over the collection of SchedulableEntities which orders
* them for container assignment
*/
public Iterator<S> getAssignmentIterator();
/**
* Return an iterator over the collection of SchedulableEntities which orders
* them for preemption
*/
public Iterator<S> getPreemptionIterator();
/**
* Add a SchedulableEntity to be managed for allocation and preemption
* ordering
*/
public void addSchedulableEntity(S s);
/**
* Remove a SchedulableEntity from management for allocation and preemption
* ordering
*/
public boolean removeSchedulableEntity(S s);
/**
* Add a collection of SchedulableEntities to be managed for allocation
* and preemption ordering
*/
public void addAllSchedulableEntities(Collection<S> sc);
/**
* Get the number of SchedulableEntities managed for allocation and
* preemption ordering
*/
public int getNumSchedulableEntities();
/**
* Provides configuration information for the policy from the scheduler
* configuration
*/
public void configure(String conf);
/**
* The passed SchedulableEntity has been allocated the passed Container,
* take appropriate action (depending on comparator, a reordering of the
* SchedulableEntity may be required)
*/
public void containerAllocated(S schedulableEntity,
RMContainer r);
/**
* The passed SchedulableEntity has released the passed Container,
* take appropriate action (depending on comparator, a reordering of the
* SchedulableEntity may be required)
*/
public void containerReleased(S schedulableEntity,
RMContainer r);
/**
* Display information regarding configuration & status
*/
public String getStatusMessage();
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
/**
* A SchedulableEntity is a process to be scheduled,
* for example, an application / application attempt
*/
public interface SchedulableEntity {
/**
* Id - each entity must have a unique id
*/
public String getId();
/**
* Compare the passed SchedulableEntity to this one for input order.
* Input order is implementation defined and should reflect the
* correct ordering for first-in first-out processing
*/
public int compareInputOrderTo(SchedulableEntity other);
/**
* View of Resources wanted and consumed by the entity
*/
public ResourceUsage getSchedulingResourceUsage();
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
public class MockSchedulableEntity implements SchedulableEntity {
private String id;
private long serial = 0;
public MockSchedulableEntity() { }
public void setId(String id) {
this.id = id;
}
public String getId() {
return id;
}
public void setSerial(long serial) {
this.serial = serial;
}
public long getSerial() {
return serial;
}
public void setUsed(Resource value) {
schedulingResourceUsage.setUsed(CommonNodeLabelsManager.ANY, value);
}
public void setPending(Resource value) {
schedulingResourceUsage.setPending(CommonNodeLabelsManager.ANY, value);
}
private ResourceUsage schedulingResourceUsage = new ResourceUsage();
@Override
public ResourceUsage getSchedulingResourceUsage() {
return schedulingResourceUsage;
}
@Override
public int compareInputOrderTo(SchedulableEntity other) {
if (other instanceof MockSchedulableEntity) {
MockSchedulableEntity r2 = (MockSchedulableEntity) other;
int res = (int) Math.signum(getSerial() - r2.getSerial());
return res;
}
return 1;//let other types go before this, if any
}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
import java.util.*;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
public class TestFifoOrderingPolicy {
@Test
public void testFifoOrderingPolicy() {
FifoOrderingPolicy<MockSchedulableEntity> policy =
new FifoOrderingPolicy<MockSchedulableEntity>();
MockSchedulableEntity r1 = new MockSchedulableEntity();
MockSchedulableEntity r2 = new MockSchedulableEntity();
Assert.assertEquals(policy.getComparator().compare(r1, r2), 0);
r1.setSerial(1);
Assert.assertEquals(policy.getComparator().compare(r1, r2), 1);
r2.setSerial(2);
Assert.assertEquals(policy.getComparator().compare(r1, r2), -1);
}
@Test
public void testIterators() {
OrderingPolicy<MockSchedulableEntity> schedOrder =
new FifoOrderingPolicy<MockSchedulableEntity>();
MockSchedulableEntity msp1 = new MockSchedulableEntity();
MockSchedulableEntity msp2 = new MockSchedulableEntity();
MockSchedulableEntity msp3 = new MockSchedulableEntity();
msp1.setSerial(3);
msp2.setSerial(2);
msp3.setSerial(1);
schedOrder.addSchedulableEntity(msp1);
schedOrder.addSchedulableEntity(msp2);
schedOrder.addSchedulableEntity(msp3);
//Assignment, oldest to youngest
checkSerials(schedOrder.getAssignmentIterator(), new long[]{1, 2, 3});
//Preemption, youngest to oldest
checkSerials(schedOrder.getPreemptionIterator(), new long[]{3, 2, 1});
}
public void checkSerials(Iterator<MockSchedulableEntity> si,
long[] serials) {
for (int i = 0;i < serials.length;i++) {
Assert.assertEquals(si.next().getSerial(),
serials[i]);
}
}
}