HDDS-120. Adding HDDS datanode Audit Log. Contributed by Dinesh Chitlangia.

This commit is contained in:
Xiaoyu Yao 2018-11-16 14:59:59 -08:00
parent abd6d48c46
commit 29374999b6
7 changed files with 473 additions and 9 deletions

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.audit;
/**
 * Enum to define Audit Action types for Datanode.
 *
 * <p>Every constant carries the action name handed back by
 * {@link #getAction()}; for all constants declared here that name is the
 * same text as the constant's own identifier.
 */
public enum DNAction implements AuditAction {

  CREATE_CONTAINER("CREATE_CONTAINER"),
  READ_CONTAINER("READ_CONTAINER"),
  UPDATE_CONTAINER("UPDATE_CONTAINER"),
  DELETE_CONTAINER("DELETE_CONTAINER"),
  LIST_CONTAINER("LIST_CONTAINER"),
  PUT_BLOCK("PUT_BLOCK"),
  GET_BLOCK("GET_BLOCK"),
  DELETE_BLOCK("DELETE_BLOCK"),
  LIST_BLOCK("LIST_BLOCK"),
  READ_CHUNK("READ_CHUNK"),
  DELETE_CHUNK("DELETE_CHUNK"),
  WRITE_CHUNK("WRITE_CHUNK"),
  LIST_CHUNK("LIST_CHUNK"),
  COMPACT_CHUNK("COMPACT_CHUNK"),
  PUT_SMALL_FILE("PUT_SMALL_FILE"),
  GET_SMALL_FILE("GET_SMALL_FILE"),
  CLOSE_CONTAINER("CLOSE_CONTAINER"),
  GET_COMMITTED_BLOCK_LENGTH("GET_COMMITTED_BLOCK_LENGTH");

  /** Action name for this constant; assigned once and never changed. */
  private final String action;

  /**
   * @param action the name to report from {@link #getAction()}
   */
  DNAction(String action) {
    this.action = action;
  }

  /**
   * @return the action name supplied when the constant was declared
   */
  @Override
  public String getAction() {
    return this.action;
  }
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.ozone.container.common.helpers;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.client.BlockID;
import com.google.common.base.Preconditions;
@ -260,4 +262,12 @@ public class BlockData {
public long getSize() {
// Plain accessor: hands back the current value of the size field.
return size;
}
/**
 * Describes this block by its block ID and size, formatted with
 * {@link ToStringStyle#NO_CLASS_NAME_STYLE}.
 *
 * @return the string produced by the {@link ToStringBuilder}
 */
@Override
public String toString() {
  final ToStringBuilder description =
      new ToStringBuilder(this, ToStringStyle.NO_CLASS_NAME_STYLE);
  // Field order matters for the rendered text: block ID first, then size.
  description.append("blockId", blockID.toString());
  description.append("size", this.size);
  return description.toString();
}
}

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.helpers;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.ozone.audit.DNAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
/**
 * Utilities for converting protobuf classes to Java classes.
 *
 * <p>The two helpers translate a container command request into the pieces
 * used for an audit entry: the {@link DNAction} naming the operation and a
 * map of request parameters to record.
 */
public final class ContainerCommandRequestPBHelper {

  static final Logger LOG =
      LoggerFactory.getLogger(ContainerCommandRequestPBHelper.class);

  /** Static utility holder; never instantiated. */
  private ContainerCommandRequestPBHelper() {
  }

  /**
   * Collects the audit parameters for a container command request.
   *
   * <p>The returned map is a {@link TreeMap}, so entries iterate in key
   * order. Which keys are present depends on the command type.
   *
   * @param msg the request to describe
   * @return the parameter map, or {@code null} when the command is
   *         {@code CompactChunk}, when the command type is not handled
   *         here, or when the block data of a {@code PutBlock} request
   *         cannot be parsed
   */
  public static Map<String, String> getAuditParams(
      ContainerCommandRequestProto msg) {
    final Map<String, String> auditParams = new TreeMap<>();
    final Type cmdType = msg.getCmdType();
    final String containerID = String.valueOf(msg.getContainerID());

    switch (cmdType) {
    case CreateContainer:
      auditParams.put("containerID", containerID);
      auditParams.put("containerType",
          msg.getCreateContainer().getContainerType().toString());
      return auditParams;

    // Both commands only record the container they target.
    case ReadContainer:
    case CloseContainer:
      auditParams.put("containerID", containerID);
      return auditParams;

    case UpdateContainer:
      auditParams.put("containerID", containerID);
      auditParams.put("forceUpdate",
          String.valueOf(msg.getUpdateContainer().getForceUpdate()));
      return auditParams;

    case DeleteContainer:
      auditParams.put("containerID", containerID);
      auditParams.put("forceDelete",
          String.valueOf(msg.getDeleteContainer().getForceDelete()));
      return auditParams;

    case ListContainer:
      // The request's container ID is recorded as the listing start point.
      auditParams.put("startContainerID", containerID);
      auditParams.put("count",
          String.valueOf(msg.getListContainer().getCount()));
      return auditParams;

    case PutBlock:
      try {
        auditParams.put("blockData",
            BlockData.getFromProtoBuf(msg.getPutBlock().getBlockData())
                .toString());
      } catch (IOException ex) {
        // Pass the exception itself so the cause is not lost.
        LOG.trace("Encountered error parsing BlockData from protobuf:", ex);
        // NOTE(review): PutSmallFile returns the (empty) map on the same
        // failure while this arm returns null - confirm which is intended.
        return null;
      }
      return auditParams;

    case GetBlock:
      auditParams.put("blockData",
          BlockID.getFromProtobuf(msg.getGetBlock().getBlockID())
              .toString());
      return auditParams;

    case DeleteBlock:
      auditParams.put("blockData",
          BlockID.getFromProtobuf(msg.getDeleteBlock().getBlockID())
              .toString());
      return auditParams;

    case ListBlock:
      auditParams.put("startLocalID",
          String.valueOf(msg.getListBlock().getStartLocalID()));
      auditParams.put("count",
          String.valueOf(msg.getListBlock().getCount()));
      return auditParams;

    case ReadChunk:
      auditParams.put("blockData",
          BlockID.getFromProtobuf(msg.getReadChunk().getBlockID())
              .toString());
      return auditParams;

    case DeleteChunk:
      auditParams.put("blockData",
          BlockID.getFromProtobuf(msg.getDeleteChunk().getBlockID())
              .toString());
      return auditParams;

    case WriteChunk:
      auditParams.put("blockData",
          BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID())
              .toString());
      return auditParams;

    case ListChunk:
      auditParams.put("blockData",
          BlockID.getFromProtobuf(msg.getListChunk().getBlockID())
              .toString());
      auditParams.put("prevChunkName",
          msg.getListChunk().getPrevChunkName());
      auditParams.put("count",
          String.valueOf(msg.getListChunk().getCount()));
      return auditParams;

    case CompactChunk:
      // No parameters are recorded for the CompactChunk operation.
      return null;

    case PutSmallFile:
      try {
        auditParams.put("blockData",
            BlockData.getFromProtoBuf(msg.getPutSmallFile()
                .getBlock().getBlockData()).toString());
      } catch (IOException ex) {
        // Best effort: the map is still returned, just without blockData.
        LOG.trace("Encountered error parsing BlockData from protobuf:", ex);
      }
      return auditParams;

    case GetSmallFile:
      auditParams.put("blockData",
          BlockID.getFromProtobuf(
              msg.getGetSmallFile().getBlock().getBlockID()).toString());
      return auditParams;

    case GetCommittedBlockLength:
      auditParams.put("blockData",
          BlockID.getFromProtobuf(
              msg.getGetCommittedBlockLength().getBlockID()).toString());
      return auditParams;

    default:
      LOG.debug("Invalid command type - {}", cmdType);
      return null;
    }
  }

  /**
   * Maps a container command type to its datanode audit action.
   *
   * @param cmdType the command type of the request
   * @return the matching {@link DNAction}, or {@code null} when the command
   *         type is not handled here
   */
  public static DNAction getAuditAction(Type cmdType) {
    switch (cmdType) {
    case CreateContainer:
      return DNAction.CREATE_CONTAINER;
    case ReadContainer:
      return DNAction.READ_CONTAINER;
    case UpdateContainer:
      return DNAction.UPDATE_CONTAINER;
    case DeleteContainer:
      return DNAction.DELETE_CONTAINER;
    case ListContainer:
      return DNAction.LIST_CONTAINER;
    case PutBlock:
      return DNAction.PUT_BLOCK;
    case GetBlock:
      return DNAction.GET_BLOCK;
    case DeleteBlock:
      return DNAction.DELETE_BLOCK;
    case ListBlock:
      return DNAction.LIST_BLOCK;
    case ReadChunk:
      return DNAction.READ_CHUNK;
    case DeleteChunk:
      return DNAction.DELETE_CHUNK;
    case WriteChunk:
      return DNAction.WRITE_CHUNK;
    case ListChunk:
      return DNAction.LIST_CHUNK;
    case CompactChunk:
      return DNAction.COMPACT_CHUNK;
    case PutSmallFile:
      return DNAction.PUT_SMALL_FILE;
    case GetSmallFile:
      return DNAction.GET_SMALL_FILE;
    case CloseContainer:
      return DNAction.CLOSE_CONTAINER;
    case GetCommittedBlockLength:
      return DNAction.GET_COMMITTED_BLOCK_LENGTH;
    default:
      LOG.debug("Invalid command type - {}", cmdType);
      return null;
    }
  }
}

View File

@ -27,9 +27,21 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerDataProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.InvalidContainerStateException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.container.common.helpers
.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers
.InvalidContainerStateException;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
import org.apache.hadoop.ozone.audit.AuditAction;
import org.apache.hadoop.ozone.audit.AuditEventStatus;
import org.apache.hadoop.ozone.audit.AuditLogger;
import org.apache.hadoop.ozone.audit.AuditLoggerType;
import org.apache.hadoop.ozone.audit.AuditMarker;
import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.Auditor;
import org.apache.hadoop.ozone.container.common.helpers
.ContainerCommandRequestPBHelper;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
@ -57,10 +69,11 @@ import java.util.Optional;
* Ozone Container dispatcher takes a call from the netty server and routes it
* to the right handler function.
*/
public class HddsDispatcher implements ContainerDispatcher {
public class HddsDispatcher implements ContainerDispatcher, Auditor {
static final Logger LOG = LoggerFactory.getLogger(HddsDispatcher.class);
private static final AuditLogger AUDIT =
new AuditLogger(AuditLoggerType.DNLOGGER);
private final Map<ContainerType, Handler> handlers;
private final Configuration conf;
private final ContainerSet containerSet;
@ -123,7 +136,13 @@ public class HddsDispatcher implements ContainerDispatcher {
ContainerCommandRequestProto msg) {
LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(),
msg.getTraceID());
Preconditions.checkNotNull(msg);
Preconditions.checkNotNull(msg.toString());
AuditAction action = ContainerCommandRequestPBHelper.getAuditAction(
msg.getCmdType());
EventType eventType = getEventType(msg);
Map<String, String> params =
ContainerCommandRequestPBHelper.getAuditParams(msg);
Container container = null;
ContainerType containerType = null;
@ -149,11 +168,14 @@ public class HddsDispatcher implements ContainerDispatcher {
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID + " does not exist",
ContainerProtos.Result.CONTAINER_NOT_FOUND);
audit(action, eventType, params, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
containerType = getContainerType(container);
} else {
if (!msg.hasCreateContainer()) {
audit(action, eventType, params, AuditEventStatus.FAILURE,
new Exception("MALFORMED_REQUEST"));
return ContainerUtils.malformedRequest(msg);
}
containerType = msg.getCreateContainer().getContainerType();
@ -168,6 +190,8 @@ public class HddsDispatcher implements ContainerDispatcher {
StorageContainerException ex = new StorageContainerException("Invalid " +
"ContainerType " + containerType,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
// log failure
audit(action, eventType, params, AuditEventStatus.FAILURE, ex);
return ContainerUtils.logAndReturnError(LOG, ex, msg);
}
responseProto = handler.handle(msg, container);
@ -204,8 +228,19 @@ public class HddsDispatcher implements ContainerDispatcher {
.setState(ContainerDataProto.State.UNHEALTHY);
sendCloseContainerActionIfNeeded(container);
}
if(result == Result.SUCCESS) {
audit(action, eventType, params, AuditEventStatus.SUCCESS, null);
} else {
audit(action, eventType, params, AuditEventStatus.FAILURE,
new Exception(responseProto.getMessage()));
}
return responseProto;
} else {
// log failure
audit(action, eventType, params, AuditEventStatus.FAILURE,
new Exception("UNSUPPORTED_REQUEST"));
return ContainerUtils.unsupportedRequest(msg);
}
}
@ -249,17 +284,24 @@ public class HddsDispatcher implements ContainerDispatcher {
public void validateContainerCommand(
ContainerCommandRequestProto msg) throws StorageContainerException {
ContainerType containerType = msg.getCreateContainer().getContainerType();
ContainerProtos.Type cmdType = msg.getCmdType();
AuditAction action =
ContainerCommandRequestPBHelper.getAuditAction(cmdType);
EventType eventType = getEventType(msg);
Map<String, String> params =
ContainerCommandRequestPBHelper.getAuditParams(msg);
Handler handler = getHandler(containerType);
if (handler == null) {
StorageContainerException ex = new StorageContainerException(
"Invalid " + "ContainerType " + containerType,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
audit(action, eventType, params, AuditEventStatus.FAILURE, ex);
throw ex;
}
ContainerProtos.Type cmdType = msg.getCmdType();
long containerID = msg.getContainerID();
Container container;
container = getContainer(containerID);
if (container != null) {
State containerState = container.getContainerState();
if (!HddsUtils.isReadOnly(msg) && containerState != State.OPEN) {
@ -274,12 +316,16 @@ public class HddsDispatcher implements ContainerDispatcher {
default:
// if the container is not open, no updates can happen. Just throw
// an exception
throw new ContainerNotOpenException(
ContainerNotOpenException cex = new ContainerNotOpenException(
"Container " + containerID + " in " + containerState + " state");
audit(action, eventType, params, AuditEventStatus.FAILURE, cex);
throw cex;
}
} else if (HddsUtils.isReadOnly(msg) && containerState == State.INVALID) {
throw new InvalidContainerStateException(
InvalidContainerStateException iex = new InvalidContainerStateException(
"Container " + containerID + " in " + containerState + " state");
audit(action, eventType, params, AuditEventStatus.FAILURE, iex);
throw iex;
}
}
}
@ -355,4 +401,73 @@ public class HddsDispatcher implements ContainerDispatcher {
public void setMetricsForTesting(ContainerMetrics containerMetrics) {
// Replaces this dispatcher's metrics field with the supplied instance; the
// method name marks it as a hook for tests.
this.metrics = containerMetrics;
}
/**
 * Classifies a request for auditing.
 *
 * @param msg the request to classify
 * @return {@code READ} when {@code HddsUtils.isReadOnly(msg)} is true,
 *         {@code WRITE} otherwise
 */
private EventType getEventType(ContainerCommandRequestProto msg) {
  if (HddsUtils.isReadOnly(msg)) {
    return EventType.READ;
  }
  return EventType.WRITE;
}
/**
 * Writes one audit entry for a request, provided the audit logger is
 * enabled for the matching marker and level.
 *
 * <p>Successes are checked against the info level and failures against the
 * error level; the audit message is only built once that check has passed.
 *
 * @param action    the audited operation
 * @param eventType whether the request is a read or a write
 * @param params    request parameters to record
 * @param result    outcome of the request
 * @param exception the failure cause; only used for {@code FAILURE}
 */
private void audit(AuditAction action, EventType eventType,
    Map<String, String> params, AuditEventStatus result, Throwable exception){
  final boolean readEvent = eventType == EventType.READ;
  final boolean writeEvent = eventType == EventType.WRITE;

  switch (result) {
  case SUCCESS:
    if (readEvent) {
      if (AUDIT.getLogger().isInfoEnabled(AuditMarker.READ.getMarker())) {
        AUDIT.logReadSuccess(buildAuditMessageForSuccess(action, params));
      }
    } else if (writeEvent) {
      if (AUDIT.getLogger().isInfoEnabled(AuditMarker.WRITE.getMarker())) {
        AUDIT.logWriteSuccess(buildAuditMessageForSuccess(action, params));
      }
    }
    break;
  case FAILURE:
    if (readEvent) {
      if (AUDIT.getLogger().isErrorEnabled(AuditMarker.READ.getMarker())) {
        AUDIT.logReadFailure(
            buildAuditMessageForFailure(action, params, exception));
      }
    } else if (writeEvent) {
      if (AUDIT.getLogger().isErrorEnabled(AuditMarker.WRITE.getMarker())) {
        AUDIT.logWriteFailure(
            buildAuditMessageForFailure(action, params, exception));
      }
    }
    break;
  default:
    LOG.debug("Invalid audit event status - " + result);
  }
}
//TODO: use GRPC to fetch user and ip details
/**
 * Builds the audit message for a successful operation.
 *
 * <p>User and IP are passed as {@code null} for now (see the TODO), and no
 * exception is attached.
 *
 * @param op       the audited operation; its action name is recorded
 * @param auditMap request parameters to record
 * @return the assembled audit message
 */
@Override
public AuditMessage buildAuditMessageForSuccess(AuditAction op,
    Map<String, String> auditMap) {
  final String operation = op.getAction();
  final String outcome = AuditEventStatus.SUCCESS.toString();
  return new AuditMessage.Builder()
      .setUser(null)
      .atIp(null)
      .forOperation(operation)
      .withParams(auditMap)
      .withResult(outcome)
      .withException(null)
      .build();
}
//TODO: use GRPC to fetch user and ip details
/**
 * Builds the audit message for a failed operation.
 *
 * <p>User and IP are passed as {@code null} for now (see the TODO).
 *
 * @param op        the audited operation; its action name is recorded
 * @param auditMap  request parameters to record
 * @param throwable the failure to attach to the message
 * @return the assembled audit message
 */
@Override
public AuditMessage buildAuditMessageForFailure(AuditAction op,
    Map<String, String> auditMap, Throwable throwable) {
  final String operation = op.getAction();
  final String outcome = AuditEventStatus.FAILURE.toString();
  return new AuditMessage.Builder()
      .setUser(null)
      .atIp(null)
      .forOperation(operation)
      .withParams(auditMap)
      .withResult(outcome)
      .withException(throwable)
      .build();
}
/**
 * Kind of request being audited: a read or a write.
 */
enum EventType { READ, WRITE }
}

View File

@ -68,6 +68,8 @@ function ozonecmd_case
;;
datanode)
# Subcommand arm for "datanode": marks it as daemonizable and selects the
# HddsDatanodeService class and the hadoop-ozone-datanode artifact.
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
# Set the log4j.configurationFile system property to the datanode audit
# configuration under HADOOP_CONF_DIR, then append the datanode options to
# HADOOP_OPTS.
HDDS_DN_OPTS="${HDDS_DN_OPTS} -Dlog4j.configurationFile=${HADOOP_CONF_DIR}/dn-audit-log4j2.properties"
HADOOP_OPTS="${HADOOP_OPTS} ${HDDS_DN_OPTS}"
HADOOP_CLASSNAME=org.apache.hadoop.ozone.HddsDatanodeService
OZONE_RUN_ARTIFACT_NAME="hadoop-ozone-datanode"
;;

View File

@ -79,6 +79,7 @@ run mkdir -p ./libexec
# Copy the common Hadoop conf directory to etc/hadoop, then add the Ozone
# specific configuration files to it.
run cp -r "${ROOT}/hadoop-common-project/hadoop-common/src/main/conf" "etc/hadoop"
# Audit logging configurations: one for OM (om-) and one for the datanode (dn-).
run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/om-audit-log4j2.properties" "etc/hadoop"
run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/dn-audit-log4j2.properties" "etc/hadoop"
run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/ozone-site.xml" "etc/hadoop"
# -f: overwrite any log4j.properties already present in etc/hadoop.
run cp -f "${ROOT}/hadoop-ozone/dist/src/main/conf/log4j.properties" "etc/hadoop"
run cp "${ROOT}/hadoop-common-project/hadoop-common/src/main/bin/hadoop" "bin/"

View File

@ -0,0 +1,90 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with this
# work for additional information regarding copyright ownership. The ASF
# licenses this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# <p>
# http://www.apache.org/licenses/LICENSE-2.0
# <p>
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
name=PropertiesConfig
# Checks for config change periodically and reloads
monitorInterval=30
filter=read,write
# filter.read.onMatch=DENY avoids logging all READ events
# filter.read.onMatch=ACCEPT permits logging all READ events
# The above two settings ignore the log levels in configuration
# filter.read.onMatch=NEUTRAL permits logging of only those READ events
# which are attempted at log level equal or greater than log level specified
# in the configuration
filter.read.type=MarkerFilter
filter.read.marker=READ
filter.read.onMatch=DENY
filter.read.onMismatch=NEUTRAL
# filter.write.onMatch=DENY avoids logging all WRITE events
# filter.write.onMatch=ACCEPT permits logging all WRITE events
# The above two settings ignore the log levels in configuration
# filter.write.onMatch=NEUTRAL permits logging of only those WRITE events
# which are attempted at log level equal or greater than log level specified
# in the configuration
filter.write.type=MarkerFilter
filter.write.marker=WRITE
filter.write.onMatch=NEUTRAL
filter.write.onMismatch=NEUTRAL
# Log Levels are organized from most specific to least:
# OFF (most specific, no logging)
# FATAL (most specific, little data)
# ERROR
# WARN
# INFO
# DEBUG
# TRACE (least specific, a lot of data)
# ALL (least specific, all data)
# Uncomment the following section to also log to the console appender
#appenders=console, rolling
#appender.console.type=Console
#appender.console.name=STDOUT
#appender.console.layout.type=PatternLayout
#appender.console.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
# Comment out this line when using both console and rolling appenders
appenders=rolling
#Rolling File Appender with size & time thresholds.
#Rolling is triggered when either threshold is breached.
#The rolled over file is compressed by default
#Time interval is counted in the smallest time unit of the filePattern date
#pattern (seconds here, because of "ss"), so 86400 = 1 day
appender.rolling.type=RollingFile
appender.rolling.name=RollingFile
appender.rolling.fileName =${sys:hadoop.log.dir}/dn-audit-${hostName}.log
appender.rolling.filePattern=${sys:hadoop.log.dir}/dn-audit-${hostName}-%d{yyyy-MM-dd-HH-mm-ss}-%i.log.gz
appender.rolling.layout.type=PatternLayout
appender.rolling.layout.pattern=%d{DEFAULT} | %-5level | %c{1} | %msg | %throwable{3} %n
appender.rolling.policies.type=Policies
appender.rolling.policies.time.type=TimeBasedTriggeringPolicy
appender.rolling.policies.time.interval=86400
appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=64MB
loggers=audit
logger.audit.type=AsyncLogger
logger.audit.name=DNAudit
logger.audit.level=INFO
logger.audit.appenderRefs=rolling
logger.audit.appenderRef.file.ref=RollingFile
rootLogger.level=INFO
#rootLogger.appenderRefs=stdout
#rootLogger.appenderRef.stdout.ref=STDOUT