Merge -c 1186467 from trunk to branch-0.23 to complete fix for MAPREDUCE-2788.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1186468 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cad9bc8843
commit
9895a6353b
|
@ -1638,6 +1638,9 @@ Release 0.23.0 - Unreleased
|
|||
MAPREDUCE-3181. Fixed MapReduce runtime to load yarn-default.xml and
|
||||
yarn-site.xml. (acmurthy)
|
||||
|
||||
MAPREDUCE-2788. Normalize resource requests in FifoScheduler
|
||||
appropriately. (Ahmed Radwan via acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -17,11 +17,14 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
|
@ -72,5 +75,35 @@ public class SchedulerUtils {
|
|||
return containerStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to normalize a list of resource requests, by insuring that
|
||||
* the memory for each request is a multiple of minMemory and is not zero.
|
||||
*
|
||||
* @param asks
|
||||
* a list of resource requests.
|
||||
* @param minMemory
|
||||
* the configured minimum memory allocation.
|
||||
*/
|
||||
public static void normalizeRequests(List<ResourceRequest> asks,
|
||||
int minMemory) {
|
||||
for (ResourceRequest ask : asks) {
|
||||
normalizeRequest(ask, minMemory);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to normalize a resource request, by insuring that the
|
||||
* requested memory is a multiple of minMemory and is not zero.
|
||||
*
|
||||
* @param ask
|
||||
* the resource request.
|
||||
* @param minMemory
|
||||
* the configured minimum memory allocation.
|
||||
*/
|
||||
public static void normalizeRequest(ResourceRequest ask, int minMemory) {
|
||||
int memory = Math.max(ask.getCapability().getMemory(), minMemory);
|
||||
ask.getCapability().setMemory(
|
||||
minMemory * ((memory / minMemory) + (memory % minMemory > 0 ? 1 : 0)));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -441,7 +441,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|||
}
|
||||
|
||||
// Sanity check
|
||||
normalizeRequests(ask);
|
||||
SchedulerUtils.normalizeRequests(ask, minimumAllocation.getMemory());
|
||||
|
||||
// Release containers
|
||||
for (ContainerId releasedContainerId : release) {
|
||||
|
@ -521,21 +521,6 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|||
return root.getQueueUserAclInfo(user);
|
||||
}
|
||||
|
||||
@Lock(Lock.NoLock.class)
|
||||
private void normalizeRequests(List<ResourceRequest> asks) {
|
||||
for (ResourceRequest ask : asks) {
|
||||
normalizeRequest(ask);
|
||||
}
|
||||
}
|
||||
|
||||
@Lock(Lock.NoLock.class)
|
||||
private void normalizeRequest(ResourceRequest ask) {
|
||||
int minMemory = minimumAllocation.getMemory();
|
||||
int memory = Math.max(ask.getCapability().getMemory(), minMemory);
|
||||
ask.getCapability().setMemory (
|
||||
minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
|
||||
}
|
||||
|
||||
private synchronized void nodeUpdate(RMNode nm,
|
||||
List<ContainerStatus> newlyLaunchedContainers,
|
||||
List<ContainerStatus> completedContainers) {
|
||||
|
|
|
@ -217,7 +217,7 @@ public class FifoScheduler implements ResourceScheduler {
|
|||
}
|
||||
|
||||
// Sanity check
|
||||
normalizeRequests(ask);
|
||||
SchedulerUtils.normalizeRequests(ask, MINIMUM_MEMORY);
|
||||
|
||||
// Release containers
|
||||
for (ContainerId releasedContainer : release) {
|
||||
|
@ -260,21 +260,6 @@ public class FifoScheduler implements ResourceScheduler {
|
|||
application.getHeadroom());
|
||||
}
|
||||
|
||||
private void normalizeRequests(List<ResourceRequest> asks) {
|
||||
for (ResourceRequest ask : asks) {
|
||||
normalizeRequest(ask);
|
||||
}
|
||||
}
|
||||
|
||||
private void normalizeRequest(ResourceRequest ask) {
|
||||
int memory = ask.getCapability().getMemory();
|
||||
// FIXME: TestApplicationCleanup is relying on unnormalized behavior.
|
||||
memory =
|
||||
MINIMUM_MEMORY *
|
||||
((memory/MINIMUM_MEMORY) + (memory%MINIMUM_MEMORY > 0 ? 1 : 0));
|
||||
ask.setCapability(Resources.createResource(memory));
|
||||
}
|
||||
|
||||
private SchedulerApp getApplication(
|
||||
ApplicationAttemptId applicationAttemptId) {
|
||||
return applications.get(applicationAttemptId);
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resource;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestSchedulerUtils {
|
||||
|
||||
@Test
|
||||
public void testNormalizeRequest() {
|
||||
int minMemory = 1024;
|
||||
ResourceRequest ask = new ResourceRequestPBImpl();
|
||||
|
||||
// case negative memory
|
||||
ask.setCapability(Resource.createResource(-1024));
|
||||
SchedulerUtils.normalizeRequest(ask, minMemory);
|
||||
assertEquals(minMemory, ask.getCapability().getMemory());
|
||||
|
||||
// case zero memory
|
||||
ask.setCapability(Resource.createResource(0));
|
||||
SchedulerUtils.normalizeRequest(ask, minMemory);
|
||||
assertEquals(minMemory, ask.getCapability().getMemory());
|
||||
|
||||
// case memory is a multiple of minMemory
|
||||
ask.setCapability(Resource.createResource(2 * minMemory));
|
||||
SchedulerUtils.normalizeRequest(ask, minMemory);
|
||||
assertEquals(2 * minMemory, ask.getCapability().getMemory());
|
||||
|
||||
// case memory is not a multiple of minMemory
|
||||
ask.setCapability(Resource.createResource(minMemory + 10));
|
||||
SchedulerUtils.normalizeRequest(ask, minMemory);
|
||||
assertEquals(2 * minMemory, ask.getCapability().getMemory());
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue