Merge branch 'trunk' into HDFS-7240

Anu Engineer 2016-05-02 14:55:23 -07:00
commit 05d7a838ee
32 changed files with 2329 additions and 237 deletions

View File

@ -115,7 +115,7 @@ if [[ $? != 0 ]]; then
fi
if [[ -n "${CURLBIN}" ]]; then
"${CURLBIN}" -s -L -O "${BASEURL}/${TARBALL}.gz"
"${CURLBIN}" -f -s -L -O "${BASEURL}/${TARBALL}.gz"
if [[ $? != 0 ]]; then
yetus_error "ERROR: yetus-dl: unable to download ${BASEURL}/${TARBALL}.gz"
exit 1

View File

@ -33,6 +33,7 @@ function hadoop_usage
hadoop_add_subcommand "daemonlog" "get/set the log level for each daemon"
hadoop_add_subcommand "distch" "distributed metadata changer"
hadoop_add_subcommand "distcp" "copy file or directories recursively"
hadoop_add_subcommand "dtutil" "operations related to delegation tokens"
hadoop_add_subcommand "envvars" "display computed Hadoop environment variables"
hadoop_add_subcommand "fs" "run a generic filesystem user client"
hadoop_add_subcommand "jar <jar>" "run a jar file. NOTE: please use \"yarn jar\" to launch YARN applications, not this command."
@ -139,6 +140,9 @@ case ${COMMAND} in
CLASS=org.apache.hadoop.tools.DistCp
hadoop_add_to_classpath_tools hadoop-distcp
;;
dtutil)
CLASS=org.apache.hadoop.security.token.DtUtilShell
;;
envvars)
echo "JAVA_HOME='${JAVA_HOME}'"
echo "HADOOP_COMMON_HOME='${HADOOP_COMMON_HOME}'"

View File

@ -110,7 +110,7 @@ hadoop_exec_userfuncs
# IMPORTANT! User provided code is now available!
#
hadoop_exec_hadooprc
hadoop_exec_user_hadoopenv
hadoop_verify_confdir
# do all the OS-specific startup bits here
@ -142,6 +142,10 @@ hadoop_shellprofiles_nativelib
hadoop_add_common_to_classpath
hadoop_shellprofiles_classpath
# user API commands can now be run since the runtime
# environment has been configured
hadoop_exec_hadooprc
#
# backwards compatibility. new stuff should
# call this when they are ready

View File

@ -383,6 +383,20 @@ function hadoop_exec_userfuncs
## @audience private
## @stability evolving
## @replaceable yes
function hadoop_exec_user_hadoopenv
{
if [[ -f "${HOME}/.hadoop-env" ]]; then
hadoop_debug "Applying the user's .hadoop-env"
# shellcheck disable=SC1090
. "${HOME}/.hadoop-env"
fi
}
## @description Read the user's settings. This provides for users to
## @description run Hadoop Shell API after system bootstrap
## @audience private
## @stability evolving
## @replaceable yes
function hadoop_exec_hadooprc
{
if [[ -f "${HOME}/.hadooprc" ]]; then

View File

@ -321,6 +321,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
4*60*60; // 4 hours
public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
"ipc.client.async.calls.max";
public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;

View File

@ -32,7 +32,8 @@ import java.io.IOException;
public class WindowsGetSpaceUsed extends CachingGetSpaceUsed {
WindowsGetSpaceUsed(CachingGetSpaceUsed.Builder builder) throws IOException {
public WindowsGetSpaceUsed(CachingGetSpaceUsed.Builder builder)
throws IOException {
super(builder.getPath(), builder.getInterval(), builder.getInitialUsed());
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
/**
* Signals that the limit on outstanding asynchronous calls has been reached.
* This makes application code using async RPC aware that no further async
* calls can be started until it retrieves results from the responses of
* already established async calls; draining those responses avoids buffer
* overflow and lets follow-on async calls proceed correctly.
*/
public class AsyncCallLimitExceededException extends IOException {
private static final long serialVersionUID = 1L;
public AsyncCallLimitExceededException(String message) {
super(message);
}
}
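The javadoc above prescribes a back-off pattern for callers: when the limit is reached, retrieve responses of calls already in flight before issuing new ones. Below is a minimal sketch of that pattern, assuming asynchronous mode is enabled on the client; `AsyncInvoker.submitAsyncCall` is a hypothetical stand-in for however the application obtains the `Future<Writable>` of an async call, since that retrieval API is not shown in this diff.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;

public class AsyncCallBackoffExample {

  /** Hypothetical helper: issues one async RPC and returns its future. */
  interface AsyncInvoker {
    Future<Writable> submitAsyncCall(int requestId) throws IOException;
  }

  static void runCalls(AsyncInvoker invoker, int total)
      throws IOException, InterruptedException, ExecutionException {
    Queue<Future<Writable>> pending = new ArrayDeque<>();
    int sent = 0;
    while (sent < total) {
      try {
        pending.add(invoker.submitAsyncCall(sent));
        sent++;
      } catch (AsyncCallLimitExceededException e) {
        // Limit reached: retrieve one completed response to free a slot,
        // then retry the same request on the next loop iteration.
        Future<Writable> oldest = pending.poll();
        if (oldest != null) {
          oldest.get();
        }
      }
    }
    // Drain whatever is still outstanding.
    for (Future<Writable> f : pending) {
      f.get();
    }
  }
}
```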

View File

@ -159,7 +159,9 @@ public class Client implements AutoCloseable {
private final boolean fallbackAllowed;
private final byte[] clientId;
private final int maxAsyncCalls;
private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
/**
* Executor on which IPC calls' parameters are sent.
* Deferring the sending of parameters to a separate
@ -1288,6 +1290,9 @@ public class Client implements AutoCloseable {
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.clientId = ClientId.getClientId();
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
this.maxAsyncCalls = conf.getInt(
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
}
/**
@ -1354,6 +1359,20 @@ public class Client implements AutoCloseable {
fallbackToSimpleAuth);
}
private void checkAsyncCall() throws IOException {
if (isAsynchronousMode()) {
if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
asyncCallCounter.decrementAndGet();
String errMsg = String.format(
"Exceeded limit of max asynchronous calls: %d, " +
"please configure %s to adjust it.",
maxAsyncCalls,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
throw new AsyncCallLimitExceededException(errMsg);
}
}
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
@ -1374,24 +1393,38 @@ public class Client implements AutoCloseable {
final Call call = createCall(rpcKind, rpcRequest);
final Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
throw new IOException("connection has been closed", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("interrupted waiting to send rpc request to server", e);
throw new IOException(e);
checkAsyncCall();
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
throw new IOException("connection has been closed", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("interrupted waiting to send rpc request to server", e);
throw new IOException(e);
}
} catch(Exception e) {
if (isAsynchronousMode()) {
releaseAsyncCall();
}
throw e;
}
if (isAsynchronousMode()) {
Future<Writable> returnFuture = new AbstractFuture<Writable>() {
private final AtomicBoolean called = new AtomicBoolean(false);
@Override
public Writable get() throws InterruptedException, ExecutionException {
try {
set(getRpcResponse(call, connection));
} catch (IOException ie) {
setException(ie);
if (called.compareAndSet(false, true)) {
try {
set(getRpcResponse(call, connection));
} catch (IOException ie) {
setException(ie);
} finally {
releaseAsyncCall();
}
}
return super.get();
}
@ -1427,6 +1460,15 @@ public class Client implements AutoCloseable {
asynchronousMode.set(async);
}
private void releaseAsyncCall() {
asyncCallCounter.decrementAndGet();
}
@VisibleForTesting
int getAsyncCallCount() {
return asyncCallCounter.get();
}
private Writable getRpcResponse(final Call call, final Connection connection)
throws IOException {
synchronized (call) {

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.security;
import com.google.protobuf.ByteString;
import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
@ -25,6 +27,7 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@ -47,9 +50,11 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.proto.SecurityProtos.CredentialsKVProto;
import org.apache.hadoop.security.proto.SecurityProtos.CredentialsProto;
/**
* A class that provides the facilities of reading and writing
* A class that provides the facilities of reading and writing
* secret keys and Tokens.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@ -58,34 +63,34 @@ public class Credentials implements Writable {
private static final Log LOG = LogFactory.getLog(Credentials.class);
private Map<Text, byte[]> secretKeysMap = new HashMap<Text, byte[]>();
private Map<Text, Token<? extends TokenIdentifier>> tokenMap =
new HashMap<Text, Token<? extends TokenIdentifier>>();
private Map<Text, Token<? extends TokenIdentifier>> tokenMap =
new HashMap<Text, Token<? extends TokenIdentifier>>();
/**
* Create an empty credentials instance
* Create an empty credentials instance.
*/
public Credentials() {
}
/**
* Create a copy of the given credentials
* Create a copy of the given credentials.
* @param credentials to copy
*/
public Credentials(Credentials credentials) {
this.addAll(credentials);
}
/**
* Returns the Token object for the alias
* Returns the Token object for the alias.
* @param alias the alias for the Token
* @return token for this alias
*/
public Token<? extends TokenIdentifier> getToken(Text alias) {
return tokenMap.get(alias);
}
/**
* Add a token in the storage (in memory)
* Add a token in the storage (in memory).
* @param alias the alias for the key
* @param t the token object
*/
@ -96,14 +101,14 @@ public class Credentials implements Writable {
LOG.warn("Null token ignored for " + alias);
}
}
/**
* Return all the tokens in the in-memory map
* Return all the tokens in the in-memory map.
*/
public Collection<Token<? extends TokenIdentifier>> getAllTokens() {
return tokenMap.values();
}
/**
* @return number of Tokens in the in-memory map
*/
@ -112,23 +117,23 @@ public class Credentials implements Writable {
}
/**
* Returns the key bytes for the alias
* Returns the key bytes for the alias.
* @param alias the alias for the key
* @return key for this alias
*/
public byte[] getSecretKey(Text alias) {
return secretKeysMap.get(alias);
}
/**
* @return number of keys in the in-memory map
*/
public int numberOfSecretKeys() {
return secretKeysMap.size();
}
/**
* Set the key for an alias
* Set the key for an alias.
* @param alias the alias for the key
* @param key the key bytes
*/
@ -145,7 +150,7 @@ public class Credentials implements Writable {
}
/**
* Return all the secret key entries in the in-memory map
* Return all the secret key entries in the in-memory map.
*/
public List<Text> getAllSecretKeys() {
List<Text> list = new java.util.ArrayList<Text>();
@ -155,13 +160,13 @@ public class Credentials implements Writable {
}
/**
* Convenience method for reading a token storage file, and loading the Tokens
* therein in the passed UGI
* Convenience method for reading a token storage file and loading its Tokens.
* @param filename
* @param conf
* @throws IOException
*/
public static Credentials readTokenStorageFile(Path filename, Configuration conf)
public static Credentials readTokenStorageFile(Path filename,
Configuration conf)
throws IOException {
FSDataInputStream in = null;
Credentials credentials = new Credentials();
@ -178,13 +183,13 @@ public class Credentials implements Writable {
}
/**
* Convenience method for reading a token storage file, and loading the Tokens
* therein in the passed UGI
* Convenience method for reading a token storage file and loading its Tokens.
* @param filename
* @param conf
* @throws IOException
*/
public static Credentials readTokenStorageFile(File filename, Configuration conf)
public static Credentials readTokenStorageFile(File filename,
Configuration conf)
throws IOException {
DataInputStream in = null;
Credentials credentials = new Credentials();
@ -199,10 +204,9 @@ public class Credentials implements Writable {
IOUtils.cleanup(LOG, in);
}
}
/**
* Convenience method for reading a token storage file directly from a
* datainputstream
* Convenience method for reading a token from a DataInputStream.
*/
public void readTokenStorageStream(DataInputStream in) throws IOException {
byte[] magic = new byte[TOKEN_STORAGE_MAGIC.length];
@ -211,25 +215,36 @@ public class Credentials implements Writable {
throw new IOException("Bad header found in token storage.");
}
byte version = in.readByte();
if (version != TOKEN_STORAGE_VERSION) {
throw new IOException("Unknown version " + version +
if (version != TOKEN_STORAGE_VERSION &&
version != OLD_TOKEN_STORAGE_VERSION) {
throw new IOException("Unknown version " + version +
" in token storage.");
}
readFields(in);
}
private static final byte[] TOKEN_STORAGE_MAGIC =
"HDTS".getBytes(Charsets.UTF_8);
private static final byte TOKEN_STORAGE_VERSION = 0;
public void writeTokenStorageToStream(DataOutputStream os)
throws IOException {
os.write(TOKEN_STORAGE_MAGIC);
os.write(TOKEN_STORAGE_VERSION);
write(os);
if (version == OLD_TOKEN_STORAGE_VERSION) {
readFields(in);
} else if (version == TOKEN_STORAGE_VERSION) {
readProto(in);
}
}
public void writeTokenStorageFile(Path filename,
private static final byte[] TOKEN_STORAGE_MAGIC =
"HDTS".getBytes(Charsets.UTF_8);
private static final byte TOKEN_STORAGE_VERSION = 1;
/**
* For backward compatibility.
*/
private static final byte OLD_TOKEN_STORAGE_VERSION = 0;
public void writeTokenStorageToStream(DataOutputStream os)
throws IOException {
os.write(TOKEN_STORAGE_MAGIC);
os.write(TOKEN_STORAGE_VERSION);
writeProto(os);
}
public void writeTokenStorageFile(Path filename,
Configuration conf) throws IOException {
FSDataOutputStream os = filename.getFileSystem(conf).create(filename);
writeTokenStorageToStream(os);
@ -237,7 +252,29 @@ public class Credentials implements Writable {
}
/**
* Stores all the keys to DataOutput
* For backward compatibility.
*/
public void writeLegacyTokenStorageLocalFile(File f) throws IOException {
writeLegacyOutputStream(new DataOutputStream(new FileOutputStream(f)));
}
/**
* For backward compatibility.
*/
public void writeLegacyTokenStorageFile(Path filename, Configuration conf)
throws IOException {
writeLegacyOutputStream(filename.getFileSystem(conf).create(filename));
}
private void writeLegacyOutputStream(DataOutputStream os) throws IOException {
os.write(TOKEN_STORAGE_MAGIC);
os.write(OLD_TOKEN_STORAGE_VERSION);
write(os);
os.close();
}
/**
* Stores all the keys to DataOutput.
* @param out
* @throws IOException
*/
@ -245,12 +282,12 @@ public class Credentials implements Writable {
public void write(DataOutput out) throws IOException {
// write out tokens first
WritableUtils.writeVInt(out, tokenMap.size());
for(Map.Entry<Text,
Token<? extends TokenIdentifier>> e: tokenMap.entrySet()) {
for(Map.Entry<Text,
Token<? extends TokenIdentifier>> e: tokenMap.entrySet()) {
e.getKey().write(out);
e.getValue().write(out);
}
// now write out secret keys
WritableUtils.writeVInt(out, secretKeysMap.size());
for(Map.Entry<Text, byte[]> e : secretKeysMap.entrySet()) {
@ -259,9 +296,51 @@ public class Credentials implements Writable {
out.write(e.getValue());
}
}
/**
* Loads all the keys
* Write contents of this instance as CredentialsProto message to DataOutput.
* @param out
* @throws IOException
*/
public void writeProto(DataOutput out) throws IOException {
CredentialsProto.Builder storage = CredentialsProto.newBuilder();
for (Map.Entry<Text, Token<? extends TokenIdentifier>> e :
tokenMap.entrySet()) {
CredentialsKVProto.Builder kv = CredentialsKVProto.newBuilder().
setAliasBytes(ByteString.copyFrom(
e.getKey().getBytes(), 0, e.getKey().getLength())).
setToken(e.getValue().toTokenProto());
storage.addTokens(kv.build());
}
for(Map.Entry<Text, byte[]> e : secretKeysMap.entrySet()) {
CredentialsKVProto.Builder kv = CredentialsKVProto.newBuilder().
setAliasBytes(ByteString.copyFrom(
e.getKey().getBytes(), 0, e.getKey().getLength())).
setSecret(ByteString.copyFrom(e.getValue()));
storage.addSecrets(kv.build());
}
storage.build().writeDelimitedTo((DataOutputStream)out);
}
/**
* Populates keys/values from proto buffer storage.
* @param in - stream ready to read a serialized proto buffer message
*/
public void readProto(DataInput in) throws IOException {
CredentialsProto storage = CredentialsProto.parseDelimitedFrom((DataInputStream)in);
for (CredentialsKVProto kv : storage.getTokensList()) {
addToken(new Text(kv.getAliasBytes().toByteArray()),
(Token<? extends TokenIdentifier>) new Token(kv.getToken()));
}
for (CredentialsKVProto kv : storage.getSecretsList()) {
addSecretKey(new Text(kv.getAliasBytes().toByteArray()),
kv.getSecret().toByteArray());
}
}
/**
* Loads all the keys.
* @param in
* @throws IOException
*/
@ -269,7 +348,7 @@ public class Credentials implements Writable {
public void readFields(DataInput in) throws IOException {
secretKeysMap.clear();
tokenMap.clear();
int size = WritableUtils.readVInt(in);
for(int i=0; i<size; i++) {
Text alias = new Text();
@ -278,7 +357,7 @@ public class Credentials implements Writable {
t.readFields(in);
tokenMap.put(alias, t);
}
size = WritableUtils.readVInt(in);
for(int i=0; i<size; i++) {
Text alias = new Text();
@ -289,7 +368,7 @@ public class Credentials implements Writable {
secretKeysMap.put(alias, value);
}
}
/**
* Copy all of the credentials from one credential object into another.
* Existing secrets and tokens are overwritten.
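Taken together, the changes above give Credentials two on-disk layouts for the same content: the new protobuf form (version 1, now written by writeTokenStorageToStream via writeProto) and the legacy Writable form (version 0, still produced by the writeLegacy* methods), with the reader dispatching on the version byte. A small sketch of writing both formats and reading them back, assuming a writable local working directory:

```java
import java.io.File;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;

public class CredentialsFormatsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Credentials creds = new Credentials();
    creds.addSecretKey(new Text("db.password"),
        "s3cr3t".getBytes(StandardCharsets.UTF_8));

    // New protobuf-based layout (TOKEN_STORAGE_VERSION = 1).
    File pbFile = new File("creds.pb");
    creds.writeTokenStorageFile(
        new Path("file:" + pbFile.getAbsolutePath()), conf);

    // Legacy Writable layout (OLD_TOKEN_STORAGE_VERSION = 0), kept for
    // consumers such as the dtutil "-format java" option.
    File legacyFile = new File("creds.java");
    creds.writeLegacyTokenStorageLocalFile(legacyFile);

    // The reader recognizes either version byte.
    Credentials fromPb = Credentials.readTokenStorageFile(pbFile, conf);
    Credentials fromLegacy = Credentials.readTokenStorageFile(legacyFile, conf);
    System.out.println(new String(
        fromPb.getSecretKey(new Text("db.password")), StandardCharsets.UTF_8));
    System.out.println(fromLegacy.numberOfSecretKeys());
  }
}
```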

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.token;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
/**
* DtFetcher is an interface which permits the abstraction and separation of
* delegation token fetch implementations across different packages and
* compilation units. Resolution of the fetcher implementation is done at runtime.
*/
public interface DtFetcher {
/** Return a key used to identify the object/service implementation. */
Text getServiceName();
/** Used to allow the service API to indicate whether a token is required. */
boolean isTokenRequired();
/** Add any number of delegation tokens to a Credentials object and return
* a token instance that is appropriate for aliasing, or null if none. */
Token<?> addDelegationTokens(Configuration conf, Credentials creds,
String renewer, String url) throws Exception;
}
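Fetcher implementations are discovered at runtime through java.util.ServiceLoader (see DtFileOperations below), so a new fetcher only needs to implement this interface and be listed in a `META-INF/services/org.apache.hadoop.security.token.DtFetcher` file on the classpath. A hypothetical skeleton for a made-up `example://` scheme:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.DtFetcher;
import org.apache.hadoop.security.token.Token;

/** Hypothetical fetcher for an imaginary "example" service; not part of this change. */
public class ExampleDtFetcher implements DtFetcher {
  private static final Text SERVICE_NAME = new Text("example");

  @Override
  public Text getServiceName() {
    return SERVICE_NAME;
  }

  @Override
  public boolean isTokenRequired() {
    // Typically derived from the security configuration.
    return true;
  }

  @Override
  public Token<?> addDelegationTokens(Configuration conf, Credentials creds,
      String renewer, String url) throws Exception {
    // A real implementation would contact the service at `url`, obtain a
    // delegation token for `renewer`, add it to `creds`, and return a token
    // suitable for aliasing (or null if there is none).
    return null;
  }
}
```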

View File

@ -0,0 +1,271 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.token;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.ServiceLoader;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
/**
* DtFileOperations is a collection of delegation token file operations.
*/
public final class DtFileOperations {
private static final Log LOG = LogFactory.getLog(DtFileOperations.class);
/** No public constructor as per checkstyle. */
private DtFileOperations() { }
/**
* Use FORMAT_* as arguments to format parameters.
* FORMAT_PB is for protobuf output.
*/
public static final String FORMAT_PB = "protobuf";
/**
* Use FORMAT_* as arguments to format parameters.
* FORMAT_JAVA is a legacy option for java serialization output.
*/
public static final String FORMAT_JAVA = "java";
private static final String NA_STRING = "-NA-";
private static final String PREFIX_HTTP = "http://";
private static final String PREFIX_HTTPS = "https://";
/** Let the DtFetcher code add the appropriate prefix if HTTP/S is used. */
private static String stripPrefix(String u) {
return u.replaceFirst(PREFIX_HTTP, "").replaceFirst(PREFIX_HTTPS, "");
}
/** Match token service field to alias text. True if alias is null. */
private static boolean matchAlias(Token<?> token, Text alias) {
return alias == null || token.getService().equals(alias);
}
/** Match fetcher's service name to the service text and/or url prefix. */
private static boolean matchService(
DtFetcher fetcher, Text service, String url) {
Text sName = fetcher.getServiceName();
return (service == null && url.startsWith(sName.toString() + "://")) ||
(service != null && service.equals(sName));
}
/** Format a long integer type into a date string. */
private static String formatDate(long date) {
DateFormat df = DateFormat.getDateTimeInstance(
DateFormat.SHORT, DateFormat.SHORT);
return df.format(new Date(date));
}
/** Add the service prefix for a local filesystem. */
private static Path fileToPath(File f) {
return new Path("file:" + f.getAbsolutePath());
}
/** Write out a Credentials object as a local file.
* @param f a local File object.
* @param format a string equal to FORMAT_PB or FORMAT_JAVA.
* @param creds the Credentials object to be written out.
* @param conf a Configuration object passed along.
* @throws IOException
*/
public static void doFormattedWrite(
File f, String format, Credentials creds, Configuration conf)
throws IOException {
if (format == null || format.equals(FORMAT_PB)) {
creds.writeTokenStorageFile(fileToPath(f), conf);
} else { // if (format != null && format.equals(FORMAT_JAVA)) {
creds.writeLegacyTokenStorageLocalFile(f);
}
}
/** Print out a Credentials file from the local filesystem.
* @param tokenFile a local File object.
* @param alias print only tokens matching alias (null matches all).
* @param conf Configuration object passed along.
* @param out print to this stream.
* @throws IOException
*/
public static void printTokenFile(
File tokenFile, Text alias, Configuration conf, PrintStream out)
throws IOException {
out.println("File: " + tokenFile.getPath());
Credentials creds = Credentials.readTokenStorageFile(tokenFile, conf);
printCredentials(creds, alias, out);
}
/** Print out a Credentials object.
* @param creds the Credentials object to be printed out.
* @param alias print only tokens matching alias (null matches all).
* @param out print to this stream.
* @throws IOException
*/
public static void printCredentials(
Credentials creds, Text alias, PrintStream out)
throws IOException {
boolean tokenHeader = true;
String fmt = "%-24s %-20s %-15s %-12s %s%n";
for (Token<?> token : creds.getAllTokens()) {
if (matchAlias(token, alias)) {
if (tokenHeader) {
out.printf(fmt, "Token kind", "Service", "Renewer", "Exp date",
"URL enc token");
out.println(StringUtils.repeat("-", 80));
tokenHeader = false;
}
AbstractDelegationTokenIdentifier id =
(AbstractDelegationTokenIdentifier) token.decodeIdentifier();
out.printf(fmt, token.getKind(), token.getService(),
(id != null) ? id.getRenewer() : NA_STRING,
(id != null) ? formatDate(id.getMaxDate()) : NA_STRING,
token.encodeToUrlString());
}
}
}
/** Fetch a token from a service and save to file in the local filesystem.
* @param tokenFile a local File object to hold the output.
* @param fileFormat a string equal to FORMAT_PB or FORMAT_JAVA, for output
* @param alias overwrite service field of fetched token with this text.
* @param service use a DtFetcher implementation matching this service text.
* @param url pass this URL to fetcher after stripping any http/s prefix.
* @param renewer pass this renewer to the fetcher.
* @param conf Configuration object passed along.
* @throws IOException
*/
public static void getTokenFile(File tokenFile, String fileFormat,
Text alias, Text service, String url, String renewer, Configuration conf)
throws Exception {
Token<?> token = null;
Credentials creds = tokenFile.exists() ?
Credentials.readTokenStorageFile(tokenFile, conf) : new Credentials();
ServiceLoader<DtFetcher> loader = ServiceLoader.load(DtFetcher.class);
for (DtFetcher fetcher : loader) {
if (matchService(fetcher, service, url)) {
if (!fetcher.isTokenRequired()) {
String message = "DtFetcher for service '" + service +
"' does not require a token. Check your configuration. " +
"Note: security may be disabled or there may be two DtFetcher " +
"providers for the same service designation.";
LOG.error(message);
throw new IllegalArgumentException(message);
}
token = fetcher.addDelegationTokens(conf, creds, renewer,
stripPrefix(url));
}
}
if (alias != null) {
if (token == null) {
String message = "DtFetcher for service '" + service + "'" +
" does not allow aliasing. Cannot apply alias '" + alias + "'." +
" Drop alias flag to get token for this service.";
LOG.error(message);
throw new IOException(message);
}
Token<?> aliasedToken = token.copyToken();
aliasedToken.setService(alias);
creds.addToken(alias, aliasedToken);
LOG.info("Add token with service " + alias);
}
doFormattedWrite(tokenFile, fileFormat, creds, conf);
}
/** Append tokens from list of files in local filesystem, saving to last file.
* @param tokenFiles list of local File objects. Last file holds the output.
* @param fileFormat a string equal to FORMAT_PB or FORMAT_JAVA, for output
* @param conf Configuration object passed along.
* @throws IOException
*/
public static void appendTokenFiles(
ArrayList<File> tokenFiles, String fileFormat, Configuration conf)
throws IOException {
Credentials newCreds = new Credentials();
File lastTokenFile = null;
for (File tokenFile : tokenFiles) {
lastTokenFile = tokenFile;
Credentials creds = Credentials.readTokenStorageFile(tokenFile, conf);
for (Token<?> token : creds.getAllTokens()) {
newCreds.addToken(token.getService(), token);
}
}
doFormattedWrite(lastTokenFile, fileFormat, newCreds, conf);
}
/** Remove a token from a file in the local filesystem, matching alias.
* @param cancel cancel token as well as remove from file.
* @param tokenFile a local File object.
* @param fileFormat a string equal to FORMAT_PB or FORMAT_JAVA, for output
* @param alias remove only tokens matching alias; null matches all.
* @param conf Configuration object passed along.
* @throws IOException
* @throws InterruptedException
*/
public static void removeTokenFromFile(boolean cancel,
File tokenFile, String fileFormat, Text alias, Configuration conf)
throws IOException, InterruptedException {
Credentials newCreds = new Credentials();
Credentials creds = Credentials.readTokenStorageFile(tokenFile, conf);
for (Token<?> token : creds.getAllTokens()) {
if (matchAlias(token, alias)) {
if (token.isManaged() && cancel) {
token.cancel(conf);
LOG.info("Canceled " + token.getKind() + ":" + token.getService());
}
} else {
newCreds.addToken(token.getService(), token);
}
}
doFormattedWrite(tokenFile, fileFormat, newCreds, conf);
}
/** Renew a token from a file in the local filesystem, matching alias.
* @param tokenFile a local File object.
* @param fileFormat a string equal to FORMAT_PB or FORMAT_JAVA, for output
* @param alias renew only tokens matching alias; null matches all.
* @param conf Configuration object passed along.
* @throws IOException
* @throws InterruptedException
*/
public static void renewTokenFile(
File tokenFile, String fileFormat, Text alias, Configuration conf)
throws IOException, InterruptedException {
Credentials creds = Credentials.readTokenStorageFile(tokenFile, conf);
for (Token<?> token : creds.getAllTokens()) {
if (token.isManaged() && matchAlias(token, alias)) {
long result = token.renew(conf);
LOG.info("Renewed" + token.getKind() + ":" + token.getService() +
" until " + formatDate(result));
}
}
doFormattedWrite(tokenFile, fileFormat, creds, conf);
}
}
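Although these helpers exist to back the new dtutil shell, they are plain static methods and can be called directly. A minimal sketch that prints every token in a local token file (a null alias matches all tokens):

```java
import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.DtFileOperations;

public class PrintTokensExample {
  public static void main(String[] args) throws Exception {
    // args[0] is the path of a token file written by dtutil or fetchdt.
    DtFileOperations.printTokenFile(
        new File(args[0]), null, new Configuration(), System.out);
  }
}
```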

View File

@ -0,0 +1,326 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.token;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.CommandShell;
import org.apache.hadoop.util.ToolRunner;
/**
* DtUtilShell is a set of command-line operations for managing delegation token files.
*/
public class DtUtilShell extends CommandShell {
private static final Log LOG = LogFactory.getLog(DtUtilShell.class);
private static final String FORMAT_SUBSTRING = "[-format (" +
DtFileOperations.FORMAT_JAVA + "|" +
DtFileOperations.FORMAT_PB + ")]";
public static final String DT_USAGE = "hadoop dtutil " +
"[-keytab <keytab_file> -principal <principal_name>] " +
"subcommand (help|print|get|append|cancel|remove|renew) " +
FORMAT_SUBSTRING + " [-alias <alias>] filename...";
// command line options
private static final String HELP = "help";
private static final String KEYTAB = "-keytab";
private static final String PRINCIPAL = "-principal";
private static final String PRINT = "print";
private static final String GET = "get";
private static final String APPEND = "append";
private static final String CANCEL = "cancel";
private static final String REMOVE = "remove";
private static final String RENEW = "renew";
private static final String RENEWER = "-renewer";
private static final String SERVICE = "-service";
private static final String ALIAS = "-alias";
private static final String FORMAT = "-format";
// configuration state from args, conf
private String keytab = null;
private String principal = null;
private Text alias = null;
private Text service = null;
private String renewer = null;
private String format = DtFileOperations.FORMAT_PB;
private ArrayList<File> tokenFiles = null;
private File firstFile = null;
/**
* Parse arguments looking for Kerberos keytab/principal.
* If both are found: remove both from the argument list and attempt login.
* If only one of the two is found: remove it from the argument list, log a
* warning, and do not attempt login.
* If neither is found: return original args array, doing nothing.
* Return the pruned args array if either flag is present.
*/
private String[] maybeDoLoginFromKeytabAndPrincipal(String[] args)
throws IOException {
ArrayList<String> savedArgs = new ArrayList<String>(args.length);
for (int i = 0; i < args.length; i++) {
String current = args[i];
if (current.equals(PRINCIPAL)) {
principal = args[++i];
} else if (current.equals(KEYTAB)) {
keytab = args[++i];
} else {
savedArgs.add(current);
}
}
int newSize = savedArgs.size();
if (newSize != args.length) {
if (principal != null && keytab != null) {
UserGroupInformation.loginUserFromKeytab(principal, keytab);
} else {
LOG.warn("-principal and -keytab not both specified! " +
"Kerberos login not attempted.");
}
return savedArgs.toArray(new String[newSize]);
}
return args;
}
/**
* Parse the command line arguments and initialize subcommand.
* Also will attempt to perform Kerberos login if both -principal and -keytab
* flags are passed in args array.
* @param args
* @return 0 if the argument(s) were recognized, 1 otherwise
* @throws Exception
*/
@Override
protected int init(String[] args) throws Exception {
if (0 == args.length) {
return 1;
}
tokenFiles = new ArrayList<File>();
args = maybeDoLoginFromKeytabAndPrincipal(args);
for (int i = 0; i < args.length; i++) {
if (i == 0) {
String command = args[0];
if (command.equals(HELP)) {
return 1;
} else if (command.equals(PRINT)) {
setSubCommand(new Print());
} else if (command.equals(GET)) {
setSubCommand(new Get(args[++i]));
} else if (command.equals(APPEND)) {
setSubCommand(new Append());
} else if (command.equals(CANCEL)) {
setSubCommand(new Remove(true));
} else if (command.equals(REMOVE)) {
setSubCommand(new Remove(false));
} else if (command.equals(RENEW)) {
setSubCommand(new Renew());
}
} else if (args[i].equals(ALIAS)) {
alias = new Text(args[++i]);
} else if (args[i].equals(SERVICE)) {
service = new Text(args[++i]);
} else if (args[i].equals(RENEWER)) {
renewer = args[++i];
} else if (args[i].equals(FORMAT)) {
format = args[++i];
if (!format.equals(DtFileOperations.FORMAT_JAVA) &&
!format.equals(DtFileOperations.FORMAT_PB)) {
LOG.error("-format must be '" + DtFileOperations.FORMAT_JAVA +
"' or '" + DtFileOperations.FORMAT_PB + "' not '" +
format + "'");
return 1;
}
} else {
for (; i < args.length; i++) {
File f = new File(args[i]);
if (f.exists()) {
tokenFiles.add(f);
}
if (firstFile == null) {
firstFile = f;
}
}
if (tokenFiles.size() == 0 && firstFile == null) {
LOG.error("Must provide a filename to all commands.");
return 1;
}
}
}
return 0;
}
@Override
public String getCommandUsage() {
return String.format("%n%s%n %s%n %s%n %s%n %s%n %s%n %s%n%n",
DT_USAGE, (new Print()).getUsage(), (new Get()).getUsage(),
(new Append()).getUsage(), (new Remove(true)).getUsage(),
(new Remove(false)).getUsage(), (new Renew()).getUsage());
}
private class Print extends SubCommand {
public static final String PRINT_USAGE =
"dtutil print [-alias <alias>] filename...";
@Override
public void execute() throws Exception {
for (File tokenFile : tokenFiles) {
DtFileOperations.printTokenFile(tokenFile, alias, getConf(), getOut());
}
}
@Override
public String getUsage() {
return PRINT_USAGE;
}
}
private class Get extends SubCommand {
public static final String GET_USAGE = "dtutil get URL " +
"[-service <scheme>] " + FORMAT_SUBSTRING +
"[-alias <alias>] [-renewer <renewer>] filename";
private static final String PREFIX_HTTP = "http://";
private static final String PREFIX_HTTPS = "https://";
private String url = null;
public Get() { }
public Get(String arg) {
url = arg;
}
public boolean isGenericUrl() {
return url.startsWith(PREFIX_HTTP) || url.startsWith(PREFIX_HTTPS);
}
public boolean validate() {
if (service != null && !isGenericUrl()) {
LOG.error("Only provide -service with http/https URL.");
return false;
}
if (service == null && isGenericUrl()) {
LOG.error("Must provide -service with http/https URL.");
return false;
}
if (url.indexOf("://") == -1) {
LOG.error("URL does not contain a service specification: " + url);
return false;
}
return true;
}
@Override
public void execute() throws Exception {
DtFileOperations.getTokenFile(
firstFile, format, alias, service, url, renewer, getConf());
}
@Override
public String getUsage() {
return GET_USAGE;
}
}
private class Append extends SubCommand {
public static final String APPEND_USAGE =
"dtutil append " + FORMAT_SUBSTRING + "filename...";
@Override
public void execute() throws Exception {
DtFileOperations.appendTokenFiles(tokenFiles, format, getConf());
}
@Override
public String getUsage() {
return APPEND_USAGE;
}
}
private class Remove extends SubCommand {
public static final String REMOVE_USAGE =
"dtutil remove -alias <alias> " + FORMAT_SUBSTRING + " filename...";
public static final String CANCEL_USAGE =
"dtutil cancel -alias <alias> " + FORMAT_SUBSTRING + " filename...";
private boolean cancel = false;
public Remove(boolean arg) {
cancel = arg;
}
@Override
public boolean validate() {
if (alias == null) {
LOG.error("-alias flag is not optional for remove or cancel");
return false;
}
return true;
}
@Override
public void execute() throws Exception {
for (File tokenFile : tokenFiles) {
DtFileOperations.removeTokenFromFile(
cancel, tokenFile, format, alias, getConf());
}
}
@Override
public String getUsage() {
if (cancel) {
return CANCEL_USAGE;
}
return REMOVE_USAGE;
}
}
private class Renew extends SubCommand {
public static final String RENEW_USAGE =
"dtutil renew -alias <alias> filename...";
@Override
public boolean validate() {
if (alias == null) {
LOG.error("-alias flag is not optional for renew");
return false;
}
return true;
}
@Override
public void execute() throws Exception {
for (File tokenFile : tokenFiles) {
DtFileOperations.renewTokenFile(tokenFile, format, alias, getConf());
}
}
@Override
public String getUsage() {
return RENEW_USAGE;
}
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new Configuration(), new DtUtilShell(), args));
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.security.token;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.common.primitives.Bytes;
import org.apache.commons.codec.binary.Base64;
@ -28,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.*;
@ -43,15 +45,15 @@ import java.util.UUID;
@InterfaceStability.Evolving
public class Token<T extends TokenIdentifier> implements Writable {
public static final Log LOG = LogFactory.getLog(Token.class);
private static Map<Text, Class<? extends TokenIdentifier>> tokenKindMap;
private byte[] identifier;
private byte[] password;
private Text kind;
private Text service;
private TokenRenewer renewer;
/**
* Construct a token given a token identifier and a secret manager for the
* type of the token identifier.
@ -64,7 +66,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
kind = id.getKind();
service = new Text();
}
/**
* Construct a token from the components.
* @param identifier the token identifier
@ -80,7 +82,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
}
/**
* Default constructor
* Default constructor.
*/
public Token() {
identifier = new byte[0];
@ -100,14 +102,44 @@ public class Token<T extends TokenIdentifier> implements Writable {
this.service = other.service;
}
public Token<T> copyToken() {
return new Token<T>(this);
}
/**
* Get the token identifier's byte representation
* Construct a Token from a TokenProto.
* @param tokenPB the TokenProto object
*/
public Token(TokenProto tokenPB) {
this.identifier = tokenPB.getIdentifier().toByteArray();
this.password = tokenPB.getPassword().toByteArray();
this.kind = new Text(tokenPB.getKindBytes().toByteArray());
this.service = new Text(tokenPB.getServiceBytes().toByteArray());
}
/**
* Construct a TokenProto from this Token instance.
* @return a new TokenProto object holding copies of data in this instance
*/
public TokenProto toTokenProto() {
return TokenProto.newBuilder().
setIdentifier(ByteString.copyFrom(this.getIdentifier())).
setPassword(ByteString.copyFrom(this.getPassword())).
setKindBytes(ByteString.copyFrom(
this.getKind().getBytes(), 0, this.getKind().getLength())).
setServiceBytes(ByteString.copyFrom(
this.getService().getBytes(), 0, this.getService().getLength())).
build();
}
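These two members make Token and TokenProto freely convertible. A short sketch of the round trip (with `original` assumed to be an already-populated token) shows that identifier, password, kind and service all survive:

```java
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

final class TokenProtoRoundTrip {
  static boolean survivesRoundTrip(Token<? extends TokenIdentifier> original) {
    TokenProto proto = original.toTokenProto(); // copy fields into the protobuf message
    Token<?> restored = new Token<>(proto);     // rebuild a Token from the message
    // equals() compares identifier, password, kind and service.
    return restored.equals(original);
  }
}
```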
/**
* Get the token identifier's byte representation.
* @return the token identifier's byte representation
*/
public byte[] getIdentifier() {
return identifier;
}
private static Class<? extends TokenIdentifier>
getClassForIdentifier(Text kind) {
Class<? extends TokenIdentifier> cls = null;
@ -126,12 +158,12 @@ public class Token<T extends TokenIdentifier> implements Writable {
}
return cls;
}
/**
* Get the token identifier object, or null if it could not be constructed
* (because the class could not be loaded, for example).
* @return the token identifier, or null
* @throws IOException
* @throws IOException
*/
@SuppressWarnings("unchecked")
public T decodeIdentifier() throws IOException {
@ -141,22 +173,22 @@ public class Token<T extends TokenIdentifier> implements Writable {
}
TokenIdentifier tokenIdentifier = ReflectionUtils.newInstance(cls, null);
ByteArrayInputStream buf = new ByteArrayInputStream(identifier);
DataInputStream in = new DataInputStream(buf);
DataInputStream in = new DataInputStream(buf);
tokenIdentifier.readFields(in);
in.close();
return (T) tokenIdentifier;
}
/**
* Get the token password/secret
* Get the token password/secret.
* @return the token password/secret
*/
public byte[] getPassword() {
return password;
}
/**
* Get the token kind
* Get the token kind.
* @return the kind of the token
*/
public synchronized Text getKind() {
@ -175,15 +207,15 @@ public class Token<T extends TokenIdentifier> implements Writable {
}
/**
* Get the service on which the token is supposed to be used
* Get the service on which the token is supposed to be used.
* @return the service name
*/
public Text getService() {
return service;
}
/**
* Set the service on which the token is supposed to be used
* Set the service on which the token is supposed to be used.
* @param newService the service name
*/
public void setService(Text newService) {
@ -244,14 +276,14 @@ public class Token<T extends TokenIdentifier> implements Writable {
System.arraycopy(buf.getData(), 0, raw, 0, buf.getLength());
return encoder.encodeToString(raw);
}
/**
* Modify the writable to the value from the newValue
* Modify the writable to the value from the newValue.
* @param obj the object to read into
* @param newValue the string with the url-safe base64 encoded bytes
* @throws IOException
*/
private static void decodeWritable(Writable obj,
private static void decodeWritable(Writable obj,
String newValue) throws IOException {
Base64 decoder = new Base64(0, null, true);
DataInputBuffer buf = new DataInputBuffer();
@ -261,14 +293,14 @@ public class Token<T extends TokenIdentifier> implements Writable {
}
/**
* Encode this token as a url safe string
* Encode this token as a url safe string.
* @return the encoded string
* @throws IOException
*/
public String encodeToUrlString() throws IOException {
return encodeWritable(this);
}
/**
* Decode the given url safe string into this token.
* @param newValue the encoded string
@ -277,7 +309,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
public void decodeFromUrlString(String newValue) throws IOException {
decodeWritable(this, newValue);
}
@SuppressWarnings("unchecked")
@Override
public boolean equals(Object right) {
@ -293,12 +325,12 @@ public class Token<T extends TokenIdentifier> implements Writable {
service.equals(r.service);
}
}
@Override
public int hashCode() {
return WritableComparator.hashBytes(identifier, identifier.length);
}
private static void addBinaryBuffer(StringBuilder buffer, byte[] bytes) {
for (int idx = 0; idx < bytes.length; idx++) {
// if not the first, put a blank separator in
@ -313,7 +345,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
buffer.append(num);
}
}
private void identifierToString(StringBuilder buffer) {
T id = null;
try {
@ -375,7 +407,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
}
/**
* Renew this delegation token
* Renew this delegation token.
* @return the new expiration time
* @throws IOException
* @throws InterruptedException
@ -384,9 +416,9 @@ public class Token<T extends TokenIdentifier> implements Writable {
) throws IOException, InterruptedException {
return getRenewer().renew(this, conf);
}
/**
* Cancel this delegation token
* Cancel this delegation token.
* @throws IOException
* @throws InterruptedException
*/
@ -394,7 +426,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
) throws IOException, InterruptedException {
getRenewer().cancel(this, conf);
}
/**
* A trivial renewer for token kinds that aren't managed. Sub-classes need
* to implement getKind for their token kind.
@ -402,7 +434,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving
public static class TrivialRenewer extends TokenRenewer {
// define the kind for this renewer
protected Text getKind() {
return null;

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import java.io.PrintStream;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
/**
* Base class for command-line utilities, built on top of the Hadoop Tool interface.
*/
public abstract class CommandShell extends Configured implements Tool {
private PrintStream out = System.out;
private PrintStream err = System.err;
/** The subcommand instance for this shell command, if any. */
private SubCommand subcommand = null;
/**
* Return usage string for the command including any summary of subcommands.
*/
public abstract String getCommandUsage();
public void setSubCommand(SubCommand cmd) {
subcommand = cmd;
}
public void setOut(PrintStream p) {
out = p;
}
public PrintStream getOut() {
return out;
}
public void setErr(PrintStream p) {
err = p;
}
public PrintStream getErr() {
return err;
}
@Override
public int run(String[] args) throws Exception {
int exitCode = 0;
try {
exitCode = init(args);
if (exitCode != 0) {
printShellUsage();
return exitCode;
}
if (subcommand.validate()) {
subcommand.execute();
} else {
printShellUsage();
exitCode = 1;
}
} catch (Exception e) {
printShellUsage();
e.printStackTrace(err);
return 1;
}
return exitCode;
}
/**
* Parse the command line arguments and initialize subcommand instance.
* @param args
* @return 0 if the argument(s) were recognized, 1 otherwise
*/
protected abstract int init(String[] args) throws Exception;
private void printShellUsage() {
if (subcommand != null) {
out.println(subcommand.getUsage());
} else {
out.println(getCommandUsage());
}
out.flush();
}
/**
* Base class for any subcommands of this shell command.
*/
protected abstract class SubCommand {
public boolean validate() {
return true;
}
public abstract void execute() throws Exception;
public abstract String getUsage();
}
}
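CommandShell leaves only argument parsing and the per-subcommand behavior to subclasses (DtUtilShell above is its first user). A hypothetical minimal subclass, just to illustrate the init()/SubCommand contract:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.CommandShell;
import org.apache.hadoop.util.ToolRunner;

/** Hypothetical example subclass; not part of this change. */
public class EchoShell extends CommandShell {
  private String message;

  @Override
  public String getCommandUsage() {
    return "echo <message>";
  }

  @Override
  protected int init(String[] args) throws Exception {
    if (args.length != 1) {
      return 1;                 // run() will print the usage string
    }
    message = args[0];
    setSubCommand(new Echo());
    return 0;
  }

  /** The single subcommand: print the message given on the command line. */
  private class Echo extends SubCommand {
    @Override
    public void execute() {
      getOut().println(message);
    }

    @Override
    public String getUsage() {
      return getCommandUsage();
    }
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new EchoShell(), args));
  }
}
```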

View File

@ -38,6 +38,17 @@ message TokenProto {
required string service = 4;
}
message CredentialsKVProto {
required string alias = 1;
optional hadoop.common.TokenProto token = 2;
optional bytes secret = 3;
}
message CredentialsProto {
repeated hadoop.common.CredentialsKVProto tokens = 1;
repeated hadoop.common.CredentialsKVProto secrets = 2;
}
message GetDelegationTokenRequestProto {
required string renewer = 1;
}

View File

@ -134,6 +134,25 @@ Change the ownership and permissions on many files at once.
Copy file or directories recursively. More information can be found at [Hadoop DistCp Guide](../../hadoop-distcp/DistCp.html).
### `dtutil`
Usage: `hadoop dtutil [-keytab` *keytab_file* `-principal` *principal_name* `]` *subcommand* `[-format (java|protobuf)] [-alias` *alias* `] [-renewer` *renewer* `]` *filename...*
Utility to fetch and manage Hadoop delegation tokens inside credentials files. It is intended to replace the simpler command `fetchdt`. There are multiple subcommands, each with its own flags and options.
For every subcommand that writes out a file, the `-format` option will specify the internal format to use. `java` is the legacy format that matches `fetchdt`. The default is `protobuf`.
For every subcommand that connects to a service, convenience flags are provided to specify the Kerberos principal name and keytab file to use for authentication.
| SUBCOMMAND | Description |
|:---- |:---- |
| `print` <br/>&nbsp;&nbsp; `[-alias` *alias* `]` <br/>&nbsp;&nbsp; *filename* `[` *filename2* `...]` | Print out the fields in the tokens contained in *filename* (and *filename2* ...). <br/> If *alias* is specified, print only tokens matching *alias*. Otherwise, print all tokens. |
| `get` *URL* <br/>&nbsp;&nbsp; `[-service` *scheme* `]` <br/>&nbsp;&nbsp; `[-format (java|protobuf)]` <br/>&nbsp;&nbsp; `[-alias` *alias* `]` <br/>&nbsp;&nbsp; `[-renewer` *renewer* `]` <br/>&nbsp;&nbsp; *filename* | Fetch a token from service at *URL* and place it in *filename*. <br/> *URL* is required and must immediately follow `get`.<br/> *URL* is the service URL, e.g. *hdfs:&#47;&#47;localhost:9000*. <br/> *alias* will overwrite the service field in the token. <br/> It is intended for hosts that have external and internal names, e.g. *firewall.com:14000*. <br/> *filename* should come last and is the name of the token file. <br/> It will be created if it does not exist. Otherwise, token(s) are added to existing file. <br/> The `-service` flag should only be used with a URL which starts with `http` or `https`. <br/> The following are equivalent: *hdfs:&#47;&#47;localhost:9000/* vs. *http:&#47;&#47;localhost:9000* `-service` *hdfs* |
| `append` <br/>&nbsp;&nbsp; `[-format (java|protobuf)]` <br/>&nbsp;&nbsp; *filename* *filename2* `[` *filename3* `...]` | Append the contents of the first N filenames onto the last filename. <br/> When tokens with common service fields are present in multiple files, earlier files' tokens are overwritten. <br/> That is, tokens present in the last file are always preserved. |
| `remove -alias` *alias* <br/>&nbsp;&nbsp; `[-format (java|protobuf)]` <br/>&nbsp;&nbsp; *filename* `[` *filename2* `...]` | From each file specified, remove the tokens matching *alias* and write out each file using specified format. <br/> *alias* must be specified. |
| `cancel -alias` *alias* <br/>&nbsp;&nbsp; `[-format (java|protobuf)]` <br/>&nbsp;&nbsp; *filename* `[` *filename2* `...]` | Just like `remove`, except the tokens are also cancelled using the service specified in the token object. <br/> *alias* must be specified. |
| `renew -alias` *alias* <br/>&nbsp;&nbsp; `[-format (java|protobuf)]` <br/>&nbsp;&nbsp; *filename* `[` *filename2* `...]` | For each file specified, renew the tokens matching *alias* and write out each file using specified format. <br/> *alias* must be specified. |
### `fs`
This command is documented in the [File System Shell Guide](./FileSystemShell.html). It is a synonym for `hdfs dfs` when HDFS is in use.

View File

@ -16,15 +16,15 @@
<!-- MACRO{toc|fromDepth=0|toDepth=3} -->
Much of Hadoop's functionality is controlled via [the shell](CommandsManual.html). There are several ways to modify the default behavior of how these commands execute.
Much of Apache Hadoop's functionality is controlled via [the shell](CommandsManual.html). There are several ways to modify the default behavior of how these commands execute.
## Important End-User Environment Variables
Hadoop has many environment variables that control various aspects of the software. (See `hadoop-env.sh` and related files.) Some of these environment variables are dedicated to helping end users manage their runtime.
Apache Hadoop has many environment variables that control various aspects of the software. (See `hadoop-env.sh` and related files.) Some of these environment variables are dedicated to helping end users manage their runtime.
### `HADOOP_CLIENT_OPTS`
This environment variable is used for almost all end-user operations. It can be used to set any Java options as well as any Hadoop options via a system property definition. For example:
This environment variable is used for almost all end-user operations. It can be used to set any Java options as well as any Apache Hadoop options via a system property definition. For example:
```bash
HADOOP_CLIENT_OPTS="-Xmx1g -Dhadoop.socks.server=localhost:4000" hadoop fs -ls /tmp
@ -34,7 +34,7 @@ will increase the memory and send this command via a SOCKS proxy server.
### `HADOOP_USER_CLASSPATH`
The Hadoop scripts have the capability to inject more content into the classpath of the running command by setting this environment variable. It should be a colon delimited list of directories, files, or wildcard locations.
The Apache Hadoop scripts have the capability to inject more content into the classpath of the running command by setting this environment variable. It should be a colon delimited list of directories, files, or wildcard locations.
```bash
HADOOP_USER_CLASSPATH=${HOME}/lib/myjars/*.jar hadoop classpath
@ -44,13 +44,13 @@ A user can provide hints to the location of the paths via the `HADOOP_USER_CLAS
### Auto-setting of Variables
If a user has a common set of settings, they can be put into the `${HOME}/.hadooprc` file. This file is always read to initialize and override any variables that the user may want to customize. It uses bash syntax, similar to the `.bashrc` file:
If a user has a common set of settings, they can be put into the `${HOME}/.hadoop-env` file. This file is always read to initialize and override any variables that the user may want to customize. It uses bash syntax, similar to the `.bashrc` file:
For example:
```bash
#
# my custom Hadoop settings!
# my custom Apache Hadoop settings!
#
HADOOP_USER_CLASSPATH=${HOME}/hadoopjars/*
@ -58,7 +58,7 @@ HADOOP_USER_CLASSPATH_FIRST=yes
HADOOP_CLIENT_OPTS="-Xmx1g"
```
The `.hadooprc` file can also be used to extend functionality and teach Hadoop new tricks. For example, to run hadoop commands accessing the server referenced in the environment variable `${HADOOP_SERVER}`, the following in the `.hadooprc` will do just that:
The `.hadoop-env` file can also be used to extend functionality and teach Apache Hadoop new tricks. For example, to run hadoop commands accessing the server referenced in the environment variable `${HADOOP_SERVER}`, the following in the `.hadoop-env` will do just that:
```bash
@ -67,13 +67,15 @@ if [[ -n ${HADOOP_SERVER} ]]; then
fi
```
One word of warning: not all of the Unix Shell API routines are available or work correctly in `.hadoop-env`. See below for more information on `.hadooprc`.
## Administrator Environment
There are many environment variables that impact how the system operates. By far, the most important are the series of `_OPTS` variables that control how daemons work. These variables should contain all of the relevant settings for those daemons.
More detailed information is contained in `hadoop-env.sh` and the other env.sh files.
Advanced administrators may wish to supplement or do some platform-specific fixes to the existing scripts. In some systems, this means copying the errant script or creating a custom build with these changes. Hadoop provides the capabilities to do function overrides so that the existing code base may be changed in place without all of that work. Replacing functions is covered later under the Shell API documentation.
Advanced administrators may wish to supplement or do some platform-specific fixes to the existing scripts. In some systems, this means copying the errant script or creating a custom build with these changes. Apache Hadoop provides the capabilities to do function overrides so that the existing code base may be changed in place without all of that work. Replacing functions is covered later under the Shell API documentation.
## Developer and Advanced Administrator Environment
@ -89,7 +91,7 @@ An example of a shell profile is in the libexec directory.
## Shell API
Hadoop's shell code has a [function library](./UnixShellAPI.html) that is open for administrators and developers to use to assist in their configuration and advanced feature management. These APIs follow the standard [Hadoop Interface Classification](./InterfaceClassification.html), with one addition: Replaceable.
Apache Hadoop's shell code has a [function library](./UnixShellAPI.html) that is open for administrators and developers to use to assist in their configuration and advanced feature management. These APIs follow the standard [Apache Hadoop Interface Classification](./InterfaceClassification.html), with one addition: Replaceable.
The shell code allows for core functions to be overridden. However, not all functions can be or are safe to be replaced. If a function is not safe to replace, it will have an attribute of Replaceable: No. If a function is safe to replace, it will have the attribute of Replaceable: Yes.
@ -98,3 +100,15 @@ In order to replace a function, create a file called `hadoop-user-functions.sh`
Functions that are marked Public and Stable are safe to use in shell profiles as-is. Other functions may change in a minor release.
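As a sketch of how an override reads, `hadoop-user-functions.sh` simply redefines the function under the same name; the choice of `hadoop_error` and its body here are only an illustration:

```bash
#
# illustrative hadoop-user-functions.sh (function body is an example only)
#
function hadoop_error
{
  # add a timestamp before writing the message to stderr
  echo "$(date) $*" 1>&2
}
```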
### User-level API Access
In addition to `.hadoop-env`, which allows individual users to override `hadoop-env.sh`, users may also use `.hadooprc`. This is called after the Apache Hadoop shell environment has been configured and allows the full set of shell API function calls.
For example:
```bash
hadoop_add_classpath /some/path/custom.jar
```
would go into `.hadooprc`
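A slightly fuller `.hadooprc` might mix Shell API calls with plain environment settings; the paths and the `hadoop_add_param` usage below are illustrative:

```bash
#
# illustrative ~/.hadooprc (paths are placeholders)
#
# put a site-local jar on every command's classpath
hadoop_add_classpath "${HOME}/lib/myjars/custom.jar"
# prefer IPv4 without clobbering an existing setting
hadoop_add_param HADOOP_OPTS java.net.preferIPv4Stack "-Djava.net.preferIPv4Stack=true"
```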

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -34,6 +35,7 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC.RpcKind;
@ -54,12 +56,13 @@ public class TestAsyncIPC {
@Before
public void setupConf() {
conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
// set asynchronous mode for main thread
Client.setAsynchronousMode(true);
}
protected static class SerialCaller extends Thread {
static class AsyncCaller extends Thread {
private Client client;
private InetSocketAddress server;
private int count;
@ -68,11 +71,11 @@ public class TestAsyncIPC {
new HashMap<Integer, Future<LongWritable>>();
Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
public SerialCaller(Client client, InetSocketAddress server, int count) {
public AsyncCaller(Client client, InetSocketAddress server, int count) {
this.client = client;
this.server = server;
this.count = count;
// set asynchronous mode, since SerialCaller extends Thread
// set asynchronous mode, since AsyncCaller extends Thread
Client.setAsynchronousMode(true);
}
@ -107,14 +110,111 @@ public class TestAsyncIPC {
}
}
@Test
public void testSerial() throws IOException, InterruptedException,
ExecutionException {
internalTestSerial(3, false, 2, 5, 100);
internalTestSerial(3, true, 2, 5, 10);
static class AsyncLimitlCaller extends Thread {
private Client client;
private InetSocketAddress server;
private int count;
private boolean failed;
Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
int start = 0, end = 0;
int getStart() {
return start;
}
int getEnd() {
return end;
}
int getCount() {
return count;
}
public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
this(0, client, server, count);
}
final int callerId;
public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server,
int count) {
this.client = client;
this.server = server;
this.count = count;
// set asynchronous mode, since AsyncLimitlCaller extends Thread
Client.setAsynchronousMode(true);
this.callerId = callerId;
}
@Override
public void run() {
// in case Thread#start is called, which will spawn a new thread
Client.setAsynchronousMode(true);
for (int i = 0; i < count; i++) {
try {
final long param = TestIPC.RANDOM.nextLong();
runCall(i, param);
} catch (Exception e) {
LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
StringUtils.stringifyException(e)));
failed = true;
}
}
}
private void runCall(final int idx, final long param)
throws InterruptedException, ExecutionException, IOException {
for (;;) {
try {
doCall(idx, param);
return;
} catch (AsyncCallLimitExceededException e) {
/**
* reached limit of async calls, fetch results of finished async calls
* to let follow-on calls go
*/
start = end;
end = idx;
waitForReturnValues(start, end);
}
}
}
private void doCall(final int idx, final long param) throws IOException {
TestIPC.call(client, param, server, conf);
Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
returnFutures.put(idx, returnFuture);
expectedValues.put(idx, param);
}
private void waitForReturnValues(final int start, final int end)
throws InterruptedException, ExecutionException {
for (int i = start; i < end; i++) {
LongWritable value = returnFutures.get(i).get();
if (expectedValues.get(i) != value.get()) {
LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
failed = true;
break;
}
}
}
}
public void internalTestSerial(int handlerCount, boolean handlerSleep,
@Test(timeout = 60000)
public void testAsyncCall() throws IOException, InterruptedException,
ExecutionException {
internalTestAsyncCall(3, false, 2, 5, 100);
internalTestAsyncCall(3, true, 2, 5, 10);
}
@Test(timeout = 60000)
public void testAsyncCallLimit() throws IOException,
InterruptedException, ExecutionException {
internalTestAsyncCallLimit(100, false, 5, 10, 500);
}
public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException {
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
@ -126,9 +226,9 @@ public class TestAsyncIPC {
clients[i] = new Client(LongWritable.class, conf);
}
SerialCaller[] callers = new SerialCaller[callerCount];
AsyncCaller[] callers = new AsyncCaller[callerCount];
for (int i = 0; i < callerCount; i++) {
callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
callers[i].start();
}
for (int i = 0; i < callerCount; i++) {
@ -144,6 +244,75 @@ public class TestAsyncIPC {
server.stop();
}
@Test(timeout = 60000)
public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
InterruptedException, ExecutionException {
int handlerCount = 10, callCount = 100;
Server server = new TestIPC.TestServer(handlerCount, false, conf);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final Client client = new Client(LongWritable.class, conf);
int asyncCallCount = client.getAsyncCallCount();
try {
AsyncCaller caller = new AsyncCaller(client, addr, callCount);
caller.run();
caller.waitForReturnValues();
String msg = String.format(
"First time, expected not failed for caller: %s.", caller);
assertFalse(msg, caller.failed);
caller.waitForReturnValues();
assertTrue(asyncCallCount == client.getAsyncCallCount());
msg = String.format("Second time, expected not failed for caller: %s.",
caller);
assertFalse(msg, caller.failed);
assertTrue(asyncCallCount == client.getAsyncCallCount());
} finally {
client.stop();
server.stop();
}
}
public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException {
Configuration conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
Client[] clients = new Client[clientCount];
for (int i = 0; i < clientCount; i++) {
clients[i] = new Client(LongWritable.class, conf);
}
AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
for (int i = 0; i < callerCount; i++) {
callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr,
callCount);
callers[i].start();
}
for (int i = 0; i < callerCount; i++) {
callers[i].join();
callers[i].waitForReturnValues(callers[i].getStart(),
callers[i].getCount());
String msg = String.format("Expected not failed for caller-%d: %s.", i,
callers[i]);
assertFalse(msg, callers[i].failed);
}
for (int i = 0; i < clientCount; i++) {
clients[i].stop();
}
server.stop();
}
/**
* Test if (1) the rpc server uses the call id/retry provided by the rpc
* client, and (2) the rpc client receives the same call id/retry from the rpc
@ -196,7 +365,7 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final SerialCaller caller = new SerialCaller(client, addr, 4);
final AsyncCaller caller = new AsyncCaller(client, addr, 4);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@ -235,7 +404,7 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final SerialCaller caller = new SerialCaller(client, addr, 10);
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@ -272,7 +441,7 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final SerialCaller caller = new SerialCaller(client, addr, 10);
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@ -313,9 +482,9 @@ public class TestAsyncIPC {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
SerialCaller[] callers = new SerialCaller[callerCount];
AsyncCaller[] callers = new AsyncCaller[callerCount];
for (int i = 0; i < callerCount; ++i) {
callers[i] = new SerialCaller(client, addr, perCallerCallCount);
callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
callers[i].start();
}
for (int i = 0; i < callerCount; ++i) {

View File

@ -25,11 +25,14 @@ import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.Key;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Collection;
@ -49,38 +52,38 @@ import static org.junit.Assert.*;
public class TestCredentials {
private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA1";
private static final File tmpDir = GenericTestUtils.getTestDir("mapred");
@Before
public void setUp() {
tmpDir.mkdir();
}
@After
public void tearDown() {
tmpDir.delete();
}
@SuppressWarnings("unchecked")
@Test
public <T extends TokenIdentifier> void testReadWriteStorage()
@Test
public <T extends TokenIdentifier> void testReadWriteStorage()
throws IOException, NoSuchAlgorithmException{
// create tokenStorage Object
Credentials ts = new Credentials();
Token<T> token1 = new Token();
Token<T> token2 = new Token();
Text service1 = new Text("service1");
Text service2 = new Text("service2");
Collection<Text> services = new ArrayList<Text>();
services.add(service1);
services.add(service2);
token1.setService(service1);
token2.setService(service2);
ts.addToken(new Text("sometoken1"), token1);
ts.addToken(new Text("sometoken2"), token2);
// create keys and put it in
final KeyGenerator kg = KeyGenerator.getInstance(DEFAULT_HMAC_ALGORITHM);
String alias = "alias";
@ -90,24 +93,24 @@ public class TestCredentials {
m.put(new Text(alias+i), key.getEncoded());
ts.addSecretKey(new Text(alias+i), key.getEncoded());
}
// create file to store
File tmpFileName = new File(tmpDir, "tokenStorageTest");
DataOutputStream dos =
DataOutputStream dos =
new DataOutputStream(new FileOutputStream(tmpFileName));
ts.write(dos);
dos.close();
// open and read it back
DataInputStream dis =
new DataInputStream(new FileInputStream(tmpFileName));
DataInputStream dis =
new DataInputStream(new FileInputStream(tmpFileName));
ts = new Credentials();
ts.readFields(dis);
dis.close();
// get the tokens and compare the services
Collection<Token<? extends TokenIdentifier>> list = ts.getAllTokens();
assertEquals("getAllTokens should return collection of size 2",
assertEquals("getAllTokens should return collection of size 2",
list.size(), 2);
boolean foundFirst = false;
boolean foundSecond = false;
@ -119,22 +122,213 @@ public class TestCredentials {
foundSecond = true;
}
}
assertTrue("Tokens for services service1 and service2 must be present",
assertTrue("Tokens for services service1 and service2 must be present",
foundFirst && foundSecond);
// compare secret keys
int mapLen = m.size();
assertEquals("wrong number of keys in the Storage",
assertEquals("wrong number of keys in the Storage",
mapLen, ts.numberOfSecretKeys());
for(Text a : m.keySet()) {
byte [] kTS = ts.getSecretKey(a);
byte [] kLocal = m.get(a);
assertTrue("keys don't match for " + a,
assertTrue("keys don't match for " + a,
WritableComparator.compareBytes(kTS, 0, kTS.length, kLocal,
0, kLocal.length)==0);
}
tmpFileName.delete();
}
@Test
public void testBasicReadWriteProtoEmpty()
throws IOException, NoSuchAlgorithmException {
String testname ="testBasicReadWriteProtoEmpty";
Credentials ts = new Credentials();
writeCredentialsProto(ts, testname);
Credentials ts2 = readCredentialsProto(testname);
assertEquals("test empty tokens", 0, ts2.numberOfTokens());
assertEquals("test empty keys", 0, ts2.numberOfSecretKeys());
}
@Test
public void testBasicReadWriteProto()
throws IOException, NoSuchAlgorithmException {
String testname ="testBasicReadWriteProto";
Text tok1 = new Text("token1");
Text tok2 = new Text("token2");
Text key1 = new Text("key1");
Credentials ts = generateCredentials(tok1, tok2, key1);
writeCredentialsProto(ts, testname);
Credentials ts2 = readCredentialsProto(testname);
assertCredentials(testname, tok1, key1, ts, ts2);
assertCredentials(testname, tok2, key1, ts, ts2);
}
@Test
public void testBasicReadWriteStreamEmpty()
throws IOException, NoSuchAlgorithmException {
String testname ="testBasicReadWriteStreamEmpty";
Credentials ts = new Credentials();
writeCredentialsStream(ts, testname);
Credentials ts2 = readCredentialsStream(testname);
assertEquals("test empty tokens", 0, ts2.numberOfTokens());
assertEquals("test empty keys", 0, ts2.numberOfSecretKeys());
}
@Test
public void testBasicReadWriteStream()
throws IOException, NoSuchAlgorithmException {
String testname ="testBasicReadWriteStream";
Text tok1 = new Text("token1");
Text tok2 = new Text("token2");
Text key1 = new Text("key1");
Credentials ts = generateCredentials(tok1, tok2, key1);
writeCredentialsStream(ts, testname);
Credentials ts2 = readCredentialsStream(testname);
assertCredentials(testname, tok1, key1, ts, ts2);
assertCredentials(testname, tok2, key1, ts, ts2);
}
@Test
/**
* Verify the suitability of read/writeProto for use with Writable interface.
* This test uses only empty credentials.
*/
public void testWritablePropertiesEmpty()
throws IOException, NoSuchAlgorithmException {
String testname ="testWritablePropertiesEmpty";
Credentials ts = new Credentials();
Credentials ts2 = new Credentials();
writeCredentialsProtos(ts, ts2, testname);
List<Credentials> clist = readCredentialsProtos(testname);
assertEquals("test empty tokens 0", 0, clist.get(0).numberOfTokens());
assertEquals("test empty keys 0", 0, clist.get(0).numberOfSecretKeys());
assertEquals("test empty tokens 1", 0, clist.get(1).numberOfTokens());
assertEquals("test empty keys 1", 0, clist.get(1).numberOfSecretKeys());
}
@Test
/**
* Verify the suitability of read/writeProto for use with Writable interface.
*/
public void testWritableProperties()
throws IOException, NoSuchAlgorithmException {
String testname ="testWritableProperties";
Text tok1 = new Text("token1");
Text tok2 = new Text("token2");
Text key1 = new Text("key1");
Credentials ts = generateCredentials(tok1, tok2, key1);
Text tok3 = new Text("token3");
Text key2 = new Text("key2");
Credentials ts2 = generateCredentials(tok1, tok3, key2);
writeCredentialsProtos(ts, ts2, testname);
List<Credentials> clist = readCredentialsProtos(testname);
assertCredentials(testname, tok1, key1, ts, clist.get(0));
assertCredentials(testname, tok2, key1, ts, clist.get(0));
assertCredentials(testname, tok1, key2, ts2, clist.get(1));
assertCredentials(testname, tok3, key2, ts2, clist.get(1));
}
private Credentials generateCredentials(Text t1, Text t2, Text t3)
throws NoSuchAlgorithmException {
Text kind = new Text("TESTTOK");
byte[] id1 = {0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72};
byte[] pass1 = {0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64};
byte[] id2 = {0x68, 0x63, 0x64, 0x6d, 0x73, 0x68, 0x65, 0x68, 0x64, 0x71};
byte[] pass2 = {0x6f, 0x60, 0x72, 0x72, 0x76, 0x6e, 0x71, 0x63};
Credentials ts = new Credentials();
generateToken(ts, id1, pass1, kind, t1);
generateToken(ts, id2, pass2, kind, t2);
generateKey(ts, t3);
return ts;
}
private void assertCredentials(String tag, Text alias, Text keykey,
Credentials a, Credentials b) {
assertEquals(tag + ": test token count", a.numberOfTokens(),
b.numberOfTokens());
assertEquals(tag + ": test service", a.getToken(alias).getService(),
b.getToken(alias).getService());
assertEquals(tag + ": test kind", a.getToken(alias).getKind(),
b.getToken(alias).getKind());
assertTrue(tag + ": test password",
Arrays.equals(a.getToken(alias).getPassword(),
b.getToken(alias).getPassword()));
assertTrue(tag + ": test identifier",
Arrays.equals(a.getToken(alias).getIdentifier(),
b.getToken(alias).getIdentifier()));
assertEquals(tag + ": test number of keys", a.numberOfSecretKeys(),
b.numberOfSecretKeys());
assertTrue(tag + ":test key values", Arrays.equals(a.getSecretKey(keykey),
b.getSecretKey(keykey)));
}
private void writeCredentialsStream(Credentials creds, String filename)
throws IOException, FileNotFoundException {
DataOutputStream dos = new DataOutputStream(
new FileOutputStream(new File(tmpDir, filename)));
creds.writeTokenStorageToStream(dos);
}
private Credentials readCredentialsStream(String filename)
throws IOException, FileNotFoundException {
Credentials creds = new Credentials();
DataInputStream dis = new DataInputStream(
new FileInputStream(new File(tmpDir, filename)));
creds.readTokenStorageStream(dis);
return creds;
}
private void writeCredentialsProto(Credentials creds, String filename)
throws IOException, FileNotFoundException {
DataOutputStream dos = new DataOutputStream(
new FileOutputStream(new File(tmpDir, filename)));
creds.writeProto(dos);
}
private Credentials readCredentialsProto(String filename)
throws IOException, FileNotFoundException {
Credentials creds = new Credentials();
DataInputStream dis = new DataInputStream(
new FileInputStream(new File(tmpDir, filename)));
creds.readProto(dis);
return creds;
}
private void writeCredentialsProtos(Credentials c1, Credentials c2,
String filename) throws IOException, FileNotFoundException {
DataOutputStream dos = new DataOutputStream(
new FileOutputStream(new File(tmpDir, filename)));
c1.writeProto(dos);
c2.writeProto(dos);
}
private List<Credentials> readCredentialsProtos(String filename)
throws IOException, FileNotFoundException {
Credentials c1 = new Credentials();
Credentials c2 = new Credentials();
DataInputStream dis = new DataInputStream(
new FileInputStream(new File(tmpDir, filename)));
c1.readProto(dis);
c2.readProto(dis);
List<Credentials> r = new ArrayList<Credentials>(2);
r.add(0, c1);
r.add(1, c2);
return r;
}
private <T extends TokenIdentifier> void generateToken(
Credentials creds, byte[] ident, byte[] pass, Text kind, Text service) {
Token<T> token = new Token(ident, pass, kind, service);
creds.addToken(service, token);
}
private void generateKey(Credentials creds, Text alias)
throws NoSuchAlgorithmException {
final KeyGenerator kg = KeyGenerator.getInstance(DEFAULT_HMAC_ALGORITHM);
Key key = kg.generateKey();
creds.addSecretKey(alias, key.getEncoded());
}
static Text secret[] = {
new Text("secret1"),
new Text("secret2"),
@ -153,7 +347,7 @@ public class TestCredentials {
new Token<TokenIdentifier>(),
new Token<TokenIdentifier>()
};
@Test
public void addAll() {
Credentials creds = new Credentials();
@ -168,7 +362,7 @@ public class TestCredentials {
credsToAdd.addToken(service[2], token[2]);
credsToAdd.addSecretKey(secret[0], secret[3].getBytes());
credsToAdd.addSecretKey(secret[2], secret[2].getBytes());
creds.addAll(credsToAdd);
assertEquals(3, creds.numberOfTokens());
assertEquals(3, creds.numberOfSecretKeys());
@ -190,14 +384,14 @@ public class TestCredentials {
creds.addToken(service[1], token[1]);
creds.addSecretKey(secret[0], secret[0].getBytes());
creds.addSecretKey(secret[1], secret[1].getBytes());
Credentials credsToAdd = new Credentials();
// one duplicate with different value, one new
credsToAdd.addToken(service[0], token[3]);
credsToAdd.addToken(service[2], token[2]);
credsToAdd.addSecretKey(secret[0], secret[3].getBytes());
credsToAdd.addSecretKey(secret[2], secret[2].getBytes());
creds.mergeAll(credsToAdd);
assertEquals(3, creds.numberOfTokens());
assertEquals(3, creds.numberOfSecretKeys());
@ -211,12 +405,12 @@ public class TestCredentials {
assertEquals(token[2], creds.getToken(service[2]));
assertEquals(secret[2], new Text(creds.getSecretKey(secret[2])));
}
@Test
public void testAddTokensToUGI() {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("someone");
Credentials creds = new Credentials();
for (int i=0; i < service.length; i++) {
creds.addToken(service[i], token[i]);
}
@ -228,4 +422,4 @@ public class TestCredentials {
}
assertEquals(service.length, creds.numberOfTokens());
}
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.token;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.DtFetcher;
public class TestDtFetcher implements DtFetcher {
public Text getServiceName() {
return TestDtUtilShell.SERVICE_GET;
}
public boolean isTokenRequired() {
return true;
}
public Token<?> addDelegationTokens(Configuration conf,
Credentials creds, String renewer, String url) throws Exception {
creds.addToken(TestDtUtilShell.MOCK_TOKEN.getService(),
TestDtUtilShell.MOCK_TOKEN);
return TestDtUtilShell.MOCK_TOKEN;
}
}

View File

@ -0,0 +1,264 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.token;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.DtFetcher;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestDtUtilShell {
private static byte[] IDENTIFIER = {
0x69, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72};
private static byte[] PASSWORD = {
0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64};
private static Text KIND = new Text("testTokenKind");
private static Text SERVICE = new Text("testTokenService");
private static Text SERVICE2 = new Text("ecivreSnekoTtset");
private static Configuration defaultConf = new Configuration();
private static FileSystem localFs = null;
private final String alias = "proxy_ip:1234";
private final String renewer = "yarn";
private final String getUrl = SERVICE_GET.toString() + "://localhost:9000/";
private final String getUrl2 = "http://localhost:9000/";
public static Text SERVICE_GET = new Text("testTokenServiceGet");
public static Text KIND_GET = new Text("testTokenKindGet");
public static Token<?> MOCK_TOKEN =
new Token(IDENTIFIER, PASSWORD, KIND_GET, SERVICE_GET);
static {
try {
defaultConf.set("fs.defaultFS", "file:///");
localFs = FileSystem.getLocal(defaultConf);
} catch (IOException e) {
throw new RuntimeException("init failure", e);
}
}
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
private final Path workDir = new Path(
System.getProperty("test.build.data", "/tmp"), "TestDtUtilShell");
private final Path tokenFile = new Path(workDir, "testPrintTokenFile");
private final Path tokenFile2 = new Path(workDir, "testPrintTokenFile2");
private final Path tokenLegacyFile = new Path(workDir, "testPrintTokenFile3");
private final Path tokenFileGet = new Path(workDir, "testGetTokenFile");
private final String tokenFilename = tokenFile.toString();
private final String tokenFilename2 = tokenFile2.toString();
private final String tokenFilenameGet = tokenFileGet.toString();
private String[] args = null;
private DtUtilShell dt = null;
private int rc = 0;
@Before
public void setup() throws Exception {
localFs.mkdirs(localFs.makeQualified(workDir));
makeTokenFile(tokenFile, false, null);
makeTokenFile(tokenFile2, false, SERVICE2);
makeTokenFile(tokenLegacyFile, true, null);
dt = new DtUtilShell();
dt.setConf(new Configuration());
dt.setOut(new PrintStream(outContent));
outContent.reset();
rc = 0;
}
@After
public void teardown() throws Exception {
localFs.delete(localFs.makeQualified(workDir), true);
}
public void makeTokenFile(Path tokenPath, boolean legacy, Text service)
throws IOException {
if (service == null) {
service = SERVICE;
}
Credentials creds = new Credentials();
Token<? extends TokenIdentifier> tok = (Token<? extends TokenIdentifier>)
new Token(IDENTIFIER, PASSWORD, KIND, service);
creds.addToken(tok.getService(), tok);
if (legacy) {
creds.writeLegacyTokenStorageLocalFile(new File(tokenPath.toString()));
} else {
creds.writeTokenStorageFile(tokenPath, defaultConf);
}
}
@Test
public void testPrint() throws Exception {
args = new String[] {"print", tokenFilename};
rc = dt.run(args);
assertEquals("test simple print exit code", 0, rc);
assertTrue("test simple print output kind:\n" + outContent.toString(),
outContent.toString().contains(KIND.toString()));
assertTrue("test simple print output service:\n" + outContent.toString(),
outContent.toString().contains(SERVICE.toString()));
outContent.reset();
args = new String[] {"print", tokenLegacyFile.toString()};
rc = dt.run(args);
assertEquals("test legacy print exit code", 0, rc);
assertTrue("test simple print output kind:\n" + outContent.toString(),
outContent.toString().contains(KIND.toString()));
assertTrue("test simple print output service:\n" + outContent.toString(),
outContent.toString().contains(SERVICE.toString()));
outContent.reset();
args = new String[] {
"print", "-alias", SERVICE.toString(), tokenFilename};
rc = dt.run(args);
assertEquals("test alias print exit code", 0, rc);
assertTrue("test simple print output kind:\n" + outContent.toString(),
outContent.toString().contains(KIND.toString()));
assertTrue("test simple print output service:\n" + outContent.toString(),
outContent.toString().contains(SERVICE.toString()));
outContent.reset();
args = new String[] {
"print", "-alias", "not-a-serivce", tokenFilename};
rc = dt.run(args);
assertEquals("test no alias print exit code", 0, rc);
assertFalse("test no alias print output kind:\n" + outContent.toString(),
outContent.toString().contains(KIND.toString()));
assertFalse("test no alias print output service:\n" + outContent.toString(),
outContent.toString().contains(SERVICE.toString()));
}
@Test
public void testAppend() throws Exception {
args = new String[] {"append", tokenFilename, tokenFilename2};
rc = dt.run(args);
assertEquals("test simple append exit code", 0, rc);
args = new String[] {"print", tokenFilename2};
rc = dt.run(args);
assertEquals("test simple append print exit code", 0, rc);
assertTrue("test simple append output kind:\n" + outContent.toString(),
outContent.toString().contains(KIND.toString()));
assertTrue("test simple append output service:\n" + outContent.toString(),
outContent.toString().contains(SERVICE.toString()));
assertTrue("test simple append output service:\n" + outContent.toString(),
outContent.toString().contains(SERVICE2.toString()));
}
@Test
public void testRemove() throws Exception {
args = new String[] {"remove", "-alias", SERVICE.toString(), tokenFilename};
rc = dt.run(args);
assertEquals("test simple remove exit code", 0, rc);
args = new String[] {"print", tokenFilename};
rc = dt.run(args);
assertEquals("test simple remove print exit code", 0, rc);
assertFalse("test simple remove output kind:\n" + outContent.toString(),
outContent.toString().contains(KIND.toString()));
assertFalse("test simple remove output service:\n" + outContent.toString(),
outContent.toString().contains(SERVICE.toString()));
}
@Test
public void testGet() throws Exception {
args = new String[] {"get", getUrl, tokenFilenameGet};
rc = dt.run(args);
assertEquals("test mocked get exit code", 0, rc);
args = new String[] {"print", tokenFilenameGet};
rc = dt.run(args);
String oc = outContent.toString();
assertEquals("test print after get exit code", 0, rc);
assertTrue("test print after get output kind:\n" + oc,
oc.contains(KIND_GET.toString()));
assertTrue("test print after get output service:\n" + oc,
oc.contains(SERVICE_GET.toString()));
}
@Test
public void testGetWithServiceFlag() throws Exception {
args = new String[] {"get", getUrl2, "-service", SERVICE_GET.toString(),
tokenFilenameGet};
rc = dt.run(args);
assertEquals("test mocked get with service flag exit code", 0, rc);
args = new String[] {"print", tokenFilenameGet};
rc = dt.run(args);
String oc = outContent.toString();
assertEquals("test print after get with service flag exit code", 0, rc);
assertTrue("test print after get with service flag output kind:\n" + oc,
oc.contains(KIND_GET.toString()));
assertTrue("test print after get with service flag output service:\n" + oc,
oc.contains(SERVICE_GET.toString()));
}
@Test
public void testGetWithAliasFlag() throws Exception {
args = new String[] {"get", getUrl, "-alias", alias, tokenFilenameGet};
rc = dt.run(args);
assertEquals("test mocked get with alias flag exit code", 0, rc);
args = new String[] {"print", tokenFilenameGet};
rc = dt.run(args);
String oc = outContent.toString();
assertEquals("test print after get with alias flag exit code", 0, rc);
assertTrue("test print after get with alias flag output kind:\n" + oc,
oc.contains(KIND_GET.toString()));
assertTrue("test print after get with alias flag output alias:\n" + oc,
oc.contains(alias));
assertFalse("test print after get with alias flag output old service:\n" +
oc, oc.contains(SERVICE_GET.toString()));
}
@Test
public void testFormatJavaFlag() throws Exception {
args = new String[] {"get", getUrl, "-format", "java", tokenFilenameGet};
rc = dt.run(args);
assertEquals("test mocked get with java format flag exit code", 0, rc);
Credentials creds = new Credentials();
Credentials spyCreds = Mockito.spy(creds);
DataInputStream in = new DataInputStream(
new FileInputStream(tokenFilenameGet));
spyCreds.readTokenStorageStream(in);
Mockito.verify(spyCreds).readFields(in);
}
@Test
public void testFormatProtoFlag() throws Exception {
args = new String[] {
"get", getUrl, "-format", "protobuf", tokenFilenameGet};
rc = dt.run(args);
assertEquals("test mocked get with protobuf format flag exit code", 0, rc);
Credentials creds = new Credentials();
Credentials spyCreds = Mockito.spy(creds);
DataInputStream in = new DataInputStream(
new FileInputStream(tokenFilenameGet));
spyCreds.readTokenStorageStream(in);
Mockito.verify(spyCreds).readProto(in);
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.CommandShell;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
public class TestCommandShell {
public class Example extends CommandShell {
public static final String EXAMPLE = "example";
public static final String HELLO = "hello";
public static final String HELLO_MSG = "hello is running";
public static final String GOODBYE = "goodbye";
public static final String GOODBYE_MSG = "goodbye is running";
public String[] savedArgs = null;
@Override
protected int init(String[] args) throws Exception {
String command = args[0];
if (command.equals(HELLO)) {
setSubCommand(new Hello());
} else if (command.equals(GOODBYE)) {
setSubCommand(new Goodbye());
} else{
return 1;
}
savedArgs = args;
return 0;
}
public String getCommandUsage() {
return EXAMPLE;
}
public class Hello extends SubCommand {
public static final String HELLO_USAGE = EXAMPLE + " hello";
@Override
public boolean validate() {
return savedArgs.length == 1;
}
@Override
public void execute() throws Exception {
System.out.println(HELLO_MSG);
}
@Override
public String getUsage() {
return HELLO_USAGE;
}
}
public class Goodbye extends SubCommand {
public static final String GOODBYE_USAGE = EXAMPLE + " goodbye";
@Override
public void execute() throws Exception {
System.out.println(GOODBYE_MSG);
}
@Override
public String getUsage() {
return GOODBYE_USAGE;
}
}
}
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
private String outMsg(String message) {
return "OUT:\n" + outContent.toString() + "\n" + message;
}
@Before
public void setup() throws Exception {
System.setOut(new PrintStream(outContent));
}
@Test
public void testCommandShellExample() throws Exception {
Example ex = new Example();
ex.setConf(new Configuration());
int rc = 0;
outContent.reset();
String[] args1 = {"hello"};
rc = ex.run(args1);
assertEquals(outMsg("test exit code - normal hello"), 0, rc);
assertTrue(outMsg("test normal hello message"),
outContent.toString().contains(Example.HELLO_MSG));
outContent.reset();
String[] args2 = {"hello", "x"};
rc = ex.run(args2);
assertEquals(outMsg("test exit code - bad hello"), 1, rc);
assertTrue(outMsg("test bad hello message"),
outContent.toString().contains(Example.Hello.HELLO_USAGE));
outContent.reset();
String[] args3 = {"goodbye"};
rc = ex.run(args3);
assertEquals(outMsg("test exit code - normal goodbye"), 0, rc);
assertTrue(outMsg("test normal goodbye message"),
outContent.toString().contains(Example.GOODBYE_MSG));
}
}

View File

@ -0,0 +1,14 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.security.token.TestDtFetcher

View File

@ -89,4 +89,9 @@ create_fake_dirs () {
[ ${unittest} = "hadooprc" ]
}
@test "hadoop_exec_user_hadoopenv" {
HOME=${TMP}
echo "unittest=hadoopenv" > "${TMP}/.hadoop-env"
hadoop_exec_user_hadoopenv
[ ${unittest} = "hadoopenv" ]
}

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
@ -50,11 +51,14 @@ public class AsyncDistributedFileSystem {
final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
.getReturnValueCallback();
Future<T> returnFuture = new AbstractFuture<T>() {
private final AtomicBoolean called = new AtomicBoolean(false);
public T get() throws InterruptedException, ExecutionException {
try {
set(returnValueCallback.call());
} catch (Exception e) {
setException(e);
if (called.compareAndSet(false, true)) {
try {
set(returnValueCallback.call());
} catch (Exception e) {
setException(e);
}
}
return super.get();
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.DtFetcher;
import org.apache.hadoop.security.token.Token;
/**
* DtFetcher is an interface which permits the abstraction and separation of
 * delegation token fetch implementations across different packages and
* compilation units. Resolution of fetcher impl will be done at runtime.
*/
public class HdfsDtFetcher implements DtFetcher {
private static final Log LOG = LogFactory.getLog(HdfsDtFetcher.class);
private static final String SERVICE_NAME = HdfsConstants.HDFS_URI_SCHEME;
private static final String FETCH_FAILED = "Fetch of delegation token failed";
/**
* Returns the service name for HDFS, which is also a valid URL prefix.
*/
public Text getServiceName() {
return new Text(SERVICE_NAME);
}
public boolean isTokenRequired() {
return UserGroupInformation.isSecurityEnabled();
}
/**
* Returns Token object via FileSystem, null if bad argument.
* @param conf - a Configuration object used with FileSystem.get()
* @param creds - a Credentials object to which token(s) will be added
* @param renewer - the renewer to send with the token request
* @param url - the URL to which the request is sent
* @return a Token, or null if fetch fails.
*/
public Token<?> addDelegationTokens(Configuration conf, Credentials creds,
String renewer, String url) throws Exception {
if (!url.startsWith(getServiceName().toString())) {
url = getServiceName().toString() + "://" + url;
}
FileSystem fs = FileSystem.get(URI.create(url), conf);
Token<?> token = fs.getDelegationToken(renewer);
if (token == null) {
LOG.error(FETCH_FAILED);
throw new IOException(FETCH_FAILED);
}
creds.addToken(token.getService(), token);
return token;
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.io.Text;
/**
* DtFetcher for SWebHdfsFileSystem using the base class HdfsDtFetcher impl.
*/
public class SWebHdfsDtFetcher extends HdfsDtFetcher {
private static final Log LOG = LogFactory.getLog(SWebHdfsDtFetcher.class);
private static final String SERVICE_NAME = WebHdfsConstants.SWEBHDFS_SCHEME;
@Override
public Text getServiceName() {
return new Text(SERVICE_NAME);
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.io.Text;
/**
* DtFetcher for WebHdfsFileSystem using the base class HdfsDtFetcher impl.
*/
public class WebHdfsDtFetcher extends HdfsDtFetcher {
private static final Log LOG = LogFactory.getLog(WebHdfsDtFetcher.class);
private static final String SERVICE_NAME = WebHdfsConstants.WEBHDFS_SCHEME;
@Override
public Text getServiceName() {
return new Text(SERVICE_NAME);
}
}

View File

@ -4798,7 +4798,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/
void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
checkOperation(OperationCategory.WRITE);
NameNode.stateChangeLog.info("*DIR* reportBadBlocks");
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -4807,6 +4806,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
DatanodeInfo[] nodes = blocks[i].getLocations();
String[] storageIDs = blocks[i].getStorageIDs();
for (int j = 0; j < nodes.length; j++) {
NameNode.stateChangeLog.info("*DIR* reportBadBlocks for block: {} on"
+ " datanode: {}", blk, nodes[j].getXferAddr());
blockManager.findAndMarkBlockAsCorrupt(blk, nodes[j],
storageIDs == null ? null: storageIDs[j],
"client machine reported it");

View File

@ -179,7 +179,8 @@ public class DelegationTokenFetcher {
if (null != token) {
Credentials cred = new Credentials();
cred.addToken(token.getService(), token);
cred.writeTokenStorageFile(tokenFile, conf);
// dtutil is replacing this tool; preserve legacy functionality
cred.writeLegacyTokenStorageFile(tokenFile, conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Fetched token " + fs.getUri() + " for " +

View File

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.hadoop.hdfs.HdfsDtFetcher
org.apache.hadoop.hdfs.WebHdfsDtFetcher
org.apache.hadoop.hdfs.SWebHdfsDtFetcher

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
@ -31,80 +30,25 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestAsyncDFSRename {
final Path asyncRenameDir = new Path("/test/async_rename/");
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
final private static Configuration CONF = new HdfsConfiguration();
final private static String GROUP1_NAME = "group1";
final private static String GROUP2_NAME = "group2";
final private static String USER1_NAME = "user1";
private static final UserGroupInformation USER1;
private MiniDFSCluster gCluster;
static {
// explicitly turn on permission checking
CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
// create fake mapping for the groups
Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
// Initiate all four users
USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
GROUP1_NAME, GROUP2_NAME });
}
@Before
public void setUp() throws IOException {
gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
gCluster.waitActive();
}
@After
public void tearDown() throws IOException {
if (gCluster != null) {
gCluster.shutdown();
gCluster = null;
}
}
static int countLease(MiniDFSCluster cluster) {
return TestDFSRename.countLease(cluster);
}
void list(DistributedFileSystem dfs, String name) throws IOException {
FileSystem.LOG.info("\n\n" + name);
for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
FileSystem.LOG.info("" + s.getPath());
}
}
static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
DataOutputStream a_out = dfs.create(f);
a_out.writeBytes("something");
a_out.close();
}
/**
 * Check that the blocks of the dst file are cleaned up after a rename with overwrite.
 * Restart the NN to verify that the rename succeeded.
*/
@Test
@Test(timeout = 60000)
public void testAsyncRenameWithOverwrite() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
@ -169,26 +113,26 @@ public class TestAsyncDFSRename {
}
}
@Test
public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
@Test(timeout = 60000)
public void testCallGetReturnValueMultipleTimes() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
final Path renameDir = new Path(
"/test/concurrent_reanme_with_overwrite_dir/");
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build();
"/test/testCallGetReturnValueMultipleTimes/");
final Configuration conf = new HdfsConfiguration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
int count = 1000;
final DistributedFileSystem dfs = cluster.getFileSystem();
final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
final int count = 100;
long fileLen = blockSize * 3;
final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
assertTrue(dfs.mkdirs(renameDir));
try {
long fileLen = blockSize * 3;
assertTrue(dfs.mkdirs(renameDir));
Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
// concurrently invoking many rename
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
@ -199,8 +143,104 @@ public class TestAsyncDFSRename {
returnFutures.put(i, returnFuture);
}
// wait for completing the calls
for (int i = 0; i < 5; i++) {
verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
renameDir, dfs);
}
} finally {
if (dfs != null) {
dfs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
private void verifyCallGetReturnValueMultipleTimes(
Map<Integer, Future<Void>> returnFutures, int count,
MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
throws InterruptedException, ExecutionException, IOException {
// wait for completing the calls
for (int i = 0; i < count; i++) {
returnFutures.get(i).get();
}
// Restart NN and check that the rename succeeded
cluster.restartNameNodes();
// verify the src dir does not exist and the dst does
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
Path dst = new Path(renameDir, "dst" + i);
assertFalse(dfs.exists(src));
assertTrue(dfs.exists(dst));
}
}
@Test(timeout = 120000)
public void testAggressiveConcurrentAsyncRenameWithOverwrite()
throws Exception {
internalTestConcurrentAsyncRenameWithOverwrite(100,
"testAggressiveConcurrentAsyncRenameWithOverwrite");
}
@Test(timeout = 60000)
public void testConservativeConcurrentAsyncRenameWithOverwrite()
throws Exception {
internalTestConcurrentAsyncRenameWithOverwrite(10000,
"testConservativeConcurrentAsyncRenameWithOverwrite");
}
private void internalTestConcurrentAsyncRenameWithOverwrite(
final int asyncCallLimit, final String basePath) throws Exception {
final short replFactor = 2;
final long blockSize = 512;
final Path renameDir = new Path(String.format("/test/%s/", basePath));
Configuration conf = new HdfsConfiguration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
asyncCallLimit);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
int count = 1000;
long fileLen = blockSize * 3;
int start = 0, end = 0;
Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
assertTrue(dfs.mkdirs(renameDir));
try {
// concurrently invoking many rename
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
Path dst = new Path(renameDir, "dst" + i);
DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
for (;;) {
try {
LOG.info("rename #" + i);
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFutures.put(i, returnFuture);
break;
} catch (AsyncCallLimitExceededException e) {
/**
* reached limit of async calls, fetch results of finished async
* calls to let follow-on calls go
*/
LOG.error(e);
start = end;
end = i;
LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
waitForReturnValues(returnFutures, start, end);
}
}
}
// wait for completing the calls
for (int i = start; i < count; i++) {
returnFutures.get(i).get();
}
@ -215,26 +255,60 @@ public class TestAsyncDFSRename {
assertTrue(dfs.exists(dst));
}
} finally {
dfs.delete(renameDir, true);
if (dfs != null) {
dfs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
private void waitForReturnValues(
final Map<Integer, Future<Void>> returnFutures, final int start,
final int end) throws InterruptedException, ExecutionException {
LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
for (int i = start; i < end; i++) {
LOG.info("calling Future#get #" + i);
returnFutures.get(i).get();
}
}
@Test(timeout = 60000)
public void testAsyncRenameWithException() throws Exception {
FileSystem rootFs = FileSystem.get(CONF);
Configuration conf = new HdfsConfiguration();
String group1 = "group1";
String group2 = "group2";
String user1 = "user1";
UserGroupInformation ugi1;
// explicitly turn on permission checking
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
// create fake mapping for the groups
Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
u2g_map.put(user1, new String[] { group1, group2 });
DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
// Initiate all four users
ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
group1, group2 });
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3).build();
cluster.waitActive();
FileSystem rootFs = FileSystem.get(conf);
final Path renameDir = new Path("/test/async_rename_exception/");
final Path src = new Path(renameDir, "src");
final Path dst = new Path(renameDir, "dst");
rootFs.mkdirs(src);
AsyncDistributedFileSystem adfs = USER1
AsyncDistributedFileSystem adfs = ugi1
.doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
@Override
public AsyncDistributedFileSystem run() throws Exception {
return gCluster.getFileSystem().getAsyncDistributedFileSystem();
return cluster.getFileSystem().getAsyncDistributedFileSystem();
}
});
@ -242,16 +316,24 @@ public class TestAsyncDFSRename {
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFuture.get();
} catch (ExecutionException e) {
checkPermissionDenied(e, src);
checkPermissionDenied(e, src, user1);
} finally {
if (rootFs != null) {
rootFs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
private void checkPermissionDenied(final Exception e, final Path dir) {
private void checkPermissionDenied(final Exception e, final Path dir,
final String user) {
assertTrue(e.getCause() instanceof ExecutionException);
assertTrue("Permission denied messages must carry AccessControlException",
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
.getMessage().contains(USER1_NAME));
.getMessage().contains(user));
assertTrue("Permission denied messages must carry the path parent", e
.getMessage().contains(dir.getParent().toUri().getPath()));
}