add mapping and configurable settings for audit index output

Previously we relied on the default dynamic mapping for the audit index output, which did
not create an ideal mapping. This change adds a mapping file with default settings and
dynamic mapping disabled for the audit indexes.

Additionally, the ability to override settings for the audit indexes has been provided so that
users can customize the number of shards and replicas to meet their needs.

In order to implement these changes, the index audit service had to be moved from a lifecycle
component to an abstract component that had its own custom lifecycle on top of the cluster
state. A ShieldLifecycleService class was added to accomplish this. In the future, this class
can be used for other services that need to perform index based operations.

Closes elastic/elasticsearch#913

Original commit: elastic/x-pack-elasticsearch@231740c1cc
This commit is contained in:
jaymode 2015-06-15 13:40:24 -04:00
parent f4ed6282fd
commit cb80e9ccbd
9 changed files with 487 additions and 77 deletions

View File

@ -145,6 +145,13 @@
<build> <build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins> <plugins>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>

View File

@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.shield;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.shield.audit.index.IndexAuditTrail;
import org.elasticsearch.threadpool.ThreadPool;
/**
* This class is used to provide a lifecycle for services that is based on the cluster's state
* rather than the typical lifecycle that is used to start services as part of the node startup.
*
* This type of lifecycle is necessary for services that need to perform actions that require the cluster to be in a
* certain state; some examples are storing index templates and creating indices. These actions would most likely fail
* from within a plugin if executed in the {@link org.elasticsearch.common.component.AbstractLifecycleComponent#doStart()}
* method. However, if the startup of these services waits for the cluster to form and recover indices then it will be
* successful. This lifecycle service allows for this to happen by listening for {@link ClusterChangedEvent} and checking
* if the services can start. Additionally, the service also provides hooks for stop and close functionality.
*/
public class ShieldLifecycleService extends AbstractComponent implements ClusterStateListener {
private final ThreadPool threadPool;
private final IndexAuditTrail indexAuditTrail;
@Inject
public ShieldLifecycleService(Settings settings, ClusterService clusterService, ThreadPool threadPool, IndexAuditTrail indexAuditTrail) {
super(settings);
this.threadPool = threadPool;
this.indexAuditTrail = indexAuditTrail;
clusterService.add(this);
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
stop();
}
@Override
public void beforeClose() {
close();
}
});
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
// TODO if/when we have more services this should not be checking the audit trail
if (indexAuditTrail.state() == IndexAuditTrail.State.STOPPED) {
final boolean master = event.localNodeMaster();
if (indexAuditTrail.canStart(event, master)) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
indexAuditTrail.start(master);
}
});
}
}
}
public void stop() {
indexAuditTrail.stop();
}
public void close() {
indexAuditTrail.close();
}
}

View File

@ -14,7 +14,6 @@ import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.AbstractPlugin; import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.shield.audit.index.IndexAuditTrail;
import org.elasticsearch.shield.authc.Realms; import org.elasticsearch.shield.authc.Realms;
import org.elasticsearch.shield.authc.support.SecuredString; import org.elasticsearch.shield.authc.support.SecuredString;
import org.elasticsearch.shield.authc.support.UsernamePasswordToken; import org.elasticsearch.shield.authc.support.UsernamePasswordToken;
@ -27,8 +26,6 @@ import java.nio.file.Path;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.shield.audit.AuditTrailModule.indexAuditLoggingEnabled;
/** /**
* *
*/ */
@ -69,10 +66,6 @@ public class ShieldPlugin extends AbstractPlugin {
public Collection<Class<? extends LifecycleComponent>> services() { public Collection<Class<? extends LifecycleComponent>> services() {
ImmutableList.Builder<Class<? extends LifecycleComponent>> builder = ImmutableList.builder(); ImmutableList.Builder<Class<? extends LifecycleComponent>> builder = ImmutableList.builder();
if (enabled && !clientMode) { if (enabled && !clientMode) {
if (indexAuditLoggingEnabled(settings)) {
// index-based audit logging should be started before other services
builder.add(IndexAuditTrail.class);
}
builder.add(LicenseService.class).add(InternalCryptoService.class).add(FileRolesStore.class).add(Realms.class).add(IPFilter.class); builder.add(LicenseService.class).add(InternalCryptoService.class).add(FileRolesStore.class).add(Realms.class).add(IPFilter.class);
} }
return builder.build(); return builder.build();

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule; import org.elasticsearch.common.inject.PreProcessModule;
import org.elasticsearch.common.inject.multibindings.Multibinder; import org.elasticsearch.common.inject.multibindings.Multibinder;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.shield.ShieldLifecycleService;
import org.elasticsearch.shield.audit.index.IndexAuditTrail; import org.elasticsearch.shield.audit.index.IndexAuditTrail;
import org.elasticsearch.shield.audit.index.IndexAuditUserHolder; import org.elasticsearch.shield.audit.index.IndexAuditUserHolder;
import org.elasticsearch.shield.audit.logfile.LoggingAuditTrail; import org.elasticsearch.shield.audit.logfile.LoggingAuditTrail;
@ -55,6 +56,8 @@ public class AuditTrailModule extends AbstractShieldModule.Node implements PrePr
bind(LoggingAuditTrail.class).asEagerSingleton(); bind(LoggingAuditTrail.class).asEagerSingleton();
break; break;
case IndexAuditTrail.NAME: case IndexAuditTrail.NAME:
// TODO should bind the lifecycle service in ShieldModule if we use it other places...
bind(ShieldLifecycleService.class).asEagerSingleton();
bind(IndexAuditUserHolder.class).toInstance(indexAuditUser); bind(IndexAuditUserHolder.class).toInstance(indexAuditUser);
binder.addBinding().to(IndexAuditTrail.class); binder.addBinding().to(IndexAuditTrail.class);
bind(IndexAuditTrail.class).asEagerSingleton(); bind(IndexAuditTrail.class).asEagerSingleton();

View File

@ -6,18 +6,26 @@
package org.elasticsearch.shield.audit.index; package org.elasticsearch.shield.audit.index;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Provider; import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.network.NetworkUtils; import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress;
@ -27,7 +35,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.shield.ShieldException;
import org.elasticsearch.shield.User; import org.elasticsearch.shield.User;
import org.elasticsearch.shield.audit.AuditTrail; import org.elasticsearch.shield.audit.AuditTrail;
import org.elasticsearch.shield.authc.AuthenticationService; import org.elasticsearch.shield.authc.AuthenticationService;
@ -42,10 +52,9 @@ import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.*;
import java.util.EnumSet; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.List; import java.util.concurrent.atomic.AtomicReference;
import java.util.Locale;
import static org.elasticsearch.shield.audit.AuditUtil.indices; import static org.elasticsearch.shield.audit.AuditUtil.indices;
import static org.elasticsearch.shield.audit.AuditUtil.restRequestContent; import static org.elasticsearch.shield.audit.AuditUtil.restRequestContent;
@ -54,7 +63,7 @@ import static org.elasticsearch.shield.audit.index.IndexAuditLevel.*;
/** /**
* Audit trail implementation that writes events into an index. * Audit trail implementation that writes events into an index.
*/ */
public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail> implements AuditTrail { public class IndexAuditTrail extends AbstractComponent implements AuditTrail {
public static final int DEFAULT_BULK_SIZE = 1000; public static final int DEFAULT_BULK_SIZE = 1000;
public static final int MAX_BULK_SIZE = 10000; public static final int MAX_BULK_SIZE = 10000;
@ -63,7 +72,9 @@ public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail>
public static final String NAME = "index"; public static final String NAME = "index";
public static final String INDEX_NAME_PREFIX = ".shield-audit-log"; public static final String INDEX_NAME_PREFIX = ".shield-audit-log";
public static final String DOC_TYPE = "event"; public static final String DOC_TYPE = "event";
public static final String ROLLOVER_SETTING = "shield.audit.index.rollover";
static final String INDEX_TEMPLATE_NAME = "shield_audit_log";
static final String[] DEFAULT_EVENT_INCLUDES = new String[] { static final String[] DEFAULT_EVENT_INCLUDES = new String[] {
ACCESS_DENIED.toString(), ACCESS_DENIED.toString(),
ACCESS_GRANTED.toString(), ACCESS_GRANTED.toString(),
@ -74,6 +85,9 @@ public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail>
TAMPERED_REQUEST.toString() TAMPERED_REQUEST.toString()
}; };
private static final ImmutableSet<String> forbiddenIndexSettings = ImmutableSet.of("index.mapper.dynamic");
private final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);
private final String nodeName; private final String nodeName;
private final IndexAuditUserHolder auditUser; private final IndexAuditUserHolder auditUser;
private final Provider<Client> clientProvider; private final Provider<Client> clientProvider;
@ -87,14 +101,14 @@ public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail>
private IndexNameResolver.Rollover rollover; private IndexNameResolver.Rollover rollover;
private String nodeHostName; private String nodeHostName;
private String nodeHostAddress; private String nodeHostAddress;
private ConcurrentLinkedQueue<Message> eventQueue = new ConcurrentLinkedQueue<>();
private EnumSet<IndexAuditLevel> events;
@Override @Override
public String name() { public String name() {
return NAME; return NAME;
} }
private EnumSet<IndexAuditLevel> events;
@Inject @Inject
public IndexAuditTrail(Settings settings, IndexAuditUserHolder indexingAuditUser, public IndexAuditTrail(Settings settings, IndexAuditUserHolder indexingAuditUser,
Environment environment, AuthenticationService authenticationService, Environment environment, AuthenticationService authenticationService,
@ -106,48 +120,125 @@ public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail>
this.environment = environment; this.environment = environment;
this.resolver = new IndexNameResolver(); this.resolver = new IndexNameResolver();
this.nodeName = settings.get("name"); this.nodeName = settings.get("name");
}
@Override // we have to initialize this here since we use rollover in determining if we can start...
protected void doStart() throws ElasticsearchException {
try { try {
rollover = IndexNameResolver.Rollover.valueOf( rollover = IndexNameResolver.Rollover.valueOf(
settings.get("shield.audit.index.rollover", DEFAULT_ROLLOVER.name()).toUpperCase(Locale.ENGLISH)); settings.get(ROLLOVER_SETTING, DEFAULT_ROLLOVER.name()).toUpperCase(Locale.ENGLISH));
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
logger.warn("invalid value for setting [shield.audit.index.rollover]; falling back to default [{}]", logger.warn("invalid value for setting [shield.audit.index.rollover]; falling back to default [{}]",
DEFAULT_ROLLOVER.name()); DEFAULT_ROLLOVER.name());
rollover = DEFAULT_ROLLOVER; rollover = DEFAULT_ROLLOVER;
} }
String hostname = "n/a"; // we have to initialize the events here since we can receive events before starting...
String hostaddr = "n/a";
try {
hostname = InetAddress.getLocalHost().getHostName();
hostaddr = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.warn("unable to resolve local host name", e);
}
this.nodeHostName = hostname;
this.nodeHostAddress = hostaddr;
String[] includedEvents = settings.getAsArray("shield.audit.index.events.include", DEFAULT_EVENT_INCLUDES); String[] includedEvents = settings.getAsArray("shield.audit.index.events.include", DEFAULT_EVENT_INCLUDES);
String[] excludedEvents = settings.getAsArray("shield.audit.index.events.exclude"); String[] excludedEvents = settings.getAsArray("shield.audit.index.events.exclude");
events = parse(includedEvents, excludedEvents);
initializeClient();
initializeBulkProcessor();
}
@Override
protected void doStop() throws ElasticsearchException {
}
@Override
protected void doClose() throws ElasticsearchException {
try { try {
if (bulkProcessor != null) { events = parse(includedEvents, excludedEvents);
bulkProcessor.close(); } catch (ShieldException e) {
logger.warn("invalid event type specified, using default for audit index output. include events [{}], exclude events [{}]", e, includedEvents, excludedEvents);
events = parse(DEFAULT_EVENT_INCLUDES, Strings.EMPTY_ARRAY);
}
}
public State state() {
return state.get();
}
/**
* This method determines if this service can be started based on the state in the {@link ClusterChangedEvent} and
* if the node is the master or not. In order for the service to start, the following must be true:
*
* <ol>
* <li>The cluster must not have a {@link GatewayService#STATE_NOT_RECOVERED_BLOCK}; in other words the gateway
* must have recovered from disk already.</li>
* <li>The current node must be the master OR the <code>shield_audit_log</code> index template must exist</li>
* <li>The current audit index must not exist or have all primary shards active. The current audit index name
* is determined by the rollover settings and current time</li>
* </ol>
*
* @param event the {@link ClusterChangedEvent} containing the up to date cluster state
* @param master flag indicating if the current node is the master
* @return true if all requirements are met and the service can be started
*/
public boolean canStart(ClusterChangedEvent event, boolean master) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have .shield-audit-
// but they may not have been restored from the cluster state on disk
logger.debug("index audit trail waiting until gateway has recovered from disk");
return false;
}
final ClusterState clusterState = event.state();
if (!master && clusterState.metaData().templates().get(INDEX_TEMPLATE_NAME) == null) {
logger.debug("shield audit index template [{}] does not exist, so service cannot start", INDEX_TEMPLATE_NAME);
return false;
}
String index = resolver.resolve(INDEX_NAME_PREFIX, System.currentTimeMillis(), rollover);
IndexMetaData metaData = clusterState.metaData().index(index);
if (metaData == null) {
logger.debug("shield audit index [{}] does not exist, so service can start", index);
return true;
}
if (clusterState.routingTable().index(index).allPrimaryShardsActive()) {
logger.debug("shield audit index [{}] all primary shards started, so service can start", index);
return true;
}
logger.debug("shield audit index [{}] does not have all primary shards started, so service cannot start", index);
return false;
}
/**
* Starts the service. The state is moved to {@link org.elasticsearch.shield.audit.index.IndexAuditTrail.State#STARTING}
* at the beginning of the method. The service's components are initialized and if the current node is the master, the index
* template will be stored. The state is moved {@link org.elasticsearch.shield.audit.index.IndexAuditTrail.State#STARTED}
* and before returning the queue of messages that came before the service started is drained.
*
* @param master flag indicating if the current node is master
*/
public void start(boolean master) {
if (state.compareAndSet(State.STOPPED, State.STARTING)) {
String hostname = "n/a";
String hostaddr = "n/a";
try {
hostname = InetAddress.getLocalHost().getHostName();
hostaddr = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.warn("unable to resolve local host name", e);
} }
this.nodeHostName = hostname;
this.nodeHostAddress = hostaddr;
initializeClient();
if (master) {
putTemplate(customAuditIndexSettings(settings));
}
initializeBulkProcessor();
state.set(State.STARTED);
drainQueue();
}
}
public void stop() {
if (state.compareAndSet(State.STARTED, State.STOPPING)) {
try {
bulkProcessor.flush();
} finally {
state.set(State.STOPPED);
}
}
}
public void close() {
if (state.get() != State.STOPPED) {
stop();
}
try {
bulkProcessor.close();
} finally { } finally {
if (indexToRemoteCluster) { if (indexToRemoteCluster) {
if (client != null) { if (client != null) {
@ -409,7 +500,7 @@ public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail>
InetSocketAddress restAddress = RemoteHostHeader.restRemoteAddress(message); InetSocketAddress restAddress = RemoteHostHeader.restRemoteAddress(message);
if (restAddress != null) { if (restAddress != null) {
builder.field(Field.ORIGIN_TYPE, "rest"); builder.field(Field.ORIGIN_TYPE, "rest");
builder.field(Field.ORIGIN_ADDRESS, restAddress); builder.field(Field.ORIGIN_ADDRESS, restAddress.getAddress().getHostAddress());
return builder; return builder;
} }
@ -418,7 +509,7 @@ public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail>
if (address != null) { if (address != null) {
builder.field(Field.ORIGIN_TYPE, "transport"); builder.field(Field.ORIGIN_TYPE, "transport");
if (address instanceof InetSocketTransportAddress) { if (address instanceof InetSocketTransportAddress) {
builder.field(Field.ORIGIN_ADDRESS, ((InetSocketTransportAddress) address).address()); builder.field(Field.ORIGIN_ADDRESS, ((InetSocketTransportAddress) address).address().getAddress().getHostAddress());
} else { } else {
builder.field(Field.ORIGIN_ADDRESS, address); builder.field(Field.ORIGIN_ADDRESS, address);
} }
@ -431,8 +522,12 @@ public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail>
return builder; return builder;
} }
void submit(IndexAuditTrail.Message message) { void submit(Message message) {
assert lifecycle.started(); if (state.get() != State.STARTED) {
eventQueue.add(message);
return;
}
IndexRequest indexRequest = client.prepareIndex() IndexRequest indexRequest = client.prepareIndex()
.setIndex(resolver.resolve(INDEX_NAME_PREFIX, message.timestamp, rollover)) .setIndex(resolver.resolve(INDEX_NAME_PREFIX, message.timestamp, rollover))
.setType(DOC_TYPE).setSource(message.builder).request(); .setType(DOC_TYPE).setSource(message.builder).request();
@ -488,6 +583,49 @@ public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail>
} }
} }
Settings customAuditIndexSettings(Settings nodeSettings) {
Settings newSettings = Settings.builder()
.put(nodeSettings.getAsSettings("shield.audit.index.settings.index"))
.build();
if (newSettings.names().isEmpty()) {
return Settings.EMPTY;
}
// Filter out forbidden settings:
Settings.Builder builder = Settings.builder();
for (Map.Entry<String, String> entry : newSettings.getAsMap().entrySet()) {
String name = "index." + entry.getKey();
if (forbiddenIndexSettings.contains(name)) {
logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name);
continue;
}
builder.put(name, entry.getValue());
}
return builder.build();
}
void putTemplate(Settings customSettings) {
try {
final byte[] template = Streams.copyToBytesFromClasspath("/" + INDEX_TEMPLATE_NAME + ".json");
PutIndexTemplateRequest request = new PutIndexTemplateRequest(INDEX_TEMPLATE_NAME).source(template);
if (customSettings != null && customSettings.names().size() > 0) {
Settings updatedSettings = Settings.builder()
.put(request.settings())
.put(customSettings)
.build();
request.settings(updatedSettings);
}
authenticationService.attachUserHeaderIfMissing(request, auditUser.user());
PutIndexTemplateResponse response = client.admin().indices().putTemplate(request).actionGet();
if (!response.isAcknowledged()) {
throw new ShieldException("failed to put index template for audit logging");
}
} catch (Exception e) {
throw new ShieldException("failed to load [" + INDEX_TEMPLATE_NAME + ".json]", e);
}
}
private void initializeBulkProcessor() { private void initializeBulkProcessor() {
int bulkSize = Math.min(settings.getAsInt("shield.audit.index.bulk_size", DEFAULT_BULK_SIZE), MAX_BULK_SIZE); int bulkSize = Math.min(settings.getAsInt("shield.audit.index.bulk_size", DEFAULT_BULK_SIZE), MAX_BULK_SIZE);
@ -519,6 +657,14 @@ public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail>
.build(); .build();
} }
private void drainQueue() {
Message message = eventQueue.poll();
while (message != null) {
submit(message);
message = eventQueue.poll();
}
}
static class Message { static class Message {
final long timestamp; final long timestamp;
@ -542,12 +688,12 @@ public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail>
} }
interface Field { interface Field {
XContentBuilderString TIMESTAMP = new XContentBuilderString("timestamp"); XContentBuilderString TIMESTAMP = new XContentBuilderString("@timestamp");
XContentBuilderString NODE_NAME = new XContentBuilderString("node_name"); XContentBuilderString NODE_NAME = new XContentBuilderString("node_name");
XContentBuilderString NODE_HOST_NAME = new XContentBuilderString("node_host_name"); XContentBuilderString NODE_HOST_NAME = new XContentBuilderString("node_host_name");
XContentBuilderString NODE_HOST_ADDRESS = new XContentBuilderString("node_host_address"); XContentBuilderString NODE_HOST_ADDRESS = new XContentBuilderString("node_host_address");
XContentBuilderString LAYER = new XContentBuilderString("layer"); XContentBuilderString LAYER = new XContentBuilderString("layer");
XContentBuilderString TYPE = new XContentBuilderString("type"); XContentBuilderString TYPE = new XContentBuilderString("event_type");
XContentBuilderString ORIGIN_ADDRESS = new XContentBuilderString("origin_address"); XContentBuilderString ORIGIN_ADDRESS = new XContentBuilderString("origin_address");
XContentBuilderString ORIGIN_TYPE = new XContentBuilderString("origin_type"); XContentBuilderString ORIGIN_TYPE = new XContentBuilderString("origin_type");
XContentBuilderString PRINCIPAL = new XContentBuilderString("principal"); XContentBuilderString PRINCIPAL = new XContentBuilderString("principal");
@ -560,4 +706,11 @@ public class IndexAuditTrail extends AbstractLifecycleComponent<IndexAuditTrail>
XContentBuilderString TRANSPORT_PROFILE = new XContentBuilderString("transport_profile"); XContentBuilderString TRANSPORT_PROFILE = new XContentBuilderString("transport_profile");
XContentBuilderString RULE = new XContentBuilderString("rule"); XContentBuilderString RULE = new XContentBuilderString("rule");
} }
public enum State {
STOPPED,
STARTING,
STARTED,
STOPPING
}
} }

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.shield.audit.index; package org.elasticsearch.shield.audit.index;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction;
import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.shield.User; import org.elasticsearch.shield.User;
import org.elasticsearch.shield.authz.Permission; import org.elasticsearch.shield.authz.Permission;
@ -29,6 +30,7 @@ public class IndexAuditUserHolder {
String indexPattern = indexName + "*"; String indexPattern = indexName + "*";
this.role = Permission.Global.Role.builder(ROLE_NAMES[0]) this.role = Permission.Global.Role.builder(ROLE_NAMES[0])
.set(Privilege.Cluster.action(PutIndexTemplateAction.NAME))
.add(Privilege.Index.CREATE_INDEX, indexPattern) .add(Privilege.Index.CREATE_INDEX, indexPattern)
.add(Privilege.Index.INDEX, indexPattern) .add(Privilege.Index.INDEX, indexPattern)
.add(Privilege.Index.action(BulkAction.NAME), indexPattern) .add(Privilege.Index.action(BulkAction.NAME), indexPattern)

View File

@ -0,0 +1,87 @@
{
"template": ".shield-audit-log*",
"order": 2147483647,
"settings": {
"index.mapper.dynamic" : false
},
"mappings": {
"event": {
"dynamic" : "strict",
"_all" : {
"enabled" : false
},
"_timestamp" : {
"enabled" : true,
"path" : "@timestamp"
},
"properties": {
"@timestamp": {
"type": "date"
},
"node_name": {
"type": "string",
"index": "not_analyzed"
},
"node_host_name": {
"type": "string",
"index": "not_analyzed"
},
"node_host_address": {
"type": "string",
"index": "not_analyzed"
},
"layer": {
"type": "string",
"index": "not_analyzed"
},
"event_type": {
"type": "string",
"index": "not_analyzed"
},
"origin_address": {
"type": "string",
"index": "not_analyzed"
},
"origin_type": {
"type": "string",
"index": "not_analyzed"
},
"principal": {
"type": "string",
"index": "not_analyzed"
},
"action": {
"type": "string",
"index": "not_analyzed"
},
"indices": {
"type": "string",
"index": "not_analyzed"
},
"request": {
"type": "string",
"index": "not_analyzed"
},
"request_body": {
"type": "string"
},
"uri": {
"type": "string",
"index": "not_analyzed"
},
"realm": {
"type": "string",
"index": "not_analyzed"
},
"transport_profile": {
"type": "string",
"index": "not_analyzed"
},
"rule": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.shield.audit.index;
import com.google.common.base.Predicate;
import org.elasticsearch.action.exists.ExistsResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.shield.audit.logfile.LoggingAuditTrail;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.elasticsearch.test.ShieldIntegrationTest;
import org.junit.Test;
@ClusterScope(scope = Scope.TEST)
public class IndexAuditTrailEnabledTests extends ShieldIntegrationTest {
IndexNameResolver.Rollover rollover = randomFrom(IndexNameResolver.Rollover.values());
@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder builder = Settings.builder()
.put(super.nodeSettings(nodeOrdinal));
builder.put("shield.audit.enabled", true);
if (randomBoolean()) {
builder.putArray("shield.audit.outputs", LoggingAuditTrail.NAME, IndexAuditTrail.NAME);
} else {
builder.putArray("shield.audit.outputs", IndexAuditTrail.NAME);
}
builder.put(IndexAuditTrail.ROLLOVER_SETTING, rollover);
return builder.build();
}
@Test
public void testIndexAuditTrailIndexExists() throws Exception {
awaitIndexCreation();
}
void awaitIndexCreation() throws Exception {
IndexNameResolver resolver = new IndexNameResolver();
final String indexName = resolver.resolve(IndexAuditTrail.INDEX_NAME_PREFIX, System.currentTimeMillis(), rollover);
boolean success = awaitBusy(new Predicate<Void>() {
@Override
public boolean apply(Void o) {
try {
ExistsResponse response =
client().prepareExists(indexName).execute().actionGet();
return response.exists();
} catch (Exception e) {
return false;
}
}
});
if (!success) {
fail("index [" + indexName + "] was not created");
}
}
}

View File

@ -8,6 +8,7 @@ package org.elasticsearch.shield.audit.index;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.exists.ExistsResponse; import org.elasticsearch.action.exists.ExistsResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -50,18 +51,19 @@ import static org.mockito.Mockito.when;
@ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numDataNodes = 1) @ElasticsearchIntegrationTest.ClusterScope(scope = SUITE, numDataNodes = 1)
public class IndexAuditTrailTests extends ShieldIntegrationTest { public class IndexAuditTrailTests extends ShieldIntegrationTest {
private IndexNameResolver resolver = new IndexNameResolver();
private IndexNameResolver.Rollover rollover;
private IndexAuditTrail auditor;
private boolean remoteIndexing = false;
private Node remoteNode;
private Client remoteClient;
public static final String REMOTE_TEST_CLUSTER = "single-node-remote-test-cluster"; public static final String REMOTE_TEST_CLUSTER = "single-node-remote-test-cluster";
private static final IndexAuditUserHolder user = new IndexAuditUserHolder(IndexAuditTrail.INDEX_NAME_PREFIX); private static final IndexAuditUserHolder user = new IndexAuditUserHolder(IndexAuditTrail.INDEX_NAME_PREFIX);
private IndexNameResolver resolver = new IndexNameResolver();
private IndexNameResolver.Rollover rollover;
private IndexAuditTrail auditor;
private boolean remoteIndexing = false;
private Node remoteNode;
private Client remoteClient;
private int numShards;
private int numReplicas;
private Settings commonSettings(IndexNameResolver.Rollover rollover) { private Settings commonSettings(IndexNameResolver.Rollover rollover) {
return Settings.builder() return Settings.builder()
.put("shield.audit.enabled", true) .put("shield.audit.enabled", true)
@ -69,6 +71,8 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
.put("shield.audit.index.bulk_size", 1) .put("shield.audit.index.bulk_size", 1)
.put("shield.audit.index.flush_interval", "1ms") .put("shield.audit.index.flush_interval", "1ms")
.put("shield.audit.index.rollover", rollover.name().toLowerCase(Locale.ENGLISH)) .put("shield.audit.index.rollover", rollover.name().toLowerCase(Locale.ENGLISH))
.put("shield.audit.index.settings.index.number_of_shards", numShards)
.put("shield.audit.index.settings.index.number_of_replicas", numReplicas)
.build(); .build();
} }
@ -107,6 +111,8 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
private void initialize(String[] includes, String[] excludes) { private void initialize(String[] includes, String[] excludes) {
rollover = randomFrom(HOURLY, DAILY, WEEKLY, MONTHLY); rollover = randomFrom(HOURLY, DAILY, WEEKLY, MONTHLY);
numReplicas = numberOfReplicas();
numShards = numberOfShards();
Settings settings = settings(rollover, includes, excludes); Settings settings = settings(rollover, includes, excludes);
remoteIndexing = randomBoolean(); remoteIndexing = randomBoolean();
@ -134,7 +140,8 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
when(authService.authenticate("_action", new LocalHostMockMessage(), user.user())).thenThrow(new UnsupportedOperationException("")); when(authService.authenticate("_action", new LocalHostMockMessage(), user.user())).thenThrow(new UnsupportedOperationException(""));
Environment env = new Environment(settings); Environment env = new Environment(settings);
auditor = new IndexAuditTrail(settings, user, env, authService, Providers.of(client())).start(); auditor = new IndexAuditTrail(settings, user, env, authService, Providers.of(client()));
auditor.start(true);
} }
@After @After
@ -164,7 +171,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
assertAuditMessage(hit, "transport", "anonymous_access_denied"); assertAuditMessage(hit, "transport", "anonymous_access_denied");
if (message instanceof RemoteHostMockMessage) { if (message instanceof RemoteHostMockMessage) {
assertEquals("remote_host:1234", hit.field("origin_address").getValue()); assertEquals(remoteHostAddress(), hit.field("origin_address").getValue());
} else { } else {
assertEquals("local[local_host]", hit.field("origin_address").getValue()); assertEquals("local[local_host]", hit.field("origin_address").getValue());
} }
@ -217,7 +224,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
assertAuditMessage(hit, "transport", "authentication_failed"); assertAuditMessage(hit, "transport", "authentication_failed");
if (message instanceof RemoteHostMockMessage) { if (message instanceof RemoteHostMockMessage) {
assertEquals("remote_host:1234", hit.field("origin_address").getValue()); assertEquals(remoteHostAddress(), hit.field("origin_address").getValue());
} else { } else {
assertEquals("local[local_host]", hit.field("origin_address").getValue()); assertEquals("local[local_host]", hit.field("origin_address").getValue());
} }
@ -239,7 +246,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
assertAuditMessage(hit, "transport", "authentication_failed"); assertAuditMessage(hit, "transport", "authentication_failed");
if (message instanceof RemoteHostMockMessage) { if (message instanceof RemoteHostMockMessage) {
assertEquals("remote_host:1234", hit.field("origin_address").getValue()); assertEquals(remoteHostAddress(), hit.field("origin_address").getValue());
} else { } else {
assertEquals("local[local_host]", hit.field("origin_address").getValue()); assertEquals("local[local_host]", hit.field("origin_address").getValue());
} }
@ -325,7 +332,7 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
assertAuditMessage(hit, "transport", "authentication_failed"); assertAuditMessage(hit, "transport", "authentication_failed");
if (message instanceof RemoteHostMockMessage) { if (message instanceof RemoteHostMockMessage) {
assertEquals("remote_host:1234", hit.field("origin_address").getValue()); assertEquals(remoteHostAddress(), hit.field("origin_address").getValue());
} else { } else {
assertEquals("local[local_host]", hit.field("origin_address").getValue()); assertEquals("local[local_host]", hit.field("origin_address").getValue());
} }
@ -512,15 +519,13 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
} }
private void assertAuditMessage(SearchHit hit, String layer, String type) { private void assertAuditMessage(SearchHit hit, String layer, String type) {
assertThat((Long) hit.field("@timestamp").getValue(), lessThan(System.currentTimeMillis()));
assertThat((Long) hit.field("timestamp").getValue(), greaterThan(0L));
assertThat((Long) hit.field("timestamp").getValue(), lessThan(System.currentTimeMillis()));
assertThat(clusterService().localNode().getHostName(), equalTo(hit.field("node_host_name").getValue())); assertThat(clusterService().localNode().getHostName(), equalTo(hit.field("node_host_name").getValue()));
assertThat(clusterService().localNode().getHostAddress(), equalTo(hit.field("node_host_address").getValue())); assertThat(clusterService().localNode().getHostAddress(), equalTo(hit.field("node_host_address").getValue()));
assertEquals(layer, hit.field("layer").getValue()); assertEquals(layer, hit.field("layer").getValue());
assertEquals(type, hit.field("type").getValue()); assertEquals(type, hit.field("event_type").getValue());
} }
private static class LocalHostMockMessage extends TransportMessage<LocalHostMockMessage> { private static class LocalHostMockMessage extends TransportMessage<LocalHostMockMessage> {
@ -530,14 +535,14 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
} }
private static class RemoteHostMockMessage extends TransportMessage<RemoteHostMockMessage> { private static class RemoteHostMockMessage extends TransportMessage<RemoteHostMockMessage> {
RemoteHostMockMessage() { RemoteHostMockMessage() throws Exception {
remoteAddress(new InetSocketTransportAddress("remote_host", 1234)); remoteAddress(new InetSocketTransportAddress(InetAddress.getLocalHost(), 1234));
} }
} }
private static class RemoteHostMockTransportRequest extends TransportRequest { private static class RemoteHostMockTransportRequest extends TransportRequest {
RemoteHostMockTransportRequest() { RemoteHostMockTransportRequest() throws Exception {
remoteAddress(new InetSocketTransportAddress("remote_host", 1234)); remoteAddress(new InetSocketTransportAddress(InetAddress.getLocalHost(), 1234));
} }
} }
@ -577,12 +582,25 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
} }
private String[] fieldList() { private String[] fieldList() {
java.lang.reflect.Field[] fields = IndexAuditTrail.Field.class.getDeclaredFields(); return new String[] {
String[] array = new String[fields.length]; "@timestamp",
for (int i = 0; i < fields.length; i++) { "node_name",
array[i] = fields[i].getName().toLowerCase(Locale.ROOT); "node_host_name",
} "node_host_address",
return array; "layer",
"event_type",
"origin_address",
"origin_type",
"principal",
"action",
"indices",
"request",
"request_body",
"uri",
"realm",
"transport_profile",
"rule"
};
} }
private void awaitIndexCreation(final String indexName) throws InterruptedException { private void awaitIndexCreation(final String indexName) throws InterruptedException {
@ -598,10 +616,18 @@ public class IndexAuditTrailTests extends ShieldIntegrationTest {
} }
} }
}); });
GetSettingsResponse response = getClient().admin().indices().prepareGetSettings(indexName).execute().actionGet();
assertThat(response.getSetting(indexName, "index.number_of_shards"), is(Integer.toString(numShards)));
assertThat(response.getSetting(indexName, "index.number_of_replicas"), is(Integer.toString(numReplicas)));
} }
private String resolveIndexName() { private String resolveIndexName() {
return resolver.resolve(IndexAuditTrail.INDEX_NAME_PREFIX, System.currentTimeMillis(), rollover); return resolver.resolve(IndexAuditTrail.INDEX_NAME_PREFIX, System.currentTimeMillis(), rollover);
} }
static String remoteHostAddress() throws Exception {
return InetAddress.getLocalHost().getHostAddress();
}
} }