HDFS-6888. Allow selectively audit logging ops (Contributed by Chen He)

(cherry picked from commit 7f2e89fa7082840bfa3e8e593c93db050a80d04f)
This commit is contained in:
Vinayakumar B 2015-05-15 11:05:01 +05:30
parent 91855c2340
commit d9455c790f
5 changed files with 152 additions and 2 deletions

View File

@ -219,6 +219,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8350. Remove old webhdfs.xml and other outdated documentation stuff.
(Brahma Reddy Battula via aajisaka)
HDFS-6888. Allow selectively audit logging ops (Chen He via vinayakumarb)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -339,6 +339,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY = "dfs.namenode.audit.log.async";
public static final boolean DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT = false;
public static final String DFS_NAMENODE_AUDIT_LOG_DEBUG_CMDLIST = "dfs.namenode.audit.log.debug.cmdlist";
public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;

View File

@ -8143,15 +8143,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* defined in the config file. It can also be explicitly listed in the
* config file.
*/
private static class DefaultAuditLogger extends HdfsAuditLogger {
@VisibleForTesting
static class DefaultAuditLogger extends HdfsAuditLogger {
private boolean logTokenTrackingId;
private Set<String> debugCmdSet = new HashSet<String>();
@Override
public void initialize(Configuration conf) {
logTokenTrackingId = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT);
debugCmdSet.addAll(Arrays.asList(conf.getTrimmedStrings(
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_DEBUG_CMDLIST)));
}
@Override
@ -8159,7 +8164,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
InetAddress addr, String cmd, String src, String dst,
FileStatus status, UserGroupInformation ugi,
DelegationTokenSecretManager dtSecretManager) {
if (auditLog.isInfoEnabled()) {
if (auditLog.isDebugEnabled() ||
(auditLog.isInfoEnabled() && !debugCmdSet.contains(cmd))) {
final StringBuilder sb = auditBuffer.get();
sb.setLength(0);
sb.append("allowed=").append(succeeded).append("\t");

View File

@ -2083,6 +2083,15 @@
</description>
</property>
<property>
<name>dfs.namenode.audit.log.debug.cmdlist</name>
<value></value>
<description>
A comma separated list of NameNode commands that are written to the HDFS
namenode audit log only if the audit log level is debug.
</description>
</property>
<property>
<name>dfs.client.use.legacy.blockreader.local</name>
<value>false</value>

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.DefaultAuditLogger;
import org.apache.log4j.Level;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.net.Inet4Address;
import java.util.Arrays;
import java.util.List;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
/**
* Test that the HDFS Audit logger respects DFS_NAMENODE_AUDIT_LOG_DEBUG_CMDLIST.
*/
public class TestAuditLogAtDebug {
static final Log LOG = LogFactory.getLog(TestAuditLogAtDebug.class);
@Rule
public Timeout timeout = new Timeout(300000);
private static final String DUMMY_COMMAND_1 = "dummycommand1";
private static final String DUMMY_COMMAND_2 = "dummycommand2";
private DefaultAuditLogger makeSpyLogger(
Level level, Optional<List<String>> debugCommands) {
DefaultAuditLogger logger = new DefaultAuditLogger();
Configuration conf = new HdfsConfiguration();
if (debugCommands.isPresent()) {
conf.set(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_DEBUG_CMDLIST,
Joiner.on(",").join(debugCommands.get()));
}
logger.initialize(conf);
((Log4JLogger) FSNamesystem.auditLog).getLogger().setLevel(level);
return spy(logger);
}
private void logDummyCommandToAuditLog(HdfsAuditLogger logger, String command) {
logger.logAuditEvent(true, "",
Inet4Address.getLoopbackAddress(),
command, "", "",
null, null, null);
}
@Test
public void testDebugCommandNotLoggedAtInfo() {
DefaultAuditLogger logger =
makeSpyLogger(
Level.INFO, Optional.of(Arrays.asList(DUMMY_COMMAND_1)));
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_1);
verify(logger, never()).logAuditMessage(anyString());
}
@Test
public void testDebugCommandLoggedAtDebug() {
DefaultAuditLogger logger =
makeSpyLogger(
Level.DEBUG, Optional.of(Arrays.asList(DUMMY_COMMAND_1)));
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_1);
verify(logger, times(1)).logAuditMessage(anyString());
}
@Test
public void testInfoCommandLoggedAtInfo() {
DefaultAuditLogger logger =
makeSpyLogger(
Level.INFO, Optional.of(Arrays.asList(DUMMY_COMMAND_1)));
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_2);
verify(logger, times(1)).logAuditMessage(anyString());
}
@Test
public void testMultipleDebugCommandsNotLoggedAtInfo() {
DefaultAuditLogger logger =
makeSpyLogger(
Level.INFO,
Optional.of(Arrays.asList(DUMMY_COMMAND_1, DUMMY_COMMAND_2)));
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_1);
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_2);
verify(logger, never()).logAuditMessage(anyString());
}
@Test
public void testMultipleDebugCommandsLoggedAtDebug() {
DefaultAuditLogger logger =
makeSpyLogger(
Level.DEBUG,
Optional.of(Arrays.asList(DUMMY_COMMAND_1, DUMMY_COMMAND_2)));
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_1);
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_2);
verify(logger, times(2)).logAuditMessage(anyString());
}
@Test
public void testEmptyDebugCommands() {
DefaultAuditLogger logger = makeSpyLogger(
Level.INFO, Optional.<List<String>>absent());
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_1);
logDummyCommandToAuditLog(logger, DUMMY_COMMAND_2);
verify(logger, times(2)).logAuditMessage(anyString());
}
}