Refactor auditor-related classes (#45893) (#46120)

This commit is contained in:
Przemysław Witek 2019-08-29 14:21:03 +02:00 committed by GitHub
parent d340530a47
commit b8a0379057
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 206 additions and 264 deletions

View File

@ -6,36 +6,59 @@
package org.elasticsearch.xpack.core.common.notifications; package org.elasticsearch.xpack.core.common.notifications;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import java.io.IOException; import java.io.IOException;
import java.util.Date; import java.util.Date;
import java.util.Objects; import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public abstract class AbstractAuditMessage implements ToXContentObject { public abstract class AbstractAuditMessage implements ToXContentObject {
public static final ParseField TYPE = new ParseField("audit_message");
public static final ParseField MESSAGE = new ParseField("message"); public static final ParseField MESSAGE = new ParseField("message");
public static final ParseField LEVEL = new ParseField("level"); public static final ParseField LEVEL = new ParseField("level");
public static final ParseField TIMESTAMP = new ParseField("timestamp"); public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField NODE_NAME = new ParseField("node_name"); public static final ParseField NODE_NAME = new ParseField("node_name");
protected static final <T extends AbstractAuditMessage> ConstructingObjectParser<T, Void> createParser(
String name, AbstractAuditMessageFactory<T> messageFactory, ParseField resourceField) {
ConstructingObjectParser<T, Void> PARSER = new ConstructingObjectParser<>(
name,
true,
a -> messageFactory.newMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4]));
PARSER.declareString(optionalConstructorArg(), resourceField);
PARSER.declareString(constructorArg(), MESSAGE);
PARSER.declareField(constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return Level.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, LEVEL, ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(),
p -> TimeUtils.parseTimeField(p, TIMESTAMP.getPreferredName()),
TIMESTAMP,
ObjectParser.ValueType.VALUE);
PARSER.declareString(optionalConstructorArg(), NODE_NAME);
return PARSER;
}
private final String resourceId; private final String resourceId;
private final String message; private final String message;
private final Level level; private final Level level;
private final Date timestamp; private final Date timestamp;
private final String nodeName; private final String nodeName;
public AbstractAuditMessage(String resourceId, String message, Level level, String nodeName) {
this.resourceId = resourceId;
this.message = Objects.requireNonNull(message);
this.level = Objects.requireNonNull(level);
this.timestamp = new Date();
this.nodeName = nodeName;
}
protected AbstractAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) { protected AbstractAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
this.resourceId = resourceId; this.resourceId = resourceId;
this.message = Objects.requireNonNull(message); this.message = Objects.requireNonNull(message);
@ -82,7 +105,7 @@ public abstract class AbstractAuditMessage implements ToXContentObject {
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(resourceId, message, level, timestamp); return Objects.hash(resourceId, message, level, timestamp, nodeName);
} }
@Override @Override
@ -98,25 +121,9 @@ public abstract class AbstractAuditMessage implements ToXContentObject {
return Objects.equals(resourceId, other.resourceId) && return Objects.equals(resourceId, other.resourceId) &&
Objects.equals(message, other.message) && Objects.equals(message, other.message) &&
Objects.equals(level, other.level) && Objects.equals(level, other.level) &&
Objects.equals(timestamp, other.timestamp); Objects.equals(timestamp, other.timestamp) &&
Objects.equals(nodeName, other.nodeName);
} }
protected abstract String getResourceField(); protected abstract String getResourceField();
public abstract static class AbstractBuilder<T extends AbstractAuditMessage> {
public T info(String resourceId, String message, String nodeName) {
return newMessage(Level.INFO, resourceId, message, nodeName);
}
public T warning(String resourceId, String message, String nodeName) {
return newMessage(Level.WARNING, resourceId, message, nodeName);
}
public T error(String resourceId, String message, String nodeName) {
return newMessage(Level.ERROR, resourceId, message, nodeName);
}
protected abstract T newMessage(Level level, String resourceId, String message, String nodeName);
}
} }

View File

@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.common.notifications;
import java.util.Date;
/**
* {@link AbstractAuditMessageFactory} interface provides means for creating audit messages.
* @param <T> type of the audit message
*/
public interface AbstractAuditMessageFactory<T extends AbstractAuditMessage> {
T newMessage(String resourceId, String message, Level level, Date timestamp, String nodeName);
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.Date;
import java.util.Objects; import java.util.Objects;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -28,37 +29,37 @@ public abstract class AbstractAuditor<T extends AbstractAuditMessage> {
private final String nodeName; private final String nodeName;
private final String auditIndex; private final String auditIndex;
private final String executionOrigin; private final String executionOrigin;
private final AbstractAuditMessage.AbstractBuilder<T> messageBuilder; private final AbstractAuditMessageFactory<T> messageFactory;
public AbstractAuditor(Client client, protected AbstractAuditor(Client client,
String nodeName, String nodeName,
String auditIndex, String auditIndex,
String executionOrigin, String executionOrigin,
AbstractAuditMessage.AbstractBuilder<T> messageBuilder) { AbstractAuditMessageFactory<T> messageFactory) {
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.nodeName = Objects.requireNonNull(nodeName); this.nodeName = Objects.requireNonNull(nodeName);
this.auditIndex = auditIndex; this.auditIndex = auditIndex;
this.executionOrigin = executionOrigin; this.executionOrigin = executionOrigin;
this.messageBuilder = Objects.requireNonNull(messageBuilder); this.messageFactory = Objects.requireNonNull(messageFactory);
} }
public void info(String resourceId, String message) { public void info(String resourceId, String message) {
indexDoc(messageBuilder.info(resourceId, message, nodeName)); indexDoc(messageFactory.newMessage(resourceId, message, Level.INFO, new Date(), nodeName));
} }
public void warning(String resourceId, String message) { public void warning(String resourceId, String message) {
indexDoc(messageBuilder.warning(resourceId, message, nodeName)); indexDoc(messageFactory.newMessage(resourceId, message, Level.WARNING, new Date(), nodeName));
} }
public void error(String resourceId, String message) { public void error(String resourceId, String message) {
indexDoc(messageBuilder.error(resourceId, message, nodeName)); indexDoc(messageFactory.newMessage(resourceId, message, Level.ERROR, new Date(), nodeName));
} }
protected void onIndexResponse(IndexResponse response) { private void onIndexResponse(IndexResponse response) {
logger.trace("Successfully wrote audit message"); logger.trace("Successfully wrote audit message");
} }
protected void onIndexFailure(Exception exception) { private void onIndexFailure(Exception exception) {
logger.debug("Failed to write audit message", exception); logger.debug("Failed to write audit message", exception);
} }

View File

@ -7,47 +7,19 @@ package org.elasticsearch.xpack.core.dataframe.notifications;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.common.notifications.Level; import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import java.util.Date; import java.util.Date;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class DataFrameAuditMessage extends AbstractAuditMessage { public class DataFrameAuditMessage extends AbstractAuditMessage {
private static final ParseField TRANSFORM_ID = new ParseField(DataFrameField.TRANSFORM_ID); private static final ParseField TRANSFORM_ID = new ParseField(DataFrameField.TRANSFORM_ID);
public static final ConstructingObjectParser<DataFrameAuditMessage, Void> PARSER = new ConstructingObjectParser<>( public static final ConstructingObjectParser<DataFrameAuditMessage, Void> PARSER =
"data_frame_audit_message", createParser("data_frame_audit_message", DataFrameAuditMessage::new, TRANSFORM_ID);
true,
a -> new DataFrameAuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4]));
static { public DataFrameAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
PARSER.declareString(optionalConstructorArg(), TRANSFORM_ID);
PARSER.declareString(constructorArg(), MESSAGE);
PARSER.declareField(constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return Level.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, LEVEL, ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(),
p -> TimeUtils.parseTimeField(p, TIMESTAMP.getPreferredName()),
TIMESTAMP,
ObjectParser.ValueType.VALUE);
PARSER.declareString(optionalConstructorArg(), NODE_NAME);
}
public DataFrameAuditMessage(String resourceId, String message, Level level, String nodeName) {
super(resourceId, message, level, nodeName);
}
protected DataFrameAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
super(resourceId, message, level, timestamp, nodeName); super(resourceId, message, level, timestamp, nodeName);
} }
@ -55,13 +27,4 @@ public class DataFrameAuditMessage extends AbstractAuditMessage {
protected String getResourceField() { protected String getResourceField() {
return TRANSFORM_ID.getPreferredName(); return TRANSFORM_ID.getPreferredName();
} }
public static AbstractAuditMessage.AbstractBuilder<DataFrameAuditMessage> builder() {
return new AbstractBuilder<DataFrameAuditMessage>() {
@Override
protected DataFrameAuditMessage newMessage(Level level, String resourceId, String message, String nodeName) {
return new DataFrameAuditMessage(resourceId, message, level, nodeName);
}
};
}
} }

View File

@ -5,61 +5,26 @@
*/ */
package org.elasticsearch.xpack.core.ml.notifications; package org.elasticsearch.xpack.core.ml.notifications;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.common.notifications.Level; import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import java.util.Date; import java.util.Date;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class AnomalyDetectionAuditMessage extends AbstractAuditMessage { public class AnomalyDetectionAuditMessage extends AbstractAuditMessage {
public static final ConstructingObjectParser<AnomalyDetectionAuditMessage, Void> PARSER = new ConstructingObjectParser<>( private static final ParseField JOB_ID = Job.ID;
"ml_audit_message", public static final ConstructingObjectParser<AnomalyDetectionAuditMessage, Void> PARSER =
true, createParser("ml_audit_message", AnomalyDetectionAuditMessage::new, JOB_ID);
a -> new AnomalyDetectionAuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4]));
static { public AnomalyDetectionAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
PARSER.declareString(optionalConstructorArg(), Job.ID);
PARSER.declareString(constructorArg(), MESSAGE);
PARSER.declareField(constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return Level.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, LEVEL, ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(),
p -> TimeUtils.parseTimeField(p, TIMESTAMP.getPreferredName()),
TIMESTAMP,
ObjectParser.ValueType.VALUE);
PARSER.declareString(optionalConstructorArg(), NODE_NAME);
}
public AnomalyDetectionAuditMessage(String resourceId, String message, Level level, String nodeName) {
super(resourceId, message, level, nodeName);
}
protected AnomalyDetectionAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
super(resourceId, message, level, timestamp, nodeName); super(resourceId, message, level, timestamp, nodeName);
} }
@Override @Override
protected String getResourceField() { protected String getResourceField() {
return Job.ID.getPreferredName(); return JOB_ID.getPreferredName();
}
public static AbstractBuilder<AnomalyDetectionAuditMessage> builder() {
return new AbstractBuilder<AnomalyDetectionAuditMessage>() {
@Override
protected AnomalyDetectionAuditMessage newMessage(Level level, String resourceId, String message, String nodeName) {
return new AnomalyDetectionAuditMessage(resourceId, message, level, nodeName);
}
};
} }
} }

View File

@ -7,51 +7,20 @@ package org.elasticsearch.xpack.core.common.notifications;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase; import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.junit.Before;
import java.util.Date; import java.util.Date;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.hamcrest.Matchers.equalTo;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class AbstractAuditMessageTests extends AbstractXContentTestCase<AbstractAuditMessageTests.TestAuditMessage> { public class AbstractAuditMessageTests extends AbstractXContentTestCase<AbstractAuditMessageTests.TestAuditMessage> {
private long startMillis;
static class TestAuditMessage extends AbstractAuditMessage { static class TestAuditMessage extends AbstractAuditMessage {
private static final ParseField ID = new ParseField("test_id");
public static final ConstructingObjectParser<TestAuditMessage, Void> PARSER = new ConstructingObjectParser<>(
AbstractAuditMessage.TYPE.getPreferredName(),
true,
a -> new TestAuditMessage((String)a[0], (String)a[1], (Level)a[2], (Date)a[3], (String)a[4]));
static { private static final ParseField TEST_ID = new ParseField("test_id");
PARSER.declareString(optionalConstructorArg(), ID); public static final ConstructingObjectParser<TestAuditMessage, Void> PARSER =
PARSER.declareString(constructorArg(), MESSAGE); createParser("test_audit_message", TestAuditMessage::new, TEST_ID);
PARSER.declareField(constructorArg(), p -> {
if (p.currentToken() == XContentParser.Token.VALUE_STRING) {
return Level.fromString(p.text());
}
throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]");
}, LEVEL, ObjectParser.ValueType.STRING);
PARSER.declareField(constructorArg(), parser -> {
if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return new Date(parser.longValue());
} else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
return new Date(TimeUtils.dateStringToEpoch(parser.text()));
}
throw new IllegalArgumentException(
"unexpected token [" + parser.currentToken() + "] for [" + TIMESTAMP.getPreferredName() + "]");
}, TIMESTAMP, ObjectParser.ValueType.VALUE);
PARSER.declareString(optionalConstructorArg(), NODE_NAME);
}
TestAuditMessage(String resourceId, String message, Level level, String nodeName) {
super(resourceId, message, level, nodeName);
}
TestAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) { TestAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
super(resourceId, message, level, timestamp, nodeName); super(resourceId, message, level, timestamp, nodeName);
@ -59,53 +28,45 @@ public class AbstractAuditMessageTests extends AbstractXContentTestCase<Abstract
@Override @Override
protected String getResourceField() { protected String getResourceField() {
return "test_id"; return TEST_ID.getPreferredName();
}
static AbstractAuditMessage.AbstractBuilder<TestAuditMessage> newBuilder() {
return new AbstractBuilder<TestAuditMessage>() {
@Override
protected TestAuditMessage newMessage(Level level, String resourceId, String message, String nodeName) {
return new TestAuditMessage(resourceId, message, level, nodeName);
}
};
} }
} }
@Before private static final String RESOURCE_ID = "foo";
public void setStartTime() { private static final String MESSAGE = "some message";
startMillis = System.currentTimeMillis(); private static final Date TIMESTAMP = new Date(123456789);
private static final String NODE_NAME = "some_node";
public void testGetResourceField() {
TestAuditMessage message = new TestAuditMessage(RESOURCE_ID, MESSAGE, Level.INFO, TIMESTAMP, NODE_NAME);
assertThat(message.getResourceField(), equalTo(TestAuditMessage.TEST_ID.getPreferredName()));
} }
public void testNewInfo() { public void testNewInfo() {
TestAuditMessage info = TestAuditMessage.newBuilder().info("foo", "some info", "some_node"); TestAuditMessage message = new TestAuditMessage(RESOURCE_ID, MESSAGE, Level.INFO, TIMESTAMP, NODE_NAME);
assertEquals("foo", info.getResourceId()); assertThat(message.getResourceId(), equalTo(RESOURCE_ID));
assertEquals("some info", info.getMessage()); assertThat(message.getMessage(), equalTo(MESSAGE));
assertEquals(Level.INFO, info.getLevel()); assertThat(message.getLevel(), equalTo(Level.INFO));
assertDateBetweenStartAndNow(info.getTimestamp()); assertThat(message.getTimestamp(), equalTo(TIMESTAMP));
assertThat(message.getNodeName(), equalTo(NODE_NAME));
} }
public void testNewWarning() { public void testNewWarning() {
TestAuditMessage warning = TestAuditMessage.newBuilder().warning("bar", "some warning", "some_node"); TestAuditMessage message = new TestAuditMessage(RESOURCE_ID, MESSAGE, Level.WARNING, TIMESTAMP, NODE_NAME);
assertEquals("bar", warning.getResourceId()); assertThat(message.getResourceId(), equalTo(RESOURCE_ID));
assertEquals("some warning", warning.getMessage()); assertThat(message.getMessage(), equalTo(MESSAGE));
assertEquals(Level.WARNING, warning.getLevel()); assertThat(message.getLevel(), equalTo(Level.WARNING));
assertDateBetweenStartAndNow(warning.getTimestamp()); assertThat(message.getTimestamp(), equalTo(TIMESTAMP));
assertThat(message.getNodeName(), equalTo(NODE_NAME));
} }
public void testNewError() { public void testNewError() {
TestAuditMessage error = TestAuditMessage.newBuilder().error("foo", "some error", "some_node"); TestAuditMessage message = new TestAuditMessage(RESOURCE_ID, MESSAGE, Level.ERROR, TIMESTAMP, NODE_NAME);
assertEquals("foo", error.getResourceId()); assertThat(message.getResourceId(), equalTo(RESOURCE_ID));
assertEquals("some error", error.getMessage()); assertThat(message.getMessage(), equalTo(MESSAGE));
assertEquals(Level.ERROR, error.getLevel()); assertThat(message.getLevel(), equalTo(Level.ERROR));
assertDateBetweenStartAndNow(error.getTimestamp()); assertThat(message.getTimestamp(), equalTo(TIMESTAMP));
} assertThat(message.getNodeName(), equalTo(NODE_NAME));
private void assertDateBetweenStartAndNow(Date timestamp) {
long timestampMillis = timestamp.getTime();
assertTrue(timestampMillis >= startMillis);
assertTrue(timestampMillis <= System.currentTimeMillis());
} }
@Override @Override
@ -120,7 +81,12 @@ public class AbstractAuditMessageTests extends AbstractXContentTestCase<Abstract
@Override @Override
protected TestAuditMessage createTestInstance() { protected TestAuditMessage createTestInstance() {
return new TestAuditMessage(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 200), return new TestAuditMessage(
randomFrom(Level.values()), randomAlphaOfLengthBetween(1, 20)); randomBoolean() ? null : randomAlphaOfLength(10),
randomAlphaOfLengthBetween(1, 20),
randomFrom(Level.values()),
new Date(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20)
);
} }
} }

View File

@ -23,6 +23,11 @@ import org.mockito.ArgumentCaptor;
import java.io.IOException; import java.io.IOException;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -36,6 +41,7 @@ public class AbstractAuditorTests extends ESTestCase {
private Client client; private Client client;
private ArgumentCaptor<IndexRequest> indexRequestCaptor; private ArgumentCaptor<IndexRequest> indexRequestCaptor;
private long startMillis;
@Before @Before
public void setUpMocks() { public void setUpMocks() {
@ -45,6 +51,8 @@ public class AbstractAuditorTests extends ESTestCase {
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class); indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
startMillis = System.currentTimeMillis();
} }
public void testInfo() throws IOException { public void testInfo() throws IOException {
@ -53,12 +61,15 @@ public class AbstractAuditorTests extends ESTestCase {
verify(client).index(indexRequestCaptor.capture(), any()); verify(client).index(indexRequestCaptor.capture(), any());
IndexRequest indexRequest = indexRequestCaptor.getValue(); IndexRequest indexRequest = indexRequestCaptor.getValue();
assertArrayEquals(new String[] {TEST_INDEX}, indexRequest.indices()); assertThat(indexRequest.indices(), arrayContaining(TEST_INDEX));
assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout()); assertThat(indexRequest.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source()); AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source());
assertEquals("foo", auditMessage.getResourceId()); assertThat(auditMessage.getResourceId(), equalTo("foo"));
assertEquals("Here is my info", auditMessage.getMessage()); assertThat(auditMessage.getMessage(), equalTo("Here is my info"));
assertEquals(Level.INFO, auditMessage.getLevel()); assertThat(auditMessage.getLevel(), equalTo(Level.INFO));
assertThat(auditMessage.getTimestamp().getTime(),
allOf(greaterThanOrEqualTo(startMillis), lessThanOrEqualTo(System.currentTimeMillis())));
assertThat(auditMessage.getNodeName(), equalTo(TEST_NODE_NAME));
} }
public void testWarning() throws IOException { public void testWarning() throws IOException {
@ -67,12 +78,15 @@ public class AbstractAuditorTests extends ESTestCase {
verify(client).index(indexRequestCaptor.capture(), any()); verify(client).index(indexRequestCaptor.capture(), any());
IndexRequest indexRequest = indexRequestCaptor.getValue(); IndexRequest indexRequest = indexRequestCaptor.getValue();
assertArrayEquals(new String[] {TEST_INDEX}, indexRequest.indices()); assertThat(indexRequest.indices(), arrayContaining(TEST_INDEX));
assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout()); assertThat(indexRequest.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source()); AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source());
assertEquals("bar", auditMessage.getResourceId()); assertThat(auditMessage.getResourceId(), equalTo("bar"));
assertEquals("Here is my warning", auditMessage.getMessage()); assertThat(auditMessage.getMessage(), equalTo("Here is my warning"));
assertEquals(Level.WARNING, auditMessage.getLevel()); assertThat(auditMessage.getLevel(), equalTo(Level.WARNING));
assertThat(auditMessage.getTimestamp().getTime(),
allOf(greaterThanOrEqualTo(startMillis), lessThanOrEqualTo(System.currentTimeMillis())));
assertThat(auditMessage.getNodeName(), equalTo(TEST_NODE_NAME));
} }
public void testError() throws IOException { public void testError() throws IOException {
@ -81,23 +95,27 @@ public class AbstractAuditorTests extends ESTestCase {
verify(client).index(indexRequestCaptor.capture(), any()); verify(client).index(indexRequestCaptor.capture(), any());
IndexRequest indexRequest = indexRequestCaptor.getValue(); IndexRequest indexRequest = indexRequestCaptor.getValue();
assertArrayEquals(new String[] {TEST_INDEX}, indexRequest.indices()); assertThat(indexRequest.indices(), arrayContaining(TEST_INDEX));
assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout()); assertThat(indexRequest.timeout(), equalTo(TimeValue.timeValueSeconds(5)));
AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source()); AbstractAuditMessageTests.TestAuditMessage auditMessage = parseAuditMessage(indexRequest.source());
assertEquals("foobar", auditMessage.getResourceId()); assertThat(auditMessage.getResourceId(), equalTo("foobar"));
assertEquals("Here is my error", auditMessage.getMessage()); assertThat(auditMessage.getMessage(), equalTo("Here is my error"));
assertEquals(Level.ERROR, auditMessage.getLevel()); assertThat(auditMessage.getLevel(), equalTo(Level.ERROR));
assertThat(auditMessage.getTimestamp().getTime(),
allOf(greaterThanOrEqualTo(startMillis), lessThanOrEqualTo(System.currentTimeMillis())));
assertThat(auditMessage.getNodeName(), equalTo(TEST_NODE_NAME));
} }
private AbstractAuditMessageTests.TestAuditMessage parseAuditMessage(BytesReference msg) throws IOException { private static AbstractAuditMessageTests.TestAuditMessage parseAuditMessage(BytesReference msg) throws IOException {
XContentParser parser = XContentFactory.xContent(XContentHelper.xContentType(msg)) XContentParser parser = XContentFactory.xContent(XContentHelper.xContentType(msg))
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, msg.streamInput()); .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, msg.streamInput());
return AbstractAuditMessageTests.TestAuditMessage.PARSER.apply(parser, null); return AbstractAuditMessageTests.TestAuditMessage.PARSER.apply(parser, null);
} }
static class TestAuditor extends AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> { private static class TestAuditor extends AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> {
TestAuditor(Client client) { TestAuditor(Client client) {
super(client, TEST_NODE_NAME, TEST_INDEX, TEST_ORIGIN, AbstractAuditMessageTests.TestAuditMessage.newBuilder()); super(client, TEST_NODE_NAME, TEST_INDEX, TEST_ORIGIN, AbstractAuditMessageTests.TestAuditMessage::new);
} }
} }
} }

View File

@ -8,48 +8,10 @@ package org.elasticsearch.xpack.core.dataframe.notifications;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase; import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.common.notifications.Level; import org.elasticsearch.xpack.core.common.notifications.Level;
import org.junit.Before;
import java.util.Date; import java.util.Date;
public class DataFrameAuditMessageTests extends AbstractXContentTestCase<DataFrameAuditMessage> { public class DataFrameAuditMessageTests extends AbstractXContentTestCase<DataFrameAuditMessage> {
private long startMillis;
@Before
public void setStartTime() {
startMillis = System.currentTimeMillis();
}
public void testNewInfo() {
DataFrameAuditMessage info = DataFrameAuditMessage.builder().info("foo", "some info", "some_node");
assertEquals("foo", info.getResourceId());
assertEquals("some info", info.getMessage());
assertEquals(Level.INFO, info.getLevel());
assertDateBetweenStartAndNow(info.getTimestamp());
}
public void testNewWarning() {
DataFrameAuditMessage warning = DataFrameAuditMessage.builder().warning("bar", "some warning", "some_node");
assertEquals("bar", warning.getResourceId());
assertEquals("some warning", warning.getMessage());
assertEquals(Level.WARNING, warning.getLevel());
assertDateBetweenStartAndNow(warning.getTimestamp());
}
public void testNewError() {
DataFrameAuditMessage error = DataFrameAuditMessage.builder().error("foo", "some error", "some_node");
assertEquals("foo", error.getResourceId());
assertEquals("some error", error.getMessage());
assertEquals(Level.ERROR, error.getLevel());
assertDateBetweenStartAndNow(error.getTimestamp());
}
private void assertDateBetweenStartAndNow(Date timestamp) {
long timestampMillis = timestamp.getTime();
assertTrue(timestampMillis >= startMillis);
assertTrue(timestampMillis <= System.currentTimeMillis());
}
@Override @Override
protected DataFrameAuditMessage doParseInstance(XContentParser parser) { protected DataFrameAuditMessage doParseInstance(XContentParser parser) {
@ -67,6 +29,7 @@ public class DataFrameAuditMessageTests extends AbstractXContentTestCase<DataFra
randomBoolean() ? null : randomAlphaOfLength(10), randomBoolean() ? null : randomAlphaOfLength(10),
randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20),
randomFrom(Level.values()), randomFrom(Level.values()),
new Date(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20) randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20)
); );
} }

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.notifications;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.common.notifications.Level;
import java.util.Date;
public class AnomalyDetectionAuditMessageTests extends AbstractXContentTestCase<AnomalyDetectionAuditMessage> {
@Override
protected AnomalyDetectionAuditMessage doParseInstance(XContentParser parser) {
return AnomalyDetectionAuditMessage.PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected AnomalyDetectionAuditMessage createTestInstance() {
return new AnomalyDetectionAuditMessage(
randomBoolean() ? null : randomAlphaOfLength(10),
randomAlphaOfLengthBetween(1, 20),
randomFrom(Level.values()),
new Date(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20)
);
}
}

View File

@ -16,7 +16,8 @@ import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
* DataFrameAuditor class that abstracts away generic templating for easier injection * DataFrameAuditor class that abstracts away generic templating for easier injection
*/ */
public class DataFrameAuditor extends AbstractAuditor<DataFrameAuditMessage> { public class DataFrameAuditor extends AbstractAuditor<DataFrameAuditMessage> {
public DataFrameAuditor(Client client, String nodeName) { public DataFrameAuditor(Client client, String nodeName) {
super(client, nodeName, DataFrameInternalIndex.AUDIT_INDEX, DATA_FRAME_ORIGIN, DataFrameAuditMessage.builder()); super(client, nodeName, DataFrameInternalIndex.AUDIT_INDEX, DATA_FRAME_ORIGIN, DataFrameAuditMessage::new);
} }
} }

View File

@ -469,7 +469,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
return Collections.singletonList(new JobManagerHolder()); return Collections.singletonList(new JobManagerHolder());
} }
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings); JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
JobResultsPersister jobResultsPersister = new JobResultsPersister(client); JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client); JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
@ -481,7 +481,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
jobResultsProvider, jobResultsProvider,
jobResultsPersister, jobResultsPersister,
clusterService, clusterService,
auditor, anomalyDetectionAuditor,
threadPool, threadPool,
client, client,
notifier, notifier,
@ -534,21 +534,21 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool, AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool,
xContentRegistry, auditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, xContentRegistry, anomalyDetectionAuditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister,
autodetectProcessFactory, normalizerFactory, nativeStorageProvider); jobDataCountsPersister, autodetectProcessFactory, normalizerFactory, nativeStorageProvider);
this.autodetectProcessManager.set(autodetectProcessManager); this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = DatafeedJobBuilder datafeedJobBuilder =
new DatafeedJobBuilder( new DatafeedJobBuilder(
client, client,
xContentRegistry, xContentRegistry,
auditor, anomalyDetectionAuditor,
System::currentTimeMillis, System::currentTimeMillis,
jobConfigProvider, jobConfigProvider,
jobResultsProvider, jobResultsProvider,
datafeedConfigProvider, datafeedConfigProvider,
jobResultsPersister); jobResultsPersister);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor, autodetectProcessManager); System::currentTimeMillis, anomalyDetectionAuditor, autodetectProcessManager);
this.datafeedManager.set(datafeedManager); this.datafeedManager.set(datafeedManager);
// Data frame analytics components // Data frame analytics components
@ -589,8 +589,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
new MlInitializationService(settings, threadPool, clusterService, client), new MlInitializationService(settings, threadPool, clusterService, client),
jobDataCountsPersister, jobDataCountsPersister,
datafeedManager, datafeedManager,
auditor, anomalyDetectionAuditor,
new MlAssignmentNotifier(settings, auditor, threadPool, client, clusterService), new MlAssignmentNotifier(settings, anomalyDetectionAuditor, threadPool, client, clusterService),
memoryTracker, memoryTracker,
analyticsProcessManager, analyticsProcessManager,
memoryEstimationProcessManager, memoryEstimationProcessManager,
@ -805,7 +805,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
} }
try (XContentBuilder auditMapping = ElasticsearchMappings.auditMessageMapping()) { try (XContentBuilder auditMapping = ElasticsearchMappings.auditMessageMapping()) {
IndexTemplateMetaData notificationMessageTemplate = IndexTemplateMetaData.builder(AuditorField.NOTIFICATIONS_INDEX) IndexTemplateMetaData notificationMessageTemplate =
IndexTemplateMetaData.builder(AuditorField.NOTIFICATIONS_INDEX)
.putMapping(SINGLE_MAPPING_NAME, Strings.toString(auditMapping)) .putMapping(SINGLE_MAPPING_NAME, Strings.toString(auditMapping))
.patterns(Collections.singletonList(AuditorField.NOTIFICATIONS_INDEX)) .patterns(Collections.singletonList(AuditorField.NOTIFICATIONS_INDEX))
.version(Version.CURRENT.id) .version(Version.CURRENT.id)
@ -822,7 +823,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
} }
try (XContentBuilder docMapping = MlMetaIndex.docMapping()) { try (XContentBuilder docMapping = MlMetaIndex.docMapping()) {
IndexTemplateMetaData metaTemplate = IndexTemplateMetaData.builder(MlMetaIndex.INDEX_NAME) IndexTemplateMetaData metaTemplate =
IndexTemplateMetaData.builder(MlMetaIndex.INDEX_NAME)
.patterns(Collections.singletonList(MlMetaIndex.INDEX_NAME)) .patterns(Collections.singletonList(MlMetaIndex.INDEX_NAME))
.settings(Settings.builder() .settings(Settings.builder()
// Our indexes are small and one shard puts the // Our indexes are small and one shard puts the
@ -839,7 +841,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
} }
try (XContentBuilder configMapping = ElasticsearchMappings.configMapping()) { try (XContentBuilder configMapping = ElasticsearchMappings.configMapping()) {
IndexTemplateMetaData configTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndex.configIndexName()) IndexTemplateMetaData configTemplate =
IndexTemplateMetaData.builder(AnomalyDetectorsIndex.configIndexName())
.patterns(Collections.singletonList(AnomalyDetectorsIndex.configIndexName())) .patterns(Collections.singletonList(AnomalyDetectorsIndex.configIndexName()))
.settings(Settings.builder() .settings(Settings.builder()
// Our indexes are small and one shard puts the // Our indexes are small and one shard puts the
@ -858,7 +861,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
} }
try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) { try (XContentBuilder stateMapping = ElasticsearchMappings.stateMapping()) {
IndexTemplateMetaData stateTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX) IndexTemplateMetaData stateTemplate =
IndexTemplateMetaData.builder(AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX)
.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexPattern())) .patterns(Collections.singletonList(AnomalyDetectorsIndex.jobStateIndexPattern()))
// TODO review these settings // TODO review these settings
.settings(Settings.builder() .settings(Settings.builder()
@ -874,7 +878,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
} }
try (XContentBuilder docMapping = ElasticsearchMappings.resultsMapping(SINGLE_MAPPING_NAME)) { try (XContentBuilder docMapping = ElasticsearchMappings.resultsMapping(SINGLE_MAPPING_NAME)) {
IndexTemplateMetaData jobResultsTemplate = IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobResultsIndexPrefix()) IndexTemplateMetaData jobResultsTemplate =
IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobResultsIndexPrefix())
.patterns(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*")) .patterns(Collections.singletonList(AnomalyDetectorsIndex.jobResultsIndexPrefix() + "*"))
.settings(Settings.builder() .settings(Settings.builder()
.put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1") .put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-1")

View File

@ -15,6 +15,6 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
public class AnomalyDetectionAuditor extends AbstractAuditor<AnomalyDetectionAuditMessage> { public class AnomalyDetectionAuditor extends AbstractAuditor<AnomalyDetectionAuditMessage> {
public AnomalyDetectionAuditor(Client client, String nodeName) { public AnomalyDetectionAuditor(Client client, String nodeName) {
super(client, nodeName, AuditorField.NOTIFICATIONS_INDEX, ML_ORIGIN, AnomalyDetectionAuditMessage.builder()); super(client, nodeName, AuditorField.NOTIFICATIONS_INDEX, ML_ORIGIN, AnomalyDetectionAuditMessage::new);
} }
} }