HDFS-4680. Audit logging of delegation tokens for MR tracing. (Andrew Wang)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1522015 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2013-09-11 20:04:50 +00:00
parent 90f492e532
commit 1d4d1d8f70
7 changed files with 178 additions and 13 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.security.token;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
@ -35,6 +36,9 @@ import org.apache.hadoop.security.UserGroupInformation;
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving @InterfaceStability.Evolving
public abstract class TokenIdentifier implements Writable { public abstract class TokenIdentifier implements Writable {
private String trackingId = null;
/** /**
* Get the token kind * Get the token kind
* @return the kind of the token * @return the kind of the token
@ -62,4 +66,19 @@ public abstract class TokenIdentifier implements Writable {
} }
return Arrays.copyOf(buf.getData(), buf.getLength()); return Arrays.copyOf(buf.getData(), buf.getLength());
} }
/**
* Returns a tracking identifier that can be used to associate usages of a
* token across multiple client sessions.
*
* Currently, this function just returns an MD5 of {{@link #getBytes()}.
*
* @return tracking identifier
*/
public String getTrackingId() {
if (trackingId == null) {
trackingId = DigestUtils.md5Hex(getBytes());
}
return trackingId;
}
} }

View File

@ -86,6 +86,11 @@ extends AbstractDelegationTokenIdentifier>
private long tokenMaxLifetime; private long tokenMaxLifetime;
private long tokenRemoverScanInterval; private long tokenRemoverScanInterval;
private long tokenRenewInterval; private long tokenRenewInterval;
/**
* Whether to store a token's tracking ID in its TokenInformation.
* Can be overridden by a subclass.
*/
protected boolean storeTokenTrackingId;
private Thread tokenRemoverThread; private Thread tokenRemoverThread;
protected volatile boolean running; protected volatile boolean running;
@ -102,6 +107,7 @@ extends AbstractDelegationTokenIdentifier>
this.tokenMaxLifetime = delegationTokenMaxLifetime; this.tokenMaxLifetime = delegationTokenMaxLifetime;
this.tokenRenewInterval = delegationTokenRenewInterval; this.tokenRenewInterval = delegationTokenRenewInterval;
this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval; this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
this.storeTokenTrackingId = false;
} }
/** should be called before this object is used */ /** should be called before this object is used */
@ -201,7 +207,7 @@ extends AbstractDelegationTokenIdentifier>
} }
if (currentTokens.get(identifier) == null) { if (currentTokens.get(identifier) == null) {
currentTokens.put(identifier, new DelegationTokenInformation(renewDate, currentTokens.put(identifier, new DelegationTokenInformation(renewDate,
password)); password, getTrackingIdIfEnabled(identifier)));
} else { } else {
throw new IOException( throw new IOException(
"Same delegation token being added twice."); "Same delegation token being added twice.");
@ -280,7 +286,7 @@ extends AbstractDelegationTokenIdentifier>
byte[] password = createPassword(identifier.getBytes(), currentKey.getKey()); byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
storeNewToken(identifier, now + tokenRenewInterval); storeNewToken(identifier, now + tokenRenewInterval);
currentTokens.put(identifier, new DelegationTokenInformation(now currentTokens.put(identifier, new DelegationTokenInformation(now
+ tokenRenewInterval, password)); + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)));
return password; return password;
} }
@ -299,6 +305,21 @@ extends AbstractDelegationTokenIdentifier>
return info.getPassword(); return info.getPassword();
} }
protected String getTrackingIdIfEnabled(TokenIdent ident) {
if (storeTokenTrackingId) {
return ident.getTrackingId();
}
return null;
}
public synchronized String getTokenTrackingId(TokenIdent identifier) {
DelegationTokenInformation info = currentTokens.get(identifier);
if (info == null) {
return null;
}
return info.getTrackingId();
}
/** /**
* Verifies that the given identifier and password are valid and match. * Verifies that the given identifier and password are valid and match.
* @param identifier Token identifier. * @param identifier Token identifier.
@ -359,8 +380,9 @@ extends AbstractDelegationTokenIdentifier>
+ " is trying to renew a token with " + "wrong password"); + " is trying to renew a token with " + "wrong password");
} }
long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval); long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
String trackingId = getTrackingIdIfEnabled(id);
DelegationTokenInformation info = new DelegationTokenInformation(renewTime, DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
password); password, trackingId);
if (currentTokens.get(id) == null) { if (currentTokens.get(id) == null) {
throw new InvalidToken("Renewal request for unknown token"); throw new InvalidToken("Renewal request for unknown token");
@ -420,9 +442,13 @@ extends AbstractDelegationTokenIdentifier>
public static class DelegationTokenInformation { public static class DelegationTokenInformation {
long renewDate; long renewDate;
byte[] password; byte[] password;
public DelegationTokenInformation(long renewDate, byte[] password) { String trackingId;
public DelegationTokenInformation(long renewDate, byte[] password,
String trackingId) {
this.renewDate = renewDate; this.renewDate = renewDate;
this.password = password; this.password = password;
this.trackingId = trackingId;
} }
/** returns renew date */ /** returns renew date */
public long getRenewDate() { public long getRenewDate() {
@ -432,6 +458,10 @@ extends AbstractDelegationTokenIdentifier>
byte[] getPassword() { byte[] getPassword() {
return password; return password;
} }
/** returns tracking id */
public String getTrackingId() {
return trackingId;
}
} }
/** Remove expired delegation tokens from cache */ /** Remove expired delegation tokens from cache */

View File

@ -34,6 +34,8 @@ Release 2.3.0 - UNRELEASED
HDFS-4879. Add "blocked ArrayList" collection to avoid CMS full GCs HDFS-4879. Add "blocked ArrayList" collection to avoid CMS full GCs
(Todd Lipcon via Colin Patrick McCabe) (Todd Lipcon via Colin Patrick McCabe)
HDFS-4680. Audit logging of delegation tokens for MR tracing. (Andrew Wang)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -267,6 +267,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces"; public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers"; public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";
public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default"; public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
public static final String DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
// Much code in hdfs is not yet updated to use these keys. // Much code in hdfs is not yet updated to use these keys.
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries"; public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";

View File

@ -58,6 +58,15 @@ public class DelegationTokenSecretManager
.getLog(DelegationTokenSecretManager.class); .getLog(DelegationTokenSecretManager.class);
private final FSNamesystem namesystem; private final FSNamesystem namesystem;
public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
this(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval, false,
namesystem);
}
/** /**
* Create a secret manager * Create a secret manager
* @param delegationKeyUpdateInterval the number of seconds for rolling new * @param delegationKeyUpdateInterval the number of seconds for rolling new
@ -67,13 +76,16 @@ public class DelegationTokenSecretManager
* @param delegationTokenRenewInterval how often the tokens must be renewed * @param delegationTokenRenewInterval how often the tokens must be renewed
* @param delegationTokenRemoverScanInterval how often the tokens are scanned * @param delegationTokenRemoverScanInterval how often the tokens are scanned
* for expired tokens * for expired tokens
* @param storeTokenTrackingId whether to store the token's tracking id
*/ */
public DelegationTokenSecretManager(long delegationKeyUpdateInterval, public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval, FSNamesystem namesystem) { long delegationTokenRemoverScanInterval, boolean storeTokenTrackingId,
FSNamesystem namesystem) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval); delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
this.namesystem = namesystem; this.namesystem = namesystem;
this.storeTokenTrackingId = storeTokenTrackingId;
} }
@Override //SecretManager @Override //SecretManager
@ -184,7 +196,7 @@ public class DelegationTokenSecretManager
} }
if (currentTokens.get(identifier) == null) { if (currentTokens.get(identifier) == null) {
currentTokens.put(identifier, new DelegationTokenInformation(expiryTime, currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
password)); password, getTrackingIdIfEnabled(identifier)));
} else { } else {
throw new IOException( throw new IOException(
"Same delegation token being added twice; invalid entry in fsimage or editlogs"); "Same delegation token being added twice; invalid entry in fsimage or editlogs");
@ -223,7 +235,7 @@ public class DelegationTokenSecretManager
byte[] password = createPassword(identifier.getBytes(), allKeys byte[] password = createPassword(identifier.getBytes(), allKeys
.get(keyId).getKey()); .get(keyId).getKey());
currentTokens.put(identifier, new DelegationTokenInformation(expiryTime, currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
password)); password, getTrackingIdIfEnabled(identifier)));
} }
} }

View File

@ -36,6 +36,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KE
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
@ -220,6 +222,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -296,10 +300,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
stat.getGroup(), symlink, path); stat.getGroup(), symlink, path);
} }
for (AuditLogger logger : auditLoggers) { for (AuditLogger logger : auditLoggers) {
if (logger instanceof HdfsAuditLogger) {
HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,
status, ugi, dtSecretManager);
} else {
logger.logAuditEvent(succeeded, ugi.toString(), addr, logger.logAuditEvent(succeeded, ugi.toString(), addr,
cmd, src, dst, status); cmd, src, dst, status);
} }
} }
}
/** /**
* Logger for audit events, noting successful FSNamesystem operations. Emits * Logger for audit events, noting successful FSNamesystem operations. Emits
@ -5889,7 +5899,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT), DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT),
conf.getLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, conf.getLong(DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT), DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT),
DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL, this); DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL,
conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT),
this);
} }
/** /**
@ -6800,17 +6813,22 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* defined in the config file. It can also be explicitly listed in the * defined in the config file. It can also be explicitly listed in the
* config file. * config file.
*/ */
private static class DefaultAuditLogger implements AuditLogger { private static class DefaultAuditLogger extends HdfsAuditLogger {
private boolean logTokenTrackingId;
@Override @Override
public void initialize(Configuration conf) { public void initialize(Configuration conf) {
// Nothing to do. logTokenTrackingId = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT);
} }
@Override @Override
public void logAuditEvent(boolean succeeded, String userName, public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst, InetAddress addr, String cmd, String src, String dst,
FileStatus status) { FileStatus status, UserGroupInformation ugi,
DelegationTokenSecretManager dtSecretManager) {
if (auditLog.isInfoEnabled()) { if (auditLog.isInfoEnabled()) {
final StringBuilder sb = auditBuffer.get(); final StringBuilder sb = auditBuffer.get();
sb.setLength(0); sb.setLength(0);
@ -6828,6 +6846,22 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
sb.append(status.getGroup()).append(":"); sb.append(status.getGroup()).append(":");
sb.append(status.getPermission()); sb.append(status.getPermission());
} }
if (logTokenTrackingId) {
sb.append("\t").append("trackingId=");
String trackingId = null;
if (ugi != null && dtSecretManager != null
&& ugi.getAuthenticationMethod() == AuthenticationMethod.TOKEN) {
for (TokenIdentifier tid: ugi.getTokenIdentifiers()) {
if (tid instanceof DelegationTokenIdentifier) {
DelegationTokenIdentifier dtid =
(DelegationTokenIdentifier)tid;
trackingId = dtSecretManager.getTokenTrackingId(dtid);
break;
}
}
}
sb.append(trackingId);
}
auditLog.info(sb); auditLog.info(sb);
} }
} }

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.net.InetAddress;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Extension of {@link AuditLogger}.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class HdfsAuditLogger implements AuditLogger {
@Override
public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst,
FileStatus status) {
logAuditEvent(succeeded, userName, addr, cmd, src, dst, status, null,
null);
}
/**
* Same as
* {@link #logAuditEvent(boolean, String, InetAddress, String, String, String, FileStatus)}
* with additional parameters related to logging delegation token tracking
* IDs.
*
* @param succeeded Whether authorization succeeded.
* @param userName Name of the user executing the request.
* @param addr Remote address of the request.
* @param cmd The requested command.
* @param src Path of affected source file.
* @param dst Path of affected destination file (if any).
* @param stat File information for operations that change the file's metadata
* (permissions, owner, times, etc).
* @param ugi UserGroupInformation of the current user, or null if not logging
* token tracking information
* @param dtSecretManager The token secret manager, or null if not logging
* token tracking information
*/
public abstract void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst,
FileStatus stat, UserGroupInformation ugi,
DelegationTokenSecretManager dtSecretManager);
}