Merge r1609845 through r1618763 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-6584@1618764 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2014-08-18 23:41:53 +00:00
commit 4e324bcd3d
54 changed files with 2442 additions and 289 deletions

View File

@ -521,6 +521,15 @@ Release 2.6.0 - UNRELEASED
HADOOP-10231. Add some components in Native Libraries document (Akira
AJISAKA via aw)
HADOOP-10650. Add ability to specify a reverse ACL (black list) of users
and groups. (Benoy Antony via Arpit Agarwal)
HADOOP-10335. An IP whitelist based implementation to resolve Sasl
properties per connection. (Benoy Antony via Arpit Agarwal)
HADOOP-10975. org.apache.hadoop.util.DataChecksum should support calculating
checksums in native code (James Thomas via Colin Patrick McCabe)
OPTIMIZATIONS
HADOOP-10838. Byte array native checksumming. (James Thomas via todd)
@ -599,6 +608,19 @@ Release 2.6.0 - UNRELEASED
HADOOP-10964. Small fix for NetworkTopologyWithNodeGroup#sortByDistance.
(Yi Liu via wang)
HADOOP-10059. RPC authentication and authorization metrics overflow to
negative values on busy clusters (Tsuyoshi OZAWA and Akira AJISAKA
via jlowe)
HADOOP-10973. Native Libraries Guide contains format error. (Peter Klavins
via Arpit Agarwal)
HADOOP-10972. Native Libraries Guide contains mis-spelt build line (Peter
Klavins via aw)
HADOOP-10873. Fix dead link in Configuration javadoc (Akira AJISAKA
via aw)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -110,8 +110,9 @@ import com.google.common.base.Preconditions;
*
* <p>Unless explicitly turned off, Hadoop by default specifies two
* resources, loaded in-order from the classpath: <ol>
* <li><tt><a href="{@docRoot}/../core-default.html">core-default.xml</a>
* </tt>: Read-only defaults for hadoop.</li>
* <li><tt>
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
* core-default.xml</a></tt>: Read-only defaults for hadoop.</li>
* <li><tt>core-site.xml</tt>: Site-specific configuration for a given hadoop
* installation.</li>
* </ol>
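
For context, a minimal sketch of how these two resources surface through the Configuration API; the fs.defaultFS lookup and its fallback value are illustrative:

import org.apache.hadoop.conf.Configuration;

public class ConfigDefaultsExample {
  public static void main(String[] args) {
    // core-default.xml and core-site.xml are picked up from the
    // classpath when the Configuration object is created.
    Configuration conf = new Configuration();

    // core-default.xml supplies the default; a core-site.xml on the
    // classpath may override it. The second argument is only used if
    // neither resource defines the key.
    String fsDefault = conf.get("fs.defaultFS", "file:///");
    System.out.println("fs.defaultFS = " + fsDefault);
  }
}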

View File

@ -134,6 +134,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
HADOOP_SECURITY_SERVICE_AUTHORIZATION_DEFAULT_ACL =
"security.service.authorization.default.acl";
public static final String
HADOOP_SECURITY_SERVICE_AUTHORIZATION_DEFAULT_BLOCKED_ACL =
"security.service.authorization.default.acl.blocked";
public static final String
HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_POLICY =
"security.refresh.policy.protocol.acl";
public static final String

View File

@ -88,13 +88,13 @@ public class RpcMetrics {
@Metric("Processsing time") MutableRate rpcProcessingTime;
MutableQuantiles[] rpcProcessingTimeMillisQuantiles;
@Metric("Number of authentication failures")
MutableCounterInt rpcAuthenticationFailures;
MutableCounterLong rpcAuthenticationFailures;
@Metric("Number of authentication successes")
MutableCounterInt rpcAuthenticationSuccesses;
MutableCounterLong rpcAuthenticationSuccesses;
@Metric("Number of authorization failures")
MutableCounterInt rpcAuthorizationFailures;
MutableCounterLong rpcAuthorizationFailures;
@Metric("Number of authorization sucesses")
MutableCounterInt rpcAuthorizationSuccesses;
MutableCounterLong rpcAuthorizationSuccesses;
@Metric("Number of open connections") public int numOpenConnections() {
return server.getNumOpenConnections();

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.TreeMap;
import javax.security.sasl.Sasl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SaslPropertiesResolver;
import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
import org.apache.hadoop.util.CombinedIPWhiteList;
import org.apache.hadoop.util.StringUtils;
/**
* An implementation of the SaslPropertiesResolver.
* Uses a whitelist of IPs.
* If the connection's IP address is in the whitelist, the saslProperties
* will be unchanged.
* If the connection's IP is not in the whitelist, then the QOP for the
* connection will be restricted to "hadoop.rpc.protection.non-whitelist".
*
* Uses 3 IPList implementations together to form an aggregate whitelist.
* 1. ConstantIPList - to check against a set of hardcoded IPs.
* 2. Fixed IP List - to check against a list of IP addresses which are specified externally, but
* will not change at runtime.
* 3. Variable IP List - to check against a list of IP addresses which are specified externally and
* could change during runtime.
* A connection's IP address is checked against these 3 IP lists in the order specified above.
* Once a match is found, the IP address is determined to be in the whitelist.
*
* The behavior can be configured using several configuration parameters.
*/
public class WhitelistBasedResolver extends SaslPropertiesResolver {
public static final Log LOG = LogFactory.getLog(WhitelistBasedResolver.class);
private static final String FIXEDWHITELIST_DEFAULT_LOCATION = "/etc/hadoop/fixedwhitelist";
private static final String VARIABLEWHITELIST_DEFAULT_LOCATION = "/etc/hadoop/whitelist";
/**
* Path to the file containing the subnets and IP addresses that form the fixed whitelist.
*/
public static final String HADOOP_SECURITY_SASL_FIXEDWHITELIST_FILE =
"hadoop.security.sasl.fixedwhitelist.file";
/**
* Enables/Disables variable whitelist
*/
public static final String HADOOP_SECURITY_SASL_VARIABLEWHITELIST_ENABLE =
"hadoop.security.sasl.variablewhitelist.enable";
/**
* Path to the file containing the subnets and IP addresses that form the variable whitelist.
*/
public static final String HADOOP_SECURITY_SASL_VARIABLEWHITELIST_FILE =
"hadoop.security.sasl.variablewhitelist.file";
/**
* Time in seconds after which the variable whitelist file is checked for updates.
*/
public static final String HADOOP_SECURITY_SASL_VARIABLEWHITELIST_CACHE_SECS =
"hadoop.security.sasl.variablewhitelist.cache.secs";
/**
* Comma separated list containing alternate hadoop.rpc.protection values for
* clients which are not in the whitelist.
*/
public static final String HADOOP_RPC_PROTECTION_NON_WHITELIST =
"hadoop.rpc.protection.non-whitelist";
private CombinedIPWhiteList whiteList;
private Map<String, String> saslProps;
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
String fixedFile = conf.get(HADOOP_SECURITY_SASL_FIXEDWHITELIST_FILE,
FIXEDWHITELIST_DEFAULT_LOCATION);
String variableFile = null;
long expiryTime = 0;
if (conf.getBoolean(HADOOP_SECURITY_SASL_VARIABLEWHITELIST_ENABLE, false)) {
variableFile = conf.get(HADOOP_SECURITY_SASL_VARIABLEWHITELIST_FILE,
VARIABLEWHITELIST_DEFAULT_LOCATION);
expiryTime =
conf.getLong(HADOOP_SECURITY_SASL_VARIABLEWHITELIST_CACHE_SECS,3600) * 1000;
}
whiteList = new CombinedIPWhiteList(fixedFile,variableFile,expiryTime);
this.saslProps = getSaslProperties(conf);
}
/**
* Identify the Sasl Properties to be used for a connection with a client.
* @param clientAddress client's address
* @return the sasl properties to be used for the connection.
*/
@Override
public Map<String, String> getServerProperties(InetAddress clientAddress) {
if (clientAddress == null) {
return saslProps;
}
return whiteList.isIn(clientAddress.getHostAddress())?getDefaultProperties():saslProps;
}
public Map<String, String> getServerProperties(String clientAddress) throws UnknownHostException {
if (clientAddress == null) {
return saslProps;
}
return getServerProperties(InetAddress.getByName(clientAddress));
}
static Map<String, String> getSaslProperties(Configuration conf) {
Map<String, String> saslProps =new TreeMap<String, String>();
String[] qop = conf.getStrings(HADOOP_RPC_PROTECTION_NON_WHITELIST,
QualityOfProtection.PRIVACY.toString());
for (int i=0; i < qop.length; i++) {
qop[i] = QualityOfProtection.valueOf(qop[i].toUpperCase()).getSaslQop();
}
saslProps.put(Sasl.QOP, StringUtils.join(",", qop));
saslProps.put(Sasl.SERVER_AUTH, "true");
return saslProps;
}
}
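
A brief usage sketch of the resolver above; the whitelist file path and the client address are illustrative only:

import java.net.InetAddress;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.WhitelistBasedResolver;

public class WhitelistResolverExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Point the resolver at a fixed whitelist file (illustrative path).
    conf.set(WhitelistBasedResolver.HADOOP_SECURITY_SASL_FIXEDWHITELIST_FILE,
        "/tmp/fixedwhitelist.txt");
    // Clients outside the whitelist are restricted to this QOP.
    conf.set(WhitelistBasedResolver.HADOOP_RPC_PROTECTION_NON_WHITELIST,
        "authentication");

    WhitelistBasedResolver resolver = new WhitelistBasedResolver();
    resolver.setConf(conf);

    // Whitelisted clients keep the default SASL properties; all other
    // clients get the restricted QOP configured above.
    Map<String, String> props =
        resolver.getServerProperties(InetAddress.getByName("10.1.2.3"));
    System.out.println(props);
  }
}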

View File

@ -43,10 +43,14 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public class ServiceAuthorizationManager {
static final String BLOCKED = ".blocked";
private static final String HADOOP_POLICY_FILE = "hadoop-policy.xml";
private volatile Map<Class<?>, AccessControlList> protocolToAcl =
new IdentityHashMap<Class<?>, AccessControlList>();
// For each class, first ACL in the array specifies the allowed entries
// and second ACL specifies blocked entries.
private volatile Map<Class<?>, AccessControlList[]> protocolToAcls =
new IdentityHashMap<Class<?>, AccessControlList[]>();
/**
* Configuration key for controlling service-level authorization for Hadoop.
@ -80,8 +84,8 @@ public class ServiceAuthorizationManager {
Configuration conf,
InetAddress addr
) throws AuthorizationException {
AccessControlList acl = protocolToAcl.get(protocol);
if (acl == null) {
AccessControlList[] acls = protocolToAcls.get(protocol);
if (acls == null) {
throw new AuthorizationException("Protocol " + protocol +
" is not known.");
}
@ -104,7 +108,7 @@ public class ServiceAuthorizationManager {
}
}
if((clientPrincipal != null && !clientPrincipal.equals(user.getUserName())) ||
!acl.isUserAllowed(user)) {
acls.length != 2 || !acls[0].isUserAllowed(user) || acls[1].isUserAllowed(user)) {
AUDITLOG.warn(AUTHZ_FAILED_FOR + user + " for protocol=" + protocol
+ ", expected client Kerberos principal is " + clientPrincipal);
throw new AuthorizationException("User " + user +
@ -129,13 +133,16 @@ public class ServiceAuthorizationManager {
@Private
public void refreshWithLoadedConfiguration(Configuration conf,
PolicyProvider provider) {
final Map<Class<?>, AccessControlList> newAcls =
new IdentityHashMap<Class<?>, AccessControlList>();
final Map<Class<?>, AccessControlList[]> newAcls =
new IdentityHashMap<Class<?>, AccessControlList[]>();
String defaultAcl = conf.get(
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_DEFAULT_ACL,
AccessControlList.WILDCARD_ACL_VALUE);
String defaultBlockedAcl = conf.get(
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_DEFAULT_BLOCKED_ACL, "");
// Parse the config file
Service[] services = provider.getServices();
if (services != null) {
@ -145,21 +152,30 @@ public class ServiceAuthorizationManager {
conf.get(service.getServiceKey(),
defaultAcl)
);
newAcls.put(service.getProtocol(), acl);
AccessControlList blockedAcl =
new AccessControlList(
conf.get(service.getServiceKey() + BLOCKED,
defaultBlockedAcl));
newAcls.put(service.getProtocol(), new AccessControlList[] {acl, blockedAcl});
}
}
// Flip to the newly parsed permissions
protocolToAcl = newAcls;
protocolToAcls = newAcls;
}
@VisibleForTesting
public Set<Class<?>> getProtocolsWithAcls() {
return protocolToAcl.keySet();
return protocolToAcls.keySet();
}
@VisibleForTesting
public AccessControlList getProtocolsAcls(Class<?> className) {
return protocolToAcl.get(className);
return protocolToAcls.get(className)[0];
}
@VisibleForTesting
public AccessControlList getProtocolsBlockedAcls(Class<?> className) {
return protocolToAcls.get(className)[1];
}
}
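
To restate the rule introduced above: a caller is authorized only if it appears in the first (allowed) ACL and does not appear in the second (blocked) ACL. A small sketch of that decision with hypothetical user and group names:

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;

public class BlockedAclExample {
  public static void main(String[] args) {
    // acls[0] = allowed entries, acls[1] = blocked entries, mirroring
    // the AccessControlList[] stored per protocol above.
    AccessControlList[] acls = new AccessControlList[] {
        new AccessControlList("user1 group1"),   // allowed
        new AccessControlList("user2 group3") }; // blocked

    UserGroupInformation user =
        UserGroupInformation.createRemoteUser("user1");

    boolean authorized =
        acls[0].isUserAllowed(user) && !acls[1].isUserAllowed(user);
    System.out.println("authorized = " + authorized); // true for user1
  }
}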

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
/**
* CacheableIPList loads a list of subnets from a file.
* The list is cached and the cache can be refreshed by specifying a cache timeout.
* A negative value of the cache timeout disables any caching.
*
* Thread safe.
*/
public class CacheableIPList implements IPList {
private final long cacheTimeout;
private volatile long cacheExpiryTimeStamp;
private volatile FileBasedIPList ipList;
public CacheableIPList(FileBasedIPList ipList, long cacheTimeout) {
this.cacheTimeout = cacheTimeout;
this.ipList = ipList;
updateCacheExpiryTime();
}
/**
* Reloads the ip list
*/
private void reset() {
ipList = ipList.reload();
updateCacheExpiryTime();
}
private void updateCacheExpiryTime() {
if (cacheTimeout < 0) {
cacheExpiryTimeStamp = -1; // no automatic cache expiry.
}else {
cacheExpiryTimeStamp = System.currentTimeMillis() + cacheTimeout;
}
}
/**
* Refreshes the ip list
*/
public void refresh () {
cacheExpiryTimeStamp = 0;
}
@Override
public boolean isIn(String ipAddress) {
//is cache expired
//Uses Double Checked Locking using volatile
if (cacheExpiryTimeStamp >= 0 && cacheExpiryTimeStamp < System.currentTimeMillis()) {
synchronized(this) {
//check if cache expired again
if (cacheExpiryTimeStamp < System.currentTimeMillis()) {
reset();
}
}
}
return ipList.isIn(ipAddress);
}
}
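
A brief sketch of the caching behaviour; the file path and timeout are illustrative:

import org.apache.hadoop.util.CacheableIPList;
import org.apache.hadoop.util.FileBasedIPList;

public class CacheableIPListExample {
  public static void main(String[] args) {
    // Reload the underlying file at most once every 60 seconds.
    CacheableIPList list = new CacheableIPList(
        new FileBasedIPList("/tmp/whitelist.txt"), 60 * 1000);

    System.out.println(list.isIn("10.1.2.3"));

    // Force the next isIn() call to re-read the file immediately.
    list.refresh();
    System.out.println(list.isIn("10.1.2.3"));
  }
}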

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class CombinedIPWhiteList implements IPList {
public static final Log LOG = LogFactory.getLog(CombinedIPWhiteList.class);
private static final String LOCALHOST_IP = "127.0.0.1";
private final IPList[] networkLists;
public CombinedIPWhiteList(String fixedWhiteListFile,
String variableWhiteListFile, long cacheExpiryInSeconds) {
IPList fixedNetworkList = new FileBasedIPList(fixedWhiteListFile);
if (variableWhiteListFile != null){
IPList variableNetworkList = new CacheableIPList(
new FileBasedIPList(variableWhiteListFile),cacheExpiryInSeconds);
networkLists = new IPList[] {fixedNetworkList, variableNetworkList};
}
else {
networkLists = new IPList[] {fixedNetworkList};
}
}
@Override
public boolean isIn(String ipAddress) {
if (ipAddress == null) {
throw new IllegalArgumentException("ipAddress is null");
}
if (LOCALHOST_IP.equals(ipAddress)) {
return true;
}
for (IPList networkList:networkLists) {
if (networkList.isIn(ipAddress)) {
return true;
}
}
return false;
}
}

View File

@ -391,6 +391,12 @@ public class DataChecksum implements Checksum {
return;
}
if (NativeCrc32.isAvailable()) {
NativeCrc32.calculateChunkedSums(bytesPerChecksum, type.id,
checksums, data);
return;
}
data.mark();
checksums.mark();
try {
@ -412,10 +418,16 @@ public class DataChecksum implements Checksum {
* Implementation of chunked calculation specifically on byte arrays. This
* is to avoid the copy when dealing with ByteBuffers that have array backing.
*/
private void calculateChunkedSums(
public void calculateChunkedSums(
byte[] data, int dataOffset, int dataLength,
byte[] sums, int sumsOffset) {
if (NativeCrc32.isAvailable()) {
NativeCrc32.calculateChunkedSumsByteArray(bytesPerChecksum, type.id,
sums, sumsOffset, data, dataOffset, dataLength);
return;
}
int remaining = dataLength;
while (remaining > 0) {
int n = Math.min(remaining, bytesPerChecksum);

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* FileBasedIPList loads a list of subnets in CIDR format and IP addresses from a file.
*
* Given an IP address, the isIn method returns true if the IP belongs to one of the subnets
* or matches one of the listed addresses.
*
* Thread safe.
*/
public class FileBasedIPList implements IPList {
private static final Log LOG = LogFactory.getLog(FileBasedIPList.class);
private final String fileName;
private final MachineList addressList;
public FileBasedIPList(String fileName) {
this.fileName = fileName;
String[] lines = readLines(fileName);
if (lines != null) {
addressList = new MachineList(new HashSet<String>(Arrays.asList(lines)));
} else {
addressList = null;
}
}
public FileBasedIPList reload() {
return new FileBasedIPList(fileName);
}
@Override
public boolean isIn(String ipAddress) {
if (ipAddress == null || addressList == null) {
return false;
}
return addressList.includes(ipAddress);
}
/**
* Reads the lines in a file.
* @param fileName path of the file to read
* @return lines in a String array; null if the file does not exist or if the
* file name is null
*/
private static String[] readLines(String fileName) {
try {
if (fileName != null) {
File file = new File (fileName);
if (file.exists()) {
FileReader fileReader = new FileReader(file);
BufferedReader bufferedReader = new BufferedReader(fileReader);
List<String> lines = new ArrayList<String>();
String line = null;
while ((line = bufferedReader.readLine()) != null) {
lines.add(line);
}
bufferedReader.close();
LOG.debug("Loaded IP list of size = " + lines.size() +" from file = " + fileName);
return(lines.toArray(new String[lines.size()]));
}
else {
LOG.debug("Missing ip list file : "+ fileName);
}
}
}
catch (Throwable t) {
LOG.error(t);
}
return null;
}
}
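
A brief sketch of loading such a file; the path and entries are illustrative:

import org.apache.hadoop.util.FileBasedIPList;
import org.apache.hadoop.util.IPList;

public class FileBasedIPListExample {
  public static void main(String[] args) {
    // The file is expected to contain one entry per line, e.g.
    //   10.119.103.112
    //   10.221.102.0/23
    IPList list = new FileBasedIPList("/tmp/whitelist.txt");

    // Membership covers both exact addresses and CIDR subnets;
    // a missing file simply yields an always-false list.
    System.out.println(list.isIn("10.221.102.5"));
  }
}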

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceStability.Unstable
@InterfaceAudience.Public
public interface IPList {
/**
* returns true if the ipAddress is in the IPList.
* @param ipAddress
* @return boolean value indicating whether the ipAddress is in the IPList
*/
public abstract boolean isIn(String ipAddress);
}

View File

@ -37,7 +37,7 @@ import com.google.common.net.InetAddresses;
/**
* Container class which holds a list of ip/host addresses and
* answers membership queries.
* .
*
* Accepts a list of ip addresses, ip addresses in CIDR format and/or host addresses.
* host addresses.
*/
@ -71,8 +71,15 @@ public class MachineList {
* @param hostEntries comma separated ip/cidr/host addresses
*/
public MachineList(String hostEntries) {
this(StringUtils.getTrimmedStringCollection(hostEntries),
InetAddressFactory.S_INSTANCE);
this(StringUtils.getTrimmedStringCollection(hostEntries));
}
/**
*
* @param hostEntries collection of ip/cidr/host addresses
*/
public MachineList(Collection<String> hostEntries) {
this(hostEntries, InetAddressFactory.S_INSTANCE);
}
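
For reference, a tiny sketch of the new collection-based constructor in use; the entries are made up:

import java.util.Arrays;

import org.apache.hadoop.util.MachineList;

public class MachineListExample {
  public static void main(String[] args) {
    // Entries may be plain IPs, CIDR subnets, or host names.
    MachineList machines = new MachineList(
        Arrays.asList("10.119.103.112", "10.221.102.0/23"));

    System.out.println(machines.includes("10.221.102.5"));   // true
    System.out.println(machines.includes("10.119.103.113")); // false
  }
}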
/**

View File

@ -54,33 +54,50 @@ class NativeCrc32 {
public static void verifyChunkedSums(int bytesPerSum, int checksumType,
ByteBuffer sums, ByteBuffer data, String fileName, long basePos)
throws ChecksumException {
nativeVerifyChunkedSums(bytesPerSum, checksumType,
nativeComputeChunkedSums(bytesPerSum, checksumType,
sums, sums.position(),
data, data.position(), data.remaining(),
fileName, basePos);
fileName, basePos, true);
}
public static void verifyChunkedSumsByteArray(int bytesPerSum,
int checksumType, byte[] sums, int sumsOffset, byte[] data,
int dataOffset, int dataLength, String fileName, long basePos)
throws ChecksumException {
nativeVerifyChunkedSumsByteArray(bytesPerSum, checksumType,
nativeComputeChunkedSumsByteArray(bytesPerSum, checksumType,
sums, sumsOffset,
data, dataOffset, dataLength,
fileName, basePos);
fileName, basePos, true);
}
private static native void nativeVerifyChunkedSums(
public static void calculateChunkedSums(int bytesPerSum, int checksumType,
ByteBuffer sums, ByteBuffer data) {
nativeComputeChunkedSums(bytesPerSum, checksumType,
sums, sums.position(),
data, data.position(), data.remaining(),
"", 0, false);
}
public static void calculateChunkedSumsByteArray(int bytesPerSum,
int checksumType, byte[] sums, int sumsOffset, byte[] data,
int dataOffset, int dataLength) {
nativeComputeChunkedSumsByteArray(bytesPerSum, checksumType,
sums, sumsOffset,
data, dataOffset, dataLength,
"", 0, false);
}
private static native void nativeComputeChunkedSums(
int bytesPerSum, int checksumType,
ByteBuffer sums, int sumsOffset,
ByteBuffer data, int dataOffset, int dataLength,
String fileName, long basePos);
String fileName, long basePos, boolean verify);
private static native void nativeVerifyChunkedSumsByteArray(
private static native void nativeComputeChunkedSumsByteArray(
int bytesPerSum, int checksumType,
byte[] sums, int sumsOffset,
byte[] data, int dataOffset, int dataLength,
String fileName, long basePos);
String fileName, long basePos, boolean verify);
// Copy the constants over from DataChecksum so that javah will pick them up
// and make them available in the native code header.

View File

@ -117,12 +117,12 @@ static int convert_java_crc_type(JNIEnv *env, jint crc_type) {
}
}
JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeVerifyChunkedSums
JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeComputeChunkedSums
(JNIEnv *env, jclass clazz,
jint bytes_per_checksum, jint j_crc_type,
jobject j_sums, jint sums_offset,
jobject j_data, jint data_offset, jint data_len,
jstring j_filename, jlong base_pos)
jstring j_filename, jlong base_pos, jboolean verify)
{
uint8_t *sums_addr;
uint8_t *data_addr;
@ -166,27 +166,27 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeVerifyChunk
if (crc_type == -1) return; // exception already thrown
// Setup complete. Actually verify checksums.
ret = bulk_verify_crc(data, data_len, sums, crc_type,
bytes_per_checksum, &error_data);
if (likely(ret == CHECKSUMS_VALID)) {
ret = bulk_crc(data, data_len, sums, crc_type,
bytes_per_checksum, verify ? &error_data : NULL);
if (likely(verify && ret == CHECKSUMS_VALID || !verify && ret == 0)) {
return;
} else if (unlikely(ret == INVALID_CHECKSUM_DETECTED)) {
} else if (unlikely(verify && ret == INVALID_CHECKSUM_DETECTED)) {
long pos = base_pos + (error_data.bad_data - data);
throw_checksum_exception(
env, error_data.got_crc, error_data.expected_crc,
j_filename, pos);
} else {
THROW(env, "java/lang/AssertionError",
"Bad response code from native bulk_verify_crc");
"Bad response code from native bulk_crc");
}
}
JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeVerifyChunkedSumsByteArray
JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeComputeChunkedSumsByteArray
(JNIEnv *env, jclass clazz,
jint bytes_per_checksum, jint j_crc_type,
jarray j_sums, jint sums_offset,
jarray j_data, jint data_offset, jint data_len,
jstring j_filename, jlong base_pos)
jstring j_filename, jlong base_pos, jboolean verify)
{
uint8_t *sums_addr;
uint8_t *data_addr;
@ -237,21 +237,21 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_util_NativeCrc32_nativeVerifyChunk
data = data_addr + data_offset + checksumNum * bytes_per_checksum;
// Setup complete. Actually verify checksums.
ret = bulk_verify_crc(data, MIN(numChecksumsPerIter * bytes_per_checksum,
ret = bulk_crc(data, MIN(numChecksumsPerIter * bytes_per_checksum,
data_len - checksumNum * bytes_per_checksum),
sums, crc_type, bytes_per_checksum, &error_data);
sums, crc_type, bytes_per_checksum, verify ? &error_data : NULL);
(*env)->ReleasePrimitiveArrayCritical(env, j_data, data_addr, 0);
(*env)->ReleasePrimitiveArrayCritical(env, j_sums, sums_addr, 0);
if (unlikely(ret == INVALID_CHECKSUM_DETECTED)) {
if (unlikely(verify && ret == INVALID_CHECKSUM_DETECTED)) {
long pos = base_pos + (error_data.bad_data - data) + checksumNum *
bytes_per_checksum;
throw_checksum_exception(
env, error_data.got_crc, error_data.expected_crc,
j_filename, pos);
return;
} else if (unlikely(ret != CHECKSUMS_VALID)) {
} else if (unlikely(verify && ret != CHECKSUMS_VALID || !verify && ret != 0)) {
THROW(env, "java/lang/AssertionError",
"Bad response code from native bulk_verify_crc");
"Bad response code from native bulk_crc");
return;
}
checksumNum += numChecksumsPerIter;

View File

@ -55,40 +55,23 @@ static void pipelined_crc32c(uint32_t *crc1, uint32_t *crc2, uint32_t *crc3, con
static int cached_cpu_supports_crc32; // initialized by constructor below
static uint32_t crc32c_hardware(uint32_t crc, const uint8_t* data, size_t length);
int bulk_calculate_crc(const uint8_t *data, size_t data_len,
uint32_t *sums, int checksum_type,
int bytes_per_checksum) {
uint32_t crc;
crc_update_func_t crc_update_func;
switch (checksum_type) {
case CRC32_ZLIB_POLYNOMIAL:
crc_update_func = crc32_zlib_sb8;
break;
case CRC32C_POLYNOMIAL:
crc_update_func = crc32c_sb8;
break;
default:
return -EINVAL;
break;
static inline int store_or_verify(uint32_t *sums, uint32_t crc,
int is_verify) {
if (!is_verify) {
*sums = crc;
return 1;
} else {
return crc == *sums;
}
while (likely(data_len > 0)) {
int len = likely(data_len >= bytes_per_checksum) ? bytes_per_checksum : data_len;
crc = CRC_INITIAL_VAL;
crc = crc_update_func(crc, data, len);
*sums = ntohl(crc_val(crc));
data += len;
data_len -= len;
sums++;
}
return 0;
}
int bulk_verify_crc(const uint8_t *data, size_t data_len,
const uint32_t *sums, int checksum_type,
int bulk_crc(const uint8_t *data, size_t data_len,
uint32_t *sums, int checksum_type,
int bytes_per_checksum,
crc32_error_t *error_info) {
int is_verify = error_info != NULL;
#ifdef USE_PIPELINED
uint32_t crc1, crc2, crc3;
int n_blocks = data_len / bytes_per_checksum;
@ -112,7 +95,7 @@ int bulk_verify_crc(const uint8_t *data, size_t data_len,
}
break;
default:
return INVALID_CHECKSUM_TYPE;
return is_verify ? INVALID_CHECKSUM_TYPE : -EINVAL;
}
#ifdef USE_PIPELINED
@ -122,16 +105,15 @@ int bulk_verify_crc(const uint8_t *data, size_t data_len,
crc1 = crc2 = crc3 = CRC_INITIAL_VAL;
pipelined_crc32c(&crc1, &crc2, &crc3, data, bytes_per_checksum, 3);
crc = ntohl(crc_val(crc1));
if ((crc = ntohl(crc_val(crc1))) != *sums)
if (unlikely(!store_or_verify(sums, (crc = ntohl(crc_val(crc1))), is_verify)))
goto return_crc_error;
sums++;
data += bytes_per_checksum;
if ((crc = ntohl(crc_val(crc2))) != *sums)
if (unlikely(!store_or_verify(sums, (crc = ntohl(crc_val(crc2))), is_verify)))
goto return_crc_error;
sums++;
data += bytes_per_checksum;
if ((crc = ntohl(crc_val(crc3))) != *sums)
if (unlikely(!store_or_verify(sums, (crc = ntohl(crc_val(crc3))), is_verify)))
goto return_crc_error;
sums++;
data += bytes_per_checksum;
@ -143,12 +125,12 @@ int bulk_verify_crc(const uint8_t *data, size_t data_len,
crc1 = crc2 = crc3 = CRC_INITIAL_VAL;
pipelined_crc32c(&crc1, &crc2, &crc3, data, bytes_per_checksum, n_blocks);
if ((crc = ntohl(crc_val(crc1))) != *sums)
if (unlikely(!store_or_verify(sums, (crc = ntohl(crc_val(crc1))), is_verify)))
goto return_crc_error;
data += bytes_per_checksum;
sums++;
if (n_blocks == 2) {
if ((crc = ntohl(crc_val(crc2))) != *sums)
if (unlikely(!store_or_verify(sums, (crc = ntohl(crc_val(crc2))), is_verify)))
goto return_crc_error;
sums++;
data += bytes_per_checksum;
@ -160,10 +142,10 @@ int bulk_verify_crc(const uint8_t *data, size_t data_len,
crc1 = crc2 = crc3 = CRC_INITIAL_VAL;
pipelined_crc32c(&crc1, &crc2, &crc3, data, remainder, 1);
if ((crc = ntohl(crc_val(crc1))) != *sums)
if (unlikely(!store_or_verify(sums, (crc = ntohl(crc_val(crc1))), is_verify)))
goto return_crc_error;
}
return CHECKSUMS_VALID;
return is_verify ? CHECKSUMS_VALID : 0;
}
#endif
@ -172,14 +154,14 @@ int bulk_verify_crc(const uint8_t *data, size_t data_len,
crc = CRC_INITIAL_VAL;
crc = crc_update_func(crc, data, len);
crc = ntohl(crc_val(crc));
if (unlikely(crc != *sums)) {
if (unlikely(!store_or_verify(sums, crc, is_verify))) {
goto return_crc_error;
}
data += len;
data_len -= len;
sums++;
}
return CHECKSUMS_VALID;
return is_verify ? CHECKSUMS_VALID : 0;
return_crc_error:
if (error_info != NULL) {

View File

@ -42,49 +42,32 @@ typedef struct crc32_error {
/**
* Verify a buffer of data which is checksummed in chunks
* of bytes_per_checksum bytes. The checksums are each 32 bits
* and are stored in sequential indexes of the 'sums' array.
* Either calculates checksums for or verifies a buffer of data.
* Checksums performed in chunks of bytes_per_checksum bytes. The checksums
* are each 32 bits and are stored in sequential indexes of the 'sums' array.
* Verification is done (sums is assumed to already contain the checksums)
* if error_info is non-null; otherwise calculation is done and checksums
* are stored into sums.
*
* @param data The data to checksum
* @param dataLen Length of the data buffer
* @param sums (out param) buffer to write checksums into.
* It must contain at least dataLen * 4 bytes.
* @param sums (out param) buffer to write checksums into or
* where checksums are already stored.
* It must contain at least
* ((dataLen - 1) / bytes_per_checksum + 1) * 4 bytes.
* @param checksum_type One of the CRC32 algorithm constants defined
* above
* @param bytes_per_checksum How many bytes of data to process per checksum.
* @param error_info If non-NULL, will be filled in if an error
* is detected
* @param error_info If non-NULL, verification will be performed and
* it will be filled in if an error
* is detected. Otherwise calculation is performed.
*
* @return 0 for success, non-zero for an error, result codes
* for which are defined above
* for verification are defined above
*/
extern int bulk_verify_crc(const uint8_t *data, size_t data_len,
const uint32_t *sums, int checksum_type,
extern int bulk_crc(const uint8_t *data, size_t data_len,
uint32_t *sums, int checksum_type,
int bytes_per_checksum,
crc32_error_t *error_info);
/**
* Calculate checksums for some data.
*
* The checksums are each 32 bits and are stored in sequential indexes of the
* 'sums' array.
*
* This function is not (yet) optimized. It is provided for testing purposes
* only.
*
* @param data The data to checksum
* @param dataLen Length of the data buffer
* @param sums (out param) buffer to write checksums into.
* It must contain at least dataLen * 4 bytes.
* @param checksum_type One of the CRC32 algorithm constants defined
* above
* @param bytesPerChecksum How many bytes of data to process per checksum.
*
* @return 0 for success, non-zero for an error
*/
int bulk_calculate_crc(const uint8_t *data, size_t data_len,
uint32_t *sums, int checksum_type,
int bytes_per_checksum);
#endif

View File

@ -48,9 +48,9 @@ static int testBulkVerifyCrc(int dataLen, int crcType, int bytesPerChecksum)
sums = calloc(sizeof(uint32_t),
(dataLen + bytesPerChecksum - 1) / bytesPerChecksum);
EXPECT_ZERO(bulk_calculate_crc(data, dataLen, sums, crcType,
bytesPerChecksum));
EXPECT_ZERO(bulk_verify_crc(data, dataLen, sums, crcType,
EXPECT_ZERO(bulk_crc(data, dataLen, sums, crcType,
bytesPerChecksum, NULL));
EXPECT_ZERO(bulk_crc(data, dataLen, sums, crcType,
bytesPerChecksum, &errorData));
free(data);
free(sums);

View File

@ -56,10 +56,12 @@ Native Libraries Guide
[[4]] Install the compression codec development packages (>zlib-1.2,
>gzip-1.2):
+ If you download the library, install one or more development
* If you download the library, install one or more development
packages - whichever compression codecs you want to use with
your deployment.
+ If you build the library, it is mandatory to install both
* If you build the library, it is mandatory to install both
development packages.
[[5]] Check the runtime log files.
@ -127,7 +129,7 @@ Native Libraries Guide
library:
----
$ mvn package -Pdist,native -Dskiptests -Dtar
$ mvn package -Pdist,native -DskipTests -Dtar
----
You should see the newly-built library in:

View File

@ -110,6 +110,27 @@ security.ha.service.protocol.acl | ACL for HAService protocol used by HAAdm
<<<security.service.authorization.default.acl>>> is applied. If
<<<security.service.authorization.default.acl>>> is not defined, <<<*>>> is applied.
** Blocked Access Control Lists
In some cases, it is necessary to specify a blocked access control list for a service. This specifies
the list of users and groups who are not authorized to access the service. The format of
the blocked access control list is the same as that of the access control list. The blocked access
control list can be specified via <<<${HADOOP_CONF_DIR}/hadoop-policy.xml>>>. The property name
is derived by suffixing the access control list property name with ".blocked".
Example: The property name of the blocked access control list for <<<security.client.protocol.acl>>>
will be <<<security.client.protocol.acl.blocked>>>.
For a service, it is possible to specify both an access control list and a blocked access control
list. A user is authorized to access the service if the user is in the access control list and not in
the blocked access control list.
If the blocked access control list is not defined for a service, the value of
<<<security.service.authorization.default.acl.blocked>>> is applied. If
<<<security.service.authorization.default.acl.blocked>>> is not defined,
an empty blocked access control list is applied.
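A minimal sketch of these properties set programmatically; the protocol key and the user and group names are illustrative, and in practice the values live in <<<${HADOOP_CONF_DIR}/hadoop-policy.xml>>>:

----
import org.apache.hadoop.conf.Configuration;

public class BlockedAclConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Allow user1 and members of group1 for the client protocol ...
    conf.set("security.client.protocol.acl", "user1 group1");
    // ... but block user2 and members of group3 for the same protocol.
    conf.set("security.client.protocol.acl.blocked", "user2 group3");

    // Fallback blocked ACL for protocols without an explicit one.
    conf.set("security.service.authorization.default.acl.blocked", "baduser");

    System.out.println(conf.get("security.client.protocol.acl.blocked"));
  }
}
----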
** Refreshing Service Level Authorization Configuration
The service-level authorization configuration for the NameNode and

View File

@ -583,14 +583,14 @@ public class TestRPC {
}
MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name());
if (expectFailure) {
assertCounter("RpcAuthorizationFailures", 1, rb);
assertCounter("RpcAuthorizationFailures", 1L, rb);
} else {
assertCounter("RpcAuthorizationSuccesses", 1, rb);
assertCounter("RpcAuthorizationSuccesses", 1L, rb);
}
//since we don't have authentication turned ON, we should see
// 0 for the authentication successes and 0 for failure
assertCounter("RpcAuthenticationFailures", 0, rb);
assertCounter("RpcAuthenticationSuccesses", 0, rb);
assertCounter("RpcAuthenticationFailures", 0L, rb);
assertCounter("RpcAuthenticationSuccesses", 0L, rb);
}
}

View File

@ -0,0 +1,163 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Map;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.WhitelistBasedResolver;
import org.apache.hadoop.util.TestFileBasedIPList;
public class TestWhitelistBasedResolver extends TestCase {
public static final Map<String, String> SASL_PRIVACY_PROPS =
WhitelistBasedResolver.getSaslProperties(new Configuration());
public void testFixedVariableAndLocalWhiteList() throws IOException {
String[] fixedIps = {"10.119.103.112", "10.221.102.0/23"};
TestFileBasedIPList.createFileWithEntries ("fixedwhitelist.txt", fixedIps);
String[] variableIps = {"10.222.0.0/16", "10.113.221.221"};
TestFileBasedIPList.createFileWithEntries ("variablewhitelist.txt", variableIps);
Configuration conf = new Configuration();
conf.set(WhitelistBasedResolver.HADOOP_SECURITY_SASL_FIXEDWHITELIST_FILE ,
"fixedwhitelist.txt");
conf.setBoolean(WhitelistBasedResolver.HADOOP_SECURITY_SASL_VARIABLEWHITELIST_ENABLE,
true);
conf.setLong(WhitelistBasedResolver.HADOOP_SECURITY_SASL_VARIABLEWHITELIST_CACHE_SECS,
1);
conf.set(WhitelistBasedResolver.HADOOP_SECURITY_SASL_VARIABLEWHITELIST_FILE ,
"variablewhitelist.txt");
WhitelistBasedResolver wqr = new WhitelistBasedResolver ();
wqr.setConf(conf);
assertEquals (wqr.getDefaultProperties(),
wqr.getServerProperties(InetAddress.getByName("10.119.103.112")));
assertEquals (SASL_PRIVACY_PROPS, wqr.getServerProperties("10.119.103.113"));
assertEquals (wqr.getDefaultProperties(), wqr.getServerProperties("10.221.103.121"));
assertEquals (SASL_PRIVACY_PROPS, wqr.getServerProperties("10.221.104.0"));
assertEquals (wqr.getDefaultProperties(), wqr.getServerProperties("10.222.103.121"));
assertEquals (SASL_PRIVACY_PROPS, wqr.getServerProperties("10.223.104.0"));
assertEquals (wqr.getDefaultProperties(), wqr.getServerProperties("10.113.221.221"));
assertEquals (SASL_PRIVACY_PROPS, wqr.getServerProperties("10.113.221.222"));
assertEquals (wqr.getDefaultProperties(), wqr.getServerProperties("127.0.0.1"));
TestFileBasedIPList.removeFile("fixedwhitelist.txt");
TestFileBasedIPList.removeFile("variablewhitelist.txt");
}
/**
* Add a set of subnets and IPs to the whitelist.
* Check for inclusion in the whitelist.
* Check for exclusion from the whitelist.
*/
public void testFixedAndLocalWhiteList() throws IOException {
String[] fixedIps = {"10.119.103.112", "10.221.102.0/23"};
TestFileBasedIPList.createFileWithEntries ("fixedwhitelist.txt", fixedIps);
String[] variableIps = {"10.222.0.0/16", "10.113.221.221"};
TestFileBasedIPList.createFileWithEntries ("variablewhitelist.txt", variableIps);
Configuration conf = new Configuration();
conf.set(WhitelistBasedResolver.HADOOP_SECURITY_SASL_FIXEDWHITELIST_FILE ,
"fixedwhitelist.txt");
conf.setBoolean(WhitelistBasedResolver.HADOOP_SECURITY_SASL_VARIABLEWHITELIST_ENABLE,
false);
conf.setLong(WhitelistBasedResolver.HADOOP_SECURITY_SASL_VARIABLEWHITELIST_CACHE_SECS,
100);
conf.set(WhitelistBasedResolver.HADOOP_SECURITY_SASL_VARIABLEWHITELIST_FILE ,
"variablewhitelist.txt");
WhitelistBasedResolver wqr = new WhitelistBasedResolver();
wqr.setConf(conf);
assertEquals (wqr.getDefaultProperties(),
wqr.getServerProperties(InetAddress.getByName("10.119.103.112")));
assertEquals (SASL_PRIVACY_PROPS, wqr.getServerProperties("10.119.103.113"));
assertEquals (wqr.getDefaultProperties(), wqr.getServerProperties("10.221.103.121"));
assertEquals (SASL_PRIVACY_PROPS, wqr.getServerProperties("10.221.104.0"));
assertEquals (SASL_PRIVACY_PROPS, wqr.getServerProperties("10.222.103.121"));
assertEquals (SASL_PRIVACY_PROPS, wqr.getServerProperties("10.223.104.0"));
assertEquals (SASL_PRIVACY_PROPS, wqr.getServerProperties("10.113.221.221"));
assertEquals (SASL_PRIVACY_PROPS, wqr.getServerProperties("10.113.221.222"));
assertEquals (wqr.getDefaultProperties(), wqr.getServerProperties("127.0.0.1"));;
TestFileBasedIPList.removeFile("fixedwhitelist.txt");
TestFileBasedIPList.removeFile("variablewhitelist.txt");
}
/**
* Add a set of subnets and IPs to the whitelist.
* Check for inclusion in the whitelist with a null value.
*/
public void testNullIPAddress() throws IOException {
String[] fixedIps = {"10.119.103.112", "10.221.102.0/23"};
TestFileBasedIPList.createFileWithEntries ("fixedwhitelist.txt", fixedIps);
String[] variableIps = {"10.222.0.0/16", "10.113.221.221"};
TestFileBasedIPList.createFileWithEntries ("variablewhitelist.txt", variableIps);
Configuration conf = new Configuration();
conf.set(WhitelistBasedResolver.HADOOP_SECURITY_SASL_FIXEDWHITELIST_FILE ,
"fixedwhitelist.txt");
conf.setBoolean(WhitelistBasedResolver.HADOOP_SECURITY_SASL_VARIABLEWHITELIST_ENABLE,
true);
conf.setLong(WhitelistBasedResolver.HADOOP_SECURITY_SASL_VARIABLEWHITELIST_CACHE_SECS,
100);
conf.set(WhitelistBasedResolver.HADOOP_SECURITY_SASL_VARIABLEWHITELIST_FILE ,
"variablewhitelist.txt");
WhitelistBasedResolver wqr = new WhitelistBasedResolver();
wqr.setConf(conf);
assertEquals (SASL_PRIVACY_PROPS, wqr.getServerProperties((InetAddress)null));
assertEquals (SASL_PRIVACY_PROPS, wqr.getServerProperties((String)null));
TestFileBasedIPList.removeFile("fixedwhitelist.txt");
TestFileBasedIPList.removeFile("variablewhitelist.txt");
}
}

View File

@ -18,16 +18,22 @@
package org.apache.hadoop.security.authorize;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.TestRPC.TestProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
public class TestServiceAuthorization {
private static final String ACL_CONFIG = "test.protocol.acl";
private static final String ACL_CONFIG1 = "test.protocol1.acl";
private static final String ADDRESS = "0.0.0.0";
public interface TestProtocol1 extends TestProtocol {};
@ -64,4 +70,115 @@ public class TestServiceAuthorization {
acl = serviceAuthorizationManager.getProtocolsAcls(TestProtocol1.class);
assertEquals("user2 group2", acl.getAclString());
}
@Test
public void testBlockedAcl() throws UnknownHostException {
UserGroupInformation drwho =
UserGroupInformation.createUserForTesting("drwho@EXAMPLE.COM",
new String[] { "group1", "group2" });
ServiceAuthorizationManager serviceAuthorizationManager =
new ServiceAuthorizationManager();
Configuration conf = new Configuration ();
//test without setting a blocked acl
conf.set(ACL_CONFIG, "user1 group1");
serviceAuthorizationManager.refresh(conf, new TestPolicyProvider());
try {
serviceAuthorizationManager.authorize(drwho, TestProtocol.class, conf,
InetAddress.getByName(ADDRESS));
} catch (AuthorizationException e) {
fail();
}
//now set a blocked acl with another user and another group
conf.set(ACL_CONFIG + ServiceAuthorizationManager.BLOCKED, "drwho2 group3");
serviceAuthorizationManager.refresh(conf, new TestPolicyProvider());
try {
serviceAuthorizationManager.authorize(drwho, TestProtocol.class, conf,
InetAddress.getByName(ADDRESS));
} catch (AuthorizationException e) {
fail();
}
//now set a blocked acl with the user and another group
conf.set(ACL_CONFIG + ServiceAuthorizationManager.BLOCKED, "drwho group3");
serviceAuthorizationManager.refresh(conf, new TestPolicyProvider());
try {
serviceAuthorizationManager.authorize(drwho, TestProtocol.class, conf,
InetAddress.getByName(ADDRESS));
fail();
} catch (AuthorizationException e) {
}
//now set a blocked acl with another user and another group
conf.set(ACL_CONFIG + ServiceAuthorizationManager.BLOCKED, "drwho2 group3");
serviceAuthorizationManager.refresh(conf, new TestPolicyProvider());
try {
serviceAuthorizationManager.authorize(drwho, TestProtocol.class, conf,
InetAddress.getByName(ADDRESS));
} catch (AuthorizationException e) {
fail();
}
//now set a blocked acl with another user and group that the user belongs to
conf.set(ACL_CONFIG + ServiceAuthorizationManager.BLOCKED, "drwho2 group2");
serviceAuthorizationManager.refresh(conf, new TestPolicyProvider());
try {
serviceAuthorizationManager.authorize(drwho, TestProtocol.class, conf,
InetAddress.getByName(ADDRESS));
fail();
} catch (AuthorizationException e) {
//expects Exception
}
//reset blocked acl so that there is no blocked ACL
conf.set(ACL_CONFIG + ServiceAuthorizationManager.BLOCKED, "");
serviceAuthorizationManager.refresh(conf, new TestPolicyProvider());
try {
serviceAuthorizationManager.authorize(drwho, TestProtocol.class, conf,
InetAddress.getByName(ADDRESS));
} catch (AuthorizationException e) {
fail();
}
}
@Test
public void testDefaultBlockedAcl() throws UnknownHostException {
UserGroupInformation drwho =
UserGroupInformation.createUserForTesting("drwho@EXAMPLE.COM",
new String[] { "group1", "group2" });
ServiceAuthorizationManager serviceAuthorizationManager =
new ServiceAuthorizationManager();
Configuration conf = new Configuration ();
//test without setting a default blocked acl
serviceAuthorizationManager.refresh(conf, new TestPolicyProvider());
try {
serviceAuthorizationManager.authorize(drwho, TestProtocol1.class, conf,
InetAddress.getByName(ADDRESS));
} catch (AuthorizationException e) {
fail();
}
//set a restrictive default blocked acl and a non-restricting blocked acl for TestProtocol
conf.set(
CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_DEFAULT_BLOCKED_ACL,
"user2 group2");
conf.set(ACL_CONFIG + ServiceAuthorizationManager.BLOCKED, "user2");
serviceAuthorizationManager.refresh(conf, new TestPolicyProvider());
//drwho is authorized to access TestProtocol
try {
serviceAuthorizationManager.authorize(drwho, TestProtocol.class, conf,
InetAddress.getByName(ADDRESS));
} catch (AuthorizationException e) {
fail();
}
//drwho is not authorized to access TestProtocol1 because it uses the default blocked acl.
try {
serviceAuthorizationManager.authorize(drwho, TestProtocol1.class, conf,
InetAddress.getByName(ADDRESS));
fail();
} catch (AuthorizationException e) {
//expects Exception
}
}
}

View File

@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.io.IOException;
import org.apache.hadoop.util.CacheableIPList;
import org.apache.hadoop.util.FileBasedIPList;
import junit.framework.TestCase;
public class TestCacheableIPList extends TestCase {
/**
* Add a set of subnets and IPs to the file.
* Set up a low cache refresh.
* Test for inclusion; check for exclusion.
* Add more subnets and IPs.
* Wait for the cache timeout.
* Test for inclusion; check for exclusion.
*/
public void testAddWithSleepForCacheTimeout() throws IOException, InterruptedException {
String[] ips = {"10.119.103.112", "10.221.102.0/23", "10.113.221.221"};
TestFileBasedIPList.createFileWithEntries ("ips.txt", ips);
CacheableIPList cipl = new CacheableIPList(
new FileBasedIPList("ips.txt"),100);
assertFalse("10.113.221.222 is in the list",
cipl.isIn("10.113.221.222"));
assertFalse ("10.222.103.121 is in the list",
cipl.isIn("10.222.103.121"));
TestFileBasedIPList.removeFile("ips.txt");
String[]ips2 = {"10.119.103.112", "10.221.102.0/23",
"10.222.0.0/16", "10.113.221.221", "10.113.221.222"};
TestFileBasedIPList.createFileWithEntries ("ips.txt", ips2);
Thread.sleep(101);
assertTrue("10.113.221.222 is not in the list",
cipl.isIn("10.113.221.222"));
assertTrue ("10.222.103.121 is not in the list",
cipl.isIn("10.222.103.121"));
TestFileBasedIPList.removeFile("ips.txt");
}
/**
* Add a set of subnets and IPs to the file.
* Set up a low cache refresh.
* Test for inclusion; check for exclusion.
* Remove some subnets and IPs.
* Wait for the cache timeout.
* Test for inclusion; check for exclusion.
*/
public void testRemovalWithSleepForCacheTimeout() throws IOException, InterruptedException {
String[] ips = {"10.119.103.112", "10.221.102.0/23",
"10.222.0.0/16", "10.113.221.221", "10.113.221.222"};
TestFileBasedIPList.createFileWithEntries ("ips.txt", ips);
CacheableIPList cipl = new CacheableIPList(
new FileBasedIPList("ips.txt"),100);
assertTrue("10.113.221.222 is not in the list",
cipl.isIn("10.113.221.222"));
assertTrue ("10.222.103.121 is not in the list",
cipl.isIn("10.222.103.121"));
TestFileBasedIPList.removeFile("ips.txt");
String[]ips2 = {"10.119.103.112", "10.221.102.0/23", "10.113.221.221"};
TestFileBasedIPList.createFileWithEntries ("ips.txt", ips2);
Thread.sleep(1005);
assertFalse("10.113.221.222 is in the list",
cipl.isIn("10.113.221.222"));
assertFalse ("10.222.103.121 is in the list",
cipl.isIn("10.222.103.121"));
TestFileBasedIPList.removeFile("ips.txt");
}
/**
* Add a set of subnets and IPs to the file.
* Set up a low cache refresh.
* Test for inclusion; check for exclusion.
* Add more subnets and IPs.
* Do a refresh.
* Test for inclusion; check for exclusion.
*/
public void testAddWithRefresh() throws IOException, InterruptedException {
String[] ips = {"10.119.103.112", "10.221.102.0/23", "10.113.221.221"};
TestFileBasedIPList.createFileWithEntries ("ips.txt", ips);
CacheableIPList cipl = new CacheableIPList(
new FileBasedIPList("ips.txt"),100);
assertFalse("10.113.221.222 is in the list",
cipl.isIn("10.113.221.222"));
assertFalse ("10.222.103.121 is in the list",
cipl.isIn("10.222.103.121"));
TestFileBasedIPList.removeFile("ips.txt");
String[]ips2 = {"10.119.103.112", "10.221.102.0/23",
"10.222.0.0/16", "10.113.221.221", "10.113.221.222"};
TestFileBasedIPList.createFileWithEntries ("ips.txt", ips2);
cipl.refresh();
assertTrue("10.113.221.222 is not in the list",
cipl.isIn("10.113.221.222"));
assertTrue ("10.222.103.121 is not in the list",
cipl.isIn("10.222.103.121"));
TestFileBasedIPList.removeFile("ips.txt");
}
/**
* Add a set of subnets and IPs to the file.
* Set up a low cache refresh.
* Test for inclusion; check for exclusion.
* Remove some subnets and IPs.
* Do a refresh.
* Test for inclusion; check for exclusion.
*/
public void testRemovalWithRefresh() throws IOException, InterruptedException {
String[] ips = {"10.119.103.112", "10.221.102.0/23",
"10.222.0.0/16", "10.113.221.221", "10.113.221.222"};
TestFileBasedIPList.createFileWithEntries ("ips.txt", ips);
CacheableIPList cipl = new CacheableIPList(
new FileBasedIPList("ips.txt"),100);
assertTrue("10.113.221.222 is not in the list",
cipl.isIn("10.113.221.222"));
assertTrue ("10.222.103.121 is not in the list",
cipl.isIn("10.222.103.121"));
TestFileBasedIPList.removeFile("ips.txt");
String[]ips2 = {"10.119.103.112", "10.221.102.0/23", "10.113.221.221"};
TestFileBasedIPList.createFileWithEntries ("ips.txt", ips2);
cipl.refresh();
assertFalse("10.113.221.222 is in the list",
cipl.isIn("10.113.221.222"));
assertFalse ("10.222.103.121 is in the list",
cipl.isIn("10.222.103.121"));
TestFileBasedIPList.removeFile("ips.txt");
}
}

View File

@ -19,6 +19,9 @@ package org.apache.hadoop.util;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.apache.hadoop.fs.ChecksumException;
import org.junit.Test;
@ -54,23 +57,27 @@ public class TestDataChecksum {
}
}
private void doBulkTest(DataChecksum checksum, int dataLength,
boolean useDirect) throws Exception {
System.err.println("Testing bulk checksums of length " +
dataLength + " with " +
(useDirect ? "direct" : "array-backed") + " buffers");
int numSums = (dataLength - 1)/checksum.getBytesPerChecksum() + 1;
int sumsLength = numSums * checksum.getChecksumSize();
private static class Harness {
final DataChecksum checksum;
final int dataLength, sumsLength, numSums;
ByteBuffer dataBuf, checksumBuf;
Harness(DataChecksum checksum, int dataLength, boolean useDirect) {
this.checksum = checksum;
this.dataLength = dataLength;
numSums = (dataLength - 1)/checksum.getBytesPerChecksum() + 1;
sumsLength = numSums * checksum.getChecksumSize();
byte data[] = new byte[dataLength +
DATA_OFFSET_IN_BUFFER +
DATA_TRAILER_IN_BUFFER];
new Random().nextBytes(data);
ByteBuffer dataBuf = ByteBuffer.wrap(
dataBuf = ByteBuffer.wrap(
data, DATA_OFFSET_IN_BUFFER, dataLength);
byte checksums[] = new byte[SUMS_OFFSET_IN_BUFFER + sumsLength];
ByteBuffer checksumBuf = ByteBuffer.wrap(
checksumBuf = ByteBuffer.wrap(
checksums, SUMS_OFFSET_IN_BUFFER, sumsLength);
// Swap out for direct buffers if requested.
@ -78,7 +85,9 @@ public class TestDataChecksum {
dataBuf = directify(dataBuf);
checksumBuf = directify(checksumBuf);
}
}
void testCorrectness() throws ChecksumException {
// calculate real checksum, make sure it passes
checksum.calculateChunkedSums(dataBuf, checksumBuf);
checksum.verifyChunkedSums(dataBuf, checksumBuf, "fake file", 0);
@ -114,6 +123,45 @@ public class TestDataChecksum {
assertTrue(ce.getMessage().contains("fake file"));
}
}
}
private void doBulkTest(DataChecksum checksum, int dataLength,
boolean useDirect) throws Exception {
System.err.println("Testing bulk checksums of length " +
dataLength + " with " +
(useDirect ? "direct" : "array-backed") + " buffers");
new Harness(checksum, dataLength, useDirect).testCorrectness();
}
/**
* Simple performance test for the "common case" checksum usage in HDFS:
* computing and verifying CRC32C with 512 byte chunking on native
* buffers.
*/
@Test
public void commonUsagePerfTest() throws Exception {
final int NUM_RUNS = 5;
final DataChecksum checksum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
final int dataLength = 512 * 1024 * 1024;
Harness h = new Harness(checksum, dataLength, true);
for (int i = 0; i < NUM_RUNS; i++) {
Stopwatch s = new Stopwatch().start();
// calculate real checksum, make sure it passes
checksum.calculateChunkedSums(h.dataBuf, h.checksumBuf);
s.stop();
System.err.println("Calculate run #" + i + ": " +
s.elapsedTime(TimeUnit.MICROSECONDS) + "us");
s = new Stopwatch().start();
// verify the checksums that were just calculated
checksum.verifyChunkedSums(h.dataBuf, h.checksumBuf, "fake file", 0);
s.stop();
System.err.println("Verify run #" + i + ": " +
s.elapsedTime(TimeUnit.MICROSECONDS) + "us");
}
}
@Test
public void testEquality() {

View File

@ -0,0 +1,215 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.util.FileBasedIPList;
import org.apache.hadoop.util.IPList;
import org.junit.After;
import org.junit.Test;
import junit.framework.TestCase;
public class TestFileBasedIPList extends TestCase {
@After
public void tearDown() {
removeFile("ips.txt");
}
/**
* Add a bunch of IPs to the file
* Check for inclusion
* Check for exclusion
*/
@Test
public void testSubnetsAndIPs() throws IOException {
String[] ips = {"10.119.103.112", "10.221.102.0/23"};
createFileWithEntries ("ips.txt", ips);
IPList ipList = new FileBasedIPList("ips.txt");
assertTrue ("10.119.103.112 is not in the list",
ipList.isIn("10.119.103.112"));
assertFalse ("10.119.103.113 is in the list",
ipList.isIn("10.119.103.113"));
assertTrue ("10.221.102.0 is not in the list",
ipList.isIn("10.221.102.0"));
assertTrue ("10.221.102.1 is not in the list",
ipList.isIn("10.221.102.1"));
assertTrue ("10.221.103.1 is not in the list",
ipList.isIn("10.221.103.1"));
assertTrue ("10.221.103.255 is not in the list",
ipList.isIn("10.221.103.255"));
assertFalse("10.221.104.0 is in the list",
ipList.isIn("10.221.104.0"));
assertFalse("10.221.104.1 is in the list",
ipList.isIn("10.221.104.1"));
}
/**
* Add a bunch of IPs to the file
* Check for inclusion
* Check for exclusion
*/
@Test
public void testNullIP() throws IOException {
String[] ips = {"10.119.103.112", "10.221.102.0/23"};
createFileWithEntries ("ips.txt", ips);
IPList ipList = new FileBasedIPList("ips.txt");
assertFalse ("Null Ip is in the list",
ipList.isIn(null));
}
/**
* Add a bunch of subnets and IPs to the file
* Check for inclusion
* Check for exclusion
*/
@Test
public void testWithMultipleSubnetAndIPs() throws IOException {
String[] ips = {"10.119.103.112", "10.221.102.0/23", "10.222.0.0/16",
"10.113.221.221"};
createFileWithEntries ("ips.txt", ips);
IPList ipList = new FileBasedIPList("ips.txt");
assertTrue ("10.119.103.112 is not in the list",
ipList.isIn("10.119.103.112"));
assertFalse ("10.119.103.113 is in the list",
ipList.isIn("10.119.103.113"));
assertTrue ("10.221.103.121 is not in the list",
ipList.isIn("10.221.103.121"));
assertFalse("10.221.104.0 is in the list",
ipList.isIn("10.221.104.0"));
assertTrue ("10.222.103.121 is not in the list",
ipList.isIn("10.222.103.121"));
assertFalse("10.223.104.0 is in the list",
ipList.isIn("10.223.104.0"));
assertTrue ("10.113.221.221 is not in the list",
ipList.isIn("10.113.221.221"));
assertFalse("10.113.221.222 is in the list",
ipList.isIn("10.113.221.222"));
}
/**
* Do not specify a file.
* The inclusion check should return false, as if the feature is turned off.
*/
public void testFileNotSpecified() {
IPList ipl = new FileBasedIPList(null);
assertFalse("110.113.221.222 is in the list",
ipl.isIn("110.113.221.222"));
}
/**
* Specify a non-existent file.
* The inclusion check should return false, as if the feature is turned off.
*/
public void testFileMissing() {
IPList ipl = new FileBasedIPList("missingips.txt");
assertFalse("110.113.221.222 is in the list",
ipl.isIn("110.113.221.222"));
}
/**
* Specify an existing but empty file.
* The inclusion check should return false, as if the feature is turned off.
*/
public void testWithEmptyList() throws IOException {
String[] ips = {};
createFileWithEntries ("ips.txt", ips);
IPList ipl = new FileBasedIPList("ips.txt");
assertFalse("110.113.221.222 is in the list",
ipl.isIn("110.113.221.222"));
}
/**
* Specify an existing file whose entries are in the wrong format.
* Construction should fail with an exception.
*/
public void testForBadFile() throws IOException {
String[] ips = { "10.221.102/23"};
createFileWithEntries ("ips.txt", ips);
try {
new FileBasedIPList("ips.txt");
fail();
} catch (Exception e) {
//expects Exception
}
}
/**
* Add a bunch of subnets and IPs to the file, keeping one entry malformed.
* Construction should fail with an exception rather than silently using the
* good entries.
*/
public void testWithAWrongEntry() throws IOException {
String[] ips = {"10.119.103.112", "10.221.102/23", "10.221.204.1/23"};
createFileWithEntries ("ips.txt", ips);
try {
new FileBasedIPList("ips.txt");
fail();
} catch (Exception e) {
//expects Exception
}
}
public static void createFileWithEntries(String fileName, String[] ips)
throws IOException {
FileUtils.writeLines(new File(fileName), Arrays.asList(ips));
}
public static void removeFile(String fileName) {
File file = new File(fileName);
if (file.exists()) {
new File(fileName).delete();
}
}
}

View File

@ -425,6 +425,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6850. Move NFS out of order write unit tests into TestWrites class.
(Zhe Zhang via atm)
HDFS-6188. An ip whitelist based implementation of TrustedChannelResolver.
(Benoy Antony via Arpit Agarwal)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
@ -534,6 +537,12 @@ Release 2.6.0 - UNRELEASED
HDFS-6783. Fix HDFS CacheReplicationMonitor rescan logic. (Yi Liu and Colin Patrick McCabe via umamahesh)
HDFS-6825. Edit log corruption due to delayed block removal.
(Yongjun Zhang via wang)
HDFS-6569. OOB message can't be sent to the client when DataNode shuts down for upgrade
(brandonli)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.protocol.datatransfer;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.util.CombinedIPWhiteList;
public class WhitelistBasedTrustedChannelResolver extends TrustedChannelResolver {
private CombinedIPWhiteList whiteListForServer;
private CombinedIPWhiteList whitelistForClient;
private static final String FIXEDWHITELIST_DEFAULT_LOCATION = "/etc/hadoop/fixedwhitelist";
private static final String VARIABLEWHITELIST_DEFAULT_LOCATION = "/etc/hadoop/whitelist";
/**
* Path to the file containing the subnets and IP addresses that form the fixed whitelist.
*/
public static final String DFS_DATATRANSFER_SERVER_FIXEDWHITELIST_FILE =
"dfs.datatransfer.server.fixedwhitelist.file";
/**
* Enables/Disables variable whitelist
*/
public static final String DFS_DATATRANSFER_SERVER_VARIABLEWHITELIST_ENABLE =
"dfs.datatransfer.server.variablewhitelist.enable";
/**
* Path to the file containing the subnets and IP addresses that form the variable whitelist.
*/
public static final String DFS_DATATRANSFER_SERVER_VARIABLEWHITELIST_FILE =
"dfs.datatransfer.server.variablewhitelist.file";
/**
* Interval in seconds at which the variable whitelist file is checked for updates.
*/
public static final String DFS_DATATRANSFER_SERVER_VARIABLEWHITELIST_CACHE_SECS =
"dfs.datatransfer.server.variablewhitelist.cache.secs";
/**
* Path to the file containing the subnets and IP addresses that form the fixed whitelist.
*/
public static final String DFS_DATATRANSFER_CLIENT_FIXEDWHITELIST_FILE =
"dfs.datatransfer.client.fixedwhitelist.file";
/**
* Enables/Disables variable whitelist
*/
public static final String DFS_DATATRANSFER_CLIENT_VARIABLEWHITELIST_ENABLE =
"dfs.datatransfer.client.variablewhitelist.enable";
/**
* Path to the file containing the subnets and IP addresses that form the variable whitelist.
*/
public static final String DFS_DATATRANSFER_CLIENT_VARIABLEWHITELIST_FILE =
"dfs.datatransfer.client.variablewhitelist.file";
/**
* Interval in seconds at which the variable whitelist file is checked for updates.
*/
public static final String DFS_DATATRANSFER_CLIENT_VARIABLEWHITELIST_CACHE_SECS =
"dfs.datatransfer.client.variablewhitelist.cache.secs";
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
String fixedFile = conf.get(DFS_DATATRANSFER_SERVER_FIXEDWHITELIST_FILE,
FIXEDWHITELIST_DEFAULT_LOCATION);
String variableFile = null;
long expiryTime = 0;
if (conf.getBoolean(DFS_DATATRANSFER_SERVER_VARIABLEWHITELIST_ENABLE, false)) {
variableFile = conf.get(DFS_DATATRANSFER_SERVER_VARIABLEWHITELIST_FILE,
VARIABLEWHITELIST_DEFAULT_LOCATION);
expiryTime =
conf.getLong(DFS_DATATRANSFER_SERVER_VARIABLEWHITELIST_CACHE_SECS,3600) * 1000;
}
whiteListForServer = new CombinedIPWhiteList(fixedFile,variableFile,expiryTime);
fixedFile = conf.get(DFS_DATATRANSFER_CLIENT_FIXEDWHITELIST_FILE, fixedFile);
expiryTime = 0;
if (conf.getBoolean(DFS_DATATRANSFER_CLIENT_VARIABLEWHITELIST_ENABLE, false)) {
variableFile = conf.get(DFS_DATATRANSFER_CLIENT_VARIABLEWHITELIST_FILE,variableFile);
expiryTime =
conf.getLong(DFS_DATATRANSFER_CLIENT_VARIABLEWHITELIST_CACHE_SECS,3600) * 1000;
}
whitelistForClient = new CombinedIPWhiteList(fixedFile,variableFile,expiryTime);
}
public boolean isTrusted() {
try {
return whitelistForClient.isIn(InetAddress.getLocalHost().getHostAddress());
} catch (UnknownHostException e) {
return false;
}
}
public boolean isTrusted(InetAddress clientAddress) {
return whiteListForServer.isIn(clientAddress.getHostAddress());
}
}
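A minimal wiring sketch, not part of this patch, showing how the keys defined above might be set and queried. The file paths, cache interval, and test address are illustrative assumptions; the class falls back to /etc/hadoop/fixedwhitelist and /etc/hadoop/whitelist when the keys are unset.
import java.net.InetAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.datatransfer.WhitelistBasedTrustedChannelResolver;
public class WhitelistResolverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Fixed whitelist: loaded once from this file.
    conf.set(WhitelistBasedTrustedChannelResolver
        .DFS_DATATRANSFER_SERVER_FIXEDWHITELIST_FILE, "/etc/hadoop/fixedwhitelist");
    // Variable whitelist: optional; re-read after the cache interval expires.
    conf.setBoolean(WhitelistBasedTrustedChannelResolver
        .DFS_DATATRANSFER_SERVER_VARIABLEWHITELIST_ENABLE, true);
    conf.setLong(WhitelistBasedTrustedChannelResolver
        .DFS_DATATRANSFER_SERVER_VARIABLEWHITELIST_CACHE_SECS, 600);
    WhitelistBasedTrustedChannelResolver resolver =
        new WhitelistBasedTrustedChannelResolver();
    resolver.setConf(conf);
    // Server-side check: is this client address covered by the combined
    // (fixed + variable) whitelist?
    System.out.println(resolver.isTrusted(InetAddress.getByName("10.221.102.5")));
  }
}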

View File

@ -373,6 +373,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
sb.append("{blockUCState=").append(blockUCState)
.append(", primaryNodeIndex=").append(primaryNodeIndex)
.append(", replicas=[");
if (replicas != null) {
Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
if (iter.hasNext()) {
iter.next().appendStringTo(sb);
@ -381,6 +382,7 @@ public class BlockInfoUnderConstruction extends BlockInfo {
iter.next().appendStringTo(sb);
}
}
}
sb.append("]}");
}
}

View File

@ -286,6 +286,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
private void rescan() throws InterruptedException {
scannedDirectives = 0;
scannedBlocks = 0;
try {
namesystem.writeLock();
try {
lock.lock();
@ -294,11 +295,10 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
"shut down.");
}
curScanCount = completedScanCount + 1;
}
finally {
} finally {
lock.unlock();
}
try {
resetStatistics();
rescanCacheDirectives();
rescanCachedBlockMap();

View File

@ -739,6 +739,11 @@ class BlockReceiver implements Closeable {
}
}
public void sendOOB() throws IOException, InterruptedException {
((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
.getRestartOOBStatus());
}
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
@ -831,8 +836,6 @@ class BlockReceiver implements Closeable {
// Client will fall back to regular pipeline recovery.
}
try {
((PacketResponder) responder.getRunnable()).
sendOOBResponse(PipelineAck.getRestartOOBStatus());
// Even if the connection is closed after the ack packet is
// flushed, the client can react to the connection closure
// first. Insert a delay to lower the chance of client
@ -840,8 +843,6 @@ class BlockReceiver implements Closeable {
Thread.sleep(1000);
} catch (InterruptedException ie) {
// It is already going down. Ignore this.
} catch (IOException ioe) {
LOG.info("Error sending OOB Ack.", ioe);
}
}
responder.interrupt();

View File

@ -270,6 +270,7 @@ public class DataNode extends Configured
public final static String EMPTY_DEL_HINT = "";
final AtomicInteger xmitsInProgress = new AtomicInteger();
Daemon dataXceiverServer = null;
DataXceiverServer xserver = null;
Daemon localDataXceiverServer = null;
ShortCircuitRegistry shortCircuitRegistry = null;
ThreadGroup threadGroup = null;
@ -649,8 +650,8 @@ public class DataNode extends Configured
streamingAddr = tcpPeerServer.getStreamingAddr();
LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(tcpPeerServer, conf, this));
xserver = new DataXceiverServer(tcpPeerServer, conf, this);
this.dataXceiverServer = new Daemon(threadGroup, xserver);
this.threadGroup.setDaemon(true); // auto destroy when empty
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
@ -1137,6 +1138,11 @@ public class DataNode extends Configured
dataNodeInfoBeanName = MBeans.register("DataNode", "DataNodeInfo", this);
}
@VisibleForTesting
public DataXceiverServer getXferServer() {
return xserver;
}
@VisibleForTesting
public int getXferPort() {
return streamingAddr.getPort();
@ -1395,6 +1401,7 @@ public class DataNode extends Configured
// in order to avoid any further acceptance of requests, but the peers
// for block writes are not closed until the clients are notified.
if (dataXceiverServer != null) {
xserver.sendOOBToPeers();
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
this.dataXceiverServer.interrupt();
}

View File

@ -103,6 +103,7 @@ class DataXceiver extends Receiver implements Runnable {
private long opStartTime; //the start time of receiving an Op
private final InputStream socketIn;
private OutputStream socketOut;
private BlockReceiver blockReceiver = null;
/**
* Client Name used in previous operation. Not available on first request
@ -159,6 +160,12 @@ class DataXceiver extends Receiver implements Runnable {
return socketOut;
}
public void sendOOB() throws IOException, InterruptedException {
LOG.info("Sending OOB to peer: " + peer);
if (blockReceiver != null) {
  blockReceiver.sendOOB();
}
}
/**
* Read/write data from/to the DataXceiverServer.
*/
@ -168,7 +175,7 @@ class DataXceiver extends Receiver implements Runnable {
Op op = null;
try {
dataXceiverServer.addPeer(peer, Thread.currentThread());
dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
@ -584,7 +591,6 @@ class DataXceiver extends Receiver implements Runnable {
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
Socket mirrorSock = null; // socket to next target
BlockReceiver blockReceiver = null; // responsible for data handling
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS;
@ -747,6 +753,7 @@ class DataXceiver extends Receiver implements Runnable {
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
blockReceiver = null;
}
//update metrics

View File

@ -27,11 +27,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.PeerServer;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
import com.google.common.annotations.VisibleForTesting;
/**
* Server used for receiving/sending a block of data.
@ -45,6 +45,7 @@ class DataXceiverServer implements Runnable {
private final PeerServer peerServer;
private final DataNode datanode;
private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();
private boolean closed = false;
/**
@ -217,18 +218,38 @@ class DataXceiverServer implements Runnable {
}
}
synchronized void addPeer(Peer peer, Thread t) throws IOException {
synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)
throws IOException {
if (closed) {
throw new IOException("Server closed.");
}
peers.put(peer, t);
peersXceiver.put(peer, xceiver);
}
synchronized void closePeer(Peer peer) {
peers.remove(peer);
peersXceiver.remove(peer);
IOUtils.cleanup(null, peer);
}
// Sending OOB to all peers
public synchronized void sendOOBToPeers() {
if (!datanode.shutdownForUpgrade) {
return;
}
for (Peer p : peers.keySet()) {
try {
peersXceiver.get(p).sendOOB();
} catch (IOException e) {
LOG.warn("Got error when sending OOB message.", e);
} catch (InterruptedException e) {
LOG.warn("Interrupted when sending OOB message.");
}
}
}
// Notify all peers of the shutdown and restart.
// datanode.shouldRun should still be true and datanode.restarting should
// be set true before calling this method.
@ -247,6 +268,7 @@ class DataXceiverServer implements Runnable {
IOUtils.cleanup(LOG, p);
}
peers.clear();
peersXceiver.clear();
}
// Return the number of peers.
@ -254,7 +276,14 @@ class DataXceiverServer implements Runnable {
return peers.size();
}
// Return the number of peers and DataXceivers.
@VisibleForTesting
synchronized int getNumPeersXceiver() {
return peersXceiver.size();
}
synchronized void releasePeer(Peer peer) {
peers.remove(peer);
peersXceiver.remove(peer);
}
}

View File

@ -4350,7 +4350,30 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new IOException("Block (=" + lastblock + ") not found");
}
}
INodeFile iFile = ((INode)storedBlock.getBlockCollection()).asFile();
//
// The delete operation (see @deleteInternal) first removes file paths from
// the namespace and delays the removal of blocks to a later time for better
// performance. When commitBlockSynchronization (this method) is called in
// between, the blockCollection of storedBlock may already have been set to
// null by the delete, so we throw an IOException here instead of hitting an
// NPE. If the file path has already been removed from the namespace by the
// delete, we throw a FileNotFoundException so as not to proceed to the end
// of this method and add a CloseOp to the edit log for an already deleted
// file (see HDFS-6825).
//
BlockCollection blockCollection = storedBlock.getBlockCollection();
if (blockCollection == null) {
throw new IOException("The blockCollection of " + storedBlock
+ " is null, likely because the file owning this block was"
+ " deleted and the block removal is delayed");
}
INodeFile iFile = ((INode)blockCollection).asFile();
if (isFileDeleted(iFile)) {
throw new FileNotFoundException("File not found: "
+ iFile.getFullPathName() + ", likely due to delayed block"
+ " removal");
}
if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected block (=" + lastblock
@ -6349,9 +6372,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private boolean isFileDeleted(INodeFile file) {
// Not in the inodeMap or in the snapshot but marked deleted.
if (dir.getInode(file.getId()) == null ||
file.getParent() == null || (file.isWithSnapshot() &&
file.getFileWithSnapshotFeature().isCurrentFileDeleted())) {
if (dir.getInode(file.getId()) == null) {
return true;
}
// Walk up the path hierarchy to see whether an ancestor was removed by a
// recursive deletion.
INode tmpChild = file;
INodeDirectory tmpParent = file.getParent();
while (true) {
if (tmpParent == null ||
tmpParent.searchChildren(tmpChild.getLocalNameBytes()) < 0) {
return true;
}
if (tmpParent.isRoot()) {
break;
}
tmpChild = tmpParent;
tmpParent = tmpParent.getParent();
}
if (file.isWithSnapshot() &&
file.getFileWithSnapshotFeature().isCurrentFileDeleted()) {
return true;
}
return false;

View File

@ -176,7 +176,7 @@ public class INodeDirectory extends INodeWithAdditionalFields
return quota;
}
private int searchChildren(byte[] name) {
int searchChildren(byte[] name) {
return children == null? -1: Collections.binarySearch(children, name);
}

View File

@ -44,6 +44,9 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@ -1300,4 +1303,33 @@ public class DFSTestUtil {
sockDir.close();
}
}
/**
* @return the node which is expected to run the recovery of the
* given block, which is known to be under construction inside the
* given NameNode.
*/
public static DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
ExtendedBlock blk) {
BlockManager bm0 = nn.getNamesystem().getBlockManager();
BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
assertTrue("Block " + blk + " should be under construction, " +
"got: " + storedBlock,
storedBlock instanceof BlockInfoUnderConstruction);
BlockInfoUnderConstruction ucBlock =
(BlockInfoUnderConstruction)storedBlock;
// We expect that the replica with the most recent heartbeat will be
// the one to be in charge of the synchronization / recovery protocol.
final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
DatanodeStorageInfo expectedPrimary = storages[0];
long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor().getLastUpdate();
for (int i = 1; i < storages.length; i++) {
final long lastUpdate = storages[i].getDatanodeDescriptor().getLastUpdate();
if (lastUpdate > mostRecentLastUpdate) {
expectedPrimary = storages[i];
mostRecentLastUpdate = lastUpdate;
}
}
return expectedPrimary.getDatanodeDescriptor();
}
}

View File

@ -27,11 +27,14 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -67,6 +70,7 @@ public class TestDataNodeRollingUpgrade {
private void startCluster() throws IOException {
conf = new HdfsConfiguration();
conf.setInt("dfs.blocksize", 1024*1024);
cluster = new Builder(conf).numDataNodes(REPL_FACTOR).build();
cluster.waitActive();
fs = cluster.getFileSystem();
@ -243,4 +247,48 @@ public class TestDataNodeRollingUpgrade {
shutdownCluster();
}
}
@Test (timeout=600000)
// Test that the DataXceiverServer keeps correct peer-to-DataXceiver pairs for sending OOB messages
public void testDatanodePeersXceiver() throws Exception {
try {
startCluster();
// Create files in DFS.
String testFile1 = "/TestDataNodeXceiver1.dat";
String testFile2 = "/TestDataNodeXceiver2.dat";
String testFile3 = "/TestDataNodeXceiver3.dat";
DFSClient client1 = new DFSClient(NameNode.getAddress(conf), conf);
DFSClient client2 = new DFSClient(NameNode.getAddress(conf), conf);
DFSClient client3 = new DFSClient(NameNode.getAddress(conf), conf);
DFSOutputStream s1 = (DFSOutputStream) client1.create(testFile1, true);
DFSOutputStream s2 = (DFSOutputStream) client2.create(testFile2, true);
DFSOutputStream s3 = (DFSOutputStream) client3.create(testFile3, true);
byte[] toWrite = new byte[1024*1024*8];
Random rb = new Random(1111);
rb.nextBytes(toWrite);
s1.write(toWrite, 0, 1024*1024*8);
s1.flush();
s2.write(toWrite, 0, 1024*1024*8);
s2.flush();
s3.write(toWrite, 0, 1024*1024*8);
s3.flush();
assertTrue(dn.getXferServer().getNumPeers() == dn.getXferServer()
    .getNumPeersXceiver());
s1.close();
s2.close();
s3.close();
assertTrue(dn.getXferServer().getNumPeers() == dn.getXferServer()
    .getNumPeersXceiver());
client1.close();
client2.close();
client3.close();
} finally {
shutdownCluster();
}
}
}

View File

@ -50,6 +50,17 @@ public class TestCommitBlockSynchronization {
FSNamesystem namesystem = new FSNamesystem(conf, image);
namesystem.setImageLoaded(true);
// set file's parent as root and put the file to inodeMap, so
// FSNamesystem's isFileDeleted() method will return false on this file
if (file.getParent() == null) {
INodeDirectory parent = mock(INodeDirectory.class);
parent.setLocalName(new byte[0]);
parent.addChild(file);
file.setParent(parent);
}
namesystem.dir.getINodeMap().put(file);
FSNamesystem namesystemSpy = spy(namesystem);
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
block, 1, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
@ -28,18 +30,29 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockStoragePolicy;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
@ -49,6 +62,7 @@ import org.mockito.internal.util.reflection.Whitebox;
* whole duration.
*/
public class TestDeleteRace {
private static final int BLOCK_SIZE = 4096;
private static final Log LOG = LogFactory.getLog(TestDeleteRace.class);
private static final Configuration conf = new HdfsConfiguration();
private MiniDFSCluster cluster;
@ -201,7 +215,126 @@ public class TestDeleteRace {
cluster.shutdown();
}
}
}
/**
* Test race between delete operation and commitBlockSynchronization method.
* See HDFS-6825.
* @param hasSnapshot
* @throws Exception
*/
private void testDeleteAndCommitBlockSynchronizationRace(boolean hasSnapshot)
throws Exception {
LOG.info("Start testing, hasSnapshot: " + hasSnapshot);
final String testPaths[] = {
"/test-file",
"/testdir/testdir1/test-file"
};
final Path rootPath = new Path("/");
final Configuration conf = new Configuration();
// Disable permissions so that another user can recover the lease.
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
FSDataOutputStream stm = null;
Map<DataNode, DatanodeProtocolClientSideTranslatorPB> dnMap =
new HashMap<DataNode, DatanodeProtocolClientSideTranslatorPB>();
try {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
int stId = 0;
for (String testPath : testPaths) {
LOG.info("test on " + testPath + " snapshot: " + hasSnapshot);
Path fPath = new Path(testPath);
// find the topmost non-root ancestor
Path grandestNonRootParent = fPath;
while (!grandestNonRootParent.getParent().equals(rootPath)) {
grandestNonRootParent = grandestNonRootParent.getParent();
}
stm = fs.create(fPath);
LOG.info("test on " + testPath + " created " + fPath);
// write a half block
AppendTestUtil.write(stm, 0, BLOCK_SIZE / 2);
stm.hflush();
if (hasSnapshot) {
SnapshotTestHelper.createSnapshot(fs, rootPath,
"st" + String.valueOf(stId));
++stId;
}
// Look into the block manager on the active node for the block
// under construction.
NameNode nn = cluster.getNameNode();
ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, fPath);
DatanodeDescriptor expectedPrimary =
DFSTestUtil.getExpectedPrimaryNode(nn, blk);
LOG.info("Expecting block recovery to be triggered on DN " +
expectedPrimary);
// Find the corresponding DN daemon, and spy on its connection to the
// active.
DataNode primaryDN = cluster.getDataNode(expectedPrimary.getIpcPort());
DatanodeProtocolClientSideTranslatorPB nnSpy = dnMap.get(primaryDN);
if (nnSpy == null) {
nnSpy = DataNodeTestUtils.spyOnBposToNN(primaryDN, nn);
dnMap.put(primaryDN, nnSpy);
}
// Delay the commitBlockSynchronization call
DelayAnswer delayer = new DelayAnswer(LOG);
Mockito.doAnswer(delayer).when(nnSpy).commitBlockSynchronization(
Mockito.eq(blk),
Mockito.anyInt(), // new genstamp
Mockito.anyLong(), // new length
Mockito.eq(true), // close file
Mockito.eq(false), // delete block
(DatanodeID[]) Mockito.anyObject(), // new targets
(String[]) Mockito.anyObject()); // new target storages
fs.recoverLease(fPath);
LOG.info("Waiting for commitBlockSynchronization call from primary");
delayer.waitForCall();
LOG.info("Deleting recursively " + grandestNonRootParent);
fs.delete(grandestNonRootParent, true);
delayer.proceed();
LOG.info("Now wait for result");
delayer.waitForResult();
Throwable t = delayer.getThrown();
if (t != null) {
LOG.info("Result exception (snapshot: " + hasSnapshot + "): " + t);
}
} // end of loop each fPath
LOG.info("Now check we can restart");
cluster.restartNameNodes();
LOG.info("Restart finished");
} finally {
if (stm != null) {
IOUtils.closeStream(stm);
}
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test(timeout=600000)
public void testDeleteAndCommitBlockSynchonizationRaceNoSnapshot()
throws Exception {
testDeleteAndCommitBlockSynchronizationRace(false);
}
@Test(timeout=600000)
public void testDeleteAndCommitBlockSynchronizationRaceHasSnapshot()
throws Exception {
testDeleteAndCommitBlockSynchronizationRace(true);
}
}

View File

@ -356,7 +356,8 @@ public class TestPipelinesFailover {
NameNode nn0 = cluster.getNameNode(0);
ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
DatanodeDescriptor expectedPrimary = getExpectedPrimaryNode(nn0, blk);
DatanodeDescriptor expectedPrimary =
DFSTestUtil.getExpectedPrimaryNode(nn0, blk);
LOG.info("Expecting block recovery to be triggered on DN " +
expectedPrimary);
@ -506,37 +507,6 @@ public class TestPipelinesFailover {
}
}
/**
* @return the node which is expected to run the recovery of the
* given block, which is known to be under construction inside the
* given NameNOde.
*/
private DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
ExtendedBlock blk) {
BlockManager bm0 = nn.getNamesystem().getBlockManager();
BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock());
assertTrue("Block " + blk + " should be under construction, " +
"got: " + storedBlock,
storedBlock instanceof BlockInfoUnderConstruction);
BlockInfoUnderConstruction ucBlock =
(BlockInfoUnderConstruction)storedBlock;
// We expect that the replica with the most recent heart beat will be
// the one to be in charge of the synchronization / recovery protocol.
final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
DatanodeStorageInfo expectedPrimary = storages[0];
long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor().getLastUpdate();
for (int i = 1; i < storages.length; i++) {
final long lastUpdate = storages[i].getDatanodeDescriptor().getLastUpdate();
if (lastUpdate > mostRecentLastUpdate) {
expectedPrimary = storages[i];
mostRecentLastUpdate = lastUpdate;
}
}
return expectedPrimary.getDatanodeDescriptor();
}
private DistributedFileSystem createFsAsOtherUser(
final MiniDFSCluster cluster, final Configuration conf)
throws IOException, InterruptedException {

View File

@ -227,6 +227,15 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-6032. Made MR jobs write job history files on the default FS when
the current context's FS is different. (Benjamin Zhitomirsky via zjshen)
MAPREDUCE-6024. Shortened the time when Fetcher is stuck in retrying before
concluding the failure by configuration. (Yunjiong Zhao via zjshen)
MAPREDUCE-6036. TestJobEndNotifier fails intermittently in branch-2 (chang
li via jlowe)
MAPREDUCE-6012. DBInputSplit creates invalid ranges on Oracle.
(Wei Yan via kasha)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -308,6 +317,9 @@ Release 2.5.0 - UNRELEASED
BUG FIXES
MAPREDUCE-6033. Updated access check for displaying job information
(Yu Gao via Eric Yang)
MAPREDUCE-5759. Remove unnecessary conf load in Limits (Sandy Ryza)
MAPREDUCE-5014. Extend Distcp to accept a custom CopyListing.

View File

@ -148,10 +148,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
private static final Log LOG = LogFactory.getLog(JobImpl.class);
//The maximum fraction of fetch failures allowed for a map
private static final double MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5;
private float maxAllowedFetchFailuresFraction;
// Maximum no. of fetch-failure notifications after which map task is failed
private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
//Maximum no. of fetch-failure notifications after which map task is failed
private int maxFetchFailuresNotifications;
public static final String JOB_KILLED_DIAG =
"Job received Kill while in RUNNING state.";
@ -704,6 +704,13 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
if(forcedDiagnostic != null) {
this.diagnostics.add(forcedDiagnostic);
}
this.maxAllowedFetchFailuresFraction = conf.getFloat(
MRJobConfig.MAX_ALLOWED_FETCH_FAILURES_FRACTION,
MRJobConfig.DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION);
this.maxFetchFailuresNotifications = conf.getInt(
MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
}
protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
@ -730,7 +737,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
if (jobACL == null) {
return true;
}
return aclsManager.checkAccess(callerUGI, jobOperation, username, jobACL);
return aclsManager.checkAccess(callerUGI, jobOperation, userName, jobACL);
}
@Override
@ -1900,9 +1907,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
float failureRate = shufflingReduceTasks == 0 ? 1.0f :
(float) fetchFailures / shufflingReduceTasks;
// declare the map faulty once fetch failures reach both the notification count and the allowed failure fraction
boolean isMapFaulty =
(failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);
if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS && isMapFaulty) {
if (fetchFailures >= job.getMaxFetchFailuresNotifications()
&& failureRate >= job.getMaxAllowedFetchFailuresFraction()) {
LOG.info("Too many fetch-failures for output of task attempt: " +
mapId + " ... raising fetch failure to map");
job.eventHandler.handle(new TaskAttemptEvent(mapId,
@ -2185,4 +2191,12 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
jobConf.addResource(fc.open(confPath), confPath.toString());
return jobConf;
}
public float getMaxAllowedFetchFailuresFraction() {
return maxAllowedFetchFailuresFraction;
}
public int getMaxFetchFailuresNotifications() {
return maxFetchFailuresNotifications;
}
}
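A hedged tuning sketch, not part of this patch: the thresholds above (and the per-host limit used by ShuffleSchedulerImpl further down) are now plain job configuration, so a job could adjust them as shown here. The values are illustrative; the shipped defaults are 0.5, 3, and 5 respectively.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
public class FetchFailureTuningSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // AM side: declare a map output faulty once 25% of shuffling reducers
    // report failures ...
    conf.setFloat(MRJobConfig.MAX_ALLOWED_FETCH_FAILURES_FRACTION, 0.25f);
    // ... and only after at least 5 notifications.
    conf.setInt(MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS, 5);
    // Reducer side: escalate to the AM once a single host has failed more
    // than this many times.
    conf.setInt(MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES, 3);
    System.out.println(conf.getFloat(
        MRJobConfig.MAX_ALLOWED_FETCH_FAILURES_FRACTION, 0.5f));
  }
}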

View File

@ -270,7 +270,8 @@ public class TestJobEndNotifier extends JobEndNotifier {
app.waitForInternalState(job, JobStateInternal.REBOOT);
// Now shutdown. User should see FAILED state.
// Unregistration fails: isLastAMRetry is recalculated, this is
app.shutDownJob();
// Reboot will stop the service internally; we don't need to shut down twice.
app.waitForServiceToStop(10000);
Assert.assertFalse(app.isLastAMRetry());
// Since it's not last retry, JobEndServlet didn't called
Assert.assertEquals(0, JobEndServlet.calledTimes);

View File

@ -536,7 +536,7 @@ public class TestJobImpl {
// Verify access
JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
null, null, null, true, null, 0, null, null, null, null);
null, null, null, true, user1, 0, null, null, null, null);
Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -547,7 +547,7 @@ public class TestJobImpl {
// Verify access
JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
null, null, null, true, null, 0, null, null, null, null);
null, null, null, true, user1, 0, null, null, null, null);
Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -558,7 +558,7 @@ public class TestJobImpl {
// Verify access
JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
null, null, null, true, null, 0, null, null, null, null);
null, null, null, true, user1, 0, null, null, null, null);
Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -569,7 +569,7 @@ public class TestJobImpl {
// Verify access
JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
null, null, null, true, null, 0, null, null, null, null);
null, null, null, true, user1, 0, null, null, null, null);
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
@ -580,7 +580,7 @@ public class TestJobImpl {
// Verify access
JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
null, null, null, true, null, 0, null, null, null, null);
null, null, null, true, user1, 0, null, null, null, null);
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}

View File

@ -293,12 +293,20 @@ public interface MRJobConfig {
public static final String SHUFFLE_READ_TIMEOUT = "mapreduce.reduce.shuffle.read.timeout";
public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
public static final String MAX_ALLOWED_FETCH_FAILURES_FRACTION = "mapreduce.reduce.shuffle.max-fetch-failures-fraction";
public static final float DEFAULT_MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5f;
public static final String MAX_FETCH_FAILURES_NOTIFICATIONS = "mapreduce.reduce.shuffle.max-fetch-failures-notifications";
public static final int DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
public static final String MAX_SHUFFLE_FETCH_RETRY_DELAY = "mapreduce.reduce.shuffle.retry-delay.max.ms";
public static final long DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY = 60000;
public static final String MAX_SHUFFLE_FETCH_HOST_FAILURES = "mapreduce.reduce.shuffle.max-host-failures";
public static final int DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES = 5;
public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
public static final String REDUCE_SKIP_MAXGROUPS = "mapreduce.reduce.skip.maxgroups";

View File

@ -81,15 +81,14 @@ public class OracleDBRecordReader<T extends DBWritable> extends DBRecordReader<T
try {
DBInputFormat.DBInputSplit split = getSplit();
if (split.getLength() > 0 && split.getStart() > 0){
if (split.getLength() > 0){
String querystring = query.toString();
query = new StringBuilder();
query.append("SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( ");
query.append(querystring);
query.append(" ) a WHERE rownum <= ").append(split.getStart());
query.append(" + ").append(split.getLength());
query.append(" ) WHERE dbif_rno >= ").append(split.getStart());
query.append(" ) a WHERE rownum <= ").append(split.getEnd());
query.append(" ) WHERE dbif_rno > ").append(split.getStart());
}
} catch (IOException ex) {
// ignore, will not throw.

View File

@ -319,6 +319,7 @@ class Fetcher<K,V> extends Thread {
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
scheduler.hostFailed(host.getHostName());
for(TaskAttemptID left: remaining) {
scheduler.copyFailed(left, host, false, connectExcpt);
}
@ -343,6 +344,7 @@ class Fetcher<K,V> extends Thread {
if(failedTasks != null && failedTasks.length > 0) {
LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
scheduler.hostFailed(host.getHostName());
for(TaskAttemptID left: failedTasks) {
scheduler.copyFailed(left, host, true, false);
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
@ -101,6 +100,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
private final boolean reportReadErrorImmediately;
private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
private int maxHostFailures;
public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
TaskAttemptID reduceId,
@ -132,6 +132,9 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
this.maxHostFailures = job.getInt(
MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES,
MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_HOST_FAILURES);
}
@Override
@ -214,6 +217,15 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
+ mbpsFormat.format(transferRate) + " MB/s)");
}
public synchronized void hostFailed(String hostname) {
if (hostFailures.containsKey(hostname)) {
IntWritable x = hostFailures.get(hostname);
x.set(x.get() + 1);
} else {
hostFailures.put(hostname, new IntWritable(1));
}
}
public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
boolean readError, boolean connectExcpt) {
host.penalize();
@ -226,12 +238,9 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
failureCounts.put(mapId, new IntWritable(1));
}
String hostname = host.getHostName();
if (hostFailures.containsKey(hostname)) {
IntWritable x = hostFailures.get(hostname);
x.set(x.get() + 1);
} else {
hostFailures.put(hostname, new IntWritable(1));
}
// Report the failure to the AM if this host has already failed more than
// maxHostFailures times.
boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures();
if (failures >= abortFailureLimit) {
try {
throw new IOException(failures + " failures downloading " + mapId);
@ -240,7 +249,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
}
}
checkAndInformJobTracker(failures, mapId, readError, connectExcpt);
checkAndInformJobTracker(failures, mapId, readError, connectExcpt, hostFail);
checkReducerHealth();
@ -270,9 +279,9 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
// after every 'maxFetchFailuresBeforeReporting' failures
private void checkAndInformJobTracker(
int failures, TaskAttemptID mapId, boolean readError,
boolean connectExcpt) {
boolean connectExcpt, boolean hostFailed) {
if (connectExcpt || (reportReadErrorImmediately && readError)
|| ((failures % maxFetchFailuresBeforeReporting) == 0)) {
|| ((failures % maxFetchFailuresBeforeReporting) == 0) || hostFailed) {
LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
}
@ -507,4 +516,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
referee.join();
}
public int getMaxHostFailures() {
return maxHostFailures;
}
}

View File

@ -110,7 +110,7 @@ public class TestDbClasses {
splitter, NullDBWritable.class, configuration, connect,
dbConfiguration, "condition", fields, "table");
assertEquals(
"SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( SELECT f1, f2 FROM table WHERE condition ORDER BY Order ) a WHERE rownum <= 1 + 9 ) WHERE dbif_rno >= 1",
"SELECT * FROM (SELECT a.*,ROWNUM dbif_rno FROM ( SELECT f1, f2 FROM table WHERE condition ORDER BY Order ) a WHERE rownum <= 10 ) WHERE dbif_rno > 1",
recorder.getSelectQuery());
}

View File

@ -47,6 +47,9 @@ Release 2.6.0 - UNRELEASED
YARN-2378. Added support for moving applications across queues in
CapacityScheduler. (Subramaniam Venkatraman Krishnan via jianhe)
YARN-2411. Support simple user and group mappings to queues. (Ram Venkatesh
via jianhe)
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

View File

@ -108,4 +108,27 @@
</description>
</property>
<property>
<name>yarn.scheduler.capacity.queue-mappings</name>
<value></value>
<description>
A list of mappings that will be used to assign jobs to queues.
The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]*.
Typically this list will be used to map users to queues,
for example, u:%user:%user maps all users to queues with the same name
as the user.
</description>
</property>
<property>
<name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
<value>false</value>
<description>
If a queue mapping is present, determines whether it overrides the value
specified by the user. This can be used by administrators to place jobs in
queues that are different from the one specified by the user.
The default is false.
</description>
</property>
</configuration>

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@ -41,6 +39,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -59,10 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -77,6 +73,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResour
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -94,6 +92,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@LimitedPrivate("yarn")
@Evolving
@ -199,6 +198,16 @@ public class CapacityScheduler extends
+ ".scheduling-interval-ms";
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private boolean overrideWithQueueMappings = false;
private List<QueueMapping> mappings = new ArrayList<QueueMapping>();
private Groups groups;
@VisibleForTesting
public synchronized String getMappedQueueForTest(String user)
throws IOException {
return getMappedQueue(user);
}
public CapacityScheduler() {
super(CapacityScheduler.class.getName());
}
@ -263,7 +272,6 @@ public class CapacityScheduler extends
this.applications =
new ConcurrentHashMap<ApplicationId,
SchedulerApplication<FiCaSchedulerApp>>();
initializeQueues(this.conf);
scheduleAsynchronously = this.conf.getScheduleAynschronously();
@ -403,6 +411,31 @@ public class CapacityScheduler extends
}
private static final QueueHook noop = new QueueHook();
private void initializeQueueMappings() throws IOException {
overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
LOG.info("Initialized queue mappings, override: "
+ overrideWithQueueMappings);
// Get new user/group mappings
List<QueueMapping> newMappings = conf.getQueueMappings();
//check if mappings refer to valid queues
for (QueueMapping mapping : newMappings) {
if (!mapping.queue.equals(CURRENT_USER_MAPPING) &&
!mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
CSQueue queue = queues.get(mapping.queue);
if (queue == null || !(queue instanceof LeafQueue)) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mapping.queue);
}
}
}
//apply the new mappings since they are valid
mappings = newMappings;
// initialize groups if mappings are present
if (mappings.size() > 0) {
groups = new Groups(conf);
}
}
@Lock(CapacityScheduler.class)
private void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
@ -410,7 +443,9 @@ public class CapacityScheduler extends
root =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
queues, queues, noop);
LOG.info("Initialized root queue " + root);
initializeQueueMappings();
}
@Lock(CapacityScheduler.class)
@ -430,6 +465,7 @@ public class CapacityScheduler extends
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
initializeQueueMappings();
}
/**
@ -517,12 +553,73 @@ public class CapacityScheduler extends
}
synchronized CSQueue getQueue(String queueName) {
if (queueName == null) {
return null;
}
return queues.get(queueName);
}
private static final String CURRENT_USER_MAPPING = "%user";
private static final String PRIMARY_GROUP_MAPPING = "%primary_group";
private String getMappedQueue(String user) throws IOException {
for (QueueMapping mapping : mappings) {
if (mapping.type == MappingType.USER) {
if (mapping.source.equals(CURRENT_USER_MAPPING)) {
if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
return user;
}
else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) {
return groups.getGroups(user).get(0);
}
else {
return mapping.queue;
}
}
if (user.equals(mapping.source)) {
return mapping.queue;
}
}
if (mapping.type == MappingType.GROUP) {
for (String userGroups : groups.getGroups(user)) {
if (userGroups.equals(mapping.source)) {
return mapping.queue;
}
}
}
}
return null;
}
private synchronized void addApplication(ApplicationId applicationId,
String queueName, String user, boolean isAppRecovering) {
// santiy checks.
if (mappings != null && mappings.size() > 0) {
try {
String mappedQueue = getMappedQueue(user);
if (mappedQueue != null) {
// We have a mapping, should we use it?
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
|| overrideWithQueueMappings) {
LOG.info("Application " + applicationId + " user " + user
+ " mapping [" + queueName + "] to [" + mappedQueue
+ "] override " + overrideWithQueueMappings);
queueName = mappedQueue;
RMApp rmApp = rmContext.getRMApps().get(applicationId);
rmApp.setQueue(queueName);
}
}
} catch (IOException ioex) {
String message = "Failed to submit application " + applicationId +
" submitted by user " + user + " reason: " + ioex.getMessage();
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message));
return;
}
}
// sanity checks.
CSQueue queue = getQueue(queueName);
if (queue == null) {
String message = "Application " + applicationId +
@ -902,8 +999,8 @@ public class CapacityScheduler extends
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
appAddedEvent.getQueue(), appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering());
appAddedEvent.getQueue(),
appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:

View File

@ -18,8 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -146,6 +145,44 @@ public class CapacitySchedulerConfiguration extends Configuration {
@Private
public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
@Private
public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
@Private
public static final String ENABLE_QUEUE_MAPPING_OVERRIDE = QUEUE_MAPPING + "-override.enable";
@Private
public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false;
@Private
public static class QueueMapping {
public enum MappingType {
USER("u"),
GROUP("g");
private final String type;
private MappingType(String type) {
this.type = type;
}
public String toString() {
return type;
}
};
MappingType type;
String source;
String queue;
public QueueMapping(MappingType type, String source, String queue) {
this.type = type;
this.source = source;
this.queue = queue;
}
}
public CapacitySchedulerConfiguration() {
this(new Configuration());
}
@ -378,4 +415,82 @@ public class CapacitySchedulerConfiguration extends Configuration {
setBoolean(SCHEDULE_ASYNCHRONOUSLY_ENABLE, async);
}
public boolean getOverrideWithQueueMappings() {
return getBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE,
DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE);
}
/**
* Returns a collection of strings, trimming leading and trailing whitespace
* on each value
*
* @param str
* String to parse
* @param delim
* delimiter to separate the values
* @return Collection of parsed elements.
*/
private static Collection<String> getTrimmedStringCollection(String str,
String delim) {
List<String> values = new ArrayList<String>();
if (str == null)
return values;
StringTokenizer tokenizer = new StringTokenizer(str, delim);
while (tokenizer.hasMoreTokens()) {
String next = tokenizer.nextToken();
if (next == null || next.trim().isEmpty()) {
continue;
}
values.add(next.trim());
}
return values;
}
/**
* Get user/group mappings to queues.
*
* @return user/group mappings; an IllegalArgumentException is thrown on
* illegal configs
*/
public List<QueueMapping> getQueueMappings() {
List<QueueMapping> mappings =
new ArrayList<CapacitySchedulerConfiguration.QueueMapping>();
Collection<String> mappingsString =
getTrimmedStringCollection(QUEUE_MAPPING);
for (String mappingValue : mappingsString) {
String[] mapping =
getTrimmedStringCollection(mappingValue, ":")
.toArray(new String[] {});
if (mapping.length != 3 || mapping[1].length() == 0
|| mapping[2].length() == 0) {
throw new IllegalArgumentException(
"Illegal queue mapping " + mappingValue);
}
QueueMapping m;
try {
QueueMapping.MappingType mappingType;
if (mapping[0].equals("u")) {
mappingType = QueueMapping.MappingType.USER;
} else if (mapping[0].equals("g")) {
mappingType = QueueMapping.MappingType.GROUP;
} else {
throw new IllegalArgumentException(
"unknown mapping prefix " + mapping[0]);
}
m = new QueueMapping(
mappingType,
mapping[1],
mapping[2]);
} catch (Throwable t) {
throw new IllegalArgumentException(
"Illegal queue mapping " + mappingValue);
}
if (m != null) {
mappings.add(m);
}
}
return mappings;
}
}
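A parse-level sketch, not part of this patch, exercising getQueueMappings(). The user, group, and queue names are illustrative assumptions; queue existence is only validated later, when CapacityScheduler initializes its mappings.
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
public class QueueMappingParseSketch {
  public static void main(String[] args) {
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    // "u:<user>:<queue>" maps a single user, "g:<group>:<queue>" maps a group;
    // the %user and %primary_group placeholders are resolved by the scheduler.
    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
        "u:alice:q1,g:analysts:q2,u:%user:%primary_group");
    conf.setBoolean(
        CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE, true);
    System.out.println(conf.getQueueMappings().size());  // prints 3
    // Malformed entries (wrong arity or an unknown prefix) are rejected.
    conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "x:alice:q1");
    try {
      conf.getQueueMappings();
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}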

View File

@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class TestQueueMappings {
private static final Log LOG = LogFactory.getLog(TestQueueMappings.class);
private static final String Q1 = "q1";
private static final String Q2 = "q2";
private final static String Q1_PATH =
CapacitySchedulerConfiguration.ROOT + "." + Q1;
private final static String Q2_PATH =
CapacitySchedulerConfiguration.ROOT + "." + Q2;
private MockRM resourceManager;
@After
public void tearDown() throws Exception {
if (resourceManager != null) {
LOG.info("Stopping the resource manager");
resourceManager.stop();
}
}
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { Q1, Q2 });
conf.setCapacity(Q1_PATH, 10);
conf.setCapacity(Q2_PATH, 90);
LOG.info("Setup top-level queues q1 and q2");
}
@Test (timeout = 60000)
public void testQueueMapping() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
YarnConfiguration conf = new YarnConfiguration(csConf);
CapacityScheduler cs = new CapacityScheduler();
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null);
cs.setConf(conf);
cs.setRMContext(rmContext);
cs.init(conf);
cs.start();
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
"true");
// configuration parsing tests - negative test cases
checkInvalidQMapping(conf, cs, "x:a:b", "invalid specifier");
checkInvalidQMapping(conf, cs, "u:a", "no queue specified");
checkInvalidQMapping(conf, cs, "g:a", "no queue specified");
checkInvalidQMapping(conf, cs, "u:a:b,g:a",
"multiple mappings with invalid mapping");
checkInvalidQMapping(conf, cs, "u:a:b,g:a:d:e", "too many path segments");
checkInvalidQMapping(conf, cs, "u::", "empty source and queue");
checkInvalidQMapping(conf, cs, "u:", "missing source missing queue");
checkInvalidQMapping(conf, cs, "u:a:", "empty source missing q");
// simple base case for mapping user to queue
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:a:" + Q1);
cs.reinitialize(conf, null);
checkQMapping("a", Q1, cs);
// group mapping test
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:agroup:" + Q1);
cs.reinitialize(conf, null);
checkQMapping("a", Q1, cs);
// %user tests
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:" + Q2);
cs.reinitialize(conf, null);
checkQMapping("a", Q2, cs);
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:%user:%user");
cs.reinitialize(conf, null);
checkQMapping("a", "a", cs);
// %primary_group tests
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"u:%user:%primary_group");
cs.reinitialize(conf, null);
checkQMapping("a", "agroup", cs);
// non-primary group mapping
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"g:asubgroup1:" + Q1);
cs.reinitialize(conf, null);
checkQMapping("a", Q1, cs);
// space trimming
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1);
cs.reinitialize(conf, null);
checkQMapping("a", Q1, cs);
csConf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
conf = new YarnConfiguration(csConf);
resourceManager = new MockRM(csConf);
resourceManager.start();
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
conf.set(CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
"true");
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
resourceManager.getResourceScheduler().reinitialize(conf, null);
// ensure that if the user specifies a Q, it is still overridden
checkAppQueue(resourceManager, "user", Q2, Q1);
// toggle admin override and retry
conf.setBoolean(
CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE,
false);
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "u:user:" + Q1);
setupQueueConfiguration(csConf);
resourceManager.getResourceScheduler().reinitialize(conf, null);
checkAppQueue(resourceManager, "user", Q2, Q2);
// ensure that if a user does not specify a Q, the user mapping is used
checkAppQueue(resourceManager, "user", null, Q1);
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, "g:usergroup:" + Q2);
setupQueueConfiguration(csConf);
resourceManager.getResourceScheduler().reinitialize(conf, null);
// ensure that if a user does not specify a Q, the group mapping is used
checkAppQueue(resourceManager, "user", null, Q2);
// if the mapping specifies a queue that does not exist, the job is rejected
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
"u:user:non_existent_queue");
setupQueueConfiguration(csConf);
boolean fail = false;
try {
resourceManager.getResourceScheduler().reinitialize(conf, null);
}
catch (IOException ioex) {
fail = true;
}
Assert.assertTrue("queue initialization failed for non-existent q", fail);
resourceManager.stop();
}
private void checkAppQueue(MockRM resourceManager, String user,
String submissionQueue, String expected)
throws Exception {
RMApp app = resourceManager.submitApp(200, "name", user,
new HashMap<ApplicationAccessType, String>(), false, submissionQueue, -1,
null, "MAPREDUCE", false);
RMAppState expectedState = expected.isEmpty() ? RMAppState.FAILED
: RMAppState.ACCEPTED;
resourceManager.waitForState(app.getApplicationId(), expectedState);
// get scheduler app
CapacityScheduler cs = (CapacityScheduler)
resourceManager.getResourceScheduler();
SchedulerApplication schedulerApp =
cs.getSchedulerApplications().get(app.getApplicationId());
String queue = "";
if (schedulerApp != null) {
queue = schedulerApp.getQueue().getQueueName();
}
Assert.assertTrue("expected " + expected + " actual " + queue,
expected.equals(queue));
Assert.assertEquals(expected, app.getQueue());
}
private void checkInvalidQMapping(YarnConfiguration conf,
CapacityScheduler cs,
String mapping, String reason)
throws IOException {
boolean fail = false;
try {
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, mapping);
cs.reinitialize(conf, null);
} catch (IOException ex) {
fail = true;
}
Assert.assertTrue("invalid mapping did not throw exception for " + reason,
fail);
}
private void checkQMapping(String user, String expected, CapacityScheduler cs)
throws IOException {
String actual = cs.getMappedQueueForTest(user);
Assert.assertTrue("expected " + expected + " actual " + actual,
expected.equals(actual));
}
}
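For reference, a compact editorial recap of the mapping strings exercised above
and the queue each one resolves to for user "a", whose primary group under
SimpleGroupsMapping is "agroup" according to the assertions in this test. The
class is illustrative only and not part of the patch.

// Prints each exercised mapping next to the queue the test expects it to resolve to.
public class QueueMappingRecap {
  public static void main(String[] args) {
    String[][] exercised = {
        { "u:a:q1",                 "q1" },     // direct user mapping
        { "g:agroup:q1",            "q1" },     // primary-group mapping
        { "u:%user:q2",             "q2" },     // any user to a fixed queue
        { "u:%user:%user",          "a" },      // any user to a queue named after the user
        { "u:%user:%primary_group", "agroup" }, // any user to a queue named after the group
        { "g:asubgroup1:q1",        "q1" },     // non-primary-group mapping
        { " u : a : q1",            "q1" },     // whitespace is trimmed
    };
    for (String[] e : exercised) {
      System.out.println(e[0] + " -> " + e[1]);
    }
  }
}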