HADOOP-6547. Move DelegationToken into Common, so that it can be used by
MapReduce also. (devaraj via omalley) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@907956 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5b0d8a0162
commit
38c59c0af7
|
@ -131,6 +131,9 @@ Trunk (unreleased changes)
|
||||||
HADOOP-6531. Enhance FileUtil with an API to delete all contents of a
|
HADOOP-6531. Enhance FileUtil with an API to delete all contents of a
|
||||||
directory. (Amareshwari Sriramadasu via yhemanth)
|
directory. (Amareshwari Sriramadasu via yhemanth)
|
||||||
|
|
||||||
|
HADOOP-6547. Move DelegationToken into Common, so that it can be used by
|
||||||
|
MapReduce also. (devaraj via omalley)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -0,0 +1,166 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.security.token.delegation;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import static org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.Project.HDFS;
|
||||||
|
import static org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.Project.MAPREDUCE;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
|
||||||
|
@InterfaceAudience.LimitedPrivate({HDFS, MAPREDUCE})
|
||||||
|
public abstract class AbstractDelegationTokenIdentifier
|
||||||
|
extends TokenIdentifier {
|
||||||
|
|
||||||
|
private Text owner;
|
||||||
|
private Text renewer;
|
||||||
|
private Text realUser;
|
||||||
|
private long issueDate;
|
||||||
|
private long maxDate;
|
||||||
|
private int sequenceNumber;
|
||||||
|
private int masterKeyId = 0;
|
||||||
|
|
||||||
|
public AbstractDelegationTokenIdentifier() {
|
||||||
|
this(new Text(), new Text(), new Text());
|
||||||
|
}
|
||||||
|
|
||||||
|
public AbstractDelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
|
||||||
|
this.owner = owner;
|
||||||
|
this.renewer = renewer;
|
||||||
|
if (realUser == null) {
|
||||||
|
this.realUser = new Text();
|
||||||
|
} else {
|
||||||
|
this.realUser = realUser;
|
||||||
|
}
|
||||||
|
issueDate = 0;
|
||||||
|
maxDate = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public abstract Text getKind();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the username encoded in the token identifier
|
||||||
|
*
|
||||||
|
* @return the username or owner
|
||||||
|
*/
|
||||||
|
public UserGroupInformation getUser() {
|
||||||
|
if ( (owner == null) || ("".equals(owner.toString()))) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if ((realUser == null) || ("".equals(realUser.toString()))
|
||||||
|
|| realUser.equals(owner)) {
|
||||||
|
return UserGroupInformation.createRemoteUser(owner.toString());
|
||||||
|
} else {
|
||||||
|
UserGroupInformation realUgi = UserGroupInformation
|
||||||
|
.createRemoteUser(realUser.toString());
|
||||||
|
return UserGroupInformation.createProxyUser(owner.toString(), realUgi);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Text getRenewer() {
|
||||||
|
return renewer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setIssueDate(long issueDate) {
|
||||||
|
this.issueDate = issueDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getIssueDate() {
|
||||||
|
return issueDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxDate(long maxDate) {
|
||||||
|
this.maxDate = maxDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxDate() {
|
||||||
|
return maxDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSequenceNumber(int seqNum) {
|
||||||
|
this.sequenceNumber = seqNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getSequenceNumber() {
|
||||||
|
return sequenceNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMasterKeyId(int newId) {
|
||||||
|
masterKeyId = newId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMasterKeyId() {
|
||||||
|
return masterKeyId;
|
||||||
|
}
|
||||||
|
|
||||||
|
static boolean isEqual(Object a, Object b) {
|
||||||
|
return a == null ? b == null : a.equals(b);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** {@inheritDoc} */
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj == this) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (obj instanceof AbstractDelegationTokenIdentifier) {
|
||||||
|
AbstractDelegationTokenIdentifier that = (AbstractDelegationTokenIdentifier) obj;
|
||||||
|
return this.sequenceNumber == that.sequenceNumber
|
||||||
|
&& this.issueDate == that.issueDate
|
||||||
|
&& this.maxDate == that.maxDate
|
||||||
|
&& this.masterKeyId == that.masterKeyId
|
||||||
|
&& isEqual(this.owner, that.owner)
|
||||||
|
&& isEqual(this.renewer, that.renewer)
|
||||||
|
&& isEqual(this.realUser, that.realUser);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** {@inheritDoc} */
|
||||||
|
public int hashCode() {
|
||||||
|
return this.sequenceNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
owner.readFields(in);
|
||||||
|
renewer.readFields(in);
|
||||||
|
realUser.readFields(in);
|
||||||
|
issueDate = WritableUtils.readVLong(in);
|
||||||
|
maxDate = WritableUtils.readVLong(in);
|
||||||
|
sequenceNumber = WritableUtils.readVInt(in);
|
||||||
|
masterKeyId = WritableUtils.readVInt(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
owner.write(out);
|
||||||
|
renewer.write(out);
|
||||||
|
realUser.write(out);
|
||||||
|
WritableUtils.writeVLong(out, issueDate);
|
||||||
|
WritableUtils.writeVLong(out, maxDate);
|
||||||
|
WritableUtils.writeVInt(out, sequenceNumber);
|
||||||
|
WritableUtils.writeVInt(out, masterKeyId);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,354 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.security.token.delegation;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import static org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.Project.HDFS;
|
||||||
|
import static org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.Project.MAPREDUCE;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import javax.crypto.SecretKey;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager;
|
||||||
|
import org.apache.hadoop.util.Daemon;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
|
||||||
|
@InterfaceAudience.LimitedPrivate({HDFS, MAPREDUCE})
|
||||||
|
public abstract
|
||||||
|
class AbstractDelegationTokenSecretManager<TokenIdent
|
||||||
|
extends AbstractDelegationTokenIdentifier>
|
||||||
|
extends SecretManager<TokenIdent> {
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(AbstractDelegationTokenSecretManager.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cache of currently valid tokens, mapping from DelegationTokenIdentifier
|
||||||
|
* to DelegationTokenInformation. Protected by its own lock.
|
||||||
|
*/
|
||||||
|
private final Map<TokenIdent, DelegationTokenInformation> currentTokens
|
||||||
|
= new HashMap<TokenIdent, DelegationTokenInformation>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sequence number to create DelegationTokenIdentifier
|
||||||
|
*/
|
||||||
|
private int delegationTokenSequenceNumber = 0;
|
||||||
|
|
||||||
|
private final Map<Integer, DelegationKey> allKeys
|
||||||
|
= new HashMap<Integer, DelegationKey>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Access to currentId and currentKey is protected by this object lock.
|
||||||
|
*/
|
||||||
|
private int currentId = 0;
|
||||||
|
private DelegationKey currentKey;
|
||||||
|
|
||||||
|
private long keyUpdateInterval;
|
||||||
|
private long tokenMaxLifetime;
|
||||||
|
private long tokenRemoverScanInterval;
|
||||||
|
private long tokenRenewInterval;
|
||||||
|
private Thread tokenRemoverThread;
|
||||||
|
private volatile boolean running;
|
||||||
|
|
||||||
|
public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
|
||||||
|
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
|
||||||
|
long delegationTokenRemoverScanInterval) {
|
||||||
|
this.keyUpdateInterval = delegationKeyUpdateInterval;
|
||||||
|
this.tokenMaxLifetime = delegationTokenMaxLifetime;
|
||||||
|
this.tokenRenewInterval = delegationTokenRenewInterval;
|
||||||
|
this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** should be called before this object is used */
|
||||||
|
public synchronized void startThreads() throws IOException {
|
||||||
|
updateCurrentKey();
|
||||||
|
running = true;
|
||||||
|
tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
|
||||||
|
tokenRemoverThread.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a previously used master key to cache (when NN restarts),
|
||||||
|
* should be called before activate().
|
||||||
|
* */
|
||||||
|
public synchronized void addKey(DelegationKey key) throws IOException {
|
||||||
|
if (running) // a safety check
|
||||||
|
throw new IOException("Can't add delegation key to a running SecretManager.");
|
||||||
|
if (key.getKeyId() > currentId) {
|
||||||
|
currentId = key.getKeyId();
|
||||||
|
}
|
||||||
|
allKeys.put(key.getKeyId(), key);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized DelegationKey[] getAllKeys() {
|
||||||
|
return allKeys.values().toArray(new DelegationKey[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Update the current master key */
|
||||||
|
private synchronized void updateCurrentKey() throws IOException {
|
||||||
|
LOG.info("Updating the current master key for generating delegation tokens");
|
||||||
|
/* Create a new currentKey with an estimated expiry date. */
|
||||||
|
currentId++;
|
||||||
|
currentKey = new DelegationKey(currentId, System.currentTimeMillis()
|
||||||
|
+ keyUpdateInterval + tokenMaxLifetime, generateSecret());
|
||||||
|
allKeys.put(currentKey.getKeyId(), currentKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Update the current master key for generating delegation tokens */
|
||||||
|
public synchronized void rollMasterKey() throws IOException {
|
||||||
|
removeExpiredKeys();
|
||||||
|
/* set final expiry date for retiring currentKey */
|
||||||
|
currentKey.setExpiryDate(System.currentTimeMillis() + tokenMaxLifetime);
|
||||||
|
/*
|
||||||
|
* currentKey might have been removed by removeExpiredKeys(), if
|
||||||
|
* updateMasterKey() isn't called at expected interval. Add it back to
|
||||||
|
* allKeys just in case.
|
||||||
|
*/
|
||||||
|
allKeys.put(currentKey.getKeyId(), currentKey);
|
||||||
|
updateCurrentKey();
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void removeExpiredKeys() {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
for (Iterator<Map.Entry<Integer, DelegationKey>> it = allKeys.entrySet()
|
||||||
|
.iterator(); it.hasNext();) {
|
||||||
|
Map.Entry<Integer, DelegationKey> e = it.next();
|
||||||
|
if (e.getValue().getExpiryDate() < now) {
|
||||||
|
it.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected byte[] createPassword(TokenIdent identifier) {
|
||||||
|
int sequenceNum;
|
||||||
|
int id;
|
||||||
|
DelegationKey key;
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
synchronized (this) {
|
||||||
|
id = currentId;
|
||||||
|
key = currentKey;
|
||||||
|
sequenceNum = ++delegationTokenSequenceNumber;
|
||||||
|
}
|
||||||
|
identifier.setIssueDate(now);
|
||||||
|
identifier.setMaxDate(now + tokenMaxLifetime);
|
||||||
|
identifier.setMasterKeyId(id);
|
||||||
|
identifier.setSequenceNumber(sequenceNum);
|
||||||
|
byte[] password = createPassword(identifier.getBytes(), key.getKey());
|
||||||
|
synchronized (currentTokens) {
|
||||||
|
currentTokens.put(identifier, new DelegationTokenInformation(now
|
||||||
|
+ tokenRenewInterval, password));
|
||||||
|
}
|
||||||
|
return password;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] retrievePassword(TokenIdent identifier
|
||||||
|
) throws InvalidToken {
|
||||||
|
DelegationTokenInformation info = null;
|
||||||
|
synchronized (currentTokens) {
|
||||||
|
info = currentTokens.get(identifier);
|
||||||
|
}
|
||||||
|
if (info == null) {
|
||||||
|
throw new InvalidToken("token is expired or doesn't exist");
|
||||||
|
}
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
if (info.getRenewDate() < now) {
|
||||||
|
throw new InvalidToken("token is expired");
|
||||||
|
}
|
||||||
|
return info.getPassword();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Renew a delegation token. Canceled tokens are not renewed. Return true if
|
||||||
|
* the token is successfully renewed; false otherwise.
|
||||||
|
*/
|
||||||
|
public Boolean renewToken(Token<TokenIdent> token,
|
||||||
|
String renewer) throws InvalidToken, IOException {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
||||||
|
DataInputStream in = new DataInputStream(buf);
|
||||||
|
TokenIdent id = createIdentifier();
|
||||||
|
id.readFields(in);
|
||||||
|
synchronized (currentTokens) {
|
||||||
|
if (currentTokens.get(id) == null) {
|
||||||
|
LOG.warn("Renewal request for unknown token");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (id.getMaxDate() < now) {
|
||||||
|
LOG.warn("Client " + renewer + " tries to renew an expired token");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (id.getRenewer() == null || !id.getRenewer().toString().equals(renewer)) {
|
||||||
|
LOG.warn("Client " + renewer + " tries to renew a token with "
|
||||||
|
+ "renewer specified as " + id.getRenewer());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
DelegationKey key = null;
|
||||||
|
synchronized (this) {
|
||||||
|
key = allKeys.get(id.getMasterKeyId());
|
||||||
|
}
|
||||||
|
if (key == null) {
|
||||||
|
LOG.warn("Unable to find master key for keyId=" + id.getMasterKeyId()
|
||||||
|
+ " from cache. Failed to renew an unexpired token with sequenceNumber="
|
||||||
|
+ id.getSequenceNumber() + ", issued by this key");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
byte[] password = createPassword(token.getIdentifier(), key.getKey());
|
||||||
|
if (!Arrays.equals(password, token.getPassword())) {
|
||||||
|
LOG.warn("Client " + renewer + " is trying to renew a token with wrong password");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
DelegationTokenInformation info = new DelegationTokenInformation(
|
||||||
|
Math.min(id.getMaxDate(), now + tokenRenewInterval), password);
|
||||||
|
synchronized (currentTokens) {
|
||||||
|
currentTokens.put(id, info);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancel a token by removing it from cache. Return true if
|
||||||
|
* token exists in cache; false otherwise.
|
||||||
|
*/
|
||||||
|
public Boolean cancelToken(Token<TokenIdent> token,
|
||||||
|
String canceller) throws IOException {
|
||||||
|
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
||||||
|
DataInputStream in = new DataInputStream(buf);
|
||||||
|
TokenIdent id = createIdentifier();
|
||||||
|
id.readFields(in);
|
||||||
|
if (id.getRenewer() == null) {
|
||||||
|
LOG.warn("Renewer is null: Invalid Identifier");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (id.getUser() == null) {
|
||||||
|
LOG.warn("owner is null: Invalid Identifier");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
String owner = id.getUser().getUserName();
|
||||||
|
String renewer = id.getRenewer().toString();
|
||||||
|
if (!canceller.equals(owner) && !canceller.equals(renewer)) {
|
||||||
|
LOG.warn(canceller + " is not authorized to cancel the token");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
DelegationTokenInformation info = null;
|
||||||
|
synchronized (currentTokens) {
|
||||||
|
info = currentTokens.remove(id);
|
||||||
|
}
|
||||||
|
return info != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert the byte[] to a secret key
|
||||||
|
* @param key the byte[] to create the secret key from
|
||||||
|
* @return the secret key
|
||||||
|
*/
|
||||||
|
public static SecretKey createSecretKey(byte[] key) {
|
||||||
|
return SecretManager.createSecretKey(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Utility class to encapsulate a token's renew date and password. */
|
||||||
|
private static class DelegationTokenInformation {
|
||||||
|
long renewDate;
|
||||||
|
byte[] password;
|
||||||
|
DelegationTokenInformation(long renewDate, byte[] password) {
|
||||||
|
this.renewDate = renewDate;
|
||||||
|
this.password = password;
|
||||||
|
}
|
||||||
|
/** returns renew date */
|
||||||
|
long getRenewDate() {
|
||||||
|
return renewDate;
|
||||||
|
}
|
||||||
|
/** returns password */
|
||||||
|
byte[] getPassword() {
|
||||||
|
return password;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Remove expired delegation tokens from cache */
|
||||||
|
private void removeExpiredToken() {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
synchronized (currentTokens) {
|
||||||
|
Iterator<DelegationTokenInformation> i = currentTokens.values().iterator();
|
||||||
|
while (i.hasNext()) {
|
||||||
|
long renewDate = i.next().getRenewDate();
|
||||||
|
if (now > renewDate) {
|
||||||
|
i.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void stopThreads() {
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Stopping expired delegation token remover thread");
|
||||||
|
running = false;
|
||||||
|
tokenRemoverThread.interrupt();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ExpiredTokenRemover extends Thread {
|
||||||
|
private long lastMasterKeyUpdate;
|
||||||
|
private long lastTokenCacheCleanup;
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
LOG.info("Starting expired delegation token remover thread, "
|
||||||
|
+ "tokenRemoverScanInterval=" + tokenRemoverScanInterval
|
||||||
|
/ (60 * 1000) + " min(s)");
|
||||||
|
try {
|
||||||
|
while (running) {
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
if (lastMasterKeyUpdate + keyUpdateInterval < now) {
|
||||||
|
try {
|
||||||
|
rollMasterKey();
|
||||||
|
lastMasterKeyUpdate = now;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Master key updating failed. "
|
||||||
|
+ StringUtils.stringifyException(e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (lastTokenCacheCleanup + tokenRemoverScanInterval < now) {
|
||||||
|
removeExpiredToken();
|
||||||
|
lastTokenCacheCleanup = now;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(5000); // 5 seconds
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG
|
||||||
|
.error("InterruptedExcpetion recieved for ExpiredTokenRemover thread "
|
||||||
|
+ ie);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.error("ExpiredTokenRemover thread received unexpected exception. "
|
||||||
|
+ t);
|
||||||
|
Runtime.getRuntime().exit(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,61 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.security.token.delegation;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.security.token.TokenSelector;
|
||||||
|
import static org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.Project.HDFS;
|
||||||
|
import static org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.Project.MAPREDUCE;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Look through tokens to find the first delegation token that matches the
|
||||||
|
* service and return it.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.LimitedPrivate({HDFS, MAPREDUCE})
|
||||||
|
public
|
||||||
|
class AbstractDelegationTokenSelector<TokenIdent
|
||||||
|
extends AbstractDelegationTokenIdentifier>
|
||||||
|
implements TokenSelector<TokenIdent> {
|
||||||
|
private Text kindName;
|
||||||
|
|
||||||
|
protected AbstractDelegationTokenSelector(Text kindName) {
|
||||||
|
this.kindName = kindName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Override
|
||||||
|
public Token<TokenIdent> selectToken(Text service,
|
||||||
|
Collection<Token<? extends TokenIdentifier>> tokens) {
|
||||||
|
if (service == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
for (Token<? extends TokenIdentifier> token : tokens) {
|
||||||
|
if (kindName.equals(token.getKind())
|
||||||
|
&& service.equals(token.getService())) {
|
||||||
|
return (Token<TokenIdent>) token;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,88 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.security.token.delegation;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import javax.crypto.SecretKey;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
|
import static org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.Project.HDFS;
|
||||||
|
import static org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.Project.MAPREDUCE;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Key used for generating and verifying delegation tokens
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.LimitedPrivate({HDFS, MAPREDUCE})
|
||||||
|
public class DelegationKey implements Writable {
|
||||||
|
private int keyId;
|
||||||
|
private long expiryDate;
|
||||||
|
private SecretKey key;
|
||||||
|
|
||||||
|
public DelegationKey() {
|
||||||
|
this(0, 0L, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public DelegationKey(int keyId, long expiryDate, SecretKey key) {
|
||||||
|
this.keyId = keyId;
|
||||||
|
this.expiryDate = expiryDate;
|
||||||
|
this.key = key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getKeyId() {
|
||||||
|
return keyId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getExpiryDate() {
|
||||||
|
return expiryDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SecretKey getKey() {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setExpiryDate(long expiryDate) {
|
||||||
|
this.expiryDate = expiryDate;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
WritableUtils.writeVInt(out, keyId);
|
||||||
|
WritableUtils.writeVLong(out, expiryDate);
|
||||||
|
byte[] keyBytes = key.getEncoded();
|
||||||
|
WritableUtils.writeVInt(out, keyBytes.length);
|
||||||
|
out.write(keyBytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
keyId = WritableUtils.readVInt(in);
|
||||||
|
expiryDate = WritableUtils.readVLong(in);
|
||||||
|
int len = WritableUtils.readVInt(in);
|
||||||
|
byte[] keyBytes = new byte[len];
|
||||||
|
in.readFully(keyBytes);
|
||||||
|
key = AbstractDelegationTokenSecretManager.createSecretKey(keyBytes);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,262 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.security.token.delegation;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.Writable;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mortbay.log.Log;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class TestDelegationToken {
|
||||||
|
private static final Text KIND = new Text("MY KIND");
|
||||||
|
|
||||||
|
public static class TestDelegationTokenIdentifier
|
||||||
|
extends AbstractDelegationTokenIdentifier
|
||||||
|
implements Writable {
|
||||||
|
|
||||||
|
public TestDelegationTokenIdentifier() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestDelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
|
||||||
|
super(owner, renewer, realUser);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Text getKind() {
|
||||||
|
return KIND;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void write(DataOutput out) throws IOException {
|
||||||
|
super.write(out);
|
||||||
|
}
|
||||||
|
public void readFields(DataInput in) throws IOException {
|
||||||
|
super.readFields(in);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestDelegationTokenSecretManager
|
||||||
|
extends AbstractDelegationTokenSecretManager<TestDelegationTokenIdentifier> {
|
||||||
|
|
||||||
|
public TestDelegationTokenSecretManager(long delegationKeyUpdateInterval,
|
||||||
|
long delegationTokenMaxLifetime,
|
||||||
|
long delegationTokenRenewInterval,
|
||||||
|
long delegationTokenRemoverScanInterval) {
|
||||||
|
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
|
||||||
|
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestDelegationTokenIdentifier createIdentifier() {
|
||||||
|
return new TestDelegationTokenIdentifier();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected byte[] createPassword(TestDelegationTokenIdentifier t) {
|
||||||
|
return super.createPassword(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TokenSelector extends
|
||||||
|
AbstractDelegationTokenSelector<TestDelegationTokenIdentifier>{
|
||||||
|
|
||||||
|
protected TokenSelector() {
|
||||||
|
super(KIND);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSerialization() throws Exception {
|
||||||
|
TestDelegationTokenIdentifier origToken = new
|
||||||
|
TestDelegationTokenIdentifier(new Text("alice"),
|
||||||
|
new Text("bob"),
|
||||||
|
new Text("colin"));
|
||||||
|
TestDelegationTokenIdentifier newToken = new TestDelegationTokenIdentifier();
|
||||||
|
origToken.setIssueDate(123);
|
||||||
|
origToken.setMasterKeyId(321);
|
||||||
|
origToken.setMaxDate(314);
|
||||||
|
origToken.setSequenceNumber(12345);
|
||||||
|
|
||||||
|
// clone origToken into newToken
|
||||||
|
DataInputBuffer inBuf = new DataInputBuffer();
|
||||||
|
DataOutputBuffer outBuf = new DataOutputBuffer();
|
||||||
|
origToken.write(outBuf);
|
||||||
|
inBuf.reset(outBuf.getData(), 0, outBuf.getLength());
|
||||||
|
newToken.readFields(inBuf);
|
||||||
|
|
||||||
|
// now test the fields
|
||||||
|
assertEquals("alice", newToken.getUser().getUserName());
|
||||||
|
assertEquals(new Text("bob"), newToken.getRenewer());
|
||||||
|
assertEquals("colin", newToken.getUser().getRealUser().getUserName());
|
||||||
|
assertEquals(123, newToken.getIssueDate());
|
||||||
|
assertEquals(321, newToken.getMasterKeyId());
|
||||||
|
assertEquals(314, newToken.getMaxDate());
|
||||||
|
assertEquals(12345, newToken.getSequenceNumber());
|
||||||
|
assertEquals(origToken, newToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Token<TestDelegationTokenIdentifier> generateDelegationToken(
|
||||||
|
TestDelegationTokenSecretManager dtSecretManager,
|
||||||
|
String owner, String renewer) {
|
||||||
|
TestDelegationTokenIdentifier dtId =
|
||||||
|
new TestDelegationTokenIdentifier(new Text(
|
||||||
|
owner), new Text(renewer), null);
|
||||||
|
return new Token<TestDelegationTokenIdentifier>(dtId, dtSecretManager);
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testDelegationTokenSecretManager() throws Exception {
|
||||||
|
TestDelegationTokenSecretManager dtSecretManager =
|
||||||
|
new TestDelegationTokenSecretManager(24*60*60*1000,
|
||||||
|
3*1000,1*1000,3600000);
|
||||||
|
try {
|
||||||
|
dtSecretManager.startThreads();
|
||||||
|
Token<TestDelegationTokenIdentifier> token = generateDelegationToken(
|
||||||
|
dtSecretManager, "SomeUser", "JobTracker");
|
||||||
|
// Fake renewer should not be able to renew
|
||||||
|
Assert.assertFalse(dtSecretManager.renewToken(token, "FakeRenewer"));
|
||||||
|
Assert.assertTrue(dtSecretManager.renewToken(token, "JobTracker"));
|
||||||
|
TestDelegationTokenIdentifier identifier =
|
||||||
|
new TestDelegationTokenIdentifier();
|
||||||
|
byte[] tokenId = token.getIdentifier();
|
||||||
|
identifier.readFields(new DataInputStream(
|
||||||
|
new ByteArrayInputStream(tokenId)));
|
||||||
|
Assert.assertTrue(null != dtSecretManager.retrievePassword(identifier));
|
||||||
|
Log.info("Sleep to expire the token");
|
||||||
|
Thread.sleep(2000);
|
||||||
|
//Token should be expired
|
||||||
|
try {
|
||||||
|
dtSecretManager.retrievePassword(identifier);
|
||||||
|
//Should not come here
|
||||||
|
Assert.fail("Token should have expired");
|
||||||
|
} catch (InvalidToken e) {
|
||||||
|
//Success
|
||||||
|
}
|
||||||
|
Assert.assertTrue(dtSecretManager.renewToken(token, "JobTracker"));
|
||||||
|
Log.info("Sleep beyond the max lifetime");
|
||||||
|
Thread.sleep(2000);
|
||||||
|
Assert.assertFalse(dtSecretManager.renewToken(token, "JobTracker"));
|
||||||
|
} finally {
|
||||||
|
dtSecretManager.stopThreads();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testCancelDelegationToken() throws Exception {
|
||||||
|
TestDelegationTokenSecretManager dtSecretManager =
|
||||||
|
new TestDelegationTokenSecretManager(24*60*60*1000,
|
||||||
|
10*1000,1*1000,3600000);
|
||||||
|
try {
|
||||||
|
dtSecretManager.startThreads();
|
||||||
|
Token<TestDelegationTokenIdentifier> token = generateDelegationToken(
|
||||||
|
dtSecretManager, "SomeUser", "JobTracker");
|
||||||
|
//Fake renewer should not be able to renew
|
||||||
|
Assert.assertFalse(dtSecretManager.cancelToken(token, "FakeCanceller"));
|
||||||
|
Assert.assertTrue(dtSecretManager.cancelToken(token, "JobTracker"));
|
||||||
|
Assert.assertFalse(dtSecretManager.renewToken(token, "JobTracker"));
|
||||||
|
} finally {
|
||||||
|
dtSecretManager.stopThreads();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
public void testRollMasterKey() throws Exception {
|
||||||
|
TestDelegationTokenSecretManager dtSecretManager =
|
||||||
|
new TestDelegationTokenSecretManager(24*60*60*1000,
|
||||||
|
10*1000,1*1000,3600000);
|
||||||
|
try {
|
||||||
|
dtSecretManager.startThreads();
|
||||||
|
//generate a token and store the password
|
||||||
|
Token<TestDelegationTokenIdentifier> token = generateDelegationToken(
|
||||||
|
dtSecretManager, "SomeUser", "JobTracker");
|
||||||
|
byte[] oldPasswd = token.getPassword();
|
||||||
|
//store the length of the keys list
|
||||||
|
int prevNumKeys = dtSecretManager.getAllKeys().length;
|
||||||
|
|
||||||
|
dtSecretManager.rollMasterKey();
|
||||||
|
|
||||||
|
//after rolling, the length of the keys list must increase
|
||||||
|
int currNumKeys = dtSecretManager.getAllKeys().length;
|
||||||
|
Assert.assertEquals((currNumKeys - prevNumKeys) >= 1, true);
|
||||||
|
|
||||||
|
//after rolling, the token that was generated earlier must
|
||||||
|
//still be valid (retrievePassword will fail if the token
|
||||||
|
//is not valid)
|
||||||
|
ByteArrayInputStream bi =
|
||||||
|
new ByteArrayInputStream(token.getIdentifier());
|
||||||
|
TestDelegationTokenIdentifier identifier =
|
||||||
|
dtSecretManager.createIdentifier();
|
||||||
|
identifier.readFields(new DataInputStream(bi));
|
||||||
|
byte[] newPasswd =
|
||||||
|
dtSecretManager.retrievePassword(identifier);
|
||||||
|
//compare the passwords
|
||||||
|
Assert.assertEquals(oldPasswd, newPasswd);
|
||||||
|
} finally {
|
||||||
|
dtSecretManager.stopThreads();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Test
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testDelegationTokenSelector() throws Exception {
|
||||||
|
TestDelegationTokenSecretManager dtSecretManager =
|
||||||
|
new TestDelegationTokenSecretManager(24*60*60*1000,
|
||||||
|
10*1000,1*1000,3600000);
|
||||||
|
try {
|
||||||
|
dtSecretManager.startThreads();
|
||||||
|
AbstractDelegationTokenSelector ds =
|
||||||
|
new AbstractDelegationTokenSelector<TestDelegationTokenIdentifier>(KIND);
|
||||||
|
|
||||||
|
//Creates a collection of tokens
|
||||||
|
Token<TestDelegationTokenIdentifier> token1 = generateDelegationToken(
|
||||||
|
dtSecretManager, "SomeUser1", "JobTracker");
|
||||||
|
token1.setService(new Text("MY-SERVICE1"));
|
||||||
|
|
||||||
|
Token<TestDelegationTokenIdentifier> token2 = generateDelegationToken(
|
||||||
|
dtSecretManager, "SomeUser2", "JobTracker");
|
||||||
|
token2.setService(new Text("MY-SERVICE2"));
|
||||||
|
|
||||||
|
List<Token<TestDelegationTokenIdentifier>> tokens =
|
||||||
|
new ArrayList<Token<TestDelegationTokenIdentifier>>();
|
||||||
|
tokens.add(token1);
|
||||||
|
tokens.add(token2);
|
||||||
|
|
||||||
|
//try to select a token with a given service name (created earlier)
|
||||||
|
Token<TestDelegationTokenIdentifier> t =
|
||||||
|
ds.selectToken(new Text("MY-SERVICE1"), tokens);
|
||||||
|
Assert.assertEquals(t, token1);
|
||||||
|
} finally {
|
||||||
|
dtSecretManager.stopThreads();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue