diff --git a/src/main/java/org/elasticsearch/shield/ShieldPlugin.java b/src/main/java/org/elasticsearch/shield/ShieldPlugin.java index b2999d8d2b8..dfc047474fa 100644 --- a/src/main/java/org/elasticsearch/shield/ShieldPlugin.java +++ b/src/main/java/org/elasticsearch/shield/ShieldPlugin.java @@ -14,7 +14,7 @@ import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.AbstractPlugin; -import org.elasticsearch.shield.audit.index.IndexAuditTrailBulkProcessor; +import org.elasticsearch.shield.audit.index.IndexAuditTrail; import org.elasticsearch.shield.authc.Realms; import org.elasticsearch.shield.authc.support.SecuredString; import org.elasticsearch.shield.authc.support.UsernamePasswordToken; @@ -71,7 +71,7 @@ public class ShieldPlugin extends AbstractPlugin { if (enabled && !clientMode) { if (indexAuditLoggingEnabled(settings)) { // index-based audit logging should be started before other services - builder.add(IndexAuditTrailBulkProcessor.class); + builder.add(IndexAuditTrail.class); } builder.add(LicenseService.class).add(InternalCryptoService.class).add(FileRolesStore.class).add(Realms.class).add(IPFilter.class); } diff --git a/src/main/java/org/elasticsearch/shield/audit/AuditTrailModule.java b/src/main/java/org/elasticsearch/shield/audit/AuditTrailModule.java index d3016dd599e..55963ddef75 100644 --- a/src/main/java/org/elasticsearch/shield/audit/AuditTrailModule.java +++ b/src/main/java/org/elasticsearch/shield/audit/AuditTrailModule.java @@ -12,7 +12,6 @@ import org.elasticsearch.common.inject.PreProcessModule; import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.shield.audit.index.IndexAuditTrail; -import org.elasticsearch.shield.audit.index.IndexAuditTrailBulkProcessor; import org.elasticsearch.shield.audit.index.IndexAuditUserHolder; import org.elasticsearch.shield.audit.logfile.LoggingAuditTrail; import org.elasticsearch.shield.authz.AuthorizationModule; @@ -59,7 +58,6 @@ public class AuditTrailModule extends AbstractShieldModule.Node implements PrePr bind(IndexAuditUserHolder.class).toInstance(indexAuditUser); binder.addBinding().to(IndexAuditTrail.class); bind(IndexAuditTrail.class).asEagerSingleton(); - bind(IndexAuditTrailBulkProcessor.class).asEagerSingleton(); break; default: throw new ElasticsearchException("unknown audit trail output [" + output + "]"); @@ -71,7 +69,7 @@ public class AuditTrailModule extends AbstractShieldModule.Node implements PrePr public void processModule(Module module) { if (enabled && module instanceof AuthorizationModule) { if (indexAuditLoggingEnabled(settings)) { - indexAuditUser = new IndexAuditUserHolder(IndexAuditTrailBulkProcessor.INDEX_NAME_PREFIX); + indexAuditUser = new IndexAuditUserHolder(IndexAuditTrail.INDEX_NAME_PREFIX); ((AuthorizationModule) module).registerReservedRole(indexAuditUser.role()); } } diff --git a/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditLevel.java b/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditLevel.java new file mode 100644 index 00000000000..d4e882d06c7 --- /dev/null +++ b/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditLevel.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.shield.audit.index; + +import org.elasticsearch.shield.ShieldException; + +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Locale; + +public enum IndexAuditLevel { + + ANONYMOUS_ACCESS_DENIED, + AUTHENTICATION_FAILED, + ACCESS_GRANTED, + ACCESS_DENIED, + TAMPERED_REQUEST, + CONNECTION_GRANTED, + CONNECTION_DENIED, + SYSTEM_ACCESS_GRANTED; + + static EnumSet parse(String[] levels) { + EnumSet enumSet = EnumSet.noneOf(IndexAuditLevel.class); + for (String level : levels) { + String lowerCaseLevel = level.trim().toLowerCase(Locale.ROOT); + switch (lowerCaseLevel) { + case "_all": + enumSet.addAll(Arrays.asList(IndexAuditLevel.values())); + break; + case "anonymous_access_denied": + enumSet.add(ANONYMOUS_ACCESS_DENIED); + break; + case "authentication_failed": + enumSet.add(AUTHENTICATION_FAILED); + break; + case "access_granted": + enumSet.add(ACCESS_GRANTED); + break; + case "access_denied": + enumSet.add(ACCESS_DENIED); + break; + case "tampered_request": + enumSet.add(TAMPERED_REQUEST); + break; + case "connection_granted": + enumSet.add(CONNECTION_GRANTED); + break; + case "connection_denied": + enumSet.add(CONNECTION_DENIED); + break; + case "system_access_granted": + enumSet.add(SYSTEM_ACCESS_GRANTED); + break; + default: + throw new ShieldException("invalid event name specified [" + level + "]"); + } + } + return enumSet; + } + + public static EnumSet parse(String[] includeLevels, String[] excludeLevels) { + EnumSet included = parse(includeLevels); + EnumSet excluded = parse(excludeLevels); + included.removeAll(excluded); + return included; + } +} diff --git a/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditTrail.java b/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditTrail.java index 840935197c4..3a5d7681f49 100644 --- a/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditTrail.java +++ b/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditTrail.java @@ -5,20 +5,32 @@ */ package org.elasticsearch.shield.audit.index; +import com.google.common.base.Splitter; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.inject.Provider; import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.env.Environment; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.shield.User; import org.elasticsearch.shield.audit.AuditTrail; +import org.elasticsearch.shield.authc.AuthenticationService; import org.elasticsearch.shield.authc.AuthenticationToken; import org.elasticsearch.shield.authz.Privilege; import org.elasticsearch.shield.rest.RemoteHostHeader; @@ -30,55 +42,85 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.EnumSet; +import java.util.List; +import java.util.Locale; import static org.elasticsearch.shield.audit.AuditUtil.indices; import static org.elasticsearch.shield.audit.AuditUtil.restRequestContent; +import static org.elasticsearch.shield.audit.index.IndexAuditLevel.*; /** * Audit trail implementation that writes events into an index. */ -public class IndexAuditTrail implements AuditTrail { - - private static final ESLogger logger = ESLoggerFactory.getLogger(IndexAuditTrail.class.getName()); +public class IndexAuditTrail extends AbstractLifecycleComponent implements AuditTrail { + public static final int DEFAULT_BULK_SIZE = 1000; + public static final int MAX_BULK_SIZE = 10000; + public static final TimeValue DEFAULT_FLUSH_INTERVAL = TimeValue.timeValueSeconds(1); + public static final IndexNameResolver.Rollover DEFAULT_ROLLOVER = IndexNameResolver.Rollover.DAILY; public static final String NAME = "index"; + public static final String INDEX_NAME_PREFIX = ".shield-audit-log"; + public static final String DOC_TYPE = "event"; + + static final String[] DEFAULT_EVENT_INCLUDES = new String[] { + ACCESS_DENIED.toString(), + ACCESS_GRANTED.toString(), + ANONYMOUS_ACCESS_DENIED.toString(), + AUTHENTICATION_FAILED.toString(), + CONNECTION_DENIED.toString(), + CONNECTION_GRANTED.toString(), + TAMPERED_REQUEST.toString() + }; private final String nodeName; - private final String nodeHostName; - private final String nodeHostAddress; private final IndexAuditUserHolder auditUser; - private final IndexAuditTrailBulkProcessor processor; + private final Provider clientProvider; + private final AuthenticationService authenticationService; + private final IndexNameResolver resolver; + private final Environment environment; + + private BulkProcessor bulkProcessor; + private Client client; + private boolean indexToRemoteCluster; + private IndexNameResolver.Rollover rollover; + private String nodeHostName; + private String nodeHostAddress; @Override public String name() { return NAME; } - private enum Level { - ANONYMOUS_ACCESS_DENIED, - AUTHENTICATION_FAILED, - ACCESS_GRANTED, - ACCESS_DENIED, - TAMPERED_REQUEST, - CONNECTION_GRANTED, - CONNECTION_DENIED, - SYSTEM_ACCESS_GRANTED; - } - - private final EnumSet enabled = EnumSet.allOf(Level.class); + private EnumSet events; @Inject public IndexAuditTrail(Settings settings, IndexAuditUserHolder indexingAuditUser, - IndexAuditTrailBulkProcessor processor) { - + Environment environment, AuthenticationService authenticationService, + Provider clientProvider) { + super(settings); this.auditUser = indexingAuditUser; - this.processor = processor; + this.authenticationService = authenticationService; + this.clientProvider = clientProvider; + this.environment = environment; + this.resolver = new IndexNameResolver(); this.nodeName = settings.get("name"); + } + + @Override + protected void doStart() throws ElasticsearchException { + try { + rollover = IndexNameResolver.Rollover.valueOf( + settings.get("shield.audit.index.rollover", DEFAULT_ROLLOVER.name()).toUpperCase(Locale.ENGLISH)); + } catch (IllegalArgumentException e) { + logger.warn("invalid value for setting [shield.audit.index.rollover]; falling back to default [{}]", + DEFAULT_ROLLOVER.name()); + rollover = DEFAULT_ROLLOVER; + } String hostname = "n/a"; String hostaddr = "n/a"; - try { hostname = InetAddress.getLocalHost().getHostName(); hostaddr = InetAddress.getLocalHost().getHostAddress(); @@ -88,37 +130,38 @@ public class IndexAuditTrail implements AuditTrail { this.nodeHostName = hostname; this.nodeHostAddress = hostaddr; - if (!settings.getAsBoolean("shield.audit.index.events.system.access_granted", false)) { - enabled.remove(Level.SYSTEM_ACCESS_GRANTED); - } - if (!settings.getAsBoolean("shield.audit.index.events.anonymous_access_denied", true)) { - enabled.remove(Level.ANONYMOUS_ACCESS_DENIED); - } - if (!settings.getAsBoolean("shield.audit.index.events.authentication_failed", true)) { - enabled.remove(Level.AUTHENTICATION_FAILED); - } - if (!settings.getAsBoolean("shield.audit.index.events.access_granted", true)) { - enabled.remove(Level.ACCESS_GRANTED); - } - if (!settings.getAsBoolean("shield.audit.index.events.access_denied", true)) { - enabled.remove(Level.ACCESS_DENIED); - } - if (!settings.getAsBoolean("shield.audit.index.events.tampered_request", true)) { - enabled.remove(Level.TAMPERED_REQUEST); - } - if (!settings.getAsBoolean("shield.audit.index.events.connection_granted", true)) { - enabled.remove(Level.CONNECTION_GRANTED); - } - if (!settings.getAsBoolean("shield.audit.index.events.connection_denied", true)) { - enabled.remove(Level.CONNECTION_DENIED); + String[] includedEvents = settings.getAsArray("shield.audit.index.events.include", DEFAULT_EVENT_INCLUDES); + String[] excludedEvents = settings.getAsArray("shield.audit.index.events.exclude"); + events = parse(includedEvents, excludedEvents); + + initializeClient(); + initializeBulkProcessor(); + } + + @Override + protected void doStop() throws ElasticsearchException { + } + + @Override + protected void doClose() throws ElasticsearchException { + try { + if (bulkProcessor != null) { + bulkProcessor.close(); + } + } finally { + if (indexToRemoteCluster) { + if (client != null) { + client.close(); + } + } } } @Override public void anonymousAccessDenied(String action, TransportMessage message) { - if (enabled.contains(Level.ANONYMOUS_ACCESS_DENIED)) { + if (events.contains(ANONYMOUS_ACCESS_DENIED)) { try { - processor.submit(message("anonymous_access_denied", action, null, null, indices(message), message)); + submit(message("anonymous_access_denied", action, null, null, indices(message), message)); } catch (Exception e) { logger.warn("failed to index audit event: [anonymous_access_denied]", e); } @@ -127,9 +170,9 @@ public class IndexAuditTrail implements AuditTrail { @Override public void anonymousAccessDenied(RestRequest request) { - if (enabled.contains(Level.ANONYMOUS_ACCESS_DENIED)) { + if (events.contains(ANONYMOUS_ACCESS_DENIED)) { try { - processor.submit(message("anonymous_access_denied", null, null, null, null, request)); + submit(message("anonymous_access_denied", null, null, null, null, request)); } catch (Exception e) { logger.warn("failed to index audit event: [anonymous_access_denied]", e); } @@ -138,9 +181,9 @@ public class IndexAuditTrail implements AuditTrail { @Override public void authenticationFailed(String action, TransportMessage message) { - if (enabled.contains(Level.AUTHENTICATION_FAILED)) { + if (events.contains(AUTHENTICATION_FAILED)) { try { - processor.submit(message("authentication_failed", action, null, null, indices(message), message)); + submit(message("authentication_failed", action, null, null, indices(message), message)); } catch (Exception e) { logger.warn("failed to index audit event: [authentication_failed]", e); } @@ -149,9 +192,9 @@ public class IndexAuditTrail implements AuditTrail { @Override public void authenticationFailed(RestRequest request) { - if (enabled.contains(Level.AUTHENTICATION_FAILED)) { + if (events.contains(AUTHENTICATION_FAILED)) { try { - processor.submit(message("authentication_failed", null, null, null, null, request)); + submit(message("authentication_failed", null, null, null, null, request)); } catch (Exception e) { logger.warn("failed to index audit event: [authentication_failed]", e); } @@ -160,10 +203,10 @@ public class IndexAuditTrail implements AuditTrail { @Override public void authenticationFailed(AuthenticationToken token, String action, TransportMessage message) { - if (enabled.contains(Level.AUTHENTICATION_FAILED)) { + if (events.contains(AUTHENTICATION_FAILED)) { if (!principalIsAuditor(token.principal())) { try { - processor.submit(message("authentication_failed", action, token.principal(), null, indices(message), message)); + submit(message("authentication_failed", action, token.principal(), null, indices(message), message)); } catch (Exception e) { logger.warn("failed to index audit event: [authentication_failed]", e); } @@ -173,10 +216,10 @@ public class IndexAuditTrail implements AuditTrail { @Override public void authenticationFailed(AuthenticationToken token, RestRequest request) { - if (enabled.contains(Level.AUTHENTICATION_FAILED)) { + if (events.contains(AUTHENTICATION_FAILED)) { if (!principalIsAuditor(token.principal())) { try { - processor.submit(message("authentication_failed", null, token.principal(), null, null, request)); + submit(message("authentication_failed", null, token.principal(), null, null, request)); } catch (Exception e) { logger.warn("failed to index audit event: [authentication_failed]", e); } @@ -186,10 +229,10 @@ public class IndexAuditTrail implements AuditTrail { @Override public void authenticationFailed(String realm, AuthenticationToken token, String action, TransportMessage message) { - if (enabled.contains(Level.AUTHENTICATION_FAILED)) { + if (events.contains(AUTHENTICATION_FAILED)) { if (!principalIsAuditor(token.principal())) { try { - processor.submit(message("authentication_failed", action, token.principal(), realm, indices(message), message)); + submit(message("authentication_failed", action, token.principal(), realm, indices(message), message)); } catch (Exception e) { logger.warn("failed to index audit event: [authentication_failed]", e); } @@ -199,10 +242,10 @@ public class IndexAuditTrail implements AuditTrail { @Override public void authenticationFailed(String realm, AuthenticationToken token, RestRequest request) { - if (enabled.contains(Level.AUTHENTICATION_FAILED)) { + if (events.contains(AUTHENTICATION_FAILED)) { if (!principalIsAuditor(token.principal())) { try { - processor.submit(message("authentication_failed", null, token.principal(), realm, null, request)); + submit(message("authentication_failed", null, token.principal(), realm, null, request)); } catch (Exception e) { logger.warn("failed to index audit event: [authentication_failed]", e); } @@ -212,34 +255,32 @@ public class IndexAuditTrail implements AuditTrail { @Override public void accessGranted(User user, String action, TransportMessage message) { - if (enabled.contains(Level.ACCESS_GRANTED)) { - if (!principalIsAuditor(user.principal())) { - // special treatment for internal system actions - only log if explicitly told to - if (user.isSystem() && Privilege.SYSTEM.predicate().apply(action)) { - if (enabled.contains(Level.SYSTEM_ACCESS_GRANTED)) { - try { - processor.submit(message("access_granted", action, user.principal(), null, indices(message), message)); - } catch (Exception e) { - logger.warn("failed to index audit event: [access_granted]", e); - } - } - } else { + if (!principalIsAuditor(user.principal())) { + // special treatment for internal system actions - only log if explicitly told to + if (user.isSystem() && Privilege.SYSTEM.predicate().apply(action)) { + if (events.contains(SYSTEM_ACCESS_GRANTED)) { try { - processor.submit(message("access_granted", action, user.principal(), null, indices(message), message)); + submit(message("access_granted", action, user.principal(), null, indices(message), message)); } catch (Exception e) { logger.warn("failed to index audit event: [access_granted]", e); } } + } else if (events.contains(ACCESS_GRANTED)) { + try { + submit(message("access_granted", action, user.principal(), null, indices(message), message)); + } catch (Exception e) { + logger.warn("failed to index audit event: [access_granted]", e); + } } } } @Override public void accessDenied(User user, String action, TransportMessage message) { - if (enabled.contains(Level.ACCESS_DENIED)) { + if (events.contains(ACCESS_DENIED)) { if (!principalIsAuditor(user.principal())) { try { - processor.submit(message("access_denied", action, user.principal(), null, indices(message), message)); + submit(message("access_denied", action, user.principal(), null, indices(message), message)); } catch (Exception e) { logger.warn("failed to index audit event: [access_denied]", e); } @@ -249,10 +290,10 @@ public class IndexAuditTrail implements AuditTrail { @Override public void tamperedRequest(User user, String action, TransportRequest request) { - if (enabled.contains(Level.TAMPERED_REQUEST)) { + if (events.contains(TAMPERED_REQUEST)) { if (!principalIsAuditor(user.principal())) { try { - processor.submit(message("tampered_request", action, user.principal(), null, indices(request), request)); + submit(message("tampered_request", action, user.principal(), null, indices(request), request)); } catch (Exception e) { logger.warn("failed to index audit event: [tampered_request]", e); } @@ -262,9 +303,9 @@ public class IndexAuditTrail implements AuditTrail { @Override public void connectionGranted(InetAddress inetAddress, String profile, ShieldIpFilterRule rule) { - if (enabled.contains(Level.CONNECTION_GRANTED)) { + if (events.contains(CONNECTION_GRANTED)) { try { - processor.submit(message("ip_filter", "connection_granted", inetAddress, profile, rule)); + submit(message("ip_filter", "connection_granted", inetAddress, profile, rule)); } catch (Exception e) { logger.warn("failed to index audit event: [connection_granted]", e); } @@ -273,9 +314,9 @@ public class IndexAuditTrail implements AuditTrail { @Override public void connectionDenied(InetAddress inetAddress, String profile, ShieldIpFilterRule rule) { - if (enabled.contains(Level.CONNECTION_DENIED)) { + if (events.contains(CONNECTION_DENIED)) { try { - processor.submit(message("ip_filter", "connection_denied", inetAddress, profile, rule)); + submit(message("ip_filter", "connection_denied", inetAddress, profile, rule)); } catch (Exception e) { logger.warn("failed to index audit event: [connection_denied]", e); } @@ -390,6 +431,94 @@ public class IndexAuditTrail implements AuditTrail { return builder; } + void submit(IndexAuditTrail.Message message) { + assert lifecycle.started(); + IndexRequest indexRequest = client.prepareIndex() + .setIndex(resolver.resolve(INDEX_NAME_PREFIX, message.timestamp, rollover)) + .setType(DOC_TYPE).setSource(message.builder).request(); + authenticationService.attachUserHeaderIfMissing(indexRequest, auditUser.user()); + bulkProcessor.add(indexRequest); + } + + private void initializeClient() { + + Settings clientSettings = settings.getByPrefix("shield.audit.index.client."); + + if (clientSettings.names().size() == 0) { + // in the absence of client settings for remote indexing, fall back to the client that was passed in. + this.client = clientProvider.get(); + indexToRemoteCluster = false; + } else { + String[] hosts = clientSettings.getAsArray("hosts"); + if (hosts.length == 0) { + throw new ElasticsearchException("missing required setting " + + "[shield.audit.index.client.hosts] for remote audit log indexing"); + } + + if (clientSettings.get("cluster.name", "").isEmpty()) { + throw new ElasticsearchException("missing required setting " + + "[shield.audit.index.client.cluster.name] for remote audit log indexing"); + } + + List> hostPortPairs = new ArrayList<>(); + + for (String host : hosts) { + List hostPort = Splitter.on(":").splitToList(host.trim()); + if (hostPort.size() != 1 && hostPort.size() != 2) { + logger.warn("invalid host:port specified: [{}] for setting [shield.audit.index.client.hosts]", host); + } + hostPortPairs.add(new Tuple<>(hostPort.get(0), hostPort.size() == 2 ? Integer.valueOf(hostPort.get(1)) : 9300)); + } + + if (hostPortPairs.size() == 0) { + throw new ElasticsearchException("no valid host:port pairs specified for setting [shield.audit.index.client.hosts]"); + } + + final TransportClient transportClient = TransportClient.builder() + .settings(Settings.builder().put(clientSettings).put("path.home", environment.homeFile()).build()).build(); + for (Tuple pair : hostPortPairs) { + transportClient.addTransportAddress(new InetSocketTransportAddress(pair.v1(), pair.v2())); + } + + this.client = transportClient; + indexToRemoteCluster = true; + + logger.info("forwarding audit events to remote cluster [{}] using hosts [{}]", + clientSettings.get("cluster.name", ""), hostPortPairs.toString()); + } + } + + private void initializeBulkProcessor() { + + int bulkSize = Math.min(settings.getAsInt("shield.audit.index.bulk_size", DEFAULT_BULK_SIZE), MAX_BULK_SIZE); + bulkSize = (bulkSize < 1) ? DEFAULT_BULK_SIZE : bulkSize; + + TimeValue interval = settings.getAsTime("shield.audit.index.flush_interval", DEFAULT_FLUSH_INTERVAL); + interval = (interval.millis() < 1) ? DEFAULT_FLUSH_INTERVAL : interval; + + bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + authenticationService.attachUserHeaderIfMissing(request, auditUser.user()); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + if (response.hasFailures()) { + logger.info("failed to bulk index audit events: [{}]", response.buildFailureMessage()); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + logger.error("failed to bulk index audit events: [{}]", failure, failure.getMessage()); + } + }).setBulkActions(bulkSize) + .setFlushInterval(interval) + .setConcurrentRequests(1) + .build(); + } + static class Message { final long timestamp; diff --git a/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditTrailBulkProcessor.java b/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditTrailBulkProcessor.java deleted file mode 100644 index f8207345cba..00000000000 --- a/src/main/java/org/elasticsearch/shield/audit/index/IndexAuditTrailBulkProcessor.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.shield.audit.index; - -import com.google.common.base.Splitter; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.Provider; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.env.Environment; -import org.elasticsearch.shield.authc.AuthenticationService; - -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; - -/** - * - */ -public class IndexAuditTrailBulkProcessor extends AbstractLifecycleComponent { - - public static final int DEFAULT_BULK_SIZE = 1000; - public static final int MAX_BULK_SIZE = 10000; - public static final String INDEX_NAME_PREFIX = ".shield-audit-log"; - public static final String DOC_TYPE = "event"; - - public static final TimeValue DEFAULT_FLUSH_INTERVAL = TimeValue.timeValueSeconds(1); - public static final IndexNameResolver.Rollover DEFAULT_ROLLOVER = IndexNameResolver.Rollover.DAILY; - - private static BulkProcessor bulkProcessor; - - private final Provider clientProvider; - private final IndexAuditUserHolder auditUser; - private final AuthenticationService authenticationService; - private final IndexNameResolver resolver; - private final IndexNameResolver.Rollover rollover; - private final Environment environment; - - private Client client; - private boolean indexToRemoteCluster; - - @Inject - public IndexAuditTrailBulkProcessor(Settings settings, Environment environment, AuthenticationService authenticationService, - IndexAuditUserHolder auditUser, Provider clientProvider) { - super(settings); - this.authenticationService = authenticationService; - this.auditUser = auditUser; - this.clientProvider = clientProvider; - this.environment = environment; - - IndexNameResolver.Rollover rollover; - try { - rollover = IndexNameResolver.Rollover.valueOf( - settings.get("shield.audit.index.rollover", DEFAULT_ROLLOVER.name()).toUpperCase(Locale.ENGLISH)); - } catch (IllegalArgumentException e) { - logger.warn("invalid value for setting [shield.audit.index.rollover]; falling back to default [{}]", - DEFAULT_ROLLOVER.name()); - rollover = DEFAULT_ROLLOVER; - } - this.rollover = rollover; - this.resolver = new IndexNameResolver(); - } - - @Override - protected void doStart() throws ElasticsearchException { - initializeClient(); - initializeBulkProcessor(); - } - - @Override - protected void doStop() throws ElasticsearchException { - } - - @Override - protected void doClose() throws ElasticsearchException { - try { - if (bulkProcessor != null) { - bulkProcessor.close(); - } - } finally { - if (indexToRemoteCluster) { - if (client != null) { - client.close(); - } - } - } - } - - public void submit(IndexAuditTrail.Message message) { - assert lifecycle.started(); - IndexRequest indexRequest = client.prepareIndex() - .setIndex(resolver.resolve(INDEX_NAME_PREFIX, message.timestamp, rollover)) - .setType(DOC_TYPE).setSource(message.builder).request(); - authenticationService.attachUserHeaderIfMissing(indexRequest, auditUser.user()); - bulkProcessor.add(indexRequest); - } - - private void initializeClient() { - - Settings clientSettings = settings.getByPrefix("shield.audit.index.client."); - - if (clientSettings.names().size() == 0) { - // in the absence of client settings for remote indexing, fall back to the client that was passed in. - this.client = clientProvider.get(); - indexToRemoteCluster = false; - } else { - String[] hosts = clientSettings.getAsArray("hosts"); - if (hosts.length == 0) { - throw new ElasticsearchException("missing required setting " + - "[shield.audit.index.client.hosts] for remote audit log indexing"); - } - - if (clientSettings.get("cluster.name", "").isEmpty()) { - throw new ElasticsearchException("missing required setting " + - "[shield.audit.index.client.cluster.name] for remote audit log indexing"); - } - - List> hostPortPairs = new ArrayList<>(); - - for (String host : hosts) { - List hostPort = Splitter.on(":").splitToList(host.trim()); - if (hostPort.size() != 1 && hostPort.size() != 2) { - logger.warn("invalid host:port specified: [{}] for setting [shield.audit.index.client.hosts]", host); - } - hostPortPairs.add(new Tuple<>(hostPort.get(0), hostPort.size() == 2 ? Integer.valueOf(hostPort.get(1)) : 9300)); - } - - if (hostPortPairs.size() == 0) { - throw new ElasticsearchException("no valid host:port pairs specified for setting [shield.audit.index.client.hosts]"); - } - - final TransportClient transportClient = TransportClient.builder() - .settings(Settings.builder().put(clientSettings).put("path.home", environment.homeFile()).build()).build(); - for (Tuple pair : hostPortPairs) { - transportClient.addTransportAddress(new InetSocketTransportAddress(pair.v1(), pair.v2())); - } - - this.client = transportClient; - indexToRemoteCluster = true; - - logger.info("forwarding audit events to remote cluster [{}] using hosts [{}]", - clientSettings.get("cluster.name", ""), hostPortPairs.toString()); - } - } - - private void initializeBulkProcessor() { - - int bulkSize = Math.min(settings.getAsInt("shield.audit.index.bulk_size", DEFAULT_BULK_SIZE), MAX_BULK_SIZE); - bulkSize = (bulkSize < 1) ? DEFAULT_BULK_SIZE : bulkSize; - - TimeValue interval = settings.getAsTime("shield.audit.index.flush_interval", DEFAULT_FLUSH_INTERVAL); - interval = (interval.millis() < 1) ? DEFAULT_FLUSH_INTERVAL : interval; - - bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - authenticationService.attachUserHeaderIfMissing(request, auditUser.user()); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - logger.info("failed to bulk index audit events: [{}]", response.buildFailureMessage()); - } - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - logger.error("failed to bulk index audit events: [{}]", failure, failure.getMessage()); - } - }).setBulkActions(bulkSize) - .setFlushInterval(interval) - .setConcurrentRequests(1) - .build(); - } -} diff --git a/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditLevelTests.java b/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditLevelTests.java new file mode 100644 index 00000000000..367342ac745 --- /dev/null +++ b/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditLevelTests.java @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.shield.audit.index; + +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.util.EnumSet; +import java.util.Locale; + +import static org.hamcrest.Matchers.*; + +public class IndexAuditLevelTests extends ElasticsearchTestCase { + + @Test + public void testAllIndexAuditLevel() { + EnumSet enumSet = IndexAuditLevel.parse(new String[] { "_all" }); + IndexAuditLevel[] levels = IndexAuditLevel.values(); + assertThat(enumSet.size(), is(levels.length)); + for (IndexAuditLevel level : levels) { + assertThat(enumSet.contains(level), is(true)); + } + } + + @Test + public void testExcludeHasPreference() { + EnumSet enumSet = IndexAuditLevel.parse(new String[] { "_all" }, new String[] { "_all" }); + assertThat(enumSet.size(), is(0)); + } + + @Test + public void testExcludeHasPreferenceSingle() { + String excluded = randomFrom(IndexAuditLevel.values()).toString().toLowerCase(Locale.ROOT); + EnumSet enumSet = IndexAuditLevel.parse(new String[] { "_all" }, new String[] { excluded }); + EnumSet expected = EnumSet.allOf(IndexAuditLevel.class); + expected.remove(IndexAuditLevel.valueOf(excluded.toUpperCase(Locale.ROOT))); + assertThat(enumSet, equalTo(expected)); + } + +} diff --git a/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java b/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java index 21e8cc8cff4..da8405d14d4 100644 --- a/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java +++ b/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java @@ -52,7 +52,6 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest { private IndexNameResolver resolver = new IndexNameResolver(); private IndexNameResolver.Rollover rollover; - private IndexAuditTrailBulkProcessor bulkProcessor; private IndexAuditTrail auditor; private boolean remoteIndexing = false; @@ -61,7 +60,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest { public static final String REMOTE_TEST_CLUSTER = "single-node-remote-test-cluster"; - private static final IndexAuditUserHolder user = new IndexAuditUserHolder(IndexAuditTrailBulkProcessor.INDEX_NAME_PREFIX); + private static final IndexAuditUserHolder user = new IndexAuditUserHolder(IndexAuditTrail.INDEX_NAME_PREFIX); private Settings commonSettings(IndexNameResolver.Rollover rollover) { return Settings.builder() @@ -80,44 +79,35 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest { .build(); } - private Settings mutedSettings(boolean systemEnabled, String... muted) { + private Settings levelSettings(String[] includes, String[] excludes) { Settings.Builder builder = Settings.builder(); - for (String mute : muted) { - builder.put("shield.audit.index.events." + mute, false); + if (includes != null) { + builder.putArray("shield.audit.index.events.include", includes); + } + if (excludes != null) { + builder.putArray("shield.audit.index.events.exclude", excludes); } - - builder.put("shield.audit.index.events.system.access_granted", systemEnabled); return builder.build(); } - private Settings settings(IndexNameResolver.Rollover rollover, boolean systemEnabled, String... muted) { + private Settings settings(IndexNameResolver.Rollover rollover, String[] includes, String[] excludes) { Settings.Builder builder = Settings.builder(); - builder.put(mutedSettings(systemEnabled, muted)); + builder.put(levelSettings(includes, excludes)); builder.put(commonSettings(rollover)); return builder.build(); } - private IndexAuditTrailBulkProcessor buildIndexAuditTrailService(Settings settings) { - - AuthenticationService authService = mock(AuthenticationService.class); - when(authService.authenticate(mock(RestRequest.class))).thenThrow(UnsupportedOperationException.class); - when(authService.authenticate("_action", new LocalHostMockMessage(), user.user())).thenThrow(UnsupportedOperationException.class); - - Environment env = new Environment(settings); - return new IndexAuditTrailBulkProcessor(settings, env, authService, user, Providers.of(client())); - } - private Client getClient() { return remoteIndexing ? remoteClient : client(); } - private void initialize(String... muted) { - initialize(false, muted); + private void initialize(String... excludes) { + initialize(null, excludes); } - private void initialize(boolean systemEnabled, String... muted) { + private void initialize(String[] includes, String[] excludes) { rollover = randomFrom(HOURLY, DAILY, WEEKLY, MONTHLY); - Settings settings = settings(rollover, systemEnabled, muted); + Settings settings = settings(rollover, includes, excludes); remoteIndexing = randomBoolean(); if (remoteIndexing) { @@ -137,16 +127,22 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest { .build(); } - Settings settings1 = Settings.builder().put(settings).put("path.home", createTempDir()).build(); + settings = Settings.builder().put(settings).put("path.home", createTempDir()).build(); logger.info("--> settings: [{}]", settings.getAsMap().toString()); - bulkProcessor = buildIndexAuditTrailService(settings1); - bulkProcessor.start(); - auditor = new IndexAuditTrail(settings, user, bulkProcessor); + AuthenticationService authService = mock(AuthenticationService.class); + when(authService.authenticate(mock(RestRequest.class))).thenThrow(new UnsupportedOperationException("")); + when(authService.authenticate("_action", new LocalHostMockMessage(), user.user())).thenThrow(new UnsupportedOperationException("")); + + Environment env = new Environment(settings); + auditor = new IndexAuditTrail(settings, user, env, authService, Providers.of(client())).start(); } @After public void afterTest() { - bulkProcessor.close(); + if (auditor != null) { + auditor.close(); + } + cluster().wipe(); if (remoteIndexing && remoteNode != null) { DeleteIndexResponse response = remoteClient.admin().indices().prepareDelete("*").execute().actionGet(); @@ -397,7 +393,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest { @Test public void testSystemAccessGranted() throws Exception { - initialize(true); + initialize(new String[] { "system_access_granted" }, null); TransportMessage message = randomBoolean() ? new RemoteHostMockMessage() : new LocalHostMockMessage(); auditor.accessGranted(User.SYSTEM, "internal:_action", message); awaitIndexCreation(resolveIndexName()); @@ -572,7 +568,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest { private SearchHit getIndexedAuditMessage() { SearchResponse response = getClient().prepareSearch(resolveIndexName()) - .setTypes(IndexAuditTrailBulkProcessor.DOC_TYPE) + .setTypes(IndexAuditTrail.DOC_TYPE) .addFields(fieldList()) .execute().actionGet(); @@ -605,7 +601,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest { } private String resolveIndexName() { - return resolver.resolve(IndexAuditTrailBulkProcessor.INDEX_NAME_PREFIX, System.currentTimeMillis(), rollover); + return resolver.resolve(IndexAuditTrail.INDEX_NAME_PREFIX, System.currentTimeMillis(), rollover); } }