YARN-7555. Support multiple resource types in YARN native services. (wangda)

Change-Id: I330e6ee17a73962dcaadd766a3e72d2888681731
This commit is contained in:
Wangda Tan 2017-12-29 11:46:30 -08:00
parent 81127616c5
commit 7467e8fe5a
12 changed files with 343 additions and 30 deletions

View File

@ -650,5 +650,4 @@
<Method name="equals" />
<Bug pattern="EQ_OVERRIDING_EQUALS_NOT_SYMMETRIC" />
</Match>
</FindBugsFilter>

View File

@ -247,7 +247,18 @@ definitions:
kerberos_principal:
description: The Kerberos Principal of the service
$ref: '#/definitions/KerberosPrincipal'
ResourceInformation:
description:
ResourceInformation determines unit/value of resource types in addition to memory and vcores. It will be part of Resource object
properties:
value:
type: integer
format: int64
description: Integer value of the resource.
unit:
type: string
description:
Unit of the resource, acceptable values are: p/n/u/m/k/M/G/T/P/Ki/Mi/Gi/Ti/Pi. By default it is empty means no unit
Resource:
description:
Resource determines the amount of resources (vcores, memory, network, etc.) usable by a container. This field determines the resource to be applied for all the containers of a component or service. The resource specified at the service (or global) level can be overriden at the component level. Only one of profile OR cpu & memory are expected. It raises a validation exception otherwise.
@ -262,6 +273,11 @@ definitions:
memory:
type: string
description: Amount of memory allocated to each container (optional but overrides memory in profile if specified). Currently accepts only an integer value and default unit is in MB.
additional:
type: object
additionalProperties:
$ref: '#/definitions/ResourceInformation'
description: Map of resource name to ResourceInformation
PlacementPolicy:
description: Placement policy of an instance of a service. This feature is in the works in YARN-6592.
properties:

View File

@ -48,4 +48,12 @@
<Class name="org.apache.hadoop.yarn.service.ClientAMPolicyProvider"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<!-- SE_BAD_FIELD -->
<Match>
<Class name="org.apache.hadoop.yarn.service.api.records.Resource" />
<Or>
<Field name="additional"/>
</Or>
<Bug pattern="SE_BAD_FIELD" />
</Match>
</FindBugsFilter>

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
import org.apache.hadoop.yarn.util.BoundedAppender;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -278,6 +279,12 @@ public class ServiceScheduler extends CompositeService {
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(bindAddress.getHostName(),
bindAddress.getPort(), "N/A");
// Update internal resource types according to response.
if (response.getResourceTypes() != null) {
ResourceUtils.reinitializeResources(response.getResourceTypes());
}
if (response.getClientToAMTokenMasterKey() != null
&& response.getClientToAMTokenMasterKey().remaining() != 0) {
context.secretManager

View File

@ -17,16 +17,17 @@
package org.apache.hadoop.yarn.service.api.records;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import javax.xml.bind.annotation.XmlElement;
import java.util.Map;
import java.util.Objects;
/**
* Resource determines the amount of resources (vcores, memory, network, etc.)
* usable by a container. This field determines the resource to be applied for
@ -46,6 +47,10 @@ public class Resource extends BaseResource implements Cloneable {
private Integer cpus = 1;
private String memory = null;
@JsonProperty("additional")
@XmlElement(name = "additional")
private Map<String, ResourceInformation> additional = null;
/**
* Each resource profile has a unique id which is associated with a
* cluster-level predefined memory, cpus, etc.
@ -112,6 +117,28 @@ public class Resource extends BaseResource implements Cloneable {
return Long.parseLong(memory);
}
public Resource setResourceInformations(
Map<String, ResourceInformation> resourceInformations) {
this.additional = resourceInformations;
return this;
}
public Resource resourceInformations(
Map<String, ResourceInformation> resourceInformations) {
this.additional = resourceInformations;
return this;
}
/**
* Map of resource name to ResourceInformation
* @return additional
**/
@ApiModelProperty(value = "Map of resource name to ResourceInformation")
@JsonProperty("additional")
public Map<String, ResourceInformation> getAdditional() {
return additional;
}
@Override
public boolean equals(java.lang.Object o) {
if (this == o) {
@ -121,14 +148,15 @@ public class Resource extends BaseResource implements Cloneable {
return false;
}
Resource resource = (Resource) o;
return Objects.equals(this.profile, resource.profile)
&& Objects.equals(this.cpus, resource.cpus)
&& Objects.equals(this.memory, resource.memory);
return Objects.equals(this.profile, resource.profile) && Objects.equals(
this.cpus, resource.cpus) && Objects.equals(this.memory,
resource.memory) && Objects.equals(this.additional,
resource.additional);
}
@Override
public int hashCode() {
return Objects.hash(profile, cpus, memory);
return Objects.hash(profile, cpus, memory, additional);
}
@Override
@ -139,6 +167,8 @@ public class Resource extends BaseResource implements Cloneable {
sb.append(" profile: ").append(toIndentedString(profile)).append("\n");
sb.append(" cpus: ").append(toIndentedString(cpus)).append("\n");
sb.append(" memory: ").append(toIndentedString(memory)).append("\n");
sb.append(" additional: ").append(
toIndentedString(additional)).append("\n");
sb.append("}");
return sb.toString();
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.api.records;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.annotations.SerializedName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.Objects;
/**
* ResourceInformation determines unit/name/value of resource types in addition to memory and vcores. It will be part of Resource object
*/
@ApiModel(description = "ResourceInformation determines unit/value of resource types in addition to memory and vcores. It will be part of Resource object")
@javax.annotation.Generated(value = "io.swagger.codegen.languages.JavaClientCodegen",
date = "2017-11-22T15:15:49.495-08:00")
public class ResourceInformation {
@SerializedName("value")
private Long value = null;
@SerializedName("unit")
private String unit = null;
public ResourceInformation value(Long value) {
this.value = value;
return this;
}
/**
* Integer value of the resource.
*
* @return value
**/
@ApiModelProperty(value = "Integer value of the resource.")
@JsonProperty("value")
public Long getValue() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
public ResourceInformation unit(String unit) {
this.unit = unit;
return this;
}
/**
* @return unit
**/
@ApiModelProperty(value = "")
@JsonProperty("unit")
public String getUnit() {
return unit == null ? "" : unit;
}
public void setUnit(String unit) {
this.unit = unit;
}
@Override
public boolean equals(java.lang.Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ResourceInformation resourceInformation = (ResourceInformation) o;
return Objects
.equals(this.value, resourceInformation.value) && Objects.equals(
this.unit, resourceInformation.unit);
}
@Override
public int hashCode() {
return Objects.hash(value, unit);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class ResourceInformation {\n");
sb.append(" value: ").append(toIndentedString(value)).append("\n");
sb.append(" unit: ").append(toIndentedString(unit)).append("\n");
sb.append("}");
return sb.toString();
}
/**
* Convert the given object to string with each line indented by 4 spaces
* (except the first line).
*/
private String toIndentedString(java.lang.Object o) {
if (o == null) {
return "null";
}
return o.toString().replace("\n", "\n ");
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
import org.apache.hadoop.yarn.service.ContainerFailureTracker;
@ -392,9 +393,37 @@ public class Component implements EventHandler<ComponentEvent> {
@SuppressWarnings({ "unchecked" })
public void requestContainers(long count) {
Resource resource = Resource
.newInstance(componentSpec.getResource().calcMemoryMB(),
componentSpec.getResource().getCpus());
org.apache.hadoop.yarn.service.api.records.Resource componentResource =
componentSpec.getResource();
Resource resource = Resource.newInstance(componentResource.calcMemoryMB(),
componentResource.getCpus());
if (componentResource.getAdditional() != null) {
for (Map.Entry<String, ResourceInformation> entry : componentResource
.getAdditional().entrySet()) {
String resourceName = entry.getKey();
// Avoid setting memory/cpu under "additional"
if (resourceName.equals(
org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI)
|| resourceName.equals(
org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI)) {
LOG.warn("Please set memory/vcore in the main section of resource, "
+ "ignoring this entry=" + resourceName);
continue;
}
ResourceInformation specInfo = entry.getValue();
org.apache.hadoop.yarn.api.records.ResourceInformation ri =
org.apache.hadoop.yarn.api.records.ResourceInformation.newInstance(
entry.getKey(),
specInfo.getUnit(),
specInfo.getValue());
resource.setResourceInformation(resourceName, ri);
}
}
for (int i = 0; i < count; i++) {
//TODO Once YARN-5468 is done, use that for anti-affinity

View File

@ -29,8 +29,16 @@ import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@ -47,11 +55,16 @@ import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
@ -88,7 +101,6 @@ public class MockServiceAM extends ServiceMaster {
this.service = service;
}
@Override
protected ContainerId getAMContainerId()
throws BadClusterStateException {
@ -185,7 +197,11 @@ public class MockServiceAM extends ServiceMaster {
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl) {
return mock(RegisterApplicationMasterResponse.class);
RegisterApplicationMasterResponse response = mock(
RegisterApplicationMasterResponse.class);
when(response.getResourceTypes()).thenReturn(
ResourceUtils.getResourcesTypeInfo());
return response;
}
@Override public void unregisterApplicationMaster(
@ -195,8 +211,11 @@ public class MockServiceAM extends ServiceMaster {
}
};
return AMRMClientAsync.createAMRMClientAsync(client1, 1000,
AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync =
AMRMClientAsync.createAMRMClientAsync(client1, 1000,
this.new AMRMClientCallback());
return amrmClientAsync;
}
@SuppressWarnings("SuspiciousMethodCalls")

View File

@ -18,17 +18,25 @@
package org.apache.hadoop.yarn.service;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.ComponentState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceState;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -38,10 +46,12 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.registry.client.api.RegistryConstants
.KEY_REGISTRY_ZK_QUORUM;
import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
public class TestServiceAM extends ServiceTestUtils{
@ -183,12 +193,12 @@ public class TestServiceAM extends ServiceTestUtils{
am.init(conf);
am.start();
Thread.sleep(100);
GenericTestUtils.waitFor(() -> am.getComponent(comp1Name).getState().equals(
ComponentState.FLEXING), 100, 2000);
GenericTestUtils.waitFor(() -> am.getComponent(comp1Name).getState()
.equals(ComponentState.FLEXING), 100, 2000);
// 1 pending instance
Assert.assertEquals(1,
am.getComponent(comp1Name).getPendingInstances().size());
Assert.assertEquals(1, am.getComponent(comp1Name).getPendingInstances()
.size());
am.feedContainerToComp(exampleApp, 2, comp1Name);
@ -198,6 +208,47 @@ public class TestServiceAM extends ServiceTestUtils{
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
am.getCompInstance(comp1Name, comp1InstName).getContainerStatus()
.getState());
}
@Test
public void testScheduleWithMultipleResourceTypes()
throws TimeoutException, InterruptedException, IOException {
ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
Service exampleApp = new Service();
exampleApp.setId(applicationId.toString());
exampleApp.setName("testScheduleWithMultipleResourceTypes");
List<ResourceTypeInfo> resourceTypeInfos = new ArrayList<>(
ResourceUtils.getResourcesTypeInfo());
// Add 3rd resource type.
resourceTypeInfos.add(ResourceTypeInfo
.newInstance("resource-1", "", ResourceTypes.COUNTABLE));
// Reinitialize resource types
ResourceUtils.reinitializeResources(resourceTypeInfos);
Component serviceCompoent = createComponent("compa", 1, "pwd");
serviceCompoent.getResource().setResourceInformations(ImmutableMap
.of("resource-1", new ResourceInformation().value(3333L).unit("Gi")));
exampleApp.addComponent(serviceCompoent);
MockServiceAM am = new MockServiceAM(exampleApp);
am.init(conf);
am.start();
ServiceScheduler serviceScheduler = am.context.scheduler;
AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync =
serviceScheduler.getAmRMClient();
Collection<AMRMClient.ContainerRequest> rr =
amrmClientAsync.getMatchingRequests(0);
Assert.assertEquals(1, rr.size());
org.apache.hadoop.yarn.api.records.Resource capability =
rr.iterator().next().getCapability();
Assert.assertEquals(3333L, capability.getResourceValue("resource-1"));
Assert.assertEquals("Gi",
capability.getResourceInformation("resource-1").getUnits());
am.stop();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.service.conf;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.api.records.Configuration;
@ -194,6 +195,22 @@ public class TestAppJsonResolve extends Assert {
assertEquals("60",
worker.getProperty("yarn.service.failure-count-reset.window"));
// Validate worker's resources
Resource workerResource = orig.getComponent("worker").getResource();
Assert.assertEquals(1, workerResource.getCpus().intValue());
Assert.assertEquals(1024, workerResource.calcMemoryMB());
Assert.assertNotNull(workerResource.getAdditional());
Assert.assertEquals(2, workerResource.getAdditional().size());
Assert.assertEquals(3333, workerResource.getAdditional().get(
"resource-1").getValue().longValue());
Assert.assertEquals("Gi", workerResource.getAdditional().get(
"resource-1").getUnit());
Assert.assertEquals(5, workerResource.getAdditional().get(
"yarn.io/gpu").getValue().longValue());
Assert.assertEquals("", workerResource.getAdditional().get(
"yarn.io/gpu").getUnit());
other = orig.getComponent("other").getConfiguration();
assertEquals(0, other.getProperties().size());
}

View File

@ -37,7 +37,16 @@
"launch_command": "sleep 3600",
"resource": {
"cpus": 1,
"memory": "1024"
"memory": "1024",
"additional": {
"resource-1": {
"value": 3333,
"unit": "Gi"
},
"yarn.io/gpu": {
"value": 5
}
}
},
"configuration": {
"properties": {

View File

@ -1,4 +1,4 @@
<!---
# <!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
@ -330,6 +330,7 @@ Resource determines the amount of resources (vcores, memory, network, etc.) usab
|profile|Each resource profile has a unique id which is associated with a cluster-level predefined memory, cpus, etc.|false|string||
|cpus|Amount of vcores allocated to each container (optional but overrides cpus in profile if specified).|false|integer (int32)||
|memory|Amount of memory allocated to each container (optional but overrides memory in profile if specified). Currently accepts only an integer value and default unit is in MB.|false|string||
|additional|A map of resource type name to resource type information. Including value (integer), and unit (string). This will be used to specify resource other than cpu and memory. Please refer to example below. | false | object ||
### Service
@ -395,8 +396,14 @@ POST URL - http://localhost:8088/ws/v1/services
"launch_command": "./start_nginx.sh",
"resource": {
"cpus": 1,
"memory": "256"
}
"memory": "256",
"additional" : {
"yarn.io/gpu" : {
"value" : 4,
"unit" : ""
}
}
}
}
]
}
@ -605,3 +612,5 @@ POST URL - http://localhost:8088:/ws/v1/services/hbase-app-1
}
}
```