Merge branch 'trunk' into HDFS-6581

This commit is contained in:
arp 2014-09-29 14:01:47 -07:00
commit 1e6b4b9b2e
36 changed files with 2219 additions and 354 deletions

View File

@ -170,6 +170,17 @@ Create a local staging version of the website (in /tmp/hadoop-site)
$ mvn clean site; mvn site:stage -DstagingDirectory=/tmp/hadoop-site
----------------------------------------------------------------------------------
Installing Hadoop
Look for these HTML files after you build the document by the above commands.
* Single Node Setup:
hadoop-project-dist/hadoop-common/SingleCluster.html
* Cluster Setup:
hadoop-project-dist/hadoop-common/ClusterSetup.html
----------------------------------------------------------------------------------
Handling out of memory errors in builds

View File

@ -454,7 +454,7 @@ checkJavadocWarnings () {
JIRA_COMMENT="$JIRA_COMMENT
{color:red}-1 javadoc{color}. The javadoc tool appears to have generated `expr $(($numPatchJavadocWarnings-$numTrunkJavadocWarnings))` warning messages.
See $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/diffJavadocWarnings.txt for details."
See $BUILD_URL/artifact/patchprocess/diffJavadocWarnings.txt for details."
return 1
fi
fi
@ -498,7 +498,7 @@ checkJavacWarnings () {
{color:red}-1 javac{color}. The applied patch generated $patchJavacWarnings javac compiler warnings (more than the trunk's current $trunkJavacWarnings warnings)."
$DIFF $PATCH_DIR/filteredTrunkJavacWarnings.txt $PATCH_DIR/filteredPatchJavacWarnings.txt > $PATCH_DIR/diffJavacWarnings.txt
JIRA_COMMENT_FOOTER="Javac warnings: $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/diffJavacWarnings.txt
JIRA_COMMENT_FOOTER="Javac warnings: $BUILD_URL/artifact/patchprocess/diffJavacWarnings.txt
$JIRA_COMMENT_FOOTER"
return 1
@ -540,7 +540,7 @@ checkReleaseAuditWarnings () {
{color:red}-1 release audit{color}. The applied patch generated $patchReleaseAuditWarnings release audit warnings."
$GREP '\!?????' $PATCH_DIR/patchReleaseAuditWarnings.txt > $PATCH_DIR/patchReleaseAuditProblems.txt
echo "Lines that start with ????? in the release audit report indicate files that do not have an Apache license header." >> $PATCH_DIR/patchReleaseAuditProblems.txt
JIRA_COMMENT_FOOTER="Release audit warnings: $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/patchReleaseAuditProblems.txt
JIRA_COMMENT_FOOTER="Release audit warnings: $BUILD_URL/artifact/patchprocess/patchReleaseAuditProblems.txt
$JIRA_COMMENT_FOOTER"
return 1
fi
@ -659,7 +659,7 @@ checkFindbugsWarnings () {
$PATCH_DIR/newPatchFindbugsWarnings${module_suffix}.xml \
$PATCH_DIR/newPatchFindbugsWarnings${module_suffix}.html
if [[ $newFindbugsWarnings > 0 ]] ; then
JIRA_COMMENT_FOOTER="Findbugs warnings: $BUILD_URL/artifact/PreCommit-HADOOP-Build-patchprocess/newPatchFindbugsWarnings${module_suffix}.html
JIRA_COMMENT_FOOTER="Findbugs warnings: $BUILD_URL/artifact/patchprocess/newPatchFindbugsWarnings${module_suffix}.html
$JIRA_COMMENT_FOOTER"
fi
done

View File

@ -758,6 +758,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-11140. hadoop-aws only need test-scoped dependency on
hadoop-common's tests jar. (Juan Yu via wang)
HADOOP-1110. JavaKeystoreProvider should not report a key as created if it
was not flushed to the backing file.
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HADOOP-10734. Implement high-performance secure random number sources.
@ -885,6 +888,9 @@ Release 2.6.0 - UNRELEASED
HADOOP-11143 NetUtils.wrapException loses inner stack trace on BindException
(stevel)
HADOOP-11049. javax package system class default is too broad (Sangjin Lee
via jlowe)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -134,7 +134,7 @@ public abstract class ReconfigurableBase
}
synchronized (this.parent.reconfigLock) {
this.parent.endTime = Time.monotonicNow();
this.parent.endTime = Time.now();
this.parent.status = Collections.unmodifiableMap(results);
this.parent.reconfigThread = null;
}
@ -160,7 +160,7 @@ public abstract class ReconfigurableBase
reconfigThread.setDaemon(true);
reconfigThread.setName("Reconfiguration Task");
reconfigThread.start();
startTime = Time.monotonicNow();
startTime = Time.now();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.crypto.key;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@ -30,6 +31,8 @@ import org.apache.hadoop.security.ProviderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
@ -107,6 +110,20 @@ public class JavaKeyStoreProvider extends KeyProvider {
private final Map<String, Metadata> cache = new HashMap<String, Metadata>();
@VisibleForTesting
JavaKeyStoreProvider(JavaKeyStoreProvider other) {
super(new Configuration());
uri = other.uri;
path = other.path;
fs = other.fs;
permissions = other.permissions;
keyStore = other.keyStore;
password = other.password;
changed = other.changed;
readLock = other.readLock;
writeLock = other.writeLock;
}
private JavaKeyStoreProvider(URI uri, Configuration conf) throws IOException {
super(conf);
this.uri = uri;
@ -501,6 +518,7 @@ public class JavaKeyStoreProvider extends KeyProvider {
public void flush() throws IOException {
Path newPath = constructNewPath(path);
Path oldPath = constructOldPath(path);
Path resetPath = path;
writeLock.lock();
try {
if (!changed) {
@ -527,6 +545,9 @@ public class JavaKeyStoreProvider extends KeyProvider {
// Save old File first
boolean fileExisted = backupToOld(oldPath);
if (fileExisted) {
resetPath = oldPath;
}
// write out the keystore
// Write to _NEW path first :
try {
@ -534,16 +555,34 @@ public class JavaKeyStoreProvider extends KeyProvider {
} catch (IOException ioe) {
// rename _OLD back to curent and throw Exception
revertFromOld(oldPath, fileExisted);
resetPath = path;
throw ioe;
}
// Rename _NEW to CURRENT and delete _OLD
cleanupNewAndOld(newPath, oldPath);
changed = false;
} catch (IOException ioe) {
resetKeyStoreState(resetPath);
throw ioe;
} finally {
writeLock.unlock();
}
}
private void resetKeyStoreState(Path path) {
LOG.debug("Could not flush Keystore.."
+ "attempting to reset to previous state !!");
// 1) flush cache
cache.clear();
// 2) load keyStore from previous path
try {
loadFromPath(path, password);
LOG.debug("KeyStore resetting to previously flushed state !!");
} catch (Exception e) {
LOG.debug("Could not reset Keystore to previous state", e);
}
}
private void cleanupNewAndOld(Path newPath, Path oldPath) throws IOException {
// Rename _NEW to CURRENT
renameOrFail(newPath, path);
@ -553,7 +592,7 @@ public class JavaKeyStoreProvider extends KeyProvider {
}
}
private void writeToNew(Path newPath) throws IOException {
protected void writeToNew(Path newPath) throws IOException {
FSDataOutputStream out =
FileSystem.create(fs, newPath, permissions);
try {
@ -570,14 +609,7 @@ public class JavaKeyStoreProvider extends KeyProvider {
out.close();
}
private void revertFromOld(Path oldPath, boolean fileExisted)
throws IOException {
if (fileExisted) {
renameOrFail(oldPath, path);
}
}
private boolean backupToOld(Path oldPath)
protected boolean backupToOld(Path oldPath)
throws IOException {
boolean fileExisted = false;
if (fs.exists(path)) {
@ -587,6 +619,14 @@ public class JavaKeyStoreProvider extends KeyProvider {
return fileExisted;
}
private void revertFromOld(Path oldPath, boolean fileExisted)
throws IOException {
if (fileExisted) {
renameOrFail(oldPath, path);
}
}
private void renameOrFail(Path src, Path dest)
throws IOException {
if (!fs.rename(src, dest)) {

View File

@ -345,8 +345,8 @@ public class KeyShell extends Configured implements Tool {
+ provider + "\n for key name: " + keyName);
try {
provider.rollNewVersion(keyName);
out.println(keyName + " has been successfully rolled.");
provider.flush();
out.println(keyName + " has been successfully rolled.");
printProviderWritten();
} catch (NoSuchAlgorithmException e) {
out.println("Cannot roll key: " + keyName + " within KeyProvider: "
@ -418,8 +418,8 @@ public class KeyShell extends Configured implements Tool {
if (cont) {
try {
provider.deleteKey(keyName);
out.println(keyName + " has been successfully deleted.");
provider.flush();
out.println(keyName + " has been successfully deleted.");
printProviderWritten();
} catch (IOException e) {
out.println(keyName + " has not been deleted.");
@ -479,9 +479,9 @@ public class KeyShell extends Configured implements Tool {
warnIfTransientProvider();
try {
provider.createKey(keyName, options);
provider.flush();
out.println(keyName + " has been successfully created with options "
+ options.toString() + ".");
provider.flush();
printProviderWritten();
} catch (InvalidParameterException e) {
out.println(keyName + " has not been created. " + e.getMessage());

View File

@ -20,12 +20,15 @@ package org.apache.hadoop.util;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -45,18 +48,12 @@ public class ApplicationClassLoader extends URLClassLoader {
* classes are considered system classes, and are not loaded by the
* application classloader.
*/
public static final String DEFAULT_SYSTEM_CLASSES =
"java.," +
"javax.," +
"org.w3c.dom.," +
"org.xml.sax.," +
"org.apache.commons.logging.," +
"org.apache.log4j.," +
"org.apache.hadoop.," +
"core-default.xml," +
"hdfs-default.xml," +
"mapred-default.xml," +
"yarn-default.xml";
public static final String SYSTEM_CLASSES_DEFAULT;
private static final String PROPERTIES_FILE =
"org.apache.hadoop.application-classloader.properties";
private static final String SYSTEM_CLASSES_DEFAULT_KEY =
"system.classes.default";
private static final Log LOG =
LogFactory.getLog(ApplicationClassLoader.class.getName());
@ -69,6 +66,30 @@ public class ApplicationClassLoader extends URLClassLoader {
}
};
static {
InputStream is = null;
try {
is = ApplicationClassLoader.class.getClassLoader().
getResourceAsStream(PROPERTIES_FILE);
if (is == null) {
throw new ExceptionInInitializerError("properties file " +
PROPERTIES_FILE + " is not found");
}
Properties props = new Properties();
props.load(is);
// get the system classes default
String systemClassesDefault =
props.getProperty(SYSTEM_CLASSES_DEFAULT_KEY);
if (systemClassesDefault == null) {
throw new ExceptionInInitializerError("property " +
SYSTEM_CLASSES_DEFAULT_KEY + " is not found");
}
SYSTEM_CLASSES_DEFAULT = systemClassesDefault;
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}
private final ClassLoader parent;
private final List<String> systemClasses;
@ -85,7 +106,7 @@ public class ApplicationClassLoader extends URLClassLoader {
}
// if the caller-specified system classes are null or empty, use the default
this.systemClasses = (systemClasses == null || systemClasses.isEmpty()) ?
Arrays.asList(StringUtils.getTrimmedStrings(DEFAULT_SYSTEM_CLASSES)) :
Arrays.asList(StringUtils.getTrimmedStrings(SYSTEM_CLASSES_DEFAULT)) :
systemClasses;
LOG.info("system classes: " + this.systemClasses);
}

View File

@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# contains key properties for setting up the application classloader
system.classes.default=java.,\
javax.accessibility.,\
javax.activation.,\
javax.activity.,\
javax.annotation.,\
javax.annotation.processing.,\
javax.crypto.,\
javax.imageio.,\
javax.jws.,\
javax.lang.model.,\
-javax.management.j2ee.,\
javax.management.,\
javax.naming.,\
javax.net.,\
javax.print.,\
javax.rmi.,\
javax.script.,\
-javax.security.auth.message.,\
javax.security.auth.,\
javax.security.cert.,\
javax.security.sasl.,\
javax.sound.,\
javax.sql.,\
javax.swing.,\
javax.tools.,\
javax.transaction.,\
-javax.xml.registry.,\
-javax.xml.rpc.,\
javax.xml.,\
org.w3c.dom.,\
org.xml.sax.,\
org.apache.commons.logging.,\
org.apache.log4j.,\
org.apache.hadoop.,\
core-default.xml,\
hdfs-default.xml,\
mapred-default.xml,\
yarn-default.xml

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto.key;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
public class FailureInjectingJavaKeyStoreProvider extends JavaKeyStoreProvider {
public static final String SCHEME_NAME = "failjceks";
private boolean backupFail = false;
private boolean writeFail = false;
FailureInjectingJavaKeyStoreProvider(JavaKeyStoreProvider prov) {
super(prov);
}
public void setBackupFail(boolean b) {
backupFail = b;
}
public void setWriteFail(boolean b) {
backupFail = b;
}
// Failure injection methods..
@Override
public void writeToNew(Path newPath) throws IOException {
if (writeFail) {
throw new IOException("Injecting failure on write");
}
super.writeToNew(newPath);
}
@Override
public boolean backupToOld(Path oldPath) throws IOException {
if (backupFail) {
throw new IOException("Inejection Failure on backup");
}
return super.backupToOld(oldPath);
}
public static class Factory extends KeyProviderFactory {
@Override
public KeyProvider createProvider(URI providerName,
Configuration conf) throws IOException {
if (SCHEME_NAME.equals(providerName.getScheme())) {
try {
return new FailureInjectingJavaKeyStoreProvider(
(JavaKeyStoreProvider) new JavaKeyStoreProvider.Factory()
.createProvider(
new URI(providerName.toString().replace(SCHEME_NAME,
JavaKeyStoreProvider.SCHEME_NAME)), conf));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
return null;
}
}
}

View File

@ -40,6 +40,7 @@ import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;
public class TestKeyProviderFactory {
@ -171,6 +172,7 @@ public class TestKeyProviderFactory {
assertEquals("Key no-such-key not found", e.getMessage());
}
provider.flush();
// get a new instance of the provider to ensure it was saved correctly
provider = KeyProviderFactory.getProviders(conf).get(0);
assertArrayEquals(new byte[]{2},
@ -215,6 +217,50 @@ public class TestKeyProviderFactory {
file.delete();
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, ourUrl);
checkSpecificProvider(conf, ourUrl);
// START : Test flush error by failure injection
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, ourUrl.replace(
JavaKeyStoreProvider.SCHEME_NAME,
FailureInjectingJavaKeyStoreProvider.SCHEME_NAME));
// get a new instance of the provider to ensure it was saved correctly
KeyProvider provider = KeyProviderFactory.getProviders(conf).get(0);
// inject failure during keystore write
FailureInjectingJavaKeyStoreProvider fProvider =
(FailureInjectingJavaKeyStoreProvider) provider;
fProvider.setWriteFail(true);
provider.createKey("key5", new byte[]{1},
KeyProvider.options(conf).setBitLength(8));
assertNotNull(provider.getCurrentKey("key5"));
try {
provider.flush();
Assert.fail("Should not succeed");
} catch (Exception e) {
// Ignore
}
// SHould be reset to pre-flush state
Assert.assertNull(provider.getCurrentKey("key5"));
// Un-inject last failure and
// inject failure during keystore backup
fProvider.setWriteFail(false);
fProvider.setBackupFail(true);
provider.createKey("key6", new byte[]{1},
KeyProvider.options(conf).setBitLength(8));
assertNotNull(provider.getCurrentKey("key6"));
try {
provider.flush();
Assert.fail("Should not succeed");
} catch (Exception e) {
// Ignore
}
// SHould be reset to pre-flush state
Assert.assertNull(provider.getCurrentKey("key6"));
// END : Test flush error by failure injection
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, ourUrl.replace(
FailureInjectingJavaKeyStoreProvider.SCHEME_NAME,
JavaKeyStoreProvider.SCHEME_NAME));
Path path = ProviderUtils.unnestUri(new URI(ourUrl));
FileSystem fs = path.getFileSystem(conf);
FileStatus s = fs.getFileStatus(path);
@ -227,7 +273,7 @@ public class TestKeyProviderFactory {
file.delete();
file.createNewFile();
assertTrue(oldFile.exists());
KeyProvider provider = KeyProviderFactory.getProviders(conf).get(0);
provider = KeyProviderFactory.getProviders(conf).get(0);
assertTrue(file.exists());
assertTrue(oldFile + "should be deleted", !oldFile.exists());
verifyAfterReload(file, provider);

View File

@ -131,7 +131,7 @@ public class TestRunJar extends TestCase {
String thirdCls = ClassLoaderCheckThird.class.getName();
String systemClasses = "-" + mainCls + "," +
"-" + thirdCls + "," +
ApplicationClassLoader.DEFAULT_SYSTEM_CLASSES;
ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT;
when(runJar.getSystemClasses()).thenReturn(systemClasses);
// create the test jar

View File

@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.crypto.key.JavaKeyStoreProvider$Factory
org.apache.hadoop.crypto.key.UserProvider$Factory
org.apache.hadoop.crypto.key.kms.KMSClientProvider$Factory
org.apache.hadoop.crypto.key.FailureInjectingJavaKeyStoreProvider$Factory

View File

@ -204,9 +204,6 @@ Trunk (Unreleased)
HDFS-4115. TestHDFSCLI.testAll fails one test due to number format.
(Trevor Robinson via suresh)
HDFS-4165. Faulty sanity check in FsDirectory.unprotectedSetQuota.
(Binglin Chang via suresh)
HDFS-4105. The SPNEGO user for secondary namenode should use the web
keytab. (Arpit Gupta via jitendra)
@ -516,6 +513,13 @@ Release 2.6.0 - UNRELEASED
HDFS-7093. Add config key to restrict setStoragePolicy. (Arpit Agarwal)
HDFS-6519. Document oiv_legacy command (Akira AJISAKA via aw)
HDFS-4165. Faulty sanity check in FsDirectory.unprotectedSetQuota.
(Binglin Chang via suresh)
HDFS-7104. Fix and clarify INodeInPath getter functions. (Zhe Zhang via wang)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@ -745,6 +749,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7148. TestEncryptionZones#testIsEncryptedMethod fails on branch-2
after archival storage merge. (wang)
HDFS-7157. Using Time.now() for recording start/end time of reconfiguration
tasks (Lei Xu via cmccabe)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an
@ -979,6 +986,8 @@ Release 2.6.0 - UNRELEASED
HDFS-6664. HDFS permissions guide documentation states incorrect default
group mapping class. (Ray Chiang via aw)
HDFS-4227. Document dfs.namenode.resource.* (Daisuke Kobayashi via aw)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -133,7 +133,6 @@ public class INodesInPath {
* be thrown when the path refers to a symbolic link.
* @return the specified number of existing INodes in the path
*/
// TODO: Eliminate null elements from inodes (to be provided by HDFS-7104)
static INodesInPath resolve(final INodeDirectory startingDir,
final byte[][] components, final int numOfINodes,
final boolean resolveLink) throws UnresolvedLinkException {
@ -262,7 +261,8 @@ public class INodesInPath {
*/
private boolean isSnapshot;
/**
* Index of {@link INodeDirectoryWithSnapshot} for snapshot path, else -1
* index of the {@link Snapshot.Root} node in the inodes array,
* -1 for non-snapshot paths.
*/
private int snapshotRootIndex;
/**
@ -312,15 +312,20 @@ public class INodesInPath {
}
/**
* @return the inodes array excluding the null elements.
* @return a new array of inodes excluding the null elements introduced by
* snapshot path elements. E.g., after resolving path "/dir/.snapshot",
* {@link #inodes} is {/, dir, null}, while the returned array only contains
* inodes of "/" and "dir". Note the length of the returned array is always
* equal to {@link #capacity}.
*/
INode[] getINodes() {
if (capacity < inodes.length) {
INode[] newNodes = new INode[capacity];
System.arraycopy(inodes, 0, newNodes, 0, capacity);
inodes = newNodes;
if (capacity == inodes.length) {
return inodes;
}
return inodes;
INode[] newNodes = new INode[capacity];
System.arraycopy(inodes, 0, newNodes, 0, capacity);
return newNodes;
}
/**
@ -341,8 +346,8 @@ public class INodesInPath {
}
/**
* @return index of the {@link INodeDirectoryWithSnapshot} in
* {@link #inodes} for snapshot path, else -1.
* @return index of the {@link Snapshot.Root} node in the inodes array,
* -1 for non-snapshot paths.
*/
int getSnapshotRootIndex() {
return this.snapshotRootIndex;

View File

@ -643,6 +643,44 @@
</description>
</property>
<property>
<name>dfs.namenode.resource.check.interval</name>
<value>5000</value>
<description>
The interval in milliseconds at which the NameNode resource checker runs.
The checker calculates the number of the NameNode storage volumes whose
available spaces are more than dfs.namenode.resource.du.reserved, and
enters safemode if the number becomes lower than the minimum value
specified by dfs.namenode.resource.checked.volumes.minimum.
</description>
</property>
<property>
<name>dfs.namenode.resource.du.reserved</name>
<value>104857600</value>
<description>
The amount of space to reserve/require for a NameNode storage directory
in bytes. The default is 100MB.
</description>
</property>
<property>
<name>dfs.namenode.resource.checked.volumes</name>
<value></value>
<description>
A list of local directories for the NameNode resource checker to check in
addition to the local edits directories.
</description>
</property>
<property>
<name>dfs.namenode.resource.checked.volumes.minimum</name>
<value>1</value>
<description>
The minimum number of redundant NameNode storage volumes required.
</description>
</property>
<property>
<name>dfs.datanode.balance.bandwidthPerSec</name>
<value>1048576</value>
@ -2150,4 +2188,15 @@
</description>
</property>
<property>
<name>dfs.namenode.legacy-oiv-image.dir</name>
<value></value>
<description>Determines where to save the namespace in the old fsimage format
during checkpointing by standby NameNode or SecondaryNameNode. Users can
dump the contents of the old format fsimage by oiv_legacy command. If
the value is not specified, old format fsimage will not be saved in
checkpoint.
</description>
</property>
</configuration>

View File

@ -28,7 +28,7 @@ Offline Image Viewer Guide
namespace. The tool is able to process very large image files relatively
quickly. The tool handles the layout formats that were included with Hadoop
versions 2.4 and up. If you want to handle older layout formats, you can
use the Offline Image Viewer of Hadoop 2.3.
use the Offline Image Viewer of Hadoop 2.3 or {{oiv_legacy Command}}.
If the tool is not able to process an image file, it will exit cleanly.
The Offline Image Viewer does not require a Hadoop cluster to be running;
it is entirely offline in its operation.
@ -188,3 +188,60 @@ Offline Image Viewer Guide
about the hdfs namespace. This information can then be used to explore
file system usage patterns or find specific files that match arbitrary
criteria, along with other types of namespace analysis.
* oiv_legacy Command
Due to the internal layout changes introduced by the ProtocolBuffer-based
fsimage ({{{https://issues.apache.org/jira/browse/HDFS-5698}HDFS-5698}}),
OfflineImageViewer consumes excessive amount of memory and loses some
functions such as Indented and Delimited processor. If you want to process
without large amount of memory or use these processors, you can use
<<<oiv_legacy>>> command (same as <<<oiv>>> in Hadoop 2.3).
** Usage
1. Set <<<dfs.namenode.legacy-oiv-image.dir>>> to an appropriate directory
to make standby NameNode or SecondaryNameNode save its namespace in the
old fsimage format during checkpointing.
2. Use <<<oiv_legacy>>> command to the old format fsimage.
----
bash$ bin/hdfs oiv_legacy -i fsimage_old -o output
----
** Options
*-----------------------:-----------------------------------+
| <<Flag>> | <<Description>> |
*-----------------------:-----------------------------------+
| <<<-i>>>\|<<<--inputFile>>> <input file> | Specify the input fsimage file to
| | process. Required.
*-----------------------:-----------------------------------+
| <<<-o>>>\|<<<--outputFile>>> <output file> | Specify the output filename, if
| | the specified output processor generates one. If the
| | specified file already exists, it is silently
| | overwritten. Required.
*-----------------------:-----------------------------------+
| <<<-p>>>\|<<<--processor>>> <processor> | Specify the image processor to
| | apply against the image file. Valid options are
| | Ls (default), XML, Delimited, Indented, and
| | FileDistribution.
*-----------------------:-----------------------------------+
| <<<-skipBlocks>>> | Do not enumerate individual blocks within files. This
| | may save processing time and outfile file space on
| | namespaces with very large files. The Ls processor
| | reads the blocks to correctly determine file sizes
| | and ignores this option.
*-----------------------:-----------------------------------+
| <<<-printToScreen>>> | Pipe output of processor to console as well as
| | specified file. On extremely large namespaces, this
| | may increase processing time by an order of
| | magnitude.
*-----------------------:-----------------------------------+
| <<<-delimiter>>> <arg>| When used in conjunction with the Delimited
| | processor, replaces the default tab delimiter with
| | the string specified by <arg>.
*-----------------------:-----------------------------------+
| <<<-h>>>\|<<<--help>>>| Display the tool usage and help information and exit.
*-----------------------:-----------------------------------+

View File

@ -215,7 +215,7 @@ public class TestSnapshotPathINodes {
// snapshotRootIndex should be -1.
assertSnapshot(nodesInPath, true, snapshot, -1);
// Check the INode for file1 (snapshot file)
assertINodeFile(nodesInPath.getLastINode(), file1);
assertINodeFile(inodes[inodes.length - 1], file1);
// Call getExistingPathINodes and request 2 INodes.
nodesInPath = INodesInPath.resolve(fsdir.rootDir, components, 2, false);
@ -224,7 +224,7 @@ public class TestSnapshotPathINodes {
// There should be two INodes in inodes: s1 and snapshot of file1. Thus the
// SnapshotRootIndex should be 0.
assertSnapshot(nodesInPath, true, snapshot, 0);
assertINodeFile(nodesInPath.getLastINode(), file1);
assertINodeFile(inodes[inodes.length - 1], file1);
// Resolve the path "/TestSnapshot/sub1/.snapshot"
String dotSnapshotPath = sub1.toString() + "/.snapshot";
@ -239,7 +239,7 @@ public class TestSnapshotPathINodes {
// No SnapshotRoot dir is included in the resolved inodes
assertSnapshot(nodesInPath, true, snapshot, -1);
// The last INode should be the INode for sub1
final INode last = nodesInPath.getLastINode();
final INode last = inodes[inodes.length - 1];
assertEquals(last.getFullPathName(), sub1.toString());
assertFalse(last instanceof INodeFile);

View File

@ -260,6 +260,8 @@ Release 2.6.0 - UNRELEASED
scheduler resource type is memory plus cpu. (Peng Zhang and Varun Vasudev
via zjshen)
MAPREDUCE-6072. Remove INSTALL document (Akira AJISAKA via aw)
OPTIMIZATIONS
BUG FIXES
@ -394,6 +396,9 @@ Release 2.6.0 - UNRELEASED
MRJobConfig#MR_CLIENT_TO_AM_IPC_MAX_RETRIES_ON_TIMEOUTS. Contributed by
Akira AJISAKA. (Akira AJISAKA via jianhe)
MAPREDUCE-6094. TestMRCJCFileInputFormat.testAddInputPath() fails on trunk
(Akira AJISAKA via jlowe)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -1,70 +0,0 @@
To compile Hadoop Mapreduce next following, do the following:
Step 1) Install dependencies for yarn
See http://svn.apache.org/repos/asf/hadoop/common/trunk/hadoop-mapreduce-porject/hadoop-yarn/README
Make sure protbuf library is in your library path or set: export LD_LIBRARY_PATH=/usr/local/lib
Step 2) Checkout
svn checkout http://svn.apache.org/repos/asf/hadoop/common/trunk
Step 3) Build
Go to common directory - choose your regular common build command. For example:
export MAVEN_OPTS=-Xmx512m
mvn clean package -Pdist -Dtar -DskipTests -Pnative
You can omit -Pnative it you don't want to build native packages.
Step 4) Untar the tarball from hadoop-dist/target/ into a clean and different
directory, say HADOOP_YARN_HOME.
Step 5)
Start hdfs
To run Hadoop Mapreduce next applications:
Step 6) export the following variables to where you have things installed:
You probably want to export these in hadoop-env.sh and yarn-env.sh also.
export HADOOP_MAPRED_HOME=<mapred loc>
export HADOOP_COMMON_HOME=<common loc>
export HADOOP_HDFS_HOME=<hdfs loc>
export HADOOP_YARN_HOME=directory where you untarred yarn
export HADOOP_CONF_DIR=<conf loc>
export YARN_CONF_DIR=$HADOOP_CONF_DIR
Step 7) Setup config: for running mapreduce applications, which now are in user land, you need to setup nodemanager with the following configuration in your yarn-site.xml before you start the nodemanager.
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
Step 8) Modify mapred-site.xml to use yarn framework
<property>
<name> mapreduce.framework.name</name>
<value>yarn</value>
</property>
Step 9) cd $HADOOP_YARN_HOME
Step 10) sbin/yarn-daemon.sh start resourcemanager
Step 11) sbin/yarn-daemon.sh start nodemanager
Step 12) sbin/mr-jobhistory-daemon.sh start historyserver
Step 13) You are all set, an example on how to run a mapreduce job is:
cd $HADOOP_MAPRED_HOME
ant examples -Dresolvers=internal
$HADOOP_COMMON_HOME/bin/hadoop jar $HADOOP_MAPRED_HOME/build/hadoop-mapreduce-examples-*.jar randomwriter -Dmapreduce.job.user.name=$USER -Dmapreduce.randomwriter.bytespermap=10000 -Ddfs.blocksize=536870912 -Ddfs.block.size=536870912 -libjars $HADOOP_YARN_HOME/modules/hadoop-mapreduce-client-jobclient-*.jar output
The output on the command line should be almost similar to what you see in the JT/TT setup (Hadoop 0.20/0.21)

View File

@ -498,7 +498,7 @@ public class TestMRApps {
private static final String[] SYS_CLASSES = new String[] {
"/java/fake/Klass",
"/javax/fake/Klass",
"/javax/management/fake/Klass",
"/org/apache/commons/logging/fake/Klass",
"/org/apache/log4j/fake/Klass",
"/org/apache/hadoop/fake/Klass"
@ -515,7 +515,7 @@ public class TestMRApps {
public void testSystemClasses() {
final List<String> systemClasses =
Arrays.asList(StringUtils.getTrimmedStrings(
ApplicationClassLoader.DEFAULT_SYSTEM_CLASSES));
ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT));
for (String defaultXml : DEFAULT_XMLS) {
assertTrue(defaultXml + " must be system resource",
ApplicationClassLoader.isSystemClass(defaultXml, systemClasses));

View File

@ -47,9 +47,8 @@ public class TestMRCJCFileInputFormat {
@Test
public void testAddInputPath() throws IOException {
final Configuration conf = new Configuration();
conf.set("fs.defaultFS", "s3://abc:xyz@hostname/");
conf.set("fs.defaultFS", "file:///abc/");
final Job j = Job.getInstance(conf);
j.getConfiguration().set("fs.defaultFS", "s3://abc:xyz@hostname/");
//setup default fs
final FileSystem defaultfs = FileSystem.get(conf);

View File

@ -242,7 +242,7 @@ public class TestMRJobs {
// to test AM loading user classes such as output format class, we want
// to blacklist them from the system classes (they need to be prepended
// as the first match wins)
String systemClasses = ApplicationClassLoader.DEFAULT_SYSTEM_CLASSES;
String systemClasses = ApplicationClassLoader.SYSTEM_CLASSES_DEFAULT;
// exclude the custom classes from system classes
systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" +
CustomSpeculator.class.getName() + "," +

View File

@ -265,6 +265,9 @@ Release 2.6.0 - UNRELEASED
YARN-668. Changed NMTokenIdentifier/AMRMTokenIdentifier/ContainerTokenIdentifier
to use protobuf object as the payload. (Junping Du via jianhe)
YARN-1769. CapacityScheduler: Improve reservations (Thomas Graves via
jlowe)
OPTIMIZATIONS
BUG FIXES

View File

@ -351,4 +351,16 @@
<Class name="org.apache.hadoop.yarn.util.ApplicationClassLoader"/>
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
</Match>
<!-- It is only changed on re-initialization the warnings are for access from a test function. -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue" />
<Field name="reservationsContinueLooking" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue" />
<Field name="reservationsContinueLooking" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
</FindBugsFilter>

View File

@ -184,10 +184,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* Assign containers to applications in the queue or it's children (if any).
* @param clusterResource the resource of the cluster.
* @param node node on which resources are available
* @param needToUnreserve assign container only if it can unreserve one first
* @return the assignment
*/
public CSAssignment assignContainers(
Resource clusterResource, FiCaSchedulerNode node);
Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve);
/**
* A container assigned to the queue has completed.
@ -200,11 +201,13 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* container
* @param childQueue <code>CSQueue</code> to reinsert in childQueues
* @param event event to be sent to the container
* @param sortQueues indicates whether it should re-sort the queues
*/
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node,
RMContainer container, ContainerStatus containerStatus,
RMContainerEventType event, CSQueue childQueue);
RMContainerEventType event, CSQueue childQueue,
boolean sortQueues);
/**
* Get the number of applications in the queue.

View File

@ -516,13 +516,13 @@ public class CapacityScheduler extends
"Queue configuration missing child queue names for " + queueName);
}
queue =
new LeafQueue(csContext, queueName, parent,oldQueues.get(queueName));
new LeafQueue(csContext, queueName, parent, oldQueues.get(queueName));
// Used only for unit tests
queue = hook.hook(queue);
} else {
ParentQueue parentQueue =
new ParentQueue(csContext, queueName, parent,oldQueues.get(queueName));
new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
// Used only for unit tests
queue = hook.hook(parentQueue);
@ -922,7 +922,8 @@ public class CapacityScheduler extends
node.getNodeID());
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
CSAssignment assignment = queue.assignContainers(clusterResource, node);
CSAssignment assignment = queue.assignContainers(clusterResource, node,
false);
RMContainer excessReservation = assignment.getExcessReservation();
if (excessReservation != null) {
@ -933,7 +934,7 @@ public class CapacityScheduler extends
SchedulerUtils.createAbnormalContainerStatus(
container.getId(),
SchedulerUtils.UNRESERVED_CONTAINER),
RMContainerEventType.RELEASED, null);
RMContainerEventType.RELEASED, null, true);
}
}
@ -946,7 +947,7 @@ public class CapacityScheduler extends
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getAvailableResource());
}
root.assignContainers(clusterResource, node);
root.assignContainers(clusterResource, node, false);
}
} else {
LOG.info("Skipping scheduling since node " + node.getNodeID() +
@ -1122,7 +1123,7 @@ public class CapacityScheduler extends
// Inform the queue
LeafQueue queue = (LeafQueue)application.getQueue();
queue.completedContainer(clusterResource, application, node,
rmContainer, containerStatus, event, null);
rmContainer, containerStatus, event, null, true);
LOG.info("Application attempt " + application.getApplicationAttemptId()
+ " released container " + container.getId() + " on node: " + node
@ -1138,7 +1139,7 @@ public class CapacityScheduler extends
}
@Lock(Lock.NoLock.class)
FiCaSchedulerNode getNode(NodeId nodeId) {
public FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}

View File

@ -81,6 +81,13 @@ public class CapacitySchedulerConfiguration extends Configuration {
@Private
public static final String STATE = "state";
@Private
public static final String RESERVE_CONT_LOOK_ALL_NODES = PREFIX
+ "reservations-continue-look-all-nodes";
@Private
public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
@Private
public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
@ -308,6 +315,17 @@ public class CapacitySchedulerConfiguration extends Configuration {
QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING;
}
/*
* Returns whether we should continue to look at all heart beating nodes even
* after the reservation limit was hit. The node heart beating in could
* satisfy the request thus could be a better pick then waiting for the
* reservation to be fullfilled. This config is refreshable.
*/
public boolean getReservationContinueLook() {
return getBoolean(RESERVE_CONT_LOOK_ALL_NODES,
DEFAULT_RESERVE_CONT_LOOK_ALL_NODES);
}
private static String getAclKey(QueueACL acl) {
return "acl_" + acl.toString().toLowerCase();
}

View File

@ -21,10 +21,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.Comparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
/**
@ -55,4 +57,6 @@ public interface CapacitySchedulerContext {
ResourceCalculator getResourceCalculator();
Comparator<CSQueue> getQueueComparator();
FiCaSchedulerNode getNode(NodeId nodeId);
}

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -129,6 +130,8 @@ public class LeafQueue implements CSQueue {
private final ResourceCalculator resourceCalculator;
private boolean reservationsContinueLooking;
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) {
this.scheduler = cs;
@ -202,8 +205,9 @@ public class LeafQueue implements CSQueue {
maximumCapacity, absoluteMaxCapacity,
userLimit, userLimitFactor,
maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
.getConfiguration().getNodeLocalityDelay());
maxActiveApplications, maxActiveApplicationsPerUser, state, acls,
cs.getConfiguration().getNodeLocalityDelay(),
cs.getConfiguration().getReservationContinueLook());
if(LOG.isDebugEnabled()) {
LOG.debug("LeafQueue:" + " name=" + queueName
@ -225,7 +229,8 @@ public class LeafQueue implements CSQueue {
int maxApplications, float maxAMResourcePerQueuePercent,
int maxApplicationsPerUser, int maxActiveApplications,
int maxActiveApplicationsPerUser, QueueState state,
Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay)
Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay,
boolean continueLooking)
{
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
@ -257,6 +262,7 @@ public class LeafQueue implements CSQueue {
this.queueInfo.setQueueState(this.state);
this.nodeLocalityDelay = nodeLocalityDelay;
this.reservationsContinueLooking = continueLooking;
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
@ -321,7 +327,9 @@ public class LeafQueue implements CSQueue {
" [= configuredState ]" + "\n" +
"acls = " + aclsString +
" [= configuredAcls ]" + "\n" +
"nodeLocalityDelay = " + nodeLocalityDelay + "\n");
"nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
"reservationsContinueLooking = " +
reservationsContinueLooking + "\n");
}
@Override
@ -555,6 +563,11 @@ public class LeafQueue implements CSQueue {
return nodeLocalityDelay;
}
@Private
boolean getReservationContinueLooking() {
return reservationsContinueLooking;
}
public String toString() {
return queueName + ": " +
"capacity=" + capacity + ", " +
@ -613,7 +626,8 @@ public class LeafQueue implements CSQueue {
newlyParsedLeafQueue.getMaximumActiveApplications(),
newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
newlyParsedLeafQueue.getNodeLocalityDelay());
newlyParsedLeafQueue.getNodeLocalityDelay(),
newlyParsedLeafQueue.reservationsContinueLooking);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
@ -802,8 +816,8 @@ public class LeafQueue implements CSQueue {
private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
@Override
public synchronized CSAssignment
assignContainers(Resource clusterResource, FiCaSchedulerNode node) {
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, boolean needToUnreserve) {
if(LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
@ -848,9 +862,17 @@ public class LeafQueue implements CSQueue {
Resource required = anyRequest.getCapability();
// Do we need containers at this 'priority'?
if (!needContainers(application, priority, required)) {
if (application.getTotalRequiredResources(priority) <= 0) {
continue;
}
if (!this.reservationsContinueLooking) {
if (!needContainers(application, priority, required)) {
if (LOG.isDebugEnabled()) {
LOG.debug("doesn't need containers based on reservation algo!");
}
continue;
}
}
// Compute user-limit & set headroom
// Note: We compute both user-limit & headroom with the highest
@ -862,14 +884,14 @@ public class LeafQueue implements CSQueue {
required);
// Check queue max-capacity limit
if (!assignToQueue(clusterResource, required)) {
if (!assignToQueue(clusterResource, required, application, true)) {
return NULL_ASSIGNMENT;
}
// Check user limit
if (!assignToUser(
clusterResource, application.getUser(), userLimit)) {
break;
if (!assignToUser(clusterResource, application.getUser(), userLimit,
application, true)) {
break;
}
// Inform the application it is about to get a scheduling opportunity
@ -878,7 +900,7 @@ public class LeafQueue implements CSQueue {
// Try to schedule
CSAssignment assignment =
assignContainersOnNode(clusterResource, node, application, priority,
null);
null, needToUnreserve);
// Did the application skip this node?
if (assignment.getSkipped()) {
@ -900,6 +922,9 @@ public class LeafQueue implements CSQueue {
// otherwise the app will be delayed for each non-local assignment.
// This helps apps with many off-cluster requests schedule faster.
if (assignment.getType() != NodeType.OFF_SWITCH) {
if (LOG.isDebugEnabled()) {
LOG.debug("Resetting scheduling opportunities");
}
application.resetSchedulingOpportunities(priority);
}
@ -935,22 +960,57 @@ public class LeafQueue implements CSQueue {
// Try to assign if we have sufficient resources
assignContainersOnNode(clusterResource, node, application, priority,
rmContainer);
rmContainer, false);
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
}
private synchronized boolean assignToQueue(Resource clusterResource,
Resource required) {
@Private
protected synchronized boolean assignToQueue(Resource clusterResource,
Resource required, FiCaSchedulerApp application,
boolean checkReservations) {
Resource potentialTotalResource = Resources.add(usedResources, required);
// Check how of the cluster's absolute capacity we are currently using...
float potentialNewCapacity =
Resources.divide(
resourceCalculator, clusterResource,
Resources.add(usedResources, required),
clusterResource);
float potentialNewCapacity = Resources.divide(resourceCalculator,
clusterResource, potentialTotalResource, clusterResource);
if (potentialNewCapacity > absoluteMaxCapacity) {
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
if (this.reservationsContinueLooking && checkReservations) {
float potentialNewWithoutReservedCapacity = Resources.divide(
resourceCalculator,
clusterResource,
Resources.subtract(potentialTotalResource,
application.getCurrentReservation()),
clusterResource);
if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: "
+ getQueueName()
+ " usedResources: "
+ usedResources
+ " clusterResources: "
+ clusterResource
+ " reservedResources: "
+ application.getCurrentReservation()
+ " currentCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
usedResources, clusterResource) + " required " + required
+ " potentialNewWithoutReservedCapacity: "
+ potentialNewWithoutReservedCapacity + " ( " + " max-capacity: "
+ absoluteMaxCapacity + ")");
}
// we could potentially use this node instead of reserved node
return true;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName()
+ " usedResources: " + usedResources
@ -966,6 +1026,8 @@ public class LeafQueue implements CSQueue {
return true;
}
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
private Resource computeUserLimitAndSetHeadroom(
FiCaSchedulerApp application, Resource clusterResource, Resource required) {
@ -1085,25 +1147,43 @@ public class LeafQueue implements CSQueue {
return limit;
}
private synchronized boolean assignToUser(Resource clusterResource,
String userName, Resource limit) {
@Private
protected synchronized boolean assignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
boolean checkReservations) {
User user = getUser(userName);
// Note: We aren't considering the current request since there is a fixed
// overhead of the AM, but it's a > check, not a >= check, so...
if (Resources.greaterThan(resourceCalculator, clusterResource,
user.getConsumedResources(), limit)) {
if (Resources.greaterThan(resourceCalculator, clusterResource,
user.getConsumedResources(), limit)) {
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
if (this.reservationsContinueLooking && checkReservations) {
if (Resources.lessThanOrEqual(
resourceCalculator,
clusterResource,
Resources.subtract(user.getConsumedResources(),
application.getCurrentReservation()), limit)) {
if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName()
+ " will exceed limit based on reservations - " + " consumed: "
+ user.getConsumedResources() + " reserved: "
+ application.getCurrentReservation() + " limit: " + limit);
}
return true;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName() +
" will exceed limit - " +
" consumed: " + user.getConsumedResources() +
" limit: " + limit
);
LOG.debug("User " + userName + " in queue " + getQueueName()
+ " will exceed limit - " + " consumed: "
+ user.getConsumedResources() + " limit: " + limit);
}
return false;
}
return true;
}
@ -1139,7 +1219,7 @@ public class LeafQueue implements CSQueue {
private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, RMContainer reservedContainer) {
Priority priority, RMContainer reservedContainer, boolean needToUnreserve) {
Resource assigned = Resources.none();
@ -1149,7 +1229,7 @@ public class LeafQueue implements CSQueue {
if (nodeLocalResourceRequest != null) {
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, application, priority, reservedContainer);
node, application, priority, reservedContainer, needToUnreserve);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
return new CSAssignment(assigned, NodeType.NODE_LOCAL);
@ -1166,7 +1246,7 @@ public class LeafQueue implements CSQueue {
assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, application, priority, reservedContainer);
node, application, priority, reservedContainer, needToUnreserve);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
return new CSAssignment(assigned, NodeType.RACK_LOCAL);
@ -1183,21 +1263,99 @@ public class LeafQueue implements CSQueue {
return new CSAssignment(
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, application, priority, reservedContainer),
node, application, priority, reservedContainer, needToUnreserve),
NodeType.OFF_SWITCH);
}
return SKIP_ASSIGNMENT;
}
private Resource assignNodeLocalContainers(
Resource clusterResource, ResourceRequest nodeLocalResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application,
Priority priority, RMContainer reservedContainer) {
@Private
protected boolean findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
Resource capability) {
// need to unreserve some other container first
NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability);
if (idToUnreserve == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("checked to see if could unreserve for app but nothing "
+ "reserved that matches for this app");
}
return false;
}
FiCaSchedulerNode nodeToUnreserve = scheduler.getNode(idToUnreserve);
if (nodeToUnreserve == null) {
LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
return false;
}
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving for app: " + application.getApplicationId()
+ " on nodeId: " + idToUnreserve
+ " in order to replace reserved application and place it on node: "
+ node.getNodeID() + " needing: " + capability);
}
// headroom
Resources.addTo(application.getHeadroom(), nodeToUnreserve
.getReservedContainer().getReservedResource());
// Make sure to not have completedContainers sort the queues here since
// we are already inside an iterator loop for the queues and this would
// cause an concurrent modification exception.
completedContainer(clusterResource, application, nodeToUnreserve,
nodeToUnreserve.getReservedContainer(),
SchedulerUtils.createAbnormalContainerStatus(nodeToUnreserve
.getReservedContainer().getContainerId(),
SchedulerUtils.UNRESERVED_CONTAINER),
RMContainerEventType.RELEASED, null, false);
return true;
}
@Private
protected boolean checkLimitsToReserve(Resource clusterResource,
FiCaSchedulerApp application, Resource capability,
boolean needToUnreserve) {
if (needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("we needed to unreserve to be able to allocate");
}
return false;
}
// we can't reserve if we got here based on the limit
// checks assuming we could unreserve!!!
Resource userLimit = computeUserLimitAndSetHeadroom(application,
clusterResource, capability);
// Check queue max-capacity limit
if (!assignToQueue(clusterResource, capability, application, false)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit queue limit");
}
return false;
}
// Check user limit
if (!assignToUser(clusterResource, application.getUser(), userLimit,
application, false)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit user limit");
}
return false;
}
return true;
}
private Resource assignNodeLocalContainers(Resource clusterResource,
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, boolean needToUnreserve) {
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer);
return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
needToUnreserve);
}
return Resources.none();
@ -1206,11 +1364,12 @@ public class LeafQueue implements CSQueue {
private Resource assignRackLocalContainers(
Resource clusterResource, ResourceRequest rackLocalResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
RMContainer reservedContainer, boolean needToUnreserve) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer);
return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
needToUnreserve);
}
return Resources.none();
@ -1219,11 +1378,12 @@ public class LeafQueue implements CSQueue {
private Resource assignOffSwitchContainers(
Resource clusterResource, ResourceRequest offSwitchResourceRequest,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
RMContainer reservedContainer, boolean needToUnreserve) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer);
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
needToUnreserve);
}
return Resources.none();
@ -1303,14 +1463,17 @@ public class LeafQueue implements CSQueue {
return container;
}
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer) {
ResourceRequest request, NodeType type, RMContainer rmContainer,
boolean needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId()
+ " priority=" + priority.getPriority()
+ " request=" + request + " type=" + type);
+ " request=" + request + " type=" + type
+ " needToUnreserve= " + needToUnreserve);
}
Resource capability = request.getCapability();
Resource available = node.getAvailableResource();
@ -1335,6 +1498,18 @@ public class LeafQueue implements CSQueue {
return Resources.none();
}
// default to true since if reservation continue look feature isn't on
// needContainers is checked earlier and we wouldn't have gotten this far
boolean canAllocContainer = true;
if (this.reservationsContinueLooking) {
// based on reservations can we allocate/reserve more or do we need
// to unreserve one first
canAllocContainer = needContainers(application, priority, capability);
if (LOG.isDebugEnabled()) {
LOG.debug("can alloc container is: " + canAllocContainer);
}
}
// Can we allocate a container on this node?
int availableContainers =
resourceCalculator.computeAvailableContainers(available, capability);
@ -1342,8 +1517,28 @@ public class LeafQueue implements CSQueue {
// Allocate...
// Did we previously reserve containers at this 'priority'?
if (rmContainer != null){
if (rmContainer != null) {
unreserve(application, priority, node, rmContainer);
} else if (this.reservationsContinueLooking
&& (!canAllocContainer || needToUnreserve)) {
// need to unreserve some other container first
boolean res = findNodeToUnreserve(clusterResource, node, application,
priority, capability);
if (!res) {
return Resources.none();
}
} else {
// we got here by possibly ignoring queue capacity limits. If the
// parameter needToUnreserve is true it means we ignored one of those
// limits in the chance we could unreserve. If we are here we aren't
// trying to unreserve so we can't allocate anymore due to that parent
// limit.
if (needToUnreserve) {
if (LOG.isDebugEnabled()) {
LOG.debug("we needed to unreserve to be able to allocate, skipping");
}
return Resources.none();
}
}
// Inform the application
@ -1366,17 +1561,38 @@ public class LeafQueue implements CSQueue {
return container.getResource();
} else {
// Reserve by 'charging' in advance...
reserve(application, priority, node, rmContainer, container);
// if we are allowed to allocate but this node doesn't have space, reserve it or
// if this was an already a reserved container, reserve it again
if ((canAllocContainer) || (rmContainer != null)) {
LOG.info("Reserved container " +
" application attempt=" + application.getApplicationAttemptId() +
" resource=" + request.getCapability() +
" queue=" + this.toString() +
" node=" + node +
" clusterResource=" + clusterResource);
if (reservationsContinueLooking) {
// we got here by possibly ignoring parent queue capacity limits. If
// the parameter needToUnreserve is true it means we ignored one of
// those limits in the chance we could unreserve. If we are here
// we aren't trying to unreserve so we can't allocate
// anymore due to that parent limit
boolean res = checkLimitsToReserve(clusterResource, application, capability,
needToUnreserve);
if (!res) {
return Resources.none();
}
}
return request.getCapability();
// Reserve by 'charging' in advance...
reserve(application, priority, node, rmContainer, container);
LOG.info("Reserved container " +
" application=" + application.getApplicationId() +
" resource=" + request.getCapability() +
" queue=" + this.toString() +
" usedCapacity=" + getUsedCapacity() +
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
" used=" + usedResources +
" cluster=" + clusterResource);
return request.getCapability();
}
return Resources.none();
}
}
@ -1402,8 +1618,8 @@ public class LeafQueue implements CSQueue {
node.unreserveResource(application);
// Update reserved metrics
getMetrics().unreserveResource(
application.getUser(), rmContainer.getContainer().getResource());
getMetrics().unreserveResource(application.getUser(),
rmContainer.getContainer().getResource());
return true;
}
return false;
@ -1412,7 +1628,8 @@ public class LeafQueue implements CSQueue {
@Override
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue) {
ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
boolean sortQueues) {
if (application != null) {
boolean removed = false;
@ -1449,7 +1666,7 @@ public class LeafQueue implements CSQueue {
if (removed) {
// Inform the parent queue _outside_ of the leaf-queue lock
getParent().completedContainer(clusterResource, application, node,
rmContainer, null, event, this);
rmContainer, null, event, this, sortQueues);
}
}
}
@ -1466,6 +1683,8 @@ public class LeafQueue implements CSQueue {
String userName = application.getUser();
User user = getUser(userName);
user.assignContainer(resource);
// Note this is a bit unconventional since it gets the object and modifies it here
// rather then using set routine
Resources.subtractFrom(application.getHeadroom(), resource); // headroom
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
@ -1585,7 +1804,7 @@ public class LeafQueue implements CSQueue {
public synchronized void releaseContainer(Resource resource) {
Resources.subtractFrom(consumed, resource);
}
}
}
@Override

View File

@ -100,6 +100,8 @@ public class ParentQueue implements CSQueue {
RecordFactoryProvider.getRecordFactory(null);
private final ResourceCalculator resourceCalculator;
private boolean reservationsContinueLooking;
public ParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) {
@ -146,7 +148,8 @@ public class ParentQueue implements CSQueue {
setupQueueConfigs(cs.getClusterResource(),
capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, state, acls);
maximumCapacity, absoluteMaxCapacity, state, acls,
cs.getConfiguration().getReservationContinueLook());
this.queueComparator = cs.getQueueComparator();
this.childQueues = new TreeSet<CSQueue>(queueComparator);
@ -160,7 +163,8 @@ public class ParentQueue implements CSQueue {
Resource clusterResource,
float capacity, float absoluteCapacity,
float maximumCapacity, float absoluteMaxCapacity,
QueueState state, Map<QueueACL, AccessControlList> acls
QueueState state, Map<QueueACL, AccessControlList> acls,
boolean continueLooking
) {
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
@ -180,6 +184,8 @@ public class ParentQueue implements CSQueue {
this.queueInfo.setMaximumCapacity(this.maximumCapacity);
this.queueInfo.setQueueState(this.state);
this.reservationsContinueLooking = continueLooking;
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
@ -195,7 +201,8 @@ public class ParentQueue implements CSQueue {
", maxCapacity=" + maximumCapacity +
", asboluteMaxCapacity=" + absoluteMaxCapacity +
", state=" + state +
", acls=" + aclsString);
", acls=" + aclsString +
", reservationsContinueLooking=" + reservationsContinueLooking);
}
private static float PRECISION = 0.0005f; // 0.05% precision
@ -383,7 +390,8 @@ public class ParentQueue implements CSQueue {
newlyParsedParentQueue.maximumCapacity,
newlyParsedParentQueue.absoluteMaxCapacity,
newlyParsedParentQueue.state,
newlyParsedParentQueue.acls);
newlyParsedParentQueue.acls,
newlyParsedParentQueue.reservationsContinueLooking);
// Re-configure existing child queues and add new ones
// The CS has already checked to ensure all existing child queues are present!
@ -551,7 +559,7 @@ public class ParentQueue implements CSQueue {
@Override
public synchronized CSAssignment assignContainers(
Resource clusterResource, FiCaSchedulerNode node) {
Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@ -561,14 +569,19 @@ public class ParentQueue implements CSQueue {
+ getQueueName());
}
boolean localNeedToUnreserve = false;
// Are we over maximum-capacity for this queue?
if (!assignToQueue(clusterResource)) {
break;
// check to see if we could if we unreserve first
localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
if (!localNeedToUnreserve) {
break;
}
}
// Schedule
CSAssignment assignedToChild =
assignContainersToChildQueues(clusterResource, node);
assignContainersToChildQueues(clusterResource, node, localNeedToUnreserve | needToUnreserve);
assignment.setType(assignedToChild.getType());
// Done if no child-queue assigned anything
@ -632,6 +645,39 @@ public class ParentQueue implements CSQueue {
return true;
}
private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) {
if (this.reservationsContinueLooking) {
// check to see if we could potentially use this node instead of a reserved
// node
Resource reservedResources = Resources.createResource(getMetrics()
.getReservedMB(), getMetrics().getReservedVirtualCores());
float capacityWithoutReservedCapacity = Resources.divide(
resourceCalculator, clusterResource,
Resources.subtract(usedResources, reservedResources),
clusterResource);
if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) {
if (LOG.isDebugEnabled()) {
LOG.debug("parent: try to use reserved: " + getQueueName()
+ " usedResources: " + usedResources.getMemory()
+ " clusterResources: " + clusterResource.getMemory()
+ " reservedResources: " + reservedResources.getMemory()
+ " currentCapacity " + ((float) usedResources.getMemory())
/ clusterResource.getMemory()
+ " potentialNewWithoutReservedCapacity: "
+ capacityWithoutReservedCapacity + " ( " + " max-capacity: "
+ absoluteMaxCapacity + ")");
}
// we could potentially use this node instead of reserved node
return true;
}
}
return false;
}
private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
return (node.getReservedContainer() == null) &&
@ -640,7 +686,7 @@ public class ParentQueue implements CSQueue {
}
synchronized CSAssignment assignContainersToChildQueues(Resource cluster,
FiCaSchedulerNode node) {
FiCaSchedulerNode node, boolean needToUnreserve) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@ -653,7 +699,7 @@ public class ParentQueue implements CSQueue {
LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
+ " stats: " + childQueue);
}
assignment = childQueue.assignContainers(cluster, node);
assignment = childQueue.assignContainers(cluster, node, needToUnreserve);
if(LOG.isDebugEnabled()) {
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
" stats: " + childQueue + " --> " +
@ -697,7 +743,8 @@ public class ParentQueue implements CSQueue {
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node,
RMContainer rmContainer, ContainerStatus containerStatus,
RMContainerEventType event, CSQueue completedChildQueue) {
RMContainerEventType event, CSQueue completedChildQueue,
boolean sortQueues) {
if (application != null) {
// Careful! Locking order is important!
// Book keeping
@ -713,16 +760,21 @@ public class ParentQueue implements CSQueue {
" cluster=" + clusterResource);
}
// reinsert the updated queue
for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
CSQueue csqueue = iter.next();
if(csqueue.equals(completedChildQueue))
{
iter.remove();
LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() +
" stats: " + csqueue);
childQueues.add(csqueue);
break;
// Note that this is using an iterator on the childQueues so this can't be
// called if already within an iterator for the childQueues. Like
// from assignContainersToChildQueues.
if (sortQueues) {
// reinsert the updated queue
for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
CSQueue csqueue = iter.next();
if(csqueue.equals(completedChildQueue))
{
iter.remove();
LOG.info("Re-sorting completed queue: " + csqueue.getQueuePath() +
" stats: " + csqueue);
childQueues.add(csqueue);
break;
}
}
}
@ -730,10 +782,15 @@ public class ParentQueue implements CSQueue {
if (parent != null) {
// complete my parent
parent.completedContainer(clusterResource, application,
node, rmContainer, null, event, this);
node, rmContainer, null, event, this, sortQueues);
}
}
}
@Private
boolean getReservationContinueLooking() {
return reservationsContinueLooking;
}
synchronized void allocateResource(Resource clusterResource,
Resource resource) {

View File

@ -254,5 +254,32 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
currentContPreemption, Collections.singletonList(rr),
allocation.getNMTokenList());
}
synchronized public NodeId getNodeIdToUnreserve(Priority priority,
Resource capability) {
// first go around make this algorithm simple and just grab first
// reservation that has enough resources
Map<NodeId, RMContainer> reservedContainers = this.reservedContainers
.get(priority);
if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
// make sure we unreserve one with at least the same amount of
// resources, otherwise could affect capacity limits
if (Resources.fitsIn(capability, entry.getValue().getContainer()
.getResource())) {
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving node with reservation size: "
+ entry.getValue().getContainer().getResource()
+ " in order to allocate container with size: " + capability);
}
return entry.getKey();
}
}
}
return null;
}
}

View File

@ -516,7 +516,7 @@ public class TestApplicationLimits {
app_0_0.updateResourceRequests(app_0_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0);
queue.assignContainers(clusterResource, node_0, false);
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
verify(app_0_0).setHeadroom(eq(expectedHeadroom));
@ -535,7 +535,7 @@ public class TestApplicationLimits {
app_0_1.updateResourceRequests(app_0_1_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0); // Schedule to compute
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom));
verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change
@ -554,7 +554,7 @@ public class TestApplicationLimits {
app_1_0.updateResourceRequests(app_1_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0); // Schedule to compute
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
verify(app_0_0).setHeadroom(eq(expectedHeadroom));
verify(app_0_1).setHeadroom(eq(expectedHeadroom));
@ -562,7 +562,7 @@ public class TestApplicationLimits {
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
queue.assignContainers(clusterResource, node_0); // Schedule to compute
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
verify(app_0_0).setHeadroom(eq(expectedHeadroom));
verify(app_0_1).setHeadroom(eq(expectedHeadroom));

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@ -141,7 +142,7 @@ public class TestChildQueueOrder {
// Next call - nothing
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).
when(queue).assignContainers(eq(clusterResource), eq(node));
when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean());
// Mock the node's resource availability
Resource available = node.getAvailableResource();
@ -152,7 +153,7 @@ public class TestChildQueueOrder {
return new CSAssignment(allocatedResource, type);
}
}).
when(queue).assignContainers(eq(clusterResource), eq(node));
when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean());
doNothing().when(node).releaseContainer(any(Container.class));
}
@ -244,7 +245,6 @@ public class TestChildQueueOrder {
doReturn(true).when(app_0).containerCompleted(any(RMContainer.class),
any(ContainerStatus.class),any(RMContainerEventType.class));
//
Priority priority = TestUtils.createMockPriority(1);
ContainerAllocationExpirer expirer =
mock(ContainerAllocationExpirer.class);
@ -269,14 +269,14 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
for(int i=0; i < 2; i++)
{
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
}
for(int i=0; i < 3; i++)
{
@ -284,7 +284,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
}
for(int i=0; i < 4; i++)
{
@ -292,7 +292,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
}
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -305,7 +305,7 @@ public class TestChildQueueOrder {
for(int i=0; i < 3;i++)
{
d.completedContainer(clusterResource, app_0, node_0,
rmContainer, null, RMContainerEventType.KILL, null);
rmContainer, null, RMContainerEventType.KILL, null, true);
}
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -325,7 +325,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
}
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -336,7 +336,7 @@ public class TestChildQueueOrder {
//Release 1GB Container from A
a.completedContainer(clusterResource, app_0, node_0,
rmContainer, null, RMContainerEventType.KILL, null);
rmContainer, null, RMContainerEventType.KILL, null, true);
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -352,7 +352,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 3*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -362,7 +362,7 @@ public class TestChildQueueOrder {
//Release 1GB container resources from B
b.completedContainer(clusterResource, app_0, node_0,
rmContainer, null, RMContainerEventType.KILL, null);
rmContainer, null, RMContainerEventType.KILL, null, true);
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -378,7 +378,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -392,12 +392,12 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
InOrder allocationOrder = inOrder(d,b);
allocationOrder.verify(d).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@ -252,7 +253,7 @@ public class TestLeafQueue {
doNothing().when(parent).completedContainer(
any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class),
any(RMContainer.class), any(ContainerStatus.class),
any(RMContainerEventType.class), any(CSQueue.class));
any(RMContainerEventType.class), any(CSQueue.class), anyBoolean());
return queue;
}
@ -325,7 +326,7 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(
(int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
a.getMetrics().getAvailableMB());
@ -460,7 +461,7 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -470,7 +471,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -478,7 +479,7 @@ public class TestLeafQueue {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Can't allocate 3rd due to user-limit
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -487,7 +488,7 @@ public class TestLeafQueue {
// Bump up user-limit-factor, now allocate should work
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -495,7 +496,7 @@ public class TestLeafQueue {
assertEquals(3*GB, a.getMetrics().getAllocatedMB());
// One more should work, for app_1, due to user-limit-factor
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@ -505,7 +506,7 @@ public class TestLeafQueue {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@ -518,7 +519,7 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
RMContainerEventType.KILL, null, true);
}
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -532,7 +533,7 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
RMContainerEventType.KILL, null, true);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
@ -620,19 +621,19 @@ public class TestLeafQueue {
// recordFactory)));
// 1 container to user_0
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
// One more to user_0 since he is the only active user
a.assignContainers(clusterResource, node_1);
a.assignContainers(clusterResource, node_1, false);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
@ -705,7 +706,7 @@ public class TestLeafQueue {
1, a.getActiveUsersManager().getNumActiveUsers());
// 1 container to user_0
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -713,7 +714,7 @@ public class TestLeafQueue {
assertEquals(0*GB, app_1.getHeadroom().getMemory()); // User limit = 2G
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@ -729,7 +730,7 @@ public class TestLeafQueue {
// No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap
a.assignContainers(clusterResource, node_1);
a.assignContainers(clusterResource, node_1, false);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@ -743,7 +744,7 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1);
a.assignContainers(clusterResource, node_1, false);
assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
}
@ -813,21 +814,21 @@ public class TestLeafQueue {
*/
// Only 1 container
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Can't allocate 3rd due to user-limit
a.setUserLimit(25);
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -845,7 +846,7 @@ public class TestLeafQueue {
// Now allocations should goto app_2 since
// user_0 is at limit inspite of high user-limit-factor
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -854,7 +855,7 @@ public class TestLeafQueue {
// Now allocations should goto app_0 since
// user_0 is at user-limit not above it
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -864,7 +865,7 @@ public class TestLeafQueue {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -875,7 +876,7 @@ public class TestLeafQueue {
// Now, allocations should goto app_3 since it's under user-limit
a.setMaxCapacity(1.0f);
a.setUserLimitFactor(1);
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(7*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -883,7 +884,7 @@ public class TestLeafQueue {
assertEquals(1*GB, app_3.getCurrentConsumption().getMemory());
// Now we should assign to app_3 again since user_2 is under user-limit
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -896,7 +897,7 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
RMContainerEventType.KILL, null, true);
}
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -910,7 +911,7 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
RMContainerEventType.KILL, null, true);
}
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -924,7 +925,7 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
RMContainerEventType.KILL, null, true);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -982,7 +983,7 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -992,7 +993,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1000,7 +1001,7 @@ public class TestLeafQueue {
assertEquals(2*GB, a.getMetrics().getAllocatedMB());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1015,8 +1016,8 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1031,8 +1032,8 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@ -1099,7 +1100,7 @@ public class TestLeafQueue {
// Start testing...
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1108,7 +1109,7 @@ public class TestLeafQueue {
assertEquals(0*GB, a.getMetrics().getAvailableMB());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1121,7 +1122,7 @@ public class TestLeafQueue {
// We do not need locality delay here
doReturn(-1).when(a).getNodeLocalityDelay();
a.assignContainers(clusterResource, node_1);
a.assignContainers(clusterResource, node_1, false);
assertEquals(10*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@ -1136,8 +1137,8 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(8*GB, app_1.getCurrentConsumption().getMemory());
@ -1205,20 +1206,20 @@ public class TestLeafQueue {
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1231,8 +1232,8 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
a.assignContainers(clusterResource, node_0);
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0, false);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1241,7 +1242,7 @@ public class TestLeafQueue {
assertEquals(1, app_1.getReReservations(priority));
// Re-reserve
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@ -1250,7 +1251,7 @@ public class TestLeafQueue {
assertEquals(2, app_1.getReReservations(priority));
// Try to schedule on node_1 now, should *move* the reservation
a.assignContainers(clusterResource, node_1);
a.assignContainers(clusterResource, node_1, false);
assertEquals(9*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@ -1266,8 +1267,8 @@ public class TestLeafQueue {
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null);
CSAssignment assignment = a.assignContainers(clusterResource, node_0);
RMContainerEventType.KILL, null, true);
CSAssignment assignment = a.assignContainers(clusterResource, node_0, false);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@ -1278,6 +1279,7 @@ public class TestLeafQueue {
}
@Test
public void testLocalityScheduling() throws Exception {
@ -1337,7 +1339,7 @@ public class TestLeafQueue {
CSAssignment assignment = null;
// Start with off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2);
assignment = a.assignContainers(clusterResource, node_2, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
@ -1345,7 +1347,7 @@ public class TestLeafQueue {
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2);
assignment = a.assignContainers(clusterResource, node_2, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(2, app_0.getSchedulingOpportunities(priority));
@ -1353,7 +1355,7 @@ public class TestLeafQueue {
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2);
assignment = a.assignContainers(clusterResource, node_2, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(3, app_0.getSchedulingOpportunities(priority));
@ -1362,7 +1364,7 @@ public class TestLeafQueue {
// Another off switch, now we should allocate
// since missedOpportunities=3 and reqdContainers=3
assignment = a.assignContainers(clusterResource, node_2);
assignment = a.assignContainers(clusterResource, node_2, false);
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
@ -1370,7 +1372,7 @@ public class TestLeafQueue {
assertEquals(NodeType.OFF_SWITCH, assignment.getType());
// NODE_LOCAL - node_0
assignment = a.assignContainers(clusterResource, node_0);
assignment = a.assignContainers(clusterResource, node_0, false);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@ -1378,7 +1380,7 @@ public class TestLeafQueue {
assertEquals(NodeType.NODE_LOCAL, assignment.getType());
// NODE_LOCAL - node_1
assignment = a.assignContainers(clusterResource, node_1);
assignment = a.assignContainers(clusterResource, node_1, false);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@ -1406,13 +1408,13 @@ public class TestLeafQueue {
doReturn(1).when(a).getNodeLocalityDelay();
// Shouldn't assign RACK_LOCAL yet
assignment = a.assignContainers(clusterResource, node_3);
assignment = a.assignContainers(clusterResource, node_3, false);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(2, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Should assign RACK_LOCAL now
assignment = a.assignContainers(clusterResource, node_3);
assignment = a.assignContainers(clusterResource, node_3, false);
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@ -1493,7 +1495,7 @@ public class TestLeafQueue {
// Start with off switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2);
a.assignContainers(clusterResource, node_2, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
@ -1505,7 +1507,7 @@ public class TestLeafQueue {
// Another off-switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2);
a.assignContainers(clusterResource, node_2, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
@ -1516,7 +1518,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Another off-switch, shouldn't allocate OFF_SWITCH P1
a.assignContainers(clusterResource, node_2);
a.assignContainers(clusterResource, node_2, false);
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
@ -1527,7 +1529,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, DATA_LOCAL for P1
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@ -1538,7 +1540,7 @@ public class TestLeafQueue {
assertEquals(1, app_0.getTotalRequiredResources(priority_2));
// Now, OFF_SWITCH for P2
a.assignContainers(clusterResource, node_1);
a.assignContainers(clusterResource, node_1, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@ -1614,7 +1616,7 @@ public class TestLeafQueue {
app_0.updateResourceRequests(app_0_requests_0);
// NODE_LOCAL - node_0_1
a.assignContainers(clusterResource, node_0_0);
a.assignContainers(clusterResource, node_0_0, false);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@ -1622,7 +1624,7 @@ public class TestLeafQueue {
// No allocation on node_1_0 even though it's node/rack local since
// required(ANY) == 0
a.assignContainers(clusterResource, node_1_0);
a.assignContainers(clusterResource, node_1_0, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
@ -1638,14 +1640,14 @@ public class TestLeafQueue {
// No allocation on node_0_1 even though it's node/rack local since
// required(rack_1) == 0
a.assignContainers(clusterResource, node_0_1);
a.assignContainers(clusterResource, node_0_1, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(1, app_0.getTotalRequiredResources(priority));
// NODE_LOCAL - node_1
a.assignContainers(clusterResource, node_1_0);
a.assignContainers(clusterResource, node_1_0, false);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@ -1889,7 +1891,7 @@ public class TestLeafQueue {
// node_0_1
// Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
a.assignContainers(clusterResource, node_0_1);
a.assignContainers(clusterResource, node_0_1, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@ -1911,7 +1913,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since RR(rack_1) = relax: false
a.assignContainers(clusterResource, node_1_1);
a.assignContainers(clusterResource, node_1_1, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@ -1941,7 +1943,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since node_1_1 is blacklisted
a.assignContainers(clusterResource, node_1_1);
a.assignContainers(clusterResource, node_1_1, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@ -1969,7 +1971,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since rack_1 is blacklisted
a.assignContainers(clusterResource, node_1_1);
a.assignContainers(clusterResource, node_1_1, false);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@ -1995,7 +1997,7 @@ public class TestLeafQueue {
// Blacklist: < host_0_0 > <----
// Now, should allocate since RR(rack_1) = relax: true
a.assignContainers(clusterResource, node_1_1);
a.assignContainers(clusterResource, node_1_1, false);
verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority));
@ -2025,7 +2027,7 @@ public class TestLeafQueue {
// host_1_0: 8G
// host_1_1: 7G
a.assignContainers(clusterResource, node_1_0);
a.assignContainers(clusterResource, node_1_0, false);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority));
@ -2105,7 +2107,7 @@ public class TestLeafQueue {
recordFactory)));
try {
a.assignContainers(clusterResource, node_0);
a.assignContainers(clusterResource, node_0, false);
} catch (NullPointerException e) {
Assert.fail("NPE when allocating container on node but "
+ "forget to set off-switch request should be handled");

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@ -153,7 +154,7 @@ public class TestParentQueue {
// Next call - nothing
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).
when(queue).assignContainers(eq(clusterResource), eq(node));
when(queue).assignContainers(eq(clusterResource), eq(node), eq(false));
// Mock the node's resource availability
Resource available = node.getAvailableResource();
@ -164,7 +165,7 @@ public class TestParentQueue {
return new CSAssignment(allocatedResource, type);
}
}).
when(queue).assignContainers(eq(clusterResource), eq(node));
when(queue).assignContainers(eq(clusterResource), eq(node), eq(false));
}
private float computeQueueAbsoluteUsedCapacity(CSQueue queue,
@ -227,19 +228,19 @@ public class TestParentQueue {
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
stubQueueAllocation(a, clusterResource, node_1, 2*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1);
root.assignContainers(clusterResource, node_1, false);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -247,12 +248,12 @@ public class TestParentQueue {
// since A has 2/6G while B has 2/14G
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
stubQueueAllocation(b, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -260,12 +261,12 @@ public class TestParentQueue {
// since A has 3/6G while B has 4/14G
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 4*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
@ -273,12 +274,12 @@ public class TestParentQueue {
// since A has 3/6G while B has 8/14G
stubQueueAllocation(a, clusterResource, node_1, 1*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1);
root.assignContainers(clusterResource, node_1, false);
allocationOrder = inOrder(a, b);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 4*GB, clusterResource);
verifyQueueMetrics(b, 9*GB, clusterResource);
}
@ -439,7 +440,7 @@ public class TestParentQueue {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 0*GB, clusterResource);
verifyQueueMetrics(c, 1*GB, clusterResource);
@ -451,7 +452,7 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_1, 0*GB);
stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
stubQueueAllocation(c, clusterResource, node_1, 0*GB);
root.assignContainers(clusterResource, node_1);
root.assignContainers(clusterResource, node_1, false);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
verifyQueueMetrics(c, 1*GB, clusterResource);
@ -462,14 +463,14 @@ public class TestParentQueue {
stubQueueAllocation(a1, clusterResource, node_0, 1*GB);
stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
stubQueueAllocation(c, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
InOrder allocationOrder = inOrder(a, c, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 6*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@ -488,16 +489,16 @@ public class TestParentQueue {
stubQueueAllocation(b3, clusterResource, node_2, 1*GB);
stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
stubQueueAllocation(c, clusterResource, node_2, 1*GB);
root.assignContainers(clusterResource, node_2);
root.assignContainers(clusterResource, node_2, false);
allocationOrder = inOrder(a, a2, a1, b, c);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(a2).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(c).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
verifyQueueMetrics(c, 4*GB, clusterResource);
@ -597,7 +598,7 @@ public class TestParentQueue {
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
@ -605,12 +606,12 @@ public class TestParentQueue {
// also, B gets a scheduling opportunity since A allocates RACK_LOCAL
stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1);
root.assignContainers(clusterResource, node_1, false);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@ -619,12 +620,12 @@ public class TestParentQueue {
// However, since B returns off-switch, A won't get an opportunity
stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(a).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@ -663,7 +664,7 @@ public class TestParentQueue {
// Simulate B3 returning a container on node_0
stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
verifyQueueMetrics(b2, 0*GB, clusterResource);
verifyQueueMetrics(b3, 1*GB, clusterResource);
@ -671,12 +672,12 @@ public class TestParentQueue {
// also, B3 gets a scheduling opportunity since B2 allocates RACK_LOCAL
stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1);
root.assignContainers(clusterResource, node_1, false);
InOrder allocationOrder = inOrder(b2, b3);
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 2*GB, clusterResource);
@ -685,12 +686,12 @@ public class TestParentQueue {
// However, since B3 returns off-switch, B2 won't get an opportunity
stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0);
root.assignContainers(clusterResource, node_0, false);
allocationOrder = inOrder(b3, b2);
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
any(FiCaSchedulerNode.class));
any(FiCaSchedulerNode.class), anyBoolean());
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 3*GB, clusterResource);