Merge trunk to HDFS-4685.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1563327 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 2014-01-31 23:05:23 +00:00
commit 9e8cabf8ce
52 changed files with 2251 additions and 1297 deletions

dev-support/create-release.sh Executable file

@ -0,0 +1,124 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Function to probe the exit code of the script commands,
# and stop in the case of failure with a contextual error
# message.
run() {
echo "\$ ${@}"
"${@}"
exitCode=$?
if [[ $exitCode != 0 ]]; then
echo
echo "Failed! running ${@} in `pwd`"
echo
exit $exitCode
fi
}
doMD5() {
MD5CMD="md5sum"
which $MD5CMD
if [[ $? != 0 ]]; then
MD5CMD="md5"
fi
run $MD5CMD ${1} > ${1}.md5
}
# If provided, the created release artifacts will be tagged with it
# (use RC#, e.g. RC0). Do not use a label to create the final release
# artifact.
RC_LABEL=$1
# Extract Hadoop version from POM
HADOOP_VERSION=`cat pom.xml | grep "<version>" | head -1 | sed 's|^ *<version>||' | sed 's|</version>.*$||'`
echo
echo "*****************************************************************"
echo
echo "Hadoop version to create release artifacts: ${HADOOP_VERSION}"
echo
echo "Release Candidate Label: ${RC_LABEL}"
echo
echo "*****************************************************************"
echo
if [[ ! -z ${RC_LABEL} ]]; then
RC_LABEL="-${RC_LABEL}"
fi
# Get Maven command
if [ -z "$MAVEN_HOME" ]; then
MVN=mvn
else
MVN=$MAVEN_HOME/bin/mvn
fi
ARTIFACTS_DIR="target/artifacts"
# Create staging dir for release artifacts
run mkdir -p ${ARTIFACTS_DIR}
# Create RAT report
run ${MVN} apache-rat:check
# Create SRC and BIN tarballs for release,
# Using the 'install' goal instead of 'package' so artifacts are available
# in the Maven local cache for the site generation
run ${MVN} install -Pdist,docs,src,native -DskipTests -Dtar
# Create site for release
run ${MVN} site site:stage -Pdist -Psrc
run mv target/staging/hadoop-project target/r${HADOOP_VERSION}/
run cd target/
run tar czf hadoop-site-${HADOOP_VERSION}.tar.gz r${HADOOP_VERSION}/*
run cd ..
# Stage RAT report
find . -name rat.txt | xargs -I% cat % > ${ARTIFACTS_DIR}/hadoop-${HADOOP_VERSION}${RC_LABEL}-rat.txt
# Stage CHANGES.txt files
run cp ./hadoop-common-project/hadoop-common/CHANGES.txt ${ARTIFACTS_DIR}/CHANGES-COMMON-${HADOOP_VERSION}${RC_LABEL}.txt
run cp ./hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ${ARTIFACTS_DIR}/CHANGES-HDFS--${HADOOP_VERSION}${RC_LABEL}.txt
run cp ./hadoop-mapreduce-project/CHANGES.txt ${ARTIFACTS_DIR}/CHANGES-MAPREDUCE-${HADOOP_VERSION}${RC_LABEL}.txt
run cp ./hadoop-yarn-project/CHANGES.txt ${ARTIFACTS_DIR}/CHANGES-YARN-${HADOOP_VERSION}${RC_LABEL}.txt
# Stage BIN tarball
run mv hadoop-dist/target/hadoop-${HADOOP_VERSION}.tar.gz ${ARTIFACTS_DIR}/hadoop-${HADOOP_VERSION}${RC_LABEL}.tar.gz
# Stage SRC tarball
run mv hadoop-dist/target/hadoop-${HADOOP_VERSION}-src.tar.gz ${ARTIFACTS_DIR}/hadoop-${HADOOP_VERSION}${RC_LABEL}-src.tar.gz
# Stage SITE tarball
run mv target/hadoop-site-${HADOOP_VERSION}.tar.gz ${ARTIFACTS_DIR}/hadoop-${HADOOP_VERSION}${RC_LABEL}-site.tar.gz
# MD5 SRC and BIN tarballs
doMD5 ${ARTIFACTS_DIR}/hadoop-${HADOOP_VERSION}${RC_LABEL}.tar.gz
doMD5 ${ARTIFACTS_DIR}/hadoop-${HADOOP_VERSION}${RC_LABEL}-src.tar.gz
run cd ${ARTIFACTS_DIR}
ARTIFACTS_DIR=`pwd`
echo
echo "Congratulations, you have successfully built the release"
echo "artifacts for Apache Hadoop ${HADOOP_VERSION}${RC_LABEL}"
echo
echo "The artifacts for this run are available at ${ARTIFACTS_DIR}:"
run ls -1 ${ARTIFACTS_DIR}
echo
echo "Remember to sign them before staging them on the open"
echo
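
A minimal usage sketch (the RC label is illustrative; the script must be run from the root of a Hadoop source checkout, since it reads pom.xml from the current directory and writes its output under target/artifacts):

----
$ cd /path/to/hadoop-source          # hypothetical checkout location
$ dev-support/create-release.sh RC0  # "RC0" is an example release-candidate label
$ ls target/artifacts                # tarballs, CHANGES files, RAT report and .md5 files
----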


@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
* <li>Classes that are {@link Private} are to be considered unstable unless
* a different InterfaceStability annotation states otherwise.</li>
* <li>Incompatible changes must not be made to classes marked as stable.</li>
* </ul>
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving


@ -301,10 +301,19 @@ Release 2.4.0 - UNRELEASED
IMPROVEMENTS
HADOOP-10139. Update and improve the Single Cluster Setup document.
(Akira Ajisaka via Arpit Agarwal)
HADOOP-10295. Allow distcp to automatically identify the checksum type of
source files and use it for the target. (jing9 and Laurent Goujon)
OPTIMIZATIONS
BUG FIXES
HADOOP-10320. Javadoc in InterfaceStability.java lacks final </ul>.
(René Nyffenegger via cnauroth)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -461,6 +470,12 @@ Release 2.3.0 - UNRELEASED
"rpc.metrics.percentiles.intervals" to core-default.xml.
(Akira Ajisaka via wang)
HADOOP-10317. Rename branch-2.3 release version from 2.4.0-SNAPSHOT
to 2.3.0-SNAPSHOT. (wang)
HADOOP-10313. Script and jenkins job to produce Hadoop release artifacts.
(tucu)
OPTIMIZATIONS
HADOOP-10142. Avoid groups lookup for unprivileged users such as "dr.who"
@ -665,6 +680,11 @@ Release 2.3.0 - UNRELEASED
HADOOP-10288. Explicit reference to Log4JLogger breaks non-log4j users
(todd)
HADOOP-10310. SaslRpcServer should be initialized even when no secret
manager present. (atm)
HADOOP-10311. Cleanup vendor names from the code base. (tucu)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES


@ -21,6 +21,7 @@ import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.io.Writable;
/** An abstract class representing file checksums for files. */
@ -36,6 +37,10 @@ public abstract class FileChecksum implements Writable {
/** The value of the checksum in bytes */
public abstract byte[] getBytes();
public ChecksumOpt getChecksumOpt() {
return null;
}
/** Return true if both the algorithms and the values are the same. */
@Override
public boolean equals(Object other) {


@ -88,6 +88,7 @@ public class MD5MD5CRC32FileChecksum extends FileChecksum {
return DataChecksum.Type.CRC32;
}
@Override
public ChecksumOpt getChecksumOpt() {
return new ChecksumOpt(getCrcType(), bytesPerCRC);
}


@ -2206,7 +2206,7 @@ public abstract class Server {
// Create the responder here
responder = new Responder();
if (secretManager != null) {
if (secretManager != null || UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}


@ -20,174 +20,267 @@ Hadoop MapReduce Next Generation - Setting up a Single Node Cluster.
%{toc|section=1|fromDepth=0}
* Mapreduce Tarball
* Purpose
You should be able to obtain the MapReduce tarball from the release.
If not, you should be able to create a tarball from the source.
+---+
$ mvn clean install -DskipTests
$ cd hadoop-mapreduce-project
$ mvn clean install assembly:assembly -Pnative
+---+
<<NOTE:>> You will need {{{http://code.google.com/p/protobuf}protoc 2.5.0}}
installed.
To ignore the native builds in mapreduce you can omit the <<<-Pnative>>> argument
when invoking Maven. The tarball should be available in the <<<target/>>> directory.
* Setting up the environment.
Assuming you have installed hadoop-common/hadoop-hdfs and exported
<<$HADOOP_COMMON_HOME>>/<<$HADOOP_HDFS_HOME>>, untar the Hadoop MapReduce
tarball and set the environment variable <<$HADOOP_MAPRED_HOME>> to the
untarred directory. Set <<$HADOOP_YARN_HOME>> to the same value as <<$HADOOP_MAPRED_HOME>>.
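
For example, the exports might look like the following (paths are illustrative only):

----
export HADOOP_MAPRED_HOME=/opt/hadoop-mapreduce   # directory where the tarball was untarred
export HADOOP_YARN_HOME=$HADOOP_MAPRED_HOME
----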
<<NOTE:>> The following instructions assume you have hdfs running.
* Setting up Configuration.
To start the ResourceManager and NodeManager, you will have to update the configs.
Assuming $HADOOP_CONF_DIR is the configuration directory and already contains the
installed HDFS configs and <<<core-site.xml>>>, there are two config files you will
have to set up: <<<mapred-site.xml>>> and <<<yarn-site.xml>>>.
** Setting up <<<mapred-site.xml>>>
Add the following configs to your <<<mapred-site.xml>>>.
This document describes how to set up and configure a single-node Hadoop
installation so that you can quickly perform simple operations using Hadoop
MapReduce and the Hadoop Distributed File System (HDFS).
* Prerequisites
** Supported Platforms
* GNU/Linux is supported as a development and production platform.
Hadoop has been demonstrated on GNU/Linux clusters with 2000 nodes.
* Windows is also a supported platform but the following steps
are for Linux only. To set up Hadoop on Windows, see
{{{http://wiki.apache.org/hadoop/Hadoop2OnWindows}wiki page}}.
** Required Software
Required software for Linux includes:
[[1]] Java™ must be installed. Recommended Java versions are described
at {{{http://wiki.apache.org/hadoop/HadoopJavaVersions}
HadoopJavaVersions}}.
[[2]] ssh must be installed and sshd must be running to use the Hadoop
scripts that manage remote Hadoop daemons.
** Installing Software
If your cluster doesn't have the requisite software you will need to install
it.
For example on Ubuntu Linux:
----
$ sudo apt-get install ssh
$ sudo apt-get install rsync
----
* Download
To get a Hadoop distribution, download a recent stable release from one of
the {{{http://www.apache.org/dyn/closer.cgi/hadoop/common/}
Apache Download Mirrors}}.
* Prepare to Start the Hadoop Cluster
Unpack the downloaded Hadoop distribution. In the distribution, edit
the file <<<etc/hadoop/hadoop-env.sh>>> to define some parameters as
follows:
----
# set to the root of your Java installation
export JAVA_HOME=/usr/java/latest
# Assuming your installation directory is /usr/local/hadoop
export HADOOP_PREFIX=/usr/local/hadoop
----
Try the following command:
----
$ bin/hadoop
----
This will display the usage documentation for the hadoop script.
Now you are ready to start your Hadoop cluster in one of the three supported
modes:
* {{{Standalone Operation}Local (Standalone) Mode}}
* {{{Pseudo-Distributed Operation}Pseudo-Distributed Mode}}
* {{{Fully-Distributed Operation}Fully-Distributed Mode}}
* Standalone Operation
By default, Hadoop is configured to run in a non-distributed mode, as a
single Java process. This is useful for debugging.
The following example copies the unpacked conf directory to use as input
and then finds and displays every match of the given regular expression.
Output is written to the given output directory.
----
$ mkdir input
$ cp etc/hadoop/*.xml input
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-${project.version}.jar grep input output 'dfs[a-z.]+'
$ cat output/*
----
* Pseudo-Distributed Operation
Hadoop can also be run on a single-node in a pseudo-distributed mode where
each Hadoop daemon runs in a separate Java process.
** Configuration
Use the following:
etc/hadoop/core-site.xml:
+---+
<configuration>
<property>
<name>mapreduce.cluster.temp.dir</name>
<value></value>
<description>No description</description>
<final>true</final>
</property>
<property>
<name>mapreduce.cluster.local.dir</name>
<value></value>
<description>No description</description>
<final>true</final>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
+---+
** Setting up <<<yarn-site.xml>>>
Add the following configs to your <<<yarn-site.xml>>>
etc/hadoop/hdfs-site.xml:
+---+
<configuration>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>host:port</value>
<description>host is the hostname of the resource manager and
port is the port on which the NodeManagers contact the Resource Manager.
</description>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
+---+
** Setup passphraseless ssh
Now check that you can ssh to the localhost without a passphrase:
----
$ ssh localhost
----
If you cannot ssh to localhost without a passphrase, execute the
following commands:
----
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
----
** Execution
The following instructions are to run a MapReduce job locally.
If you want to execute a job on YARN, see {{YARN on Single Node}}.
[[1]] Format the filesystem:
----
$ bin/hdfs namenode -format
----
[[2]] Start NameNode daemon and DataNode daemon:
----
$ sbin/start-dfs.sh
----
The hadoop daemon log output is written to the <<<${HADOOP_LOG_DIR}>>>
directory (defaults to <<<${HADOOP_HOME}/logs>>>).
[[3]] Browse the web interface for the NameNode; by default it is
available at:
* NameNode - <<<http://localhost:50070/>>>
[[4]] Make the HDFS directories required to execute MapReduce jobs:
----
$ bin/hdfs dfs -mkdir /user
$ bin/hdfs dfs -mkdir /user/<username>
----
[[5]] Copy the input files into the distributed filesystem:
----
$ bin/hdfs dfs -put etc/hadoop input
----
[[6]] Run some of the examples provided:
----
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-${project.version}.jar grep input output 'dfs[a-z.]+'
----
[[7]] Examine the output files:
Copy the output files from the distributed filesystem to the local
filesystem and examine them:
----
$ bin/hdfs dfs -get output output
$ cat output/*
----
or
View the output files on the distributed filesystem:
----
$ bin/hdfs dfs -cat output/*
----
[[8]] When you're done, stop the daemons with:
----
$ sbin/stop-dfs.sh
----
** YARN on Single Node
You can run a MapReduce job on YARN in a pseudo-distributed mode by setting
a few parameters and running ResourceManager daemon and NodeManager daemon
in addition.
The following instructions assume that steps 1 through 4 of
{{{Execution}the above instructions}} have already been executed.
[[1]] Configure parameters as follows:
etc/hadoop/mapred-site.xml:
+---+
<configuration>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>host:port</value>
<description>host is the hostname of the resourcemanager and port is the port
on which the Applications in the cluster talk to the Resource Manager.
</description>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
+---+
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
<description>In case you do not want to use the default scheduler</description>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>host:port</value>
<description>the host is the hostname of the ResourceManager and the port is the port on
which the clients can talk to the Resource Manager. </description>
</property>
<property>
<name>yarn.nodemanager.local-dirs</name>
<value></value>
<description>the local directories used by the nodemanager</description>
</property>
<property>
<name>yarn.nodemanager.address</name>
<value>0.0.0.0:port</value>
<description>the nodemanagers bind to this port</description>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>10240</value>
<description>the amount of memory on the NodeManager in MB</description>
</property>
<property>
<name>yarn.nodemanager.remote-app-log-dir</name>
<value>/app-logs</value>
<description>directory on hdfs where the application logs are moved to </description>
</property>
<property>
<name>yarn.nodemanager.log-dirs</name>
<value></value>
<description>the directories used by Nodemanagers as log directories</description>
</property>
etc/hadoop/yarn-site.xml:
+---+
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
<description>shuffle service that needs to be set for Map Reduce to run </description>
</property>
</configuration>
+---+
* Setting up <<<capacity-scheduler.xml>>>
[[2]] Start ResourceManager daemon and NodeManager daemon:
Make sure you populate the root queues in <<<capacity-scheduler.xml>>>.
----
$ sbin/start-yarn.sh
----
+---+
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>unfunded,default</value>
</property>
[[3]] Browse the web interface for the ResourceManager; by default it is
available at:
<property>
<name>yarn.scheduler.capacity.root.capacity</name>
<value>100</value>
</property>
* ResourceManager - <<<http://localhost:8088/>>>
<property>
<name>yarn.scheduler.capacity.root.unfunded.capacity</name>
<value>50</value>
</property>
[[4]] Run a MapReduce job.
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>50</value>
</property>
+---+
[[5]] When you're done, stop the daemons with:
* Running daemons.
----
$ sbin/stop-yarn.sh
----
Assuming that the environment variables <<$HADOOP_COMMON_HOME>>, <<$HADOOP_HDFS_HOME>>, <<$HADOOP_MAPRED_HOME>>,
<<$HADOOP_YARN_HOME>>, <<$JAVA_HOME>> and <<$HADOOP_CONF_DIR>> have been set appropriately.
Set <<$YARN_CONF_DIR>> the same as <<$HADOOP_CONF_DIR>>.
* Fully-Distributed Operation
Run ResourceManager and NodeManager as:
+---+
$ cd $HADOOP_MAPRED_HOME
$ sbin/yarn-daemon.sh start resourcemanager
$ sbin/yarn-daemon.sh start nodemanager
+---+
You should be up and running. You can run randomwriter as:
+---+
$ $HADOOP_COMMON_HOME/bin/hadoop jar hadoop-examples.jar randomwriter out
+---+
Good luck.
For information on setting up fully-distributed, non-trivial clusters
see {{{./ClusterSetup.html}Cluster Setup}}.


@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -163,8 +164,9 @@ class DFSClientCache {
return new CacheLoader<String, DFSClient>() {
@Override
public DFSClient load(String userName) throws Exception {
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser(userName);
UserGroupInformation ugi = getUserGroupInformation(
userName,
UserGroupInformation.getCurrentUser());
// Guava requires CacheLoader never returns null.
return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() {
@ -177,6 +179,28 @@ class DFSClientCache {
};
}
/**
* This method uses the current user and the real user to create a proxy
* @param effectiveUser The user who is being proxied by the real user
* @param realUser The actual user who does the command
* @return Proxy UserGroupInformation
* @throws IOException If proxying fails
*/
UserGroupInformation getUserGroupInformation(
String effectiveUser,
UserGroupInformation realUser)
throws IOException {
Preconditions.checkNotNull(effectiveUser);
Preconditions.checkNotNull(realUser);
UserGroupInformation ugi =
UserGroupInformation.createProxyUser(effectiveUser, realUser);
if (LOG.isDebugEnabled()){
LOG.debug(String.format("Created ugi:" +
" %s for username: %s", ugi, effectiveUser));
}
return ugi;
}
private RemovalListener<String, DFSClient> clientRemovalListener() {
return new RemovalListener<String, DFSClient>() {
@Override


@ -479,9 +479,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
}
try {
// Use superUserClient to get file attr since we don't know whether the
// NFS client user has access permission to the file
attrs = writeManager.getFileAttr(superUserClient, handle, iug);
// HDFS-5804 removed superUserClient access
attrs = writeManager.getFileAttr(dfsClient, handle, iug);
if (attrs == null) {
LOG.error("Can't get path for fileId:" + handle.getFileId());
return new ACCESS3Response(Nfs3Status.NFS3ERR_STALE);
@ -603,8 +603,10 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
// Only do access check.
try {
// Don't read from cache. Client may not have read permission.
attrs = Nfs3Utils.getFileAttr(superUserClient,
Nfs3Utils.getFileIdPath(handle), iug);
attrs = Nfs3Utils.getFileAttr(
dfsClient,
Nfs3Utils.getFileIdPath(handle),
iug);
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Get error accessing file, fileId:" + handle.getFileId());


@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@ -40,6 +41,9 @@ import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response;
import org.apache.hadoop.nfs.nfs3.response.READDIRPLUS3Response.EntryPlus3;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.SecurityHandler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@ -61,6 +65,14 @@ public class TestReaddir {
@BeforeClass
public static void setup() throws Exception {
String currentUser = System.getProperty("user.name");
config.set(
ProxyUsers.getProxySuperuserGroupConfKey(currentUser),
"*");
config.set(
ProxyUsers.getProxySuperuserIpConfKey(currentUser),
"*");
ProxyUsers.refreshSuperUserGroupsConfiguration(config);
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
cluster.waitActive();
hdfs = cluster.getFileSystem();
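
Outside of tests, the equivalent proxy-user allowance is usually granted in core-site.xml via the hadoop.proxyuser.<gateway-user>.groups and hadoop.proxyuser.<gateway-user>.hosts keys, and a running NameNode can reload those settings without a restart. A rough sketch, with "nfsserver" as a placeholder gateway user:

----
# Illustrative only: after adding hadoop.proxyuser.nfsserver.groups and
# hadoop.proxyuser.nfsserver.hosts to core-site.xml, push the new proxy-user
# settings to the running NameNode.
$ bin/hdfs dfsadmin -refreshSuperUserGroupsConfiguration
----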


@ -20,12 +20,15 @@ package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThat;
import static org.hamcrest.core.Is.is;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
public class TestDFSClientCache {
@ -49,6 +52,28 @@ public class TestDFSClientCache {
assertEquals(MAX_CACHE_SIZE - 1, cache.clientCache.size());
}
@Test
public void testGetUserGroupInformation() throws IOException {
String userName = "user1";
String currentUser = "currentUser";
UserGroupInformation currentUserUgi = UserGroupInformation
.createUserForTesting(currentUser, new String[0]);
currentUserUgi.setAuthenticationMethod(
UserGroupInformation.AuthenticationMethod.KERBEROS);
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://localhost");
DFSClientCache cache = new DFSClientCache(conf);
UserGroupInformation ugiResult
= cache.getUserGroupInformation(userName, currentUserUgi);
assertThat(ugiResult.getUserName(), is(userName));
assertThat(ugiResult.getRealUser(), is(currentUserUgi));
assertThat(
ugiResult.getAuthenticationMethod(),
is(UserGroupInformation.AuthenticationMethod.PROXY));
}
private static boolean isDfsClientClose(DFSClient c) {
try {
c.exists("");


@ -50,6 +50,7 @@ import org.apache.hadoop.nfs.nfs3.response.CREATE3Response;
import org.apache.hadoop.nfs.nfs3.response.READ3Response;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.SecurityHandler;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.jboss.netty.channel.Channel;
import org.junit.Assert;
import org.junit.Test;
@ -285,6 +286,14 @@ public class TestWrites {
SecurityHandler securityHandler = Mockito.mock(SecurityHandler.class);
Mockito.when(securityHandler.getUser()).thenReturn(
System.getProperty("user.name"));
String currentUser = System.getProperty("user.name");
config.set(
ProxyUsers.getProxySuperuserGroupConfKey(currentUser),
"*");
config.set(
ProxyUsers.getProxySuperuserIpConfKey(currentUser),
"*");
ProxyUsers.refreshSuperUserGroupsConfiguration(config);
try {
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();


@ -297,10 +297,28 @@ Release 2.4.0 - UNRELEASED
HDFS-5781. Use an array to record the mapping between FSEditLogOpCode and
the corresponding byte value. (jing9)
HDFS-5153. Datanode should send block reports for each storage in a
separate message. (Arpit Agarwal)
HDFS-5804. HDFS NFS Gateway fails to mount and proxy when using Kerberos.
(Abin Shahab via jing9)
OPTIMIZATIONS
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
(todd)
BUG FIXES
HDFS-5492. Port HDFS-2069 (Incorrect default trash interval in the
docs) to trunk. (Akira Ajisaka via Arpit Agarwal)
HDFS-5843. DFSClient.getFileChecksum() throws IOException if checksum is
disabled. (Laurent Goujon via jing9)
HDFS-5856. DataNode.checkDiskError might throw NPE.
(Josh Elser via suresh)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES


@ -401,6 +401,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 60 * 60 * 1000;
public static final String DFS_BLOCKREPORT_INITIAL_DELAY_KEY = "dfs.blockreport.initialDelay";
public static final int DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
public static final String DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY = "dfs.blockreport.split.threshold";
public static final long DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT = 1000 * 1000;
public static final String DFS_CACHEREPORT_INTERVAL_MSEC_KEY = "dfs.cachereport.intervalMsec";
public static final long DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000;
public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";


@ -1616,15 +1616,19 @@ public class BlockManager {
/**
* The given storage is reporting all its blocks.
* Update the (storage-->block list) and (block-->storage list) maps.
*
* @return true if all known storages of the given DN have finished reporting.
* @throws IOException
*/
public void processReport(final DatanodeID nodeID,
public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage, final String poolId,
final BlockListAsLongs newReport) throws IOException {
namesystem.writeLock();
final long startTime = Time.now(); //after acquiring write lock
final long endTime;
DatanodeDescriptor node;
try {
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive) {
throw new IOException(
"ProcessReport from dead or unregistered node: " + nodeID);
@ -1632,13 +1636,21 @@ public class BlockManager {
// To minimize startup time, we discard any second (or later) block reports
// that we receive while still in startup phase.
final DatanodeStorageInfo storageInfo = node.updateStorage(storage);
DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());
if (storageInfo == null) {
// We handle this for backwards compatibility.
storageInfo = node.updateStorage(storage);
LOG.warn("Unknown storageId " + storage.getStorageID() +
", updating storageMap. This indicates a buggy " +
"DataNode that isn't heartbeating correctly.");
}
if (namesystem.isInStartupSafeMode()
&& storageInfo.getBlockReportCount() > 0) {
blockLog.info("BLOCK* processReport: "
+ "discarded non-initial block report from " + nodeID
+ " because namenode still in startup phase");
return;
return !node.hasStaleStorages();
}
if (storageInfo.numBlocks() == 0) {
@ -1655,7 +1667,7 @@ public class BlockManager {
storageInfo.receivedBlockReport();
if (staleBefore && !storageInfo.areBlockContentsStale()) {
LOG.info("BLOCK* processReport: Received first block report from "
+ node + " after starting up or becoming active. Its block "
+ storage + " after starting up or becoming active. Its block "
+ "contents are no longer considered stale");
rescanPostponedMisreplicatedBlocks();
}
@ -1670,9 +1682,10 @@ public class BlockManager {
if (metrics != null) {
metrics.addBlockReport((int) (endTime - startTime));
}
blockLog.info("BLOCK* processReport: from "
+ nodeID + ", blocks: " + newReport.getNumberOfBlocks()
blockLog.info("BLOCK* processReport: from storage " + storage.getStorageID()
+ " node " + nodeID + ", blocks: " + newReport.getNumberOfBlocks()
+ ", processing time: " + (endTime - startTime) + " msecs");
return !node.hasStaleStorages();
}
/**
@ -1827,7 +1840,7 @@ public class BlockManager {
Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
Collection<StatefulBlockInfo> toUC) { // add to under-construction list
final DatanodeStorageInfo storageInfo = dn.updateStorage(storage);
final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID());
// place a delimiter in the list which separates blocks
// that have been reported from those that have not


@ -257,6 +257,17 @@ public class DatanodeDescriptor extends DatanodeInfo {
}
}
boolean hasStaleStorages() {
synchronized (storageMap) {
for (DatanodeStorageInfo storage : storageMap.values()) {
if (storage.areBlockContentsStale()) {
return true;
}
}
return false;
}
}
/**
* Remove block from the list of blocks belonging to the data-node. Remove
* data-node from the block.


@ -22,11 +22,9 @@ import static org.apache.hadoop.util.Time.now;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.*;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@ -437,59 +435,87 @@ class BPServiceActor implements Runnable {
/**
* Report the list blocks to the Namenode
* @return DatanodeCommands returned by the NN. May be null.
* @throws IOException
*/
DatanodeCommand blockReport() throws IOException {
List<DatanodeCommand> blockReport() throws IOException {
// send block report if timer has expired.
DatanodeCommand cmd = null;
long startTime = now();
if (startTime - lastBlockReport > dnConf.blockReportInterval) {
final long startTime = now();
if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
return null;
}
ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
// Flush any block information that precedes the block report. Otherwise
// we have a chance that we will miss the delHint information
// or we will report an RBW replica after the BlockReport already reports
// a FINALIZED one.
reportReceivedDeletedBlocks();
lastDeletedReport = startTime;
// Send one block report per known storage.
// Create block report
long brCreateStartTime = now();
long totalBlockCount = 0;
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
// Send block report
long brSendStartTime = now();
StorageBlockReport[] reports =
// Convert the reports to the format expected by the NN.
int i = 0;
int totalBlockCount = 0;
StorageBlockReport reports[] =
new StorageBlockReport[perVolumeBlockLists.size()];
int i = 0;
for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
DatanodeStorage dnStorage = kvPair.getKey();
BlockListAsLongs blockList = kvPair.getValue();
reports[i++] = new StorageBlockReport(
kvPair.getKey(), blockList.getBlockListAsLongs());
totalBlockCount += blockList.getNumberOfBlocks();
reports[i++] =
new StorageBlockReport(
dnStorage, blockList.getBlockListAsLongs());
}
cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
// Send the reports to the NN.
int numReportsSent;
long brSendStartTime = now();
if (totalBlockCount < dnConf.blockReportSplitThreshold) {
// Below split threshold, send all reports in a single message.
numReportsSent = 1;
DatanodeCommand cmd =
bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);
if (cmd != null) {
cmds.add(cmd);
}
} else {
// Send one block report per message.
numReportsSent = i;
for (StorageBlockReport report : reports) {
StorageBlockReport singleReport[] = { report };
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), singleReport);
if (cmd != null) {
cmds.add(cmd);
}
}
}
// Log the block report processing stats from Datanode perspective
long brSendCost = now() - brSendStartTime;
long brCreateCost = brSendStartTime - brCreateStartTime;
dn.getMetrics().addBlockReport(brSendCost);
LOG.info("BlockReport of " + totalBlockCount
+ " blocks took " + brCreateCost + " msec to generate and "
+ brSendCost + " msecs for RPC and NN processing");
LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount +
" blocks total. Took " + brCreateCost +
" msec to generate and " + brSendCost +
" msecs for RPC and NN processing. " +
" Got back commands " +
(cmds.size() == 0 ? "none" : Joiner.on("; ").join(cmds)));
// If we have sent the first block report, then wait a random
scheduleNextBlockReport(startTime);
return cmds.size() == 0 ? null : cmds;
}
private void scheduleNextBlockReport(long previousReportStartTime) {
// If we have sent the first set of block reports, then wait a random
// time before we start the periodic block reports.
if (resetBlockReportTime) {
lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
lastBlockReport = previousReportStartTime -
DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
@ -501,9 +527,6 @@ class BPServiceActor implements Runnable {
lastBlockReport += (now() - lastBlockReport) /
dnConf.blockReportInterval * dnConf.blockReportInterval;
}
LOG.info("sent block report, processed command:" + cmd);
}
return cmd;
}
DatanodeCommand cacheReport() throws IOException {
@ -513,7 +536,7 @@ class BPServiceActor implements Runnable {
}
// send cache report if timer has expired.
DatanodeCommand cmd = null;
long startTime = Time.monotonicNow();
final long startTime = Time.monotonicNow();
if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending cacheReport from service actor: " + this);
@ -613,7 +636,7 @@ class BPServiceActor implements Runnable {
//
while (shouldRun()) {
try {
long startTime = now();
final long startTime = now();
//
// Every so often, send heartbeat or block-report
@ -659,10 +682,10 @@ class BPServiceActor implements Runnable {
lastDeletedReport = startTime;
}
DatanodeCommand cmd = blockReport();
processCommand(new DatanodeCommand[]{ cmd });
List<DatanodeCommand> cmds = blockReport();
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
cmd = cacheReport();
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
// Now safe to start scanning the block pool.


@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@ -70,6 +72,7 @@ public class DNConf {
final long readaheadLength;
final long heartBeatInterval;
final long blockReportInterval;
final long blockReportSplitThreshold;
final long deleteReportInterval;
final long initialBlockReportDelay;
final long cacheReportInterval;
@ -117,6 +120,8 @@ public class DNConf {
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
this.blockReportSplitThreshold = conf.getLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT);
this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);


@ -36,6 +36,7 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@ -51,7 +52,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -1324,12 +1324,7 @@ public class DataNode extends Configured
protected void checkDiskError(Exception e ) throws IOException {
LOG.warn("checkDiskError: exception: ", e);
if (e instanceof SocketException || e instanceof SocketTimeoutException
|| e instanceof ClosedByInterruptException
|| e.getMessage().startsWith("An established connection was aborted")
|| e.getMessage().startsWith("Broken pipe")
|| e.getMessage().startsWith("Connection reset")
|| e.getMessage().contains("java.nio.channels.SocketChannel")) {
if (isNetworkRelatedException(e)) {
LOG.info("Not checking disk as checkDiskError was called on a network" +
" related exception");
return;
@ -1342,6 +1337,28 @@ public class DataNode extends Configured
}
}
/**
* Check if the provided exception looks like it's from a network error
* @param e the exception from a checkDiskError call
* @return true if this exception is network related, false otherwise
*/
protected boolean isNetworkRelatedException(Exception e) {
if (e instanceof SocketException
|| e instanceof SocketTimeoutException
|| e instanceof ClosedChannelException
|| e instanceof ClosedByInterruptException) {
return true;
}
String msg = e.getMessage();
return null != msg
&& (msg.startsWith("An established connection was aborted")
|| msg.startsWith("Broken pipe")
|| msg.startsWith("Connection reset")
|| msg.contains("java.nio.channels.SocketChannel"));
}
/**
* Check if there is a disk failure and if so, handle the error
*/


@ -655,8 +655,9 @@ class DataXceiver extends Receiver implements Runnable {
final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
final DataChecksum checksum = header.getChecksum();
final int bytesPerCRC = checksum.getBytesPerChecksum();
final long crcPerBlock = (metadataIn.getLength()
- BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize();
final long crcPerBlock = checksum.getChecksumSize() > 0
? (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize()
: 0;
//compute block checksum
final MD5Hash md5 = MD5Hash.digest(checksumIn);


@ -3802,7 +3802,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
if (diff > 0) {
try {
String path = leaseManager.findPath(fileINode);
String path = fileINode.getFullPathName();
dir.updateSpaceConsumed(path, 0, -diff*fileINode.getFileReplication());
} catch (IOException e) {
LOG.warn("Unexpected exception while updating disk space.", e);
@ -4004,7 +4004,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
@VisibleForTesting
String closeFileCommitBlocks(INodeFile pendingFile, BlockInfo storedBlock)
throws IOException {
String src = leaseManager.findPath(pendingFile);
String src = pendingFile.getFullPathName();
// commit the last block and complete it if it has minimum replicas
commitOrCompleteLastBlock(pendingFile, storedBlock);
@ -4026,7 +4026,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
@VisibleForTesting
String persistBlocks(INodeFile pendingFile, boolean logRetryCache)
throws IOException {
String src = leaseManager.findPath(pendingFile);
String src = pendingFile.getFullPathName();
dir.persistBlocks(src, pendingFile, logRetryCache);
return src;
}
@ -5954,7 +5954,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
.getDatanodeStorageInfos(newNodes, newStorageIDs);
blockinfo.setExpectedLocations(storages);
String src = leaseManager.findPath(pendingFile);
String src = pendingFile.getFullPathName();
dir.persistBlocks(src, pendingFile, logRetryCache);
}


@ -179,24 +179,6 @@ public class LeaseManager {
return addLease(newHolder, src);
}
/**
* Finds the pathname for the specified pendingFile
*/
public synchronized String findPath(INodeFile pendingFile)
throws IOException {
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
Preconditions.checkArgument(uc != null);
Lease lease = getLease(uc.getClientName());
if (lease != null) {
String src = lease.findPath(pendingFile);
if (src != null) {
return src;
}
}
throw new IOException("pendingFile (=" + pendingFile + ") not found."
+ "(lease=" + lease + ")");
}
/**
* Renew the lease(s) held by the given client
*/
@ -252,24 +234,6 @@ public class LeaseManager {
return now() - lastUpdate > softLimit;
}
/**
* @return the path associated with the pendingFile and null if not found.
*/
private String findPath(INodeFile pendingFile) {
try {
for (String src : paths) {
INode node = fsnamesystem.dir.getINode(src);
if (node == pendingFile
|| (node.isFile() && node.asFile() == pendingFile)) {
return src;
}
}
} catch (UnresolvedLinkException e) {
throw new AssertionError("Lease files should reside on this FS");
}
return null;
}
/** Does this lease contain any path? */
boolean hasPath() {return !paths.isEmpty();}


@ -984,13 +984,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
+ "from " + nodeReg + ", reports.length=" + reports.length);
}
final BlockManager bm = namesystem.getBlockManager();
boolean hasStaleStorages = true;
for(StorageBlockReport r : reports) {
final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
hasStaleStorages = bm.processReport(nodeReg, r.getStorage(), poolId, blocks);
}
if (nn.getFSImage().isUpgradeFinalized() && !nn.isStandbyState())
if (nn.getFSImage().isUpgradeFinalized() &&
!nn.isStandbyState() &&
!hasStaleStorages) {
return new FinalizeCommand(poolId);
}
return null;
}


@ -482,6 +482,20 @@
<description>Delay for first block report in seconds.</description>
</property>
<property>
<name>dfs.blockreport.split.threshold</name>
<value>1000000</value>
<description>If the number of blocks on the DataNode is below this
threshold then it will send block reports for all Storage Directories
in a single message.
If the number of blocks exceeds this threshold then the DataNode will
send block reports for each Storage Directory in separate messages.
Set to zero to always split.
</description>
</property>
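
A quick way to confirm the value a DataNode will actually pick up is to query the effective configuration on that machine (a sketch; the output shown is simply the default above):

----
$ bin/hdfs getconf -confKey dfs.blockreport.split.threshold
1000000
----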
<property>
<name>dfs.datanode.directoryscan.interval</name>
<value>21600</value>


@ -17,11 +17,11 @@
---
${maven.build.timestamp}
%{toc|section=1|fromDepth=0}
HDFS Architecture
Introduction
%{toc|section=1|fromDepth=0}
* Introduction
The Hadoop Distributed File System (HDFS) is a distributed file system
designed to run on commodity hardware. It has many similarities with
@ -35,9 +35,9 @@ Introduction
is part of the Apache Hadoop Core project. The project URL is
{{http://hadoop.apache.org/}}.
Assumptions and Goals
* Assumptions and Goals
Hardware Failure
** Hardware Failure
Hardware failure is the norm rather than the exception. An HDFS
instance may consist of hundreds or thousands of server machines, each
@ -47,7 +47,7 @@ Hardware Failure
non-functional. Therefore, detection of faults and quick, automatic
recovery from them is a core architectural goal of HDFS.
Streaming Data Access
** Streaming Data Access
Applications that run on HDFS need streaming access to their data sets.
They are not general purpose applications that typically run on general
@ -58,7 +58,7 @@ Streaming Data Access
targeted for HDFS. POSIX semantics in a few key areas has been traded
to increase data throughput rates.
Large Data Sets
** Large Data Sets
Applications that run on HDFS have large data sets. A typical file in
HDFS is gigabytes to terabytes in size. Thus, HDFS is tuned to support
@ -66,7 +66,7 @@ Large Data Sets
to hundreds of nodes in a single cluster. It should support tens of
millions of files in a single instance.
Simple Coherency Model
** Simple Coherency Model
HDFS applications need a write-once-read-many access model for files. A
file once created, written, and closed need not be changed. This
@ -75,7 +75,7 @@ Simple Coherency Model
perfectly with this model. There is a plan to support appending-writes
to files in the future.
“Moving Computation is Cheaper than Moving Data”
** “Moving Computation is Cheaper than Moving Data”
A computation requested by an application is much more efficient if it
is executed near the data it operates on. This is especially true when
@ -86,13 +86,13 @@ Simple Coherency Model
running. HDFS provides interfaces for applications to move themselves
closer to where the data is located.
Portability Across Heterogeneous Hardware and Software Platforms
** Portability Across Heterogeneous Hardware and Software Platforms
HDFS has been designed to be easily portable from one platform to
another. This facilitates widespread adoption of HDFS as a platform of
choice for a large set of applications.
NameNode and DataNodes
* NameNode and DataNodes
HDFS has a master/slave architecture. An HDFS cluster consists of a
single NameNode, a master server that manages the file system namespace
@ -127,7 +127,7 @@ NameNode and DataNodes
repository for all HDFS metadata. The system is designed in such a way
that user data never flows through the NameNode.
The File System Namespace
* The File System Namespace
HDFS supports a traditional hierarchical file organization. A user or
an application can create directories and store files inside these
@ -145,7 +145,7 @@ The File System Namespace
replication factor of that file. This information is stored by the
NameNode.
Data Replication
* Data Replication
HDFS is designed to reliably store very large files across machines in
a large cluster. It stores each file as a sequence of blocks; all
@ -164,7 +164,7 @@ Data Replication
[images/hdfsdatanodes.png] HDFS DataNodes
Replica Placement: The First Baby Steps
** Replica Placement: The First Baby Steps
The placement of replicas is critical to HDFS reliability and
performance. Optimizing replica placement distinguishes HDFS from most
@ -210,7 +210,7 @@ Replica Placement: The First Baby Steps
The current, default replica placement policy described here is a work
in progress.
Replica Selection
** Replica Selection
To minimize global bandwidth consumption and read latency, HDFS tries
to satisfy a read request from a replica that is closest to the reader.
@ -219,7 +219,7 @@ Replica Selection
cluster spans multiple data centers, then a replica that is resident in
the local data center is preferred over any remote replica.
Safemode
** Safemode
On startup, the NameNode enters a special state called Safemode.
Replication of data blocks does not occur when the NameNode is in the
@ -234,7 +234,7 @@ Safemode
blocks (if any) that still have fewer than the specified number of
replicas. The NameNode then replicates these blocks to other DataNodes.
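
Operators typically inspect or override this state from the command line; a small illustrative sketch, run from the Hadoop installation directory:

----
$ bin/hdfs dfsadmin -safemode get     # report whether the NameNode is currently in Safemode
$ bin/hdfs dfsadmin -safemode leave   # force the NameNode out of Safemode if required
----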
The Persistence of File System Metadata
* The Persistence of File System Metadata
The HDFS namespace is stored by the NameNode. The NameNode uses a
transaction log called the EditLog to persistently record every change
@ -273,7 +273,7 @@ The Persistence of File System Metadata
each of these local files and sends this report to the NameNode: this
is the Blockreport.
The Communication Protocols
* The Communication Protocols
All HDFS communication protocols are layered on top of the TCP/IP
protocol. A client establishes a connection to a configurable TCP port
@ -284,13 +284,13 @@ The Communication Protocols
RPCs. Instead, it only responds to RPC requests issued by DataNodes or
clients.
Robustness
* Robustness
The primary objective of HDFS is to store data reliably even in the
presence of failures. The three common types of failures are NameNode
failures, DataNode failures and network partitions.
Data Disk Failure, Heartbeats and Re-Replication
** Data Disk Failure, Heartbeats and Re-Replication
Each DataNode sends a Heartbeat message to the NameNode periodically. A
network partition can cause a subset of DataNodes to lose connectivity
@ -306,7 +306,7 @@ Data Disk Failure, Heartbeats and Re-Replication
corrupted, a hard disk on a DataNode may fail, or the replication
factor of a file may be increased.
Cluster Rebalancing
** Cluster Rebalancing
The HDFS architecture is compatible with data rebalancing schemes. A
scheme might automatically move data from one DataNode to another if
@ -316,7 +316,7 @@ Cluster Rebalancing
cluster. These types of data rebalancing schemes are not yet
implemented.
Data Integrity
** Data Integrity
It is possible that a block of data fetched from a DataNode arrives
corrupted. This corruption can occur because of faults in a storage
@ -330,7 +330,7 @@ Data Integrity
to retrieve that block from another DataNode that has a replica of that
block.
Metadata Disk Failure
** Metadata Disk Failure
The FsImage and the EditLog are central data structures of HDFS. A
corruption of these files can cause the HDFS instance to be
@ -350,16 +350,16 @@ Metadata Disk Failure
Currently, automatic restart and failover of the NameNode software to
another machine is not supported.
Snapshots
** Snapshots
Snapshots support storing a copy of data at a particular instant of
time. One usage of the snapshot feature may be to roll back a corrupted
HDFS instance to a previously known good point in time. HDFS does not
currently support snapshots but will in a future release.
Data Organization
* Data Organization
Data Blocks
** Data Blocks
HDFS is designed to support very large files. Applications that are
compatible with HDFS are those that deal with large data sets. These
@ -370,7 +370,7 @@ Data Blocks
chunks, and if possible, each chunk will reside on a different
DataNode.
Staging
** Staging
A client request to create a file does not reach the NameNode
immediately. In fact, initially the HDFS client caches the file data
@ -397,7 +397,7 @@ Staging
side caching to improve performance. A POSIX requirement has been
relaxed to achieve higher performance of data uploads.
Replication Pipelining
** Replication Pipelining
When a client is writing data to an HDFS file, its data is first
written to a local file as explained in the previous section. Suppose
@ -406,7 +406,7 @@ Replication Pipelining
DataNodes from the NameNode. This list contains the DataNodes that will
host a replica of that block. The client then flushes the data block to
the first DataNode. The first DataNode starts receiving the data in
small portions (4 KB), writes each portion to its local repository and
small portions, writes each portion to its local repository and
transfers that portion to the second DataNode in the list. The second
DataNode, in turn starts receiving each portion of the data block,
writes that portion to its repository and then flushes that portion to
@ -416,7 +416,7 @@ Replication Pipelining
the next one in the pipeline. Thus, the data is pipelined from one
DataNode to the next.
Accessibility
* Accessibility
HDFS can be accessed from applications in many different ways.
Natively, HDFS provides a
@ -426,7 +426,7 @@ Accessibility
of an HDFS instance. Work is in progress to expose HDFS through the WebDAV
protocol.
FS Shell
** FS Shell
HDFS allows user data to be organized in the form of files and
directories. It provides a commandline interface called FS shell that
@ -447,7 +447,7 @@ FS Shell
FS shell is targeted for applications that need a scripting language to
interact with the stored data.
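
For instance (paths are illustrative):

----
$ bin/hadoop fs -mkdir /foodir
$ bin/hadoop fs -cat /foodir/myfile.txt
----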
DFSAdmin
** DFSAdmin
The DFSAdmin command set is used for administering an HDFS cluster.
These are commands that are used only by an HDFS administrator. Here
@ -463,16 +463,16 @@ DFSAdmin
|Recommission or decommission DataNode(s) | <<<bin/hadoop dfsadmin -refreshNodes>>>
*---------+---------+
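
One more illustrative administrator command (not an exhaustive list):

----
$ bin/hadoop dfsadmin -report   # print basic statistics and the list of live and dead DataNodes
----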
Browser Interface
** Browser Interface
A typical HDFS install configures a web server to expose the HDFS
namespace through a configurable TCP port. This allows a user to
navigate the HDFS namespace and view the contents of its files using a
web browser.
Space Reclamation
* Space Reclamation
File Deletes and Undeletes
** File Deletes and Undeletes
When a file is deleted by a user or an application, it is not
immediately removed from HDFS. Instead, HDFS first renames it to a file
@ -490,12 +490,12 @@ File Deletes and Undeletes
file. The <<</trash>>> directory contains only the latest copy of the file
that was deleted. The <<</trash>>> directory is just like any other directory
with one special feature: HDFS applies specified policies to
automatically delete files from this directory. The current default
policy is to delete files from <<</trash>>> that are more than 6 hours old.
In the future, this policy will be configurable through a well defined
interface.
automatically delete files from this directory. The current default trash
interval is set to 0 (files are deleted without being stored in trash). This value
is a configurable parameter, <<<fs.trash.interval>>>, stored in
core-site.xml.
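
As a sketch, the effective interval can be checked from the shell, and with a non-zero interval a delete moves the file into the user's <<<.Trash>>> directory rather than removing it (the file path below is hypothetical):

----
$ bin/hdfs getconf -confKey fs.trash.interval
0
$ bin/hadoop fs -rm /user/alice/old-data.txt
----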
Decrease Replication Factor
** Decrease Replication Factor
When the replication factor of a file is reduced, the NameNode selects
excess replicas that can be deleted. The next Heartbeat transfers this
@ -505,7 +505,7 @@ Decrease Replication Factor
of the setReplication API call and the appearance of free space in the
cluster.
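
For example, replication can also be lowered from the shell (the file name is illustrative; <<<-w>>> waits for the change to complete):

----
$ bin/hadoop fs -setrep -w 2 /user/alice/bigfile.dat
----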
References
* References
Hadoop {{{http://hadoop.apache.org/docs/current/api/}JavaDoc API}}.


@ -71,7 +71,7 @@ public class TestFSOutputSummer {
cleanupFile(name);
}
/* create a file, write data with vairable amount of data */
/* create a file, write data with variable amount of data */
private void writeFile3(Path name) throws Exception {
FSDataOutputStream stm = fileSys.create(name, true,
fileSys.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, 4096),
@ -103,6 +103,8 @@ public class TestFSOutputSummer {
stm.readFully(0, actual);
checkAndEraseData(actual, 0, expected, "Read Sanity Test");
stm.close();
// do a sanity check. Get the file checksum
fileSys.getFileChecksum(name);
}
private void cleanupFile(Path name) throws IOException {
@ -112,13 +114,20 @@ public class TestFSOutputSummer {
}
/**
* Test write opeation for output stream in DFS.
* Test write operation for output stream in DFS.
*/
@Test
public void testFSOutputSummer() throws Exception {
doTestFSOutputSummer("CRC32");
doTestFSOutputSummer("CRC32C");
doTestFSOutputSummer("NULL");
}
private void doTestFSOutputSummer(String checksumType) throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, BYTES_PER_CHECKSUM);
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, checksumType);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(NUM_OF_DATANODES)
.build();


@ -52,7 +52,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@ -69,20 +68,16 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
/**
* This test simulates a variety of situations when blocks are being
* intentionally corrupted, unexpectedly modified, and so on before a block
* report is happening.
* This is the base class for simulating a variety of situations
* when blocks are being intentionally corrupted, unexpectedly modified,
* and so on before a block report is happening.
*
* For each test case it runs two variations:
* #1 - For a given DN, the first variation sends block reports for all
* storages in a single call to the NN.
* #2 - For a given DN, the second variation sends block reports for each
* storage in a separate call.
*
* The behavior should be the same in either variation.
* By overriding {@link #sendBlockReports}, derived classes can test
* different variations of how block reports are split across storages
* and messages.
*/
public class TestBlockReport {
public static final Log LOG = LogFactory.getLog(TestBlockReport.class);
public abstract class BlockReportTestBase {
public static final Log LOG = LogFactory.getLog(BlockReportTestBase.class);
private static short REPL_FACTOR = 1;
private static final int RAND_LIMIT = 2000;
@ -91,12 +86,11 @@ public class TestBlockReport {
private static final int DN_N0 = 0;
private static final int FILE_START = 0;
static final int BLOCK_SIZE = 1024;
static final int NUM_BLOCKS = 10;
static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1;
static String bpid;
private static final int BLOCK_SIZE = 1024;
private static final int NUM_BLOCKS = 10;
private static final int FILE_SIZE = NUM_BLOCKS * BLOCK_SIZE + 1;
private MiniDFSCluster cluster;
protected MiniDFSCluster cluster;
private DistributedFileSystem fs;
private static Random rand = new Random(RAND_LIMIT);
@ -112,8 +106,7 @@ public class TestBlockReport {
public void startUpCluster() throws IOException {
REPL_FACTOR = 1; // Reset in case a test has modified the value
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPL_FACTOR).build();
fs = (DistributedFileSystem) cluster.getFileSystem();
bpid = cluster.getNamesystem().getBlockPoolId();
fs = cluster.getFileSystem();
}
@After
@ -123,6 +116,15 @@ public class TestBlockReport {
cluster.shutdown();
}
protected static void resetConfiguration() {
conf = new Configuration();
int customPerChecksumSize = 512;
int customBlockSize = customPerChecksumSize * 3;
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, DN_RESCAN_INTERVAL);
}
// Generate a block report, optionally corrupting the generation
// stamp and/or length of one block.
private static StorageBlockReport[] getBlockReports(
@ -172,106 +174,11 @@ public class TestBlockReport {
* @param dnR
* @param poolId
* @param reports
* @param needtoSplit
* @throws IOException
*/
private void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports, boolean needtoSplit) throws IOException {
if (!needtoSplit) {
LOG.info("Sending combined block reports for " + dnR);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
} else {
for (StorageBlockReport report : reports) {
LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
StorageBlockReport[] singletonReport = { report };
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
}
}
}
protected abstract void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException;
/**
* Test variations blockReport_01 through blockReport_09 with combined
* and split block reports.
*/
@Test
public void blockReportCombined_01() throws IOException {
blockReport_01(false);
}
@Test
public void blockReportSplit_01() throws IOException {
blockReport_01(true);
}
@Test
public void blockReportCombined_02() throws IOException {
blockReport_02(false);
}
@Test
public void blockReportSplit_02() throws IOException {
blockReport_02(true);
}
@Test
public void blockReportCombined_03() throws IOException {
blockReport_03(false);
}
@Test
public void blockReportSplit_03() throws IOException {
blockReport_03(true);
}
@Test
public void blockReportCombined_04() throws IOException {
blockReport_04(false);
}
@Test
public void blockReportSplit_04() throws IOException {
blockReport_04(true);
}
@Test
public void blockReportCombined_06() throws Exception {
blockReport_06(false);
}
@Test
public void blockReportSplit_06() throws Exception {
blockReport_06(true);
}
@Test
public void blockReportCombined_07() throws Exception {
blockReport_07(false);
}
@Test
public void blockReportSplit_07() throws Exception {
blockReport_07(true);
}
@Test
public void blockReportCombined_08() throws Exception {
blockReport_08(false);
}
@Test
public void blockReportSplit_08() throws Exception {
blockReport_08(true);
}
@Test
public void blockReportCombined_09() throws Exception {
blockReport_09(false);
}
@Test
public void blockReportSplit_09() throws Exception {
blockReport_09(true);
}
/**
* Test write a file, verifies and closes it. Then the length of the blocks
* are messed up and BlockReport is forced.
@ -279,7 +186,8 @@ public class TestBlockReport {
*
* @throws java.io.IOException on an error
*/
private void blockReport_01(boolean splitBlockReports) throws IOException {
@Test(timeout=300000)
public void blockReport_01() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
@ -312,7 +220,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
sendBlockReports(dnR, poolId, reports, splitBlockReports);
sendBlockReports(dnR, poolId, reports);
List<LocatedBlock> blocksAfterReport =
DFSTestUtil.getAllBlocks(fs.open(filePath));
@ -338,7 +246,8 @@ public class TestBlockReport {
*
* @throws IOException in case of errors
*/
private void blockReport_02(boolean splitBlockReports) throws IOException {
@Test(timeout=300000)
public void blockReport_02() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
LOG.info("Running test " + METHOD_NAME);
@ -393,7 +302,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false);
sendBlockReports(dnR, poolId, reports, splitBlockReports);
sendBlockReports(dnR, poolId, reports);
BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
.getBlockManager());
@ -414,7 +323,8 @@ public class TestBlockReport {
*
* @throws IOException in case of an error
*/
private void blockReport_03(boolean splitBlockReports) throws IOException {
@Test(timeout=300000)
public void blockReport_03() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
ArrayList<Block> blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath);
@ -424,7 +334,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
sendBlockReports(dnR, poolId, reports, splitBlockReports);
sendBlockReports(dnR, poolId, reports);
printStats();
assertThat("Wrong number of corrupt blocks",
@ -441,7 +351,8 @@ public class TestBlockReport {
*
* @throws IOException in case of an error
*/
private void blockReport_04(boolean splitBlockReports) throws IOException {
@Test(timeout=300000)
public void blockReport_04() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
DFSTestUtil.createFile(fs, filePath,
@ -459,7 +370,7 @@ public class TestBlockReport {
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
sendBlockReports(dnR, poolId, reports, splitBlockReports);
sendBlockReports(dnR, poolId, reports);
printStats();
assertThat("Wrong number of corrupt blocks",
@ -476,7 +387,8 @@ public class TestBlockReport {
*
* @throws IOException in case of an error
*/
private void blockReport_06(boolean splitBlockReports) throws Exception {
@Test(timeout=300000)
public void blockReport_06() throws Exception {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1;
@ -489,7 +401,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
sendBlockReports(dnR, poolId, reports, splitBlockReports);
sendBlockReports(dnR, poolId, reports);
printStats();
assertEquals("Wrong number of PendingReplication Blocks",
0, cluster.getNamesystem().getUnderReplicatedBlocks());
@ -508,7 +420,8 @@ public class TestBlockReport {
*
* @throws IOException in case of an error
*/
private void blockReport_07(boolean splitBlockReports) throws Exception {
@Test(timeout=300000)
public void blockReport_07() throws Exception {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1;
@ -522,7 +435,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false);
sendBlockReports(dnR, poolId, reports, splitBlockReports);
sendBlockReports(dnR, poolId, reports);
printStats();
assertThat("Wrong number of corrupt blocks",
@ -533,7 +446,7 @@ public class TestBlockReport {
cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
reports = getBlockReports(dn, poolId, true, true);
sendBlockReports(dnR, poolId, reports, splitBlockReports);
sendBlockReports(dnR, poolId, reports);
printStats();
assertThat("Wrong number of corrupt blocks",
@ -559,7 +472,8 @@ public class TestBlockReport {
*
* @throws IOException in case of an error
*/
private void blockReport_08(boolean splitBlockReports) throws IOException {
@Test(timeout=300000)
public void blockReport_08() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1;
@ -584,7 +498,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
sendBlockReports(dnR, poolId, reports, splitBlockReports);
sendBlockReports(dnR, poolId, reports);
printStats();
assertEquals("Wrong number of PendingReplication blocks",
blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks());
@ -600,7 +514,8 @@ public class TestBlockReport {
// Similar to BlockReport_08 but corrupts GS and len of the TEMPORARY's
// replica block. Expect the same behaviour: NN should simply ignore this
// block
private void blockReport_09(boolean splitBlockReports) throws IOException {
@Test(timeout=300000)
public void blockReport_09() throws IOException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path filePath = new Path("/" + METHOD_NAME + ".dat");
final int DN_N1 = DN_N0 + 1;
@ -626,7 +541,7 @@ public class TestBlockReport {
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true);
sendBlockReports(dnR, poolId, reports, splitBlockReports);
sendBlockReports(dnR, poolId, reports);
printStats();
assertEquals("Wrong number of PendingReplication blocks",
2, cluster.getNamesystem().getPendingReplicationBlocks());
@ -648,7 +563,7 @@ public class TestBlockReport {
* corrupt.
* This is a regression test for HDFS-2791.
*/
@Test
@Test(timeout=300000)
public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception {
final CountDownLatch brFinished = new CountDownLatch(1);
DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
@ -898,7 +813,7 @@ public class TestBlockReport {
((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger) LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL);
((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger) TestBlockReport.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger) BlockReportTestBase.LOG).getLogger().setLevel(Level.ALL);
}
private Block findBlock(Path path, long size) throws IOException {
@ -933,13 +848,4 @@ public class TestBlockReport {
}
}
}
private static void resetConfiguration() {
conf = new Configuration();
int customPerChecksumSize = 512;
int customBlockSize = customPerChecksumSize * 3;
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
conf.setLong(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, DN_RESCAN_INTERVAL);
}
}

View File

@ -18,12 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream;
import java.io.File;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -196,4 +200,16 @@ public class TestDiskError {
}
}
}
@Test
public void testNetworkErrorsIgnored() {
DataNode dn = cluster.getDataNodes().iterator().next();
assertTrue(dn.isNetworkRelatedException(new SocketException()));
assertTrue(dn.isNetworkRelatedException(new SocketTimeoutException()));
assertTrue(dn.isNetworkRelatedException(new ClosedChannelException()));
assertTrue(dn.isNetworkRelatedException(new Exception("Broken pipe foo bar")));
assertFalse(dn.isNetworkRelatedException(new Exception()));
assertFalse(dn.isNetworkRelatedException(new Exception("random problem")));
}
}

View File

@ -0,0 +1,205 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.times;
/**
* Tests that the DataNode respects
* {@link DFSConfigKeys#DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY}
*/
public class TestDnRespectsBlockReportSplitThreshold {
public static final Log LOG = LogFactory.getLog(TestStorageReport.class);
private static final int BLOCK_SIZE = 1024;
private static final short REPL_FACTOR = 1;
private static final long seed = 0xFEEDFACE;
private static final int BLOCKS_IN_FILE = 5;
private static Configuration conf;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
static String bpid;
public void startUpCluster(long splitThreshold) throws IOException {
conf = new HdfsConfiguration();
conf.setLong(DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY, splitThreshold);
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(REPL_FACTOR)
.build();
fs = cluster.getFileSystem();
bpid = cluster.getNamesystem().getBlockPoolId();
}
@After
public void shutDownCluster() throws IOException {
if (cluster != null) {
fs.close();
cluster.shutdown();
cluster = null;
}
}
private void createFile(String filenamePrefix, int blockCount)
throws IOException {
Path path = new Path("/" + filenamePrefix + ".dat");
DFSTestUtil.createFile(fs, path, BLOCK_SIZE,
blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed);
}
private void verifyCapturedArguments(
ArgumentCaptor<StorageBlockReport[]> captor,
int expectedReportsPerCall,
int expectedTotalBlockCount) {
List<StorageBlockReport[]> listOfReports = captor.getAllValues();
int numBlocksReported = 0;
for (StorageBlockReport[] reports : listOfReports) {
assertThat(reports.length, is(expectedReportsPerCall));
for (StorageBlockReport report : reports) {
BlockListAsLongs blockList = new BlockListAsLongs(report.getBlocks());
numBlocksReported += blockList.getNumberOfBlocks();
}
}
assert(numBlocksReported >= expectedTotalBlockCount);
}
/**
* Test that if splitThreshold is zero, then we always get a separate
* call per storage.
*/
@Test(timeout=300000)
public void testAlwaysSplit() throws IOException, InterruptedException {
startUpCluster(0);
NameNode nn = cluster.getNameNode();
DataNode dn = cluster.getDataNodes().get(0);
// Create a file with a few blocks.
createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE);
// Insert a spy object for the NN RPC.
DatanodeProtocolClientSideTranslatorPB nnSpy =
DataNodeTestUtils.spyOnBposToNN(dn, nn);
// Trigger a block report so there is an interaction with the spy
// object.
DataNodeTestUtils.triggerBlockReport(dn);
ArgumentCaptor<StorageBlockReport[]> captor =
ArgumentCaptor.forClass(StorageBlockReport[].class);
Mockito.verify(nnSpy, times(MiniDFSCluster.DIRS_PER_DATANODE)).blockReport(
any(DatanodeRegistration.class),
anyString(),
captor.capture());
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
}
/**
* Tests the behavior when the count of blocks is exactly one less than
* the threshold.
*/
@Test(timeout=300000)
public void testCornerCaseUnderThreshold() throws IOException, InterruptedException {
startUpCluster(BLOCKS_IN_FILE + 1);
NameNode nn = cluster.getNameNode();
DataNode dn = cluster.getDataNodes().get(0);
// Create a file with a few blocks.
createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE);
// Insert a spy object for the NN RPC.
DatanodeProtocolClientSideTranslatorPB nnSpy =
DataNodeTestUtils.spyOnBposToNN(dn, nn);
// Trigger a block report so there is an interaction with the spy
// object.
DataNodeTestUtils.triggerBlockReport(dn);
ArgumentCaptor<StorageBlockReport[]> captor =
ArgumentCaptor.forClass(StorageBlockReport[].class);
Mockito.verify(nnSpy, times(1)).blockReport(
any(DatanodeRegistration.class),
anyString(),
captor.capture());
verifyCapturedArguments(captor, MiniDFSCluster.DIRS_PER_DATANODE, BLOCKS_IN_FILE);
}
/**
* Tests the behavior when the count of blocks is exactly equal to the
* threshold.
*/
@Test(timeout=300000)
public void testCornerCaseAtThreshold() throws IOException, InterruptedException {
startUpCluster(BLOCKS_IN_FILE);
NameNode nn = cluster.getNameNode();
DataNode dn = cluster.getDataNodes().get(0);
// Create a file with a few blocks.
createFile(GenericTestUtils.getMethodName(), BLOCKS_IN_FILE);
// Insert a spy object for the NN RPC.
DatanodeProtocolClientSideTranslatorPB nnSpy =
DataNodeTestUtils.spyOnBposToNN(dn, nn);
// Trigger a block report so there is an interaction with the spy
// object.
DataNodeTestUtils.triggerBlockReport(dn);
ArgumentCaptor<StorageBlockReport[]> captor =
ArgumentCaptor.forClass(StorageBlockReport[].class);
Mockito.verify(nnSpy, times(MiniDFSCluster.DIRS_PER_DATANODE)).blockReport(
any(DatanodeRegistration.class),
anyString(),
captor.capture());
verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
/**
* Runs all tests in BlockReportTestBase, sending one block report per storage.
* This is the default DataNode behavior post HDFS-2832.
*/
public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
@Override
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException {
for (StorageBlockReport report : reports) {
LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
StorageBlockReport[] singletonReport = { report };
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
}
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
/**
* Runs all tests in BlockReportTestBase, sending one block report
* per DataNode. This tests that the NN can handle the legacy DN
* behavior where it presents itself as a single logical storage.
*/
public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
@Override
protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException {
LOG.info("Sending combined block reports for " + dnR);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
}
}

View File

@ -82,9 +82,6 @@ Trunk (Unreleased)
BUG FIXES
MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
(Yu Gao via llu)
MAPREDUCE-3194. "mapred mradmin" command is broken in mrv2
(Jason Lowe via bobby)
@ -130,15 +127,9 @@ Trunk (Unreleased)
MAPREDUCE-4574. Fix TotalOrderParitioner to work with
non-WritableComparable key types. (harsh)
MAPREDUCE-4884. Streaming tests fail to start MiniMRCluster due to missing
queue configuration. (Chris Nauroth via suresh)
MAPREDUCE-5012. Typo in javadoc for IdentityMapper class. (Adam Monsen
via suresh)
MAPREDUCE-4885. Streaming tests have multiple failures on Windows. (Chris
Nauroth via bikas)
MAPREDUCE-4987. TestMRJobs#testDistributedCache fails on Windows due to
classpath problems and unexpected behavior of symlinks (Chris Nauroth via
bikas)
@ -152,6 +143,24 @@ Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the
YARN resource model (Sandy Ryza)
MAPREDUCE-5732. Report proper queue when job has been automatically placed
(Sandy Ryza)
OPTIMIZATIONS
BUG FIXES
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
MAPREDUCE-5265. History server admin service to refresh user and superuser
@ -170,6 +179,19 @@ Release 2.4.0 - UNRELEASED
IMPROVEMENTS
MAPREDUCE-5329. Allow MR applications to use additional AuxServices,
which are compatible with the default MapReduce shuffle.
(Avner BenHanoch via sseth)
MAPREDUCE-5463. Deprecate SLOTS_MILLIS counters (Tzuyoshi Ozawa via Sandy
Ryza)
MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write
out text files without separators (Sandy Ryza)
MAPREDUCE-5596. Allow configuring the number of threads used to serve
shuffle connections (Sandy Ryza via jlowe)
MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and
Aaron Kimball via Sandy Ryza)
@ -206,14 +228,11 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5725. Make explicit that TestNetworkedJob relies on the Capacity
Scheduler (Sandy Ryza)
MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the
YARN resource model (Sandy Ryza)
MAPREDUCE-5732. Report proper queue when job has been automatically placed
(Sandy Ryza)
OPTIMIZATIONS
MAPREDUCE-4680. Job history cleaner should only check timestamps of files in
old enough directories (Robert Kanter via Sandy Ryza)
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
MAPREDUCE-5487. In task processes, JobConf is unnecessarily loaded again
@ -224,6 +243,37 @@ Release 2.4.0 - UNRELEASED
BUG FIXES
MAPREDUCE-5569. FloatSplitter is not generating correct splits (Nathan
Roberts via jlowe)
MAPREDUCE-5546. mapred.cmd on Windows set HADOOP_OPTS incorrectly (Chuan Liu
via cnauroth)
MAPREDUCE-5518. Fixed typo "can't read paritions file". (Albert Chu
via devaraj)
MAPREDUCE-5561. org.apache.hadoop.mapreduce.v2.app.job.impl.TestJobImpl
testcase failing on trunk (Karthik Kambatla via jlowe)
MAPREDUCE-5598. TestUserDefinedCounters.testMapReduceJob is flakey
(Robert Kanter via jlowe)
MAPREDUCE-5604. TestMRAMWithNonNormalizedCapabilities fails on Windows due to
exceeding max path length. (cnauroth)
MAPREDUCE-5451. MR uses LD_LIBRARY_PATH which doesn't mean anything in
Windows. (Yingda Chen via cnauroth)
MAPREDUCE-5409. MRAppMaster throws InvalidStateTransitonException: Invalid
event: TA_TOO_MANY_FETCH_FAILURE at KILLED for TaskAttemptImpl (Gera
Shegalov via jlowe)
MAPREDUCE-5674. Missing start and finish time in mapred.JobStatus.
(Chuan Liu via cnauroth)
MAPREDUCE-5650. Job fails when hprof mapreduce.task.profile.map/reduce.params
is specified (Gera Shegalov via Sandy Ryza)
MAPREDUCE-5316. job -list-attempt-ids command does not handle illegal
task-state (Ashwin Shankar via jlowe)
@ -297,65 +347,6 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5723. MR AM container log can be truncated or empty.
(Mohammad Kamrul Islam via kasha)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
MAPREDUCE-5329. Allow MR applications to use additional AuxServices,
which are compatible with the default MapReduce shuffle.
(Avner BenHanoch via sseth)
MAPREDUCE-5463. Deprecate SLOTS_MILLIS counters (Tzuyoshi Ozawa via Sandy
Ryza)
MAPREDUCE-5457. Add a KeyOnlyTextOutputReader to enable streaming to write
out text files without separators (Sandy Ryza)
MAPREDUCE-5596. Allow configuring the number of threads used to serve
shuffle connections (Sandy Ryza via jlowe)
OPTIMIZATIONS
MAPREDUCE-4680. Job history cleaner should only check timestamps of files in
old enough directories (Robert Kanter via Sandy Ryza)
BUG FIXES
MAPREDUCE-5569. FloatSplitter is not generating correct splits (Nathan
Roberts via jlowe)
MAPREDUCE-5546. mapred.cmd on Windows set HADOOP_OPTS incorrectly (Chuan Liu
via cnauroth)
MAPREDUCE-5518. Fixed typo "can't read paritions file". (Albert Chu
via devaraj)
MAPREDUCE-5561. org.apache.hadoop.mapreduce.v2.app.job.impl.TestJobImpl
testcase failing on trunk (Karthik Kambatla via jlowe)
MAPREDUCE-5598. TestUserDefinedCounters.testMapReduceJob is flakey
(Robert Kanter via jlowe)
MAPREDUCE-5604. TestMRAMWithNonNormalizedCapabilities fails on Windows due to
exceeding max path length. (cnauroth)
MAPREDUCE-5451. MR uses LD_LIBRARY_PATH which doesn't mean anything in
Windows. (Yingda Chen via cnauroth)
MAPREDUCE-5409. MRAppMaster throws InvalidStateTransitonException: Invalid
event: TA_TOO_MANY_FETCH_FAILURE at KILLED for TaskAttemptImpl (Gera
Shegalov via jlowe)
MAPREDUCE-5674. Missing start and finish time in mapred.JobStatus.
(Chuan Liu via cnauroth)
MAPREDUCE-5650. Job fails when hprof mapreduce.task.profile.map/reduce.params
is specified (Gera Shegalov via Sandy Ryza)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES
@ -1002,9 +993,15 @@ Release 2.1.0-beta - 2013-08-22
HADOOP-9372. Fix bad timeout annotations on tests.
(Arpit Agarwal via suresh)
MAPREDUCE-4885. Streaming tests have multiple failures on Windows. (Chris
Nauroth via bikas)
MAPREDUCE-5177. Use common utils FileUtil#setReadable/Writable/Executable &
FileUtil#canRead/Write/Execute. (Ivan Mitic via suresh)
MAPREDUCE-5349. TestClusterMapReduceTestCase and TestJobName fail on Windows
in branch-2. (Chuan Liu via cnauroth)
MAPREDUCE-5355. MiniMRYarnCluster with localFs does not work on Windows.
(Chuan Liu via cnauroth)
@ -1155,6 +1152,9 @@ Release 2.0.3-alpha - 2013-02-06
BUG FIXES
MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
(Yu Gao via llu)
MAPREDUCE-4607. Race condition in ReduceTask completion can result in Task
being incorrectly failed. (Bikas Saha via tomwhite)
@ -1217,6 +1217,9 @@ Release 2.0.3-alpha - 2013-02-06
MAPREDUCE-4969. TestKeyValueTextInputFormat test fails with Open JDK 7.
(Arpit Agarwal via suresh)
MAPREDUCE-4884. Streaming tests fail to start MiniMRCluster due to missing
queue configuration. (Chris Nauroth via suresh)
MAPREDUCE-4953. HadoopPipes misuses fprintf. (Andy Isaacson via atm)
Release 2.0.2-alpha - 2012-09-07

View File

@ -37,15 +37,16 @@ public enum DistCpOptionSwitch {
/**
* Preserves status of file/path in the target.
* Default behavior with -p, is to preserve replication,
* block size, user, group and permission on the target file
* block size, user, group, permission and checksum type on the target file.
* Note that when preserving checksum type, block size is also preserved.
*
* If any of the optional switches are present among rbugp, then
* only the corresponding file attribute is preserved
* If any of the optional switches are present among rbugpc, then
* only the corresponding file attribute is preserved.
*
*/
PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
new Option("p", true, "preserve status (rbugp)" +
"(replication, block-size, user, group, permission)")),
new Option("p", true, "preserve status (rbugpc)" +
"(replication, block-size, user, group, permission, checksum-type)")),
/**
* Update target location by copying only files that are missing

View File

@ -61,7 +61,7 @@ public class DistCpOptions {
private Path targetPath;
public static enum FileAttribute{
REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION;
REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE;
public static FileAttribute getAttribute(char symbol) {
for (FileAttribute attribute : values()) {

View File

@ -50,7 +50,7 @@ public class OptionsParser {
protected String[] flatten(Options options, String[] arguments, boolean stopAtNonOption) {
for (int index = 0; index < arguments.length; index++) {
if (arguments[index].equals("-" + DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
arguments[index] = "-prbugp";
arguments[index] = "-prbugpc";
}
}
return super.flatten(options, arguments, stopAtNonOption);
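A small usage sketch, assuming hypothetical source-listing and target URIs, of how the new 'c' flag described in the preserve-status documentation above surfaces through the options parser (the TestOptionsParser cases further below exercise the same path):

import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.OptionsParser;

public class PreserveChecksumTypeSketch {
  public static void main(String[] args) {
    // "-pc" preserves only the checksum type; block size is then preserved
    // implicitly, as noted in the PRESERVE_STATUS javadoc above.
    DistCpOptions options = OptionsParser.parse(new String[] {
        "-pc",
        "-f", "hdfs://localhost:8020/source/first",
        "hdfs://localhost:8020/target/"});
    System.out.println("preserve checksum type: "
        + options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
  }
}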

View File

@ -322,7 +322,7 @@ public class CopyMapper extends Mapper<Text, FileStatus, Text, Text> {
targetFileStatus.getLen() != source.getLen()
|| (!skipCrc &&
!DistCpUtils.checksumsAreEqual(sourceFS,
source.getPath(), targetFS, target))
source.getPath(), null, targetFS, target))
|| (source.getBlockSize() != targetFileStatus.getBlockSize() &&
preserve.contains(FileAttribute.BLOCKSIZE))
);

View File

@ -18,23 +18,33 @@
package org.apache.hadoop.tools.mapred;
import org.apache.hadoop.tools.util.RetriableCommand;
import org.apache.hadoop.tools.util.ThrottledInputStream;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.tools.DistCpOptions.*;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.tools.util.RetriableCommand;
import org.apache.hadoop.tools.util.ThrottledInputStream;
import com.google.common.annotations.VisibleForTesting;
import java.io.*;
import java.util.EnumSet;
/**
* This class extends RetriableCommand to implement the copy of files,
* with retries on failure.
@ -99,15 +109,21 @@ public class RetriableFileCopyCommand extends RetriableCommand {
LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
LOG.debug("Tmp-file path: " + tmpTargetPath);
}
FileSystem sourceFS = sourceFileStatus.getPath().getFileSystem(
configuration);
long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
context, fileAttributes);
final Path sourcePath = sourceFileStatus.getPath();
final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
final FileChecksum sourceChecksum = fileAttributes
.contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
.getFileChecksum(sourcePath) : null;
compareFileLengths(sourceFileStatus, tmpTargetPath, configuration, bytesRead);
long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
context, fileAttributes, sourceChecksum);
compareFileLengths(sourceFileStatus, tmpTargetPath, configuration,
bytesRead);
//At this point, src&dest lengths are same. if length==0, we skip checksum
if ((bytesRead != 0) && (!skipCrc)) {
compareCheckSums(sourceFS, sourceFileStatus.getPath(), targetFS, tmpTargetPath);
compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum,
targetFS, tmpTargetPath);
}
promoteTmpToTarget(tmpTargetPath, target, targetFS);
return bytesRead;
@ -118,14 +134,33 @@ public class RetriableFileCopyCommand extends RetriableCommand {
}
}
/**
* @return the checksum spec of the source checksum if checksum type should be
* preserved
*/
private ChecksumOpt getChecksumOpt(EnumSet<FileAttribute> fileAttributes,
FileChecksum sourceChecksum) {
if (fileAttributes.contains(FileAttribute.CHECKSUMTYPE)
&& sourceChecksum != null) {
return sourceChecksum.getChecksumOpt();
}
return null;
}
private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
FileStatus sourceFileStatus, Mapper.Context context,
EnumSet<FileAttribute> fileAttributes)
EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
throws IOException {
OutputStream outStream = new BufferedOutputStream(targetFS.create(
tmpTargetPath, true, BUFFER_SIZE,
getReplicationFactor(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath),
getBlockSize(fileAttributes, sourceFileStatus, targetFS, tmpTargetPath), context));
FsPermission permission = FsPermission.getFileDefault().applyUMask(
FsPermission.getUMask(targetFS.getConf()));
OutputStream outStream = new BufferedOutputStream(
targetFS.create(tmpTargetPath, permission,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), BUFFER_SIZE,
getReplicationFactor(fileAttributes, sourceFileStatus, targetFS,
tmpTargetPath),
getBlockSize(fileAttributes, sourceFileStatus, targetFS,
tmpTargetPath),
context, getChecksumOpt(fileAttributes, sourceChecksum)));
return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context);
}
@ -140,9 +175,10 @@ public class RetriableFileCopyCommand extends RetriableCommand {
}
private void compareCheckSums(FileSystem sourceFS, Path source,
FileSystem targetFS, Path target)
FileChecksum sourceChecksum, FileSystem targetFS, Path target)
throws IOException {
if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target)) {
if (!DistCpUtils.checksumsAreEqual(sourceFS, source, sourceChecksum,
targetFS, target)) {
StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ")
.append(source).append(" and ").append(target).append(".");
if (sourceFS.getFileStatus(source).getBlockSize() != targetFS.getFileStatus(target).getBlockSize()) {
@ -249,11 +285,18 @@ public class RetriableFileCopyCommand extends RetriableCommand {
sourceFile.getReplication() : targetFS.getDefaultReplication(tmpTargetPath);
}
/**
* @return the block size of the source file if we need to preserve either
* the block size or the checksum type. Otherwise the default block
* size of the target FS.
*/
private static long getBlockSize(
EnumSet<FileAttribute> fileAttributes,
FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) {
return fileAttributes.contains(FileAttribute.BLOCKSIZE)?
sourceFile.getBlockSize() : targetFS.getDefaultBlockSize(tmpTargetPath);
boolean preserve = fileAttributes.contains(FileAttribute.BLOCKSIZE)
|| fileAttributes.contains(FileAttribute.CHECKSUMTYPE);
return preserve ? sourceFile.getBlockSize() : targetFS
.getDefaultBlockSize(tmpTargetPath);
}
/**

View File

@ -280,6 +280,8 @@ public class DistCpUtils {
*
* @param sourceFS FileSystem for the source path.
* @param source The source path.
* @param sourceChecksum The checksum of the source file. If it is null we
* still need to retrieve it through sourceFS.
* @param targetFS FileSystem for the target path.
* @param target The target path.
* @return If either checksum couldn't be retrieved, the function returns
@ -288,12 +290,12 @@ public class DistCpUtils {
* @throws IOException if there's an exception while retrieving checksums.
*/
public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
FileSystem targetFS, Path target)
FileChecksum sourceChecksum, FileSystem targetFS, Path target)
throws IOException {
FileChecksum sourceChecksum = null;
FileChecksum targetChecksum = null;
try {
sourceChecksum = sourceFS.getFileChecksum(source);
sourceChecksum = sourceChecksum != null ? sourceChecksum : sourceFS
.getFileChecksum(source);
targetChecksum = targetFS.getFileChecksum(target);
} catch (IOException e) {
LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);

View File

@ -397,6 +397,7 @@ public class TestOptionsParser {
Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-p",
@ -408,6 +409,7 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-p",
@ -418,6 +420,7 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-pbr",
@ -429,6 +432,7 @@ public class TestOptionsParser {
Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-pbrgup",
@ -440,6 +444,31 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-pbrgupc",
"-f",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-pc",
"-f",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertFalse(options.shouldPreserve(FileAttribute.BLOCKSIZE));
Assert.assertFalse(options.shouldPreserve(FileAttribute.REPLICATION));
Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-p",
@ -452,7 +481,7 @@ public class TestOptionsParser {
attribIterator.next();
i++;
}
Assert.assertEquals(i, 5);
Assert.assertEquals(i, 6);
try {
OptionsParser.parse(new String[] {

View File

@ -18,29 +18,6 @@
package org.apache.hadoop.tools.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.StubContext;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@ -49,11 +26,38 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptionSwitch;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.StubContext;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.util.DataChecksum;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestCopyMapper {
private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
private static List<Path> pathList = new ArrayList<Path>();
private static int nFiles = 0;
private static final int DEFAULT_FILE_SIZE = 1024;
private static final long NON_DEFAULT_BLOCK_SIZE = 4096;
private static MiniDFSCluster cluster;
@ -119,12 +123,27 @@ public class TestCopyMapper {
mkdirs(SOURCE_PATH + "/2/3/4");
mkdirs(SOURCE_PATH + "/2/3");
mkdirs(SOURCE_PATH + "/5");
touchFile(SOURCE_PATH + "/5/6", true);
touchFile(SOURCE_PATH + "/5/6", true, null);
mkdirs(SOURCE_PATH + "/7");
mkdirs(SOURCE_PATH + "/7/8");
touchFile(SOURCE_PATH + "/7/8/9");
}
private static void createSourceDataWithDifferentChecksumType()
throws Exception {
mkdirs(SOURCE_PATH + "/1");
mkdirs(SOURCE_PATH + "/2");
mkdirs(SOURCE_PATH + "/2/3/4");
mkdirs(SOURCE_PATH + "/2/3");
mkdirs(SOURCE_PATH + "/5");
touchFile(SOURCE_PATH + "/5/6", new ChecksumOpt(DataChecksum.Type.CRC32,
512));
mkdirs(SOURCE_PATH + "/7");
mkdirs(SOURCE_PATH + "/7/8");
touchFile(SOURCE_PATH + "/7/8/9", new ChecksumOpt(DataChecksum.Type.CRC32C,
512));
}
private static void mkdirs(String path) throws Exception {
FileSystem fileSystem = cluster.getFileSystem();
final Path qualifiedPath = new Path(path).makeQualified(fileSystem.getUri(),
@ -134,21 +153,31 @@ public class TestCopyMapper {
}
private static void touchFile(String path) throws Exception {
touchFile(path, false);
touchFile(path, false, null);
}
private static void touchFile(String path, boolean createMultipleBlocks) throws Exception {
final long NON_DEFAULT_BLOCK_SIZE = 4096;
private static void touchFile(String path, ChecksumOpt checksumOpt)
throws Exception {
// create files with specific checksum opt and non-default block size
touchFile(path, true, checksumOpt);
}
private static void touchFile(String path, boolean createMultipleBlocks,
ChecksumOpt checksumOpt) throws Exception {
FileSystem fs;
DataOutputStream outputStream = null;
try {
fs = cluster.getFileSystem();
final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
fs.getWorkingDirectory());
final long blockSize = createMultipleBlocks? NON_DEFAULT_BLOCK_SIZE : fs.getDefaultBlockSize(qualifiedPath) * 2;
outputStream = fs.create(qualifiedPath, true, 0,
(short)(fs.getDefaultReplication(qualifiedPath)*2),
blockSize);
final long blockSize = createMultipleBlocks ? NON_DEFAULT_BLOCK_SIZE : fs
.getDefaultBlockSize(qualifiedPath) * 2;
FsPermission permission = FsPermission.getFileDefault().applyUMask(
FsPermission.getUMask(fs.getConf()));
outputStream = fs.create(qualifiedPath, permission,
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 0,
(short) (fs.getDefaultReplication(qualifiedPath) * 2), blockSize,
null, checksumOpt);
byte[] bytes = new byte[DEFAULT_FILE_SIZE];
outputStream.write(bytes);
long fileSize = DEFAULT_FILE_SIZE;
@ -171,17 +200,40 @@ public class TestCopyMapper {
}
}
@Test
public void testCopyWithDifferentChecksumType() throws Exception {
testCopy(true);
}
@Test(timeout=40000)
public void testRun() {
testCopy(false);
}
private void testCopy(boolean preserveChecksum) {
try {
deleteState();
if (preserveChecksum) {
createSourceDataWithDifferentChecksumType();
} else {
createSourceData();
}
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context
= stubContext.getContext();
Configuration configuration = context.getConfiguration();
EnumSet<DistCpOptions.FileAttribute> fileAttributes
= EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
if (preserveChecksum) {
fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
}
configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
DistCpUtils.packAttributes(fileAttributes));
copyMapper.setup(context);
for (Path path: pathList) {
@ -195,19 +247,29 @@ public class TestCopyMapper {
.replaceAll(SOURCE_PATH, TARGET_PATH));
Assert.assertTrue(fs.exists(targetPath));
Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
Assert.assertEquals(fs.getFileStatus(path).getReplication(),
fs.getFileStatus(targetPath).getReplication());
Assert.assertEquals(fs.getFileStatus(path).getBlockSize(),
fs.getFileStatus(targetPath).getBlockSize());
Assert.assertTrue(!fs.isFile(targetPath) ||
fs.getFileChecksum(targetPath).equals(
fs.getFileChecksum(path)));
FileStatus sourceStatus = fs.getFileStatus(path);
FileStatus targetStatus = fs.getFileStatus(targetPath);
Assert.assertEquals(sourceStatus.getReplication(),
targetStatus.getReplication());
if (preserveChecksum) {
Assert.assertEquals(sourceStatus.getBlockSize(),
targetStatus.getBlockSize());
}
Assert.assertTrue(!fs.isFile(targetPath)
|| fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
}
Assert.assertEquals(pathList.size(),
stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE,
stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
if (!preserveChecksum) {
Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
.getValue());
} else {
Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
.getValue());
}
testCopyingExistingFiles(fs, copyMapper, context);
for (Text value : stubContext.getWriter().values()) {

File diff suppressed because it is too large.

View File

@ -9,6 +9,24 @@ Trunk - Unreleased
YARN-1496. Protocol additions to allow moving apps between queues (Sandy
Ryza)
YARN-1498. Common scheduler changes for moving apps between queues (Sandy
Ryza)
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
YARN-524 TestYarnVersionInfo failing if generated properties doesn't
include an SVN URL. (stevel)
Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
YARN-930. Bootstrapping ApplicationHistoryService module. (vinodkv)
YARN-947. Implementing the data objects to be used by the History reader
@ -70,13 +88,16 @@ Trunk - Unreleased
YARN-987. Added ApplicationHistoryManager responsible for exposing reports to
all clients. (Mayank Bansal via vinodkv)
YARN-1630. Introduce timeout for async polling operations in YarnClientImpl
(Aditya Acharya via Sandy Ryza)
YARN-1617. Remove ancient comment and surround LOG.debug in
AppSchedulingInfo.allocate (Sandy Ryza)
OPTIMIZATIONS
BUG FIXES
YARN-524 TestYarnVersionInfo failing if generated properties doesn't
include an SVN URL. (stevel)
YARN-935. Correcting pom.xml to build applicationhistoryserver module
successfully. (Zhijie Shen via vinodkv)
@ -112,7 +133,17 @@ Trunk - Unreleased
YARN-1613. Fixed the typo with the configuration name
YARN_HISTORY_SERVICE_ENABLED. (Akira Ajisaka via vinodkv)
Release 2.4.0 - UNRELEASED
YARN-1618. Fix invalid RMApp transition from NEW to FINAL_SAVING (kasha)
YARN-1600. RM does not startup when security is enabled without spnego
configured (Haohui Mai via jlowe)
YARN-1642. RMDTRenewer#getRMClient should use ClientRMProxy (kasha)
YARN-1632. TestApplicationMasterServices should be under
org.apache.hadoop.yarn.server.resourcemanager package (Chen He via jeagles)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -169,6 +200,30 @@ Release 2.4.0 - UNRELEASED
IMPROVEMENTS
YARN-305. Fair scheduler logs too many "Node offered to app" messages.
(Lohit Vijayarenu via Sandy Ryza)
YARN-1258. Allow configuring the Fair Scheduler root queue (Sandy Ryza)
YARN-1288. Make Fair Scheduler ACLs more user friendly (Sandy Ryza)
YARN-1315. TestQueueACLs should also test FairScheduler (Sandy Ryza)
YARN-1335. Move duplicate code from FSSchedulerApp and FiCaSchedulerApp
into SchedulerApplication (Sandy Ryza)
YARN-1333. Support blacklisting in the Fair Scheduler (Tsuyoshi Ozawa via
Sandy Ryza)
YARN-1109. Demote NodeManager "Sending out status for container" logs to
debug (haosdent via Sandy Ryza)
YARN-1321. Changed NMTokenCache to support both singleton and an instance
usage. (Alejandro Abdelnur via vinodkv)
YARN-1388. Fair Scheduler page always displays blank fair share (Liyin Liang
via Sandy Ryza)
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
@ -334,13 +389,66 @@ Release 2.4.0 - UNRELEASED
YARN-1573. ZK store should use a private password for root-node-acls.
(kasha).
YARN-1630. Introduce timeout for async polling operations in YarnClientImpl
(Aditya Acharya via Sandy Ryza)
OPTIMIZATIONS
BUG FIXES
YARN-1284. LCE: Race condition leaves dangling cgroups entries for killed
containers. (Alejandro Abdelnur via Sandy Ryza)
YARN-1283. Fixed RM to give a fully-qualified proxy URL for an application
so that clients don't need to do scheme-mangling. (Omkar Vinit Joshi via
vinodkv)
YARN-879. Fixed tests w.r.t o.a.h.y.server.resourcemanager.Application.
(Junping Du via devaraj)
YARN-1265. Fair Scheduler chokes on unhealthy node reconnect (Sandy Ryza)
YARN-1044. used/min/max resources do not display info in the scheduler page
(Sangjin Lee via Sandy Ryza)
YARN-1259. In Fair Scheduler web UI, queue num pending and num active apps
switched. (Robert Kanter via Sandy Ryza)
YARN-1295. In UnixLocalWrapperScriptBuilder, using bash -c can cause Text
file busy errors (Sandy Ryza)
YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that
prevent subsequent ResourceManager recovery. (Omkar Vinit Joshi via vinodkv)
YARN-1331. yarn.cmd exits with NoClassDefFoundError trying to run rmadmin or
logs. (cnauroth)
YARN-1330. Fair Scheduler: defaultQueueSchedulingPolicy does not take effect
(Sandy Ryza)
YARN-1022. Unnecessary INFO logs in AMRMClientAsync (haosdent via bikas)
YARN-1349. yarn.cmd does not support passthrough to any arbitrary class.
(cnauroth)
YARN-1357. TestContainerLaunch.testContainerEnvVariables fails on Windows.
(Chuan Liu via cnauroth)
YARN-1358. TestYarnCLI fails on Windows due to line endings. (Chuan Liu via
cnauroth)
YARN-1343. NodeManagers additions/restarts are not reported as node updates
in AllocateResponse responses to AMs. (tucu)
YARN-1381. Same relaxLocality appears twice in exception message of
AMRMClientImpl#checkLocalityRelaxationConflict() (Ted Yu via Sandy Ryza)
YARN-1407. RM Web UI and REST APIs should uniformly use
YarnApplicationState (Sandy Ryza)
YARN-1438. Ensure container diagnostics includes exception from container
launch. (stevel via acmurthy)
YARN-1138. yarn.application.classpath is set to point to $HADOOP_CONF_DIR
etc., which does not work on Windows. (Chuan Liu via cnauroth)
YARN-461. Fair scheduler should not accept apps with empty string queue name.
(ywskycn via tucu)
@ -466,107 +574,8 @@ Release 2.4.0 - UNRELEASED
YARN-1575. Public localizer crashes with "Localized unkown resource"
(jlowe)
YARN-1642. RMDTRenewer#getRMClient should use ClientRMProxy (kasha)
YARN-1629. IndexOutOfBoundsException in MaxRunningAppsEnforcer (Sandy Ryza)
YARN-1618. Fix invalid RMApp transition from NEW to FINAL_SAVING (kasha)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
YARN-305. Fair scheduler logs too many "Node offered to app" messages.
(Lohit Vijayarenu via Sandy Ryza)
YARN-1258. Allow configuring the Fair Scheduler root queue (Sandy Ryza)
YARN-1288. Make Fair Scheduler ACLs more user friendly (Sandy Ryza)
YARN-1315. TestQueueACLs should also test FairScheduler (Sandy Ryza)
YARN-1335. Move duplicate code from FSSchedulerApp and FiCaSchedulerApp
into SchedulerApplication (Sandy Ryza)
YARN-1333. Support blacklisting in the Fair Scheduler (Tsuyoshi Ozawa via
Sandy Ryza)
YARN-1109. Demote NodeManager "Sending out status for container" logs to
debug (haosdent via Sandy Ryza)
YARN-1321. Changed NMTokenCache to support both singleton and an instance
usage. (Alejandro Abdelnur via vinodkv)
YARN-1388. Fair Scheduler page always displays blank fair share (Liyin Liang
via Sandy Ryza)
OPTIMIZATIONS
BUG FIXES
YARN-1284. LCE: Race condition leaves dangling cgroups entries for killed
containers. (Alejandro Abdelnur via Sandy Ryza)
YARN-1283. Fixed RM to give a fully-qualified proxy URL for an application
so that clients don't need to do scheme-mangling. (Omkar Vinit Joshi via
vinodkv)
YARN-879. Fixed tests w.r.t o.a.h.y.server.resourcemanager.Application.
(Junping Du via devaraj)
YARN-1265. Fair Scheduler chokes on unhealthy node reconnect (Sandy Ryza)
YARN-1044. used/min/max resources do not display info in the scheduler page
(Sangjin Lee via Sandy Ryza)
YARN-1259. In Fair Scheduler web UI, queue num pending and num active apps
switched. (Robert Kanter via Sandy Ryza)
YARN-1295. In UnixLocalWrapperScriptBuilder, using bash -c can cause Text
file busy errors (Sandy Ryza)
YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that
prevent subsequent ResourceManager recovery. (Omkar Vinit Joshi via vinodkv)
YARN-1331. yarn.cmd exits with NoClassDefFoundError trying to run rmadmin or
logs. (cnauroth)
YARN-1330. Fair Scheduler: defaultQueueSchedulingPolicy does not take effect
(Sandy Ryza)
YARN-1022. Unnecessary INFO logs in AMRMClientAsync (haosdent via bikas)
YARN-1349. yarn.cmd does not support passthrough to any arbitrary class.
(cnauroth)
YARN-1357. TestContainerLaunch.testContainerEnvVariables fails on Windows.
(Chuan Liu via cnauroth)
YARN-1358. TestYarnCLI fails on Windows due to line endings. (Chuan Liu via
cnauroth)
YARN-1343. NodeManagers additions/restarts are not reported as node updates
in AllocateResponse responses to AMs. (tucu)
YARN-1381. Same relaxLocality appears twice in exception message of
AMRMClientImpl#checkLocalityRelaxationConflict() (Ted Yu via Sandy Ryza)
YARN-1407. RM Web UI and REST APIs should uniformly use
YarnApplicationState (Sandy Ryza)
YARN-1438. Ensure container diagnostics includes exception from container
launch. (stevel via acmurthy)
YARN-1138. yarn.application.classpath is set to point to $HADOOP_CONF_DIR
etc., which does not work on Windows. (Chuan Liu via cnauroth)
YARN-1600. RM does not startup when security is enabled without spnego
configured (Haohui Mai via jlowe)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

AppSchedulingInfo.java
View File

@ -64,7 +64,7 @@ public class AppSchedulingInfo {
private Set<String> blacklist = new HashSet<String>();
//private final ApplicationStore store;
private final ActiveUsersManager activeUsersManager;
private ActiveUsersManager activeUsersManager;
/* Allocated by scheduler */
boolean pending = true; // for app metrics
@ -171,11 +171,10 @@ public class AppSchedulingInfo {
.getNumContainers() : 0;
Resource lastRequestCapability = lastRequest != null ? lastRequest
.getCapability() : Resources.none();
metrics.incrPendingResources(user, request.getNumContainers()
- lastRequestContainers, Resources.subtractFrom( // save a clone
Resources.multiply(request.getCapability(), request
.getNumContainers()), Resources.multiply(lastRequestCapability,
lastRequestContainers)));
metrics.incrPendingResources(user, request.getNumContainers(),
request.getCapability());
metrics.decrPendingResources(user, lastRequestContainers,
lastRequestCapability);
}
}
}
@ -262,9 +261,15 @@ public class AppSchedulingInfo {
pending = false;
metrics.runAppAttempt(applicationId, user);
}
LOG.debug("allocate: user: " + user + ", memory: "
+ request.getCapability());
metrics.allocateResources(user, 1, request.getCapability());
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationId=" + applicationId
+ " container=" + container.getId()
+ " host=" + container.getNodeId().toString()
+ " user=" + user
+ " resource=" + request.getCapability());
}
metrics.allocateResources(user, 1, request.getCapability(), true);
}
/**
@ -277,9 +282,6 @@ public class AppSchedulingInfo {
synchronized private void allocateNodeLocal(
SchedulerNode node, Priority priority,
ResourceRequest nodeLocalRequest, Container container) {
// Update consumption and track allocations
allocate(container);
// Update future requirements
nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
if (nodeLocalRequest.getNumContainers() == 0) {
@ -306,10 +308,6 @@ public class AppSchedulingInfo {
synchronized private void allocateRackLocal(
SchedulerNode node, Priority priority,
ResourceRequest rackLocalRequest, Container container) {
// Update consumption and track allocations
allocate(container);
// Update future requirements
rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
if (rackLocalRequest.getNumContainers() == 0) {
@ -329,10 +327,6 @@ public class AppSchedulingInfo {
synchronized private void allocateOffSwitch(
SchedulerNode node, Priority priority,
ResourceRequest offSwitchRequest, Container container) {
// Update consumption and track allocations
allocate(container);
// Update future requirements
decrementOutstanding(offSwitchRequest);
}
@ -365,18 +359,24 @@ public class AppSchedulingInfo {
}
}
synchronized private void allocate(Container container) {
// Update consumption and track allocations
//TODO: fixme sharad
/* try {
store.storeContainer(container);
} catch (IOException ie) {
// TODO fix this. we shouldnt ignore
}*/
LOG.debug("allocate: applicationId=" + applicationId + " container="
+ container.getId() + " host="
+ container.getNodeId().toString());
synchronized public void move(Queue newQueue) {
QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
for (Map<String, ResourceRequest> asks : requests.values()) {
ResourceRequest request = asks.get(ResourceRequest.ANY);
if (request != null) {
oldMetrics.decrPendingResources(user, request.getNumContainers(),
request.getCapability());
newMetrics.incrPendingResources(user, request.getNumContainers(),
request.getCapability());
}
}
oldMetrics.moveAppFrom(this);
newMetrics.moveAppTo(this);
activeUsersManager.deactivateApplication(user, applicationId);
activeUsersManager = newQueue.getActiveUsersManager();
activeUsersManager.activateApplication(user, applicationId);
this.queue = newQueue;
}
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
@ -386,8 +386,7 @@ public class AppSchedulingInfo {
ResourceRequest request = asks.get(ResourceRequest.ANY);
if (request != null) {
metrics.decrPendingResources(user, request.getNumContainers(),
Resources.multiply(request.getCapability(), request
.getNumContainers()));
request.getCapability());
}
}
metrics.finishAppAttempt(applicationId, pending, user);
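The AppSchedulingInfo hunks above replace the old "multiply capability by container count and subtract the delta" update with a symmetric incrPendingResources/decrPendingResources pair over (container count, per-container capability), and the new move(Queue) reuses the same pair to shift outstanding asks from the old queue's metrics to the new queue's. A minimal standalone sketch of why the pair gives the same totals as the old delta arithmetic (the PendingBook class below is hypothetical and not part of the patch):

// Hypothetical stand-in for the pending-MB counter kept by QueueMetrics.
public class PendingBook {
  private long pendingMb;

  void incr(int containers, long mbPerContainer) { pendingMb += (long) containers * mbPerContainer; }
  void decr(int containers, long mbPerContainer) { pendingMb -= (long) containers * mbPerContainer; }

  public static void main(String[] args) {
    PendingBook book = new PendingBook();
    book.incr(5, 1024);              // previous ask: 5 x 1 GB pending

    // updateResourceRequests path: add the new ask, retire the previous one.
    book.incr(3, 2048);              // new ask: 3 x 2 GB
    book.decr(5, 1024);              // same net effect as the old delta arithmetic
    System.out.println(book.pendingMb);                              // prints: 6144

    // move(Queue) path: the same pair, split across two queues' books.
    PendingBook newQueue = new PendingBook();
    newQueue.incr(3, 2048);          // incrPendingResources on the destination
    book.decr(3, 2048);              // decrPendingResources on the source
    System.out.println(book.pendingMb + " " + newQueue.pendingMb);   // prints: 0 6144
  }
}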

Queue.java
View File

@ -58,4 +58,6 @@ public interface Queue {
List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user);
boolean hasAccess(QueueACL acl, UserGroupInformation user);
public ActiveUsersManager getActiveUsersManager();
}

QueueMetrics.java
View File

@ -281,6 +281,36 @@ public class QueueMetrics implements MetricsSource {
}
}
public void moveAppFrom(AppSchedulingInfo app) {
if (app.isPending()) {
appsPending.decr();
} else {
appsRunning.decr();
}
QueueMetrics userMetrics = getUserMetrics(app.getUser());
if (userMetrics != null) {
userMetrics.moveAppFrom(app);
}
if (parent != null) {
parent.moveAppFrom(app);
}
}
public void moveAppTo(AppSchedulingInfo app) {
if (app.isPending()) {
appsPending.incr();
} else {
appsRunning.incr();
}
QueueMetrics userMetrics = getUserMetrics(app.getUser());
if (userMetrics != null) {
userMetrics.moveAppTo(app);
}
if (parent != null) {
parent.moveAppTo(app);
}
}
/**
* Set available resources. To be called by scheduler periodically as
* resources become available.
@ -324,8 +354,8 @@ public class QueueMetrics implements MetricsSource {
private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
pendingMB.incr(res.getMemory());
pendingVCores.incr(res.getVirtualCores());
pendingMB.incr(res.getMemory() * containers);
pendingVCores.incr(res.getVirtualCores() * containers);
}
public void decrPendingResources(String user, int containers, Resource res) {
@ -341,22 +371,25 @@ public class QueueMetrics implements MetricsSource {
private void _decrPendingResources(int containers, Resource res) {
pendingContainers.decr(containers);
pendingMB.decr(res.getMemory());
pendingVCores.decr(res.getVirtualCores());
pendingMB.decr(res.getMemory() * containers);
pendingVCores.decr(res.getVirtualCores() * containers);
}
public void allocateResources(String user, int containers, Resource res) {
public void allocateResources(String user, int containers, Resource res,
boolean decrPending) {
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemory() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
_decrPendingResources(containers, Resources.multiply(res, containers));
if (decrPending) {
_decrPendingResources(containers, res);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.allocateResources(user, containers, res);
userMetrics.allocateResources(user, containers, res, decrPending);
}
if (parent != null) {
parent.allocateResources(user, containers, res);
parent.allocateResources(user, containers, res, decrPending);
}
}
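Two behavioural points in the QueueMetrics hunks above are easy to miss: _incrPendingResources/_decrPendingResources now scale memory and vcores by the container count (the Resource argument is per container), and allocateResources only drains the pending counters when decrPending is true; the move path in SchedulerApplicationAttempt further down passes false because a moved container was never pending in its destination queue. A minimal standalone sketch of just those two rules (MiniQueueMetrics is hypothetical, not the real class):

// Hypothetical, simplified mirror of the pending/allocated accounting above.
public class MiniQueueMetrics {
  long pendingMb;
  long allocatedMb;

  void incrPending(int containers, long mbPerContainer) { pendingMb += (long) containers * mbPerContainer; }
  void decrPending(int containers, long mbPerContainer) { pendingMb -= (long) containers * mbPerContainer; }

  void allocate(int containers, long mbPerContainer, boolean drainPending) {
    allocatedMb += (long) containers * mbPerContainer;
    if (drainPending) {
      decrPending(containers, mbPerContainer);   // normal scheduling path
    }
  }

  public static void main(String[] args) {
    MiniQueueMetrics q = new MiniQueueMetrics();
    q.incrPending(5, 3 * 1024);            // as in the updated TestQueueMetrics: 15 GB pending
    q.allocate(3, 2 * 1024, true);         // scheduler allocation: 6 GB allocated, 9 GB still pending
    System.out.println(q.allocatedMb + " " + q.pendingMb);           // prints: 6144 9216

    MiniQueueMetrics other = new MiniQueueMetrics();
    other.allocate(1, 2 * 1024, false);    // move path: account the container, leave pending alone
    System.out.println(other.allocatedMb + " " + other.pendingMb);   // prints: 2048 0
  }
}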

SchedulerApplicationAttempt.java
View File

@ -57,7 +57,7 @@ import com.google.common.collect.Multiset;
*/
@Private
@Unstable
public abstract class SchedulerApplicationAttempt {
public class SchedulerApplicationAttempt {
private static final Log LOG = LogFactory
.getLog(SchedulerApplicationAttempt.class);
@ -91,7 +91,7 @@ public abstract class SchedulerApplicationAttempt {
protected Map<Priority, Long> lastScheduledContainer =
new HashMap<Priority, Long>();
protected final Queue queue;
protected Queue queue;
protected boolean isStopped = false;
protected final RMContext rmContext;
@ -431,4 +431,25 @@ public abstract class SchedulerApplicationAttempt {
this.appSchedulingInfo
.transferStateFromPreviousAppSchedulingInfo(appAttempt.appSchedulingInfo);
}
public void move(Queue newQueue) {
QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
String user = getUser();
for (RMContainer liveContainer : liveContainers.values()) {
Resource resource = liveContainer.getContainer().getResource();
oldMetrics.releaseResources(user, 1, resource);
newMetrics.allocateResources(user, 1, resource, false);
}
for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
for (RMContainer reservedContainer : map.values()) {
Resource resource = reservedContainer.getReservedResource();
oldMetrics.unreserveResource(user, resource);
newMetrics.reserveResource(user, resource);
}
}
appSchedulingInfo.move(newQueue);
this.queue = newQueue;
}
}
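SchedulerApplicationAttempt.move(Queue) above transfers each live container (releaseResources on the old queue's metrics, allocateResources with decrPending=false on the new), moves each reservation (unreserveResource/reserveResource), and then hands the pending asks over through appSchedulingInfo.move(newQueue). Because every QueueMetrics update also propagates to the parent queue, a move between two children of the same parent decrements and then re-increments the parent's counters, so the parent ends up unchanged; the new TestSchedulerApplicationAttempt at the end of this change asserts exactly that. A toy sketch of the cancellation, using a hypothetical parent-propagating counter rather than the real classes:

// Toy parent-propagating counter; it only illustrates why sibling-to-sibling
// moves leave the shared parent's totals untouched.
public class MiniMetrics {
  private final MiniMetrics parent;
  long allocatedMb;

  MiniMetrics(MiniMetrics parent) { this.parent = parent; }

  void release(long mb)  { allocatedMb -= mb; if (parent != null) parent.release(mb); }
  void allocate(long mb) { allocatedMb += mb; if (parent != null) parent.allocate(mb); }

  public static void main(String[] args) {
    MiniMetrics root = new MiniMetrics(null);
    MiniMetrics oldQueue = new MiniMetrics(root);
    MiniMetrics newQueue = new MiniMetrics(root);

    oldQueue.allocate(1536);                          // container originally charged to the old queue
    oldQueue.release(1536); newQueue.allocate(1536);  // the move: per-container release + allocate
    System.out.println(oldQueue.allocatedMb + " " + newQueue.allocatedMb + " " + root.allocatedMb);
    // prints: 0 1536 1536 -- old queue emptied, new queue charged, parent unchanged
  }
}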

FSLeafQueue.java
View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@ -54,11 +55,14 @@ public class FSLeafQueue extends FSQueue {
private long lastTimeAtMinShare;
private long lastTimeAtHalfFairShare;
private final ActiveUsersManager activeUsersManager;
public FSLeafQueue(String name, FairScheduler scheduler,
FSParentQueue parent) {
super(name, scheduler, parent);
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
activeUsersManager = new ActiveUsersManager(getMetrics());
}
public void addApp(FSSchedulerApp app, boolean runnable) {
@ -245,4 +249,9 @@ public class FSLeafQueue extends FSQueue {
public int getNumRunnableApps() {
return runnableAppScheds.size();
}
@Override
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
}

FSParentQueue.java
View File

@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@Private
@Unstable
@ -194,4 +194,10 @@ public class FSParentQueue extends FSQueue {
childQueue.collectSchedulerApplications(apps);
}
}
@Override
public ActiveUsersManager getActiveUsersManager() {
// Should never be called since all applications are submitted to LeafQueues
return null;
}
}
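The Queue interface hunk earlier adds getActiveUsersManager(); FSLeafQueue above now owns an ActiveUsersManager, FSParentQueue deliberately returns null because applications are only ever attached to leaf queues, and the FifoScheduler hunk that follows exposes its existing instance the same way. AppSchedulingInfo.move() uses this accessor to deactivate the user on the old leaf queue and activate it on the new one, keeping per-queue active-user counts consistent across a move. A tiny hypothetical sketch of that hand-off (MiniActiveUsers is not a real YARN class):

// Hypothetical minimal active-user bookkeeping, only to illustrate the hand-off
// performed in AppSchedulingInfo.move(): deactivate on the old leaf queue's
// ActiveUsersManager, activate on the new one.
public class MiniActiveUsers {
  private int activeApps;   // apps of a single user, for brevity

  void activate()   { activeApps++; }
  void deactivate() { activeApps--; }

  public static void main(String[] args) {
    MiniActiveUsers oldLeaf = new MiniActiveUsers();
    MiniActiveUsers newLeaf = new MiniActiveUsers();
    oldLeaf.activate();                         // app originally submitted to the old leaf
    oldLeaf.deactivate(); newLeaf.activate();   // the move hand-off
    System.out.println(oldLeaf.activeApps + " " + newLeaf.activeApps);   // prints: 0 1
  }
}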

FifoScheduler.java
View File

@ -184,6 +184,11 @@ public class FifoScheduler extends AbstractYarnScheduler implements
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
return getQueueAcls().get(acl).isUserAllowed(user);
}
@Override
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
};
@Override

TestApplicationMasterService.java
View File

@ -16,28 +16,21 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.applicationmasterservice;
package org.apache.hadoop.yarn.server.resourcemanager;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.BeforeClass;
@ -152,33 +145,4 @@ public class TestApplicationMasterService {
}
}
}
@Test (timeout = 60000)
public void testNotifyAMOfPlacedQueue() throws Exception {
// By default, FairScheduler assigns queue by user name
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
try {
rm.start();
// Register node1
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
// Submit an application
RMApp app1 = rm.submitApp(1024, "somename", "user1");
// kick the scheduling
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
RegisterApplicationMasterResponse response = am1.registerAppAttempt();
Assert.assertEquals("root.user1", response.getQueue());
} finally {
if (rm != null) {
rm.stop();
}
}
}
}

TestQueueMetrics.java
View File

@ -73,7 +73,7 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
@ -81,7 +81,7 @@ public class TestQueueMetrics {
metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
@ -171,7 +171,7 @@ public class TestQueueMetrics {
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
@ -181,7 +181,7 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
@ -232,7 +232,7 @@ public class TestQueueMetrics {
metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(user, 5, Resources.createResource(15*GB, 15));
metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
@ -242,7 +242,7 @@ public class TestQueueMetrics {
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2));
metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
metrics.reserveResource(user, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources

TestSchedulerApplicationAttempt.java
View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.junit.After;
import org.junit.Test;
public class TestSchedulerApplicationAttempt {
private static final NodeId nodeId = NodeId.newInstance("somehost", 5);
private Configuration conf = new Configuration();
@After
public void tearDown() {
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.shutdown();
}
@Test
public void testMove() {
final String user = "user1";
Queue parentQueue = createQueue("parent", null);
Queue oldQueue = createQueue("old", parentQueue);
Queue newQueue = createQueue("new", parentQueue);
QueueMetrics parentMetrics = parentQueue.getMetrics();
QueueMetrics oldMetrics = oldQueue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
ApplicationAttemptId appAttId = createAppAttemptId(0, 0);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
user, oldQueue, oldQueue.getActiveUsersManager(), null);
oldMetrics.submitApp(user);
// Resource request
Resource requestedResource = Resource.newInstance(1536, 2);
Priority requestedPriority = Priority.newInstance(2);
ResourceRequest request = ResourceRequest.newInstance(requestedPriority,
ResourceRequest.ANY, requestedResource, 3);
app.updateResourceRequests(Arrays.asList(request));
// Allocated container
RMContainer container1 = createRMContainer(appAttId, 1, requestedResource);
app.liveContainers.put(container1.getContainerId(), container1);
SchedulerNode node = createNode();
app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node, requestedPriority,
request, container1.getContainer());
// Reserved container
Priority prio1 = Priority.newInstance(1);
Resource reservedResource = Resource.newInstance(2048, 3);
RMContainer container2 = createReservedRMContainer(appAttId, 1, reservedResource,
node.getNodeID(), prio1);
Map<NodeId, RMContainer> reservations = new HashMap<NodeId, RMContainer>();
reservations.put(node.getNodeID(), container2);
app.reservedContainers.put(prio1, reservations);
oldMetrics.reserveResource(user, reservedResource);
checkQueueMetrics(oldMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
checkQueueMetrics(newMetrics, 0, 0, 0, 0, 0, 0, 0, 0);
checkQueueMetrics(parentMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
app.move(newQueue);
checkQueueMetrics(oldMetrics, 0, 0, 0, 0, 0, 0, 0, 0);
checkQueueMetrics(newMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
checkQueueMetrics(parentMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
}
private void checkQueueMetrics(QueueMetrics metrics, int activeApps,
int runningApps, int allocMb, int allocVcores, int reservedMb,
int reservedVcores, int pendingMb, int pendingVcores) {
assertEquals(activeApps, metrics.getActiveApps());
assertEquals(runningApps, metrics.getAppsRunning());
assertEquals(allocMb, metrics.getAllocatedMB());
assertEquals(allocVcores, metrics.getAllocatedVirtualCores());
assertEquals(reservedMb, metrics.getReservedMB());
assertEquals(reservedVcores, metrics.getReservedVirtualCores());
assertEquals(pendingMb, metrics.getPendingMB());
assertEquals(pendingVcores, metrics.getPendingVirtualCores());
}
private SchedulerNode createNode() {
SchedulerNode node = mock(SchedulerNode.class);
when(node.getNodeName()).thenReturn("somehost");
when(node.getRackName()).thenReturn("somerack");
when(node.getNodeID()).thenReturn(nodeId);
return node;
}
private RMContainer createReservedRMContainer(ApplicationAttemptId appAttId,
int id, Resource resource, NodeId nodeId, Priority reservedPriority) {
RMContainer container = createRMContainer(appAttId, id, resource);
when(container.getReservedResource()).thenReturn(resource);
when(container.getReservedPriority()).thenReturn(reservedPriority);
when(container.getReservedNode()).thenReturn(nodeId);
return container;
}
private RMContainer createRMContainer(ApplicationAttemptId appAttId, int id,
Resource resource) {
ContainerId containerId = ContainerId.newInstance(appAttId, id);
RMContainer rmContainer = mock(RMContainer.class);
Container container = mock(Container.class);
when(container.getResource()).thenReturn(resource);
when(container.getNodeId()).thenReturn(nodeId);
when(rmContainer.getContainer()).thenReturn(container);
when(rmContainer.getContainerId()).thenReturn(containerId);
return rmContainer;
}
private Queue createQueue(String name, Queue parent) {
QueueMetrics metrics = QueueMetrics.forQueue(name, parent, false, conf);
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
Queue queue = mock(Queue.class);
when(queue.getMetrics()).thenReturn(metrics);
when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
return queue;
}
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(appIdImpl, attemptId);
return attId;
}
}
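For readers checking the checkQueueMetrics expectations in testMove above: the attempt asks for 3 containers of (1536 MB, 2 vcores), one of them is allocated and a separate (2048 MB, 3 vcores) container is reserved, so the old queue should show 1536 MB / 2 vcores allocated, 2048 MB / 3 vcores reserved and 2 x 1536 = 3072 MB / 4 vcores still pending; after move(newQueue) the same totals appear on the new queue, the old queue drops to zero, and the shared parent is unchanged. A standalone recomputation of those numbers (plain Java, no YARN classes involved):

// Recomputes the values asserted by checkQueueMetrics in testMove above.
public class TestMoveArithmetic {
  public static void main(String[] args) {
    int requested = 3, allocated = 1;
    int mbEach = 1536, vcoresEach = 2;

    int allocMb = allocated * mbEach;                          // 1536
    int allocVcores = allocated * vcoresEach;                  // 2
    int pendingMb = (requested - allocated) * mbEach;          // 3072
    int pendingVcores = (requested - allocated) * vcoresEach;  // 4
    int reservedMb = 2048, reservedVcores = 3;                 // the single reserved container

    System.out.println("allocated=" + allocMb + "MB/" + allocVcores
        + " reserved=" + reservedMb + "MB/" + reservedVcores
        + " pending=" + pendingMb + "MB/" + pendingVcores);
    // Before the move these totals sit on the old queue; afterwards they sit on
    // the new queue, while the shared parent reports them throughout.
  }
}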