HDFS-3680. Allow customized audit logging in HDFS FSNamesystem. Contributed by Marcelo Vanzin.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1418114 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-12-06 22:27:27 +00:00
parent a5331eeead
commit df2fb006b2
6 changed files with 341 additions and 55 deletions

View File

@ -404,6 +404,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-4268. Remove redundant enum NNHAStatusHeartbeat.State. (shv)
HDFS-3680. Allow customized audit logging in HDFS FSNamesystem. (Marcelo
Vanzin via atm)
OPTIMIZATIONS
BUG FIXES

View File

@ -246,6 +246,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_HOSTS = "dfs.hosts";
public static final String DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";
public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
// Much code in hdfs is not yet updated to use these keys.
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import java.net.InetAddress;
import java.security.Principal;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
/**
* Interface defining an audit logger.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface AuditLogger {
/**
* Called during initialization of the logger.
*
* @param conf The configuration object.
*/
void initialize(Configuration conf);
/**
* Called to log an audit event.
* <p>
* This method must return as quickly as possible, since it's called
* in a critical section of the NameNode's operation.
*
* @param succeeded Whether authorization succeeded.
* @param userName Name of the user executing the request.
* @param addr Remote address of the request.
* @param cmd The requested command.
* @param src Path of affected source file.
* @param dst Path of affected destination file (if any).
* @param stat File information for operations that change the file's
* metadata (permissions, owner, times, etc).
*/
void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst,
FileStatus stat);
}

View File

@ -34,6 +34,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT;
@ -111,6 +113,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Options;
@ -245,32 +248,32 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
};
private static final void logAuditEvent(UserGroupInformation ugi,
private boolean isAuditEnabled() {
return !isDefaultAuditLogger || auditLog.isInfoEnabled();
}
private void logAuditEvent(UserGroupInformation ugi,
InetAddress addr, String cmd, String src, String dst,
HdfsFileStatus stat) {
logAuditEvent(true, ugi, addr, cmd, src, dst, stat);
}
private static final void logAuditEvent(boolean succeeded,
private void logAuditEvent(boolean succeeded,
UserGroupInformation ugi, InetAddress addr, String cmd, String src,
String dst, HdfsFileStatus stat) {
final StringBuilder sb = auditBuffer.get();
sb.setLength(0);
sb.append("allowed=").append(succeeded).append("\t");
sb.append("ugi=").append(ugi).append("\t");
sb.append("ip=").append(addr).append("\t");
sb.append("cmd=").append(cmd).append("\t");
sb.append("src=").append(src).append("\t");
sb.append("dst=").append(dst).append("\t");
if (null == stat) {
sb.append("perm=null");
} else {
sb.append("perm=");
sb.append(stat.getOwner()).append(":");
sb.append(stat.getGroup()).append(":");
sb.append(stat.getPermission());
FileStatus status = null;
if (stat != null) {
Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;
Path path = dst != null ? new Path(dst) : new Path(src);
status = new FileStatus(stat.getLen(), stat.isDir(),
stat.getReplication(), stat.getBlockSize(), stat.getModificationTime(),
stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
stat.getGroup(), symlink, path);
}
for (AuditLogger logger : auditLoggers) {
logger.logAuditEvent(succeeded, ugi.toString(), addr,
cmd, src, dst, status);
}
auditLog.info(sb);
}
/**
@ -303,6 +306,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final DelegationTokenSecretManager dtSecretManager;
private final boolean alwaysUseDelegationTokensForTests;
// Tracks whether the default audit logger is the only configured audit
// logger; this allows isAuditEnabled() to return false in case the
// underlying logger is disabled, and avoid some unnecessary work.
private final boolean isDefaultAuditLogger;
private final List<AuditLogger> auditLoggers;
/** The namespace tree. */
FSDirectory dir;
@ -535,14 +543,50 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.dtSecretManager = createDelegationTokenSecretManager(conf);
this.dir = new FSDirectory(fsImage, this, conf);
this.safeMode = new SafeModeInfo(conf);
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
auditLoggers.get(0) instanceof DefaultAuditLogger;
} catch(IOException e) {
LOG.error(getClass().getSimpleName() + " initialization failed.", e);
close();
throw e;
} catch (RuntimeException re) {
LOG.error(getClass().getSimpleName() + " initialization failed.", re);
close();
throw re;
}
}
private List<AuditLogger> initAuditLoggers(Configuration conf) {
// Initialize the custom access loggers if configured.
Collection<String> alClasses = conf.getStringCollection(DFS_NAMENODE_AUDIT_LOGGERS_KEY);
List<AuditLogger> auditLoggers = Lists.newArrayList();
if (alClasses != null && !alClasses.isEmpty()) {
for (String className : alClasses) {
try {
AuditLogger logger;
if (DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME.equals(className)) {
logger = new DefaultAuditLogger();
} else {
logger = (AuditLogger) Class.forName(className).newInstance();
}
logger.initialize(conf);
auditLoggers.add(logger);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
// Make sure there is at least one logger installed.
if (auditLoggers.isEmpty()) {
auditLoggers.add(new DefaultAuditLogger());
}
return auditLoggers;
}
void loadFSImage(StartupOption startOpt, FSImage fsImage, boolean haEnabled)
throws IOException {
// format before starting up if requested
@ -1076,7 +1120,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
setPermissionInt(src, permission);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"setPermission", src, null, null);
@ -1098,14 +1142,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
checkOwner(src);
dir.setPermission(src, permission);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(src, false);
}
} finally {
writeUnlock();
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"setPermission", src, null, resultingStat);
@ -1122,7 +1166,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
setOwnerInt(src, username, group);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"setOwner", src, null, null);
@ -1153,14 +1197,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
dir.setOwner(src, username, group);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(src, false);
}
} finally {
writeUnlock();
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"setOwner", src, null, resultingStat);
@ -1203,7 +1247,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return getBlockLocationsInt(src, offset, length, doAccessTime,
needBlockToken, checkSafeMode);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"open", src, null, null);
@ -1229,7 +1273,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
final LocatedBlocks ret = getBlockLocationsUpdateTimes(src,
offset, length, doAccessTime, needBlockToken);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"open", src, null, null);
@ -1310,7 +1354,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
concatInt(target, srcs);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getLoginUser(),
getRemoteIp(),
"concat", Arrays.toString(srcs), target, null);
@ -1353,14 +1397,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException("Cannot concat " + target, safeMode);
}
concatInternal(target, srcs);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(target, false);
}
} finally {
writeUnlock();
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getLoginUser(),
getRemoteIp(),
"concat", Arrays.toString(srcs), target, resultingStat);
@ -1481,7 +1525,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
setTimesInt(src, mtime, atime);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"setTimes", src, null, null);
@ -1507,7 +1551,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
INode inode = dir.getINode(src);
if (inode != null) {
dir.setTimes(src, inode, mtime, atime, true);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(src, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
@ -1530,7 +1574,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
createSymlinkInt(target, link, dirPerms, createParent);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"createSymlink", link, target, null);
@ -1551,14 +1595,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
verifyParentDir(link);
}
createSymlinkInternal(target, link, dirPerms, createParent);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(link, false);
}
} finally {
writeUnlock();
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"createSymlink", link, target, resultingStat);
@ -1614,7 +1658,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
return setReplicationInt(src, replication);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"setReplication", src, null, null);
@ -1650,7 +1694,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
getEditLog().logSync();
if (isFile && auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isFile && isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"setReplication", src, null, null);
@ -1706,7 +1750,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
startFileInt(src, permissions, holder, clientMachine, flag, createParent,
replication, blockSize);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"create", src, null, null);
@ -1739,7 +1783,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(src, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
@ -2040,7 +2084,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
return appendFileInt(src, holder, clientMachine);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"append", src, null, null);
@ -2086,7 +2130,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+" block size " + lb.getBlock().getNumBytes());
}
}
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"append", src, null, null);
@ -2532,7 +2576,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
return renameToInt(src, dst);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"rename", src, dst, null);
@ -2554,14 +2598,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOperation(OperationCategory.WRITE);
status = renameToInternal(src, dst);
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
if (status && isAuditEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(dst, false);
}
} finally {
writeUnlock();
}
getEditLog().logSync();
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
if (status && isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"rename", src, dst, resultingStat);
@ -2611,14 +2655,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkOperation(OperationCategory.WRITE);
renameToInternal(src, dst, options);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(dst, false);
}
} finally {
writeUnlock();
}
getEditLog().logSync();
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
StringBuilder cmd = new StringBuilder("rename options=");
for (Rename option : options) {
cmd.append(option.value()).append(" ");
@ -2657,7 +2701,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
return deleteInt(src, recursive);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"delete", src, null, null);
@ -2673,7 +2717,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
}
boolean status = deleteInternal(src, recursive, true);
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
if (status && isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"delete", src, null, null);
@ -2839,7 +2883,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
stat = dir.getFileInfo(src, resolveLink);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"getfileinfo", src, null, null);
@ -2848,7 +2892,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} finally {
readUnlock();
}
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"getfileinfo", src, null, null);
@ -2864,7 +2908,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
return mkdirsInt(src, permissions, createParent);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"mkdirs", src, null, null);
@ -2888,7 +2932,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
writeUnlock();
}
getEditLog().logSync();
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
if (status && isAuditEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(src, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
@ -3322,7 +3366,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
try {
return getListingInt(src, startAfter, needLocation);
} catch (AccessControlException e) {
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(false, UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"listStatus", src, null, null);
@ -3346,7 +3390,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
checkTraverse(src);
}
}
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
getRemoteIp(),
"listStatus", src, null, null);
@ -5260,7 +5304,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* Log fsck event in the audit log
*/
void logFsckEvent(String src, InetAddress remoteAddress) throws IOException {
if (auditLog.isInfoEnabled()) {
if (isAuditEnabled()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
remoteAddress,
"fsck", src, null, null);
@ -5515,4 +5559,44 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return this.blockManager.getDatanodeManager()
.isAvoidingStaleDataNodesForWrite();
}
/**
* Default AuditLogger implementation; used when no access logger is
* defined in the config file. It can also be explicitly listed in the
* config file.
*/
private static class DefaultAuditLogger implements AuditLogger {
@Override
public void initialize(Configuration conf) {
// Nothing to do.
}
@Override
public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst,
FileStatus status) {
if (auditLog.isInfoEnabled()) {
final StringBuilder sb = auditBuffer.get();
sb.setLength(0);
sb.append("allowed=").append(succeeded).append("\t");
sb.append("ugi=").append(userName).append("\t");
sb.append("ip=").append(addr).append("\t");
sb.append("cmd=").append(cmd).append("\t");
sb.append("src=").append(src).append("\t");
sb.append("dst=").append(dst).append("\t");
if (null == status) {
sb.append("perm=null");
} else {
sb.append("perm=");
sb.append(status.getOwner()).append(":");
sb.append(status.getGroup()).append(":");
sb.append(status.getPermission());
}
auditLog.info(sb);
}
}
}
}

View File

@ -1184,4 +1184,17 @@
</description>
</property>
<property>
<name>dfs.namenode.audit.loggers</name>
<value>default</value>
<description>
List of classes implementing audit loggers that will receive audit events.
These should be implementations of org.apache.hadoop.hdfs.server.namenode.AuditLogger.
The special value "default" can be used to reference the default audit
logger, which uses the configured log system. Installing custom audit loggers
may affect the performance and stability of the NameNode. Refer to the custom
logger's documentation for more details.
</description>
</property>
</configuration>

View File

@ -0,0 +1,123 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
/**
* Tests for the {@link AuditLogger} custom audit logging interface.
*/
public class TestAuditLogger {
/**
* Tests that AuditLogger works as expected.
*/
@Test
public void testAuditLogger() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY,
DummyAuditLogger.class.getName());
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitClusterUp();
assertTrue(DummyAuditLogger.initialized);
FileSystem fs = cluster.getFileSystem();
long time = System.currentTimeMillis();
fs.setTimes(new Path("/"), time, time);
assertEquals(1, DummyAuditLogger.logCount);
} finally {
cluster.shutdown();
}
}
/**
* Tests that a broken audit logger causes requests to fail.
*/
@Test
public void testBrokenLogger() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.set(DFS_NAMENODE_AUDIT_LOGGERS_KEY,
BrokenAuditLogger.class.getName());
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitClusterUp();
FileSystem fs = cluster.getFileSystem();
long time = System.currentTimeMillis();
fs.setTimes(new Path("/"), time, time);
fail("Expected exception due to broken audit logger.");
} catch (RemoteException re) {
// Expected.
} finally {
cluster.shutdown();
}
}
public static class DummyAuditLogger implements AuditLogger {
static boolean initialized;
static int logCount;
public void initialize(Configuration conf) {
initialized = true;
}
public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst,
FileStatus stat) {
logCount++;
}
}
public static class BrokenAuditLogger implements AuditLogger {
public void initialize(Configuration conf) {
// No op.
}
public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst,
FileStatus stat) {
throw new RuntimeException("uh oh");
}
}
}