MAPREDUCE-2788. Normalize resource requests in FifoScheduler appropriately. Contributed by Ahmed Radwan.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1186467 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9473b2e43f
commit
e549ac9369
|
@ -1689,6 +1689,9 @@ Release 0.23.0 - Unreleased
|
||||||
MAPREDUCE-3181. Fixed MapReduce runtime to load yarn-default.xml and
|
MAPREDUCE-3181. Fixed MapReduce runtime to load yarn-default.xml and
|
||||||
yarn-site.xml. (acmurthy)
|
yarn-site.xml. (acmurthy)
|
||||||
|
|
||||||
|
MAPREDUCE-2788. Normalize resource requests in FifoScheduler
|
||||||
|
appropriately. (Ahmed Radwan via acmurthy)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -17,11 +17,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
@ -72,5 +75,35 @@ public class SchedulerUtils {
|
||||||
return containerStatus;
|
return containerStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility method to normalize a list of resource requests, by insuring that
|
||||||
|
* the memory for each request is a multiple of minMemory and is not zero.
|
||||||
|
*
|
||||||
|
* @param asks
|
||||||
|
* a list of resource requests.
|
||||||
|
* @param minMemory
|
||||||
|
* the configured minimum memory allocation.
|
||||||
|
*/
|
||||||
|
public static void normalizeRequests(List<ResourceRequest> asks,
|
||||||
|
int minMemory) {
|
||||||
|
for (ResourceRequest ask : asks) {
|
||||||
|
normalizeRequest(ask, minMemory);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility method to normalize a resource request, by insuring that the
|
||||||
|
* requested memory is a multiple of minMemory and is not zero.
|
||||||
|
*
|
||||||
|
* @param ask
|
||||||
|
* the resource request.
|
||||||
|
* @param minMemory
|
||||||
|
* the configured minimum memory allocation.
|
||||||
|
*/
|
||||||
|
public static void normalizeRequest(ResourceRequest ask, int minMemory) {
|
||||||
|
int memory = Math.max(ask.getCapability().getMemory(), minMemory);
|
||||||
|
ask.getCapability().setMemory(
|
||||||
|
minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0)));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -441,7 +441,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sanity check
|
// Sanity check
|
||||||
normalizeRequests(ask);
|
SchedulerUtils.normalizeRequests(ask, minimumAllocation.getMemory());
|
||||||
|
|
||||||
// Release containers
|
// Release containers
|
||||||
for (ContainerId releasedContainerId : release) {
|
for (ContainerId releasedContainerId : release) {
|
||||||
|
@ -521,21 +521,6 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
||||||
return root.getQueueUserAclInfo(user);
|
return root.getQueueUserAclInfo(user);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Lock(Lock.NoLock.class)
|
|
||||||
private void normalizeRequests(List<ResourceRequest> asks) {
|
|
||||||
for (ResourceRequest ask : asks) {
|
|
||||||
normalizeRequest(ask);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Lock(Lock.NoLock.class)
|
|
||||||
private void normalizeRequest(ResourceRequest ask) {
|
|
||||||
int minMemory = minimumAllocation.getMemory();
|
|
||||||
int memory = Math.max(ask.getCapability().getMemory(), minMemory);
|
|
||||||
ask.getCapability().setMemory (
|
|
||||||
minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
|
|
||||||
}
|
|
||||||
|
|
||||||
private synchronized void nodeUpdate(RMNode nm,
|
private synchronized void nodeUpdate(RMNode nm,
|
||||||
List<ContainerStatus> newlyLaunchedContainers,
|
List<ContainerStatus> newlyLaunchedContainers,
|
||||||
List<ContainerStatus> completedContainers) {
|
List<ContainerStatus> completedContainers) {
|
||||||
|
|
|
@ -217,7 +217,7 @@ public class FifoScheduler implements ResourceScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sanity check
|
// Sanity check
|
||||||
normalizeRequests(ask);
|
SchedulerUtils.normalizeRequests(ask, MINIMUM_MEMORY);
|
||||||
|
|
||||||
// Release containers
|
// Release containers
|
||||||
for (ContainerId releasedContainer : release) {
|
for (ContainerId releasedContainer : release) {
|
||||||
|
@ -260,21 +260,6 @@ public class FifoScheduler implements ResourceScheduler {
|
||||||
application.getHeadroom());
|
application.getHeadroom());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void normalizeRequests(List<ResourceRequest> asks) {
|
|
||||||
for (ResourceRequest ask : asks) {
|
|
||||||
normalizeRequest(ask);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void normalizeRequest(ResourceRequest ask) {
|
|
||||||
int memory = ask.getCapability().getMemory();
|
|
||||||
// FIXME: TestApplicationCleanup is relying on unnormalized behavior.
|
|
||||||
memory =
|
|
||||||
MINIMUM_MEMORY *
|
|
||||||
((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
|
|
||||||
ask.setCapability(Resources.createResource(memory));
|
|
||||||
}
|
|
||||||
|
|
||||||
private SchedulerApp getApplication(
|
private SchedulerApp getApplication(
|
||||||
ApplicationAttemptId applicationAttemptId) {
|
ApplicationAttemptId applicationAttemptId) {
|
||||||
return applications.get(applicationAttemptId);
|
return applications.get(applicationAttemptId);
|
||||||
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resource;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestSchedulerUtils {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNormalizeRequest() {
|
||||||
|
int minMemory = 1024;
|
||||||
|
ResourceRequest ask = new ResourceRequestPBImpl();
|
||||||
|
|
||||||
|
// case negative memory
|
||||||
|
ask.setCapability(Resource.createResource(-1024));
|
||||||
|
SchedulerUtils.normalizeRequest(ask, minMemory);
|
||||||
|
assertEquals(minMemory, ask.getCapability().getMemory());
|
||||||
|
|
||||||
|
// case zero memory
|
||||||
|
ask.setCapability(Resource.createResource(0));
|
||||||
|
SchedulerUtils.normalizeRequest(ask, minMemory);
|
||||||
|
assertEquals(minMemory, ask.getCapability().getMemory());
|
||||||
|
|
||||||
|
// case memory is a multiple of minMemory
|
||||||
|
ask.setCapability(Resource.createResource(2 * minMemory));
|
||||||
|
SchedulerUtils.normalizeRequest(ask, minMemory);
|
||||||
|
assertEquals(2 * minMemory, ask.getCapability().getMemory());
|
||||||
|
|
||||||
|
// case memory is not a multiple of minMemory
|
||||||
|
ask.setCapability(Resource.createResource(minMemory + 10));
|
||||||
|
SchedulerUtils.normalizeRequest(ask, minMemory);
|
||||||
|
assertEquals(2 * minMemory, ask.getCapability().getMemory());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue