Merge remote-tracking branch 'apache-commit/trunk' into HDFS-6581

arp 2014-09-01 11:52:59 -07:00
commit 217dea7794
55 changed files with 1334 additions and 325 deletions

View File

@ -476,6 +476,15 @@ Release 2.6.0 - UNRELEASED
HADOOP-10814. Update Tomcat version used by HttpFS and KMS to latest
6.x version. (rkanter via tucu)
HADOOP-10994. KeyProviderCryptoExtension should use CryptoCodec for
generation/decryption of keys. (tucu)
HADOOP-11021. Configurable replication factor in the hadoop archive
command. (Zhe Zhang via wang)
HADOOP-11030. Define a variable jackson.version instead of using constant
at multiple places. (Juan Yu via kasha)
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
@ -528,6 +537,8 @@ Release 2.6.0 - UNRELEASED
schedules incoming calls and multiplexes outgoing calls. (Chris Li via
Arpit Agarwal)
HADOOP-10833. Remove unused cache in UserProvider. (Benoy Antony)
BUG FIXES
HADOOP-10781. Unportable getgrouplist() usage breaks FreeBSD (Dmitry

View File

@ -108,6 +108,7 @@ public class JavaKeyStoreProvider extends KeyProvider {
private final Map<String, Metadata> cache = new HashMap<String, Metadata>();
private JavaKeyStoreProvider(URI uri, Configuration conf) throws IOException {
super(conf);
this.uri = uri;
path = ProviderUtils.unnestUri(uri);
fs = path.getFileSystem(conf);

View File

@ -56,6 +56,8 @@ public abstract class KeyProvider {
"hadoop.security.key.default.bitlength";
public static final int DEFAULT_BITLENGTH = 128;
private final Configuration conf;
/**
* The combination of both the key version name and the key material.
*/
@ -353,6 +355,24 @@ public String toString() {
}
}
/**
* Constructor.
*
* @param conf configuration for the provider
*/
public KeyProvider(Configuration conf) {
this.conf = new Configuration(conf);
}
/**
* Return the provider configuration.
*
* @return the provider configuration
*/
public Configuration getConf() {
return conf;
}
/**
* A helper function to create an options object.
* @param conf the configuration to use

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.crypto.key;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
@ -29,6 +30,9 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.Decryptor;
import org.apache.hadoop.crypto.Encryptor;
/**
* A KeyProvider with Cryptographic Extensions specifically for generating
@ -239,18 +243,25 @@ public EncryptedKeyVersion generateEncryptedKey(String encryptionKeyName)
Preconditions.checkNotNull(encryptionKey,
"No KeyVersion exists for key '%s' ", encryptionKeyName);
// Generate random bytes for new key and IV
Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
CryptoCodec cc = CryptoCodec.getInstance(keyProvider.getConf());
final byte[] newKey = new byte[encryptionKey.getMaterial().length];
RANDOM.get().nextBytes(newKey);
final byte[] iv = new byte[cipher.getBlockSize()];
RANDOM.get().nextBytes(iv);
cc.generateSecureRandom(newKey);
final byte[] iv = new byte[cc.getCipherSuite().getAlgorithmBlockSize()];
cc.generateSecureRandom(iv);
// Encryption key IV is derived from new key's IV
final byte[] encryptionIV = EncryptedKeyVersion.deriveIV(iv);
// Encrypt the new key
cipher.init(Cipher.ENCRYPT_MODE,
new SecretKeySpec(encryptionKey.getMaterial(), "AES"),
new IvParameterSpec(encryptionIV));
final byte[] encryptedKey = cipher.doFinal(newKey);
Encryptor encryptor = cc.createEncryptor();
encryptor.init(encryptionKey.getMaterial(), encryptionIV);
int keyLen = newKey.length;
ByteBuffer bbIn = ByteBuffer.allocateDirect(keyLen);
ByteBuffer bbOut = ByteBuffer.allocateDirect(keyLen);
bbIn.put(newKey);
bbIn.flip();
encryptor.encrypt(bbIn, bbOut);
bbOut.flip();
byte[] encryptedKey = new byte[keyLen];
bbOut.get(encryptedKey);
return new EncryptedKeyVersion(encryptionKeyName,
encryptionKey.getVersionName(), iv,
new KeyVersion(encryptionKey.getName(), EEK, encryptedKey));
@ -274,19 +285,25 @@ public KeyVersion decryptEncryptedKey(
KeyProviderCryptoExtension.EEK,
encryptedKeyVersion.getEncryptedKeyVersion().getVersionName()
);
final byte[] encryptionKeyMaterial = encryptionKey.getMaterial();
// Encryption key IV is determined from encrypted key's IV
final byte[] encryptionIV =
EncryptedKeyVersion.deriveIV(encryptedKeyVersion.getEncryptedKeyIv());
// Init the cipher with encryption key parameters
Cipher cipher = Cipher.getInstance("AES/CTR/NoPadding");
cipher.init(Cipher.DECRYPT_MODE,
new SecretKeySpec(encryptionKeyMaterial, "AES"),
new IvParameterSpec(encryptionIV));
// Decrypt the encrypted key
CryptoCodec cc = CryptoCodec.getInstance(keyProvider.getConf());
Decryptor decryptor = cc.createDecryptor();
decryptor.init(encryptionKey.getMaterial(), encryptionIV);
final KeyVersion encryptedKV =
encryptedKeyVersion.getEncryptedKeyVersion();
final byte[] decryptedKey = cipher.doFinal(encryptedKV.getMaterial());
int keyLen = encryptedKV.getMaterial().length;
ByteBuffer bbIn = ByteBuffer.allocateDirect(keyLen);
ByteBuffer bbOut = ByteBuffer.allocateDirect(keyLen);
bbIn.put(encryptedKV.getMaterial());
bbIn.flip();
decryptor.decrypt(bbIn, bbOut);
bbOut.flip();
byte[] decryptedKey = new byte[keyLen];
bbOut.get(decryptedKey);
return new KeyVersion(encryptionKey.getName(), EK, decryptedKey);
}
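Both hunks above swap the JCE Cipher for the codec returned by CryptoCodec.getInstance(keyProvider.getConf()) and move the data through direct ByteBuffers. A minimal standalone sketch of that Encryptor/Decryptor round trip, using only the calls visible in the patch (class and variable names here are illustrative, not from the patch):

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.Decryptor;
import org.apache.hadoop.crypto.Encryptor;

public class CryptoCodecRoundTrip {
  public static void main(String[] args) throws Exception {
    CryptoCodec cc = CryptoCodec.getInstance(new Configuration());

    // Key material and IV sized from the negotiated cipher suite, as in
    // generateEncryptedKey() above.
    byte[] key = new byte[16];
    cc.generateSecureRandom(key);
    byte[] iv = new byte[cc.getCipherSuite().getAlgorithmBlockSize()];
    cc.generateSecureRandom(iv);

    byte[] plain = "example key material".getBytes("UTF-8");

    // Encrypt through direct ByteBuffers.
    Encryptor encryptor = cc.createEncryptor();
    encryptor.init(key, iv);
    ByteBuffer encIn = ByteBuffer.allocateDirect(plain.length);
    ByteBuffer encOut = ByteBuffer.allocateDirect(plain.length);
    encIn.put(plain);
    encIn.flip();
    encryptor.encrypt(encIn, encOut);
    encOut.flip();
    byte[] cipherText = new byte[plain.length];
    encOut.get(cipherText);

    // Decrypt with the same key/IV and print the recovered plaintext.
    Decryptor decryptor = cc.createDecryptor();
    decryptor.init(key, iv);
    ByteBuffer decIn = ByteBuffer.allocateDirect(cipherText.length);
    ByteBuffer decOut = ByteBuffer.allocateDirect(cipherText.length);
    decIn.put(cipherText);
    decIn.flip();
    decryptor.decrypt(decIn, decOut);
    decOut.flip();
    byte[] roundTripped = new byte[cipherText.length];
    decOut.get(roundTripped);
    System.out.println(new String(roundTripped, "UTF-8"));
  }
}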

View File

@ -40,6 +40,7 @@ public static interface Extension {
private E extension;
public KeyProviderExtension(KeyProvider keyProvider, E extensions) {
super(keyProvider.getConf());
this.keyProvider = keyProvider;
this.extension = extensions;
}

View File

@ -44,7 +44,8 @@ public class UserProvider extends KeyProvider {
private final Credentials credentials;
private final Map<String, Metadata> cache = new HashMap<String, Metadata>();
private UserProvider() throws IOException {
private UserProvider(Configuration conf) throws IOException {
super(conf);
user = UserGroupInformation.getCurrentUser();
credentials = user.getCredentials();
}
@ -145,7 +146,7 @@ public static class Factory extends KeyProviderFactory {
public KeyProvider createProvider(URI providerName,
Configuration conf) throws IOException {
if (SCHEME_NAME.equals(providerName.getScheme())) {
return new UserProvider();
return new UserProvider(conf);
}
return null;
}

View File

@ -283,6 +283,7 @@ public HttpURLConnection configure(HttpURLConnection conn)
}
public KMSClientProvider(URI uri, Configuration conf) throws IOException {
super(conf);
Path path = ProviderUtils.unnestUri(uri);
URL url = path.toUri().toURL();
kmsUrl = createServiceURL(url);

View File

@ -21,9 +21,7 @@
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -41,8 +39,6 @@ public class UserProvider extends CredentialProvider {
public static final String SCHEME_NAME = "user";
private final UserGroupInformation user;
private final Credentials credentials;
private final Map<String, CredentialEntry> cache = new HashMap<String,
CredentialEntry>();
private UserProvider() throws IOException {
user = UserGroupInformation.getCurrentUser();
@ -86,7 +82,6 @@ public synchronized void deleteCredentialEntry(String name) throws IOException {
throw new IOException("Credential " + name +
" does not exist in " + this);
}
cache.remove(name);
}
@Override

View File

@ -19,6 +19,7 @@
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.junit.Assert;
import org.junit.Test;
@ -32,6 +33,7 @@ public void testCurrentKey() throws Exception {
KeyProvider mockProv = Mockito.mock(KeyProvider.class);
Mockito.when(mockProv.getCurrentKey(Mockito.eq("k1"))).thenReturn(mockKey);
Mockito.when(mockProv.getCurrentKey(Mockito.eq("k2"))).thenReturn(null);
Mockito.when(mockProv.getConf()).thenReturn(new Configuration());
KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
// asserting caching
@ -58,6 +60,7 @@ public void testKeyVersion() throws Exception {
Mockito.when(mockProv.getKeyVersion(Mockito.eq("k1@0")))
.thenReturn(mockKey);
Mockito.when(mockProv.getKeyVersion(Mockito.eq("k2@0"))).thenReturn(null);
Mockito.when(mockProv.getConf()).thenReturn(new Configuration());
KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
// asserting caching
@ -88,6 +91,7 @@ public void testMetadata() throws Exception {
KeyProvider mockProv = Mockito.mock(KeyProvider.class);
Mockito.when(mockProv.getMetadata(Mockito.eq("k1"))).thenReturn(mockMeta);
Mockito.when(mockProv.getMetadata(Mockito.eq("k2"))).thenReturn(null);
Mockito.when(mockProv.getConf()).thenReturn(new Configuration());
KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
// asserting caching
@ -112,6 +116,7 @@ public void testRollNewVersion() throws Exception {
KeyProvider.KeyVersion mockKey = Mockito.mock(KeyProvider.KeyVersion.class);
KeyProvider mockProv = Mockito.mock(KeyProvider.class);
Mockito.when(mockProv.getCurrentKey(Mockito.eq("k1"))).thenReturn(mockKey);
Mockito.when(mockProv.getConf()).thenReturn(new Configuration());
KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
Assert.assertEquals(mockKey, cache.getCurrentKey("k1"));
Mockito.verify(mockProv, Mockito.times(1)).getCurrentKey(Mockito.eq("k1"));
@ -134,6 +139,7 @@ public void testDeleteKey() throws Exception {
.thenReturn(mockKey);
Mockito.when(mockProv.getMetadata(Mockito.eq("k1"))).thenReturn(
new KMSClientProvider.KMSMetadata("c", 0, "l", null, new Date(), 1));
Mockito.when(mockProv.getConf()).thenReturn(new Configuration());
KeyProvider cache = new CachingKeyProvider(mockProv, 100, 100);
Assert.assertEquals(mockKey, cache.getCurrentKey("k1"));
Mockito.verify(mockProv, Mockito.times(1)).getCurrentKey(Mockito.eq("k1"));

View File

@ -159,6 +159,10 @@ private static class MyKeyProvider extends KeyProvider {
private int size;
private byte[] material;
public MyKeyProvider(Configuration conf) {
super(conf);
}
@Override
public KeyVersion getKeyVersion(String versionName)
throws IOException {
@ -216,7 +220,7 @@ protected byte[] generateKey(int size, String algorithm)
@Test
public void testMaterialGeneration() throws Exception {
MyKeyProvider kp = new MyKeyProvider();
MyKeyProvider kp = new MyKeyProvider(new Configuration());
KeyProvider.Options options = new KeyProvider.Options(new Configuration());
options.setCipher(CIPHER);
options.setBitLength(128);
@ -225,10 +229,19 @@ public void testMaterialGeneration() throws Exception {
Assert.assertEquals(CIPHER, kp.algorithm);
Assert.assertNotNull(kp.material);
kp = new MyKeyProvider();
kp = new MyKeyProvider(new Configuration());
kp.rollNewVersion("hello");
Assert.assertEquals(128, kp.size);
Assert.assertEquals(CIPHER, kp.algorithm);
Assert.assertNotNull(kp.material);
}
@Test
public void testConfiguration() throws Exception {
Configuration conf = new Configuration(false);
conf.set("a", "A");
MyKeyProvider kp = new MyKeyProvider(conf);
Assert.assertEquals("A", kp.getConf().get("a"));
}
}
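For context, the new KeyProvider constructor copies the supplied Configuration and getConf() hands it back, which is what KeyProviderCryptoExtension relies on to pick a CryptoCodec. A small usage sketch against the factory API, with a hypothetical configuration key (not from the patch):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;

public class KeyProviderConfDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("example.setting", "on");   // hypothetical key, for illustration
    // user:/// resolves to UserProvider through its Factory.
    KeyProvider provider = KeyProviderFactory.get(new URI("user:///"), conf);
    // The provider keeps a copy of the configuration it was created with.
    System.out.println(provider.getConf().get("example.setting"));
  }
}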

View File

@ -29,13 +29,18 @@
import org.apache.hadoop.security.token.Token;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
public class TestKeyProviderDelegationTokenExtension {
public static abstract class MockKeyProvider extends
KeyProvider implements DelegationTokenExtension {
public MockKeyProvider() {
super(new Configuration(false));
}
}
@Test
public void testCreateExtension() throws Exception {
Configuration conf = new Configuration();
@ -50,9 +55,11 @@ public void testCreateExtension() throws Exception {
Assert.assertNull(kpDTE1.addDelegationTokens("user", credentials));
MockKeyProvider mock = mock(MockKeyProvider.class);
Mockito.when(mock.getConf()).thenReturn(new Configuration());
when(mock.addDelegationTokens("renewer", credentials)).thenReturn(
new Token<?>[] { new Token(null, null, new Text("kind"), new Text(
"service")) });
new Token<?>[]{new Token(null, null, new Text("kind"), new Text(
"service"))}
);
KeyProviderDelegationTokenExtension kpDTE2 =
KeyProviderDelegationTokenExtension
.createKeyProviderDelegationTokenExtension(mock);

View File

@ -577,6 +577,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6800. Support Datanode layout changes with rolling upgrade.
(James Thomas via Arpit Agarwal)
HDFS-6972. TestRefreshUserMappings.testRefreshSuperUserGroupsConfiguration
doesn't decode url correctly. (Yongjun Zhang via wang)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -30,7 +30,9 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -225,15 +227,17 @@ public void testRefreshSuperUserGroupsConfiguration() throws Exception {
}
private void addNewConfigResource(String rsrcName, String keyGroup,
String groups, String keyHosts, String hosts) throws FileNotFoundException {
String groups, String keyHosts, String hosts)
throws FileNotFoundException, UnsupportedEncodingException {
// location for temp resource should be in CLASSPATH
Configuration conf = new Configuration();
URL url = conf.getResource("hdfs-site.xml");
Path p = new Path(url.getPath());
String urlPath = URLDecoder.decode(url.getPath().toString(), "UTF-8");
Path p = new Path(urlPath);
Path dir = p.getParent();
tempResource = dir.toString() + "/" + rsrcName;
String newResource =
"<configuration>"+
"<property><name>" + keyGroup + "</name><value>"+groups+"</value></property>" +

View File

@ -38,7 +38,7 @@ Overview
How to Create an Archive
------------------------
`Usage: hadoop archive -archiveName name -p <parent> <src>* <dest>`
`Usage: hadoop archive -archiveName name -p <parent> [-r <replication factor>] <src>* <dest>`
-archiveName is the name of the archive you would like to create. An example
would be foo.har. The name should have a \*.har extension. The parent argument
@ -52,9 +52,12 @@ How to Create an Archive
would need a map reduce cluster to run this. For a detailed example, see the
later sections.
-r indicates the desired replication factor; if this optional argument is
not specified, a replication factor of 10 will be used.
If you just want to archive a single directory /foo/bar then you can just use
`hadoop archive -archiveName zoo.har -p /foo/bar /outputdir`
`hadoop archive -archiveName zoo.har -p /foo/bar -r 3 /outputdir`
How to Look Up Files in Archives
--------------------------------
@ -90,14 +93,15 @@ Archives Examples
$H3 Creating an Archive
`hadoop archive -archiveName foo.har -p /user/hadoop dir1 dir2 /user/zoo`
`hadoop archive -archiveName foo.har -p /user/hadoop -r 3 dir1 dir2 /user/zoo`
The above example is creating an archive using /user/hadoop as the relative
archive directory. The directories /user/hadoop/dir1 and /user/hadoop/dir2
will be archived in the following file system directory -- /user/zoo/foo.har.
Archiving does not delete the input files. If you want to delete the input
files after creating the archives (to reduce namespace), you will have to do
it on your own.
it on your own. In this example, because `-r 3` is specified, a replication
factor of 3 will be used.
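The replication factor that was actually applied can be checked afterwards, for example with `hadoop fs -stat %r /user/zoo/foo.har/part-0` (the part files hold the archive's data).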
$H3 Looking Up Files

View File

@ -61,6 +61,9 @@
<!-- jersey version -->
<jersey.version>1.9</jersey.version>
<!-- jackson version -->
<jackson.version>1.9.13</jackson.version>
<!-- ProtocolBuffer version, used to verify the protoc version and -->
<!-- define the protobuf JAR version -->
<protobuf.version>2.5.0</protobuf.version>
@ -637,22 +640,22 @@
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.9.13</version>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
<version>1.9.13</version>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>

View File

@ -97,9 +97,12 @@ public class HadoopArchives implements Tool {
long partSize = 2 * 1024 * 1024 * 1024l;
/** size of blocks in hadoop archives **/
long blockSize = 512 * 1024 * 1024l;
/** the desired replication degree; default is 10 **/
short repl = 10;
private static final String usage = "archive"
+ " -archiveName NAME -p <parent path> <src>* <dest>" +
+ " -archiveName NAME -p <parent path> [-r <replication factor>]" +
"<src>* <dest>" +
"\n";
@ -542,7 +545,7 @@ void archive(Path parentPath, List<Path> srcPaths,
srcWriter.close();
}
//increase the replication of src files
jobfs.setReplication(srcFiles, (short) 10);
jobfs.setReplication(srcFiles, repl);
conf.setInt(SRC_COUNT_LABEL, numFiles);
conf.setLong(TOTAL_SIZE_LABEL, totalSize);
int numMaps = (int)(totalSize/partSize);
@ -835,6 +838,11 @@ public int run(String[] args) throws Exception {
}
i+=2;
if ("-r".equals(args[i])) {
repl = Short.parseShort(args[i+1]);
i+=2;
}
//read the rest of the paths
for (; i < args.length; i++) {
if (i == (args.length - 1)) {

View File

@ -157,6 +157,24 @@ public void testRelativePath() throws Exception {
final List<String> harPaths = lsr(shell, fullHarPathStr);
Assert.assertEquals(originalPaths, harPaths);
}
@Test
public void testRelativePathWitRepl() throws Exception {
final Path sub1 = new Path(inputPath, "dir1");
fs.mkdirs(sub1);
createFile(inputPath, fs, sub1.getName(), "a");
final FsShell shell = new FsShell(conf);
final List<String> originalPaths = lsr(shell, "input");
System.out.println("originalPaths: " + originalPaths);
// make the archive:
final String fullHarPathStr = makeArchiveWithRepl();
// compare results:
final List<String> harPaths = lsr(shell, fullHarPathStr);
Assert.assertEquals(originalPaths, harPaths);
}
@Test
public void testPathWithSpaces() throws Exception {
@ -625,6 +643,29 @@ private String makeArchive() throws Exception {
assertEquals(0, ToolRunner.run(har, args));
return fullHarPathStr;
}
/*
* Run the HadoopArchives tool to create an archive on the
* given file system with a specified replication degree.
*/
private String makeArchiveWithRepl() throws Exception {
final String inputPathStr = inputPath.toUri().getPath();
System.out.println("inputPathStr = " + inputPathStr);
final URI uri = fs.getUri();
final String prefix = "har://hdfs-" + uri.getHost() + ":" + uri.getPort()
+ archivePath.toUri().getPath() + Path.SEPARATOR;
final String harName = "foo.har";
final String fullHarPathStr = prefix + harName;
final String[] args = { "-archiveName", harName, "-p", inputPathStr,
"-r 3", "*", archivePath.toString() };
System.setProperty(HadoopArchives.TEST_HADOOP_ARCHIVES_JAR_PATH,
HADOOP_ARCHIVES_JAR);
final HadoopArchives har = new HadoopArchives(conf);
assertEquals(0, ToolRunner.run(har, args));
return fullHarPathStr;
}
@Test
/*

View File

@ -32,7 +32,6 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@ -55,7 +54,7 @@ private static class FakeRMNodeImpl implements RMNode {
private String nodeAddr;
private String httpAddress;
private int cmdPort;
private volatile ResourceOption perNode;
private volatile Resource perNode;
private String rackName;
private String healthReport;
private NodeState state;
@ -63,7 +62,7 @@ private static class FakeRMNodeImpl implements RMNode {
private List<ApplicationId> toCleanUpApplications;
public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
ResourceOption perNode, String rackName, String healthReport,
Resource perNode, String rackName, String healthReport,
int cmdPort, String hostName, NodeState state) {
this.nodeId = nodeId;
this.nodeAddr = nodeAddr;
@ -111,10 +110,6 @@ public long getLastHealthReportTime() {
}
public Resource getTotalCapability() {
return perNode.getResource();
}
public ResourceOption getResourceOption() {
return perNode;
}
@ -159,32 +154,26 @@ public List<UpdatedContainerInfo> pullContainerUpdates() {
return list;
}
@Override
public String getNodeManagerVersion() {
// TODO Auto-generated method stub
return null;
}
@Override
public void setResourceOption(ResourceOption resourceOption) {
perNode = resourceOption;
public String getNodeManagerVersion() {
return null;
}
}
public static RMNode newNodeInfo(String rackName, String hostName,
final ResourceOption resourceOption, int port) {
final Resource resource, int port) {
final NodeId nodeId = newNodeID(hostName, port);
final String nodeAddr = hostName + ":" + port;
final String httpAddress = hostName;
return new FakeRMNodeImpl(nodeId, nodeAddr, httpAddress,
resourceOption, rackName, "Me good",
resource, rackName, "Me good",
port, hostName, null);
}
public static RMNode newNodeInfo(String rackName, String hostName,
final Resource resource) {
return newNodeInfo(rackName, hostName, ResourceOption.newInstance(resource,
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), NODE_ID++);
return newNodeInfo(rackName, hostName, resource, NODE_ID++);
}
}

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@ -148,14 +147,4 @@ public String getNodeManagerVersion() {
return node.getNodeManagerVersion();
}
@Override
public void setResourceOption(ResourceOption resourceOption) {
node.setResourceOption(resourceOption);
}
@Override
public ResourceOption getResourceOption() {
return node.getResourceOption();
}
}

View File

@ -58,6 +58,9 @@ Release 2.6.0 - UNRELEASED
YARN-2393. FairScheduler: Add the notion of steady fair share.
(Wei Yan via kasha)
YARN-2395. FairScheduler: Preemption timeout should be configurable per
queue. (Wei Yan via kasha)
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
@ -163,6 +166,12 @@ Release 2.6.0 - UNRELEASED
YARN-2406. Move RM recovery related proto to
yarn_server_resourcemanager_recovery.proto. (Tsuyoshi Ozawa via jianhe)
YARN-2360. Fair Scheduler: Display dynamic fair share for queues on the
scheduler page. (Ashwin Shankar and Wei Yan via kasha)
YARN-1506. Changed RMNode/SchedulerNode to update resource with event
notification. (Junping Du via jianhe)
OPTIMIZATIONS
BUG FIXES

View File

@ -62,6 +62,8 @@ public static ResourceOption newInstance(Resource resource,
@Evolving
protected abstract void setOverCommitTimeout(int overCommitTimeout);
@Private
@Evolving
protected abstract void build();
@Override

View File

@ -17,9 +17,10 @@
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>The response sent by the <code>ResourceManager</code> to Admin client on
@ -30,8 +31,13 @@
* @see ResourceManagerAdministrationProtocol#updateNodeResource(
* UpdateNodeResourceRequest)
*/
@Public
@Private
@Evolving
public interface UpdateNodeResourceResponse {
public abstract class UpdateNodeResourceResponse {
public static UpdateNodeResourceResponse newInstance(){
UpdateNodeResourceResponse response =
Records.newRecord(UpdateNodeResourceResponse.class);
return response;
}
}

View File

@ -22,14 +22,15 @@
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProtoOrBuilder;
import com.google.common.base.Preconditions;
public class ResourceOptionPBImpl extends ResourceOption {
ResourceOptionProto proto = null;
ResourceOptionProto proto = ResourceOptionProto.getDefaultInstance();
ResourceOptionProto.Builder builder = null;
private Resource resource = null;
boolean viaProto = false;
public ResourceOptionPBImpl() {
builder = ResourceOptionProto.newBuilder();
@ -37,39 +38,46 @@ public ResourceOptionPBImpl() {
public ResourceOptionPBImpl(ResourceOptionProto proto) {
this.proto = proto;
this.resource = convertFromProtoFormat(proto.getResource());
viaProto = true;
}
public ResourceOptionProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public Resource getResource() {
return this.resource;
ResourceOptionProtoOrBuilder p = viaProto ? proto : builder;
return convertFromProtoFormat(p.getResource());
}
@Override
protected void setResource(Resource resource) {
if (resource != null) {
Preconditions.checkNotNull(builder);
builder.setResource(convertToProtoFormat(resource));
}
this.resource = resource;
maybeInitBuilder();
builder.setResource(convertToProtoFormat(resource));
}
@Override
public int getOverCommitTimeout() {
Preconditions.checkNotNull(proto);
return proto.getOverCommitTimeout();
ResourceOptionProtoOrBuilder p = viaProto ? proto : builder;
return p.getOverCommitTimeout();
}
@Override
protected void setOverCommitTimeout(int overCommitTimeout) {
Preconditions.checkNotNull(builder);
maybeInitBuilder();
builder.setOverCommitTimeout(overCommitTimeout);
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ResourceOptionProto.newBuilder(proto);
}
viaProto = false;
}
private ResourceProto convertToProtoFormat(
Resource resource) {
return ((ResourcePBImpl)resource).getProto();
@ -83,6 +91,7 @@ private ResourcePBImpl convertFromProtoFormat(
@Override
protected void build() {
proto = builder.build();
viaProto = true;
builder = null;
}

View File

@ -20,7 +20,7 @@
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
public class UpdateNodeResourceResponsePBImpl implements UpdateNodeResourceResponse {
public class UpdateNodeResourceResponsePBImpl extends UpdateNodeResourceResponse {
UpdateNodeResourceResponseProto proto = UpdateNodeResourceResponseProto.getDefaultInstance();
UpdateNodeResourceResponseProto.Builder builder = null;

View File

@ -72,6 +72,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import com.google.common.annotations.VisibleForTesting;
@ -513,9 +514,20 @@ public String[] getGroupsForUser(String user) throws IOException {
return UserGroupInformation.createRemoteUser(user).getGroupNames();
}
@SuppressWarnings("unchecked")
@Override
public UpdateNodeResourceResponse updateNodeResource(
UpdateNodeResourceRequest request) throws YarnException, IOException {
String argName = "updateNodeResource";
UserGroupInformation user = checkAcls(argName);
if (!isRMActive()) {
RMAuditLogger.logFailure(user.getShortUserName(), argName,
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not update node resource.");
throwStandbyException();
}
Map<NodeId, ResourceOption> nodeResourceMap = request.getNodeResourceMap();
Set<NodeId> nodeIds = nodeResourceMap.keySet();
// verify nodes are all valid first.
@ -536,21 +548,31 @@ public UpdateNodeResourceResponse updateNodeResource(
// Notice: invalid NodeIDs are still possible because node decommission
// may happen at the same time. In that case, only log and skip the absent
// nodes without throwing any exceptions.
boolean allSuccess = true;
for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) {
ResourceOption newResourceOption = entry.getValue();
NodeId nodeId = entry.getKey();
RMNode node = this.rmContext.getRMNodes().get(nodeId);
if (node == null) {
LOG.warn("Resource update get failed on an unrecognized node: " + nodeId);
allSuccess = false;
} else {
node.setResourceOption(newResourceOption);
LOG.info("Update resource successfully on node(" + node.getNodeID()
+") with resource(" + newResourceOption.toString() + ")");
// update resource to RMNode
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption));
LOG.info("Update resource on node(" + node.getNodeID()
+ ") with resource(" + newResourceOption.toString() + ")");
}
}
UpdateNodeResourceResponse response = recordFactory.newRecordInstance(
UpdateNodeResourceResponse.class);
return response;
if (allSuccess) {
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
}
UpdateNodeResourceResponse response =
UpdateNodeResourceResponse.newInstance();
return response;
}
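For reference, a hedged sketch of what an admin-side caller of this path can look like, assuming the usual ClientRMProxy and record helpers such as UpdateNodeResourceRequest#newInstance (the host name and resource sizes are illustrative):

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;

public class UpdateNodeResourceClient {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    ResourceManagerAdministrationProtocol admin = ClientRMProxy.createRMProxy(
        conf, ResourceManagerAdministrationProtocol.class);

    NodeId nodeId = NodeId.newInstance("worker-1.example.com", 45454);
    // -1 is the default over-commit timeout (no timeout).
    ResourceOption newTotal = ResourceOption.newInstance(
        Resource.newInstance(8192, 8), -1);

    // AdminService validates the NodeIds and fires an RMNodeResourceUpdateEvent
    // per node, as shown in the hunk above.
    admin.updateNodeResource(UpdateNodeResourceRequest.newInstance(
        Collections.singletonMap(nodeId, newTotal)));
  }
}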
private synchronized Configuration getConfiguration(Configuration conf,

View File

@ -36,7 +36,6 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -299,8 +298,7 @@ public RegisterNodeManagerResponse registerNodeManager(
.getCurrentKey());
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
resolve(host), ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT),
nodeManagerVersion);
resolve(host), capability, nodeManagerVersion);
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {

View File

@ -101,18 +101,6 @@ public interface RMNode {
*/
public Resource getTotalCapability();
/**
* Set resource option with total available resource and overCommitTimoutMillis
* @param resourceOption
*/
public void setResourceOption(ResourceOption resourceOption);
/**
* resource option with total available resource and overCommitTimoutMillis
* @return ResourceOption
*/
public ResourceOption getResourceOption();
/**
* The rack name for this node manager.
* @return the rack name.

View File

@ -24,6 +24,9 @@ public enum RMNodeEventType {
// Source: AdminService
DECOMMISSION,
// Source: AdminService, ResourceTrackerService
RESOURCE_UPDATE,
// ResourceTrackerService
STATUS_UPDATE,

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils.ContainerIdComparator;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@ -96,7 +97,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private int httpPort;
private final String nodeAddress; // The containerManager address
private String httpAddress;
private volatile ResourceOption resourceOption;
private volatile Resource totalCapability;
private final Node node;
private String healthReport;
@ -129,6 +130,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
//Transitions from NEW state
.addTransition(NodeState.NEW, NodeState.RUNNING,
RMNodeEventType.STARTED, new AddNodeTransition())
.addTransition(NodeState.NEW, NodeState.NEW,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
//Transitions from RUNNING state
.addTransition(NodeState.RUNNING,
@ -149,6 +153,23 @@ RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
//Transitions from REBOOTED state
.addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
//Transitions from DECOMMISSIONED state
.addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
//Transitions from LOST state
.addTransition(NodeState.LOST, NodeState.LOST,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
//Transitions from UNHEALTHY state
.addTransition(NodeState.UNHEALTHY,
@ -169,6 +190,8 @@ RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
// create the topology tables
.installTopology();
@ -177,13 +200,13 @@ RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
RMNodeEvent> stateMachine;
public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
int cmPort, int httpPort, Node node, ResourceOption resourceOption, String nodeManagerVersion) {
int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) {
this.nodeId = nodeId;
this.context = context;
this.hostName = hostName;
this.commandPort = cmPort;
this.httpPort = httpPort;
this.resourceOption = resourceOption;
this.totalCapability = capability;
this.nodeAddress = hostName + ":" + cmPort;
this.httpAddress = hostName + ":" + httpPort;
this.node = node;
@ -239,17 +262,7 @@ public String getHttpAddress() {
@Override
public Resource getTotalCapability() {
return this.resourceOption.getResource();
}
@Override
public void setResourceOption(ResourceOption resourceOption) {
this.resourceOption = resourceOption;
}
@Override
public ResourceOption getResourceOption(){
return this.resourceOption;
return this.totalCapability;
}
@Override
@ -473,6 +486,13 @@ private static void handleRunningAppOnNode(RMNodeImpl rmNode,
context.getDispatcher().getEventHandler()
.handle(new RMAppRunningOnNodeEvent(appId, nodeId));
}
private static void updateNodeResourceFromEvent(RMNodeImpl rmNode,
RMNodeResourceUpdateEvent event){
ResourceOption resourceOption = event.getResourceOption();
// Set resource on RMNode
rmNode.totalCapability = resourceOption.getResource();
}
public static class AddNodeTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@ -526,8 +546,8 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
rmNode.httpPort = newNode.getHttpPort();
rmNode.httpAddress = newNode.getHttpAddress();
rmNode.resourceOption = newNode.getResourceOption();
rmNode.totalCapability = newNode.getTotalCapability();
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
@ -540,9 +560,43 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
if (rmNode.getState().equals(NodeState.RUNNING)) {
// Update scheduler node's capacity for reconnect node.
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeResourceUpdateSchedulerEvent(rmNode,
ResourceOption.newInstance(rmNode.totalCapability, -1)));
}
}
}
public static class UpdateNodeResourceWhenRunningTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeResourceUpdateEvent updateEvent = (RMNodeResourceUpdateEvent)event;
updateNodeResourceFromEvent(rmNode, updateEvent);
// Notify the scheduler of the new ResourceOption
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeResourceUpdateSchedulerEvent(rmNode, updateEvent.getResourceOption()));
}
}
public static class UpdateNodeResourceWhenUnusableTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// The node is not usable; only log a warning.
LOG.warn("Trying to update resource on a " + rmNode.getState().toString() +
" node: " + rmNode.toString());
updateNodeResourceFromEvent(rmNode, (RMNodeResourceUpdateEvent) event);
// No need to notify the scheduler: the SchedulerNode is not functioning now
// and can sync from the RMNode later.
}
}
public static class CleanUpAppTransition
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
public class RMNodeResourceUpdateEvent extends RMNodeEvent {
private final ResourceOption resourceOption;
public RMNodeResourceUpdateEvent(NodeId nodeId, ResourceOption resourceOption) {
super(nodeId, RMNodeEventType.RESOURCE_UPDATE);
this.resourceOption = resourceOption;
}
public ResourceOption getResourceOption() {
return resourceOption;
}
}

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -473,4 +474,32 @@ public synchronized void killAllAppsInQueue(String queueName)
.handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL));
}
}
/**
* Process resource update on a node.
*/
public synchronized void updateNodeResource(RMNode nm,
ResourceOption resourceOption) {
SchedulerNode node = getSchedulerNode(nm.getNodeID());
Resource newResource = resourceOption.getResource();
Resource oldResource = node.getTotalResource();
if(!oldResource.equals(newResource)) {
// Log resource change
LOG.info("Update resource on node: " + node.getNodeName()
+ " from: " + oldResource + ", to: "
+ newResource);
// update resource to node
node.setTotalResource(newResource);
// update resource to clusterResource
Resources.subtractFrom(clusterResource, oldResource);
Resources.addTo(clusterResource, newResource);
} else {
// Log resource change
LOG.warn("Update resource on node: " + node.getNodeName()
+ " with the same resource: " + newResource);
}
}
}

View File

@ -77,6 +77,16 @@ public RMNode getRMNode() {
return this.rmNode;
}
/**
* Set total resources on the node.
* @param resource total resources on the node.
*/
public synchronized void setTotalResource(Resource resource){
this.totalResourceCapability = resource;
this.availableResource = Resources.subtract(totalResourceCapability,
this.usedResource);
}
/**
* Get the ID of the node which contains both its hostname and port.
*
@ -158,7 +168,7 @@ public synchronized Resource getUsedResource() {
*
* @return total resources on the node.
*/
public Resource getTotalResource() {
public synchronized Resource getTotalResource() {
return this.totalResourceCapability;
}
@ -259,19 +269,6 @@ public synchronized RMContainer getReservedContainer() {
this.reservedContainer = reservedContainer;
}
/**
* Apply delta resource on node's available resource.
*
* @param deltaResource
* the delta of resource need to apply to node
*/
public synchronized void
applyDeltaOnAvailableResource(Resource deltaResource) {
// we can only adjust available resource if total resource is changed.
Resources.addTo(this.availableResource, deltaResource);
}
public synchronized void recoverContainer(RMContainer rmContainer) {
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
return;

View File

@ -19,7 +19,6 @@
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@ -147,42 +146,6 @@ public static void normalizeRequest(
ask.setCapability(normalized);
}
/**
* Update resource in SchedulerNode if any resource change in RMNode.
* @param node SchedulerNode with old resource view
* @param rmNode RMNode with new resource view
* @param clusterResource the cluster's resource that need to update
* @param log Scheduler's log for resource change
* @return true if the resources have changed
*/
public static boolean updateResourceIfChanged(SchedulerNode node,
RMNode rmNode, Resource clusterResource, Log log) {
boolean result = false;
Resource oldAvailableResource = node.getAvailableResource();
Resource newAvailableResource = Resources.subtract(
rmNode.getTotalCapability(), node.getUsedResource());
if (!newAvailableResource.equals(oldAvailableResource)) {
result = true;
Resource deltaResource = Resources.subtract(newAvailableResource,
oldAvailableResource);
// Reflect resource change to scheduler node.
node.applyDeltaOnAvailableResource(deltaResource);
// Reflect resource change to clusterResource.
Resources.addTo(clusterResource, deltaResource);
// TODO process resource over-commitment case (allocated containers
// > total capacity) in different option by getting value of
// overCommitTimeoutMillis.
// Log resource change
log.info("Resource change on node: " + rmNode.getNodeAddress()
+ " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: "
+ deltaResource.getMemory() +"MB");
}
return result;
}
/**
* Utility method to normalize a list of resource requests, by insuring that
* the memory for each request is a multiple of minMemory and is not zero.

View File

@ -50,6 +50,8 @@
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -82,6 +84,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -866,12 +869,6 @@ private synchronized void nodeUpdate(RMNode nm) {
FiCaSchedulerNode node = getNode(nm.getNodeID());
// Update resource if any change
if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource,
LOG)) {
root.updateClusterResource(clusterResource);
}
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
@ -899,6 +896,15 @@ private synchronized void nodeUpdate(RMNode nm) {
+ " availableResource: " + node.getAvailableResource());
}
}
/**
* Process resource update on a node.
*/
private synchronized void updateNodeAndQueueResource(RMNode nm,
ResourceOption resourceOption) {
updateNodeResource(nm, resourceOption);
root.updateClusterResource(clusterResource);
}
private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
@ -969,6 +975,14 @@ public void handle(SchedulerEvent event) {
removeNode(nodeRemovedEvent.getRemovedRMNode());
}
break;
case NODE_RESOURCE_UPDATE:
{
NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
(NodeResourceUpdateSchedulerEvent)event;
updateNodeAndQueueResource(nodeResourceUpdatedEvent.getRMNode(),
nodeResourceUpdatedEvent.getResourceOption());
}
break;
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class NodeResourceUpdateSchedulerEvent extends SchedulerEvent {
private final RMNode rmNode;
private final ResourceOption resourceOption;
public NodeResourceUpdateSchedulerEvent(RMNode rmNode,
ResourceOption resourceOption) {
super(SchedulerEventType.NODE_RESOURCE_UPDATE);
this.rmNode = rmNode;
this.resourceOption = resourceOption;
}
public RMNode getRMNode() {
return rmNode;
}
public ResourceOption getResourceOption() {
return resourceOption;
}
}

View File

@ -24,6 +24,7 @@ public enum SchedulerEventType {
NODE_ADDED,
NODE_REMOVED,
NODE_UPDATE,
NODE_RESOURCE_UPDATE,
// Source: RMApp
APP_ADDED,

View File

@ -65,13 +65,10 @@ public class AllocationConfiguration {
// preempt other jobs' tasks.
private final Map<String, Long> minSharePreemptionTimeouts;
// Default min share preemption timeout for queues where it is not set
// explicitly.
private final long defaultMinSharePreemptionTimeout;
// Preemption timeout for jobs below fair share in seconds. If a job remains
// below half its fair share for this long, it is allowed to preempt tasks.
private final long fairSharePreemptionTimeout;
// Fair share preemption timeout for each queue in seconds. If a job in the
// queue waits this long without receiving its fair share threshold, it is
// allowed to preempt other jobs' tasks.
private final Map<String, Long> fairSharePreemptionTimeouts;
private final Map<String, SchedulingPolicy> schedulingPolicies;
@ -94,8 +91,8 @@ public AllocationConfiguration(Map<String, Resource> minQueueResources,
Map<String, SchedulingPolicy> schedulingPolicies,
SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout,
QueuePlacementPolicy placementPolicy,
Map<FSQueueType, Set<String>> configuredQueues) {
this.minQueueResources = minQueueResources;
@ -110,9 +107,8 @@ public AllocationConfiguration(Map<String, Resource> minQueueResources,
this.defaultSchedulingPolicy = defaultSchedulingPolicy;
this.schedulingPolicies = schedulingPolicies;
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts;
this.queueAcls = queueAcls;
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
this.placementPolicy = placementPolicy;
this.configuredQueues = configuredQueues;
}
@ -129,8 +125,7 @@ public AllocationConfiguration(Configuration conf) {
queueMaxAMShareDefault = -1.0f;
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
minSharePreemptionTimeouts = new HashMap<String, Long>();
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
fairSharePreemptionTimeout = Long.MAX_VALUE;
fairSharePreemptionTimeouts = new HashMap<String, Long>();
schedulingPolicies = new HashMap<String, SchedulingPolicy>();
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
configuredQueues = new HashMap<FSQueueType, Set<String>>();
@ -159,23 +154,22 @@ public AccessControlList getQueueAcl(String queue, QueueACL operation) {
}
/**
* Get a queue's min share preemption timeout, in milliseconds. This is the
* time after which jobs in the queue may kill other queues' tasks if they
* are below their min share.
* Get a queue's min share preemption timeout configured in the allocation
* file, in milliseconds. Return -1 if not set.
*/
public long getMinSharePreemptionTimeout(String queueName) {
Long minSharePreemptionTimeout = minSharePreemptionTimeouts.get(queueName);
return (minSharePreemptionTimeout == null) ? defaultMinSharePreemptionTimeout
: minSharePreemptionTimeout;
return (minSharePreemptionTimeout == null) ? -1 : minSharePreemptionTimeout;
}
/**
* Get the fair share preemption, in milliseconds. This is the time
* after which any job may kill other jobs' tasks if it is below half
* its fair share.
* Get a queue's fair share preemption timeout configured in the allocation
* file, in milliseconds. Return -1 if not set.
*/
public long getFairSharePreemptionTimeout() {
return fairSharePreemptionTimeout;
public long getFairSharePreemptionTimeout(String queueName) {
Long fairSharePreemptionTimeout = fairSharePreemptionTimeouts.get(queueName);
return (fairSharePreemptionTimeout == null) ?
-1 : fairSharePreemptionTimeout;
}
public ResourceWeights getQueueWeight(String queue) {

View File

@ -217,27 +217,28 @@ public synchronized void reloadAllocations() throws IOException,
Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
new HashMap<String, Map<QueueACL, AccessControlList>>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
float queueMaxAMShareDefault = -1.0f;
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
QueuePlacementPolicy newPlacementPolicy = null;
// Remember all queue names so we can display them on web UI, etc.
// configuredQueues is segregated based on whether it is a leaf queue
// or a parent queue. This information is used for creating queues
// and also for making queue placement decisions(QueuePlacementRule.java).
Map<FSQueueType, Set<String>> configuredQueues =
Map<FSQueueType, Set<String>> configuredQueues =
new HashMap<FSQueueType, Set<String>>();
for (FSQueueType queueType : FSQueueType.values()) {
configuredQueues.put(queueType, new HashSet<String>());
}
// Read and parse the allocations file.
DocumentBuilderFactory docBuilderFactory =
DocumentBuilderFactory.newInstance();
@ -276,10 +277,16 @@ public synchronized void reloadAllocations() throws IOException,
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
userMaxAppsDefault = val;
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
} else if ("defaultFairSharePreemptionTimeout".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
fairSharePreemptionTimeout = val;
defaultFairSharePreemptionTimeout = val;
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
if (defaultFairSharePreemptionTimeout == Long.MAX_VALUE) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
defaultFairSharePreemptionTimeout = val;
}
} else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
@ -304,7 +311,7 @@ public synchronized void reloadAllocations() throws IOException,
}
}
}
// Load queue elements. A root queue can either be included or omitted. If
// it's included, all other queues must be inside it.
for (Element element : queueElements) {
@ -318,10 +325,10 @@ public synchronized void reloadAllocations() throws IOException,
}
loadQueue(parent, element, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, queueAcls,
configuredQueues);
queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
queueAcls, configuredQueues);
}
// Load placement policy and pass it configured queues
Configuration conf = getConfig();
if (placementPolicyElement != null) {
@ -331,11 +338,22 @@ public synchronized void reloadAllocations() throws IOException,
newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
configuredQueues);
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault,
queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
// Set the min/fair share preemption timeout for the root queue
if (!minSharePreemptionTimeouts.containsKey(QueueManager.ROOT_QUEUE)){
minSharePreemptionTimeouts.put(QueueManager.ROOT_QUEUE,
defaultMinSharePreemptionTimeout);
}
if (!fairSharePreemptionTimeouts.containsKey(QueueManager.ROOT_QUEUE)) {
fairSharePreemptionTimeouts.put(QueueManager.ROOT_QUEUE,
defaultFairSharePreemptionTimeout);
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
maxQueueResources, queueMaxApps, userMaxApps, queueWeights,
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy,
minSharePreemptionTimeouts, fairSharePreemptionTimeouts, queueAcls,
newPlacementPolicy, configuredQueues);
lastSuccessfulReload = clock.getTime();
@ -353,6 +371,7 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<FSQueueType, Set<String>> configuredQueues)
throws AllocationConfigurationException {
@ -395,6 +414,10 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
minSharePreemptionTimeouts.put(queueName, val);
} else if ("fairSharePreemptionTimeout".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
fairSharePreemptionTimeouts.put(queueName, val);
} else if ("schedulingPolicy".equals(field.getTagName())
|| "schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
@ -410,8 +433,8 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
"pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, queueAcls,
configuredQueues);
queuePolicies, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, queueAcls, configuredQueues);
configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false;
}

View File

@ -77,6 +77,15 @@ public void recomputeSteadyShares() {
}
}
@Override
public void updatePreemptionTimeouts() {
super.updatePreemptionTimeouts();
// For child queues
for (FSQueue childQueue : childQueues) {
childQueue.updatePreemptionTimeouts();
}
}
@Override
public Resource getDemand() {
return demand;

View File

@ -52,6 +52,9 @@ public abstract class FSQueue implements Queue, Schedulable {
protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY;
private long fairSharePreemptionTimeout = Long.MAX_VALUE;
private long minSharePreemptionTimeout = Long.MAX_VALUE;
public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
this.name = name;
this.scheduler = scheduler;
@ -166,13 +169,47 @@ public void setSteadyFairShare(Resource steadyFairShare) {
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
}
public long getFairSharePreemptionTimeout() {
return fairSharePreemptionTimeout;
}
public void setFairSharePreemptionTimeout(long fairSharePreemptionTimeout) {
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
}
public long getMinSharePreemptionTimeout() {
return minSharePreemptionTimeout;
}
public void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) {
this.minSharePreemptionTimeout = minSharePreemptionTimeout;
}
/**
* Recomputes the shares for all child queues and applications based on this
* queue's current share
*/
public abstract void recomputeShares();
/**
* Update the min/fair share preemption timeouts for this queue.
*/
public void updatePreemptionTimeouts() {
// For min share
minSharePreemptionTimeout = scheduler.getAllocationConfiguration()
.getMinSharePreemptionTimeout(getName());
if (minSharePreemptionTimeout == -1 && parent != null) {
minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout();
}
// For fair share
fairSharePreemptionTimeout = scheduler.getAllocationConfiguration()
.getFairSharePreemptionTimeout(getName());
if (fairSharePreemptionTimeout == -1 && parent != null) {
fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout();
}
}
/**
* Gets the children of this queue, if any.
*/

View File

@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -79,6 +80,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -504,9 +506,8 @@ protected void warnOrKillContainer(RMContainer container) {
* identical for some reason).
*/
protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
String queue = sched.getName();
long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue);
long fairShareTimeout = allocConf.getFairSharePreemptionTimeout();
long minShareTimeout = sched.getMinSharePreemptionTimeout();
long fairShareTimeout = sched.getFairSharePreemptionTimeout();
Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none();
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
@ -956,7 +957,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
allocation.getNMTokenList());
}
}
/**
* Process a heartbeat update from a node.
*/
@ -967,9 +968,6 @@ private synchronized void nodeUpdate(RMNode nm) {
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
// Update resource if any change
SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
@ -1173,6 +1171,15 @@ public void handle(SchedulerEvent event) {
removeApplication(appRemovedEvent.getApplicationID(),
appRemovedEvent.getFinalState());
break;
case NODE_RESOURCE_UPDATE:
if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
(NodeResourceUpdateSchedulerEvent)event;
updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
nodeResourceUpdatedEvent.getResourceOption());
break;
case APP_ATTEMPT_ADDED:
if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
@ -1534,4 +1541,16 @@ FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) {
}
return queue1; // names are identical
}
/**
* Process resource update on a node and update Queue.
*/
@Override
public synchronized void updateNodeResource(RMNode nm,
ResourceOption resourceOption) {
super.updateNodeResource(nm, resourceOption);
updateRootQueueMetrics();
queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
queueMgr.getRootQueue().recomputeSteadyShares();
}
}

View File

@ -181,6 +181,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) {
parent.addChildQueue(leafQueue);
queues.put(leafQueue.getName(), leafQueue);
leafQueues.add(leafQueue);
setPreemptionTimeout(leafQueue, parent, queueConf);
return leafQueue;
} else {
FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
@ -192,6 +193,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) {
}
parent.addChildQueue(newParent);
queues.put(newParent.getName(), newParent);
setPreemptionTimeout(newParent, parent, queueConf);
parent = newParent;
}
}
@ -199,6 +201,29 @@ private FSQueue createQueue(String name, FSQueueType queueType) {
return parent;
}
/**
* Set the min/fair share preemption timeouts for the given queue.
* If the timeout is configured in the allocation file, the queue will use
* that value; otherwise, the queue inherits the value from its parent queue.
*/
private void setPreemptionTimeout(FSQueue queue,
FSParentQueue parentQueue, AllocationConfiguration queueConf) {
// For min share
long minSharePreemptionTimeout =
queueConf.getMinSharePreemptionTimeout(queue.getQueueName());
if (minSharePreemptionTimeout == -1) {
minSharePreemptionTimeout = parentQueue.getMinSharePreemptionTimeout();
}
queue.setMinSharePreemptionTimeout(minSharePreemptionTimeout);
// For fair share
long fairSharePreemptionTimeout =
queueConf.getFairSharePreemptionTimeout(queue.getQueueName());
if (fairSharePreemptionTimeout == -1) {
fairSharePreemptionTimeout = parentQueue.getFairSharePreemptionTimeout();
}
queue.setFairSharePreemptionTimeout(fairSharePreemptionTimeout);
}
/**
* Make way for the given queue if possible, by removing incompatible
* queues with no apps in them. Incompatibility could be due to
@ -384,5 +409,7 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
// Update steady fair shares for all queues
rootQueue.recomputeSteadyShares();
// Update the min/fair share preemption timeouts for all queues recursively
rootQueue.updatePreemptionTimeouts();
}
}

View File

@ -85,6 +85,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -681,9 +682,6 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application
private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
// Update resource if any change
SchedulerUtils.updateResourceIfChanged(node, rmNode, clusterResource, LOG);
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
@ -750,6 +748,14 @@ public void handle(SchedulerEvent event) {
removeNode(nodeRemovedEvent.getRemovedRMNode());
}
break;
case NODE_RESOURCE_UPDATE:
{
NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
(NodeResourceUpdateSchedulerEvent)event;
updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
nodeResourceUpdatedEvent.getResourceOption());
}
break;
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent =

View File

@ -44,10 +44,12 @@ public class FairSchedulerPage extends RmView {
static final float Q_MAX_WIDTH = 0.8f;
static final float Q_STATS_POS = Q_MAX_WIDTH + 0.05f;
static final String Q_END = "left:101%";
static final String Q_GIVEN = "left:0%;background:none;border:1px dashed rgba(0,0,0,0.25)";
static final String Q_GIVEN = "left:0%;background:none;border:1px solid rgba(0,0,0,1)";
static final String Q_INSTANTANEOUS_FS = "left:0%;background:none;border:1px dashed rgba(0,0,0,1)";
static final String Q_OVER = "background:rgba(255, 140, 0, 0.8)";
static final String Q_UNDER = "background:rgba(50, 205, 50, 0.8)";
static final String STEADY_FAIR_SHARE = "Steady Fair Share";
static final String INSTANTANEOUS_FAIR_SHARE = "Instantaneous Fair Share";
@RequestScoped
static class FSQInfo {
FairSchedulerQueueInfo qinfo;
@ -73,8 +75,8 @@ protected void render(Block html) {
if (maxApps < Integer.MAX_VALUE) {
ri._("Max Running Applications:", qinfo.getMaxApplications());
}
ri._("Fair Share:", qinfo.getFairShare().toString());
ri._(STEADY_FAIR_SHARE + ":", qinfo.getSteadyFairShare().toString());
ri._(INSTANTANEOUS_FAIR_SHARE + ":", qinfo.getFairShare().toString());
html._(InfoBlock.class);
// clear the info contents so this queue's info doesn't accumulate into another queue's info
@ -95,16 +97,21 @@ public void render(Block html) {
UL<Hamlet> ul = html.ul("#pq");
for (FairSchedulerQueueInfo info : subQueues) {
float capacity = info.getMaxResourcesFraction();
float fairShare = info.getFairShareMemoryFraction();
float steadyFairShare = info.getSteadyFairShareMemoryFraction();
float instantaneousFairShare = info.getFairShareMemoryFraction();
float used = info.getUsedMemoryFraction();
LI<UL<Hamlet>> li = ul.
li().
a(_Q).$style(width(capacity * Q_MAX_WIDTH)).
$title(join("Fair Share:", percent(fairShare))).
span().$style(join(Q_GIVEN, ";font-size:1px;", width(fairShare/capacity))).
$title(join(join(STEADY_FAIR_SHARE + ":", percent(steadyFairShare)),
join(" " + INSTANTANEOUS_FAIR_SHARE + ":", percent(instantaneousFairShare)))).
span().$style(join(Q_GIVEN, ";font-size:1px;", width(steadyFairShare / capacity))).
_('.')._().
span().$style(join(Q_INSTANTANEOUS_FS, ";font-size:1px;",
width(instantaneousFairShare/capacity))).
_('.')._().
span().$style(join(width(used/capacity),
";font-size:1px;left:0%;", used > fairShare ? Q_OVER : Q_UNDER)).
";font-size:1px;left:0%;", used > instantaneousFairShare ? Q_OVER : Q_UNDER)).
_('.')._().
span(".q", info.getQueueName())._().
span().$class("qstats").$style(left(Q_STATS_POS)).
@ -156,7 +163,13 @@ public void render(Block html) {
li().$style("margin-bottom: 1em").
span().$style("font-weight: bold")._("Legend:")._().
span().$class("qlegend ui-corner-all").$style(Q_GIVEN).
_("Fair Share")._().
$title("The steady fair shares consider all queues, " +
"both active (with running applications) and inactive.").
_(STEADY_FAIR_SHARE)._().
span().$class("qlegend ui-corner-all").$style(Q_INSTANTANEOUS_FS).
$title("The instantaneous fair shares consider only active " +
"queues (with running applications).").
_(INSTANTANEOUS_FAIR_SHARE)._().
span().$class("qlegend ui-corner-all").$style(Q_UNDER).
_("Used")._().
span().$class("qlegend ui-corner-all").$style(Q_OVER).

View File

@ -28,7 +28,6 @@
import javax.xml.bind.annotation.XmlSeeAlso;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
@ -44,6 +43,8 @@ public class FairSchedulerQueueInfo {
@XmlTransient
private float fractionMemUsed;
@XmlTransient
private float fractionMemSteadyFairShare;
@XmlTransient
private float fractionMemFairShare;
@XmlTransient
private float fractionMemMinShare;
@ -53,6 +54,7 @@ public class FairSchedulerQueueInfo {
private ResourceInfo minResources;
private ResourceInfo maxResources;
private ResourceInfo usedResources;
private ResourceInfo steadyFairResources;
private ResourceInfo fairResources;
private ResourceInfo clusterResources;
@ -75,15 +77,19 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
usedResources = new ResourceInfo(queue.getResourceUsage());
fractionMemUsed = (float)usedResources.getMemory() /
clusterResources.getMemory();
steadyFairResources = new ResourceInfo(queue.getSteadyFairShare());
fairResources = new ResourceInfo(queue.getFairShare());
minResources = new ResourceInfo(queue.getMinShare());
maxResources = new ResourceInfo(queue.getMaxShare());
maxResources = new ResourceInfo(
Resources.componentwiseMin(queue.getMaxShare(),
scheduler.getClusterResource()));
fractionMemFairShare = (float)fairResources.getMemory() / clusterResources.getMemory();
fractionMemSteadyFairShare =
(float)steadyFairResources.getMemory() / clusterResources.getMemory();
fractionMemFairShare = (float) fairResources.getMemory()
/ clusterResources.getMemory();
fractionMemMinShare = (float)minResources.getMemory() / clusterResources.getMemory();
fractionMemMaxShare = (float)maxResources.getMemory() / clusterResources.getMemory();
@ -100,20 +106,34 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
}
}
/**
* Returns the steady fair share as a fraction of the entire cluster capacity.
*/
public float getSteadyFairShareMemoryFraction() {
return fractionMemSteadyFairShare;
}
/**
* Returns the fair share as a fraction of the entire cluster capacity.
*/
public float getFairShareMemoryFraction() {
return fractionMemFairShare;
}
/**
* Returns the fair share of this queue in megabytes.
* Returns the steady fair share of this queue in megabytes.
*/
public ResourceInfo getSteadyFairShare() {
return steadyFairResources;
}
/**
* Returns the fair share of this queue in megabytes
*/
public ResourceInfo getFairShare() {
return fairResources;
}
public ResourceInfo getMinResources() {
return minResources;
}

View File

@ -27,7 +27,6 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -94,14 +93,14 @@ private static class MockRMNodeImpl implements RMNode {
private String nodeAddr;
private String httpAddress;
private int cmdPort;
private ResourceOption perNode;
private Resource perNode;
private String rackName;
private String healthReport;
private long lastHealthReportTime;
private NodeState state;
public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
ResourceOption perNode, String rackName, String healthReport,
Resource perNode, String rackName, String healthReport,
long lastHealthReportTime, int cmdPort, String hostName, NodeState state) {
this.nodeId = nodeId;
this.nodeAddr = nodeAddr;
@ -147,7 +146,7 @@ public String getHttpAddress() {
@Override
public Resource getTotalCapability() {
return this.perNode.getResource();
return this.perNode;
}
@Override
@ -203,16 +202,6 @@ public String getHealthReport() {
public long getLastHealthReportTime() {
return lastHealthReportTime;
}
@Override
public void setResourceOption(ResourceOption resourceOption) {
this.perNode = resourceOption;
}
@Override
public ResourceOption getResourceOption(){
return this.perNode;
}
};
@ -232,9 +221,8 @@ private static RMNode buildRMNode(int rack, final Resource perNode,
final String httpAddress = httpAddr;
String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress,
ResourceOption.newInstance(perNode, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT),
rackName, healthReport, 0, nid, hostName, state);
return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode,
rackName, healthReport, 0, nid, hostName, state);
}
public static RMNode nodeInfo(int rack, final Resource perNode,

View File

@ -457,7 +457,6 @@ protected void serviceStop() {
@Override
protected ResourceTrackerService createResourceTrackerService() {
Configuration conf = new Configuration();
RMContainerTokenSecretManager containerTokenSecretManager =
getRMContext().getContainerTokenSecretManager();
@ -547,6 +546,10 @@ public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
public RMAppManager getRMAppManager() {
return this.rmAppManager;
}
public AdminService getAdminService() {
return this.adminService;
}
@Override
protected void startWepApp() {

View File

@ -23,7 +23,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,9 +37,13 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -509,6 +515,85 @@ public void testHeadroom() throws Exception {
rm.stop();
}
@Test
public void testResourceOverCommit() throws Exception {
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
RMApp app1 = rm.submitApp(2048);
// kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
nm1.getNodeId());
// check node report, 2 GB used and 2 GB available
Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory());
// add request for containers
am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1);
AllocateResponse alloc1Response = am1.schedule(); // send the request
// kick the scheduler, 2 GB given to AM1, resource remaining 0
nm1.nodeHeartbeat(true);
while (alloc1Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(1000);
alloc1Response = am1.schedule();
}
List<Container> allocated1 = alloc1Response.getAllocatedContainers();
Assert.assertEquals(1, allocated1.size());
Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
// check node report, 4 GB used and 0 GB available
Assert.assertEquals(0, report_nm1.getAvailableResource().getMemory());
Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory());
// check container is assigned with 2 GB.
Container c1 = allocated1.get(0);
Assert.assertEquals(2 * GB, c1.getResource().getMemory());
// update node resource to 2 GB, so resource is over-consumed.
Map<NodeId, ResourceOption> nodeResourceMap =
new HashMap<NodeId, ResourceOption>();
nodeResourceMap.put(nm1.getNodeId(),
ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1));
UpdateNodeResourceRequest request =
UpdateNodeResourceRequest.newInstance(nodeResourceMap);
AdminService as = rm.adminService;
as.updateNodeResource(request);
// Now, the used resource is still 4 GB, and the available resource is a negative value.
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemory());
// Check container can complete successfully in case of resource over-commitment.
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
c1.getId(), ContainerState.COMPLETE, "", 0);
nm1.containerStatus(containerStatus);
int waitCount = 0;
while (attempt1.getJustFinishedContainers().size() < 1
&& waitCount++ != 20) {
LOG.info("Waiting for containers to be finished for app 1... Tried "
+ waitCount + " times already..");
Thread.sleep(100);
}
Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory());
// As the container returns 2 GB back, the available resource becomes 0 again.
Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
rm.stop();
}
public static void main(String[] args) throws Exception {
TestFifoScheduler t = new TestFifoScheduler();

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@ -48,6 +49,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@ -463,8 +465,7 @@ private RMNodeImpl getRunningNode(String nmVersion) {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
null, ResourceOption.newInstance(capability,
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
null, capability, nmVersion);
node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
Assert.assertEquals(NodeState.RUNNING, node.getState());
return node;
@ -486,6 +487,25 @@ private RMNodeImpl getNewNode() {
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
return node;
}
private RMNodeImpl getNewNode(Resource capability) {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null,
capability, null);
return node;
}
private RMNodeImpl getRebootedNode() {
NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
Resource capability = Resource.newInstance(4096, 4);
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
null, capability, null);
node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
Assert.assertEquals(NodeState.RUNNING, node.getState());
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.REBOOTING));
Assert.assertEquals(NodeState.REBOOTED, node.getState());
return node;
}
@Test
public void testAdd() {
@ -534,6 +554,57 @@ public void testReconnect() {
Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
nodesListManagerEvent.getType());
}
@Test
public void testResourceUpdateOnRunningNode() {
RMNodeImpl node = getRunningNode();
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.RUNNING, node.getState());
Assert.assertNotNull(nodesListManagerEvent);
Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
nodesListManagerEvent.getType());
}
@Test
public void testResourceUpdateOnNewNode() {
RMNodeImpl node = getNewNode(Resource.newInstance(4096, 4));
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.NEW, node.getState());
}
@Test
public void testResourceUpdateOnRebootedNode() {
RMNodeImpl node = getRebootedNode();
Resource oldCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
ResourceOption.newInstance(Resource.newInstance(2048, 2),
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
Resource newCapacity = node.getTotalCapability();
assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
Assert.assertEquals(NodeState.REBOOTED, node.getState());
}
@Test
public void testReconnnectUpdate() {

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
import java.util.ArrayList;
import java.util.List;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -47,14 +50,14 @@ public class TestNMReconnect {
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private RMNodeEvent rmNodeEvent = null;
private List<RMNodeEvent> rmNodeEvents = new ArrayList<RMNodeEvent>();
private class TestRMNodeEventDispatcher implements
EventHandler<RMNodeEvent> {
@Override
public void handle(RMNodeEvent event) {
rmNodeEvent = event;
rmNodeEvents.add(event);
}
}
@ -109,16 +112,18 @@ public void testReconnect() throws Exception {
request1.setResource(capability);
resourceTrackerService.registerNodeManager(request1);
Assert.assertEquals(RMNodeEventType.STARTED, rmNodeEvent.getType());
Assert.assertEquals(RMNodeEventType.STARTED, rmNodeEvents.get(0).getType());
rmNodeEvent = null;
rmNodeEvents.clear();
resourceTrackerService.registerNodeManager(request1);
Assert.assertEquals(RMNodeEventType.RECONNECTED, rmNodeEvent.getType());
Assert.assertEquals(RMNodeEventType.RECONNECTED,
rmNodeEvents.get(0).getType());
rmNodeEvent = null;
rmNodeEvents.clear();
resourceTrackerService.registerNodeManager(request1);
capability = BuilderUtils.newResource(1024, 2);
request1.setResource(capability);
Assert.assertEquals(RMNodeEventType.RECONNECTED, rmNodeEvent.getType());
Assert.assertEquals(RMNodeEventType.RECONNECTED,
rmNodeEvents.get(0).getType());
}
}

View File

@ -47,23 +47,30 @@
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@ -90,6 +97,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -591,7 +599,6 @@ private int getQueueCount(List<QueueUserACLInfo> queueInformation, String queueN
return result;
}
@SuppressWarnings("resource")
@Test
public void testBlackListNodes() throws Exception {
Configuration conf = new Configuration();
@ -627,6 +634,104 @@ public void testBlackListNodes() throws Exception {
Assert.assertFalse(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
rm.stop();
}
@Test
public void testResourceOverCommit() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
RMApp app1 = rm.submitApp(2048);
// kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
nm1.getNodeId());
// check node report, 2 GB used and 2 GB available
Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory());
// add request for containers
am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1);
AllocateResponse alloc1Response = am1.schedule(); // send the request
// kick the scheduler, 2 GB given to AM1, resource remaining 0
nm1.nodeHeartbeat(true);
while (alloc1Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(100);
alloc1Response = am1.schedule();
}
List<Container> allocated1 = alloc1Response.getAllocatedContainers();
Assert.assertEquals(1, allocated1.size());
Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
// check node report, 4 GB used and 0 GB available
Assert.assertEquals(0, report_nm1.getAvailableResource().getMemory());
Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory());
// check container is assigned with 2 GB.
Container c1 = allocated1.get(0);
Assert.assertEquals(2 * GB, c1.getResource().getMemory());
// update node resource to 2 GB, so resource is over-consumed.
Map<NodeId, ResourceOption> nodeResourceMap =
new HashMap<NodeId, ResourceOption>();
nodeResourceMap.put(nm1.getNodeId(),
ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1));
UpdateNodeResourceRequest request =
UpdateNodeResourceRequest.newInstance(nodeResourceMap);
AdminService as = ((MockRM)rm).getAdminService();
as.updateNodeResource(request);
// Now, the used resource is still 4 GB, and the available resource is a negative value.
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemory());
// Check container can complete successfully in case of resource over-commitment.
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
c1.getId(), ContainerState.COMPLETE, "", 0);
nm1.containerStatus(containerStatus);
int waitCount = 0;
while (attempt1.getJustFinishedContainers().size() < 1
&& waitCount++ != 20) {
LOG.info("Waiting for containers to be finished for app 1... Tried "
+ waitCount + " times already..");
Thread.sleep(100);
}
Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory());
// As container return 2 GB back, the available resource becomes 0 again.
Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
// Verify that no NPE is triggered in schedule() after the resource is updated.
am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1);
alloc1Response = am1.schedule();
Assert.assertEquals("Shouldn't have enough resource to allocate containers",
0, alloc1Response.getAllocatedContainers().size());
int times = 0;
// try 10 times as scheduling is an asynchronous process.
while (alloc1Response.getAllocatedContainers().size() < 1
&& times++ < 10) {
LOG.info("Waiting for containers to be allocated for app 1... Tried "
+ times + " times already..");
Thread.sleep(100);
}
Assert.assertEquals("Shouldn't have enough resource to allocate containers",
0, alloc1Response.getAllocatedContainers().size());
rm.stop();
}
@Test (timeout = 5000)
public void testApplicationComparator()

View File

@ -186,9 +186,14 @@ public void testAllocationFileParsing() throws Exception {
//Make queue F a parent queue without configured leaf queues using the 'type' attribute
out.println("<queue name=\"queueF\" type=\"parent\" >");
out.println("</queue>");
//Create hierarchical queues G,H
// Create hierarchical queues G,H, with different min/fair share preemption
// timeouts
out.println("<queue name=\"queueG\">");
out.println("<fairSharePreemptionTimeout>120</fairSharePreemptionTimeout>");
out.println("<minSharePreemptionTimeout>50</minSharePreemptionTimeout>");
out.println(" <queue name=\"queueH\">");
out.println(" <fairSharePreemptionTimeout>180</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>40</minSharePreemptionTimeout>");
out.println(" </queue>");
out.println("</queue>");
// Set default limit of apps per queue to 15
@ -204,8 +209,8 @@ public void testAllocationFileParsing() throws Exception {
// Set default min share preemption timeout to 2 minutes
out.println("<defaultMinSharePreemptionTimeout>120"
+ "</defaultMinSharePreemptionTimeout>");
// Set fair share preemption timeout to 5 minutes
out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
// Set default fair share preemption timeout to 5 minutes
out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
// Set default scheduling policy to DRF
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
@ -270,16 +275,30 @@ public void testAllocationFileParsing() throws Exception {
assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC",
QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root." +
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout());
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueF"));
assertEquals(50000, queueConf.getMinSharePreemptionTimeout("root.queueG"));
assertEquals(40000, queueConf.getMinSharePreemptionTimeout("root.queueG.queueH"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueF"));
assertEquals(120000, queueConf.getFairSharePreemptionTimeout("root.queueG"));
assertEquals(180000, queueConf.getFairSharePreemptionTimeout("root.queueG.queueH"));
assertTrue(queueConf.getConfiguredQueues()
.get(FSQueueType.PARENT)
.contains("root.queueF"));
@ -393,16 +412,23 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC",
QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root." +
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout());
assertEquals(300000, queueConf.getFairSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));
}
@Test

View File

@ -1059,7 +1059,11 @@ public void testConfigureRootQueue() throws Exception {
out.println(" <queue name=\"child2\">");
out.println(" <minResources>1024mb,4vcores</minResources>");
out.println(" </queue>");
out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
out.println("</queue>");
out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
out.println("<defaultMinSharePreemptionTimeout>200</defaultMinSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
@ -1073,6 +1077,9 @@ public void testConfigureRootQueue() throws Exception {
assertNotNull(queueManager.getLeafQueue("child1", false));
assertNotNull(queueManager.getLeafQueue("child2", false));
assertEquals(100000, root.getFairSharePreemptionTimeout());
assertEquals(120000, root.getMinSharePreemptionTimeout());
}
@Test (timeout = 5000)
@ -1378,7 +1385,7 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception {
out.println("<queue name=\"queueB\">");
out.println("<weight>2</weight>");
out.println("</queue>");
out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
@ -1462,7 +1469,7 @@ public void testPreemptionDecision() throws Exception {
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
@ -1489,7 +1496,6 @@ public void testPreemptionDecision() throws Exception {
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
// Queue A and B each request three containers
ApplicationAttemptId app1 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
@ -1563,6 +1569,279 @@ public void testPreemptionDecision() throws Exception {
1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
}
/**
 * Tests the timing of preemption decisions under various min share and
 * fair share preemption timeout configurations.
 */
@Test
public void testPreemptionDecisionWithVariousTimeout() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
MockClock clock = new MockClock();
scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"default\">");
out.println("<maxResources>0mb,0vcores</maxResources>");
out.println("</queue>");
out.println("<queue name=\"queueA\">");
out.println("<weight>1</weight>");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>2</weight>");
out.println("<minSharePreemptionTimeout>10</minSharePreemptionTimeout>");
out.println("<fairSharePreemptionTimeout>25</fairSharePreemptionTimeout>");
out.println("<queue name=\"queueB1\">");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
out.println("</queue>");
out.println("<queue name=\"queueB2\">");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("<fairSharePreemptionTimeout>20</fairSharePreemptionTimeout>");
out.println("</queue>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("<weight>1</weight>");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Check the min/fair share preemption timeout for each queue
QueueManager queueMgr = scheduler.getQueueManager();
assertEquals(30000, queueMgr.getQueue("root")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("default")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueA")
.getFairSharePreemptionTimeout());
assertEquals(25000, queueMgr.getQueue("queueB")
.getFairSharePreemptionTimeout());
assertEquals(25000, queueMgr.getQueue("queueB.queueB1")
.getFairSharePreemptionTimeout());
assertEquals(20000, queueMgr.getQueue("queueB.queueB2")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueC")
.getFairSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("root")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("default")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueA")
.getMinSharePreemptionTimeout());
assertEquals(10000, queueMgr.getQueue("queueB")
.getMinSharePreemptionTimeout());
assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
.getMinSharePreemptionTimeout());
assertEquals(10000, queueMgr.getQueue("queueB.queueB2")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueC")
.getMinSharePreemptionTimeout());
// Create one big node
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(6 * 1024, 6), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Queue A takes all resources
for (int i = 0; i < 6; i ++) {
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
}
scheduler.update();
// Sufficient node check-ins to fully schedule containers
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
for (int i = 0; i < 6; i++) {
scheduler.handle(nodeUpdate1);
}
// Now new requests arrive from queues B1, B2 and C
createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 1);
createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 2);
createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 3);
createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 1);
createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 2);
createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 3);
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
scheduler.update();
FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", true);
FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true);
FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true);
assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(queueB1, clock.getTime())));
assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(queueB2, clock.getTime())));
assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(queueC, clock.getTime())));
// After 5 seconds, queueB1 wants to preempt min share
scheduler.update();
clock.tick(6);
assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals(
0, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
assertEquals(
0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
// After 10 seconds, queueB2 wants to preempt min share
scheduler.update();
clock.tick(5);
assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals(
1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
assertEquals(
0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
// After 15 seconds, queueC wants to preempt min share
scheduler.update();
clock.tick(5);
assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals(
1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
assertEquals(
1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
// After 20 seconds, queueB2 should want to preempt fair share
scheduler.update();
clock.tick(5);
assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals(
1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
assertEquals(
1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
// After 25 seconds, queueB1 should want to preempt fair share
scheduler.update();
clock.tick(5);
assertEquals(
1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals(
1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
assertEquals(
1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
// After 30 seconds, queueC should want to preempt fair share
scheduler.update();
clock.tick(5);
assertEquals(
1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
assertEquals(
1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
assertEquals(
1536, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
}
@Test
public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
MockClock clock = new MockClock();
scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"default\">");
out.println("</queue>");
out.println("<queue name=\"queueA\">");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<queue name=\"queueB1\">");
out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
out.println("</queue>");
out.println("<queue name=\"queueB2\">");
out.println("</queue>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
out.print("<fairSharePreemptionTimeout>40</fairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Check the min/fair share preemption timeout for each queue
QueueManager queueMgr = scheduler.getQueueManager();
assertEquals(30000, queueMgr.getQueue("root")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("default")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueA")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueB")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueC")
.getFairSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("root")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("default")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueA")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueB")
.getMinSharePreemptionTimeout());
assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueC")
.getMinSharePreemptionTimeout());
// If both exist, we take the default one
out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"default\">");
out.println("</queue>");
out.println("<queue name=\"queueA\">");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<queue name=\"queueB1\">");
out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
out.println("</queue>");
out.println("<queue name=\"queueB2\">");
out.println("</queue>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
out.print("<defaultFairSharePreemptionTimeout>25</defaultFairSharePreemptionTimeout>");
out.print("<fairSharePreemptionTimeout>30</fairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
scheduler.reinitialize(conf, resourceManager.getRMContext());
assertEquals(25000, queueMgr.getQueue("root")
.getFairSharePreemptionTimeout());
}
@Test (timeout = 5000)
public void testMultipleContainersWaitingForReservation() throws IOException {
scheduler.init(conf);

View File

@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@ -278,17 +279,16 @@ public Map<NodeId, FiCaSchedulerNode> getNodes(){
(Map<NodeId, FiCaSchedulerNode>) method.invoke(scheduler);
assertEquals(schedulerNodes.values().size(), 1);
// set resource of RMNode to 1024 and verify it works.
node0.setResourceOption(ResourceOption.newInstance(
Resources.createResource(1024, 4), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
assertEquals(node0.getTotalCapability().getMemory(), 1024);
// verify that SchedulerNode's resource hasn't been changed.
assertEquals(schedulerNodes.get(node0.getNodeID()).
getAvailableResource().getMemory(), 2048);
// now, NM heartbeat comes.
NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
scheduler.handle(node0Update);
// SchedulerNode's available resource is changed.
Resource newResource = Resources.createResource(1024, 4);
NodeResourceUpdateSchedulerEvent node0ResourceUpdate = new
NodeResourceUpdateSchedulerEvent(node0, ResourceOption.newInstance(
newResource, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
scheduler.handle(node0ResourceUpdate);
// SchedulerNode's total resource and available resource are changed.
assertEquals(schedulerNodes.get(node0.getNodeID()).getTotalResource()
.getMemory(), 1024);
assertEquals(schedulerNodes.get(node0.getNodeID()).
getAvailableResource().getMemory(), 1024);
QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
@ -324,6 +324,7 @@ public Map<NodeId, FiCaSchedulerNode> getNodes(){
// Before the node update event, there is one local request
Assert.assertEquals(1, nodeLocal.getNumContainers());
NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
// Now schedule.
scheduler.handle(node0Update);
@ -544,7 +545,6 @@ public void testFifoScheduler() throws Exception {
LOG.info("--- END: testFifoScheduler ---");
}
@SuppressWarnings("resource")
@Test
public void testBlackListNodes() throws Exception {
Configuration conf = new Configuration();

View File

@ -271,6 +271,11 @@ Allocation file format
* minSharePreemptionTimeout: number of seconds the queue is under its minimum share
before it will try to preempt containers to take resources from other queues.
If not set, the queue will inherit the value from its parent queue.
* fairSharePreemptionTimeout: number of seconds the queue is under its fair share
threshold before it will try to preempt containers to take resources from other
queues. If not set, the queue will inherit the value from its parent queue.
* <<User elements>>, which represent settings governing the behavior of individual
users. They can contain a single property: maxRunningApps, a limit on the
@ -279,14 +284,13 @@ Allocation file format
* <<A userMaxAppsDefault element>>, which sets the default running app limit
for any users whose limit is not otherwise specified.
* <<A fairSharePreemptionTimeout element>>, number of seconds a queue is under
its fair share before it will try to preempt containers to take resources from
other queues.
* <<A defaultFairSharePreemptionTimeout element>>, which sets the fair share
preemption timeout for the root queue; overridden by fairSharePreemptionTimeout
element in root queue.
* <<A defaultMinSharePreemptionTimeout element>>, which sets the default number
of seconds the queue is under its minimum share before it will try to preempt
containers to take resources from other queues; overridden by
minSharePreemptionTimeout element in each queue if specified.
* <<A defaultMinSharePreemptionTimeout element>>, which sets the min share
preemption timeout for the root queue; overridden by the minSharePreemptionTimeout
element in the root queue (see the example sketch after this list).
* <<A queueMaxAppsDefault element>>, which sets the default running app limit
for queues; overridden by maxRunningApps element in each queue.
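For illustration, a minimal sketch (not part of this commit) showing how the elements above fit together in an allocation file, written in the same PrintWriter style as the test earlier in this diff; the file name, queue names, and timeout values are assumptions chosen for the example:

import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

public class AllocationFileSketch {
  public static void main(String[] args) throws IOException {
    // The path is an assumption; point it at whatever file
    // yarn.scheduler.fair.allocation.file references in yarn-site.xml.
    try (PrintWriter out = new PrintWriter(new FileWriter("fair-scheduler.xml"))) {
      out.println("<?xml version=\"1.0\"?>");
      out.println("<allocations>");
      out.println("  <queue name=\"queueB\">");
      out.println("    <queue name=\"queueB1\">");
      // Per-queue override: only queueB1 uses a 5-second min share preemption timeout.
      out.println("      <minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
      out.println("    </queue>");
      out.println("    <queue name=\"queueB2\"/>");
      out.println("  </queue>");
      // Defaults applied to the root queue; child queues inherit them unless they
      // declare their own minSharePreemptionTimeout / fairSharePreemptionTimeout.
      out.println("  <defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
      out.println("  <defaultFairSharePreemptionTimeout>25</defaultFairSharePreemptionTimeout>");
      out.println("</allocations>");
    }
  }
}

With this configuration, the expectation matching the test above is that queueB1 reports a 5-second min share preemption timeout, every other queue inherits the 15-second default, and all queues inherit the 25-second fair share preemption timeout from the root queue (the queue getters in the test return these values in milliseconds).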
@ -429,13 +433,19 @@ Monitoring through web UI
* Max Resources - The configured maximum resources that are allowed to the queue.
* Fair Share - The queue's fair share of resources. Queues may be allocated
resources beyond their fair share when other queues aren't using them. A
queue whose resource consumption lies at or below its fair share will never
have its containers preempted.
* Instantaneous Fair Share - The queue's instantaneous fair share of resources.
These shares consider only active queues (those with running applications),
and are used for scheduling decisions. Queues may be allocated resources
beyond their shares when other queues aren't using them. A queue whose
resource consumption lies at or below its instantaneous fair share will never
have its containers preempted (a short illustrative sketch of this rule
follows this section).
In addition to the information that the ResourceManager normally displays
about each application, the web interface includes the application's fair share.
* Steady Fair Share - The queue's steady fair share of resources. These shares
consider all the queues irrespective of whether they are active (have
running applications) or not. These are computed less frequently and
change only when the configuration or capacity changes. They are meant to
provide visibility into the resources the user can expect, and hence are
displayed in the Web UI.
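To make the preemption-eligibility rule above concrete, here is a heavily simplified sketch (not the FairScheduler implementation; the method name and the reduction of a multi-dimensional Resource to a single memory figure are assumptions for illustration). It only restates the rule from the text: a queue becomes a candidate for preemption only when its consumption exceeds its instantaneous fair share, and even then only after the preemption timeouts discussed earlier have expired for some starved queue.

final class PreemptionRuleSketch {
  // Illustrative only: resources reduced to memory in MB, preemption timeouts ignored.
  static boolean mayPreemptFrom(long usedMemoryMB, long instantaneousFairShareMB) {
    // Queues at or below their instantaneous fair share are never preempted from.
    return usedMemoryMB > instantaneousFairShareMB;
  }

  public static void main(String[] args) {
    System.out.println(mayPreemptFrom(4096, 2048)); // true: queue is over its share
    System.out.println(mayPreemptFrom(1024, 2048)); // false: queue is at/under its share
  }
}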
Moving applications between queues