HDFS-9184. Logging HDFS operation's caller context into audit logs. Contributed by Mingliang Liu.
This commit is contained in:
parent
eb6379ca25
commit
600ad7bf41
|
@ -183,6 +183,17 @@ public class CommonConfigurationKeysPublic {
|
|||
/** Default value for TFILE_FS_OUTPUT_BUFFER_SIZE_KEY */
|
||||
public static final int TFILE_FS_OUTPUT_BUFFER_SIZE_DEFAULT = 256*1024;
|
||||
|
||||
public static final String HADOOP_CALLER_CONTEXT_ENABLED_KEY =
|
||||
"hadoop.caller.context.enabled";
|
||||
public static final boolean HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT = false;
|
||||
public static final String HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY =
|
||||
"hadoop.caller.context.max.size";
|
||||
public static final int HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT = 128;
|
||||
public static final String HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY =
|
||||
"hadoop.caller.context.signature.max.size";
|
||||
public static final int HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT =
|
||||
40;
|
||||
|
||||
/** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
|
||||
public static final String IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY =
|
||||
"ipc.client.connection.maxidletime";
|
||||
|
|
|
@ -0,0 +1,147 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ipc;
|
||||
|
||||
import org.apache.commons.lang.builder.EqualsBuilder;
|
||||
import org.apache.commons.lang.builder.HashCodeBuilder;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* A class defining the caller context for auditing coarse granularity
|
||||
* operations.
|
||||
*
|
||||
* This class is immutable.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "Hive", "MapReduce",
|
||||
"Pig", "YARN"})
|
||||
@InterfaceStability.Evolving
|
||||
public class CallerContext {
|
||||
public static final Charset SIGNATURE_ENCODING = StandardCharsets.UTF_8;
|
||||
/** The caller context.
|
||||
*
|
||||
* It will be truncated if it exceeds the maximum allowed length in
|
||||
* server. The default length limit is
|
||||
* {@link org.apache.hadoop.fs.CommonConfigurationKeysPublic#HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT}
|
||||
*/
|
||||
private final String context;
|
||||
/** The caller's signature for validation.
|
||||
*
|
||||
* The signature is optional. The null or empty signature will be abandoned.
|
||||
* If the signature exceeds the maximum allowed length in server, the caller
|
||||
* context will be abandoned. The default length limit is
|
||||
* {@link org.apache.hadoop.fs.CommonConfigurationKeysPublic#HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT}
|
||||
*/
|
||||
private final byte[] signature;
|
||||
|
||||
public CallerContext(Builder builder) {
|
||||
this.context = builder.context;
|
||||
this.signature = builder.signature;
|
||||
}
|
||||
|
||||
public boolean isValid() {
|
||||
return context != null;
|
||||
}
|
||||
|
||||
public String getContext() {
|
||||
return context;
|
||||
}
|
||||
|
||||
public byte[] getSignature() {
|
||||
return signature == null ?
|
||||
null : Arrays.copyOf(signature, signature.length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return new HashCodeBuilder().append(context).toHashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null) {
|
||||
return false;
|
||||
} else if (obj == this) {
|
||||
return true;
|
||||
} else if (obj.getClass() != getClass()) {
|
||||
return false;
|
||||
} else {
|
||||
CallerContext rhs = (CallerContext) obj;
|
||||
return new EqualsBuilder()
|
||||
.append(context, rhs.context)
|
||||
.append(signature, rhs.signature)
|
||||
.isEquals();
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public String toString() {
|
||||
if (!isValid()) {
|
||||
return "";
|
||||
}
|
||||
String str = context;
|
||||
if (signature != null) {
|
||||
str += ":";
|
||||
str += new String(signature, SIGNATURE_ENCODING);
|
||||
}
|
||||
return str;
|
||||
}
|
||||
|
||||
/** The caller context builder. */
|
||||
public static final class Builder {
|
||||
private final String context;
|
||||
private byte[] signature;
|
||||
|
||||
public Builder(String context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
public Builder setSignature(byte[] signature) {
|
||||
if (signature != null && signature.length > 0) {
|
||||
this.signature = Arrays.copyOf(signature, signature.length);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public CallerContext build() {
|
||||
return new CallerContext(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The thread local current caller context.
|
||||
* <p/>
|
||||
* Internal class for defered singleton idiom.
|
||||
* https://en.wikipedia.org/wiki/Initialization_on_demand_holder_idiom
|
||||
*/
|
||||
private static final class CurrentCallerContextHolder {
|
||||
static final ThreadLocal<CallerContext> CALLER_CONTEXT =
|
||||
new InheritableThreadLocal<>();
|
||||
}
|
||||
|
||||
public static CallerContext getCurrent() {
|
||||
return CurrentCallerContextHolder.CALLER_CONTEXT.get();
|
||||
}
|
||||
|
||||
public static void setCurrent(CallerContext callerContext) {
|
||||
CurrentCallerContextHolder.CALLER_CONTEXT.set(callerContext);
|
||||
}
|
||||
}
|
|
@ -583,10 +583,11 @@ public abstract class Server {
|
|||
private final RPC.RpcKind rpcKind;
|
||||
private final byte[] clientId;
|
||||
private final TraceScope traceScope; // the HTrace scope on the server side
|
||||
private final CallerContext callerContext; // the call context
|
||||
|
||||
private Call(Call call) {
|
||||
this(call.callId, call.retryCount, call.rpcRequest, call.connection,
|
||||
call.rpcKind, call.clientId, call.traceScope);
|
||||
call.rpcKind, call.clientId, call.traceScope, call.callerContext);
|
||||
}
|
||||
|
||||
public Call(int id, int retryCount, Writable param,
|
||||
|
@ -597,11 +598,12 @@ public abstract class Server {
|
|||
|
||||
public Call(int id, int retryCount, Writable param, Connection connection,
|
||||
RPC.RpcKind kind, byte[] clientId) {
|
||||
this(id, retryCount, param, connection, kind, clientId, null);
|
||||
this(id, retryCount, param, connection, kind, clientId, null, null);
|
||||
}
|
||||
|
||||
public Call(int id, int retryCount, Writable param, Connection connection,
|
||||
RPC.RpcKind kind, byte[] clientId, TraceScope traceScope) {
|
||||
RPC.RpcKind kind, byte[] clientId, TraceScope traceScope,
|
||||
CallerContext callerContext) {
|
||||
this.callId = id;
|
||||
this.retryCount = retryCount;
|
||||
this.rpcRequest = param;
|
||||
|
@ -611,6 +613,7 @@ public abstract class Server {
|
|||
this.rpcKind = kind;
|
||||
this.clientId = clientId;
|
||||
this.traceScope = traceScope;
|
||||
this.callerContext = callerContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -2080,9 +2083,18 @@ public abstract class Server {
|
|||
}
|
||||
}
|
||||
|
||||
CallerContext callerContext = null;
|
||||
if (header.hasCallerContext()) {
|
||||
callerContext =
|
||||
new CallerContext.Builder(header.getCallerContext().getContext())
|
||||
.setSignature(header.getCallerContext().getSignature()
|
||||
.toByteArray())
|
||||
.build();
|
||||
}
|
||||
|
||||
Call call = new Call(header.getCallId(), header.getRetryCount(),
|
||||
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
|
||||
header.getClientId().toByteArray(), traceScope);
|
||||
header.getClientId().toByteArray(), traceScope, callerContext);
|
||||
|
||||
if (callQueue.isClientBackoffEnabled()) {
|
||||
// if RPC queue is full, we will ask the RPC client to back off by
|
||||
|
@ -2274,6 +2286,8 @@ public abstract class Server {
|
|||
traceScope = call.traceScope;
|
||||
traceScope.getSpan().addTimelineAnnotation("called");
|
||||
}
|
||||
// always update the current call context
|
||||
CallerContext.setCurrent(call.callerContext);
|
||||
|
||||
try {
|
||||
// Make the call as the user via Subject.doAs, thus associating
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.util;
|
|||
import java.io.DataInput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.ipc.CallerContext;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
|
||||
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.UserInformationProto;
|
||||
|
@ -177,6 +178,18 @@ public abstract class ProtoUtil {
|
|||
.build());
|
||||
}
|
||||
|
||||
// Add caller context if it is not null
|
||||
CallerContext callerContext = CallerContext.getCurrent();
|
||||
if (callerContext != null && callerContext.isValid()) {
|
||||
RPCCallerContextProto.Builder contextBuilder = RPCCallerContextProto
|
||||
.newBuilder().setContext(callerContext.getContext());
|
||||
if (callerContext.getSignature() != null) {
|
||||
contextBuilder.setSignature(
|
||||
ByteString.copyFrom(callerContext.getSignature()));
|
||||
}
|
||||
result.setCallerContext(contextBuilder);
|
||||
}
|
||||
|
||||
return result.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,6 +66,14 @@ message RPCTraceInfoProto {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Used to pass through the call context entry after an RPC is made.
|
||||
*/
|
||||
message RPCCallerContextProto {
|
||||
required string context = 1;
|
||||
optional bytes signature = 2;
|
||||
}
|
||||
|
||||
message RpcRequestHeaderProto { // the header for the RpcRequest
|
||||
enum OperationProto {
|
||||
RPC_FINAL_PACKET = 0; // The final RPC Packet
|
||||
|
@ -81,6 +89,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
|
|||
// retry count, 1 means this is the first retry
|
||||
optional sint32 retryCount = 5 [default = -1];
|
||||
optional RPCTraceInfoProto traceInfo = 6; // tracing info
|
||||
optional RPCCallerContextProto callerContext = 7; // call context
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -853,6 +853,9 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
HDFS-8155. Support OAuth2 in WebHDFS. (jghoman)
|
||||
|
||||
HDFS-9184. Logging HDFS operation's caller context into audit logs.
|
||||
(Mingliang Liu via jitendra)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
HDFS-9257. improve error message for "Absolute path required" in INode.java
|
||||
|
|
|
@ -19,6 +19,12 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
||||
|
@ -134,7 +140,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider.Metadata;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
|
||||
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
|
||||
import org.apache.hadoop.fs.CacheFlag;
|
||||
import org.apache.hadoop.fs.ContentSummary;
|
||||
|
@ -251,6 +256,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
|||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.ipc.CallerContext;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.ipc.RetryCache;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
|
@ -363,7 +369,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
if (logger instanceof HdfsAuditLogger) {
|
||||
HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
|
||||
hdfsLogger.logAuditEvent(succeeded, ugi.toString(), addr, cmd, src, dst,
|
||||
status, ugi, dtSecretManager);
|
||||
status, CallerContext.getCurrent(), ugi, dtSecretManager);
|
||||
} else {
|
||||
logger.logAuditEvent(succeeded, ugi.toString(), addr,
|
||||
cmd, src, dst, status);
|
||||
|
@ -7295,12 +7301,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
*/
|
||||
@VisibleForTesting
|
||||
static class DefaultAuditLogger extends HdfsAuditLogger {
|
||||
private boolean isCallerContextEnabled;
|
||||
private int callerContextMaxLen;
|
||||
private int callerSignatureMaxLen;
|
||||
|
||||
private boolean logTokenTrackingId;
|
||||
private Set<String> debugCmdSet = new HashSet<String>();
|
||||
|
||||
@Override
|
||||
public void initialize(Configuration conf) {
|
||||
isCallerContextEnabled = conf.getBoolean(
|
||||
HADOOP_CALLER_CONTEXT_ENABLED_KEY,
|
||||
HADOOP_CALLER_CONTEXT_ENABLED_DEFAULT);
|
||||
callerContextMaxLen = conf.getInt(
|
||||
HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY,
|
||||
HADOOP_CALLER_CONTEXT_MAX_SIZE_DEFAULT);
|
||||
callerSignatureMaxLen = conf.getInt(
|
||||
HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY,
|
||||
HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_DEFAULT);
|
||||
logTokenTrackingId = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT);
|
||||
|
@ -7312,7 +7330,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
@Override
|
||||
public void logAuditEvent(boolean succeeded, String userName,
|
||||
InetAddress addr, String cmd, String src, String dst,
|
||||
FileStatus status, UserGroupInformation ugi,
|
||||
FileStatus status, CallerContext callerContext, UserGroupInformation ugi,
|
||||
DelegationTokenSecretManager dtSecretManager) {
|
||||
|
||||
if (auditLog.isDebugEnabled() ||
|
||||
|
@ -7351,6 +7369,24 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
}
|
||||
sb.append("\t").append("proto=");
|
||||
sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");
|
||||
if (isCallerContextEnabled &&
|
||||
callerContext != null &&
|
||||
callerContext.isValid() &&
|
||||
(callerContext.getSignature() == null ||
|
||||
callerContext.getSignature().length <= callerSignatureMaxLen)) {
|
||||
sb.append("\t").append("callerContext=");
|
||||
if (callerContext.getContext().length() > callerContextMaxLen) {
|
||||
sb.append(callerContext.getContext().substring(0,
|
||||
callerContextMaxLen));
|
||||
} else {
|
||||
sb.append(callerContext.getContext());
|
||||
}
|
||||
if (callerContext.getSignature() != null) {
|
||||
sb.append(":");
|
||||
sb.append(new String(callerContext.getSignature(),
|
||||
CallerContext.SIGNATURE_ENCODING));
|
||||
}
|
||||
}
|
||||
logAuditMessage(sb.toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.ipc.CallerContext;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
/**
|
||||
|
@ -36,8 +37,8 @@ public abstract class HdfsAuditLogger implements AuditLogger {
|
|||
public void logAuditEvent(boolean succeeded, String userName,
|
||||
InetAddress addr, String cmd, String src, String dst,
|
||||
FileStatus status) {
|
||||
logAuditEvent(succeeded, userName, addr, cmd, src, dst, status, null,
|
||||
null);
|
||||
logAuditEvent(succeeded, userName, addr, cmd, src, dst, status,
|
||||
null /*callerContext*/, null /*ugi*/, null /*dtSecretManager*/);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -61,6 +62,6 @@ public abstract class HdfsAuditLogger implements AuditLogger {
|
|||
*/
|
||||
public abstract void logAuditEvent(boolean succeeded, String userName,
|
||||
InetAddress addr, String cmd, String src, String dst,
|
||||
FileStatus stat, UserGroupInformation ugi,
|
||||
FileStatus stat, CallerContext callerContext, UserGroupInformation ugi,
|
||||
DelegationTokenSecretManager dtSecretManager);
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ public class TestAuditLogAtDebug {
|
|||
logger.logAuditEvent(true, "",
|
||||
Inet4Address.getLoopbackAddress(),
|
||||
command, "", "",
|
||||
null, null, null);
|
||||
null, null, null, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -29,15 +30,24 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
|
||||
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
|
||||
import org.apache.hadoop.ipc.CallerContext;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.authorize.ProxyServers;
|
||||
import org.apache.hadoop.security.authorize.ProxyUsers;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||
import org.apache.log4j.Level;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetAddress;
|
||||
|
@ -45,11 +55,15 @@ import java.net.URI;
|
|||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_ENABLED_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
|
@ -58,6 +72,11 @@ import static org.mockito.Mockito.doThrow;
|
|||
* Tests for the {@link AuditLogger} custom audit logging interface.
|
||||
*/
|
||||
public class TestAuditLogger {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
TestAuditLogger.class);
|
||||
static {
|
||||
GenericTestUtils.setLogLevel(LOG, Level.ALL);
|
||||
}
|
||||
|
||||
private static final short TEST_PERMISSION = (short) 0654;
|
||||
|
||||
|
@ -199,6 +218,163 @@ public class TestAuditLogger {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the audit logger is aware of the call context
|
||||
*/
|
||||
@Test
|
||||
public void testAuditLoggerWithCallContext() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(HADOOP_CALLER_CONTEXT_ENABLED_KEY, true);
|
||||
conf.setInt(HADOOP_CALLER_CONTEXT_MAX_SIZE_KEY, 128);
|
||||
conf.setInt(HADOOP_CALLER_CONTEXT_SIGNATURE_MAX_SIZE_KEY, 40);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
LogCapturer auditlog = LogCapturer.captureLogs(FSNamesystem.auditLog);
|
||||
|
||||
try {
|
||||
cluster.waitClusterUp();
|
||||
final FileSystem fs = cluster.getFileSystem();
|
||||
final long time = System.currentTimeMillis();
|
||||
final Path p = new Path("/");
|
||||
|
||||
assertNull(CallerContext.getCurrent());
|
||||
|
||||
// context-only
|
||||
CallerContext context = new CallerContext.Builder("setTimes").build();
|
||||
CallerContext.setCurrent(context);
|
||||
LOG.info("Set current caller context as {}", CallerContext.getCurrent());
|
||||
fs.setTimes(p, time, time);
|
||||
System.out.println("LLLLLL" + auditlog.getOutput());
|
||||
assertTrue(auditlog.getOutput().endsWith("callerContext=setTimes\n"));
|
||||
auditlog.clearOutput();
|
||||
|
||||
// context with signature
|
||||
context = new CallerContext.Builder("setTimes")
|
||||
.setSignature("L".getBytes(CallerContext.SIGNATURE_ENCODING))
|
||||
.build();
|
||||
CallerContext.setCurrent(context);
|
||||
LOG.info("Set current caller context as {}", CallerContext.getCurrent());
|
||||
fs.setTimes(p, time, time);
|
||||
assertTrue(auditlog.getOutput().endsWith(
|
||||
"callerContext=setTimes:L\n"));
|
||||
auditlog.clearOutput();
|
||||
|
||||
// long context is truncated
|
||||
final String longContext = RandomStringUtils.randomAscii(200);
|
||||
context = new CallerContext.Builder(longContext)
|
||||
.setSignature("L".getBytes(CallerContext.SIGNATURE_ENCODING))
|
||||
.build();
|
||||
CallerContext.setCurrent(context);
|
||||
LOG.info("Set current caller context as {}", CallerContext.getCurrent());
|
||||
fs.setTimes(p, time, time);
|
||||
assertTrue(auditlog.getOutput().endsWith(
|
||||
"callerContext=" + longContext.substring(0, 128) + ":L\n"));
|
||||
auditlog.clearOutput();
|
||||
|
||||
// caller context is inherited in child thread
|
||||
context = new CallerContext.Builder("setTimes")
|
||||
.setSignature("L".getBytes(CallerContext.SIGNATURE_ENCODING))
|
||||
.build();
|
||||
CallerContext.setCurrent(context);
|
||||
LOG.info("Set current caller context as {}", CallerContext.getCurrent());
|
||||
Thread child = new Thread(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
fs.setTimes(p, time, time);
|
||||
} catch (IOException e) {
|
||||
fail("Unexpected exception found." + e);
|
||||
}
|
||||
}
|
||||
});
|
||||
child.start();
|
||||
try {
|
||||
child.join();
|
||||
} catch (InterruptedException ignored) {
|
||||
// Ignore
|
||||
}
|
||||
assertTrue(auditlog.getOutput().endsWith("callerContext=setTimes:L\n"));
|
||||
auditlog.clearOutput();
|
||||
|
||||
// caller context is overridden in child thread
|
||||
final CallerContext childContext =
|
||||
new CallerContext.Builder("setPermission")
|
||||
.setSignature("L".getBytes(CallerContext.SIGNATURE_ENCODING))
|
||||
.build();
|
||||
LOG.info("Set current caller context as {}", CallerContext.getCurrent());
|
||||
child = new Thread(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
CallerContext.setCurrent(childContext);
|
||||
fs.setPermission(p, new FsPermission((short)777));
|
||||
} catch (IOException e) {
|
||||
fail("Unexpected exception found." + e);
|
||||
}
|
||||
}
|
||||
});
|
||||
child.start();
|
||||
try {
|
||||
child.join();
|
||||
} catch (InterruptedException ignored) {
|
||||
// Ignore
|
||||
}
|
||||
assertTrue(auditlog.getOutput().endsWith(
|
||||
"callerContext=setPermission:L\n"));
|
||||
auditlog.clearOutput();
|
||||
|
||||
// reuse the current context's signature
|
||||
context = new CallerContext.Builder("mkdirs")
|
||||
.setSignature(CallerContext.getCurrent().getSignature()).build();
|
||||
CallerContext.setCurrent(context);
|
||||
LOG.info("Set current caller context as {}", CallerContext.getCurrent());
|
||||
fs.mkdirs(new Path("/reuse-context-signature"));
|
||||
assertTrue(auditlog.getOutput().endsWith("callerContext=mkdirs:L\n"));
|
||||
auditlog.clearOutput();
|
||||
|
||||
// caller context with too long signature is abandoned
|
||||
context = new CallerContext.Builder("setTimes")
|
||||
.setSignature(new byte[41])
|
||||
.build();
|
||||
CallerContext.setCurrent(context);
|
||||
LOG.info("Set current caller context as {}", CallerContext.getCurrent());
|
||||
fs.setTimes(p, time, time);
|
||||
assertFalse(auditlog.getOutput().contains("callerContext="));
|
||||
auditlog.clearOutput();
|
||||
|
||||
// null signature is ignored
|
||||
context = new CallerContext.Builder("setTimes").setSignature(null)
|
||||
.build();
|
||||
CallerContext.setCurrent(context);
|
||||
LOG.info("Set current caller context as {}", CallerContext.getCurrent());
|
||||
fs.setTimes(p, time, time);
|
||||
assertTrue(auditlog.getOutput().endsWith("callerContext=setTimes\n"));
|
||||
auditlog.clearOutput();
|
||||
|
||||
// empty signature is ignored
|
||||
context = new CallerContext.Builder("mkdirs")
|
||||
.setSignature("".getBytes(CallerContext.SIGNATURE_ENCODING))
|
||||
.build();
|
||||
CallerContext.setCurrent(context);
|
||||
LOG.info("Set current caller context as {}", CallerContext.getCurrent());
|
||||
fs.mkdirs(new Path("/empty-signature"));
|
||||
assertTrue(auditlog.getOutput().endsWith("callerContext=mkdirs\n"));
|
||||
auditlog.clearOutput();
|
||||
|
||||
// invalid context is not passed to the rpc
|
||||
context = new CallerContext.Builder(null).build();
|
||||
CallerContext.setCurrent(context);
|
||||
LOG.info("Set current caller context as {}", CallerContext.getCurrent());
|
||||
fs.mkdirs(new Path("/empty-signature"));
|
||||
assertFalse(auditlog.getOutput().contains("callerContext="));
|
||||
auditlog.clearOutput();
|
||||
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAuditLogWithAclFailure() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
|
|
Loading…
Reference in New Issue