[ML] add auditor to data frame plugin (#40012) (#40394)

* [Data Frame] add auditor

* Adjusting Level, Auditor, and message to address PR comments

* Addressing PR comments
Benjamin Trent 2019-03-23 18:56:44 -05:00 committed by GitHub
parent 2dd879abac
commit a30bf27b2f
16 changed files with 833 additions and 17 deletions

View File

@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.common.notifications;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Date;
import java.util.Objects;
public abstract class AbstractAuditMessage implements ToXContentObject {
public static final ParseField TYPE = new ParseField("audit_message");
public static final ParseField MESSAGE = new ParseField("message");
public static final ParseField LEVEL = new ParseField("level");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField NODE_NAME = new ParseField("node_name");
private final String resourceId;
private final String message;
private final Level level;
private final Date timestamp;
private final String nodeName;
public AbstractAuditMessage(String resourceId, String message, Level level, String nodeName) {
this.resourceId = resourceId;
this.message = Objects.requireNonNull(message);
this.level = Objects.requireNonNull(level);
this.timestamp = new Date();
this.nodeName = nodeName;
}
protected AbstractAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
this.resourceId = resourceId;
this.message = Objects.requireNonNull(message);
this.level = Objects.requireNonNull(level);
this.timestamp = Objects.requireNonNull(timestamp);
this.nodeName = nodeName;
}
public final String getResourceId() {
return resourceId;
}
public final String getMessage() {
return message;
}
public final Level getLevel() {
return level;
}
public final Date getTimestamp() {
return timestamp;
}
public final String getNodeName() {
return nodeName;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
if (resourceId != null) {
builder.field(getResourceField(), resourceId);
}
builder.field(MESSAGE.getPreferredName(), message);
builder.field(LEVEL.getPreferredName(), level);
builder.field(TIMESTAMP.getPreferredName(), timestamp.getTime());
if (nodeName != null) {
builder.field(NODE_NAME.getPreferredName(), nodeName);
}
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(resourceId, message, level, timestamp);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj instanceof AbstractAuditMessage == false) {
return false;
}
AbstractAuditMessage other = (AbstractAuditMessage) obj;
return Objects.equals(resourceId, other.resourceId) &&
Objects.equals(message, other.message) &&
Objects.equals(level, other.level) &&
Objects.equals(timestamp, other.timestamp);
}
protected abstract String getResourceField();
public abstract static class AbstractBuilder<T extends AbstractAuditMessage> {
public T info(String resourceId, String message, String nodeName) {
return newMessage(Level.INFO, resourceId, message, nodeName);
}
public T warning(String resourceId, String message, String nodeName) {
return newMessage(Level.WARNING, resourceId, message, nodeName);
}
public T error(String resourceId, String message, String nodeName) {
return newMessage(Level.ERROR, resourceId, message, nodeName);
}
protected abstract T newMessage(Level level, String resourceId, String message, String nodeName);
}
}

View File

@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.common.notifications;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class Auditor<T extends AbstractAuditMessage> {
private static final Logger logger = LogManager.getLogger(Auditor.class);
private final Client client;
private final String nodeName;
private final String auditIndex;
private final String executionOrigin;
private final AbstractAuditMessage.AbstractBuilder<T> messageBuilder;
public Auditor(Client client,
String nodeName,
String auditIndex,
String executionOrigin,
AbstractAuditMessage.AbstractBuilder<T> messageBuilder) {
this.client = Objects.requireNonNull(client);
this.nodeName = Objects.requireNonNull(nodeName);
this.auditIndex = auditIndex;
this.executionOrigin = executionOrigin;
this.messageBuilder = Objects.requireNonNull(messageBuilder);
}
public final void info(String resourceId, String message) {
indexDoc(messageBuilder.info(resourceId, message, nodeName));
}
public final void warning(String resourceId, String message) {
indexDoc(messageBuilder.warning(resourceId, message, nodeName));
}
public final void error(String resourceId, String message) {
indexDoc(messageBuilder.error(resourceId, message, nodeName));
}
protected void onIndexResponse(IndexResponse response) {
logger.trace("Successfully wrote audit message");
}
protected void onIndexFailure(Exception exception) {
logger.debug("Failed to write audit message", exception);
}
private void indexDoc(ToXContent toXContent) {
IndexRequest indexRequest = new IndexRequest(auditIndex);
indexRequest.source(toXContentBuilder(toXContent));
indexRequest.timeout(TimeValue.timeValueSeconds(5));
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
executionOrigin,
indexRequest,
ActionListener.wrap(
this::onIndexResponse,
this::onIndexFailure
), client::index);
}
private XContentBuilder toXContentBuilder(ToXContent toXContent) {
try (XContentBuilder jsonBuilder = jsonBuilder()) {
return toXContent.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
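For orientation, the Auditor above is generic over the audit message type: a caller supplies a Client, the local node name, a target index, an execution origin, and a message builder, and each info/warning/error call asynchronously indexes one document with a five-second timeout (see indexDoc). The sketch below shows how the data frame plugin could use it; the class, method names, transform id, and messages here are illustrative placeholders, while the Auditor constructor, the index and origin constants, and DataFrameAuditMessage.builder() all come from files in this commit.

import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;

class AuditorUsageSketch {
    // Builds a data frame auditor; "client" and "nodeName" are supplied by the plugin
    // (the plugin wiring later in this commit passes clusterService.getNodeName()).
    static Auditor<DataFrameAuditMessage> newAuditor(Client client, String nodeName) {
        return new Auditor<>(client,
            nodeName,
            DataFrameInternalIndex.AUDIT_INDEX,     // ".data-frame-notifications-1"
            ClientHelper.DATA_FRAME_ORIGIN,
            DataFrameAuditMessage.builder());
    }

    static void emitExamples(Auditor<DataFrameAuditMessage> auditor) {
        // Each call builds a message for the given transform id and indexes it asynchronously.
        auditor.info("my-transform", "Updated state to [started]");
        auditor.warning("my-transform", "Checkpoint took longer than expected");
        auditor.error("my-transform", "Cannot execute data frame transform as configuration is invalid");
    }
}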

View File

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.common.notifications;
import java.util.Locale;
public enum Level {
INFO, WARNING, ERROR;
/**
* Case-insensitive from string method.
*
* @param value
* String representation
* @return The corresponding level
*/
public static Level fromString(String value) {
return Level.valueOf(value.toUpperCase(Locale.ROOT));
}
@Override
public String toString() {
return name().toLowerCase(Locale.ROOT);
}
}

View File

@ -32,6 +32,7 @@ public final class DataFrameField {
public static final String REST_BASE_PATH = "/_data_frame/";
public static final String REST_BASE_PATH_TRANSFORMS = REST_BASE_PATH + "transforms/";
public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH_TRANSFORMS + "{id}/";
public static final String DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD = "transform_id";
// note: this is used to match tasks
public static final String PERSISTENT_TASK_DESCRIPTION_PREFIX = "data_frame_";

View File

@ -0,0 +1,72 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.notifications;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;
import java.util.Date;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD;
public class DataFrameAuditMessage extends AbstractAuditMessage {
private static final ParseField TRANSFORM_ID = new ParseField(DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD);
public static final ConstructingObjectParser<DataFrameAuditMessage, Void> PARSER = new ConstructingObjectParser<>(
"data_frame_audit_message",
true,
a -> new DataFrameAuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4]));
static {
PARSER.declareString(optionalConstructorArg(), TRANSFORM_ID);
PARSER.declareString(constructorArg(), MESSAGE);
PARSER.declareField(constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return Level.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, LEVEL, ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(), parser -> {
if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return new Date(parser.longValue());
} else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(parser.text()));
}
throw new IllegalArgumentException(
"unexpected token [" + parser.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ObjectParser.ValueType.VALUE);
PARSER.declareString(optionalConstructorArg(), NODE_NAME);
}
public DataFrameAuditMessage(String resourceId, String message, Level level, String nodeName) {
super(resourceId, message, level, nodeName);
}
protected DataFrameAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
super(resourceId, message, level, timestamp, nodeName);
}
@Override
protected String getResourceField() {
return TRANSFORM_ID.getPreferredName();
}
public static AbstractAuditMessage.AbstractBuilder<DataFrameAuditMessage> builder() {
return new AbstractBuilder<DataFrameAuditMessage>() {
@Override
protected DataFrameAuditMessage newMessage(Level level, String resourceId, String message, String nodeName) {
return new DataFrameAuditMessage(resourceId, message, level, nodeName);
}
};
}
}

View File

@ -158,10 +158,18 @@ public class ReservedRolesStore implements BiConsumer<Set<String>, ActionListene
null, MetadataUtils.DEFAULT_RESERVED_METADATA))
.put("data_frame_transforms_admin", new RoleDescriptor("data_frame_transforms_admin",
new String[] { "manage_data_frame_transforms" },
new RoleDescriptor.IndicesPrivileges[]{
RoleDescriptor.IndicesPrivileges.builder()
.indices(".data-frame-notifications*")
.privileges("view_index_metadata", "read").build()
}, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null))
.put("data_frame_transforms_user", new RoleDescriptor("data_frame_transforms_user",
new String[] { "monitor_data_frame_transforms" },
new RoleDescriptor.IndicesPrivileges[]{
RoleDescriptor.IndicesPrivileges.builder()
.indices(".data-frame-notifications*")
.privileges("view_index_metadata", "read").build()
}, null, null, null, MetadataUtils.DEFAULT_RESERVED_METADATA, null))
.put("watcher_admin", new RoleDescriptor("watcher_admin", new String[] { "manage_watcher" },
new RoleDescriptor.IndicesPrivileges[] {
RoleDescriptor.IndicesPrivileges.builder().indices(Watch.INDEX, TriggeredWatchStoreField.INDEX_NAME,

View File

@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.common.notifications;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;
import org.junit.Before;
import java.util.Date;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class AbstractAuditMessageTests extends AbstractXContentTestCase<AbstractAuditMessageTests.TestAuditMessage> {
private long startMillis;
static class TestAuditMessage extends AbstractAuditMessage {
private static final ParseField ID = new ParseField("test_id");
public static final ConstructingObjectParser<TestAuditMessage, Void> PARSER = new ConstructingObjectParser<>(
AbstractAuditMessage.TYPE.getPreferredName(),
true,
a -> new TestAuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4]));
static {
PARSER.declareString(optionalConstructorArg(), ID);
PARSER.declareString(constructorArg(), MESSAGE);
PARSER.declareField(constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return Level.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, LEVEL, ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(), parser -> {
if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return new Date(parser.longValue());
} else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(parser.text()));
}
throw new IllegalArgumentException(
"unexpected token [" + parser.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ObjectParser.ValueType.VALUE);
PARSER.declareString(optionalConstructorArg(), NODE_NAME);
}
TestAuditMessage(String resourceId, String message, Level level, String nodeName) {
super(resourceId, message, level, nodeName);
}
TestAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
super(resourceId, message, level, timestamp, nodeName);
}
@Override
protected String getResourceField() {
return "test_id";
}
static AbstractAuditMessage.AbstractBuilder<TestAuditMessage> newBuilder() {
return new AbstractBuilder<TestAuditMessage>() {
@Override
protected TestAuditMessage newMessage(Level level, String resourceId, String message, String nodeName) {
return new TestAuditMessage(resourceId, message, level, nodeName);
}
};
}
}
@Before
public void setStartTime() {
startMillis = System.currentTimeMillis();
}
public void testNewInfo() {
TestAuditMessage info = TestAuditMessage.newBuilder().info("foo", "some info", "some_node");
assertEquals("foo", info.getResourceId());
assertEquals("some info", info.getMessage());
assertEquals(Level.INFO, info.getLevel());
assertDateBetweenStartAndNow(info.getTimestamp());
}
public void testNewWarning() {
TestAuditMessage warning = TestAuditMessage.newBuilder().warning("bar", "some warning", "some_node");
assertEquals("bar", warning.getResourceId());
assertEquals("some warning", warning.getMessage());
assertEquals(Level.WARNING, warning.getLevel());
assertDateBetweenStartAndNow(warning.getTimestamp());
}
public void testNewError() {
TestAuditMessage error = TestAuditMessage.newBuilder().error("foo", "some error", "some_node");
assertEquals("foo", error.getResourceId());
assertEquals("some error", error.getMessage());
assertEquals(Level.ERROR, error.getLevel());
assertDateBetweenStartAndNow(error.getTimestamp());
}
private void assertDateBetweenStartAndNow(Date timestamp) {
long timestampMillis = timestamp.getTime();
assertTrue(timestampMillis >= startMillis);
assertTrue(timestampMillis <= System.currentTimeMillis());
}
@Override
protected TestAuditMessage doParseInstance(XContentParser parser) {
return TestAuditMessage.PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected TestAuditMessage createTestInstance() {
return new TestAuditMessage(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 200),
randomFrom(Level.values()), randomAlphaOfLengthBetween(1, 20));
}
}

View File

@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.common.notifications;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.io.IOException;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class AuditorTests extends ESTestCase {
private Client client;
private ArgumentCaptor<IndexRequest> indexRequestCaptor;
private static final String TEST_ORIGIN = "test_origin";
private static final String TEST_INDEX = "test_index";
private static final AbstractAuditMessage.AbstractBuilder<AbstractAuditMessageTests.TestAuditMessage> builder =
AbstractAuditMessageTests.TestAuditMessage.newBuilder();
@Before
public void setUpMocks() {
client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
}
public void testInfo() throws IOException {
Auditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, builder);
auditor.info("foo", "Here is my info");
verify(client).index(indexRequestCaptor.capture(), any());
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertArrayEquals(new String[] {TEST_INDEX}, indexRequest.indices());
assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout());
AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source());
assertEquals("foo", auditMessage.getResourceId());
assertEquals("Here is my info", auditMessage.getMessage());
assertEquals(Level.INFO, auditMessage.getLevel());
}
public void testWarning() throws IOException {
Auditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, builder);
auditor.warning("bar", "Here is my warning");
verify(client).index(indexRequestCaptor.capture(), any());
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertArrayEquals(new String[] {TEST_INDEX}, indexRequest.indices());
assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout());
AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source());
assertEquals("bar", auditMessage.getResourceId());
assertEquals("Here is my warning", auditMessage.getMessage());
assertEquals(Level.WARNING, auditMessage.getLevel());
}
public void testError() throws IOException {
Auditor<AbstractAuditMessageTests.TestAuditMessage> auditor = new Auditor<>(client, "node_1", TEST_INDEX, TEST_ORIGIN, builder);
auditor.error("foobar", "Here is my error");
verify(client).index(indexRequestCaptor.capture(), any());
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertArrayEquals(new String[] {TEST_INDEX}, indexRequest.indices());
assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout());
AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source());
assertEquals("foobar", auditMessage.getResourceId());
assertEquals("Here is my error", auditMessage.getMessage());
assertEquals(Level.ERROR, auditMessage.getLevel());
}
private AbstractAuditMessageTests.TestAuditMessage parseAuditMessage(BytesReference msg) throws IOException {
XContentParser parser = XContentFactory.xContent(XContentHelper.xContentType(msg))
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, msg.streamInput());
return AbstractAuditMessageTests.TestAuditMessage.PARSER.apply(parser, null);
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.common.notifications;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
public class LevelTests extends ESTestCase {
public void testFromString() {
assertEquals(Level.INFO, Level.fromString("info"));
assertEquals(Level.INFO, Level.fromString("INFO"));
assertEquals(Level.WARNING, Level.fromString("warning"));
assertEquals(Level.WARNING, Level.fromString("WARNING"));
assertEquals(Level.ERROR, Level.fromString("error"));
assertEquals(Level.ERROR, Level.fromString("ERROR"));
}
public void testToString() {
assertEquals("info", Level.INFO.toString());
assertEquals("warning", Level.WARNING.toString());
assertEquals("error", Level.ERROR.toString());
}
public void testValidOrdinals() {
assertThat(Level.INFO.ordinal(), equalTo(0));
assertThat(Level.WARNING.ordinal(), equalTo(1));
assertThat(Level.ERROR.ordinal(), equalTo(2));
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.dataframe.notifications;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.junit.Before;
import java.util.Date;
public class DataFrameAuditMessageTests extends AbstractXContentTestCase<DataFrameAuditMessage> {
private long startMillis;
@Before
public void setStartTime() {
startMillis = System.currentTimeMillis();
}
public void testNewInfo() {
DataFrameAuditMessage info = DataFrameAuditMessage.builder().info("foo", "some info", "some_node");
assertEquals("foo", info.getResourceId());
assertEquals("some info", info.getMessage());
assertEquals(Level.INFO, info.getLevel());
assertDateBetweenStartAndNow(info.getTimestamp());
}
public void testNewWarning() {
DataFrameAuditMessage warning = DataFrameAuditMessage.builder().warning("bar", "some warning", "some_node");
assertEquals("bar", warning.getResourceId());
assertEquals("some warning", warning.getMessage());
assertEquals(Level.WARNING, warning.getLevel());
assertDateBetweenStartAndNow(warning.getTimestamp());
}
public void testNewError() {
DataFrameAuditMessage error = DataFrameAuditMessage.builder().error("foo", "some error", "some_node");
assertEquals("foo", error.getResourceId());
assertEquals("some error", error.getMessage());
assertEquals(Level.ERROR, error.getLevel());
assertDateBetweenStartAndNow(error.getTimestamp());
}
private void assertDateBetweenStartAndNow(Date timestamp) {
long timestampMillis = timestamp.getTime();
assertTrue(timestampMillis >= startMillis);
assertTrue(timestampMillis <= System.currentTimeMillis());
}
@Override
protected DataFrameAuditMessage doParseInstance(XContentParser parser) {
return DataFrameAuditMessage.PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected DataFrameAuditMessage createTestInstance() {
return new DataFrameAuditMessage(
randomBoolean() ? null : randomAlphaOfLength(10),
randomAlphaOfLengthBetween(1, 20),
randomFrom(Level.values()),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20)
);
}
}

View File

@ -1047,6 +1047,7 @@ public class ReservedRolesStoreTests extends ESTestCase {
assertThat(role.cluster().check(StopDataFrameTransformAction.NAME, request), is(true));
assertThat(role.runAs().check(randomAlphaOfLengthBetween(1, 30)), is(false));
assertOnlyReadAllowed(role, ".data-frame-notifications-1");
assertNoAccessAllowed(role, "foo");
assertNoAccessAllowed(role, ".data-frame-internal-1"); // internal use only
@ -1070,6 +1071,7 @@ public class ReservedRolesStoreTests extends ESTestCase {
assertThat(role.cluster().check(StopDataFrameTransformAction.NAME, request), is(false));
assertThat(role.runAs().check(randomAlphaOfLengthBetween(1, 30)), is(false));
assertOnlyReadAllowed(role, ".data-frame-notifications-1");
assertNoAccessAllowed(role, "foo");
assertNoAccessAllowed(role, ".data-frame-internal-1");

View File

@ -0,0 +1,76 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.dataframe.integration;
import org.elasticsearch.client.Request;
import org.elasticsearch.xpack.dataframe.persistence.DataFrameInternalIndex;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.is;
public class DataFrameAuditorIT extends DataFrameRestTestCase {
private static final String TEST_USER_NAME = "df_admin_plus_data";
private static final String DATA_ACCESS_ROLE = "test_data_access";
private static final String BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS =
basicAuthHeaderValue(TEST_USER_NAME, TEST_PASSWORD_SECURE_STRING);
private static boolean indicesCreated = false;
// preserve indices in order to reuse source indices in several test cases
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}
@Before
public void createIndexes() throws IOException {
// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
if (indicesCreated) {
return;
}
createReviewsIndex();
indicesCreated = true;
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
setupUser(TEST_USER_NAME, Arrays.asList("data_frame_transforms_admin", DATA_ACCESS_ROLE));
}
@SuppressWarnings("unchecked")
public void testAuditorWritesAudits() throws Exception {
String transformId = "simplePivotForAudit";
String dataFrameIndex = "pivot_reviews_user_id_above_20";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, dataFrameIndex);
String query = "\"match\": {\"user_id\": \"user_26\"}";
createPivotReviewsTransform(transformId, dataFrameIndex, query, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
startAndWaitForTransform(transformId, dataFrameIndex, BASIC_AUTH_VALUE_DATA_FRAME_ADMIN_WITH_SOME_DATA_ACCESS);
// Make sure we wrote to the audit
assertTrue(indexExists(DataFrameInternalIndex.AUDIT_INDEX));
Request request = new Request("GET", DataFrameInternalIndex.AUDIT_INDEX + "/_search");
request.setJsonEntity("{\"query\":{\"term\":{\"transform_id\":\"simplePivotForAudit\"}}}");
Map<String, Object> response = entityAsMap(client().performRequest(request));
Map<?, ?> hitRsp = (Map<?, ?>) ((List<?>) ((Map<?, ?>)response.get("hits")).get("hits")).get(0);
Map<String, Object> source = (Map<String, Object>)hitRsp.get("_source");
assertThat(source.get("transform_id"), equalTo(transformId));
assertThat(source.get("level"), equalTo("info"));
assertThat(source.get("message"), is(notNullValue()));
assertThat(source.get("node_name"), is(notNullValue()));
assertThat(source.get("timestamp"), is(notNullValue()));
}
}
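Based on AbstractAuditMessage#toXContent and the assertions in the integration test above, a document written to .data-frame-notifications-1 has roughly the following shape; the values below are illustrative only (level is lowercased, timestamp is epoch milliseconds):

{
  "transform_id": "simplePivotForAudit",
  "level": "info",
  "message": "Updated state to [started]",
  "timestamp": 1553382000000,
  "node_name": "node_1"
}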

View File

@ -40,6 +40,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.action.DeleteDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsAction;
import org.elasticsearch.xpack.core.dataframe.action.GetDataFrameTransformsStatsAction;
@ -48,6 +49,7 @@ import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.dataframe.action.TransportDeleteDataFrameTransformAction;
import org.elasticsearch.xpack.dataframe.action.TransportGetDataFrameTransformsAction;
@ -83,6 +85,7 @@ import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlugin {
@ -99,6 +102,7 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
private final Settings settings;
private final boolean transportClientMode;
private final SetOnce<DataFrameTransformsConfigManager> dataFrameTransformsConfigManager = new SetOnce<>();
private final SetOnce<Auditor<DataFrameAuditMessage>> dataFrameAuditor = new SetOnce<>();
private final SetOnce<DataFrameTransformsCheckpointService> dataFrameTransformsCheckpointService = new SetOnce<>();
private final SetOnce<SchedulerEngine> schedulerEngine = new SetOnce<>();
@ -180,11 +184,15 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
if (enabled == false || transportClientMode) {
return emptyList();
}
dataFrameAuditor.set(new Auditor<>(client,
clusterService.getNodeName(),
DataFrameInternalIndex.AUDIT_INDEX,
DATA_FRAME_ORIGIN,
DataFrameAuditMessage.builder()));
dataFrameTransformsConfigManager.set(new DataFrameTransformsConfigManager(client, xContentRegistry));
dataFrameTransformsCheckpointService.set(new DataFrameTransformsCheckpointService(client));
return Arrays.asList(dataFrameTransformsConfigManager.get(), dataFrameAuditor.get(), dataFrameTransformsCheckpointService.get());
}
@Override
@ -195,6 +203,11 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
} catch (IOException e) {
logger.error("Error creating data frame index template", e);
}
try {
templates.put(DataFrameInternalIndex.AUDIT_INDEX, DataFrameInternalIndex.getAuditIndexTemplateMetaData());
} catch (IOException e) {
logger.warn("Error creating data frame audit index", e);
}
return templates;
};
}
@ -210,10 +223,12 @@ public class DataFrame extends Plugin implements ActionPlugin, PersistentTaskPlu
// the transforms config manager should have been created
assert dataFrameTransformsConfigManager.get() != null;
// the auditor should have been created
assert dataFrameAuditor.get() != null;
assert dataFrameTransformsCheckpointService.get() != null;
return Collections.singletonList(new DataFrameTransformPersistentTasksExecutor(client, dataFrameTransformsConfigManager.get(),
dataFrameTransformsCheckpointService.get(), schedulerEngine.get(), dataFrameAuditor.get(), threadPool));
}
@Override

View File

@ -13,12 +13,15 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import java.io.IOException;
import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.dataframe.DataFrameField.DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD;
public final class DataFrameInternalIndex {
@ -28,10 +31,18 @@ public final class DataFrameInternalIndex {
public static final String INDEX_TEMPLATE_NAME = INDEX_TEMPLATE_PATTERN + INDEX_TEMPLATE_VERSION;
public static final String INDEX_NAME = INDEX_TEMPLATE_NAME;
public static final String AUDIT_TEMPLATE_VERSION = "1";
public static final String AUDIT_INDEX_PREFIX = ".data-frame-notifications-";
public static final String AUDIT_INDEX = AUDIT_INDEX_PREFIX + AUDIT_TEMPLATE_VERSION;
// constants for mappings
public static final String DYNAMIC = "dynamic";
public static final String PROPERTIES = "properties";
public static final String TYPE = "type";
public static final String DATE = "date";
public static final String TEXT = "text";
public static final String FIELDS = "fields";
public static final String RAW = "raw";
// data types
public static final String DOUBLE = "double";
@ -51,6 +62,52 @@ public final class DataFrameInternalIndex {
return dataFrameTemplate;
}
public static IndexTemplateMetaData getAuditIndexTemplateMetaData() throws IOException {
IndexTemplateMetaData dataFrameTemplate = IndexTemplateMetaData.builder(AUDIT_INDEX)
.patterns(Collections.singletonList(AUDIT_INDEX_PREFIX + "*"))
.version(Version.CURRENT.id)
.settings(Settings.builder()
// the audits are expected to be small
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1"))
.putMapping(MapperService.SINGLE_MAPPING_NAME, Strings.toString(auditMappings()))
.build();
return dataFrameTemplate;
}
private static XContentBuilder auditMappings() throws IOException {
XContentBuilder builder = jsonBuilder().startObject();
builder.startObject(SINGLE_MAPPING_NAME);
addMetaInformation(builder);
builder.field(DYNAMIC, "false");
builder.startObject(PROPERTIES)
.startObject(DATA_FRAME_TRANSFORM_AUDIT_ID_FIELD)
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.LEVEL.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AbstractAuditMessage.MESSAGE.getPreferredName())
.field(TYPE, TEXT)
.startObject(FIELDS)
.startObject(RAW)
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.startObject(AbstractAuditMessage.TIMESTAMP.getPreferredName())
.field(TYPE, DATE)
.endObject()
.startObject(AbstractAuditMessage.NODE_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.endObject();
return builder;
}
private static XContentBuilder mappings() throws IOException {
XContentBuilder builder = jsonBuilder();
builder.startObject();

View File

@ -16,7 +16,9 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
@ -35,15 +37,20 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
private final DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService;
private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool;
private final Auditor<DataFrameAuditMessage> auditor;
public DataFrameTransformPersistentTasksExecutor(Client client,
DataFrameTransformsConfigManager transformsConfigManager,
DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService,
SchedulerEngine schedulerEngine,
Auditor<DataFrameAuditMessage> auditor,
ThreadPool threadPool) {
super(DataFrameField.TASK_NAME, DataFrame.TASK_THREAD_POOL_NAME);
this.client = client;
this.transformsConfigManager = transformsConfigManager;
this.dataFrameTransformsCheckpointService = dataFrameTransformsCheckpointService;
this.schedulerEngine = schedulerEngine;
this.auditor = auditor;
this.threadPool = threadPool;
}
@ -71,7 +78,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTasksCustomMetaData.PersistentTask<DataFrameTransform> persistentTask, Map<String, String> headers) {
return new DataFrameTransformTask(id, type, action, parentTaskId, persistentTask.getParams(),
(DataFrameTransformState) persistentTask.getState(), client, transformsConfigManager,
dataFrameTransformsCheckpointService, schedulerEngine, auditor, threadPool, headers);
}
}

View File

@ -22,13 +22,15 @@ import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.common.notifications.Auditor;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
import org.elasticsearch.xpack.core.dataframe.notifications.DataFrameAuditMessage;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransform;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction;
import org.elasticsearch.xpack.core.dataframe.action.StartDataFrameTransformTaskAction.Response;
import org.elasticsearch.xpack.core.dataframe.action.StopDataFrameTransformAction;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformState;
import org.elasticsearch.xpack.core.indexing.IndexerState;
@ -52,6 +54,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool;
private final DataFrameIndexer indexer;
private final Auditor<DataFrameAuditMessage> auditor;
// the generation of this data frame, for v1 there will be only
// 0: data frame not created or still indexing
@ -59,13 +62,15 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final AtomicReference<Long> generation;
public DataFrameTransformTask(long id, String type, String action, TaskId parentTask, DataFrameTransform transform,
DataFrameTransformState state, Client client, DataFrameTransformsConfigManager transformsConfigManager,
DataFrameTransformsCheckpointService transformsCheckpointService,
SchedulerEngine schedulerEngine, Auditor<DataFrameAuditMessage> auditor,
ThreadPool threadPool, Map<String, String> headers) {
super(id, type, action, DataFrameField.PERSISTENT_TASK_DESCRIPTION_PREFIX + transform.getId(), parentTask, headers);
this.transform = transform;
this.schedulerEngine = schedulerEngine;
this.threadPool = threadPool;
this.auditor = auditor;
IndexerState initialState = IndexerState.STOPPED;
long initialGeneration = 0;
Map<String, Object> initialPosition = null;
@ -87,7 +92,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
}
this.indexer = new ClientDataFrameIndexer(transform.getId(), transformsConfigManager, transformsCheckpointService,
new AtomicReference<>(initialState), initialPosition, client, auditor);
this.generation = new AtomicReference<Long>(initialGeneration);
}
@ -142,6 +147,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
updatePersistentTaskState(state,
ActionListener.wrap(
(task) -> {
auditor.info(transform.getId(), "Updated state to [" + state.getIndexerState() + "]");
logger.debug("Successfully updated state for data frame transform [" + transform.getId() + "] to ["
+ state.getIndexerState() + "][" + state.getPosition() + "]");
listener.onResponse(new StartDataFrameTransformTaskAction.Response(true));
@ -169,6 +175,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
// overwrite some docs and eventually checkpoint.
DataFrameTransformState state = new DataFrameTransformState(IndexerState.STOPPED, indexer.getPosition(), generation.get());
updatePersistentTaskState(state, ActionListener.wrap((task) -> {
auditor.info(transform.getId(), "Updated state to [" + state.getIndexerState() + "]");
logger.debug("Successfully updated state for data frame transform [{}] to [{}]", transform.getId(),
state.getIndexerState());
listener.onResponse(new StopDataFrameTransformAction.Response(true));
@ -231,18 +238,21 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final DataFrameTransformsConfigManager transformsConfigManager;
private final DataFrameTransformsCheckpointService transformsCheckpointService;
private final String transformId;
private final Auditor<DataFrameAuditMessage> auditor;
private Map<String, String> fieldMappings = null;
private DataFrameTransformConfig transformConfig = null;
public ClientDataFrameIndexer(String transformId, DataFrameTransformsConfigManager transformsConfigManager,
DataFrameTransformsCheckpointService transformsCheckpointService,
AtomicReference<IndexerState> initialState, Map<String, Object> initialPosition, Client client,
Auditor<DataFrameAuditMessage> auditor) {
super(threadPool.executor(ThreadPool.Names.GENERIC), initialState, initialPosition);
this.transformId = transformId;
this.transformsConfigManager = transformsConfigManager;
this.transformsCheckpointService = transformsCheckpointService;
this.client = client;
this.auditor = auditor;
}
@Override
@ -282,6 +292,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
// todo: set job into failed state
if (transformConfig.isValid() == false) {
auditor.error(transformId, "Cannot execute data frame transform as configuration is invalid");
throw new RuntimeException(
DataFrameMessages.getMessage(DataFrameMessages.DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID, transformId));
}
@ -346,16 +357,19 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
@Override
protected void onFailure(Exception exc) {
auditor.error(transform.getId(), "Data frame transform failed with an exception: " + exc.getMessage());
logger.warn("Data frame transform [" + transform.getId() + "] failed with an exception: ", exc);
}
@Override
protected void onFinish() {
auditor.info(transform.getId(), "Finished indexing for data frame transform");
logger.info("Finished indexing for data frame transform [" + transform.getId() + "]");
}
@Override
protected void onAbort() {
auditor.info(transform.getId(), "Received abort request, stopping indexer");
logger.info("Data frame transform [" + transform.getId() + "] received abort request, stopping indexer");
shutdown();
} }