Merge branch 'trunk' into HADOOP-12111

Allen Wittenauer 2015-07-20 09:39:55 -07:00
commit fbe6a692fd
44 changed files with 667 additions and 253 deletions

View File

@ -232,6 +232,8 @@ Trunk (Unreleased)
HADOOP-12149. copy all of test-patch BINDIR prior to re-exec (aw)
HADOOP-10979. Auto-entries in hadoop_usage (aw)
BUG FIXES
HADOOP-11473. test-patch says "-1 overall" even when all checks are +1
@ -700,6 +702,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12153. ByteBufferReadable doesn't declare @InterfaceAudience and
@InterfaceStability. (Brahma Reddy Battula via ozawa)
HADOOP-11893. Mark org.apache.hadoop.security.token.Token as
@InterfaceAudience.Public. (Brahma Reddy Battula via stevel)
OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp
@ -967,6 +972,18 @@ Release 2.8.0 - UNRELEASED
HADOOP-12240. Fix tests requiring native library to be skipped in non-native
profile. (Masatake Iwasaki via ozawa)
HADOOP-12235 hadoop-openstack junit & mockito dependencies should be
"provided". (Ted Yu via stevel)
HADOOP-12209 Comparable type should be in FileStatus.
(Yong Zhang via stevel)
HADOOP-12088. KMSClientProvider uses equalsIgnoreCase("application/json").
(Brahma Reddy Battula via stevel)
HADOOP-12051. ProtobufRpcEngine.invoke() should use Exception.toString()
over getMessage() in logging/span events. (Varun Saxena via stevel)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
@ -987,6 +1004,9 @@ Release 2.7.2 - UNRELEASED
HADOOP-12191. Bzip2Factory is not thread safe. (Brahma Reddy Battula
via ozawa)
HDFS-8767. RawLocalFileSystem.listStatus() returns null for UNIX pipefile.
(kanaka kumar avvaru via wheat9)
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES

View File

@ -15,47 +15,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
MYNAME="${BASH_SOURCE-$0}"
function hadoop_usage()
{
echo "Usage: hadoop [--config confdir] [COMMAND | CLASSNAME]"
echo " CLASSNAME run the class named CLASSNAME"
echo " or"
echo " where COMMAND is one of:"
echo " archive -archiveName NAME -p <parent path> <src>* <dest>"
echo " create a Hadoop archive"
echo " checknative [-a|-h] check native Hadoop and compression "
echo " libraries availability"
echo " classpath prints the class path needed to get the"
echo " Hadoop jar and the required libraries"
echo " conftest validate configuration XML files"
echo " credential interact with credential providers"
echo " daemonlog get/set the log level for each daemon"
echo " distch path:owner:group:permisson"
echo " distributed metadata changer"
echo " distcp <srcurl> <desturl> "
echo " copy file or directories recursively"
echo " fs run a generic filesystem user client"
echo " jar <jar> run a jar file"
echo " note: please use \"yarn jar\" to launch"
echo " YARN applications, not this command."
echo " jnipath prints the java.library.path"
echo " kerbname show auth_to_local principal conversion"
echo " key manage keys via the KeyProvider"
echo " trace view and modify Hadoop tracing settings"
echo " version print the version"
echo ""
echo "Most commands print help when invoked w/o parameters."
hadoop_add_subcommand "archive" "create a Hadoop archive"
hadoop_add_subcommand "checknative" "check native Hadoop and compression libraries availability"
hadoop_add_subcommand "classpath" "prints the class path needed to get the Hadoop jar and the required libraries"
hadoop_add_subcommand "conftest" "validate configuration XML files"
hadoop_add_subcommand "credential" "interact with credential providers"
hadoop_add_subcommand "daemonlog" "get/set the log level for each daemon"
hadoop_add_subcommand "distch" "distributed metadata changer"
hadoop_add_subcommand "distcp" "copy file or directories recursively"
hadoop_add_subcommand "fs" "run a generic filesystem user client"
hadoop_add_subcommand "jar <jar>" "run a jar file. NOTE: please use \"yarn jar\" to launch YARN applications, not this command."
hadoop_add_subcommand "jnipath" "prints the java.library.path"
hadoop_add_subcommand "kerbname" "show auth_to_local principal conversion"
hadoop_add_subcommand "key" "manage keys via the KeyProvider"
hadoop_add_subcommand "trace" "view and modify Hadoop tracing settings"
hadoop_add_subcommand "version" "print the version"
hadoop_generate_usage "${MYNAME}" true
}
# This script runs the hadoop core commands.
# let's locate libexec...
if [[ -n "${HADOOP_PREFIX}" ]]; then
DEFAULT_LIBEXEC_DIR="${HADOOP_PREFIX}/libexec"
else
this="${BASH_SOURCE-$0}"
bin=$(cd -P -- "$(dirname -- "${this}")" >/dev/null && pwd -P)
bin=$(cd -P -- "$(dirname -- "${MYNAME}")" >/dev/null && pwd -P)
DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
fi
@ -98,7 +86,7 @@ case ${COMMAND} in
exit 1
fi
;;
#mapred commands for backwards compatibility
pipes|job|queue|mrgroups|mradmin|jobtracker|tasktracker)
hadoop_error "WARNING: Use of this script to execute ${COMMAND} is deprecated."

View File

@ -14,6 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# we need to declare this globally as an array, which can only
# be done outside of a function
declare -a HADOOP_USAGE=()
## @description Print a message to stderr
## @audience public
## @stability stable
@ -36,6 +40,89 @@ function hadoop_debug
fi
}
## @description Add a subcommand to the usage output
## @audience private
## @stability evolving
## @replaceable no
## @param subcommand
## @param subcommanddesc
function hadoop_add_subcommand
{
local option=$1
local text=$2
HADOOP_USAGE[${HADOOP_USAGE_COUNTER}]="${option}@${text}"
((HADOOP_USAGE_COUNTER=HADOOP_USAGE_COUNTER+1))
}
## @description generate standard usage output
## @description and optionally takes a class
## @audience private
## @stability evolving
## @replaceable no
## @param execname
## @param [true|false]
function hadoop_generate_usage
{
local cmd=$1
local takesclass=$2
local i
local counter
local line
local option
local giventext
local maxoptsize
local foldsize=75
declare -a tmpa
cmd=${cmd##*/}
echo "Usage: ${cmd} [OPTIONS] SUBCOMMAND [SUBCOMMAND OPTIONS]"
if [[ ${takesclass} = true ]]; then
echo " or ${cmd} [OPTIONS] CLASSNAME [CLASSNAME OPTIONS]"
echo " where CLASSNAME is a user-provided Java class"
fi
echo ""
echo " OPTIONS is none or any of:"
echo " --config confdir"
echo " --daemon (start|stop|status)"
echo " --debug"
echo " --hostnames list[,of,host,names]"
echo " --hosts filename"
echo " --loglevel loglevel"
echo " --slaves"
echo ""
echo " SUBCOMMAND is one of:"
counter=0
while read -r line; do
tmpa[${counter}]=${line}
((counter=counter+1))
option=$(echo "${line}" | cut -f1 -d'@')
if [[ ${#option} -gt ${maxoptsize} ]]; then
maxoptsize=${#option}
fi
done < <(for i in "${HADOOP_USAGE[@]}"; do
echo "${i}"
done | sort)
i=0
((foldsize=75-maxoptsize))
until [[ $i -eq ${#tmpa[@]} ]]; do
option=$(echo "${tmpa[$i]}" | cut -f1 -d'@')
giventext=$(echo "${tmpa[$i]}" | cut -f2 -d'@')
while read -r line; do
printf "%-${maxoptsize}s %-s\n" "${option}" "${line}"
option=" "
done < <(echo "${giventext}"| fold -s -w ${foldsize})
((i=i+1))
done
echo ""
echo "Most subcommands print help when invoked w/o parameters or with -h."
}
## @description Replace `oldvar` with `newvar` if `oldvar` exists.
## @audience public
## @stability stable
@ -101,6 +188,9 @@ function hadoop_bootstrap
# setup a default TOOL_PATH
TOOL_PATH=${TOOL_PATH:-${HADOOP_PREFIX}/share/hadoop/tools/lib/*}
# usage output set to zero
HADOOP_USAGE_COUNTER=0
export HADOOP_OS_TYPE=${HADOOP_OS_TYPE:-$(uname -s)}
# defaults

View File

@ -544,7 +544,9 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
// AuthenticatedURL properly to set authToken post initialization)
}
HttpExceptionUtils.validateResponse(conn, expectedResponse);
if (APPLICATION_JSON_MIME.equalsIgnoreCase(conn.getContentType())
if (conn.getContentType() != null
&& conn.getContentType().trim().toLowerCase()
.startsWith(APPLICATION_JSON_MIME)
&& klass != null) {
ObjectMapper mapper = new ObjectMapper();
InputStream is = null;

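This hunk replaces the exact equalsIgnoreCase comparison with a trimmed, case-insensitive prefix match, so responses whose Content-Type carries parameters (for example "application/json;charset=UTF-8") are still parsed as JSON (HADOOP-12088). A minimal self-contained sketch of that check; the constant's value is assumed to mirror KMSClientProvider's APPLICATION_JSON_MIME:

    // Standalone sketch of the relaxed content-type check; the constant value
    // is an assumption mirroring KMSClientProvider, not copied from it.
    public class JsonContentTypeCheck {
      private static final String APPLICATION_JSON_MIME = "application/json";

      static boolean isJsonResponse(String contentType) {
        return contentType != null
            && contentType.trim().toLowerCase().startsWith(APPLICATION_JSON_MIME);
      }

      public static void main(String[] args) {
        System.out.println(isJsonResponse("application/json;charset=UTF-8")); // true
        System.out.println(isJsonResponse("APPLICATION/JSON"));               // true
        System.out.println(isJsonResponse(null));                             // false
      }
    }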
View File

@ -31,7 +31,7 @@ import org.apache.hadoop.io.Writable;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileStatus implements Writable, Comparable {
public class FileStatus implements Writable, Comparable<FileStatus> {
private Path path;
private long length;
@ -323,19 +323,14 @@ public class FileStatus implements Writable, Comparable {
}
/**
* Compare this object to another object
*
* @param o the object to be compared.
* Compare this FileStatus to another FileStatus
* @param o the FileStatus to be compared.
* @return a negative integer, zero, or a positive integer as this object
* is less than, equal to, or greater than the specified object.
*
* @throws ClassCastException if the specified object's is not of
* type FileStatus
*/
@Override
public int compareTo(Object o) {
FileStatus other = (FileStatus)o;
return this.getPath().compareTo(other.getPath());
public int compareTo(FileStatus o) {
return this.getPath().compareTo(o.getPath());
}
/** Compare if this object is equal to another object

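With the raw Comparable replaced by Comparable&lt;FileStatus&gt; (HADOOP-12209), callers can sort or binary-search FileStatus collections without casts, and ordering is by path since compareTo delegates to Path.compareTo. A hedged sketch of typical use; the paths and the six-argument FileStatus constructor call are illustrative only:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;

    public class SortFileStatusSketch {
      public static void main(String[] args) {
        // Minimal FileStatus instances; only the trailing Path matters for ordering.
        FileStatus b = new FileStatus(0, false, 1, 1, 0, new Path("/tmp/b"));
        FileStatus a = new FileStatus(0, false, 1, 1, 0, new Path("/tmp/a"));

        List<FileStatus> stats = new ArrayList<>();
        stats.add(b);
        stats.add(a);

        // No raw-type casts needed now that FileStatus implements Comparable<FileStatus>.
        Collections.sort(stats);
        System.out.println(stats.get(0).getPath()); // /tmp/a
      }
    }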
View File

@ -90,17 +90,13 @@ public class LocatedFileStatus extends FileStatus {
}
/**
* Compare this object to another object
*
* @param o the object to be compared.
* Compare this FileStatus to another FileStatus
* @param o the FileStatus to be compared.
* @return a negative integer, zero, or a positive integer as this object
* is less than, equal to, or greater than the specified object.
*
* @throws ClassCastException if the specified object's is not of
* type FileStatus
*/
@Override
public int compareTo(Object o) {
public int compareTo(FileStatus o) {
return super.compareTo(o);
}

View File

@ -459,35 +459,38 @@ public class RawLocalFileSystem extends FileSystem {
if (!localf.exists()) {
throw new FileNotFoundException("File " + f + " does not exist");
}
if (localf.isFile()) {
if (!useDeprecatedFileStatus) {
return new FileStatus[] { getFileStatus(f) };
if (localf.isDirectory()) {
String[] names = localf.list();
if (names == null) {
return null;
}
return new FileStatus[] {
new DeprecatedRawLocalFileStatus(localf, getDefaultBlockSize(f), this)};
results = new FileStatus[names.length];
int j = 0;
for (int i = 0; i < names.length; i++) {
try {
// Assemble the path using the Path 3 arg constructor to make sure
// paths with colon are properly resolved on Linux
results[j] = getFileStatus(new Path(f, new Path(null, null,
names[i])));
j++;
} catch (FileNotFoundException e) {
// ignore the files not found since the dir list may have
// changed since the names[] list was generated.
}
}
if (j == names.length) {
return results;
}
return Arrays.copyOf(results, j);
}
String[] names = localf.list();
if (names == null) {
return null;
if (!useDeprecatedFileStatus) {
return new FileStatus[] { getFileStatus(f) };
}
results = new FileStatus[names.length];
int j = 0;
for (int i = 0; i < names.length; i++) {
try {
// Assemble the path using the Path 3 arg constructor to make sure
// paths with colon are properly resolved on Linux
results[j] = getFileStatus(new Path(f, new Path(null, null, names[i])));
j++;
} catch (FileNotFoundException e) {
// ignore the files not found since the dir list may have changed
// since the names[] list was generated.
}
}
if (j == names.length) {
return results;
}
return Arrays.copyOf(results, j);
return new FileStatus[] {
new DeprecatedRawLocalFileStatus(localf,
getDefaultBlockSize(f), this) };
}
protected boolean mkOneDir(File p2f) throws IOException {

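The reordered listStatus above checks isDirectory() first, so a path that exists but is neither a directory nor a regular file (for example a UNIX pipe, per HDFS-8767) now falls through to the single-element return instead of hitting the directory-listing code and returning null. A hedged, standalone illustration of that branch order, using plain java.io.File in place of the Hadoop types:

    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.IOException;

    // Illustrates only the branch order: directories are enumerated, and anything
    // else that exists (regular file, pipe, socket, ...) yields a one-element result.
    public class ListStatusBranchSketch {
      static String[] listNames(File localf) throws IOException {
        if (!localf.exists()) {
          throw new FileNotFoundException(localf + " does not exist");
        }
        if (localf.isDirectory()) {
          return localf.list();                     // may be null if the directory is unreadable
        }
        return new String[] { localf.getName() };   // file, pipe, socket, ...
      }

      public static void main(String[] args) throws IOException {
        System.out.println(listNames(new File(".")).length + " entries in the working directory");
      }
    }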
View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.viewfs;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@ -120,7 +121,7 @@ class ViewFsLocatedFileStatus extends LocatedFileStatus {
}
@Override
public int compareTo(Object o) {
public int compareTo(FileStatus o) {
return super.compareTo(o);
}

View File

@ -238,7 +238,7 @@ public class ProtobufRpcEngine implements RpcEngine {
}
if (Trace.isTracing()) {
traceScope.getSpan().addTimelineAnnotation(
"Call got exception: " + e.getMessage());
"Call got exception: " + e.toString());
}
throw new ServiceException(e);
} finally {

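Switching the trace annotation from getMessage() to toString() keeps the exception class name and copes with exceptions whose message is null, which getMessage() would report as nothing useful (HADOOP-12051). A small self-contained sketch of the difference; the exception chosen is illustrative:

    public class ExceptionMessageSketch {
      public static void main(String[] args) {
        Exception noMessage = new java.net.SocketTimeoutException();
        // getMessage() may be null; toString() always includes the class name.
        System.out.println("getMessage: " + noMessage.getMessage()); // getMessage: null
        System.out.println("toString:   " + noMessage);              // java.net.SocketTimeoutException
      }
    }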
View File

@ -37,7 +37,7 @@ import org.apache.hadoop.ipc.StandbyException;
* The server-side secret manager for each token type.
* @param <T> The type of the token identifier
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class SecretManager<T extends TokenIdentifier> {
/**

View File

@ -36,7 +36,7 @@ import java.util.ServiceLoader;
/**
* The client-side form of the token.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class Token<T extends TokenIdentifier> implements Writable {
public static final Log LOG = LogFactory.getLog(Token.class);

View File

@ -33,7 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation;
* An identifier that identifies a token, may contain public information
* about a token, including its kind (or type).
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class TokenIdentifier implements Writable {

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability;
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceAudience.Public
@InterfaceStability.Evolving
public @interface TokenInfo {
/** The type of TokenSelector to be used */

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.io.Text;
/**
* This is the interface for plugins that handle tokens.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class TokenRenewer {

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.io.Text;
* @param <T>
* T extends TokenIdentifier
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TokenSelector<T extends TokenIdentifier> {
Token<T> selectToken(Text service,

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "MapReduce"})
@InterfaceAudience.Public
@InterfaceStability.Evolving
package org.apache.hadoop.security.token;
import org.apache.hadoop.classification.InterfaceAudience;

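The preceding hunks widen the audience annotation on the delegation-token classes from LimitedPrivate({"HDFS", "MapReduce"}) to Public while keeping their stability at Evolving (HADOOP-11893). A minimal sketch of how the same pair of annotations is applied to a class; the class itself is hypothetical:

    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;

    // Hypothetical example class: visible to all downstream projects,
    // but its API may still change between minor releases.
    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    public class MyTokenHelper {
    }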
View File

@ -26,6 +26,9 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.junit.Test;
import org.apache.commons.logging.Log;
@ -183,6 +186,25 @@ public class TestFileStatus {
validateToString(fileStatus);
}
@Test
public void testCompareTo() throws IOException {
Path path1 = new Path("path1");
Path path2 = new Path("path2");
FileStatus fileStatus1 =
new FileStatus(1, true, 1, 1, 1, 1, FsPermission.valueOf("-rw-rw-rw-"),
"one", "one", null, path1);
FileStatus fileStatus2 =
new FileStatus(1, true, 1, 1, 1, 1, FsPermission.valueOf("-rw-rw-rw-"),
"one", "one", null, path2);
assertTrue(fileStatus1.compareTo(fileStatus2) < 0);
assertTrue(fileStatus2.compareTo(fileStatus1) > 0);
List<FileStatus> statList = new ArrayList<>();
statList.add(fileStatus1);
statList.add(fileStatus2);
assertTrue(Collections.binarySearch(statList, fileStatus1) > -1);
}
/**
* Check that toString produces the expected output for a symlink.
*/

View File

@ -32,11 +32,14 @@ import java.util.Random;
import static org.junit.Assert.*;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
/**
* This class tests the local file system via the FileSystem abstraction.
@ -612,4 +615,23 @@ public class TestLocalFileSystem {
}
}
@Test
public void testFileStatusPipeFile() throws Exception {
RawLocalFileSystem origFs = new RawLocalFileSystem();
RawLocalFileSystem fs = spy(origFs);
Configuration conf = mock(Configuration.class);
fs.setConf(conf);
Whitebox.setInternalState(fs, "useDeprecatedFileStatus", false);
Path path = new Path("/foo");
File pipe = mock(File.class);
when(pipe.isFile()).thenReturn(false);
when(pipe.isDirectory()).thenReturn(false);
when(pipe.exists()).thenReturn(true);
FileStatus stat = mock(FileStatus.class);
doReturn(pipe).when(fs).pathToFile(path);
doReturn(stat).when(fs).getFileStatus(path);
FileStatus[] stats = fs.listStatus(path);
assertTrue(stats != null && stats.length == 1 && stats[0] == stat);
}
}

View File

@ -722,6 +722,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8742. Inotify: Support event for OP_TRUNCATE.
(Surendra Singh Lilhore via aajisaka)
HDFS-7314. When the DFSClient lease cannot be renewed, abort open-for-write
files rather than the entire DFSClient. (mingma)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -15,55 +15,47 @@
# See the License for the specific language governing permissions and
# limitations under the License.
MYNAME="${BASH_SOURCE-$0}"
function hadoop_usage
{
echo "Usage: hdfs [--config confdir] [--daemon (start|stop|status)]"
echo " [--loglevel loglevel] COMMAND"
echo " where COMMAND is one of:"
echo " balancer run a cluster balancing utility"
echo " cacheadmin configure the HDFS cache"
echo " classpath prints the class path needed to get the"
echo " Hadoop jar and the required libraries"
echo " crypto configure HDFS encryption zones"
echo " datanode run a DFS datanode"
echo " debug run a Debug Admin to execute HDFS debug commands"
echo " dfs run a filesystem command on the file system"
echo " dfsadmin run a DFS admin client"
echo " fetchdt fetch a delegation token from the NameNode"
echo " fsck run a DFS filesystem checking utility"
echo " getconf get config values from configuration"
echo " groups get the groups which users belong to"
echo " haadmin run a DFS HA admin client"
echo " jmxget get JMX exported values from NameNode or DataNode."
echo " journalnode run the DFS journalnode"
echo " lsSnapshottableDir list all snapshottable dirs owned by the current user"
echo " Use -help to see options"
echo " mover run a utility to move block replicas across"
echo " storage types"
echo " namenode run the DFS namenode"
echo " Use -format to initialize the DFS filesystem"
echo " nfs3 run an NFS version 3 gateway"
echo " oev apply the offline edits viewer to an edits file"
echo " oiv apply the offline fsimage viewer to an fsimage"
echo " oiv_legacy apply the offline fsimage viewer to a legacy fsimage"
echo " portmap run a portmap service"
echo " secondarynamenode run the DFS secondary namenode"
echo " snapshotDiff diff two snapshots of a directory or diff the"
echo " current directory contents with a snapshot"
echo " storagepolicies list/get/set block storage policies"
echo " version print the version"
echo " zkfc run the ZK Failover Controller daemon"
echo ""
echo "Most commands print help when invoked w/o parameters."
# There are also debug commands, but they don't show up in this listing.
hadoop_add_subcommand "balancer" "run a cluster balancing utility"
hadoop_add_subcommand "cacheadmin" "configure the HDFS cache"
hadoop_add_subcommand "classpath" "prints the class path needed to get the hadoop jar and the required libraries"
hadoop_add_subcommand "crypto" "configure HDFS encryption zones"
hadoop_add_subcommand "datanode" "run a DFS datanode"
hadoop_add_subcommand "debug" "run a Debug Admin to execute HDFS debug commands"
hadoop_add_subcommand "dfs" "run a filesystem command on the file system"
hadoop_add_subcommand "dfsadmin" "run a DFS admin client"
hadoop_add_subcommand "fetchdt" "fetch a delegation token from the NameNode"
hadoop_add_subcommand "fsck" "run a DFS filesystem checking utility"
hadoop_add_subcommand "getconf" "get config values from configuration"
hadoop_add_subcommand "groups" "get the groups which users belong to"
hadoop_add_subcommand "haadmin" "run a DFS HA admin client"
hadoop_add_subcommand "jmxget" "get JMX exported values from NameNode or DataNode."
hadoop_add_subcommand "journalnode" "run the DFS journalnode"
hadoop_add_subcommand "lsSnapshottableDir" "list all snapshottable dirs owned by the current user"
hadoop_add_subcommand "mover" "run a utility to move block replicas across storage types"
hadoop_add_subcommand "namenode" "run the DFS namenode"
hadoop_add_subcommand "nfs3" "run an NFS version 3 gateway"
hadoop_add_subcommand "oev" "apply the offline edits viewer to an edits file"
hadoop_add_subcommand "oiv" "apply the offline fsimage viewer to an fsimage"
hadoop_add_subcommand "oiv_legacy" "apply the offline fsimage viewer to a legacy fsimage"
hadoop_add_subcommand "portmap" "run a portmap service"
hadoop_add_subcommand "secondarynamenode" "run the DFS secondary namenode"
hadoop_add_subcommand "snapshotDiff" "diff two snapshots of a directory or diff the current directory contents with a snapshot"
hadoop_add_subcommand "storagepolicies" "list/get/set block storage policies"
hadoop_add_subcommand "version" "print the version"
hadoop_add_subcommand "zkfc" "run the ZK Failover Controller daemon"
hadoop_generate_usage "${MYNAME}"
}
# let's locate libexec...
if [[ -n "${HADOOP_PREFIX}" ]]; then
DEFAULT_LIBEXEC_DIR="${HADOOP_PREFIX}/libexec"
else
this="${BASH_SOURCE-$0}"
bin=$(cd -P -- "$(dirname -- "${this}")" >/dev/null && pwd -P)
bin=$(cd -P -- "$(dirname -- "${MYNAME}")" >/dev/null && pwd -P)
DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
fi
@ -107,11 +99,11 @@ case ${COMMAND} in
if [[ -n "${HADOOP_SECURE_DN_USER}" ]]; then
secure_service="true"
secure_user="${HADOOP_SECURE_DN_USER}"
# backward compatibility
HADOOP_SECURE_PID_DIR="${HADOOP_SECURE_PID_DIR:-$HADOOP_SECURE_DN_PID_DIR}"
HADOOP_SECURE_LOG_DIR="${HADOOP_SECURE_LOG_DIR:-$HADOOP_SECURE_DN_LOG_DIR}"
hadoop_debug "Appending HADOOP_DATANODE_OPTS onto HADOOP_OPTS"
hadoop_debug "Appending HADOOP_DN_SECURE_EXTRA_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_DATANODE_OPTS} ${HADOOP_DN_SECURE_EXTRA_OPTS}"
@ -186,11 +178,11 @@ case ${COMMAND} in
if [[ -n "${HADOOP_PRIVILEGED_NFS_USER}" ]]; then
secure_service="true"
secure_user="${HADOOP_PRIVILEGED_NFS_USER}"
# backward compatibility
HADOOP_SECURE_PID_DIR="${HADOOP_SECURE_PID_DIR:-$HADOOP_SECURE_NFS3_PID_DIR}"
HADOOP_SECURE_LOG_DIR="${HADOOP_SECURE_LOG_DIR:-$HADOOP_SECURE_NFS3_LOG_DIR}"
hadoop_debug "Appending HADOOP_NFS3_OPTS onto HADOOP_OPTS"
hadoop_debug "Appending HADOOP_NFS3_SECURE_EXTRA_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_NFS3_OPTS} ${HADOOP_NFS3_SECURE_EXTRA_OPTS}"

View File

@ -567,23 +567,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
void closeConnectionToNamenode() {
RPC.stopProxy(namenode);
}
/** Abort and release resources held. Ignore all errors. */
public void abort() {
clientRunning = false;
closeAllFilesBeingWritten(true);
try {
// remove reference to this client and stop the renewer,
// if there is no more clients under the renewer.
getLeaseRenewer().closeClient(this);
} catch (IOException ioe) {
LOG.info("Exception occurred while aborting the client " + ioe);
}
closeConnectionToNamenode();
}
/** Close/abort all files being written. */
private void closeAllFilesBeingWritten(final boolean abort) {
public void closeAllFilesBeingWritten(final boolean abort) {
for(;;) {
final long inodeId;
final DFSOutputStream out;

View File

@ -215,6 +215,12 @@ public class LeaseRenewer {
return renewal;
}
/** Used for testing only. */
@VisibleForTesting
public synchronized void setRenewalTime(final long renewal) {
this.renewal = renewal;
}
/** Add a client. */
private synchronized void addClient(final DFSClient dfsc) {
for(DFSClient c : dfsclients) {
@ -453,8 +459,12 @@ public class LeaseRenewer {
+ (elapsed/1000) + " seconds. Aborting ...", ie);
synchronized (this) {
while (!dfsclients.isEmpty()) {
dfsclients.get(0).abort();
DFSClient dfsClient = dfsclients.get(0);
dfsClient.closeAllFilesBeingWritten(true);
closeClient(dfsClient);
}
//Expire the current LeaseRenewer thread.
emptyTime = 0;
}
break;
} catch (IOException ie) {

View File

@ -31,6 +31,7 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.when;
import java.io.FileNotFoundException;
@ -62,6 +63,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.client.HdfsUtils;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@ -354,7 +356,59 @@ public class TestDFSClientRetries {
cluster.shutdown();
}
}
/**
* Test DFSClient can continue to function after renewLease RPC
* receives SocketTimeoutException.
*/
@Test
public void testLeaseRenewSocketTimeout() throws Exception
{
String file1 = "/testFile1";
String file2 = "/testFile2";
// Set short retry timeouts so this test runs faster
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
conf.setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2 * 1000);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc());
Mockito.doThrow(new SocketTimeoutException()).when(spyNN).renewLease(
Mockito.anyString());
DFSClient client = new DFSClient(null, spyNN, conf, null);
// Get hold of the lease renewer instance used by the client
LeaseRenewer leaseRenewer = client.getLeaseRenewer();
leaseRenewer.setRenewalTime(100);
OutputStream out1 = client.create(file1, false);
Mockito.verify(spyNN, timeout(10000).times(1)).renewLease(
Mockito.anyString());
verifyEmptyLease(leaseRenewer);
try {
out1.write(new byte[256]);
fail("existing output stream should be aborted");
} catch (IOException e) {
}
// Verify DFSClient can do read operation after renewLease aborted.
client.exists(file2);
// Verify DFSClient can do write operation after renewLease no longer
// throws SocketTimeoutException.
Mockito.doNothing().when(spyNN).renewLease(
Mockito.anyString());
leaseRenewer = client.getLeaseRenewer();
leaseRenewer.setRenewalTime(100);
OutputStream out2 = client.create(file2, false);
Mockito.verify(spyNN, timeout(10000).times(2)).renewLease(
Mockito.anyString());
out2.write(new byte[256]);
out2.close();
verifyEmptyLease(leaseRenewer);
} finally {
cluster.shutdown();
}
}
/**
* Test that getAdditionalBlock() and close() are idempotent. This allows
* a client to safely retry a call and still produce a correct
@ -673,7 +727,15 @@ public class TestDFSClientRetries {
}
return ret;
}
private void verifyEmptyLease(LeaseRenewer leaseRenewer) throws Exception {
int sleepCount = 0;
while (!leaseRenewer.isEmpty() && sleepCount++ < 20) {
Thread.sleep(500);
}
assertTrue("Lease should be empty.", leaseRenewer.isEmpty());
}
class DFSClientReader implements Runnable {
DFSClient client;

View File

@ -15,29 +15,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
MYNAME="${BASH_SOURCE-$0}"
function hadoop_usage
{
echo "Usage: mapred [--config confdir] [--daemon (start|stop|status)]"
echo " [--loglevel loglevel] COMMAND"
echo " where COMMAND is one of:"
echo " archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive"
echo " classpath prints the class path needed for running"
echo " mapreduce subcommands"
echo " distcp <srcurl> <desturl> copy file or directories recursively"
echo " historyserver run job history servers as a standalone daemon"
echo " hsadmin job history server admin interface"
echo " job manipulate MapReduce jobs"
echo " pipes run a Pipes job"
echo " queue get information regarding JobQueues"
echo " sampler sampler"
echo " version print the version"
echo ""
echo "Most commands print help when invoked w/o parameters."
hadoop_add_subcommand "archive" "create a hadoop archive"
hadoop_add_subcommand "classpath" "prints the class path needed for running mapreduce subcommands"
hadoop_add_subcommand "distcp" "copy file or directories recursively"
hadoop_add_subcommand "historyserver" "run job history servers as a standalone daemon"
hadoop_add_subcommand "hsadmin" "job history server admin interface"
hadoop_add_subcommand "job" "manipulate MapReduce jobs"
hadoop_add_subcommand "pipes" "run a Pipes job"
hadoop_add_subcommand "queue" "get information regarding JobQueues"
hadoop_add_subcommand "sampler" "sampler"
hadoop_add_subcommand "version" "print the version"
hadoop_generate_usage "${MYNAME}"
}
this="${BASH_SOURCE-$0}"
bin=$(cd -P -- "$(dirname -- "${this}")" >/dev/null && pwd -P)
bin=$(cd -P -- "$(dirname -- "${MYNAME}")" >/dev/null && pwd -P)
# let's locate libexec...
if [[ -n "${HADOOP_PREFIX}" ]]; then

View File

@ -128,12 +128,12 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>compile</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>compile</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>

View File

@ -332,6 +332,9 @@ Release 2.8.0 - UNRELEASED
YARN-3069. Document missing properties in yarn-default.xml.
(Ray Chiang via aajisaka)
YARN-3844. Make hadoop-yarn-project Native code -Wall-clean (Alan Burlison
via Colin P. McCabe)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
@ -637,6 +640,12 @@ Release 2.8.0 - UNRELEASED
YARN-3805. Update the documentation of Disk Checker based on YARN-90.
(Masatake Iwasaki via ozawa)
YARN-3930. FileSystemNodeLabelsStore should make sure edit log file closed when
exception is thrown. (Dian Fu via wangda)
YARN-3885. ProportionalCapacityPreemptionPolicy doesn't preempt if queue is
more than 2 level. (Ajith S via wangda)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
@ -660,6 +669,12 @@ Release 2.7.2 - UNRELEASED
YARN-3690. [JDK8] 'mvn site' fails. (Brahma Reddy Battula via aajisaka)
YARN-3905. Application History Server UI NPEs when accessing apps run after
RM restart (Eric Payne via jeagles)
YARN-3535. Scheduler must re-request container resources when RMContainer transitions
from ALLOCATED to KILLED (rohithsharma and peng.zhang via asuresh)
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES

hadoop-yarn-project/hadoop-yarn/bin/yarn Normal file → Executable file
View File

@ -15,37 +15,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.
MYNAME="${BASH_SOURCE-$0}"
function hadoop_usage
{
echo "Usage: yarn [--config confdir] [COMMAND | CLASSNAME]"
echo " CLASSNAME run the class named CLASSNAME"
echo " or"
echo " where COMMAND is one of:"
echo " application prints application(s) report/kill application"
echo " applicationattempt prints applicationattempt(s) report"
echo " classpath prints the class path needed to get the"
echo " Hadoop jar and the required libraries"
echo " cluster prints cluster information"
echo " container prints container(s) report"
echo " daemonlog get/set the log level for each daemon"
echo " jar <jar> run a jar file"
echo " logs dump container logs"
echo " node prints node report(s)"
echo " nodemanager run a nodemanager on each slave"
echo " proxyserver run the web app proxy server"
echo " queue prints queue information"
echo " resourcemanager run the ResourceManager"
echo " Use -format-state-store for deleting the RMStateStore."
echo " Use -remove-application-from-state-store <appId> for "
echo " removing application from RMStateStore."
echo " rmadmin admin tools"
echo " scmadmin SharedCacheManager admin tools"
echo " sharedcachemanager run the SharedCacheManager daemon"
echo " timelineserver run the timeline server"
echo " top view cluster information"
echo " version print the version"
echo ""
echo "Most commands print help when invoked w/o parameters."
hadoop_add_subcommand "application" "prints application(s) report/kill application"
hadoop_add_subcommand "applicationattempt" "prints applicationattempt(s) report"
hadoop_add_subcommand "classpath" "prints the class path needed to get the hadoop jar and the required libraries"
hadoop_add_subcommand "cluster" "prints cluster information"
hadoop_add_subcommand "container" "prints container(s) report"
hadoop_add_subcommand "daemonlog" "get/set the log level for each daemon"
hadoop_add_subcommand "jar <jar>" "run a jar file"
hadoop_add_subcommand "logs" "dump container logs"
hadoop_add_subcommand "node" "prints node report(s)"
hadoop_add_subcommand "nodemanager" "run a nodemanager on each slave"
hadoop_add_subcommand "proxyserver" "run the web app proxy server"
hadoop_add_subcommand "queue" "prints queue information"
hadoop_add_subcommand "resourcemanager" "run the ResourceManager"
hadoop_add_subcommand "rmadmin" "admin tools"
hadoop_add_subcommand "scmadmin" "SharedCacheManager admin tools"
hadoop_add_subcommand "sharedcachemanager" "run the SharedCacheManager daemon"
hadoop_add_subcommand "timelineserver" "run the timeline server"
hadoop_add_subcommand "top" "view cluster information"
hadoop_add_subcommand "version" "print the version"
hadoop_generate_usage "${MYNAME}" true
}
@ -53,8 +46,7 @@ function hadoop_usage
if [[ -n "${HADOOP_PREFIX}" ]]; then
DEFAULT_LIBEXEC_DIR="${HADOOP_PREFIX}/libexec"
else
this="${BASH_SOURCE-$0}"
bin=$(cd -P -- "$(dirname -- "${this}")" >/dev/null && pwd -P)
bin=$(cd -P -- "$(dirname -- "${MYNAME}")" >/dev/null && pwd -P)
DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
fi

View File

@ -127,31 +127,40 @@ public class FileSystemNodeLabelsStore extends NodeLabelsStore {
@Override
public void updateNodeToLabelsMappings(
Map<NodeId, Set<String>> nodeToLabels) throws IOException {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.NODE_TO_LABELS.ordinal());
((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
.newInstance(nodeToLabels)).getProto().writeDelimitedTo(editlogOs);
ensureCloseEditlogFile();
try {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.NODE_TO_LABELS.ordinal());
((ReplaceLabelsOnNodeRequestPBImpl) ReplaceLabelsOnNodeRequest
.newInstance(nodeToLabels)).getProto().writeDelimitedTo(editlogOs);
} finally {
ensureCloseEditlogFile();
}
}
@Override
public void storeNewClusterNodeLabels(List<NodeLabel> labels)
throws IOException {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.ADD_LABELS.ordinal());
((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest
.newInstance(labels)).getProto().writeDelimitedTo(editlogOs);
ensureCloseEditlogFile();
try {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.ADD_LABELS.ordinal());
((AddToClusterNodeLabelsRequestPBImpl) AddToClusterNodeLabelsRequest
.newInstance(labels)).getProto().writeDelimitedTo(editlogOs);
} finally {
ensureCloseEditlogFile();
}
}
@Override
public void removeClusterNodeLabels(Collection<String> labels)
throws IOException {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.REMOVE_LABELS.ordinal());
((RemoveFromClusterNodeLabelsRequestPBImpl) RemoveFromClusterNodeLabelsRequest.newInstance(Sets
.newHashSet(labels.iterator()))).getProto().writeDelimitedTo(editlogOs);
ensureCloseEditlogFile();
try {
ensureAppendEditlogFile();
editlogOs.writeInt(SerializedLogType.REMOVE_LABELS.ordinal());
((RemoveFromClusterNodeLabelsRequestPBImpl) RemoveFromClusterNodeLabelsRequest.newInstance(Sets
.newHashSet(labels.iterator()))).getProto().writeDelimitedTo(editlogOs);
} finally {
ensureCloseEditlogFile();
}
}
/* (non-Javadoc)

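Each of the three store methods above now follows the same shape: open for append, write the record, and close in a finally block so the edit-log stream is released even when the write throws (YARN-3930). A generic, self-contained sketch of that pattern; the stream, file name, and payload are stand-ins rather than the YARN types:

    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    public class TryFinallyCloseSketch {
      static void appendRecord(String path, int recordType) throws IOException {
        DataOutputStream out = new DataOutputStream(new FileOutputStream(path, true));
        try {
          out.writeInt(recordType);      // analogous to editlogOs.writeInt(...)
          // ... write the serialized payload here ...
        } finally {
          out.close();                   // analogous to ensureCloseEditlogFile()
        }
      }

      public static void main(String[] args) throws IOException {
        appendRecord("editlog.sketch", 1);
      }
    }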
View File

@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
@ -255,10 +254,9 @@ public class AppBlock extends HtmlBlock {
AppAttemptInfo appAttempt = new AppAttemptInfo(appAttemptReport);
ContainerReport containerReport;
try {
// AM container is always the first container of the attempt
final GetContainerReportRequest request =
GetContainerReportRequest.newInstance(ContainerId.newContainerId(
appAttemptReport.getApplicationAttemptId(), 1));
GetContainerReportRequest.newInstance(
appAttemptReport.getAMContainerId());
if (callerUGI == null) {
containerReport =
appBaseProt.getContainerReport(request).getContainerReport();

View File

@ -22,6 +22,7 @@
#include "configuration.h"
#include "container-executor.h"
#include <inttypes.h>
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
@ -74,14 +75,14 @@ static int is_only_root_writable(const char *file) {
return 0;
}
if (file_stat.st_uid != 0) {
fprintf(ERRORFILE, "File %s must be owned by root, but is owned by %d\n",
file, file_stat.st_uid);
fprintf(ERRORFILE, "File %s must be owned by root, but is owned by %" PRId64 "\n",
file, (int64_t)file_stat.st_uid);
return 0;
}
if ((file_stat.st_mode & (S_IWGRP | S_IWOTH)) != 0) {
fprintf(ERRORFILE,
"File %s must not be world or group writable, but is %03o\n",
file, file_stat.st_mode & (~S_IFMT));
"File %s must not be world or group writable, but is %03lo\n",
file, (unsigned long)file_stat.st_mode & (~S_IFMT));
return 0;
}
return 1;

View File

@ -19,6 +19,7 @@
#include "configuration.h"
#include "container-executor.h"
#include <inttypes.h>
#include <libgen.h>
#include <dirent.h>
#include <fcntl.h>
@ -68,7 +69,7 @@ void set_nm_uid(uid_t user, gid_t group) {
*/
char* get_executable() {
char buffer[PATH_MAX];
snprintf(buffer, PATH_MAX, "/proc/%u/exe", getpid());
snprintf(buffer, PATH_MAX, "/proc/%" PRId64 "/exe", (int64_t)getpid());
char *filename = malloc(PATH_MAX);
ssize_t len = readlink(buffer, filename, PATH_MAX);
if (len == -1) {
@ -181,7 +182,7 @@ static int write_pid_to_cgroup_as_root(const char* cgroup_file, pid_t pid) {
// write pid
char pid_buf[21];
snprintf(pid_buf, sizeof(pid_buf), "%d", pid);
snprintf(pid_buf, sizeof(pid_buf), "%" PRId64, (int64_t)pid);
ssize_t written = write(cgroup_fd, pid_buf, strlen(pid_buf));
close(cgroup_fd);
if (written == -1) {
@ -222,7 +223,7 @@ static int write_pid_to_file_as_nm(const char* pid_file, pid_t pid) {
// write pid to temp file
char pid_buf[21];
snprintf(pid_buf, 21, "%d", pid);
snprintf(pid_buf, 21, "%" PRId64, (int64_t)pid);
ssize_t written = write(pid_fd, pid_buf, strlen(pid_buf));
close(pid_fd);
if (written == -1) {
@ -307,7 +308,7 @@ static int wait_and_get_exit_code(pid_t pid) {
} while (waitpid_result == -1 && errno == EINTR);
if (waitpid_result < 0) {
fprintf(LOGFILE, "error waiting for process %d - %s\n", pid, strerror(errno));
fprintf(LOGFILE, "error waiting for process %" PRId64 " - %s\n", (int64_t)pid, strerror(errno));
return -1;
}
@ -316,7 +317,7 @@ static int wait_and_get_exit_code(pid_t pid) {
} else if (WIFSIGNALED(child_status)) {
exit_code = 0x80 + WTERMSIG(child_status);
} else {
fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid);
fprintf(LOGFILE, "Unable to determine exit status for pid %" PRId64 "\n", (int64_t)pid);
}
return exit_code;
@ -510,7 +511,8 @@ int mkdirs(const char* path, mode_t perm) {
* Give 0 or 1 to represent whether this is the final component. If it is, we
* need to check the permission.
*/
int create_validate_dir(char* npath, mode_t perm, char* path, int finalComponent) {
int create_validate_dir(const char* npath, mode_t perm, const char* path,
int finalComponent) {
struct stat sb;
if (stat(npath, &sb) != 0) {
if (mkdir(npath, perm) != 0) {
@ -534,7 +536,7 @@ int create_validate_dir(char* npath, mode_t perm, char* path, int finalComponent
// check whether the given path is a directory
// also check the access permissions whether it is the same as desired permissions
int check_dir(char* npath, mode_t st_mode, mode_t desired, int finalComponent) {
int check_dir(const char* npath, mode_t st_mode, mode_t desired, int finalComponent) {
if (!S_ISDIR(st_mode)) {
fprintf(LOGFILE, "Path %s is file not dir\n", npath);
return -1;
@ -1491,7 +1493,7 @@ int mount_cgroup(const char *pair, const char *hierarchy) {
static int run_traffic_control(const char *opts[], char *command_file) {
const int max_tc_args = 16;
char *args[max_tc_args];
const char *args[max_tc_args];
int i = 0, j = 0;
args[i++] = TC_BIN;
@ -1517,7 +1519,7 @@ static int run_traffic_control(const char *opts[], char *command_file) {
unlink(command_file);
return 0;
} else {
execv(TC_BIN, args);
execv(TC_BIN, (char**)args);
//if we reach here, exec failed
fprintf(LOGFILE, "failed to execute tc command! error: %s\n", strerror(errno));
return TRAFFIC_CONTROL_EXECUTION_FAILED;

View File

@ -217,10 +217,10 @@ int change_user(uid_t user, gid_t group);
int mount_cgroup(const char *pair, const char *hierarchy);
int check_dir(char* npath, mode_t st_mode, mode_t desired,
int check_dir(const char* npath, mode_t st_mode, mode_t desired,
int finalComponent);
int create_validate_dir(char* npath, mode_t perm, char* path,
int create_validate_dir(const char* npath, mode_t perm, const char* path,
int finalComponent);
/**

View File

@ -18,6 +18,7 @@
#include "configuration.h"
#include "container-executor.h"
#include <inttypes.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
@ -73,17 +74,17 @@ void run(const char *cmd) {
} else {
int status = 0;
if (waitpid(child, &status, 0) <= 0) {
printf("FAIL: failed waiting for child process %s pid %d - %s\n",
cmd, child, strerror(errno));
printf("FAIL: failed waiting for child process %s pid %" PRId64 " - %s\n",
cmd, (int64_t)child, strerror(errno));
exit(1);
}
if (!WIFEXITED(status)) {
printf("FAIL: process %s pid %d did not exit\n", cmd, child);
printf("FAIL: process %s pid %" PRId64 " did not exit\n", cmd, (int64_t)child);
exit(1);
}
if (WEXITSTATUS(status) != 0) {
printf("FAIL: process %s pid %d exited with error status %d\n", cmd,
child, WEXITSTATUS(status));
printf("FAIL: process %s pid %" PRId64 " exited with error status %d\n", cmd,
(int64_t)child, WEXITSTATUS(status));
exit(1);
}
}
@ -144,10 +145,11 @@ void check_pid_file(const char* pid_file, pid_t mypid) {
}
char myPidBuf[33];
snprintf(myPidBuf, 33, "%d", mypid);
snprintf(myPidBuf, 33, "%" PRId64, (int64_t)mypid);
if (strncmp(pidBuf, myPidBuf, strlen(myPidBuf)) != 0) {
printf("FAIL: failed to find matching pid in pid file\n");
printf("FAIL: Expected pid %d : Got %.*s", mypid, (int)bytes, pidBuf);
printf("FAIL: Expected pid %" PRId64 " : Got %.*s", (int64_t)mypid,
(int)bytes, pidBuf);
exit(1);
}
}
@ -441,16 +443,16 @@ void run_test_in_child(const char* test_name, void (*func)()) {
} else {
int status = 0;
if (waitpid(child, &status, 0) == -1) {
printf("FAIL: waitpid %d failed - %s\n", child, strerror(errno));
printf("FAIL: waitpid %" PRId64 " failed - %s\n", (int64_t)child, strerror(errno));
exit(1);
}
if (!WIFEXITED(status)) {
printf("FAIL: child %d didn't exit - %d\n", child, status);
printf("FAIL: child %" PRId64 " didn't exit - %d\n", (int64_t)child, status);
exit(1);
}
if (WEXITSTATUS(status) != 0) {
printf("FAIL: child %d exited with bad status %d\n",
child, WEXITSTATUS(status));
printf("FAIL: child %" PRId64 " exited with bad status %d\n",
(int64_t)child, WEXITSTATUS(status));
exit(1);
}
}
@ -471,7 +473,7 @@ void test_signal_container() {
sleep(3600);
exit(0);
} else {
printf("Child container launched as %d\n", child);
printf("Child container launched as %" PRId64 "\n", (int64_t)child);
if (signal_container_as_user(yarn_username, child, SIGQUIT) != 0) {
exit(1);
}
@ -508,7 +510,7 @@ void test_signal_container_group() {
sleep(3600);
exit(0);
}
printf("Child container launched as %d\n", child);
printf("Child container launched as %" PRId64 "\n", (int64_t)child);
// there's a race condition for child calling change_user and us
// calling signal_container_as_user, hence sleeping
sleep(3);
@ -586,7 +588,7 @@ void test_init_app() {
}
int status = 0;
if (waitpid(child, &status, 0) <= 0) {
printf("FAIL: failed waiting for process %d - %s\n", child,
printf("FAIL: failed waiting for process %" PRId64 " - %s\n", (int64_t)child,
strerror(errno));
exit(1);
}
@ -687,7 +689,7 @@ void test_run_container() {
}
int status = 0;
if (waitpid(child, &status, 0) <= 0) {
printf("FAIL: failed waiting for process %d - %s\n", child,
printf("FAIL: failed waiting for process %" PRId64 " - %s\n", (int64_t)child,
strerror(errno));
exit(1);
}

View File

@ -896,8 +896,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
ret.untouchableExtra = Resource.newInstance(0, 0);
} else {
ret.untouchableExtra =
Resources.subtractFrom(extra, childrensPreemptable);
Resources.subtract(extra, childrensPreemptable);
}
ret.preemptableExtra = Resources.min(
rc, partitionResource, childrensPreemptable, extra);
}
}
addTempQueuePartition(ret);
@ -1127,4 +1129,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
}
}
@VisibleForTesting
public Map<String, Map<String, TempQueuePerPartition>> getQueuePartitions() {
return queueToPartitions;
}
}

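The change from Resources.subtractFrom to Resources.subtract matters because, as the names suggest, subtractFrom mutates its first argument in place while subtract returns a new Resource, so `extra` is still intact when the following Resources.min call computes preemptableExtra (YARN-3885). A hedged sketch of that distinction, assuming those semantics for the two helpers:

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.Resources;

    public class SubtractVsSubtractFromSketch {
      public static void main(String[] args) {
        Resource extra = Resource.newInstance(10240, 10);
        Resource preemptable = Resource.newInstance(4096, 4);

        // subtract() returns a new Resource and leaves 'extra' untouched ...
        Resource untouchable = Resources.subtract(extra, preemptable);
        System.out.println("extra after subtract:     " + extra);

        // ... whereas subtractFrom() modifies its first argument in place, which
        // is why reusing 'extra' afterwards (as Resources.min does in the policy)
        // would otherwise see a corrupted value.
        Resources.subtractFrom(extra, preemptable);
        System.out.println("extra after subtractFrom: " + extra);
        System.out.println("untouchable:              " + untouchable);
      }
    }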
View File

@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
@ -94,7 +95,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
.addTransition(RMContainerState.ALLOCATED, RMContainerState.EXPIRED,
RMContainerEventType.EXPIRE, new FinishedTransition())
.addTransition(RMContainerState.ALLOCATED, RMContainerState.KILLED,
RMContainerEventType.KILL, new FinishedTransition())
RMContainerEventType.KILL, new ContainerRescheduledTransition())
// Transitions from ACQUIRED state
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
@ -495,6 +496,17 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
}
}
private static final class ContainerRescheduledTransition extends
FinishedTransition {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
// Tell scheduler to recover request of this container to app
container.eventHandler.handle(new ContainerRescheduledEvent(container));
super.transition(container, event);
}
}
private static class FinishedTransition extends BaseTransition {
@Override

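With ContainerRescheduledTransition wired into the ALLOCATED + KILL transition, killing a not-yet-acquired container now emits a ContainerRescheduledEvent before the usual finished handling, and the CONTAINER_RESCHEDULED cases added to the Capacity, Fair and Fifo schedulers later in this commit respond by calling recoverResourceRequestForContainer (YARN-3535). A condensed, self-contained sketch of that flow; every type below is a local stand-in, not the real YARN class:

    // A KILL on an ALLOCATED container becomes a "rescheduled" event, and the
    // scheduler's handler restores the container's resource request.
    public class RescheduleFlowSketch {
      static class RMContainerStub {
        final String id;
        RMContainerStub(String id) { this.id = id; }
      }

      static class ContainerRescheduledEventStub {
        final RMContainerStub container;
        ContainerRescheduledEventStub(RMContainerStub c) { this.container = c; }
      }

      // Roughly what the CONTAINER_RESCHEDULED case in the schedulers does.
      static void handle(ContainerRescheduledEventStub event) {
        recoverResourceRequestForContainer(event.container);
      }

      static void recoverResourceRequestForContainer(RMContainerStub c) {
        System.out.println("re-adding resource request for " + c.id);
      }

      public static void main(String[] args) {
        handle(new ContainerRescheduledEventStub(new RMContainerStub("container_3")));
      }
    }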
View File

@ -108,6 +108,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@ -1370,6 +1371,14 @@ public class CapacityScheduler extends
killContainer(containerToBeKilled);
}
break;
case CONTAINER_RESCHEDULED:
{
ContainerRescheduledEvent containerRescheduledEvent =
(ContainerRescheduledEvent) event;
RMContainer container = containerRescheduledEvent.getContainer();
recoverResourceRequestForContainer(container);
}
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
@ -1543,7 +1552,6 @@ public class CapacityScheduler extends
if (LOG.isDebugEnabled()) {
LOG.debug("KILL_CONTAINER: container" + cont.toString());
}
recoverResourceRequestForContainer(cont);
completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
RMContainerEventType.KILL);

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
public class ContainerRescheduledEvent extends SchedulerEvent {
private RMContainer container;
public ContainerRescheduledEvent(RMContainer container) {
super(SchedulerEventType.CONTAINER_RESCHEDULED);
this.container = container;
}
public RMContainer getContainer() {
return container;
}
}

View File

@ -38,6 +38,9 @@ public enum SchedulerEventType {
// Source: ContainerAllocationExpirer
CONTAINER_EXPIRED,
// Source: RMContainer
CONTAINER_RESCHEDULED,
// Source: SchedulingEditPolicy
DROP_RESERVATION,
PREEMPT_CONTAINER,

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@ -462,7 +463,6 @@ public class FairScheduler extends
SchedulerUtils.createPreemptedContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
recoverResourceRequestForContainer(container);
// TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()).
completedContainer(container, status, RMContainerEventType.KILL);
@ -1236,6 +1236,15 @@ public class FairScheduler extends
SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE);
break;
case CONTAINER_RESCHEDULED:
if (!(event instanceof ContainerRescheduledEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
ContainerRescheduledEvent containerRescheduledEvent =
(ContainerRescheduledEvent) event;
RMContainer container = containerRescheduledEvent.getContainer();
recoverResourceRequestForContainer(container);
break;
default:
LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
}

View File

@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@ -846,6 +847,14 @@ public class FifoScheduler extends
RMContainerEventType.EXPIRE);
}
break;
case CONTAINER_RESCHEDULED:
{
ContainerRescheduledEvent containerRescheduledEvent =
(ContainerRescheduledEvent) event;
RMContainer container = containerRescheduledEvent.getContainer();
recoverResourceRequestForContainer(container);
}
break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}

View File

@ -247,7 +247,7 @@ public class TestAMRestart {
private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
throws InterruptedException {
int count = 0;
while (attempt.getJustFinishedContainers().size() != expectedNum
while (attempt.getJustFinishedContainers().size() < expectedNum
&& count < 500) {
Thread.sleep(100);
count++;

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TempQueuePerPartition;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -898,6 +899,37 @@ public class TestProportionalCapacityPreemptionPolicy {
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
public void testHierarchicalLarge3Levels() {
int[][] qData = new int[][] {
// / A F I
// B C G H J K
// D E
{ 400, 200, 60, 140, 100, 40, 100, 70, 30, 100, 10, 90 }, // abs
{ 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap
{ 400, 210, 60, 150, 100, 50, 100, 50, 50, 90, 10, 80 }, // used
{ 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD appE appF appG
{ 7, 3, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 3, 2, 0, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// XXX note: compensating for rounding error in Resources.multiplyTo
// which is likely triggered since we use small numbers for readability
// run with Logger.getRootLogger().setLevel(Level.DEBUG);
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
assertEquals(10, policy.getQueuePartitions().get("queueE").get("").preemptableExtra.getMemory());
// 2nd level child (E) preempts 10, but parent A has only 9 extra
// check that the parent can preempt only the extra from the deeper (> 2 level) child
TempQueuePerPartition tempQueueAPartition = policy.getQueuePartitions().get("queueA").get("");
assertEquals(0, tempQueueAPartition.untouchableExtra.getMemory());
int extraForQueueA = tempQueueAPartition.current.getMemory() - tempQueueAPartition.guaranteed.getMemory();
assertEquals(extraForQueueA, tempQueueAPartition.preemptableExtra.getMemory());
}
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;

View File

@ -22,14 +22,20 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@ -40,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@ -403,6 +410,89 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
}
}
@Test(timeout = 60000)
public void testResourceRequestRestoreWhenRMContainerIsAtAllocated()
throws Exception {
configureScheduler();
YarnConfiguration conf = getConf();
MockRM rm1 = new MockRM(conf);
try {
rm1.start();
RMApp app1 =
rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default",
-1, null, "Test", false, true);
MockNM nm1 =
new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
nm1.registerNode();
MockNM nm2 =
new MockNM("127.0.0.1:2351", 10240, rm1.getResourceTrackerService());
nm2.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
int NUM_CONTAINERS = 1;
// allocate NUM_CONTAINERS containers
am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
// wait for containers to be allocated.
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.size() != NUM_CONTAINERS) {
nm1.nodeHeartbeat(true);
containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(200);
}
// launch the 2nd container, for testing running container transferred.
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
ContainerState.RUNNING);
ContainerId containerId2 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
// 3rd container is in Allocated state.
am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS,
new ArrayList<ContainerId>());
nm2.nodeHeartbeat(true);
ContainerId containerId3 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
rm1.waitForContainerAllocated(nm2, containerId3);
rm1.waitForState(nm2, containerId3, RMContainerState.ALLOCATED);
// NodeManager restart
nm2.registerNode();
// NM restart kills all allocated and running containers.
rm1.waitForState(nm2, containerId3, RMContainerState.KILLED);
// The killed RMContainer request should be restored. In successive
// nodeHeartBeats AM should be able to get container allocated.
containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.size() != NUM_CONTAINERS) {
nm2.nodeHeartbeat(true);
containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
Thread.sleep(200);
}
nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 4,
ContainerState.RUNNING);
ContainerId containerId4 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 4);
rm1.waitForState(nm2, containerId4, RMContainerState.RUNNING);
} finally {
rm1.stop();
}
}
private void verifyMaximumResourceCapability(
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {

View File

@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSch
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -4453,6 +4454,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// preempt now
scheduler.warnOrKillContainer(rmContainer);
// Trigger container rescheduled event
scheduler.handle(new ContainerRescheduledEvent(rmContainer));
List<ResourceRequest> requests = rmContainer.getResourceRequests();
// Once recovered, resource request will be present again in app
Assert.assertEquals(3, requests.size());