control index audit events through an include/excludes mechanism

This changes how the user defines which events should be indexed. Previously, there were
several boolean settings being used. This condenses to an include and exclude setting.

Additionally, the IndexAuditTrail needed to become a lifecycle component since parsing the
enum could throw exceptions. Given this, the IndexBulkProcessor was condensed into the
IndexAuditTrail since it did not make sense to have two lifecycle components for an index
audit trail.

Closes elastic/elasticsearch#900

Original commit: elastic/x-pack-elasticsearch@4b4d824f5e
This commit is contained in:
jaymode 2015-06-12 11:46:51 -04:00
parent c82816dd49
commit 9a97b046d5
7 changed files with 356 additions and 309 deletions

View File

@ -14,7 +14,7 @@ import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.shield.audit.index.IndexAuditTrailBulkProcessor;
import org.elasticsearch.shield.audit.index.IndexAuditTrail;
import org.elasticsearch.shield.authc.Realms;
import org.elasticsearch.shield.authc.support.SecuredString;
import org.elasticsearch.shield.authc.support.UsernamePasswordToken;
@ -71,7 +71,7 @@ public class ShieldPlugin extends AbstractPlugin {
if (enabled && !clientMode) {
if (indexAuditLoggingEnabled(settings)) {
// index-based audit logging should be started before other services
builder.add(IndexAuditTrailBulkProcessor.class);
builder.add(IndexAuditTrail.class);
}
builder.add(LicenseService.class).add(InternalCryptoService.class).add(FileRolesStore.class).add(Realms.class).add(IPFilter.class);
}

View File

@ -12,7 +12,6 @@ import org.elasticsearch.common.inject.PreProcessModule;
import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.shield.audit.index.IndexAuditTrail;
import org.elasticsearch.shield.audit.index.IndexAuditTrailBulkProcessor;
import org.elasticsearch.shield.audit.index.IndexAuditUserHolder;
import org.elasticsearch.shield.audit.logfile.LoggingAuditTrail;
import org.elasticsearch.shield.authz.AuthorizationModule;
@ -59,7 +58,6 @@ public class AuditTrailModule extends AbstractShieldModule.Node implements PrePr
bind(IndexAuditUserHolder.class).toInstance(indexAuditUser);
binder.addBinding().to(IndexAuditTrail.class);
bind(IndexAuditTrail.class).asEagerSingleton();
bind(IndexAuditTrailBulkProcessor.class).asEagerSingleton();
break;
default:
throw new ElasticsearchException("unknown audit trail output [" + output + "]");
@ -71,7 +69,7 @@ public class AuditTrailModule extends AbstractShieldModule.Node implements PrePr
public void processModule(Module module) {
if (enabled && module instanceof AuthorizationModule) {
if (indexAuditLoggingEnabled(settings)) {
indexAuditUser = new IndexAuditUserHolder(IndexAuditTrailBulkProcessor.INDEX_NAME_PREFIX);
indexAuditUser = new IndexAuditUserHolder(IndexAuditTrail.INDEX_NAME_PREFIX);
((AuthorizationModule) module).registerReservedRole(indexAuditUser.role());
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.shield.audit.index;
import org.elasticsearch.shield.ShieldException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Locale;
public enum IndexAuditLevel {
ANONYMOUS_ACCESS_DENIED,
AUTHENTICATION_FAILED,
ACCESS_GRANTED,
ACCESS_DENIED,
TAMPERED_REQUEST,
CONNECTION_GRANTED,
CONNECTION_DENIED,
SYSTEM_ACCESS_GRANTED;
static EnumSet<IndexAuditLevel> parse(String[] levels) {
EnumSet<IndexAuditLevel> enumSet = EnumSet.noneOf(IndexAuditLevel.class);
for (String level : levels) {
String lowerCaseLevel = level.trim().toLowerCase(Locale.ROOT);
switch (lowerCaseLevel) {
case "_all":
enumSet.addAll(Arrays.asList(IndexAuditLevel.values()));
break;
case "anonymous_access_denied":
enumSet.add(ANONYMOUS_ACCESS_DENIED);
break;
case "authentication_failed":
enumSet.add(AUTHENTICATION_FAILED);
break;
case "access_granted":
enumSet.add(ACCESS_GRANTED);
break;
case "access_denied":
enumSet.add(ACCESS_DENIED);
break;
case "tampered_request":
enumSet.add(TAMPERED_REQUEST);
break;
case "connection_granted":
enumSet.add(CONNECTION_GRANTED);
break;
case "connection_denied":
enumSet.add(CONNECTION_DENIED);
break;
case "system_access_granted":
enumSet.add(SYSTEM_ACCESS_GRANTED);
break;
default:
throw new ShieldException("invalid event name specified [" + level + "]");
}
}
return enumSet;
}
public static EnumSet<IndexAuditLevel> parse(String[] includeLevels, String[] excludeLevels) {
EnumSet<IndexAuditLevel> included = parse(includeLevels);
EnumSet<IndexAuditLevel> excluded = parse(excludeLevels);
included.removeAll(excluded);
return included;
}
}

View File

@ -5,20 +5,32 @@
*/
package org.elasticsearch.shield.audit.index;
import com.google.common.base.Splitter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.Environment;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.shield.User;
import org.elasticsearch.shield.audit.AuditTrail;
import org.elasticsearch.shield.authc.AuthenticationService;
import org.elasticsearch.shield.authc.AuthenticationToken;
import org.elasticsearch.shield.authz.Privilege;
import org.elasticsearch.shield.rest.RemoteHostHeader;
@ -30,55 +42,85 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import static org.elasticsearch.shield.audit.AuditUtil.indices;
import static org.elasticsearch.shield.audit.AuditUtil.restRequestContent;
import static org.elasticsearch.shield.audit.index.IndexAuditLevel.*;
/**
* Audit trail implementation that writes events into an index.
*/
public class IndexAuditTrail implements AuditTrail {
private static final ESLogger logger = ESLoggerFactory.getLogger(IndexAuditTrail.class.getName());
public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail> implements AuditTrail {
public static final int DEFAULT_BULK_SIZE = 1000;
public static final int MAX_BULK_SIZE = 10000;
public static final TimeValue DEFAULT_FLUSH_INTERVAL = TimeValue.timeValueSeconds(1);
public static final IndexNameResolver.Rollover DEFAULT_ROLLOVER = IndexNameResolver.Rollover.DAILY;
public static final String NAME = "index";
public static final String INDEX_NAME_PREFIX = ".shield-audit-log";
public static final String DOC_TYPE = "event";
static final String[] DEFAULT_EVENT_INCLUDES = new String[] {
ACCESS_DENIED.toString(),
ACCESS_GRANTED.toString(),
ANONYMOUS_ACCESS_DENIED.toString(),
AUTHENTICATION_FAILED.toString(),
CONNECTION_DENIED.toString(),
CONNECTION_GRANTED.toString(),
TAMPERED_REQUEST.toString()
};
private final String nodeName;
private final String nodeHostName;
private final String nodeHostAddress;
private final IndexAuditUserHolder auditUser;
private final IndexAuditTrailBulkProcessor processor;
private final Provider<Client> clientProvider;
private final AuthenticationService authenticationService;
private final IndexNameResolver resolver;
private final Environment environment;
private BulkProcessor bulkProcessor;
private Client client;
private boolean indexToRemoteCluster;
private IndexNameResolver.Rollover rollover;
private String nodeHostName;
private String nodeHostAddress;
@Override
public String name() {
return NAME;
}
private enum Level {
ANONYMOUS_ACCESS_DENIED,
AUTHENTICATION_FAILED,
ACCESS_GRANTED,
ACCESS_DENIED,
TAMPERED_REQUEST,
CONNECTION_GRANTED,
CONNECTION_DENIED,
SYSTEM_ACCESS_GRANTED;
}
private final EnumSet<Level> enabled = EnumSet.allOf(Level.class);
private EnumSet<IndexAuditLevel> events;
@Inject
public IndexAuditTrail(Settings settings, IndexAuditUserHolder indexingAuditUser,
IndexAuditTrailBulkProcessor processor) {
Environment environment, AuthenticationService authenticationService,
Provider<Client> clientProvider) {
super(settings);
this.auditUser = indexingAuditUser;
this.processor = processor;
this.authenticationService = authenticationService;
this.clientProvider = clientProvider;
this.environment = environment;
this.resolver = new IndexNameResolver();
this.nodeName = settings.get("name");
}
@Override
protected void doStart() throws ElasticsearchException {
try {
rollover = IndexNameResolver.Rollover.valueOf(
settings.get("shield.audit.index.rollover", DEFAULT_ROLLOVER.name()).toUpperCase(Locale.ENGLISH));
} catch (IllegalArgumentException e) {
logger.warn("invalid value for setting [shield.audit.index.rollover]; falling back to default [{}]",
DEFAULT_ROLLOVER.name());
rollover = DEFAULT_ROLLOVER;
}
String hostname = "n/a";
String hostaddr = "n/a";
try {
hostname = InetAddress.getLocalHost().getHostName();
hostaddr = InetAddress.getLocalHost().getHostAddress();
@ -88,37 +130,38 @@ public class IndexAuditTrail implements AuditTrail {
this.nodeHostName = hostname;
this.nodeHostAddress = hostaddr;
if (!settings.getAsBoolean("shield.audit.index.events.system.access_granted", false)) {
enabled.remove(Level.SYSTEM_ACCESS_GRANTED);
}
if (!settings.getAsBoolean("shield.audit.index.events.anonymous_access_denied", true)) {
enabled.remove(Level.ANONYMOUS_ACCESS_DENIED);
}
if (!settings.getAsBoolean("shield.audit.index.events.authentication_failed", true)) {
enabled.remove(Level.AUTHENTICATION_FAILED);
}
if (!settings.getAsBoolean("shield.audit.index.events.access_granted", true)) {
enabled.remove(Level.ACCESS_GRANTED);
}
if (!settings.getAsBoolean("shield.audit.index.events.access_denied", true)) {
enabled.remove(Level.ACCESS_DENIED);
}
if (!settings.getAsBoolean("shield.audit.index.events.tampered_request", true)) {
enabled.remove(Level.TAMPERED_REQUEST);
}
if (!settings.getAsBoolean("shield.audit.index.events.connection_granted", true)) {
enabled.remove(Level.CONNECTION_GRANTED);
}
if (!settings.getAsBoolean("shield.audit.index.events.connection_denied", true)) {
enabled.remove(Level.CONNECTION_DENIED);
String[] includedEvents = settings.getAsArray("shield.audit.index.events.include", DEFAULT_EVENT_INCLUDES);
String[] excludedEvents = settings.getAsArray("shield.audit.index.events.exclude");
events = parse(includedEvents, excludedEvents);
initializeClient();
initializeBulkProcessor();
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
try {
if (bulkProcessor != null) {
bulkProcessor.close();
}
} finally {
if (indexToRemoteCluster) {
if (client != null) {
client.close();
}
}
}
}
@Override
public void anonymousAccessDenied(String action, TransportMessage<?> message) {
if (enabled.contains(Level.ANONYMOUS_ACCESS_DENIED)) {
if (events.contains(ANONYMOUS_ACCESS_DENIED)) {
try {
processor.submit(message("anonymous_access_denied", action, null, null, indices(message), message));
submit(message("anonymous_access_denied", action, null, null, indices(message), message));
} catch (Exception e) {
logger.warn("failed to index audit event: [anonymous_access_denied]", e);
}
@ -127,9 +170,9 @@ public class IndexAuditTrail implements AuditTrail {
@Override
public void anonymousAccessDenied(RestRequest request) {
if (enabled.contains(Level.ANONYMOUS_ACCESS_DENIED)) {
if (events.contains(ANONYMOUS_ACCESS_DENIED)) {
try {
processor.submit(message("anonymous_access_denied", null, null, null, null, request));
submit(message("anonymous_access_denied", null, null, null, null, request));
} catch (Exception e) {
logger.warn("failed to index audit event: [anonymous_access_denied]", e);
}
@ -138,9 +181,9 @@ public class IndexAuditTrail implements AuditTrail {
@Override
public void authenticationFailed(String action, TransportMessage<?> message) {
if (enabled.contains(Level.AUTHENTICATION_FAILED)) {
if (events.contains(AUTHENTICATION_FAILED)) {
try {
processor.submit(message("authentication_failed", action, null, null, indices(message), message));
submit(message("authentication_failed", action, null, null, indices(message), message));
} catch (Exception e) {
logger.warn("failed to index audit event: [authentication_failed]", e);
}
@ -149,9 +192,9 @@ public class IndexAuditTrail implements AuditTrail {
@Override
public void authenticationFailed(RestRequest request) {
if (enabled.contains(Level.AUTHENTICATION_FAILED)) {
if (events.contains(AUTHENTICATION_FAILED)) {
try {
processor.submit(message("authentication_failed", null, null, null, null, request));
submit(message("authentication_failed", null, null, null, null, request));
} catch (Exception e) {
logger.warn("failed to index audit event: [authentication_failed]", e);
}
@ -160,10 +203,10 @@ public class IndexAuditTrail implements AuditTrail {
@Override
public void authenticationFailed(AuthenticationToken token, String action, TransportMessage<?> message) {
if (enabled.contains(Level.AUTHENTICATION_FAILED)) {
if (events.contains(AUTHENTICATION_FAILED)) {
if (!principalIsAuditor(token.principal())) {
try {
processor.submit(message("authentication_failed", action, token.principal(), null, indices(message), message));
submit(message("authentication_failed", action, token.principal(), null, indices(message), message));
} catch (Exception e) {
logger.warn("failed to index audit event: [authentication_failed]", e);
}
@ -173,10 +216,10 @@ public class IndexAuditTrail implements AuditTrail {
@Override
public void authenticationFailed(AuthenticationToken token, RestRequest request) {
if (enabled.contains(Level.AUTHENTICATION_FAILED)) {
if (events.contains(AUTHENTICATION_FAILED)) {
if (!principalIsAuditor(token.principal())) {
try {
processor.submit(message("authentication_failed", null, token.principal(), null, null, request));
submit(message("authentication_failed", null, token.principal(), null, null, request));
} catch (Exception e) {
logger.warn("failed to index audit event: [authentication_failed]", e);
}
@ -186,10 +229,10 @@ public class IndexAuditTrail implements AuditTrail {
@Override
public void authenticationFailed(String realm, AuthenticationToken token, String action, TransportMessage<?> message) {
if (enabled.contains(Level.AUTHENTICATION_FAILED)) {
if (events.contains(AUTHENTICATION_FAILED)) {
if (!principalIsAuditor(token.principal())) {
try {
processor.submit(message("authentication_failed", action, token.principal(), realm, indices(message), message));
submit(message("authentication_failed", action, token.principal(), realm, indices(message), message));
} catch (Exception e) {
logger.warn("failed to index audit event: [authentication_failed]", e);
}
@ -199,10 +242,10 @@ public class IndexAuditTrail implements AuditTrail {
@Override
public void authenticationFailed(String realm, AuthenticationToken token, RestRequest request) {
if (enabled.contains(Level.AUTHENTICATION_FAILED)) {
if (events.contains(AUTHENTICATION_FAILED)) {
if (!principalIsAuditor(token.principal())) {
try {
processor.submit(message("authentication_failed", null, token.principal(), realm, null, request));
submit(message("authentication_failed", null, token.principal(), realm, null, request));
} catch (Exception e) {
logger.warn("failed to index audit event: [authentication_failed]", e);
}
@ -212,34 +255,32 @@ public class IndexAuditTrail implements AuditTrail {
@Override
public void accessGranted(User user, String action, TransportMessage<?> message) {
if (enabled.contains(Level.ACCESS_GRANTED)) {
if (!principalIsAuditor(user.principal())) {
// special treatment for internal system actions - only log if explicitly told to
if (user.isSystem() && Privilege.SYSTEM.predicate().apply(action)) {
if (enabled.contains(Level.SYSTEM_ACCESS_GRANTED)) {
try {
processor.submit(message("access_granted", action, user.principal(), null, indices(message), message));
} catch (Exception e) {
logger.warn("failed to index audit event: [access_granted]", e);
}
}
} else {
if (!principalIsAuditor(user.principal())) {
// special treatment for internal system actions - only log if explicitly told to
if (user.isSystem() && Privilege.SYSTEM.predicate().apply(action)) {
if (events.contains(SYSTEM_ACCESS_GRANTED)) {
try {
processor.submit(message("access_granted", action, user.principal(), null, indices(message), message));
submit(message("access_granted", action, user.principal(), null, indices(message), message));
} catch (Exception e) {
logger.warn("failed to index audit event: [access_granted]", e);
}
}
} else if (events.contains(ACCESS_GRANTED)) {
try {
submit(message("access_granted", action, user.principal(), null, indices(message), message));
} catch (Exception e) {
logger.warn("failed to index audit event: [access_granted]", e);
}
}
}
}
@Override
public void accessDenied(User user, String action, TransportMessage<?> message) {
if (enabled.contains(Level.ACCESS_DENIED)) {
if (events.contains(ACCESS_DENIED)) {
if (!principalIsAuditor(user.principal())) {
try {
processor.submit(message("access_denied", action, user.principal(), null, indices(message), message));
submit(message("access_denied", action, user.principal(), null, indices(message), message));
} catch (Exception e) {
logger.warn("failed to index audit event: [access_denied]", e);
}
@ -249,10 +290,10 @@ public class IndexAuditTrail implements AuditTrail {
@Override
public void tamperedRequest(User user, String action, TransportRequest request) {
if (enabled.contains(Level.TAMPERED_REQUEST)) {
if (events.contains(TAMPERED_REQUEST)) {
if (!principalIsAuditor(user.principal())) {
try {
processor.submit(message("tampered_request", action, user.principal(), null, indices(request), request));
submit(message("tampered_request", action, user.principal(), null, indices(request), request));
} catch (Exception e) {
logger.warn("failed to index audit event: [tampered_request]", e);
}
@ -262,9 +303,9 @@ public class IndexAuditTrail implements AuditTrail {
@Override
public void connectionGranted(InetAddress inetAddress, String profile, ShieldIpFilterRule rule) {
if (enabled.contains(Level.CONNECTION_GRANTED)) {
if (events.contains(CONNECTION_GRANTED)) {
try {
processor.submit(message("ip_filter", "connection_granted", inetAddress, profile, rule));
submit(message("ip_filter", "connection_granted", inetAddress, profile, rule));
} catch (Exception e) {
logger.warn("failed to index audit event: [connection_granted]", e);
}
@ -273,9 +314,9 @@ public class IndexAuditTrail implements AuditTrail {
@Override
public void connectionDenied(InetAddress inetAddress, String profile, ShieldIpFilterRule rule) {
if (enabled.contains(Level.CONNECTION_DENIED)) {
if (events.contains(CONNECTION_DENIED)) {
try {
processor.submit(message("ip_filter", "connection_denied", inetAddress, profile, rule));
submit(message("ip_filter", "connection_denied", inetAddress, profile, rule));
} catch (Exception e) {
logger.warn("failed to index audit event: [connection_denied]", e);
}
@ -390,6 +431,94 @@ public class IndexAuditTrail implements AuditTrail {
return builder;
}
void submit(IndexAuditTrail.Message message) {
assert lifecycle.started();
IndexRequest indexRequest = client.prepareIndex()
.setIndex(resolver.resolve(INDEX_NAME_PREFIX, message.timestamp, rollover))
.setType(DOC_TYPE).setSource(message.builder).request();
authenticationService.attachUserHeaderIfMissing(indexRequest, auditUser.user());
bulkProcessor.add(indexRequest);
}
private void initializeClient() {
Settings clientSettings = settings.getByPrefix("shield.audit.index.client.");
if (clientSettings.names().size() == 0) {
// in the absence of client settings for remote indexing, fall back to the client that was passed in.
this.client = clientProvider.get();
indexToRemoteCluster = false;
} else {
String[] hosts = clientSettings.getAsArray("hosts");
if (hosts.length == 0) {
throw new ElasticsearchException("missing required setting " +
"[shield.audit.index.client.hosts] for remote audit log indexing");
}
if (clientSettings.get("cluster.name", "").isEmpty()) {
throw new ElasticsearchException("missing required setting " +
"[shield.audit.index.client.cluster.name] for remote audit log indexing");
}
List<Tuple<String, Integer>> hostPortPairs = new ArrayList<>();
for (String host : hosts) {
List<String> hostPort = Splitter.on(":").splitToList(host.trim());
if (hostPort.size() != 1 && hostPort.size() != 2) {
logger.warn("invalid host:port specified: [{}] for setting [shield.audit.index.client.hosts]", host);
}
hostPortPairs.add(new Tuple<>(hostPort.get(0), hostPort.size() == 2 ? Integer.valueOf(hostPort.get(1)) : 9300));
}
if (hostPortPairs.size() == 0) {
throw new ElasticsearchException("no valid host:port pairs specified for setting [shield.audit.index.client.hosts]");
}
final TransportClient transportClient = TransportClient.builder()
.settings(Settings.builder().put(clientSettings).put("path.home", environment.homeFile()).build()).build();
for (Tuple<String, Integer> pair : hostPortPairs) {
transportClient.addTransportAddress(new InetSocketTransportAddress(pair.v1(), pair.v2()));
}
this.client = transportClient;
indexToRemoteCluster = true;
logger.info("forwarding audit events to remote cluster [{}] using hosts [{}]",
clientSettings.get("cluster.name", ""), hostPortPairs.toString());
}
}
private void initializeBulkProcessor() {
int bulkSize = Math.min(settings.getAsInt("shield.audit.index.bulk_size", DEFAULT_BULK_SIZE), MAX_BULK_SIZE);
bulkSize = (bulkSize < 1) ? DEFAULT_BULK_SIZE : bulkSize;
TimeValue interval = settings.getAsTime("shield.audit.index.flush_interval", DEFAULT_FLUSH_INTERVAL);
interval = (interval.millis() < 1) ? DEFAULT_FLUSH_INTERVAL : interval;
bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
authenticationService.attachUserHeaderIfMissing(request, auditUser.user());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
logger.info("failed to bulk index audit events: [{}]", response.buildFailureMessage());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("failed to bulk index audit events: [{}]", failure, failure.getMessage());
}
}).setBulkActions(bulkSize)
.setFlushInterval(interval)
.setConcurrentRequests(1)
.build();
}
static class Message {
final long timestamp;

View File

@ -1,189 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.shield.audit.index;
import com.google.common.base.Splitter;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.shield.authc.AuthenticationService;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
/**
*
*/
public class IndexAuditTrailBulkProcessor extends AbstractLifecycleComponent<IndexAuditTrailBulkProcessor> {
public static final int DEFAULT_BULK_SIZE = 1000;
public static final int MAX_BULK_SIZE = 10000;
public static final String INDEX_NAME_PREFIX = ".shield-audit-log";
public static final String DOC_TYPE = "event";
public static final TimeValue DEFAULT_FLUSH_INTERVAL = TimeValue.timeValueSeconds(1);
public static final IndexNameResolver.Rollover DEFAULT_ROLLOVER = IndexNameResolver.Rollover.DAILY;
private static BulkProcessor bulkProcessor;
private final Provider<Client> clientProvider;
private final IndexAuditUserHolder auditUser;
private final AuthenticationService authenticationService;
private final IndexNameResolver resolver;
private final IndexNameResolver.Rollover rollover;
private final Environment environment;
private Client client;
private boolean indexToRemoteCluster;
@Inject
public IndexAuditTrailBulkProcessor(Settings settings, Environment environment, AuthenticationService authenticationService,
IndexAuditUserHolder auditUser, Provider<Client> clientProvider) {
super(settings);
this.authenticationService = authenticationService;
this.auditUser = auditUser;
this.clientProvider = clientProvider;
this.environment = environment;
IndexNameResolver.Rollover rollover;
try {
rollover = IndexNameResolver.Rollover.valueOf(
settings.get("shield.audit.index.rollover", DEFAULT_ROLLOVER.name()).toUpperCase(Locale.ENGLISH));
} catch (IllegalArgumentException e) {
logger.warn("invalid value for setting [shield.audit.index.rollover]; falling back to default [{}]",
DEFAULT_ROLLOVER.name());
rollover = DEFAULT_ROLLOVER;
}
this.rollover = rollover;
this.resolver = new IndexNameResolver();
}
@Override
protected void doStart() throws ElasticsearchException {
initializeClient();
initializeBulkProcessor();
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
try {
if (bulkProcessor != null) {
bulkProcessor.close();
}
} finally {
if (indexToRemoteCluster) {
if (client != null) {
client.close();
}
}
}
}
public void submit(IndexAuditTrail.Message message) {
assert lifecycle.started();
IndexRequest indexRequest = client.prepareIndex()
.setIndex(resolver.resolve(INDEX_NAME_PREFIX, message.timestamp, rollover))
.setType(DOC_TYPE).setSource(message.builder).request();
authenticationService.attachUserHeaderIfMissing(indexRequest, auditUser.user());
bulkProcessor.add(indexRequest);
}
private void initializeClient() {
Settings clientSettings = settings.getByPrefix("shield.audit.index.client.");
if (clientSettings.names().size() == 0) {
// in the absence of client settings for remote indexing, fall back to the client that was passed in.
this.client = clientProvider.get();
indexToRemoteCluster = false;
} else {
String[] hosts = clientSettings.getAsArray("hosts");
if (hosts.length == 0) {
throw new ElasticsearchException("missing required setting " +
"[shield.audit.index.client.hosts] for remote audit log indexing");
}
if (clientSettings.get("cluster.name", "").isEmpty()) {
throw new ElasticsearchException("missing required setting " +
"[shield.audit.index.client.cluster.name] for remote audit log indexing");
}
List<Tuple<String, Integer>> hostPortPairs = new ArrayList<>();
for (String host : hosts) {
List<String> hostPort = Splitter.on(":").splitToList(host.trim());
if (hostPort.size() != 1 && hostPort.size() != 2) {
logger.warn("invalid host:port specified: [{}] for setting [shield.audit.index.client.hosts]", host);
}
hostPortPairs.add(new Tuple<>(hostPort.get(0), hostPort.size() == 2 ? Integer.valueOf(hostPort.get(1)) : 9300));
}
if (hostPortPairs.size() == 0) {
throw new ElasticsearchException("no valid host:port pairs specified for setting [shield.audit.index.client.hosts]");
}
final TransportClient transportClient = TransportClient.builder()
.settings(Settings.builder().put(clientSettings).put("path.home", environment.homeFile()).build()).build();
for (Tuple<String, Integer> pair : hostPortPairs) {
transportClient.addTransportAddress(new InetSocketTransportAddress(pair.v1(), pair.v2()));
}
this.client = transportClient;
indexToRemoteCluster = true;
logger.info("forwarding audit events to remote cluster [{}] using hosts [{}]",
clientSettings.get("cluster.name", ""), hostPortPairs.toString());
}
}
private void initializeBulkProcessor() {
int bulkSize = Math.min(settings.getAsInt("shield.audit.index.bulk_size", DEFAULT_BULK_SIZE), MAX_BULK_SIZE);
bulkSize = (bulkSize < 1) ? DEFAULT_BULK_SIZE : bulkSize;
TimeValue interval = settings.getAsTime("shield.audit.index.flush_interval", DEFAULT_FLUSH_INTERVAL);
interval = (interval.millis() < 1) ? DEFAULT_FLUSH_INTERVAL : interval;
bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
authenticationService.attachUserHeaderIfMissing(request, auditUser.user());
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
logger.info("failed to bulk index audit events: [{}]", response.buildFailureMessage());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("failed to bulk index audit events: [{}]", failure, failure.getMessage());
}
}).setBulkActions(bulkSize)
.setFlushInterval(interval)
.setConcurrentRequests(1)
.build();
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.shield.audit.index;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.EnumSet;
import java.util.Locale;
import static org.hamcrest.Matchers.*;
public class IndexAuditLevelTests extends ElasticsearchTestCase {
@Test
public void testAllIndexAuditLevel() {
EnumSet<IndexAuditLevel> enumSet = IndexAuditLevel.parse(new String[] { "_all" });
IndexAuditLevel[] levels = IndexAuditLevel.values();
assertThat(enumSet.size(), is(levels.length));
for (IndexAuditLevel level : levels) {
assertThat(enumSet.contains(level), is(true));
}
}
@Test
public void testExcludeHasPreference() {
EnumSet<IndexAuditLevel> enumSet = IndexAuditLevel.parse(new String[] { "_all" }, new String[] { "_all" });
assertThat(enumSet.size(), is(0));
}
@Test
public void testExcludeHasPreferenceSingle() {
String excluded = randomFrom(IndexAuditLevel.values()).toString().toLowerCase(Locale.ROOT);
EnumSet<IndexAuditLevel> enumSet = IndexAuditLevel.parse(new String[] { "_all" }, new String[] { excluded });
EnumSet<IndexAuditLevel> expected = EnumSet.allOf(IndexAuditLevel.class);
expected.remove(IndexAuditLevel.valueOf(excluded.toUpperCase(Locale.ROOT)));
assertThat(enumSet, equalTo(expected));
}
}

View File

@ -52,7 +52,6 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
private IndexNameResolver resolver = new IndexNameResolver();
private IndexNameResolver.Rollover rollover;
private IndexAuditTrailBulkProcessor bulkProcessor;
private IndexAuditTrail auditor;
private boolean remoteIndexing = false;
@ -61,7 +60,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
public static final String REMOTE_TEST_CLUSTER = "single-node-remote-test-cluster";
private static final IndexAuditUserHolder user = new IndexAuditUserHolder(IndexAuditTrailBulkProcessor.INDEX_NAME_PREFIX);
private static final IndexAuditUserHolder user = new IndexAuditUserHolder(IndexAuditTrail.INDEX_NAME_PREFIX);
private Settings commonSettings(IndexNameResolver.Rollover rollover) {
return Settings.builder()
@ -80,44 +79,35 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
.build();
}
private Settings mutedSettings(boolean systemEnabled, String... muted) {
private Settings levelSettings(String[] includes, String[] excludes) {
Settings.Builder builder = Settings.builder();
for (String mute : muted) {
builder.put("shield.audit.index.events." + mute, false);
if (includes != null) {
builder.putArray("shield.audit.index.events.include", includes);
}
if (excludes != null) {
builder.putArray("shield.audit.index.events.exclude", excludes);
}
builder.put("shield.audit.index.events.system.access_granted", systemEnabled);
return builder.build();
}
private Settings settings(IndexNameResolver.Rollover rollover, boolean systemEnabled, String... muted) {
private Settings settings(IndexNameResolver.Rollover rollover, String[] includes, String[] excludes) {
Settings.Builder builder = Settings.builder();
builder.put(mutedSettings(systemEnabled, muted));
builder.put(levelSettings(includes, excludes));
builder.put(commonSettings(rollover));
return builder.build();
}
private IndexAuditTrailBulkProcessor buildIndexAuditTrailService(Settings settings) {
AuthenticationService authService = mock(AuthenticationService.class);
when(authService.authenticate(mock(RestRequest.class))).thenThrow(UnsupportedOperationException.class);
when(authService.authenticate("_action", new LocalHostMockMessage(), user.user())).thenThrow(UnsupportedOperationException.class);
Environment env = new Environment(settings);
return new IndexAuditTrailBulkProcessor(settings, env, authService, user, Providers.of(client()));
}
private Client getClient() {
return remoteIndexing ? remoteClient : client();
}
private void initialize(String... muted) {
initialize(false, muted);
private void initialize(String... excludes) {
initialize(null, excludes);
}
private void initialize(boolean systemEnabled, String... muted) {
private void initialize(String[] includes, String[] excludes) {
rollover = randomFrom(HOURLY, DAILY, WEEKLY, MONTHLY);
Settings settings = settings(rollover, systemEnabled, muted);
Settings settings = settings(rollover, includes, excludes);
remoteIndexing = randomBoolean();
if (remoteIndexing) {
@ -137,16 +127,22 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
.build();
}
Settings settings1 = Settings.builder().put(settings).put("path.home", createTempDir()).build();
settings = Settings.builder().put(settings).put("path.home", createTempDir()).build();
logger.info("--> settings: [{}]", settings.getAsMap().toString());
bulkProcessor = buildIndexAuditTrailService(settings1);
bulkProcessor.start();
auditor = new IndexAuditTrail(settings, user, bulkProcessor);
AuthenticationService authService = mock(AuthenticationService.class);
when(authService.authenticate(mock(RestRequest.class))).thenThrow(new UnsupportedOperationException(""));
when(authService.authenticate("_action", new LocalHostMockMessage(), user.user())).thenThrow(new UnsupportedOperationException(""));
Environment env = new Environment(settings);
auditor = new IndexAuditTrail(settings, user, env, authService, Providers.of(client())).start();
}
@After
public void afterTest() {
bulkProcessor.close();
if (auditor != null) {
auditor.close();
}
cluster().wipe();
if (remoteIndexing && remoteNode != null) {
DeleteIndexResponse response = remoteClient.admin().indices().prepareDelete("*").execute().actionGet();
@ -397,7 +393,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
@Test
public void testSystemAccessGranted() throws Exception {
initialize(true);
initialize(new String[] { "system_access_granted" }, null);
TransportMessage message = randomBoolean() ? new RemoteHostMockMessage() : new LocalHostMockMessage();
auditor.accessGranted(User.SYSTEM, "internal:_action", message);
awaitIndexCreation(resolveIndexName());
@ -572,7 +568,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
private SearchHit getIndexedAuditMessage() {
SearchResponse response = getClient().prepareSearch(resolveIndexName())
.setTypes(IndexAuditTrailBulkProcessor.DOC_TYPE)
.setTypes(IndexAuditTrail.DOC_TYPE)
.addFields(fieldList())
.execute().actionGet();
@ -605,7 +601,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
}
private String resolveIndexName() {
return resolver.resolve(IndexAuditTrailBulkProcessor.INDEX_NAME_PREFIX, System.currentTimeMillis(), rollover);
return resolver.resolve(IndexAuditTrail.INDEX_NAME_PREFIX, System.currentTimeMillis(), rollover);
}
}