Merge branch 'trunk' into HDFS-6581

This commit is contained in:
arp 2014-09-16 18:18:09 -07:00
commit a24a9331d4
43 changed files with 1008 additions and 394 deletions

View File

@ -327,6 +327,9 @@ Trunk (Unreleased)
HADOOP-11055. non-daemon pid files are missing (aw)
HADOOP-11022. User replaced functions get lost 2-3 levels deep (e.g.,
sbin) (aw)
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
@ -525,6 +528,8 @@ Release 2.6.0 - UNRELEASED
HADOOP-10868. AuthenticationFilter should support externalizing the
secret for signing and provide rotation support. (rkanter via tucu)
HADOOP-10922. User documentation for CredentialShell. (Larry McCay via wang)
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)

View File

@ -156,6 +156,7 @@ done
hadoop_find_confdir
hadoop_exec_hadoopenv
hadoop_exec_userfuncs
#
# IMPORTANT! User provided code is now available!

View File

@ -104,6 +104,15 @@ function hadoop_exec_hadoopenv
fi
}
function hadoop_exec_userfuncs
{
# NOTE: This function is not user replaceable.
if [[ -e "${HADOOP_CONF_DIR}/hadoop-user-functions.sh" ]]; then
. "${HADOOP_CONF_DIR}/hadoop-user-functions.sh"
fi
}
function hadoop_basic_init
{
# Some of these are also set in hadoop-env.sh.

View File

@ -337,84 +337,3 @@ esac
# via this special env var:
# HADOOP_ENABLE_BUILD_PATHS="true"
# You can do things like replace parts of the shell underbelly.
# Most of this code is in hadoop-functions.sh.
#
#
# For example, if you want to add compression to the rotation
# menthod for the .out files that daemons generate, you can do
# that by redefining the hadoop_rotate_log function by
# uncommenting this code block:
#function hadoop_rotate_log
#{
# #
# # log rotation (mainly used for .out files)
# # Users are likely to replace this one for something
# # that gzips or uses dates or who knows what.
# #
# # be aware that &1 and &2 might go through here
# # so don't do anything too crazy...
# #
# local log=$1;
# local num=${2:-5};
#
# if [[ -f "${log}" ]]; then # rotate logs
# while [[ ${num} -gt 1 ]]; do
# #shellcheck disable=SC2086
# let prev=${num}-1
# if [[ -f "${log}.${prev}" ]]; then
# mv "${log}.${prev}" "${log}.${num}"
# fi
# num=${prev}
# done
# mv "${log}" "${log}.${num}"
# gzip -9 "${log}.${num}"
# fi
#}
#
#
# Another example: finding java
#
# By default, Hadoop assumes that $JAVA_HOME is always defined
# outside of its configuration. Eons ago, Apple standardized
# on a helper program called java_home to find it for you.
#
#function hadoop_java_setup
#{
#
# if [[ -z "${JAVA_HOME}" ]]; then
# case $HADOOP_OS_TYPE in
# Darwin*)
# JAVA_HOME=$(/usr/libexec/java_home)
# ;;
# esac
# fi
#
# # Bail if we did not detect it
# if [[ -z "${JAVA_HOME}" ]]; then
# echo "ERROR: JAVA_HOME is not set and could not be found." 1>&2
# exit 1
# fi
#
# if [[ ! -d "${JAVA_HOME}" ]]; then
# echo "ERROR: JAVA_HOME (${JAVA_HOME}) does not exist." 1>&2
# exit 1
# fi
#
# JAVA="${JAVA_HOME}/bin/java"
#
# if [[ ! -x ${JAVA} ]]; then
# echo "ERROR: ${JAVA} is not executable." 1>&2
# exit 1
# fi
# JAVA_HEAP_MAX=-Xmx1g
# HADOOP_HEAPSIZE=${HADOOP_HEAPSIZE:-128}
#
# # check envvars which might override default args
# if [[ -n "$HADOOP_HEAPSIZE" ]]; then
# JAVA_HEAP_MAX="-Xmx${HADOOP_HEAPSIZE}m"
# fi
#}

View File

@ -0,0 +1,94 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#######
# Advanced Users Only
######
# You can do things like replace parts of the shell underbelly.
# Most of this code is in hadoop-functions.sh.
#
#
# For example, if you want to add compression to the rotation
# menthod for the .out files that daemons generate, you can do
# that by redefining the hadoop_rotate_log function by
# uncommenting this code block:
#function hadoop_rotate_log
#{
# local log=$1;
# local num=${2:-5};
#
# if [[ -f "${log}" ]]; then
# while [[ ${num} -gt 1 ]]; do
# #shellcheck disable=SC2086
# let prev=${num}-1
# if [[ -f "${log}.${prev}.gz" ]]; then
# mv "${log}.${prev}.gz" "${log}.${num}.gz"
# fi
# num=${prev}
# done
# mv "${log}" "${log}.${num}"
# gzip -9 "${log}.${num}"
# fi
#}
#
#
#
# Another example: finding java
#
# By default, Hadoop assumes that $JAVA_HOME is always defined
# outside of its configuration. Eons ago, Apple standardized
# on a helper program called java_home to find it for you.
#
#function hadoop_java_setup
#{
#
# if [[ -z "${JAVA_HOME}" ]]; then
# case $HADOOP_OS_TYPE in
# Darwin*)
# JAVA_HOME=$(/usr/libexec/java_home)
# ;;
# esac
# fi
#
# # Bail if we did not detect it
# if [[ -z "${JAVA_HOME}" ]]; then
# echo "ERROR: JAVA_HOME is not set and could not be found." 1>&2
# exit 1
# fi
#
# if [[ ! -d "${JAVA_HOME}" ]]; then
# echo "ERROR: JAVA_HOME (${JAVA_HOME}) does not exist." 1>&2
# exit 1
# fi
#
# JAVA="${JAVA_HOME}/bin/java"
#
# if [[ ! -x ${JAVA} ]]; then
# echo "ERROR: ${JAVA} is not executable." 1>&2
# exit 1
# fi
# JAVA_HEAP_MAX=-Xmx1g
# HADOOP_HEAPSIZE=${HADOOP_HEAPSIZE:-128}
#
# # check envvars which might override default args
# if [[ -n "$HADOOP_HEAPSIZE" ]]; then
# JAVA_HEAP_MAX="-Xmx${HADOOP_HEAPSIZE}m"
# fi
#}

View File

@ -85,6 +85,59 @@ User Commands
{{{../../hadoop-mapreduce-client/hadoop-mapreduce-client-core/HadoopArchives.html}
Hadoop Archives Guide}}.
* <<<credential>>>
Command to manage credentials, passwords and secrets within credential providers.
The CredentialProvider API in Hadoop allows for the separation of applications
and how they store their required passwords/secrets. In order to indicate
a particular provider type and location, the user must provide the
<hadoop.security.credential.provider.path> configuration element in core-site.xml
or use the command line option <<<-provider>>> on each of the following commands.
This provider path is a comma-separated list of URLs that indicates the type and
location of a list of providers that should be consulted.
For example, the following path:
<<<user:///,jceks://file/tmp/test.jceks,jceks://hdfs@nn1.example.com/my/path/test.jceks>>>
indicates that the current user's credentials file should be consulted through
the User Provider, that the local file located at <<</tmp/test.jceks>>> is a Java Keystore
Provider and that the file located within HDFS at <<<nn1.example.com/my/path/test.jceks>>>
is also a store for a Java Keystore Provider.
When utilizing the credential command it will often be for provisioning a password
or secret to a particular credential store provider. In order to explicitly
indicate which provider store to use the <<<-provider>>> option should be used. Otherwise,
given a path of multiple providers, the first non-transient provider will be used.
This may or may not be the one that you intended.
Example: <<<-provider jceks://file/tmp/test.jceks>>>
Usage: <<<hadoop credential <subcommand> [options]>>>
*-------------------+-------------------------------------------------------+
||COMMAND_OPTION || Description
*-------------------+-------------------------------------------------------+
| create <alias> [-v <value>][-provider <provider-path>]| Prompts the user for
| a credential to be stored as the given alias when a value
| is not provided via <<<-v>>>. The
| <hadoop.security.credential.provider.path> within the
| core-site.xml file will be used unless a <<<-provider>>> is
| indicated.
*-------------------+-------------------------------------------------------+
| delete <alias> [-i][-provider <provider-path>] | Deletes the credential with
| the provided alias and optionally warns the user when
| <<<--interactive>>> is used.
| The <hadoop.security.credential.provider.path> within the
| core-site.xml file will be used unless a <<<-provider>>> is
| indicated.
*-------------------+-------------------------------------------------------+
| list [-provider <provider-path>] | Lists all of the credential aliases
| The <hadoop.security.credential.provider.path> within the
| core-site.xml file will be used unless a <<<-provider>>> is
| indicated.
*-------------------+-------------------------------------------------------+
* <<<distcp>>>
Copy file or directories recursively. More information can be found at

View File

@ -238,7 +238,7 @@
<executions>
<execution>
<id>default-war</id>
<phase>package</phase>
<phase>prepare-package</phase>
<goals>
<goal>war</goal>
</goals>
@ -251,6 +251,29 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>prepare-jar</id>
<phase>prepare-package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>classes</classifier>
</configuration>
</execution>
<execution>
<id>prepare-test-jar</id>
<phase>prepare-package</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.crypto.key.kms.server;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.kms.KMSRESTConstants;
import org.apache.hadoop.fs.Path;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
@ -26,7 +28,10 @@ import org.mortbay.jetty.security.SslSocketConnector;
import org.mortbay.jetty.webapp.WebAppContext;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Writer;
import java.net.InetAddress;
import java.net.MalformedURLException;
@ -34,6 +39,7 @@ import java.net.ServerSocket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.UUID;
public class MiniKMS {
@ -140,13 +146,15 @@ public class MiniKMS {
}
public void start() throws Exception {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
System.setProperty(KMSConfiguration.KMS_CONFIG_DIR, kmsConfDir);
File aclsFile = new File(kmsConfDir, "kms-acls.xml");
if (!aclsFile.exists()) {
Configuration acls = new Configuration(false);
Writer writer = new FileWriter(aclsFile);
acls.writeXml(writer);
writer.close();
InputStream is = cl.getResourceAsStream("mini-kms-acls-default.xml");
OutputStream os = new FileOutputStream(aclsFile);
IOUtils.copy(is, os);
is.close();
os.close();
}
File coreFile = new File(kmsConfDir, "core-site.xml");
if (!coreFile.exists()) {
@ -161,19 +169,42 @@ public class MiniKMS {
kms.set("hadoop.security.key.provider.path",
"jceks://file@" + new Path(kmsConfDir, "kms.keystore").toUri());
kms.set("hadoop.kms.authentication.type", "simple");
kms.setBoolean(KMSConfiguration.KEY_AUTHORIZATION_ENABLE, false);
Writer writer = new FileWriter(kmsFile);
kms.writeXml(writer);
writer.close();
}
System.setProperty("log4j.configuration", log4jConfFile);
jetty = createJettyServer(keyStore, keyStorePassword);
ClassLoader cl = Thread.currentThread().getContextClassLoader();
URL url = cl.getResource("kms-webapp");
if (url == null) {
// we need to do a special handling for MiniKMS to work when in a dir and
// when in a JAR in the classpath thanks to Jetty way of handling of webapps
// when they are in the a DIR, WAR or JAR.
URL webXmlUrl = cl.getResource("kms-webapp/WEB-INF/web.xml");
if (webXmlUrl == null) {
throw new RuntimeException(
"Could not find kms-webapp/ dir in test classpath");
}
WebAppContext context = new WebAppContext(url.getPath(), "/kms");
boolean webXmlInJar = webXmlUrl.getPath().contains(".jar!/");
String webappPath;
if (webXmlInJar) {
File webInf = new File("target/" + UUID.randomUUID().toString() +
"/kms-webapp/WEB-INF");
webInf.mkdirs();
new File(webInf, "web.xml").delete();
InputStream is = cl.getResourceAsStream("kms-webapp/WEB-INF/web.xml");
OutputStream os = new FileOutputStream(new File(webInf, "web.xml"));
IOUtils.copy(is, os);
is.close();
os.close();
webappPath = webInf.getParentFile().getAbsolutePath();
} else {
webappPath = cl.getResource("kms-webapp").getPath();
}
WebAppContext context = new WebAppContext(webappPath, "/kms");
if (webXmlInJar) {
context.setClassLoader(cl);
}
jetty.addHandler(context);
jetty.start();
kmsURL = new URL(getJettyURL(jetty), "kms");

View File

@ -0,0 +1,135 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<!-- This file is hot-reloaded when it changes -->
<!-- KMS ACLs -->
<property>
<name>hadoop.kms.acl.CREATE</name>
<value>*</value>
<description>
ACL for create-key operations.
If the user does is not in the GET ACL, the key material is not returned
as part of the response.
</description>
</property>
<property>
<name>hadoop.kms.acl.DELETE</name>
<value>*</value>
<description>
ACL for delete-key operations.
</description>
</property>
<property>
<name>hadoop.kms.acl.ROLLOVER</name>
<value>*</value>
<description>
ACL for rollover-key operations.
If the user does is not in the GET ACL, the key material is not returned
as part of the response.
</description>
</property>
<property>
<name>hadoop.kms.acl.GET</name>
<value>*</value>
<description>
ACL for get-key-version and get-current-key operations.
</description>
</property>
<property>
<name>hadoop.kms.acl.GET_KEYS</name>
<value>*</value>
<description>
ACL for get-keys operation.
</description>
</property>
<property>
<name>hadoop.kms.acl.GET_METADATA</name>
<value>*</value>
<description>
ACL for get-key-metadata an get-keys-metadata operations.
</description>
</property>
<property>
<name>hadoop.kms.acl.SET_KEY_MATERIAL</name>
<value>*</value>
<description>
Complimentary ACL for CREATE and ROLLOVER operation to allow the client
to provide the key material when creating or rolling a key.
</description>
</property>
<property>
<name>hadoop.kms.acl.GENERATE_EEK</name>
<value>*</value>
<description>
ACL for generateEncryptedKey CryptoExtension operations
</description>
</property>
<property>
<name>hadoop.kms.acl.DECRYPT_EEK</name>
<value>*</value>
<description>
ACL for decrypt EncryptedKey CryptoExtension operations
</description>
</property>
<property>
<name>default.key.acl.MANAGEMENT</name>
<value>*</value>
<description>
default ACL for MANAGEMENT operations for all key acls that are not
explicitly defined.
</description>
</property>
<property>
<name>default.key.acl.GENERATE_EEK</name>
<value>*</value>
<description>
default ACL for GENERATE_EEK operations for all key acls that are not
explicitly defined.
</description>
</property>
<property>
<name>default.key.acl.DECRYPT_EEK</name>
<value>*</value>
<description>
default ACL for DECRYPT_EEK operations for all key acls that are not
explicitly defined.
</description>
</property>
<property>
<name>default.key.acl.READ</name>
<value>*</value>
<description>
default ACL for READ operations for all key acls that are not
explicitly defined.
</description>
</property>
</configuration>

View File

@ -464,6 +464,10 @@ Release 2.6.0 - UNRELEASED
HDFS-6880. Adding tracing to DataNode data transfer protocol. (iwasakims
via cmccabe)
HDFS-7006. Test encryption zones with KMS. (Anthony Young-Garner and tucu)
HDFS-6851. Refactor EncryptionZoneWithId and EncryptionZone. (clamb via wang)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@ -659,6 +663,10 @@ Release 2.6.0 - UNRELEASED
HDFS-6965. NN continues to issue block locations for DNs with full disks.
(Rushabh Shah via kihwal)
HDFS-6789. TestDFSClientFailover.testFileContextDoesntDnsResolveLogicalURI
and TestDFSClientFailover.testDoesntDnsResolveLogicalURI failing on jdk7.
(Akira Ajisaka via wang)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -185,6 +185,19 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.htrace</groupId>
<artifactId>htrace-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-kms</artifactId>
<classifier>classes</classifier>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-kms</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -20,8 +20,11 @@
function hadoop_subproject_init
{
if [[ -e "${HADOOP_CONF_DIR}/hdfs-env.sh" ]]; then
. "${HADOOP_CONF_DIR}/hdfs-env.sh"
if [[ -z "${HADOOP_HDFS_ENV_PROCESSED}" ]]; then
if [[ -e "${HADOOP_CONF_DIR}/hdfs-env.sh" ]]; then
. "${HADOOP_CONF_DIR}/hdfs-env.sh"
export HADOOP_HDFS_ENV_PROCESSED=true
fi
fi
# at some point in time, someone thought it would be a good idea to

View File

@ -154,7 +154,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneWithId;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -2887,8 +2886,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throws IOException {
checkOpen();
try {
final EncryptionZoneWithId ezi = namenode.getEZForPath(src);
return (ezi.getId() < 0) ? null : ezi;
final EncryptionZone ez = namenode.getEZForPath(src);
return (ez.getId() < 0) ? null : ez;
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
UnresolvedPathException.class);

View File

@ -1282,7 +1282,7 @@ public interface ClientProtocol {
* Get the encryption zone for a path.
*/
@Idempotent
public EncryptionZoneWithId getEZForPath(String src)
public EncryptionZone getEZForPath(String src)
throws IOException;
/**
@ -1293,7 +1293,7 @@ public interface ClientProtocol {
* @return Batch of encryption zones.
*/
@Idempotent
public BatchedEntries<EncryptionZoneWithId> listEncryptionZones(
public BatchedEntries<EncryptionZone> listEncryptionZones(
long prevId) throws IOException;
/**

View File

@ -24,7 +24,8 @@ import org.apache.hadoop.classification.InterfaceStability;
/**
* A simple class for representing an encryption zone. Presently an encryption
* zone only has a path (the root of the encryption zone) and a key name.
* zone only has a path (the root of the encryption zone), a key name, and a
* unique id. The id is used to implement batched listing of encryption zones.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@ -32,10 +33,12 @@ public class EncryptionZone {
private final String path;
private final String keyName;
private final long id;
public EncryptionZone(String path, String keyName) {
public EncryptionZone(String path, String keyName, long id) {
this.path = path;
this.keyName = keyName;
this.id = id;
}
public String getPath() {
@ -46,10 +49,14 @@ public class EncryptionZone {
return keyName;
}
public long getId() {
return id;
}
@Override
public int hashCode() {
return new HashCodeBuilder(13, 31).
append(path).append(keyName).
append(path).append(keyName).append(id).
toHashCode();
}
@ -69,11 +76,14 @@ public class EncryptionZone {
return new EqualsBuilder().
append(path, rhs.path).
append(keyName, rhs.keyName).
append(id, rhs.id).
isEquals();
}
@Override
public String toString() {
return "EncryptionZone [path=" + path + ", keyName=" + keyName + "]";
return "EncryptionZone [path=" + path +
", keyName=" + keyName +
", id=" + id + "]";
}
}

View File

@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.BatchedRemoteIterator;
/**
* EncryptionZoneIterator is a remote iterator that iterates over encryption
@ -30,22 +30,24 @@ import org.apache.hadoop.fs.RemoteIterator;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class EncryptionZoneIterator implements RemoteIterator<EncryptionZone> {
public class EncryptionZoneIterator
extends BatchedRemoteIterator<Long, EncryptionZone> {
private final EncryptionZoneWithIdIterator iterator;
private final ClientProtocol namenode;
public EncryptionZoneIterator(ClientProtocol namenode) {
iterator = new EncryptionZoneWithIdIterator(namenode);
super(Long.valueOf(0));
this.namenode = namenode;
}
@Override
public boolean hasNext() throws IOException {
return iterator.hasNext();
public BatchedEntries<EncryptionZone> makeRequest(Long prevId)
throws IOException {
return namenode.listEncryptionZones(prevId);
}
@Override
public EncryptionZone next() throws IOException {
EncryptionZoneWithId ezwi = iterator.next();
return ezwi.toEncryptionZone();
public Long elementToPrevKey(EncryptionZone entry) {
return entry.getId();
}
}

View File

@ -1,81 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Internal class similar to an {@link EncryptionZone} which also holds a
* unique id. Used to implement batched listing of encryption zones.
*/
@InterfaceAudience.Private
public class EncryptionZoneWithId extends EncryptionZone {
final long id;
public EncryptionZoneWithId(String path, String keyName, long id) {
super(path, keyName);
this.id = id;
}
public long getId() {
return id;
}
EncryptionZone toEncryptionZone() {
return new EncryptionZone(getPath(), getKeyName());
}
@Override
public int hashCode() {
return new HashCodeBuilder(17, 29)
.append(super.hashCode())
.append(id)
.toHashCode();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
EncryptionZoneWithId that = (EncryptionZoneWithId) o;
if (id != that.id) {
return false;
}
return true;
}
@Override
public String toString() {
return "EncryptionZoneWithId [" +
"id=" + id +
", " + super.toString() +
']';
}
}

View File

@ -1,53 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator;
/**
* Used on the client-side to iterate over the list of encryption zones
* stored on the namenode.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class EncryptionZoneWithIdIterator
extends BatchedRemoteIterator<Long, EncryptionZoneWithId> {
private final ClientProtocol namenode;
EncryptionZoneWithIdIterator(ClientProtocol namenode) {
super(Long.valueOf(0));
this.namenode = namenode;
}
@Override
public BatchedEntries<EncryptionZoneWithId> makeRequest(Long prevId)
throws IOException {
return namenode.listEncryptionZones(prevId);
}
@Override
public Long elementToPrevKey(EncryptionZoneWithId entry) {
return entry.getId();
}
}

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneWithId;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -1331,7 +1331,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
try {
GetEZForPathResponseProto.Builder builder =
GetEZForPathResponseProto.newBuilder();
final EncryptionZoneWithId ret = server.getEZForPath(req.getSrc());
final EncryptionZone ret = server.getEZForPath(req.getSrc());
builder.setZone(PBHelper.convert(ret));
return builder.build();
} catch (IOException e) {
@ -1344,7 +1344,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
RpcController controller, ListEncryptionZonesRequestProto req)
throws ServiceException {
try {
BatchedEntries<EncryptionZoneWithId> entries = server
BatchedEntries<EncryptionZone> entries = server
.listEncryptionZones(req.getId());
ListEncryptionZonesResponseProto.Builder builder =
ListEncryptionZonesResponseProto.newBuilder();

View File

@ -55,7 +55,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneWithId;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@ -186,7 +186,7 @@ import com.google.protobuf.ServiceException;
import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
.EncryptionZoneWithIdProto;
.EncryptionZoneProto;
/**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
@ -1331,7 +1331,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public EncryptionZoneWithId getEZForPath(String src)
public EncryptionZone getEZForPath(String src)
throws IOException {
final GetEZForPathRequestProto.Builder builder =
GetEZForPathRequestProto.newBuilder();
@ -1347,7 +1347,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public BatchedEntries<EncryptionZoneWithId> listEncryptionZones(long id)
public BatchedEntries<EncryptionZone> listEncryptionZones(long id)
throws IOException {
final ListEncryptionZonesRequestProto req =
ListEncryptionZonesRequestProto.newBuilder()
@ -1356,12 +1356,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
try {
EncryptionZonesProtos.ListEncryptionZonesResponseProto response =
rpcProxy.listEncryptionZones(null, req);
List<EncryptionZoneWithId> elements =
List<EncryptionZone> elements =
Lists.newArrayListWithCapacity(response.getZonesCount());
for (EncryptionZoneWithIdProto p : response.getZonesList()) {
for (EncryptionZoneProto p : response.getZonesList()) {
elements.add(PBHelper.convert(p));
}
return new BatchedListEntries<EncryptionZoneWithId>(elements,
return new BatchedListEntries<EncryptionZone>(elements,
response.getHasMore());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);

View File

@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.protocolPB;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
.EncryptionZoneWithIdProto;
.EncryptionZoneProto;
import java.io.EOFException;
import java.io.IOException;
@ -63,7 +63,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneWithId;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdfs.protocol.FsAclPermission;
@ -2316,15 +2316,15 @@ public class PBHelper {
return builder.build();
}
public static EncryptionZoneWithIdProto convert(EncryptionZoneWithId zone) {
return EncryptionZoneWithIdProto.newBuilder()
public static EncryptionZoneProto convert(EncryptionZone zone) {
return EncryptionZoneProto.newBuilder()
.setId(zone.getId())
.setKeyName(zone.getKeyName())
.setPath(zone.getPath()).build();
}
public static EncryptionZoneWithId convert(EncryptionZoneWithIdProto proto) {
return new EncryptionZoneWithId(proto.getPath(), proto.getKeyName(),
public static EncryptionZone convert(EncryptionZoneProto proto) {
return new EncryptionZone(proto.getPath(), proto.getKeyName(),
proto.getId());
}

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneWithId;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,8 +53,8 @@ public class EncryptionZoneManager {
public static Logger LOG = LoggerFactory.getLogger(EncryptionZoneManager
.class);
private static final EncryptionZoneWithId NULL_EZ =
new EncryptionZoneWithId("", "", -1);
private static final EncryptionZone NULL_EZ =
new EncryptionZone("", "", -1);
/**
* EncryptionZoneInt is the internal representation of an encryption zone. The
@ -196,18 +196,18 @@ public class EncryptionZoneManager {
}
/**
* Returns an EncryptionZoneWithId representing the ez for a given path.
* Returns an empty marker EncryptionZoneWithId if path is not in an ez.
* Returns an EncryptionZone representing the ez for a given path.
* Returns an empty marker EncryptionZone if path is not in an ez.
*
* @param iip The INodesInPath of the path to check
* @return the EncryptionZoneWithId representing the ez for the path.
* @return the EncryptionZone representing the ez for the path.
*/
EncryptionZoneWithId getEZINodeForPath(INodesInPath iip) {
EncryptionZone getEZINodeForPath(INodesInPath iip) {
final EncryptionZoneInt ezi = getEncryptionZoneForPath(iip);
if (ezi == null) {
return NULL_EZ;
} else {
return new EncryptionZoneWithId(getFullPathName(ezi), ezi.getKeyName(),
return new EncryptionZone(getFullPathName(ezi), ezi.getKeyName(),
ezi.getINodeId());
}
}
@ -300,19 +300,19 @@ public class EncryptionZoneManager {
* <p/>
* Called while holding the FSDirectory lock.
*/
BatchedListEntries<EncryptionZoneWithId> listEncryptionZones(long prevId)
BatchedListEntries<EncryptionZone> listEncryptionZones(long prevId)
throws IOException {
assert dir.hasReadLock();
NavigableMap<Long, EncryptionZoneInt> tailMap = encryptionZones.tailMap
(prevId, false);
final int numResponses = Math.min(maxListEncryptionZonesResponses,
tailMap.size());
final List<EncryptionZoneWithId> zones =
final List<EncryptionZone> zones =
Lists.newArrayListWithExpectedSize(numResponses);
int count = 0;
for (EncryptionZoneInt ezi : tailMap.values()) {
zones.add(new EncryptionZoneWithId(getFullPathName(ezi),
zones.add(new EncryptionZone(getFullPathName(ezi),
ezi.getKeyName(), ezi.getINodeId()));
count++;
if (count >= numResponses) {
@ -320,6 +320,6 @@ public class EncryptionZoneManager {
}
}
final boolean hasMore = (numResponses < tailMap.size());
return new BatchedListEntries<EncryptionZoneWithId>(zones, hasMore);
return new BatchedListEntries<EncryptionZone>(zones, hasMore);
}
}

View File

@ -59,7 +59,7 @@ import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneWithId;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.FSLimitException.MaxDirectoryItemsExceededException;
import org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooLongException;
import org.apache.hadoop.hdfs.protocol.FsAclPermission;
@ -2677,7 +2677,7 @@ public class FSDirectory implements Closeable {
}
}
EncryptionZoneWithId getEZForPath(INodesInPath iip) {
EncryptionZone getEZForPath(INodesInPath iip) {
readLock();
try {
return ezManager.getEZINodeForPath(iip);
@ -2686,7 +2686,7 @@ public class FSDirectory implements Closeable {
}
}
BatchedListEntries<EncryptionZoneWithId> listEncryptionZones(long prevId)
BatchedListEntries<EncryptionZone> listEncryptionZones(long prevId)
throws IOException {
readLock();
try {

View File

@ -180,7 +180,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneWithId;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@ -8733,7 +8733,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws AccessControlException if the caller is not the superuser.
* @throws UnresolvedLinkException if the path can't be resolved.
*/
EncryptionZoneWithId getEZForPath(final String srcArg)
EncryptionZone getEZForPath(final String srcArg)
throws AccessControlException, UnresolvedLinkException, IOException {
String src = srcArg;
HdfsFileStatus resultingStat = null;
@ -8750,7 +8750,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOperation(OperationCategory.READ);
src = resolvePath(src, pathComponents);
final INodesInPath iip = dir.getINodesInPath(src, true);
final EncryptionZoneWithId ret = dir.getEZForPath(iip);
final EncryptionZone ret = dir.getEZForPath(iip);
resultingStat = getAuditFileInfo(src, false);
success = true;
return ret;
@ -8760,7 +8760,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
BatchedListEntries<EncryptionZoneWithId> listEncryptionZones(long prevId)
BatchedListEntries<EncryptionZone> listEncryptionZones(long prevId)
throws IOException {
boolean success = false;
checkSuperuserPrivilege();
@ -8769,7 +8769,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
checkSuperuserPrivilege();
checkOperation(OperationCategory.READ);
final BatchedListEntries<EncryptionZoneWithId> ret =
final BatchedListEntries<EncryptionZone> ret =
dir.listEncryptionZones(prevId);
success = true;
return ret;

View File

@ -81,7 +81,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZoneWithId;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSLimitException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@ -1437,13 +1437,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override
public EncryptionZoneWithId getEZForPath(String src)
public EncryptionZone getEZForPath(String src)
throws IOException {
return namesystem.getEZForPath(src);
}
@Override
public BatchedEntries<EncryptionZoneWithId> listEncryptionZones(
public BatchedEntries<EncryptionZone> listEncryptionZones(
long prevId) throws IOException {
return namesystem.listEncryptionZones(prevId);
}

View File

@ -45,14 +45,14 @@ message ListEncryptionZonesRequestProto {
required int64 id = 1;
}
message EncryptionZoneWithIdProto {
message EncryptionZoneProto {
required string path = 1;
required string keyName = 2;
required int64 id = 3;
}
message ListEncryptionZonesResponseProto {
repeated EncryptionZoneWithIdProto zones = 1;
repeated EncryptionZoneProto zones = 1;
required bool hasMore = 2;
}
@ -61,5 +61,5 @@ message GetEZForPathRequestProto {
}
message GetEZForPathResponseProto {
required EncryptionZoneWithIdProto zone = 1;
required EncryptionZoneProto zone = 1;
}

View File

@ -257,9 +257,8 @@ public class TestDFSClientFailover {
*/
@Test
public void testDoesntDnsResolveLogicalURI() throws Exception {
NameService spyNS = spyOnNameService();
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
NameService spyNS = spyOnNameService();
String logicalHost = fs.getUri().getHost();
Path qualifiedRoot = fs.makeQualified(new Path("/"));
@ -276,8 +275,8 @@ public class TestDFSClientFailover {
*/
@Test
public void testFileContextDoesntDnsResolveLogicalURI() throws Exception {
NameService spyNS = spyOnNameService();
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
NameService spyNS = spyOnNameService();
String logicalHost = fs.getUri().getHost();
Configuration haClientConf = fs.getConf();

View File

@ -112,6 +112,11 @@ public class TestEncryptionZones {
protected FileSystemTestWrapper fsWrapper;
protected FileContextTestWrapper fcWrapper;
protected String getKeyProviderURI() {
return JavaKeyStoreProvider.SCHEME_NAME + "://file" + testRootDir +
"/test.jks";
}
@Before
public void setup() throws Exception {
conf = new HdfsConfiguration();
@ -119,10 +124,7 @@ public class TestEncryptionZones {
// Set up java key store
String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile();
final Path jksPath = new Path(testRootDir.toString(), "test.jks");
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()
);
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH, getKeyProviderURI());
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
// Lower the batch size for testing
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES,

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.apache.hadoop.crypto.key.kms.server.MiniKMS;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import java.io.File;
import java.util.UUID;
public class TestEncryptionZonesWithKMS extends TestEncryptionZones {
private MiniKMS miniKMS;
@Override
protected String getKeyProviderURI() {
return KMSClientProvider.SCHEME_NAME + "://" +
miniKMS.getKMSUrl().toExternalForm().replace("://", "@");
}
@Before
public void setup() throws Exception {
File kmsDir = new File("target/test-classes/" +
UUID.randomUUID().toString());
Assert.assertTrue(kmsDir.mkdirs());
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder();
miniKMS = miniKMSBuilder.setKmsConfDir(kmsDir).build();
miniKMS.start();
super.setup();
}
@After
public void teardown() {
super.teardown();
miniKMS.stop();
}
}

View File

@ -487,9 +487,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override // FsDatasetSpi
public synchronized void unfinalizeBlock(ExtendedBlock b) {
public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException{
if (isValidRbw(b)) {
blockMap.remove(b.getLocalBlock());
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
map.remove(b.getLocalBlock());
}
}
@ -629,7 +630,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
continue;
}
storage.free(bpid, binfo.getNumBytes());
blockMap.remove(b);
map.remove(b);
}
if (error) {
throw new IOException("Invalidate: Missing blocks.");

View File

@ -20,8 +20,11 @@
function hadoop_subproject_init
{
if [[ -e "${HADOOP_CONF_DIR}/mapred-env.sh" ]]; then
. "${HADOOP_CONF_DIR}/mapred-env.sh"
if [[ -z "${HADOOP_MAPRED_ENV_PROCESSED}" ]]; then
if [[ -e "${HADOOP_CONF_DIR}/mapred-env.sh" ]]; then
. "${HADOOP_CONF_DIR}/mapred-env.sh"
export HADOOP_MAPRED_ENV_PROCESSED=true
fi
fi
# at some point in time, someone thought it would be a good idea to

View File

@ -334,6 +334,20 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-kms</artifactId>
<version>${project.version}</version>
<classifier>classes</classifier>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-kms</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>

View File

@ -24,10 +24,13 @@ function hadoop_subproject_init
# ...
# this should get deprecated at some point.
if [[ -e "${YARN_CONF_DIR}/yarn-env.sh" ]]; then
. "${YARN_CONF_DIR}/yarn-env.sh"
elif [[ -e "${HADOOP_CONF_DIR}/yarn-env.sh" ]]; then
. "${HADOOP_CONF_DIR}/yarn-env.sh"
if [[ -z "${HADOOP_YARN_ENV_PROCESSED}" ]]; then
if [[ -e "${YARN_CONF_DIR}/yarn-env.sh" ]]; then
. "${YARN_CONF_DIR}/yarn-env.sh"
elif [[ -e "${HADOOP_CONF_DIR}/yarn-env.sh" ]]; then
. "${HADOOP_CONF_DIR}/yarn-env.sh"
fi
export HADOOP_YARN_ENV_PROCESSED=true
fi
if [[ -n "${YARN_CONF_DIR}" ]]; then

View File

@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
@ -62,12 +64,15 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
ApplicationHistoryManager {
private TimelineDataManager timelineDataManager;
private ApplicationACLsManager aclsManager;
private String serverHttpAddress;
public ApplicationHistoryManagerOnTimelineStore(
TimelineDataManager timelineDataManager) {
TimelineDataManager timelineDataManager,
ApplicationACLsManager aclsManager) {
super(ApplicationHistoryManagerOnTimelineStore.class.getName());
this.timelineDataManager = timelineDataManager;
this.aclsManager = aclsManager;
}
@Override
@ -80,7 +85,7 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
@Override
public ApplicationReport getApplication(ApplicationId appId)
throws YarnException, IOException {
return getApplication(appId, ApplicationReportField.ALL);
return getApplication(appId, ApplicationReportField.ALL).appReport;
}
@Override
@ -94,9 +99,9 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
new HashMap<ApplicationId, ApplicationReport>();
if (entities != null && entities.getEntities() != null) {
for (TimelineEntity entity : entities.getEntities()) {
ApplicationReport app =
ApplicationReportExt app =
generateApplicationReport(entity, ApplicationReportField.ALL);
apps.put(app.getApplicationId(), app);
apps.put(app.appReport.getApplicationId(), app.appReport);
}
}
return apps;
@ -106,6 +111,9 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
public Map<ApplicationAttemptId, ApplicationAttemptReport>
getApplicationAttempts(ApplicationId appId)
throws YarnException, IOException {
ApplicationReportExt app = getApplication(
appId, ApplicationReportField.USER_AND_ACLS);
checkAccess(app);
TimelineEntities entities = timelineDataManager.getEntities(
AppAttemptMetricsConstants.ENTITY_TYPE,
new NameValuePair(
@ -115,16 +123,10 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
UserGroupInformation.getLoginUser());
Map<ApplicationAttemptId, ApplicationAttemptReport> appAttempts =
new HashMap<ApplicationAttemptId, ApplicationAttemptReport>();
if (entities != null && entities.getEntities() != null) {
for (TimelineEntity entity : entities.getEntities()) {
ApplicationAttemptReport appAttempt =
convertToApplicationAttemptReport(entity);
appAttempts.put(appAttempt.getApplicationAttemptId(), appAttempt);
}
} else {
// It is likely that the attemtps are not found due to non-existing
// application. In this case, we need to throw ApplicationNotFoundException.
getApplication(appId, ApplicationReportField.NONE);
for (TimelineEntity entity : entities.getEntities()) {
ApplicationAttemptReport appAttempt =
convertToApplicationAttemptReport(entity);
appAttempts.put(appAttempt.getApplicationAttemptId(), appAttempt);
}
return appAttempts;
}
@ -132,13 +134,14 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
@Override
public ApplicationAttemptReport getApplicationAttempt(
ApplicationAttemptId appAttemptId) throws YarnException, IOException {
ApplicationReportExt app = getApplication(
appAttemptId.getApplicationId(), ApplicationReportField.USER_AND_ACLS);
checkAccess(app);
TimelineEntity entity = timelineDataManager.getEntity(
AppAttemptMetricsConstants.ENTITY_TYPE,
appAttemptId.toString(), EnumSet.allOf(Field.class),
UserGroupInformation.getLoginUser());
if (entity == null) {
// Will throw ApplicationNotFoundException first
getApplication(appAttemptId.getApplicationId(), ApplicationReportField.NONE);
throw new ApplicationAttemptNotFoundException(
"The entity for application attempt " + appAttemptId +
" doesn't exist in the timeline store");
@ -150,9 +153,10 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
@Override
public ContainerReport getContainer(ContainerId containerId)
throws YarnException, IOException {
ApplicationReport app = getApplication(
ApplicationReportExt app = getApplication(
containerId.getApplicationAttemptId().getApplicationId(),
ApplicationReportField.USER);
ApplicationReportField.USER_AND_ACLS);
checkAccess(app);
TimelineEntity entity = timelineDataManager.getEntity(
ContainerMetricsConstants.ENTITY_TYPE,
containerId.toString(), EnumSet.allOf(Field.class),
@ -162,7 +166,8 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
"The entity for container " + containerId +
" doesn't exist in the timeline store");
} else {
return convertToContainerReport(entity, serverHttpAddress, app.getUser());
return convertToContainerReport(
entity, serverHttpAddress, app.appReport.getUser());
}
}
@ -176,8 +181,9 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
@Override
public Map<ContainerId, ContainerReport> getContainers(
ApplicationAttemptId appAttemptId) throws YarnException, IOException {
ApplicationReport app = getApplication(
appAttemptId.getApplicationId(), ApplicationReportField.USER);
ApplicationReportExt app = getApplication(
appAttemptId.getApplicationId(), ApplicationReportField.USER_AND_ACLS);
checkAccess(app);
TimelineEntities entities = timelineDataManager.getEntities(
ContainerMetricsConstants.ENTITY_TYPE,
new NameValuePair(
@ -189,15 +195,15 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
new HashMap<ContainerId, ContainerReport>();
if (entities != null && entities.getEntities() != null) {
for (TimelineEntity entity : entities.getEntities()) {
ContainerReport container =
convertToContainerReport(entity, serverHttpAddress, app.getUser());
ContainerReport container = convertToContainerReport(
entity, serverHttpAddress, app.appReport.getUser());
containers.put(container.getContainerId(), container);
}
}
return containers;
}
private static ApplicationReport convertToApplicationReport(
private static ApplicationReportExt convertToApplicationReport(
TimelineEntity entity, ApplicationReportField field) {
String user = null;
String queue = null;
@ -209,13 +215,8 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
String diagnosticsInfo = null;
FinalApplicationStatus finalStatus = FinalApplicationStatus.UNDEFINED;
YarnApplicationState state = null;
if (field == ApplicationReportField.NONE) {
return ApplicationReport.newInstance(
ConverterUtils.toApplicationId(entity.getEntityId()),
latestApplicationAttemptId, user, queue, name, null, -1, null, state,
diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null,
null, 1.0F, type, null);
}
Map<ApplicationAccessType, String> appViewACLs =
new HashMap<ApplicationAccessType, String>();
Map<String, Object> entityInfo = entity.getOtherInfo();
if (entityInfo != null) {
if (entityInfo.containsKey(ApplicationMetricsConstants.USER_ENTITY_INFO)) {
@ -223,12 +224,17 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
entityInfo.get(ApplicationMetricsConstants.USER_ENTITY_INFO)
.toString();
}
if (field == ApplicationReportField.USER) {
return ApplicationReport.newInstance(
if (entityInfo.containsKey(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO)) {
String appViewACLsStr = entityInfo.get(
ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO).toString();
appViewACLs.put(ApplicationAccessType.VIEW_APP, appViewACLsStr);
}
if (field == ApplicationReportField.USER_AND_ACLS) {
return new ApplicationReportExt(ApplicationReport.newInstance(
ConverterUtils.toApplicationId(entity.getEntityId()),
latestApplicationAttemptId, user, queue, name, null, -1, null, state,
diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null,
null, 1.0F, type, null);
null, 1.0F, type, null), appViewACLs);
}
if (entityInfo.containsKey(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)) {
queue =
@ -292,11 +298,11 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
}
}
}
return ApplicationReport.newInstance(
return new ApplicationReportExt(ApplicationReport.newInstance(
ConverterUtils.toApplicationId(entity.getEntityId()),
latestApplicationAttemptId, user, queue, name, null, -1, null, state,
diagnosticsInfo, null, createdTime, finishedTime, finalStatus, null,
null, 1.0F, type, null);
null, 1.0F, type, null), appViewACLs);
}
private static ApplicationAttemptReport convertToApplicationAttemptReport(
@ -471,24 +477,39 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
createdTime, finishedTime, diagnosticsInfo, logUrl, exitStatus, state);
}
private ApplicationReport generateApplicationReport(TimelineEntity entity,
private ApplicationReportExt generateApplicationReport(TimelineEntity entity,
ApplicationReportField field) throws YarnException, IOException {
ApplicationReport app = convertToApplicationReport(entity, field);
if (field == ApplicationReportField.ALL &&
app != null && app.getCurrentApplicationAttemptId() != null) {
ApplicationAttemptReport appAttempt =
getApplicationAttempt(app.getCurrentApplicationAttemptId());
if (appAttempt != null) {
app.setHost(appAttempt.getHost());
app.setRpcPort(appAttempt.getRpcPort());
app.setTrackingUrl(appAttempt.getTrackingUrl());
app.setOriginalTrackingUrl(appAttempt.getOriginalTrackingUrl());
ApplicationReportExt app = convertToApplicationReport(entity, field);
// If only user and acls are pulled to check attempt(s)/container(s) access
// control, we can return immediately
if (field == ApplicationReportField.USER_AND_ACLS) {
return app;
}
try {
checkAccess(app);
if (app.appReport.getCurrentApplicationAttemptId() != null) {
ApplicationAttemptReport appAttempt =
getApplicationAttempt(app.appReport.getCurrentApplicationAttemptId());
if (appAttempt != null) {
app.appReport.setHost(appAttempt.getHost());
app.appReport.setRpcPort(appAttempt.getRpcPort());
app.appReport.setTrackingUrl(appAttempt.getTrackingUrl());
app.appReport.setOriginalTrackingUrl(appAttempt.getOriginalTrackingUrl());
}
}
} catch (YarnException e) {
// YarnExcetpion is thrown because the user doesn't have access
app.appReport.setDiagnostics(null);
app.appReport.setCurrentApplicationAttemptId(null);
}
if (app.appReport.getCurrentApplicationAttemptId() == null) {
app.appReport.setCurrentApplicationAttemptId(
ApplicationAttemptId.newInstance(app.appReport.getApplicationId(), -1));
}
return app;
}
private ApplicationReport getApplication(ApplicationId appId,
private ApplicationReportExt getApplication(ApplicationId appId,
ApplicationReportField field) throws YarnException, IOException {
TimelineEntity entity = timelineDataManager.getEntity(
ApplicationMetricsConstants.ENTITY_TYPE,
@ -502,10 +523,40 @@ public class ApplicationHistoryManagerOnTimelineStore extends AbstractService
}
}
private void checkAccess(ApplicationReportExt app)
throws YarnException, IOException {
if (app.appViewACLs != null) {
aclsManager.addApplication(
app.appReport.getApplicationId(), app.appViewACLs);
try {
if (!aclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
ApplicationAccessType.VIEW_APP, app.appReport.getUser(),
app.appReport.getApplicationId())) {
throw new YarnException("User "
+ UserGroupInformation.getCurrentUser().getShortUserName()
+ " does not have privilage to see this application "
+ app.appReport.getApplicationId());
}
} finally {
aclsManager.removeApplication(app.appReport.getApplicationId());
}
}
}
private static enum ApplicationReportField {
ALL, // retrieve all the fields
NONE, // retrieve no fields
USER // retrieve user info only
USER_AND_ACLS // retrieve user and ACLs info only
}
private static class ApplicationReportExt {
private ApplicationReport appReport;
private Map<ApplicationAccessType, String> appViewACLs;
public ApplicationReportExt(
ApplicationReport appReport,
Map<ApplicationAccessType, String> appViewACLs) {
this.appReport = appReport;
this.appViewACLs = appViewACLs;
}
}
}

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
@ -64,6 +65,7 @@ public class ApplicationHistoryServer extends CompositeService {
.getLog(ApplicationHistoryServer.class);
private ApplicationHistoryClientService ahsClientService;
private ApplicationACLsManager aclsManager;
private ApplicationHistoryManager historyManager;
private TimelineStore timelineStore;
private TimelineDelegationTokenSecretManagerService secretManagerService;
@ -84,6 +86,7 @@ public class ApplicationHistoryServer extends CompositeService {
timelineDataManager = createTimelineDataManager(conf);
// init generic history service afterwards
aclsManager = createApplicationACLsManager(conf);
historyManager = createApplicationHistoryManager(conf);
ahsClientService = createApplicationHistoryClientService(historyManager);
addService(ahsClientService);
@ -168,6 +171,11 @@ public class ApplicationHistoryServer extends CompositeService {
return new ApplicationHistoryClientService(historyManager);
}
private ApplicationACLsManager createApplicationACLsManager(
Configuration conf) {
return new ApplicationACLsManager(conf);
}
private ApplicationHistoryManager createApplicationHistoryManager(
Configuration conf) {
// Backward compatibility:
@ -175,7 +183,8 @@ public class ApplicationHistoryServer extends CompositeService {
// user has enabled it explicitly.
if (conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE) == null ||
conf.get(YarnConfiguration.APPLICATION_HISTORY_STORE).length() == 0) {
return new ApplicationHistoryManagerOnTimelineStore(timelineDataManager);
return new ApplicationHistoryManagerOnTimelineStore(
timelineDataManager, aclsManager);
} else {
LOG.warn("The filesystem based application history store is deprecated.");
return new ApplicationHistoryManagerImpl();

View File

@ -18,10 +18,16 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -42,40 +48,75 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class TestApplicationHistoryManagerOnTimelineStore {
private static ApplicationHistoryManagerOnTimelineStore historyManager;
private static final int SCALE = 5;
private static TimelineStore store;
private ApplicationHistoryManagerOnTimelineStore historyManager;
private UserGroupInformation callerUGI;
private Configuration conf;
@BeforeClass
public static void setup() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
TimelineStore store = new MemoryTimelineStore();
public static void prepareStore() throws Exception {
store = new MemoryTimelineStore();
prepareTimelineStore(store);
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
}
@Before
public void setup() throws Exception {
// Only test the ACLs of the generic history
TimelineACLsManager aclsManager = new TimelineACLsManager(new YarnConfiguration());
TimelineDataManager dataManager =
new TimelineDataManager(store, aclsManager);
historyManager = new ApplicationHistoryManagerOnTimelineStore(dataManager);
ApplicationACLsManager appAclsManager = new ApplicationACLsManager(conf);
historyManager =
new ApplicationHistoryManagerOnTimelineStore(dataManager, appAclsManager);
historyManager.init(conf);
historyManager.start();
}
@AfterClass
public static void tearDown() {
@After
public void tearDown() {
if (historyManager != null) {
historyManager.stop();
}
}
@Parameters
public static Collection<Object[]> callers() {
// user1 is the owner
// user2 is the authorized user
// user3 is the unauthorized user
// admin is the admin acl
return Arrays.asList(
new Object[][] { { "" }, { "user1" }, { "user2" }, { "user3" }, { "admin" } });
}
public TestApplicationHistoryManagerOnTimelineStore(String caller) {
conf = new YarnConfiguration();
if (!caller.equals("")) {
callerUGI = UserGroupInformation.createRemoteUser(caller, AuthMethod.SIMPLE);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
}
}
private static void prepareTimelineStore(TimelineStore store)
throws Exception {
for (int i = 1; i <= SCALE; ++i) {
@ -101,23 +142,46 @@ public class TestApplicationHistoryManagerOnTimelineStore {
@Test
public void testGetApplicationReport() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationReport app = historyManager.getApplication(appId);
final ApplicationId appId = ApplicationId.newInstance(0, 1);
ApplicationReport app;
if (callerUGI == null) {
app = historyManager.getApplication(appId);
} else {
app =
callerUGI.doAs(new PrivilegedExceptionAction<ApplicationReport> () {
@Override
public ApplicationReport run() throws Exception {
return historyManager.getApplication(appId);
}
});
}
Assert.assertNotNull(app);
Assert.assertEquals(appId, app.getApplicationId());
Assert.assertEquals("test app", app.getName());
Assert.assertEquals("test app type", app.getApplicationType());
Assert.assertEquals("test user", app.getUser());
Assert.assertEquals("user1", app.getUser());
Assert.assertEquals("test queue", app.getQueue());
Assert.assertEquals(Integer.MAX_VALUE + 2L, app.getStartTime());
Assert.assertEquals(Integer.MAX_VALUE + 3L, app.getFinishTime());
Assert.assertTrue(Math.abs(app.getProgress() - 1.0F) < 0.0001);
Assert.assertEquals("test host", app.getHost());
Assert.assertEquals(-100, app.getRpcPort());
Assert.assertEquals("test tracking url", app.getTrackingUrl());
Assert.assertEquals("test original tracking url",
app.getOriginalTrackingUrl());
Assert.assertEquals("test diagnostics info", app.getDiagnostics());
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
Assert.assertEquals(ApplicationAttemptId.newInstance(appId, -1),
app.getCurrentApplicationAttemptId());
Assert.assertEquals(null, app.getHost());
Assert.assertEquals(-1, app.getRpcPort());
Assert.assertEquals(null, app.getTrackingUrl());
Assert.assertEquals(null, app.getOriginalTrackingUrl());
Assert.assertEquals(null, app.getDiagnostics());
} else {
Assert.assertEquals(ApplicationAttemptId.newInstance(appId, 1),
app.getCurrentApplicationAttemptId());
Assert.assertEquals("test host", app.getHost());
Assert.assertEquals(-100, app.getRpcPort());
Assert.assertEquals("test tracking url", app.getTrackingUrl());
Assert.assertEquals("test original tracking url",
app.getOriginalTrackingUrl());
Assert.assertEquals("test diagnostics info", app.getDiagnostics());
}
Assert.assertEquals(FinalApplicationStatus.UNDEFINED,
app.getFinalApplicationStatus());
Assert.assertEquals(YarnApplicationState.FINISHED,
@ -126,10 +190,35 @@ public class TestApplicationHistoryManagerOnTimelineStore {
@Test
public void testGetApplicationAttemptReport() throws Exception {
ApplicationAttemptId appAttemptId =
final ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
ApplicationAttemptReport appAttempt =
historyManager.getApplicationAttempt(appAttemptId);
ApplicationAttemptReport appAttempt;
if (callerUGI == null) {
appAttempt = historyManager.getApplicationAttempt(appAttemptId);
} else {
try {
appAttempt =
callerUGI.doAs(new PrivilegedExceptionAction<ApplicationAttemptReport> () {
@Override
public ApplicationAttemptReport run() throws Exception {
return historyManager.getApplicationAttempt(appAttemptId);
}
});
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
// The exception is expected
Assert.fail();
}
} catch (UndeclaredThrowableException e) {
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
if (e.getCause().getMessage().contains(
"does not have privilage to see this application")) {
// The exception is expected
return;
}
}
throw e;
}
}
Assert.assertNotNull(appAttempt);
Assert.assertEquals(appAttemptId, appAttempt.getApplicationAttemptId());
Assert.assertEquals(ContainerId.newInstance(appAttemptId, 1),
@ -146,10 +235,36 @@ public class TestApplicationHistoryManagerOnTimelineStore {
@Test
public void testGetContainerReport() throws Exception {
ContainerId containerId =
final ContainerId containerId =
ContainerId.newInstance(ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1), 1);
ContainerReport container = historyManager.getContainer(containerId);
ContainerReport container;
if (callerUGI == null) {
container = historyManager.getContainer(containerId);
} else {
try {
container =
callerUGI.doAs(new PrivilegedExceptionAction<ContainerReport> () {
@Override
public ContainerReport run() throws Exception {
return historyManager.getContainer(containerId);
}
});
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
// The exception is expected
Assert.fail();
}
} catch (UndeclaredThrowableException e) {
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
if (e.getCause().getMessage().contains(
"does not have privilage to see this application")) {
// The exception is expected
return;
}
}
throw e;
}
}
Assert.assertNotNull(container);
Assert.assertEquals(Integer.MAX_VALUE + 1L, container.getCreationTime());
Assert.assertEquals(Integer.MAX_VALUE + 2L, container.getFinishTime());
@ -164,7 +279,7 @@ public class TestApplicationHistoryManagerOnTimelineStore {
Assert.assertEquals(-1, container.getContainerExitStatus());
Assert.assertEquals("http://0.0.0.0:8188/applicationhistory/logs/" +
"test host:-100/container_0_0001_01_000001/"
+ "container_0_0001_01_000001/test user", container.getLogUrl());
+ "container_0_0001_01_000001/user1", container.getLogUrl());
}
@Test
@ -177,29 +292,104 @@ public class TestApplicationHistoryManagerOnTimelineStore {
@Test
public void testGetApplicationAttempts() throws Exception {
Collection<ApplicationAttemptReport> appAttempts =
historyManager.getApplicationAttempts(ApplicationId.newInstance(0, 1))
.values();
final ApplicationId appId = ApplicationId.newInstance(0, 1);
Collection<ApplicationAttemptReport> appAttempts;
if (callerUGI == null) {
appAttempts = historyManager.getApplicationAttempts(appId).values();
} else {
try {
appAttempts = callerUGI.doAs(
new PrivilegedExceptionAction<Collection<ApplicationAttemptReport>> () {
@Override
public Collection<ApplicationAttemptReport> run() throws Exception {
return historyManager.getApplicationAttempts(appId).values();
}
});
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
// The exception is expected
Assert.fail();
}
} catch (UndeclaredThrowableException e) {
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
if (e.getCause().getMessage().contains(
"does not have privilage to see this application")) {
// The exception is expected
return;
}
}
throw e;
}
}
Assert.assertNotNull(appAttempts);
Assert.assertEquals(SCALE, appAttempts.size());
}
@Test
public void testGetContainers() throws Exception {
Collection<ContainerReport> containers =
historyManager
.getContainers(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1)).values();
final ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
Collection<ContainerReport> containers;
if (callerUGI == null) {
containers = historyManager.getContainers(appAttemptId).values();
} else {
try {
containers = callerUGI.doAs(
new PrivilegedExceptionAction<Collection<ContainerReport>> () {
@Override
public Collection<ContainerReport> run() throws Exception {
return historyManager.getContainers(appAttemptId).values();
}
});
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
// The exception is expected
Assert.fail();
}
} catch (UndeclaredThrowableException e) {
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
if (e.getCause().getMessage().contains(
"does not have privilage to see this application")) {
// The exception is expected
return;
}
}
throw e;
}
}
Assert.assertNotNull(containers);
Assert.assertEquals(SCALE, containers.size());
}
@Test
public void testGetAMContainer() throws Exception {
ApplicationAttemptId appAttemptId =
final ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
ContainerReport container = historyManager.getAMContainer(appAttemptId);
ContainerReport container;
if (callerUGI == null) {
container = historyManager.getAMContainer(appAttemptId);
} else {
try {
container =
callerUGI.doAs(new PrivilegedExceptionAction<ContainerReport> () {
@Override
public ContainerReport run() throws Exception {
return historyManager.getAMContainer(appAttemptId);
}
});
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
// The exception is expected
Assert.fail();
}
} catch (UndeclaredThrowableException e) {
if (callerUGI != null && callerUGI.getShortUserName().equals("user3")) {
if (e.getCause().getMessage().contains(
"does not have privilage to see this application")) {
// The exception is expected
return;
}
}
throw e;
}
}
Assert.assertNotNull(container);
Assert.assertEquals(appAttemptId, container.getContainerId()
.getApplicationAttemptId());
@ -210,14 +400,18 @@ public class TestApplicationHistoryManagerOnTimelineStore {
TimelineEntity entity = new TimelineEntity();
entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
entity.setEntityId(appId.toString());
entity.addPrimaryFilter(
TimelineStore.SystemFilter.ENTITY_OWNER.toString(), "yarn");
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, "test app");
entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
"test app type");
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, "test user");
entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, "user1");
entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO, "test queue");
entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
Integer.MAX_VALUE + 1L);
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
"user2");
entity.setOtherInfo(entityInfo);
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
@ -248,6 +442,8 @@ public class TestApplicationHistoryManagerOnTimelineStore {
entity.setEntityId(appAttemptId.toString());
entity.addPrimaryFilter(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER,
appAttemptId.getApplicationId().toString());
entity.addPrimaryFilter(
TimelineStore.SystemFilter.ENTITY_OWNER.toString(), "yarn");
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
tEvent.setTimestamp(Integer.MAX_VALUE + 1L);
@ -287,6 +483,8 @@ public class TestApplicationHistoryManagerOnTimelineStore {
entity.setEntityId(containerId.toString());
entity.addPrimaryFilter(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER,
containerId.getApplicationAttemptId().toString());
entity.addPrimaryFilter(
TimelineStore.SystemFilter.ENTITY_OWNER.toString(), "yarn");
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, -1);
entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, -1);

View File

@ -34,6 +34,9 @@ public class ApplicationMetricsConstants {
public static final String FINISHED_EVENT_TYPE =
"YARN_APPLICATION_FINISHED";
public static final String ACLS_UPDATED_EVENT_TYPE =
"YARN_APPLICATION_ACLS_UPDATED";
public static final String NAME_ENTITY_INFO =
"YARN_APPLICATION_NAME";
@ -49,6 +52,9 @@ public class ApplicationMetricsConstants {
public static final String SUBMITTED_TIME_ENTITY_INFO =
"YARN_APPLICATION_SUBMITTED_TIME";
public static final String APP_VIEW_ACLS_ENTITY_INFO =
"YARN_APPLICATION_VIEW_ACLS";
public static final String DIAGNOSTICS_INFO_EVENT_INFO =
"YARN_APPLICATION_DIAGNOSTICS_INFO";

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -365,6 +366,10 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
submissionContext.getAMContainerSpec().getApplicationACLs());
String appViewACLs = submissionContext.getAMContainerSpec()
.getApplicationACLs().get(ApplicationAccessType.VIEW_APP);
rmContext.getSystemMetricsPublisher().appACLsUpdated(
application, appViewACLs, System.currentTimeMillis());
return application;
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class ApplicationACLsUpdatedEvent extends SystemMetricsEvent {
private ApplicationId appId;
private String viewAppACLs;
public ApplicationACLsUpdatedEvent(ApplicationId appId,
String viewAppACLs,
long updatedTime) {
super(SystemMetricsEventType.APP_ACLS_UPDATED, updatedTime);
this.appId = appId;
this.viewAppACLs = viewAppACLs;
}
public ApplicationId getApplicationId() {
return appId;
}
public String getViewAppACLs() {
return viewAppACLs;
}
}

View File

@ -23,6 +23,7 @@ public enum SystemMetricsEventType {
// app events
APP_CREATED,
APP_FINISHED,
APP_ACLS_UPDATED,
// app attempt events
APP_ATTEMPT_REGISTERED,

View File

@ -54,6 +54,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
* The class that helps RM publish metrics to the timeline server. RM will
* always invoke the methods of this class regardless the service is enabled or
* not. If it is disabled, publishing requests will be ignored silently.
*/
@Private
@Unstable
public class SystemMetricsPublisher extends CompositeService {
@ -125,6 +130,18 @@ public class SystemMetricsPublisher extends CompositeService {
}
}
@SuppressWarnings("unchecked")
public void appACLsUpdated(RMApp app, String appViewACLs,
long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationACLsUpdatedEvent(
app.getApplicationId(),
appViewACLs,
updatedTime));
}
}
@SuppressWarnings("unchecked")
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
@ -202,6 +219,9 @@ public class SystemMetricsPublisher extends CompositeService {
case APP_FINISHED:
publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
break;
case APP_ACLS_UPDATED:
publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
break;
case APP_ATTEMPT_REGISTERED:
publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
break;
@ -265,6 +285,22 @@ public class SystemMetricsPublisher extends CompositeService {
putEntity(entity);
}
private void publishApplicationACLsUpdatedEvent(
ApplicationACLsUpdatedEvent event) {
TimelineEntity entity =
createApplicationEntity(event.getApplicationId());
TimelineEvent tEvent = new TimelineEvent();
Map<String, Object> entityInfo = new HashMap<String, Object>();
entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
event.getViewAppACLs());
entity.setOtherInfo(entityInfo);
tEvent.setEventType(
ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
entity.addEvent(tEvent);
putEntity(entity);
}
private static TimelineEntity createApplicationEntity(
ApplicationId applicationId) {
TimelineEntity entity = new TimelineEntity();

View File

@ -24,6 +24,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -97,7 +99,7 @@ public class TestAppManager{
return list;
}
public static RMContext mockRMContext(int n, long time) {
public RMContext mockRMContext(int n, long time) {
final List<RMApp> apps = newRMApps(n, time, RMAppState.FINISHED);
final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
for (RMApp app : apps) {
@ -120,8 +122,8 @@ public class TestAppManager{
}
};
((RMContextImpl)context).setStateStore(mock(RMStateStore.class));
((RMContextImpl)context).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
metricsPublisher = mock(SystemMetricsPublisher.class);
((RMContextImpl)context).setSystemMetricsPublisher(metricsPublisher);
return context;
}
@ -200,6 +202,7 @@ public class TestAppManager{
}
private RMContext rmContext;
private SystemMetricsPublisher metricsPublisher;
private TestRMAppManager appMonitor;
private ApplicationSubmissionContext asContext;
private ApplicationId appId;
@ -460,6 +463,8 @@ public class TestAppManager{
Assert.assertNotNull("app is null", app);
Assert.assertEquals("app id doesn't match", appId, app.getApplicationId());
Assert.assertEquals("app state doesn't match", RMAppState.NEW, app.getState());
verify(metricsPublisher).appACLsUpdated(
any(RMApp.class), any(String.class), anyLong());
// wait for event to be processed
int timeoutSecs = 0;

View File

@ -99,14 +99,15 @@ public class TestSystemMetricsPublisher {
RMApp app = createRMApp(appId);
metricsPublisher.appCreated(app, app.getStartTime());
metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L);
TimelineEntity entity = null;
do {
entity =
store.getEntity(appId.toString(),
ApplicationMetricsConstants.ENTITY_TYPE,
EnumSet.allOf(Field.class));
// ensure two events are both published before leaving the loop
} while (entity == null || entity.getEvents().size() < 2);
// ensure three events are both published before leaving the loop
} while (entity == null || entity.getEvents().size() < 3);
// verify all the fields
Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE,
entity.getEntityType());
@ -133,8 +134,12 @@ public class TestSystemMetricsPublisher {
Assert.assertEquals(app.getSubmitTime(),
entity.getOtherInfo().get(
ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO));
Assert.assertEquals("uers1,user2",
entity.getOtherInfo().get(
ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO));
boolean hasCreatedEvent = false;
boolean hasFinishedEvent = false;
boolean hasACLsUpdatedEvent = false;
for (TimelineEvent event : entity.getEvents()) {
if (event.getEventType().equals(
ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
@ -154,9 +159,13 @@ public class TestSystemMetricsPublisher {
ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO));
Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event
.getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO));
} else if (event.getEventType().equals(
ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) {
hasACLsUpdatedEvent = true;
Assert.assertEquals(4L, event.getTimestamp());
}
}
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent);
Assert.assertTrue(hasCreatedEvent && hasFinishedEvent && hasACLsUpdatedEvent);
}
@Test(timeout = 10000)