YARN-2448. Changed ApplicationMasterProtocol to expose RM-recognized resource types to the AMs. Contributed by Varun Vasudev.

This commit is contained in:
Vinod Kumar Vavilapalli 2014-09-08 14:48:21 -07:00
parent 3072c83b38
commit b67d5ba784
10 changed files with 202 additions and 34 deletions

View File

@ -196,6 +196,9 @@ Release 2.6.0 - UNRELEASED
YARN-2515. Updated ConverterUtils#toContainerId to parse epoch.
(Tsuyoshi OZAWA via jianhe)
YARN-2448. Changed ApplicationMasterProtocol to expose RM-recognized resource
types to the AMs. (Varun Vasudev via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.util.Records;
/**
@ -180,4 +182,25 @@ public abstract class RegisterApplicationMasterResponse {
@Private
@Unstable
public abstract void setNMTokensFromPreviousAttempts(List<NMToken> nmTokens);
/**
* Get a set of the resource types considered by the scheduler.
*
* @return a Map of RM settings
*/
@Public
@Unstable
public abstract EnumSet<SchedulerResourceTypes> getSchedulerResourceTypes();
/**
* Set the resource types used by the scheduler.
*
* @param types
* a set of the resource types that the scheduler considers during
* scheduling
*/
@Private
@Unstable
public abstract void setSchedulerResourceTypes(
EnumSet<SchedulerResourceTypes> types);
}

View File

@ -47,6 +47,7 @@ message RegisterApplicationMasterResponseProto {
repeated ContainerProto containers_from_previous_attempts = 4;
optional string queue = 5;
repeated NMTokenProto nm_tokens_from_previous_attempts = 6;
repeated SchedulerResourceTypes scheduler_resource_types = 7;
}
message FinishApplicationMasterRequestProto {
@ -88,6 +89,11 @@ message AllocateResponseProto {
optional hadoop.common.TokenProto am_rm_token = 12;
}
enum SchedulerResourceTypes {
MEMORY = 0;
CPU = 1;
}
//////////////////////////////////////////////////////
/////// client_RM_Protocol ///////////////////////////
//////////////////////////////////////////////////////

View File

@ -20,11 +20,7 @@ package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -43,6 +39,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
@ -61,6 +58,7 @@ public class RegisterApplicationMasterResponsePBImpl extends
private Map<ApplicationAccessType, String> applicationACLS = null;
private List<Container> containersFromPreviousAttempts = null;
private List<NMToken> nmTokens = null;
private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null;
public RegisterApplicationMasterResponsePBImpl() {
builder = RegisterApplicationMasterResponseProto.newBuilder();
@ -122,6 +120,9 @@ public class RegisterApplicationMasterResponsePBImpl extends
Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
builder.addAllNmTokensFromPreviousAttempts(iterable);
}
if(schedulerResourceTypes != null) {
addSchedulerResourceTypes();
}
}
@ -364,6 +365,73 @@ public class RegisterApplicationMasterResponsePBImpl extends
};
}
@Override
public EnumSet<SchedulerResourceTypes> getSchedulerResourceTypes() {
initSchedulerResourceTypes();
return this.schedulerResourceTypes;
}
private void initSchedulerResourceTypes() {
if (this.schedulerResourceTypes != null) {
return;
}
RegisterApplicationMasterResponseProtoOrBuilder p =
viaProto ? proto : builder;
List<SchedulerResourceTypes> list = p.getSchedulerResourceTypesList();
if (list.isEmpty()) {
this.schedulerResourceTypes =
EnumSet.noneOf(SchedulerResourceTypes.class);
} else {
this.schedulerResourceTypes = EnumSet.copyOf(list);
}
}
private void addSchedulerResourceTypes() {
maybeInitBuilder();
builder.clearSchedulerResourceTypes();
if (schedulerResourceTypes == null) {
return;
}
Iterable<? extends SchedulerResourceTypes> values =
new Iterable<SchedulerResourceTypes>() {
@Override
public Iterator<SchedulerResourceTypes> iterator() {
return new Iterator<SchedulerResourceTypes>() {
Iterator<SchedulerResourceTypes> settingsIterator =
schedulerResourceTypes.iterator();
@Override
public boolean hasNext() {
return settingsIterator.hasNext();
}
@Override
public SchedulerResourceTypes next() {
return settingsIterator.next();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
this.builder.addAllSchedulerResourceTypes(values);
}
@Override
public void setSchedulerResourceTypes(EnumSet<SchedulerResourceTypes> types) {
if (types == null) {
return;
}
initSchedulerResourceTypes();
this.schedulerResourceTypes.clear();
this.schedulerResourceTypes.addAll(types);
}
private Resource convertFromProtoFormat(ResourceProto resource) {
return new ResourcePBImpl(resource);
}

View File

@ -22,11 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -329,6 +325,10 @@ public class ApplicationMasterService extends AbstractService implements
+ transferredContainers.size() + " containers from previous"
+ " attempts and " + nmTokens.size() + " NM tokens.");
}
response.setSchedulerResourceTypes(rScheduler
.getSchedulingResourceTypes());
return response;
}
}

View File

@ -19,13 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
@ -45,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@ -502,4 +497,10 @@ public abstract class AbstractYarnScheduler
+ " with the same resource: " + newResource);
}
}
/** {@inheritDoc} */
@Override
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
return EnumSet.of(SchedulerResourceTypes.MEMORY);
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
/**
* This interface is used by the components to talk to the
@ -220,4 +222,12 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* @throws YarnException
*/
void killAllAppsInQueue(String queueName) throws YarnException;
/**
* Return a collection of the resource types that are considered when
* scheduling
*
* @return an EnumSet containing the resource types
*/
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes();
}

View File

@ -20,13 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -50,12 +44,12 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
@ -89,6 +83,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -1285,4 +1280,15 @@ public class CapacityScheduler extends
}
return (LeafQueue) ret;
}
/** {@inheritDoc} */
@Override
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
if (calculator.getClass().getName()
.equals(DefaultResourceCalculator.class.getName())) {
return EnumSet.of(SchedulerResourceTypes.MEMORY);
}
return EnumSet
.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
}
}

View File

@ -19,13 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
@ -50,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
@ -1529,4 +1524,11 @@ public class FairScheduler extends
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
}
/** {@inheritDoc} */
@Override
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
return EnumSet
.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
}
}

View File

@ -18,6 +18,13 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@ -40,8 +47,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import static java.lang.Thread.sleep;
import static org.mockito.Matchers.any;
@ -259,4 +265,47 @@ public class TestApplicationMasterService {
}
}
}
@Test(timeout = 3000000)
public void testResourceTypes() throws Exception {
HashMap<YarnConfiguration, EnumSet<SchedulerResourceTypes>> driver =
new HashMap<YarnConfiguration, EnumSet<SchedulerResourceTypes>>();
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setResourceComparator(DominantResourceCalculator.class);
YarnConfiguration testCapacityDRConf = new YarnConfiguration(csconf);
testCapacityDRConf.setClass(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class, ResourceScheduler.class);
YarnConfiguration testCapacityDefConf = new YarnConfiguration();
testCapacityDefConf.setClass(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class, ResourceScheduler.class);
YarnConfiguration testFairDefConf = new YarnConfiguration();
testFairDefConf.setClass(YarnConfiguration.RM_SCHEDULER,
FairScheduler.class, ResourceScheduler.class);
driver.put(conf, EnumSet.of(SchedulerResourceTypes.MEMORY));
driver.put(testCapacityDRConf,
EnumSet.of(SchedulerResourceTypes.CPU, SchedulerResourceTypes.MEMORY));
driver.put(testCapacityDefConf, EnumSet.of(SchedulerResourceTypes.MEMORY));
driver.put(testFairDefConf,
EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU));
for (Map.Entry<YarnConfiguration, EnumSet<SchedulerResourceTypes>> entry : driver
.entrySet()) {
EnumSet<SchedulerResourceTypes> expectedValue = entry.getValue();
MockRM rm = new MockRM(entry.getKey());
rm.start();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
RMApp app1 = rm.submitApp(2048);
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
RegisterApplicationMasterResponse resp = am1.registerAppAttempt();
EnumSet<SchedulerResourceTypes> types = resp.getSchedulerResourceTypes();
LOG.info("types = " + types.toString());
Assert.assertEquals(expectedValue, types);
rm.stop();
}
}
}