HDFS-8185. Separate client related routines in HAUtil into a new class. Contributed by Haohui Mai.
This commit is contained in:
parent
613a783380
commit
f02ca4ab15
|
@ -17,11 +17,28 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
|
||||
|
||||
public class DFSUtilClient {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
DFSUtilClient.class);
|
||||
/**
|
||||
* Converts a byte array to a string using UTF8 encoding.
|
||||
*/
|
||||
|
@ -44,6 +61,58 @@ public class DFSUtilClient {
|
|||
return StringUtils.format("%.2f%%", percentage);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns collection of nameservice Ids from the configuration.
|
||||
* @param conf configuration
|
||||
* @return collection of nameservice Ids, or null if not specified
|
||||
*/
|
||||
public static Collection<String> getNameServiceIds(Configuration conf) {
|
||||
return conf.getTrimmedStringCollection(DFS_NAMESERVICES);
|
||||
}
|
||||
|
||||
/**
|
||||
* Namenode HighAvailability related configuration.
|
||||
* Returns collection of namenode Ids from the configuration. One logical id
|
||||
* for each namenode in the in the HA setup.
|
||||
*
|
||||
* @param conf configuration
|
||||
* @param nsId the nameservice ID to look at, or null for non-federated
|
||||
* @return collection of namenode Ids
|
||||
*/
|
||||
public static Collection<String> getNameNodeIds(Configuration conf, String nsId) {
|
||||
String key = addSuffix(DFS_HA_NAMENODES_KEY_PREFIX, nsId);
|
||||
return conf.getTrimmedStringCollection(key);
|
||||
}
|
||||
|
||||
/** Add non empty and non null suffix to a key */
|
||||
static String addSuffix(String key, String suffix) {
|
||||
if (suffix == null || suffix.isEmpty()) {
|
||||
return key;
|
||||
}
|
||||
assert !suffix.startsWith(".") :
|
||||
"suffix '" + suffix + "' should not already have '.' prepended.";
|
||||
return key + "." + suffix;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from
|
||||
* the configuration.
|
||||
*
|
||||
* @return list of InetSocketAddresses
|
||||
*/
|
||||
public static Map<String, Map<String, InetSocketAddress>> getHaNnWebHdfsAddresses(
|
||||
Configuration conf, String scheme) {
|
||||
if (WebHdfsConstants.WEBHDFS_SCHEME.equals(scheme)) {
|
||||
return getAddresses(conf, null,
|
||||
HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
} else if (WebHdfsConstants.SWEBHDFS_SCHEME.equals(scheme)) {
|
||||
return getAddresses(conf, null,
|
||||
HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported scheme: " + scheme);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Decode a specific range of bytes of the given byte array to a string
|
||||
* using UTF8.
|
||||
|
@ -62,4 +131,107 @@ public class DFSUtilClient {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return <code>coll</code> if it is non-null and non-empty. Otherwise,
|
||||
* returns a list with a single null value.
|
||||
*/
|
||||
static Collection<String> emptyAsSingletonNull(Collection<String> coll) {
|
||||
if (coll == null || coll.isEmpty()) {
|
||||
return Collections.singletonList(null);
|
||||
} else {
|
||||
return coll;
|
||||
}
|
||||
}
|
||||
|
||||
/** Concatenate list of suffix strings '.' separated */
|
||||
static String concatSuffixes(String... suffixes) {
|
||||
if (suffixes == null) {
|
||||
return null;
|
||||
}
|
||||
return Joiner.on(".").skipNulls().join(suffixes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured address for all NameNodes in the cluster.
|
||||
* @param conf configuration
|
||||
* @param defaultAddress default address to return in case key is not found.
|
||||
* @param keys Set of keys to look for in the order of preference
|
||||
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
|
||||
*/
|
||||
static Map<String, Map<String, InetSocketAddress>>
|
||||
getAddresses(Configuration conf, String defaultAddress, String... keys) {
|
||||
Collection<String> nameserviceIds = getNameServiceIds(conf);
|
||||
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured address for all NameNodes in the cluster.
|
||||
* @param conf configuration
|
||||
* @param defaultAddress default address to return in case key is not found.
|
||||
* @param keys Set of keys to look for in the order of preference
|
||||
*
|
||||
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
|
||||
*/
|
||||
static Map<String, Map<String, InetSocketAddress>>
|
||||
getAddressesForNsIds(
|
||||
Configuration conf, Collection<String> nsIds, String defaultAddress,
|
||||
String... keys) {
|
||||
// Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
|
||||
// across all of the configured nameservices and namenodes.
|
||||
Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
|
||||
for (String nsId : emptyAsSingletonNull(nsIds)) {
|
||||
Map<String, InetSocketAddress> isas =
|
||||
getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
|
||||
if (!isas.isEmpty()) {
|
||||
ret.put(nsId, isas);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static Map<String, InetSocketAddress> getAddressesForNameserviceId(
|
||||
Configuration conf, String nsId, String defaultValue, String... keys) {
|
||||
Collection<String> nnIds = getNameNodeIds(conf, nsId);
|
||||
Map<String, InetSocketAddress> ret = Maps.newHashMap();
|
||||
for (String nnId : emptyAsSingletonNull(nnIds)) {
|
||||
String suffix = concatSuffixes(nsId, nnId);
|
||||
String address = getConfValue(defaultValue, suffix, conf, keys);
|
||||
if (address != null) {
|
||||
InetSocketAddress isa = NetUtils.createSocketAddr(address);
|
||||
if (isa.isUnresolved()) {
|
||||
LOG.warn("Namenode for " + nsId +
|
||||
" remains unresolved for ID " + nnId +
|
||||
". Check your hdfs-site.xml file to " +
|
||||
"ensure namenodes are configured properly.");
|
||||
}
|
||||
ret.put(nnId, isa);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a list of keys in the order of preference, returns a value
|
||||
* for the key in the given order from the configuration.
|
||||
* @param defaultValue default value to return, when key was not found
|
||||
* @param keySuffix suffix to add to the key, if it is not null
|
||||
* @param conf Configuration
|
||||
* @param keys list of keys in the order of preference
|
||||
* @return value of the key or default if a key was not found in configuration
|
||||
*/
|
||||
private static String getConfValue(String defaultValue, String keySuffix,
|
||||
Configuration conf, String... keys) {
|
||||
String value = null;
|
||||
for (String key : keys) {
|
||||
key = addSuffix(key, keySuffix);
|
||||
value = conf.get(key);
|
||||
if (value != null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (value == null) {
|
||||
value = defaultValue;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import static org.apache.hadoop.hdfs.protocol.HdfsConstantsClient.HA_DT_SERVICE_PREFIX;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class HAUtilClient {
|
||||
/**
|
||||
* @return true if the given nameNodeUri appears to be a logical URI.
|
||||
*/
|
||||
public static boolean isLogicalUri(
|
||||
Configuration conf, URI nameNodeUri) {
|
||||
String host = nameNodeUri.getHost();
|
||||
// A logical name must be one of the service IDs.
|
||||
return DFSUtilClient.getNameServiceIds(conf).contains(host);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the client has a failover proxy provider configured
|
||||
* for the namenode/nameservice.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param nameNodeUri The URI of namenode
|
||||
* @return true if failover is configured.
|
||||
*/
|
||||
public static boolean isClientFailoverConfigured(
|
||||
Configuration conf, URI nameNodeUri) {
|
||||
String host = nameNodeUri.getHost();
|
||||
String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
|
||||
+ "." + host;
|
||||
return conf.get(configKey) != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the service name used in the delegation token for the given logical
|
||||
* HA service.
|
||||
* @param uri the logical URI of the cluster
|
||||
* @param scheme the scheme of the corresponding FileSystem
|
||||
* @return the service name
|
||||
*/
|
||||
public static Text buildTokenServiceForLogicalUri(final URI uri,
|
||||
final String scheme) {
|
||||
return new Text(buildTokenServicePrefixForLogicalUri(scheme)
|
||||
+ uri.getHost());
|
||||
}
|
||||
|
||||
public static String buildTokenServicePrefixForLogicalUri(String scheme) {
|
||||
return HA_DT_SERVICE_PREFIX + scheme + ":";
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the file system URI out of the provided token.
|
||||
*/
|
||||
public static URI getServiceUriFromToken(final String scheme, Token<?> token) {
|
||||
String tokStr = token.getService().toString();
|
||||
final String prefix = buildTokenServicePrefixForLogicalUri(
|
||||
scheme);
|
||||
if (tokStr.startsWith(prefix)) {
|
||||
tokStr = tokStr.replaceFirst(prefix, "");
|
||||
}
|
||||
return URI.create(scheme + "://" + tokStr);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if this token corresponds to a logical nameservice
|
||||
* rather than a specific namenode.
|
||||
*/
|
||||
public static boolean isTokenForLogicalUri(Token<?> token) {
|
||||
return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX);
|
||||
}
|
||||
}
|
|
@ -31,6 +31,12 @@ public interface HdfsClientConfigKeys {
|
|||
"^(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?)*$";
|
||||
|
||||
static final String PREFIX = "dfs.client.";
|
||||
String DFS_NAMESERVICES = "dfs.nameservices";
|
||||
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
|
||||
String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
|
||||
int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470;
|
||||
String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
|
||||
String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
|
||||
|
||||
/** dfs.client.retry configuration properties */
|
||||
interface Retry {
|
||||
|
|
|
@ -32,4 +32,10 @@ public interface HdfsConstantsClient {
|
|||
*/
|
||||
long GRANDFATHER_INODE_ID = 0;
|
||||
byte BLOCK_STORAGE_POLICY_ID_UNSPECIFIED = 0;
|
||||
/**
|
||||
* A prefix put before the namenode URI inside the "service" field
|
||||
* of a delgation token, indicating that the URI is a logical (HA)
|
||||
* URI.
|
||||
*/
|
||||
String HA_DT_SERVICE_PREFIX = "ha-";
|
||||
}
|
||||
|
|
|
@ -23,7 +23,8 @@ import org.apache.hadoop.io.Text;
|
|||
|
||||
@InterfaceAudience.Private
|
||||
public class WebHdfsConstants {
|
||||
/** Delegation token kind */
|
||||
public static final String WEBHDFS_SCHEME = "webhdfs";
|
||||
public static final String SWEBHDFS_SCHEME = "swebhdfs";
|
||||
public static final Text WEBHDFS_TOKEN_KIND = new Text("WEBHDFS delegation");
|
||||
public static final Text SWEBHDFS_TOKEN_KIND = new Text("SWEBHDFS delegation");
|
||||
|
||||
|
|
|
@ -137,6 +137,9 @@ Release 2.8.0 - UNRELEASED
|
|||
HDFS-8133. Improve readability of deleted block check (Daryn Sharp via
|
||||
Colin P. McCabe)
|
||||
|
||||
HDFS-8185. Separate client related routines in HAUtil into a new class.
|
||||
(wheat9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||
|
|
|
@ -808,10 +808,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
|||
private static ClientProtocol getNNProxy(
|
||||
Token<DelegationTokenIdentifier> token, Configuration conf)
|
||||
throws IOException {
|
||||
URI uri = HAUtil.getServiceUriFromToken(HdfsConstants.HDFS_URI_SCHEME,
|
||||
token);
|
||||
if (HAUtil.isTokenForLogicalUri(token) &&
|
||||
!HAUtil.isLogicalUri(conf, uri)) {
|
||||
URI uri = HAUtilClient.getServiceUriFromToken(
|
||||
HdfsConstants.HDFS_URI_SCHEME, token);
|
||||
if (HAUtilClient.isTokenForLogicalUri(token) &&
|
||||
!HAUtilClient.isLogicalUri(conf, uri)) {
|
||||
// If the token is for a logical nameservice, but the configuration
|
||||
// we have disagrees about that, we can't actually renew it.
|
||||
// This can be the case in MR, for example, if the RM doesn't
|
||||
|
|
|
@ -103,8 +103,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
|
||||
|
||||
public static final String DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
|
||||
public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
|
||||
public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
|
||||
public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT =
|
||||
HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
|
||||
public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY =
|
||||
HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
|
||||
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;
|
||||
public static final String DFS_NAMENODE_HTTP_BIND_HOST_KEY = "dfs.namenode.http-bind-host";
|
||||
public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
|
||||
|
@ -304,8 +306,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
//Following keys have no defaults
|
||||
public static final String DFS_DATANODE_DATA_DIR_KEY = "dfs.datanode.data.dir";
|
||||
public static final String DFS_NAMENODE_HTTPS_PORT_KEY = "dfs.https.port";
|
||||
public static final int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470;
|
||||
public static final String DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
|
||||
public static final int DFS_NAMENODE_HTTPS_PORT_DEFAULT =
|
||||
HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT;
|
||||
public static final String DFS_NAMENODE_HTTPS_ADDRESS_KEY =
|
||||
HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
|
||||
public static final String DFS_NAMENODE_HTTPS_BIND_HOST_KEY = "dfs.namenode.https-bind-host";
|
||||
public static final String DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTPS_PORT_DEFAULT;
|
||||
public static final String DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";
|
||||
|
@ -519,7 +523,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
public static final boolean DFS_QUOTA_BY_STORAGETYPE_ENABLED_DEFAULT = true;
|
||||
|
||||
// HA related configuration
|
||||
public static final String DFS_HA_NAMENODES_KEY_PREFIX = "dfs.ha.namenodes";
|
||||
public static final String DFS_HA_NAMENODES_KEY_PREFIX =
|
||||
HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||
public static final String DFS_HA_NAMENODE_ID_KEY = "dfs.ha.namenode.id";
|
||||
public static final String DFS_HA_STANDBY_CHECKPOINTS_KEY = "dfs.ha.standby.checkpoints";
|
||||
public static final boolean DFS_HA_STANDBY_CHECKPOINTS_DEFAULT = true;
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs;
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT;
|
||||
|
@ -31,7 +30,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYPASSWORD_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PASSWORD_KEY;
|
||||
|
@ -48,7 +46,6 @@ import java.security.SecureRandom;
|
|||
import java.text.SimpleDateFormat;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
|
@ -88,8 +85,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|||
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.http.HttpServer2;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
|
@ -107,7 +102,6 @@ import com.google.common.base.Charsets;
|
|||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.primitives.SignedBytes;
|
||||
import com.google.protobuf.BlockingService;
|
||||
|
||||
|
@ -538,126 +532,12 @@ public class DFSUtil {
|
|||
return blkLocations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns collection of nameservice Ids from the configuration.
|
||||
* @param conf configuration
|
||||
* @return collection of nameservice Ids, or null if not specified
|
||||
*/
|
||||
public static Collection<String> getNameServiceIds(Configuration conf) {
|
||||
return conf.getTrimmedStringCollection(DFS_NAMESERVICES);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return <code>coll</code> if it is non-null and non-empty. Otherwise,
|
||||
* returns a list with a single null value.
|
||||
*/
|
||||
private static Collection<String> emptyAsSingletonNull(Collection<String> coll) {
|
||||
if (coll == null || coll.isEmpty()) {
|
||||
return Collections.singletonList(null);
|
||||
} else {
|
||||
return coll;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Namenode HighAvailability related configuration.
|
||||
* Returns collection of namenode Ids from the configuration. One logical id
|
||||
* for each namenode in the in the HA setup.
|
||||
*
|
||||
* @param conf configuration
|
||||
* @param nsId the nameservice ID to look at, or null for non-federated
|
||||
* @return collection of namenode Ids
|
||||
*/
|
||||
public static Collection<String> getNameNodeIds(Configuration conf, String nsId) {
|
||||
String key = addSuffix(DFS_HA_NAMENODES_KEY_PREFIX, nsId);
|
||||
return conf.getTrimmedStringCollection(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a list of keys in the order of preference, returns a value
|
||||
* for the key in the given order from the configuration.
|
||||
* @param defaultValue default value to return, when key was not found
|
||||
* @param keySuffix suffix to add to the key, if it is not null
|
||||
* @param conf Configuration
|
||||
* @param keys list of keys in the order of preference
|
||||
* @return value of the key or default if a key was not found in configuration
|
||||
*/
|
||||
private static String getConfValue(String defaultValue, String keySuffix,
|
||||
Configuration conf, String... keys) {
|
||||
String value = null;
|
||||
for (String key : keys) {
|
||||
key = addSuffix(key, keySuffix);
|
||||
value = conf.get(key);
|
||||
if (value != null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (value == null) {
|
||||
value = defaultValue;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
/** Add non empty and non null suffix to a key */
|
||||
private static String addSuffix(String key, String suffix) {
|
||||
if (suffix == null || suffix.isEmpty()) {
|
||||
return key;
|
||||
}
|
||||
assert !suffix.startsWith(".") :
|
||||
"suffix '" + suffix + "' should not already have '.' prepended.";
|
||||
return key + "." + suffix;
|
||||
}
|
||||
|
||||
/** Concatenate list of suffix strings '.' separated */
|
||||
private static String concatSuffixes(String... suffixes) {
|
||||
if (suffixes == null) {
|
||||
return null;
|
||||
}
|
||||
return Joiner.on(".").skipNulls().join(suffixes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return configuration key of format key.suffix1.suffix2...suffixN
|
||||
*/
|
||||
public static String addKeySuffixes(String key, String... suffixes) {
|
||||
String keySuffix = concatSuffixes(suffixes);
|
||||
return addSuffix(key, keySuffix);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured address for all NameNodes in the cluster.
|
||||
* @param conf configuration
|
||||
* @param defaultAddress default address to return in case key is not found.
|
||||
* @param keys Set of keys to look for in the order of preference
|
||||
* @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
|
||||
*/
|
||||
private static Map<String, Map<String, InetSocketAddress>>
|
||||
getAddresses(Configuration conf, String defaultAddress, String... keys) {
|
||||
Collection<String> nameserviceIds = getNameServiceIds(conf);
|
||||
return getAddressesForNsIds(conf, nameserviceIds, defaultAddress, keys);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured address for all NameNodes in the cluster.
|
||||
* @param conf configuration
|
||||
* @param nsIds
|
||||
*@param defaultAddress default address to return in case key is not found.
|
||||
* @param keys Set of keys to look for in the order of preference @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
|
||||
*/
|
||||
private static Map<String, Map<String, InetSocketAddress>>
|
||||
getAddressesForNsIds(Configuration conf, Collection<String> nsIds,
|
||||
String defaultAddress, String... keys) {
|
||||
// Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
|
||||
// across all of the configured nameservices and namenodes.
|
||||
Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
|
||||
for (String nsId : emptyAsSingletonNull(nsIds)) {
|
||||
Map<String, InetSocketAddress> isas =
|
||||
getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
|
||||
if (!isas.isEmpty()) {
|
||||
ret.put(nsId, isas);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
String keySuffix = DFSUtilClient.concatSuffixes(suffixes);
|
||||
return DFSUtilClient.addSuffix(key, keySuffix);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -670,40 +550,18 @@ public class DFSUtil {
|
|||
*/
|
||||
public static Map<String, InetSocketAddress> getRpcAddressesForNameserviceId(
|
||||
Configuration conf, String nsId, String defaultValue) {
|
||||
return getAddressesForNameserviceId(conf, nsId, defaultValue,
|
||||
return DFSUtilClient.getAddressesForNameserviceId(conf, nsId, defaultValue,
|
||||
DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||
}
|
||||
|
||||
private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
|
||||
Configuration conf, String nsId, String defaultValue,
|
||||
String... keys) {
|
||||
Collection<String> nnIds = getNameNodeIds(conf, nsId);
|
||||
Map<String, InetSocketAddress> ret = Maps.newHashMap();
|
||||
for (String nnId : emptyAsSingletonNull(nnIds)) {
|
||||
String suffix = concatSuffixes(nsId, nnId);
|
||||
String address = getConfValue(defaultValue, suffix, conf, keys);
|
||||
if (address != null) {
|
||||
InetSocketAddress isa = NetUtils.createSocketAddr(address);
|
||||
if (isa.isUnresolved()) {
|
||||
LOG.warn("Namenode for " + nsId +
|
||||
" remains unresolved for ID " + nnId +
|
||||
". Check your hdfs-site.xml file to " +
|
||||
"ensure namenodes are configured properly.");
|
||||
}
|
||||
ret.put(nnId, isa);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a collection of all configured NN Kerberos principals.
|
||||
*/
|
||||
public static Set<String> getAllNnPrincipals(Configuration conf) throws IOException {
|
||||
Set<String> principals = new HashSet<String>();
|
||||
for (String nsId : DFSUtil.getNameServiceIds(conf)) {
|
||||
for (String nsId : DFSUtilClient.getNameServiceIds(conf)) {
|
||||
if (HAUtil.isHAEnabled(conf, nsId)) {
|
||||
for (String nnId : DFSUtil.getNameNodeIds(conf, nsId)) {
|
||||
for (String nnId : DFSUtilClient.getNameNodeIds(conf, nsId)) {
|
||||
Configuration confForNn = new Configuration(conf);
|
||||
NameNode.initializeGenericKeys(confForNn, nsId, nnId);
|
||||
String principal = SecurityUtil.getServerPrincipal(confForNn
|
||||
|
@ -733,26 +591,8 @@ public class DFSUtil {
|
|||
*/
|
||||
public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
|
||||
Configuration conf) {
|
||||
return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from
|
||||
* the configuration.
|
||||
*
|
||||
* @return list of InetSocketAddresses
|
||||
*/
|
||||
public static Map<String, Map<String, InetSocketAddress>> getHaNnWebHdfsAddresses(
|
||||
Configuration conf, String scheme) {
|
||||
if (WebHdfsFileSystem.SCHEME.equals(scheme)) {
|
||||
return getAddresses(conf, null,
|
||||
DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
} else if (SWebHdfsFileSystem.SCHEME.equals(scheme)) {
|
||||
return getAddresses(conf, null,
|
||||
DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported scheme: " + scheme);
|
||||
}
|
||||
return DFSUtilClient.getAddresses(conf, null,
|
||||
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -765,8 +605,8 @@ public class DFSUtil {
|
|||
*/
|
||||
public static Map<String, Map<String, InetSocketAddress>> getBackupNodeAddresses(
|
||||
Configuration conf) throws IOException {
|
||||
Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf,
|
||||
null, DFS_NAMENODE_BACKUP_ADDRESS_KEY);
|
||||
Map<String, Map<String, InetSocketAddress>> addressList = DFSUtilClient.getAddresses(
|
||||
conf, null, DFS_NAMENODE_BACKUP_ADDRESS_KEY);
|
||||
if (addressList.isEmpty()) {
|
||||
throw new IOException("Incorrect configuration: backup node address "
|
||||
+ DFS_NAMENODE_BACKUP_ADDRESS_KEY + " is not configured.");
|
||||
|
@ -784,8 +624,8 @@ public class DFSUtil {
|
|||
*/
|
||||
public static Map<String, Map<String, InetSocketAddress>> getSecondaryNameNodeAddresses(
|
||||
Configuration conf) throws IOException {
|
||||
Map<String, Map<String, InetSocketAddress>> addressList = getAddresses(conf, null,
|
||||
DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
|
||||
Map<String, Map<String, InetSocketAddress>> addressList = DFSUtilClient.getAddresses(
|
||||
conf, null, DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY);
|
||||
if (addressList.isEmpty()) {
|
||||
throw new IOException("Incorrect configuration: secondary namenode address "
|
||||
+ DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY + " is not configured.");
|
||||
|
@ -816,8 +656,9 @@ public class DFSUtil {
|
|||
}
|
||||
|
||||
Map<String, Map<String, InetSocketAddress>> addressList =
|
||||
getAddresses(conf, defaultAddress,
|
||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||
DFSUtilClient.getAddresses(conf, defaultAddress,
|
||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
||||
DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||
if (addressList.isEmpty()) {
|
||||
throw new IOException("Incorrect configuration: namenode address "
|
||||
+ DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
|
||||
|
@ -869,8 +710,10 @@ public class DFSUtil {
|
|||
}
|
||||
|
||||
Map<String, Map<String, InetSocketAddress>> addressList =
|
||||
getAddressesForNsIds(conf, parentNameServices, defaultAddress,
|
||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||
DFSUtilClient.getAddressesForNsIds(conf, parentNameServices,
|
||||
defaultAddress,
|
||||
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
|
||||
DFS_NAMENODE_RPC_ADDRESS_KEY);
|
||||
if (addressList.isEmpty()) {
|
||||
throw new IOException("Incorrect configuration: namenode address "
|
||||
+ DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
|
||||
|
@ -1002,7 +845,7 @@ public class DFSUtil {
|
|||
// keep track of non-preferred keys here.
|
||||
Set<URI> nonPreferredUris = new HashSet<URI>();
|
||||
|
||||
for (String nsId : getNameServiceIds(conf)) {
|
||||
for (String nsId : DFSUtilClient.getNameServiceIds(conf)) {
|
||||
if (HAUtil.isHAEnabled(conf, nsId)) {
|
||||
// Add the logical URI of the nameservice.
|
||||
try {
|
||||
|
@ -1014,7 +857,7 @@ public class DFSUtil {
|
|||
// Add the URI corresponding to the address of the NN.
|
||||
boolean uriFound = false;
|
||||
for (String key : keys) {
|
||||
String addr = conf.get(concatSuffixes(key, nsId));
|
||||
String addr = conf.get(DFSUtilClient.concatSuffixes(key, nsId));
|
||||
if (addr != null) {
|
||||
URI uri = createUri(HdfsConstants.HDFS_URI_SCHEME,
|
||||
NetUtils.createSocketAddr(addr));
|
||||
|
@ -1312,7 +1155,7 @@ public class DFSUtil {
|
|||
if (nameserviceId != null) {
|
||||
return nameserviceId;
|
||||
}
|
||||
Collection<String> nsIds = getNameServiceIds(conf);
|
||||
Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
|
||||
if (1 == nsIds.size()) {
|
||||
return nsIds.toArray(new String[1])[0];
|
||||
}
|
||||
|
@ -1343,14 +1186,14 @@ public class DFSUtil {
|
|||
String namenodeId = null;
|
||||
int found = 0;
|
||||
|
||||
Collection<String> nsIds = getNameServiceIds(conf);
|
||||
for (String nsId : emptyAsSingletonNull(nsIds)) {
|
||||
Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
|
||||
for (String nsId : DFSUtilClient.emptyAsSingletonNull(nsIds)) {
|
||||
if (knownNsId != null && !knownNsId.equals(nsId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Collection<String> nnIds = getNameNodeIds(conf, nsId);
|
||||
for (String nnId : emptyAsSingletonNull(nnIds)) {
|
||||
Collection<String> nnIds = DFSUtilClient.getNameNodeIds(conf, nsId);
|
||||
for (String nnId : DFSUtilClient.emptyAsSingletonNull(nnIds)) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(String.format("addressKey: %s nsId: %s nnId: %s",
|
||||
addressKey, nsId, nnId));
|
||||
|
@ -1454,10 +1297,10 @@ public class DFSUtil {
|
|||
nsId = getOnlyNameServiceIdOrNull(conf);
|
||||
}
|
||||
|
||||
String serviceAddrKey = concatSuffixes(
|
||||
String serviceAddrKey = DFSUtilClient.concatSuffixes(
|
||||
DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, nsId, nnId);
|
||||
|
||||
String addrKey = concatSuffixes(
|
||||
String addrKey = DFSUtilClient.concatSuffixes(
|
||||
DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY, nsId, nnId);
|
||||
|
||||
String serviceRpcAddr = conf.get(serviceAddrKey);
|
||||
|
@ -1472,7 +1315,7 @@ public class DFSUtil {
|
|||
* name of that nameservice. If it refers to 0 or more than 1, return null.
|
||||
*/
|
||||
public static String getOnlyNameServiceIdOrNull(Configuration conf) {
|
||||
Collection<String> nsIds = getNameServiceIds(conf);
|
||||
Collection<String> nsIds = DFSUtilClient.getNameServiceIds(conf);
|
||||
if (1 == nsIds.size()) {
|
||||
return nsIds.toArray(new String[1])[0];
|
||||
} else {
|
||||
|
|
|
@ -1570,7 +1570,7 @@ public class DistributedFileSystem extends FileSystem {
|
|||
|
||||
@Override
|
||||
protected URI canonicalizeUri(URI uri) {
|
||||
if (HAUtil.isLogicalUri(getConf(), uri)) {
|
||||
if (HAUtilClient.isLogicalUri(getConf(), uri)) {
|
||||
// Don't try to DNS-resolve logical URIs, since the 'authority'
|
||||
// portion isn't a proper hostname
|
||||
return uri;
|
||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
|
|||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
|
||||
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
@ -38,7 +37,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
|
@ -151,7 +149,7 @@ public class HAUtil {
|
|||
"machine is one of the machines listed as a NN RPC address, " +
|
||||
"or configure " + DFSConfigKeys.DFS_NAMESERVICE_ID);
|
||||
|
||||
Collection<String> nnIds = DFSUtil.getNameNodeIds(conf, nsId);
|
||||
Collection<String> nnIds = DFSUtilClient.getNameNodeIds(conf, nsId);
|
||||
String myNNId = conf.get(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY);
|
||||
Preconditions.checkArgument(nnIds != null,
|
||||
"Could not determine namenode ids in namespace '%s'. " +
|
||||
|
@ -206,32 +204,6 @@ public class HAUtil {
|
|||
conf.setBoolean("dfs.ha.allow.stale.reads", val);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the given nameNodeUri appears to be a logical URI.
|
||||
*/
|
||||
public static boolean isLogicalUri(
|
||||
Configuration conf, URI nameNodeUri) {
|
||||
String host = nameNodeUri.getHost();
|
||||
// A logical name must be one of the service IDs.
|
||||
return DFSUtil.getNameServiceIds(conf).contains(host);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether the client has a failover proxy provider configured
|
||||
* for the namenode/nameservice.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param nameNodeUri The URI of namenode
|
||||
* @return true if failover is configured.
|
||||
*/
|
||||
public static boolean isClientFailoverConfigured(
|
||||
Configuration conf, URI nameNodeUri) {
|
||||
String host = nameNodeUri.getHost();
|
||||
String configKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX
|
||||
+ "." + host;
|
||||
return conf.get(configKey) != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether logical URI is needed for the namenode and
|
||||
* the corresponding failover proxy provider in the config.
|
||||
|
@ -256,43 +228,6 @@ public class HAUtil {
|
|||
return provider.useLogicalURI();
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the file system URI out of the provided token.
|
||||
*/
|
||||
public static URI getServiceUriFromToken(final String scheme, Token<?> token) {
|
||||
String tokStr = token.getService().toString();
|
||||
final String prefix = buildTokenServicePrefixForLogicalUri(scheme);
|
||||
if (tokStr.startsWith(prefix)) {
|
||||
tokStr = tokStr.replaceFirst(prefix, "");
|
||||
}
|
||||
return URI.create(scheme + "://" + tokStr);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the service name used in the delegation token for the given logical
|
||||
* HA service.
|
||||
* @param uri the logical URI of the cluster
|
||||
* @param scheme the scheme of the corresponding FileSystem
|
||||
* @return the service name
|
||||
*/
|
||||
public static Text buildTokenServiceForLogicalUri(final URI uri,
|
||||
final String scheme) {
|
||||
return new Text(buildTokenServicePrefixForLogicalUri(scheme)
|
||||
+ uri.getHost());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if this token corresponds to a logical nameservice
|
||||
* rather than a specific namenode.
|
||||
*/
|
||||
public static boolean isTokenForLogicalUri(Token<?> token) {
|
||||
return token.getService().toString().startsWith(HA_DT_SERVICE_PREFIX);
|
||||
}
|
||||
|
||||
public static String buildTokenServicePrefixForLogicalUri(String scheme) {
|
||||
return HA_DT_SERVICE_PREFIX + scheme + ":";
|
||||
}
|
||||
|
||||
/**
|
||||
* Locate a delegation token associated with the given HA cluster URI, and if
|
||||
* one is found, clone it to also represent the underlying namenode address.
|
||||
|
@ -305,7 +240,7 @@ public class HAUtil {
|
|||
UserGroupInformation ugi, URI haUri,
|
||||
Collection<InetSocketAddress> nnAddrs) {
|
||||
// this cloning logic is only used by hdfs
|
||||
Text haService = HAUtil.buildTokenServiceForLogicalUri(haUri,
|
||||
Text haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri,
|
||||
HdfsConstants.HDFS_URI_SCHEME);
|
||||
Token<DelegationTokenIdentifier> haToken =
|
||||
tokenSelector.selectToken(haService, ugi.getTokens());
|
||||
|
@ -318,7 +253,8 @@ public class HAUtil {
|
|||
new Token.PrivateToken<DelegationTokenIdentifier>(haToken);
|
||||
SecurityUtil.setTokenService(specificToken, singleNNAddr);
|
||||
Text alias = new Text(
|
||||
buildTokenServicePrefixForLogicalUri(HdfsConstants.HDFS_URI_SCHEME)
|
||||
HAUtilClient.buildTokenServicePrefixForLogicalUri(
|
||||
HdfsConstants.HDFS_URI_SCHEME)
|
||||
+ "//" + specificToken.getService());
|
||||
ugi.addToken(alias, specificToken);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
|
|
@ -179,7 +179,7 @@ public class NameNodeProxies {
|
|||
|
||||
Text dtService;
|
||||
if (failoverProxyProvider.useLogicalURI()) {
|
||||
dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri,
|
||||
dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
|
||||
HdfsConstants.HDFS_URI_SCHEME);
|
||||
} else {
|
||||
dtService = SecurityUtil.buildTokenService(
|
||||
|
@ -245,7 +245,7 @@ public class NameNodeProxies {
|
|||
new Class[] { xface }, dummyHandler);
|
||||
Text dtService;
|
||||
if (failoverProxyProvider.useLogicalURI()) {
|
||||
dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri,
|
||||
dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
|
||||
HdfsConstants.HDFS_URI_SCHEME);
|
||||
} else {
|
||||
dtService = SecurityUtil.buildTokenService(
|
||||
|
|
|
@ -122,13 +122,6 @@ public class HdfsConstants {
|
|||
*/
|
||||
public static final String HDFS_URI_SCHEME = "hdfs";
|
||||
|
||||
/**
|
||||
* A prefix put before the namenode URI inside the "service" field
|
||||
* of a delgation token, indicating that the URI is a logical (HA)
|
||||
* URI.
|
||||
*/
|
||||
public static final String HA_DT_SERVICE_PREFIX = "ha-";
|
||||
|
||||
/**
|
||||
* Path components that are reserved in HDFS.
|
||||
* <p>
|
||||
|
|
|
@ -21,7 +21,7 @@ import io.netty.handler.codec.http.QueryStringDecoder;
|
|||
import org.apache.commons.io.Charsets;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtilClient;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
|
||||
|
@ -112,10 +112,10 @@ class ParameterParser {
|
|||
Token<DelegationTokenIdentifier>();
|
||||
token.decodeFromUrlString(delegation);
|
||||
URI nnUri = URI.create(HDFS_URI_SCHEME + "://" + namenodeId());
|
||||
boolean isLogical = HAUtil.isLogicalUri(conf, nnUri);
|
||||
boolean isLogical = HAUtilClient.isLogicalUri(conf, nnUri);
|
||||
if (isLogical) {
|
||||
token.setService(HAUtil.buildTokenServiceForLogicalUri(nnUri,
|
||||
HDFS_URI_SCHEME));
|
||||
token.setService(
|
||||
HAUtilClient.buildTokenServiceForLogicalUri(nnUri, HDFS_URI_SCHEME));
|
||||
} else {
|
||||
token.setService(SecurityUtil.buildTokenService(nnUri));
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.ha.HealthCheckFailedException;
|
|||
import org.apache.hadoop.ha.ServiceFailedException;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
|
@ -72,7 +73,6 @@ import org.apache.hadoop.util.ExitUtil.ExitException;
|
|||
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||
import org.apache.hadoop.util.ServicePlugin;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -403,7 +403,7 @@ public class NameNode implements NameNodeStatusMXBean {
|
|||
return;
|
||||
}
|
||||
|
||||
if (DFSUtil.getNameServiceIds(conf).contains(nnHost)) {
|
||||
if (DFSUtilClient.getNameServiceIds(conf).contains(nnHost)) {
|
||||
// host name is logical
|
||||
clientNamenodeAddress = nnHost;
|
||||
} else if (nnUri.getPort() > 0) {
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.shell.Command;
|
||||
import org.apache.hadoop.fs.shell.CommandFormat;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.HAUtilClient;
|
||||
import org.apache.hadoop.hdfs.client.BlockReportOptions;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
|
@ -72,7 +73,6 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.SnapshotException;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
|
||||
import org.apache.hadoop.ipc.GenericRefreshProtocol;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
|
||||
|
@ -575,7 +575,7 @@ public class DFSAdmin extends FsShell {
|
|||
DistributedFileSystem dfs = getDFS();
|
||||
Configuration dfsConf = dfs.getConf();
|
||||
URI dfsUri = dfs.getUri();
|
||||
boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
|
||||
boolean isHaEnabled = HAUtilClient.isLogicalUri(dfsConf, dfsUri);
|
||||
|
||||
if (isHaEnabled) {
|
||||
String nsId = dfsUri.getHost();
|
||||
|
@ -703,8 +703,7 @@ public class DFSAdmin extends FsShell {
|
|||
DistributedFileSystem dfs = getDFS();
|
||||
Configuration dfsConf = dfs.getConf();
|
||||
URI dfsUri = dfs.getUri();
|
||||
boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
|
||||
|
||||
boolean isHaEnabled = HAUtilClient.isLogicalUri(dfsConf, dfsUri);
|
||||
if (isHaEnabled) {
|
||||
String nsId = dfsUri.getHost();
|
||||
List<ProxyAndInfo<ClientProtocol>> proxies =
|
||||
|
@ -748,7 +747,7 @@ public class DFSAdmin extends FsShell {
|
|||
DistributedFileSystem dfs = getDFS();
|
||||
Configuration dfsConf = dfs.getConf();
|
||||
URI dfsUri = dfs.getUri();
|
||||
boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
|
||||
boolean isHaEnabled = HAUtilClient.isLogicalUri(dfsConf, dfsUri);
|
||||
|
||||
if (isHaEnabled) {
|
||||
String nsId = dfsUri.getHost();
|
||||
|
@ -781,7 +780,7 @@ public class DFSAdmin extends FsShell {
|
|||
DistributedFileSystem dfs = getDFS();
|
||||
Configuration dfsConf = dfs.getConf();
|
||||
URI dfsUri = dfs.getUri();
|
||||
boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
|
||||
boolean isHaEnabled = HAUtilClient.isLogicalUri(dfsConf, dfsUri);
|
||||
|
||||
if (isHaEnabled) {
|
||||
String nsId = dfsUri.getHost();
|
||||
|
@ -832,7 +831,7 @@ public class DFSAdmin extends FsShell {
|
|||
DistributedFileSystem dfs = (DistributedFileSystem) fs;
|
||||
Configuration dfsConf = dfs.getConf();
|
||||
URI dfsUri = dfs.getUri();
|
||||
boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
|
||||
boolean isHaEnabled = HAUtilClient.isLogicalUri(dfsConf, dfsUri);
|
||||
|
||||
if (isHaEnabled) {
|
||||
String nsId = dfsUri.getHost();
|
||||
|
@ -1121,7 +1120,7 @@ public class DFSAdmin extends FsShell {
|
|||
|
||||
Configuration dfsConf = dfs.getConf();
|
||||
URI dfsUri = dfs.getUri();
|
||||
boolean isHaAndLogicalUri = HAUtil.isLogicalUri(dfsConf, dfsUri);
|
||||
boolean isHaAndLogicalUri = HAUtilClient.isLogicalUri(dfsConf, dfsUri);
|
||||
if (isHaAndLogicalUri) {
|
||||
// In the case of HA and logical URI, run finalizeUpgrade for all
|
||||
// NNs in this nameservice.
|
||||
|
@ -1161,7 +1160,7 @@ public class DFSAdmin extends FsShell {
|
|||
DistributedFileSystem dfs = getDFS();
|
||||
Configuration dfsConf = dfs.getConf();
|
||||
URI dfsUri = dfs.getUri();
|
||||
boolean isHaEnabled = HAUtil.isLogicalUri(dfsConf, dfsUri);
|
||||
boolean isHaEnabled = HAUtilClient.isLogicalUri(dfsConf, dfsUri);
|
||||
|
||||
if (isHaEnabled) {
|
||||
String nsId = dfsUri.getHost();
|
||||
|
@ -1248,7 +1247,7 @@ public class DFSAdmin extends FsShell {
|
|||
|
||||
DistributedFileSystem dfs = getDFS();
|
||||
URI dfsUri = dfs.getUri();
|
||||
boolean isHaEnabled = HAUtil.isLogicalUri(conf, dfsUri);
|
||||
boolean isHaEnabled = HAUtilClient.isLogicalUri(conf, dfsUri);
|
||||
|
||||
if (isHaEnabled) {
|
||||
// Run refreshServiceAcl for all NNs if HA is enabled
|
||||
|
@ -1291,7 +1290,7 @@ public class DFSAdmin extends FsShell {
|
|||
|
||||
DistributedFileSystem dfs = getDFS();
|
||||
URI dfsUri = dfs.getUri();
|
||||
boolean isHaEnabled = HAUtil.isLogicalUri(conf, dfsUri);
|
||||
boolean isHaEnabled = HAUtilClient.isLogicalUri(conf, dfsUri);
|
||||
|
||||
if (isHaEnabled) {
|
||||
// Run refreshUserToGroupsMapings for all NNs if HA is enabled
|
||||
|
@ -1336,7 +1335,7 @@ public class DFSAdmin extends FsShell {
|
|||
|
||||
DistributedFileSystem dfs = getDFS();
|
||||
URI dfsUri = dfs.getUri();
|
||||
boolean isHaEnabled = HAUtil.isLogicalUri(conf, dfsUri);
|
||||
boolean isHaEnabled = HAUtilClient.isLogicalUri(conf, dfsUri);
|
||||
|
||||
if (isHaEnabled) {
|
||||
// Run refreshSuperUserGroupsConfiguration for all NNs if HA is enabled
|
||||
|
@ -1375,7 +1374,7 @@ public class DFSAdmin extends FsShell {
|
|||
|
||||
DistributedFileSystem dfs = getDFS();
|
||||
URI dfsUri = dfs.getUri();
|
||||
boolean isHaEnabled = HAUtil.isLogicalUri(conf, dfsUri);
|
||||
boolean isHaEnabled = HAUtilClient.isLogicalUri(conf, dfsUri);
|
||||
|
||||
if (isHaEnabled) {
|
||||
// Run refreshCallQueue for all NNs if HA is enabled
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.ha.HAAdmin;
|
|||
import org.apache.hadoop.ha.HAServiceTarget;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
|
@ -125,7 +126,9 @@ public class DFSHAAdmin extends HAAdmin {
|
|||
*/
|
||||
@Override
|
||||
protected Collection<String> getTargetIds(String namenodeToActivate) {
|
||||
return DFSUtil.getNameNodeIds(getConf(), (nameserviceId != null)? nameserviceId : DFSUtil.getNamenodeNameServiceId(getConf()));
|
||||
return DFSUtilClient.getNameNodeIds(getConf(),
|
||||
(nameserviceId != null) ? nameserviceId : DFSUtil.getNamenodeNameServiceId(
|
||||
getConf()));
|
||||
}
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
|
|
|
@ -51,6 +51,8 @@ import org.apache.hadoop.hdfs.web.HsftpFileSystem;
|
|||
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
|
|
@ -24,11 +24,9 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
|
||||
public class SWebHdfsFileSystem extends WebHdfsFileSystem {
|
||||
|
||||
public static final String SCHEME = "swebhdfs";
|
||||
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return SCHEME;
|
||||
return WebHdfsConstants.SWEBHDFS_SCHEME;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtilClient;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
@ -76,8 +76,8 @@ final class TokenAspect<T extends FileSystem & Renewable> {
|
|||
throws IOException {
|
||||
final URI uri;
|
||||
final String scheme = getSchemeByKind(token.getKind());
|
||||
if (HAUtil.isTokenForLogicalUri(token)) {
|
||||
uri = HAUtil.getServiceUriFromToken(scheme, token);
|
||||
if (HAUtilClient.isTokenForLogicalUri(token)) {
|
||||
uri = HAUtilClient.getServiceUriFromToken(scheme, token);
|
||||
} else {
|
||||
final InetSocketAddress address = SecurityUtil.getTokenServiceAddr
|
||||
(token);
|
||||
|
@ -92,9 +92,9 @@ final class TokenAspect<T extends FileSystem & Renewable> {
|
|||
} else if (kind.equals(HsftpFileSystem.TOKEN_KIND)) {
|
||||
return HsftpFileSystem.SCHEME;
|
||||
} else if (kind.equals(WebHdfsConstants.WEBHDFS_TOKEN_KIND)) {
|
||||
return WebHdfsFileSystem.SCHEME;
|
||||
return WebHdfsConstants.WEBHDFS_SCHEME;
|
||||
} else if (kind.equals(WebHdfsConstants.SWEBHDFS_TOKEN_KIND)) {
|
||||
return SWebHdfsFileSystem.SCHEME;
|
||||
return WebHdfsConstants.SWEBHDFS_SCHEME;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported scheme");
|
||||
}
|
||||
|
|
|
@ -58,7 +58,8 @@ import org.apache.hadoop.fs.permission.FsAction;
|
|||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||
import org.apache.hadoop.hdfs.HAUtilClient;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
|
@ -91,12 +92,10 @@ import com.google.common.collect.Lists;
|
|||
public class WebHdfsFileSystem extends FileSystem
|
||||
implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator {
|
||||
public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
|
||||
/** File System URI: {SCHEME}://namenode:port/path/to/file */
|
||||
public static final String SCHEME = "webhdfs";
|
||||
/** WebHdfs version. */
|
||||
public static final int VERSION = 1;
|
||||
/** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
|
||||
public static final String PATH_PREFIX = "/" + SCHEME + "/v" + VERSION;
|
||||
public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME + "/v" + VERSION;
|
||||
|
||||
/** Default connection factory may be overridden in tests to use smaller timeout values */
|
||||
protected URLConnectionFactory connectionFactory;
|
||||
|
@ -125,7 +124,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
*/
|
||||
@Override
|
||||
public String getScheme() {
|
||||
return SCHEME;
|
||||
return WebHdfsConstants.WEBHDFS_SCHEME;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -156,13 +155,13 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
|
||||
this.nnAddrs = resolveNNAddr();
|
||||
|
||||
boolean isHA = HAUtil.isClientFailoverConfigured(conf, this.uri);
|
||||
boolean isLogicalUri = isHA && HAUtil.isLogicalUri(conf, this.uri);
|
||||
boolean isHA = HAUtilClient.isClientFailoverConfigured(conf, this.uri);
|
||||
boolean isLogicalUri = isHA && HAUtilClient.isLogicalUri(conf, this.uri);
|
||||
// In non-HA or non-logical URI case, the code needs to call
|
||||
// getCanonicalUri() in order to handle the case where no port is
|
||||
// specified in the URI
|
||||
this.tokenServiceName = isLogicalUri ?
|
||||
HAUtil.buildTokenServiceForLogicalUri(uri, getScheme())
|
||||
HAUtilClient.buildTokenServiceForLogicalUri(uri, getScheme())
|
||||
: SecurityUtil.buildTokenService(getCanonicalUri());
|
||||
|
||||
if (!isHA) {
|
||||
|
@ -896,7 +895,6 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
|
||||
/**
|
||||
* Create a symlink pointing to the destination path.
|
||||
* @see org.apache.hadoop.fs.Hdfs#createSymlink(Path, Path, boolean)
|
||||
*/
|
||||
public void createSymlink(Path destination, Path f, boolean createParent
|
||||
) throws IOException {
|
||||
|
@ -1440,13 +1438,13 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
|
||||
ArrayList<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
|
||||
|
||||
if (!HAUtil.isLogicalUri(conf, uri)) {
|
||||
if (!HAUtilClient.isLogicalUri(conf, uri)) {
|
||||
InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(),
|
||||
getDefaultPort());
|
||||
ret.add(addr);
|
||||
|
||||
} else {
|
||||
Map<String, Map<String, InetSocketAddress>> addresses = DFSUtil
|
||||
Map<String, Map<String, InetSocketAddress>> addresses = DFSUtilClient
|
||||
.getHaNnWebHdfsAddresses(conf, scheme);
|
||||
|
||||
// Extract the entry corresponding to the logical name.
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
@ -89,7 +90,7 @@ abstract public class TestSymlinkHdfs extends SymlinkBaseTest {
|
|||
conf.set(FsPermission.UMASK_LABEL, "000");
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY, 0);
|
||||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
webhdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME);
|
||||
webhdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
dfs = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
|
|
|
@ -232,7 +232,7 @@ public class DFSTestUtil {
|
|||
} else { // append the nsid
|
||||
conf.set(DFSConfigKeys.DFS_NAMESERVICES, nsIds + "," + logicalName);
|
||||
}
|
||||
conf.set(DFSUtil.addKeySuffixes(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX,
|
||||
conf.set(DFSUtil.addKeySuffixes(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX,
|
||||
logicalName), "nn1,nn2");
|
||||
conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
|
||||
"." + logicalName,
|
||||
|
|
|
@ -75,7 +75,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
|||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
@ -838,7 +838,7 @@ public class TestDFSClientRetries {
|
|||
cluster.waitActive();
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final FileSystem fs = isWebHDFS ? WebHdfsTestUtil.getWebHdfsFileSystem(
|
||||
conf, WebHdfsFileSystem.SCHEME) : dfs;
|
||||
conf, WebHdfsConstants.WEBHDFS_SCHEME) : dfs;
|
||||
final URI uri = dfs.getUri();
|
||||
assertTrue(HdfsUtils.isHealthy(uri));
|
||||
|
||||
|
@ -1042,7 +1042,7 @@ public class TestDFSClientRetries {
|
|||
final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
||||
username, new String[]{"supergroup"});
|
||||
|
||||
return isWebHDFS? WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, conf, WebHdfsFileSystem.SCHEME)
|
||||
return isWebHDFS? WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, conf, WebHdfsConstants.WEBHDFS_SCHEME)
|
||||
: DFSTestUtil.getFileSystemAs(ugi, conf);
|
||||
}
|
||||
|
||||
|
|
|
@ -216,13 +216,13 @@ public class TestDFSUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Test {@link DFSUtil#getNameServiceIds(Configuration)}
|
||||
* Test {@link DFSUtilClient#getNameServiceIds(Configuration)}
|
||||
*/
|
||||
@Test
|
||||
public void testGetNameServiceIds() {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
conf.set(DFS_NAMESERVICES, "nn1,nn2");
|
||||
Collection<String> nameserviceIds = DFSUtil.getNameServiceIds(conf);
|
||||
Collection<String> nameserviceIds = DFSUtilClient.getNameServiceIds(conf);
|
||||
Iterator<String> it = nameserviceIds.iterator();
|
||||
assertEquals(2, nameserviceIds.size());
|
||||
assertEquals("nn1", it.next().toString());
|
||||
|
@ -587,7 +587,7 @@ public class TestDFSUtil {
|
|||
Configuration conf = createWebHDFSHAConfiguration(LOGICAL_HOST_NAME, NS1_NN1_ADDR, NS1_NN2_ADDR);
|
||||
|
||||
Map<String, Map<String, InetSocketAddress>> map =
|
||||
DFSUtil.getHaNnWebHdfsAddresses(conf, "webhdfs");
|
||||
DFSUtilClient.getHaNnWebHdfsAddresses(conf, "webhdfs");
|
||||
|
||||
assertEquals(NS1_NN1_ADDR, map.get("ns1").get("nn1").toString());
|
||||
assertEquals(NS1_NN2_ADDR, map.get("ns1").get("nn2").toString());
|
||||
|
|
|
@ -68,7 +68,7 @@ import org.apache.hadoop.hdfs.net.Peer;
|
|||
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
@ -541,7 +541,7 @@ public class TestDistributedFileSystem {
|
|||
});
|
||||
|
||||
//webhdfs
|
||||
final String webhdfsuri = WebHdfsFileSystem.SCHEME + "://" + nnAddr;
|
||||
final String webhdfsuri = WebHdfsConstants.WEBHDFS_SCHEME + "://" + nnAddr;
|
||||
System.out.println("webhdfsuri=" + webhdfsuri);
|
||||
final FileSystem webhdfs = ugi.doAs(
|
||||
new PrivilegedExceptionAction<FileSystem>() {
|
||||
|
|
|
@ -21,10 +21,8 @@ import java.io.ByteArrayOutputStream;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.StringReader;
|
||||
import java.io.StringWriter;
|
||||
import java.net.URI;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
|
@ -44,7 +42,6 @@ import org.apache.hadoop.crypto.CipherSuite;
|
|||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderFactory;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
|
@ -72,7 +69,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
|
|||
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
|
||||
import org.apache.hadoop.hdfs.tools.DFSck;
|
||||
import org.apache.hadoop.hdfs.tools.offlineImageViewer.PBImageXmlWriter;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
|
@ -612,7 +609,7 @@ public class TestEncryptionZones {
|
|||
final HdfsAdmin dfsAdmin =
|
||||
new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
|
||||
final FileSystem webHdfsFs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
|
||||
final Path zone = new Path("/zone");
|
||||
fs.mkdirs(zone);
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.tools.DFSAdmin;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.junit.Assert;
|
||||
|
@ -800,7 +800,7 @@ public class TestQuota {
|
|||
DFSAdmin admin = new DFSAdmin(conf);
|
||||
|
||||
final String nnAddr = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
final String webhdfsuri = WebHdfsFileSystem.SCHEME + "://" + nnAddr;
|
||||
final String webhdfsuri = WebHdfsConstants.WEBHDFS_SCHEME + "://" + nnAddr;
|
||||
System.out.println("webhdfsuri=" + webhdfsuri);
|
||||
final FileSystem webhdfs = new Path(webhdfsuri).getFileSystem(conf);
|
||||
|
||||
|
@ -865,7 +865,7 @@ public class TestQuota {
|
|||
DFSAdmin admin = new DFSAdmin(conf);
|
||||
|
||||
final String nnAddr = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
final String webhdfsuri = WebHdfsFileSystem.SCHEME + "://" + nnAddr;
|
||||
final String webhdfsuri = WebHdfsConstants.WEBHDFS_SCHEME + "://" + nnAddr;
|
||||
System.out.println("webhdfsuri=" + webhdfsuri);
|
||||
final FileSystem webhdfs = new Path(webhdfsuri).getFileSystem(conf);
|
||||
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
|
||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||
import org.junit.AfterClass;
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
|||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
|
@ -171,7 +172,7 @@ public class TestDelegationToken {
|
|||
@Test
|
||||
public void testDelegationTokenWebHdfsApi() throws Exception {
|
||||
((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
|
||||
final String uri = WebHdfsFileSystem.SCHEME + "://"
|
||||
final String uri = WebHdfsConstants.WEBHDFS_SCHEME + "://"
|
||||
+ config.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
//get file system as JobTracker
|
||||
final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||
import org.apache.hadoop.security.TestDoAsEffectiveUser;
|
||||
|
@ -150,7 +151,7 @@ public class TestDelegationTokenForProxyUser {
|
|||
public void testWebHdfsDoAs() throws Exception {
|
||||
WebHdfsTestUtil.LOG.info("START: testWebHdfsDoAs()");
|
||||
WebHdfsTestUtil.LOG.info("ugi.getShortUserName()=" + ugi.getShortUserName());
|
||||
final WebHdfsFileSystem webhdfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, config, WebHdfsFileSystem.SCHEME);
|
||||
final WebHdfsFileSystem webhdfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, config, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
|
||||
final Path root = new Path("/");
|
||||
cluster.getFileSystem().setPermission(root, new FsPermission((short)0777));
|
||||
|
|
|
@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.LogVerificationAppender;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
|
@ -61,7 +60,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
|||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.net.Node;
|
||||
import org.apache.hadoop.test.PathUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.log4j.spi.LoggingEvent;
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.web.webhdfs;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtilClient;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
|
||||
|
@ -30,11 +30,8 @@ import org.junit.Test;
|
|||
|
||||
import io.netty.handler.codec.http.QueryStringDecoder;
|
||||
|
||||
import javax.servlet.ServletContext;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class TestParameterParser {
|
||||
|
@ -51,7 +48,7 @@ public class TestParameterParser {
|
|||
+ DelegationParam.NAME + "=" + token.encodeToUrlString());
|
||||
ParameterParser testParser = new ParameterParser(decoder, conf);
|
||||
final Token<DelegationTokenIdentifier> tok2 = testParser.delegationToken();
|
||||
Assert.assertTrue(HAUtil.isTokenForLogicalUri(tok2));
|
||||
Assert.assertTrue(HAUtilClient.isTokenForLogicalUri(tok2));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
|
@ -198,7 +199,7 @@ public class TestAuditLogs {
|
|||
|
||||
setupAuditLogs();
|
||||
|
||||
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
InputStream istream = webfs.open(file);
|
||||
int val = istream.read();
|
||||
istream.close();
|
||||
|
@ -217,7 +218,7 @@ public class TestAuditLogs {
|
|||
|
||||
setupAuditLogs();
|
||||
|
||||
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
FileStatus st = webfs.getFileStatus(file);
|
||||
|
||||
verifyAuditLogs(true);
|
||||
|
@ -258,7 +259,7 @@ public class TestAuditLogs {
|
|||
|
||||
setupAuditLogs();
|
||||
try {
|
||||
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
InputStream istream = webfs.open(file);
|
||||
int val = istream.read();
|
||||
fail("open+read must not succeed, got " + val);
|
||||
|
@ -278,7 +279,7 @@ public class TestAuditLogs {
|
|||
|
||||
setupAuditLogs();
|
||||
|
||||
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
webfs.open(file);
|
||||
|
||||
verifyAuditLogsCheckPattern(true, 3, webOpenPattern);
|
||||
|
|
|
@ -19,15 +19,15 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
||||
public class TestMalformedURLs {
|
||||
private MiniDFSCluster cluster;
|
||||
Configuration config;
|
||||
|
|
|
@ -21,6 +21,7 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertThat;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.IsNot.not;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
|
@ -195,7 +196,7 @@ public class TestNameNodeRespectsBindHostKeys {
|
|||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
|
||||
conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
||||
conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
||||
conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
|
||||
|
||||
File base = new File(BASEDIR);
|
||||
|
|
|
@ -281,7 +281,7 @@ public class TestDelegationTokensWithHA {
|
|||
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("test");
|
||||
|
||||
URI haUri = new URI("hdfs://my-ha-uri/");
|
||||
token.setService(HAUtil.buildTokenServiceForLogicalUri(haUri,
|
||||
token.setService(HAUtilClient.buildTokenServiceForLogicalUri(haUri,
|
||||
HdfsConstants.HDFS_URI_SCHEME));
|
||||
ugi.addToken(token);
|
||||
|
||||
|
@ -338,7 +338,7 @@ public class TestDelegationTokensWithHA {
|
|||
@Test(timeout = 300000)
|
||||
public void testDFSGetCanonicalServiceName() throws Exception {
|
||||
URI hAUri = HATestUtil.getLogicalUri(cluster);
|
||||
String haService = HAUtil.buildTokenServiceForLogicalUri(hAUri,
|
||||
String haService = HAUtilClient.buildTokenServiceForLogicalUri(hAUri,
|
||||
HdfsConstants.HDFS_URI_SCHEME).toString();
|
||||
assertEquals(haService, dfs.getCanonicalServiceName());
|
||||
final String renewer = UserGroupInformation.getCurrentUser().getShortUserName();
|
||||
|
@ -355,7 +355,7 @@ public class TestDelegationTokensWithHA {
|
|||
Configuration conf = dfs.getConf();
|
||||
URI haUri = HATestUtil.getLogicalUri(cluster);
|
||||
AbstractFileSystem afs = AbstractFileSystem.createFileSystem(haUri, conf);
|
||||
String haService = HAUtil.buildTokenServiceForLogicalUri(haUri,
|
||||
String haService = HAUtilClient.buildTokenServiceForLogicalUri(haUri,
|
||||
HdfsConstants.HDFS_URI_SCHEME).toString();
|
||||
assertEquals(haService, afs.getCanonicalServiceName());
|
||||
Token<?> token = afs.getDelegationTokens(
|
||||
|
|
|
@ -81,7 +81,7 @@ public class TestFSMainOperationsWebHdfs extends FSMainOperationsBaseTest {
|
|||
cluster.getFileSystem().setPermission(
|
||||
new Path("/"), new FsPermission((short)0777));
|
||||
|
||||
final String uri = WebHdfsFileSystem.SCHEME + "://"
|
||||
final String uri = WebHdfsConstants.WEBHDFS_SCHEME + "://"
|
||||
+ conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
|
||||
//get file system as a non-superuser
|
||||
|
|
|
@ -121,7 +121,7 @@ public class TestWebHDFS {
|
|||
try {
|
||||
cluster.waitActive();
|
||||
|
||||
final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME);
|
||||
final FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
final Path dir = new Path("/test/largeFile");
|
||||
Assert.assertTrue(fs.mkdirs(dir));
|
||||
|
||||
|
@ -249,7 +249,7 @@ public class TestWebHDFS {
|
|||
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME)
|
||||
WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME)
|
||||
.setPermission(new Path("/"),
|
||||
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
|
||||
|
||||
|
@ -264,7 +264,7 @@ public class TestWebHDFS {
|
|||
@Override
|
||||
public Void run() throws IOException, URISyntaxException {
|
||||
FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
Path d = new Path("/my-dir");
|
||||
Assert.assertTrue(fs.mkdirs(d));
|
||||
for (int i=0; i < listLimit*3; i++) {
|
||||
|
@ -288,7 +288,7 @@ public class TestWebHDFS {
|
|||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME)
|
||||
WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME)
|
||||
.setPermission(new Path("/"),
|
||||
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
|
||||
|
||||
|
@ -297,7 +297,7 @@ public class TestWebHDFS {
|
|||
@Override
|
||||
public Void run() throws IOException, URISyntaxException {
|
||||
FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
Path d = new Path("/my-dir");
|
||||
Assert.assertTrue(fs.mkdirs(d));
|
||||
return null;
|
||||
|
@ -321,7 +321,7 @@ public class TestWebHDFS {
|
|||
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||
cluster.waitActive();
|
||||
FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
fs.create(new Path("/testnodatanode"));
|
||||
Assert.fail("No exception was thrown");
|
||||
} catch (IOException ex) {
|
||||
|
@ -357,7 +357,7 @@ public class TestWebHDFS {
|
|||
cluster.waitActive();
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
|
||||
final Path foo = new Path("/foo");
|
||||
dfs.mkdirs(foo);
|
||||
|
@ -399,7 +399,7 @@ public class TestWebHDFS {
|
|||
cluster.waitActive();
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
|
||||
final Path foo = new Path("/foo");
|
||||
dfs.mkdirs(foo);
|
||||
|
@ -435,7 +435,7 @@ public class TestWebHDFS {
|
|||
cluster.waitActive();
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
|
||||
final Path foo = new Path("/foo");
|
||||
dfs.mkdirs(foo);
|
||||
|
@ -477,7 +477,7 @@ public class TestWebHDFS {
|
|||
|
||||
final Path foo = new Path("/foo");
|
||||
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
try {
|
||||
webHdfs.mkdirs(foo);
|
||||
fail("Expected RetriableException");
|
||||
|
@ -503,7 +503,7 @@ public class TestWebHDFS {
|
|||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
Assert.assertNull(webHdfs.getDelegationToken(null));
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
|
@ -519,7 +519,7 @@ public class TestWebHDFS {
|
|||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
|
||||
final FileSystem webHdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
|
||||
WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
webHdfs.getDelegationToken(null);
|
||||
fail("No exception is thrown.");
|
||||
} catch (AccessControlException ace) {
|
||||
|
@ -544,7 +544,7 @@ public class TestWebHDFS {
|
|||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
final WebHdfsFileSystem fs =
|
||||
WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
try (OutputStream os = fs.create(new Path(PATH))) {
|
||||
os.write(CONTENTS);
|
||||
}
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
|
@ -53,7 +52,7 @@ public class TestWebHDFSAcl extends FSAclBaseTest {
|
|||
*/
|
||||
@Override
|
||||
protected WebHdfsFileSystem createFileSystem() throws Exception {
|
||||
return WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME);
|
||||
return WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -67,6 +66,6 @@ public class TestWebHDFSAcl extends FSAclBaseTest {
|
|||
protected WebHdfsFileSystem createFileSystem(UserGroupInformation user)
|
||||
throws Exception {
|
||||
return WebHdfsTestUtil.getWebHdfsFileSystemAs(user, conf,
|
||||
WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,7 +55,6 @@ import org.apache.hadoop.security.token.SecretManager;
|
|||
import org.apache.hadoop.security.token.Token;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
import org.mortbay.util.ajax.JSON;
|
||||
|
||||
|
@ -64,7 +63,7 @@ import javax.ws.rs.core.Response;
|
|||
|
||||
public class TestWebHDFSForHA {
|
||||
private static final String LOGICAL_NAME = "minidfs";
|
||||
private static final URI WEBHDFS_URI = URI.create(WebHdfsFileSystem.SCHEME +
|
||||
private static final URI WEBHDFS_URI = URI.create(WebHdfsConstants.WEBHDFS_SCHEME +
|
||||
"://" + LOGICAL_NAME);
|
||||
private static final MiniDFSNNTopology topo = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf(LOGICAL_NAME).addNN(
|
||||
|
|
|
@ -31,6 +31,6 @@ public class TestWebHDFSXAttr extends FSXAttrBaseTest {
|
|||
*/
|
||||
@Override
|
||||
protected WebHdfsFileSystem createFileSystem() throws Exception {
|
||||
return WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME);
|
||||
return WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,7 +79,7 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
|||
final UserGroupInformation current = UserGroupInformation.getCurrentUser();
|
||||
ugi = UserGroupInformation.createUserForTesting(
|
||||
current.getShortUserName() + "x", new String[]{"user"});
|
||||
fs = WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, conf, WebHdfsFileSystem.SCHEME);
|
||||
fs = WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
defaultWorkingDirectory = fs.getWorkingDirectory().toUri().getPath();
|
||||
}
|
||||
|
||||
|
@ -540,7 +540,7 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
|||
UserGroupInformation ugi = UserGroupInformation.createUserForTesting("alpha",
|
||||
new String[]{"beta"});
|
||||
WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, conf,
|
||||
WebHdfsFileSystem.SCHEME);
|
||||
WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
|
||||
fs.mkdirs(p1);
|
||||
fs.setPermission(p1, new FsPermission((short) 0444));
|
||||
|
|
|
@ -83,7 +83,7 @@ public class TestWebHdfsTimeouts {
|
|||
serverSocket = new ServerSocket(0, CONNECTION_BACKLOG);
|
||||
nnHttpAddress = new InetSocketAddress("localhost", serverSocket.getLocalPort());
|
||||
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "localhost:" + serverSocket.getLocalPort());
|
||||
fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME);
|
||||
fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
fs.connectionFactory = connectionFactory;
|
||||
clients = new ArrayList<SocketChannel>();
|
||||
serverThread = null;
|
||||
|
|
|
@ -53,7 +53,7 @@ import org.junit.Test;
|
|||
|
||||
public class TestWebHdfsUrl {
|
||||
// NOTE: port is never used
|
||||
final URI uri = URI.create(WebHdfsFileSystem.SCHEME + "://" + "127.0.0.1:0");
|
||||
final URI uri = URI.create(WebHdfsConstants.WEBHDFS_SCHEME + "://" + "127.0.0.1:0");
|
||||
|
||||
@Before
|
||||
public void resetUGI() {
|
||||
|
|
|
@ -82,7 +82,7 @@ public class TestWebHdfsWithMultipleNameNodes {
|
|||
webhdfs = new WebHdfsFileSystem[nNameNodes];
|
||||
for(int i = 0; i < webhdfs.length; i++) {
|
||||
final InetSocketAddress addr = cluster.getNameNode(i).getHttpAddress();
|
||||
final String uri = WebHdfsFileSystem.SCHEME + "://"
|
||||
final String uri = WebHdfsConstants.WEBHDFS_SCHEME + "://"
|
||||
+ addr.getHostName() + ":" + addr.getPort() + "/";
|
||||
webhdfs[i] = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf);
|
||||
}
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.util.Map;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
|
@ -51,11 +50,11 @@ public class WebHdfsTestUtil {
|
|||
URISyntaxException {
|
||||
final String uri;
|
||||
|
||||
if (WebHdfsFileSystem.SCHEME.equals(scheme)) {
|
||||
uri = WebHdfsFileSystem.SCHEME + "://"
|
||||
if (WebHdfsConstants.WEBHDFS_SCHEME.equals(scheme)) {
|
||||
uri = WebHdfsConstants.WEBHDFS_SCHEME + "://"
|
||||
+ conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
|
||||
} else if (SWebHdfsFileSystem.SCHEME.equals(scheme)) {
|
||||
uri = SWebHdfsFileSystem.SCHEME + "://"
|
||||
} else if (WebHdfsConstants.SWEBHDFS_SCHEME.equals(scheme)) {
|
||||
uri = WebHdfsConstants.SWEBHDFS_SCHEME + "://"
|
||||
+ conf.get(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY);
|
||||
} else {
|
||||
throw new IllegalArgumentException("unknown scheme:" + scheme);
|
||||
|
@ -66,7 +65,7 @@ public class WebHdfsTestUtil {
|
|||
public static WebHdfsFileSystem getWebHdfsFileSystemAs(
|
||||
final UserGroupInformation ugi, final Configuration conf
|
||||
) throws IOException, InterruptedException {
|
||||
return getWebHdfsFileSystemAs(ugi, conf, WebHdfsFileSystem.SCHEME);
|
||||
return getWebHdfsFileSystemAs(ugi, conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
}
|
||||
|
||||
public static WebHdfsFileSystem getWebHdfsFileSystemAs(
|
||||
|
@ -75,7 +74,7 @@ public class WebHdfsTestUtil {
|
|||
return ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {
|
||||
@Override
|
||||
public WebHdfsFileSystem run() throws Exception {
|
||||
return getWebHdfsFileSystem(conf, WebHdfsFileSystem.SCHEME);
|
||||
return getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue