[7.x] Implement DataFrameAnalyticsAuditMessage and DataFrameAnalyticsAuditor (#45967) (#46519)

This commit is contained in:
Przemysław Witek 2019-09-11 12:17:26 +02:00 committed by GitHub
parent 35810bd2ae
commit e38e631dac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 193 additions and 17 deletions

View File

@ -27,6 +27,7 @@ public abstract class AbstractAuditMessage implements ToXContentObject {
public static final ParseField LEVEL = new ParseField("level");
public static final ParseField TIMESTAMP = new ParseField("timestamp");
public static final ParseField NODE_NAME = new ParseField("node_name");
public static final ParseField JOB_TYPE = new ParseField("job_type");
protected static final <T extends AbstractAuditMessage> ConstructingObjectParser<T, Void> createParser(
String name, AbstractAuditMessageFactory<T> messageFactory, ParseField resourceField) {
@ -99,13 +100,17 @@ public abstract class AbstractAuditMessage implements ToXContentObject {
if (nodeName != null) {
builder.field(NODE_NAME.getPreferredName(), nodeName);
}
String jobType = getJobType();
if (jobType != null) {
builder.field(JOB_TYPE.getPreferredName(), jobType);
}
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(resourceId, message, level, timestamp, nodeName);
return Objects.hash(resourceId, message, level, timestamp, nodeName, getJobType());
}
@Override
@ -122,8 +127,17 @@ public abstract class AbstractAuditMessage implements ToXContentObject {
Objects.equals(message, other.message) &&
Objects.equals(level, other.level) &&
Objects.equals(timestamp, other.timestamp) &&
Objects.equals(nodeName, other.nodeName);
Objects.equals(nodeName, other.nodeName) &&
Objects.equals(getJobType(), other.getJobType());
}
/**
* @return job type string used to tell apart jobs of different types stored in the same index
*/
public abstract String getJobType();
/**
* @return resource id field name used when storing a new message
*/
protected abstract String getResourceField();
}

View File

@ -23,6 +23,11 @@ public class DataFrameAuditMessage extends AbstractAuditMessage {
super(resourceId, message, level, timestamp, nodeName);
}
@Override
public final String getJobType() {
return null;
}
@Override
protected String getResourceField() {
return TRANSFORM_ID.getPreferredName();

View File

@ -1121,12 +1121,13 @@ public class ElasticsearchMappings {
XContentBuilder builder = jsonBuilder().startObject();
builder.startObject(SINGLE_MAPPING_NAME);
addMetaInformation(builder);
builder.field(DYNAMIC, "false");
builder.startObject(PROPERTIES)
.startObject(Job.ID.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyDetectionAuditMessage.LEVEL.getPreferredName())
.field(TYPE, KEYWORD)
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyDetectionAuditMessage.MESSAGE.getPreferredName())
.field(TYPE, TEXT)
@ -1142,6 +1143,9 @@ public class ElasticsearchMappings {
.startObject(AnomalyDetectionAuditMessage.NODE_NAME.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.startObject(AnomalyDetectionAuditMessage.JOB_TYPE.getPreferredName())
.field(TYPE, KEYWORD)
.endObject()
.endObject()
.endObject()
.endObject();

View File

@ -23,6 +23,11 @@ public class AnomalyDetectionAuditMessage extends AbstractAuditMessage {
super(resourceId, message, level, timestamp, nodeName);
}
@Override
public final String getJobType() {
return Job.ANOMALY_DETECTOR_JOB_TYPE;
}
@Override
protected String getResourceField() {
return JOB_ID.getPreferredName();

View File

@ -6,8 +6,8 @@
package org.elasticsearch.xpack.core.ml.notifications;
public final class AuditorField {
public static final String NOTIFICATIONS_INDEX = ".ml-notifications";
public static final String NOTIFICATIONS_INDEX = ".ml-notifications-000001";
private AuditorField() {}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.notifications;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import java.util.Date;
public class DataFrameAnalyticsAuditMessage extends AbstractAuditMessage {
private static final ParseField JOB_ID = Job.ID;
public static final ConstructingObjectParser<DataFrameAnalyticsAuditMessage, Void> PARSER =
createParser("ml_analytics_audit_message", DataFrameAnalyticsAuditMessage::new, JOB_ID);
public DataFrameAnalyticsAuditMessage(String resourceId, String message, Level level, Date timestamp, String nodeName) {
super(resourceId, message, level, timestamp, nodeName);
}
@Override
public final String getJobType() {
return "data_frame_analytics";
}
@Override
protected String getResourceField() {
return JOB_ID.getPreferredName();
}
}

View File

@ -26,6 +26,11 @@ public class AbstractAuditMessageTests extends AbstractXContentTestCase<Abstract
super(resourceId, message, level, timestamp, nodeName);
}
@Override
public String getJobType() {
return "test_type";
}
@Override
protected String getResourceField() {
return TEST_ID.getPreferredName();
@ -42,6 +47,11 @@ public class AbstractAuditMessageTests extends AbstractXContentTestCase<Abstract
assertThat(message.getResourceField(), equalTo(TestAuditMessage.TEST_ID.getPreferredName()));
}
public void testGetJobType() {
TestAuditMessage message = createTestInstance();
assertThat(message.getJobType(), equalTo("test_type"));
}
public void testNewInfo() {
TestAuditMessage message = new TestAuditMessage(RESOURCE_ID, MESSAGE, Level.INFO, TIMESTAMP, NODE_NAME);
assertThat(message.getResourceId(), equalTo(RESOURCE_ID));

View File

@ -11,8 +11,15 @@ import org.elasticsearch.xpack.core.common.notifications.Level;
import java.util.Date;
import static org.hamcrest.Matchers.nullValue;
public class DataFrameAuditMessageTests extends AbstractXContentTestCase<DataFrameAuditMessage> {
public void testGetJobType() {
DataFrameAuditMessage message = createTestInstance();
assertThat(message.getJobType(), nullValue());
}
@Override
protected DataFrameAuditMessage doParseInstance(XContentParser parser) {
return DataFrameAuditMessage.PARSER.apply(parser, null);

View File

@ -8,11 +8,19 @@ package org.elasticsearch.xpack.core.ml.notifications;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.common.notifications.Level;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import java.util.Date;
import static org.hamcrest.Matchers.equalTo;
public class AnomalyDetectionAuditMessageTests extends AbstractXContentTestCase<AnomalyDetectionAuditMessage> {
public void testGetJobType() {
AnomalyDetectionAuditMessage message = createTestInstance();
assertThat(message.getJobType(), equalTo(Job.ANOMALY_DETECTOR_JOB_TYPE));
}
@Override
protected AnomalyDetectionAuditMessage doParseInstance(XContentParser parser) {
return AnomalyDetectionAuditMessage.PARSER.apply(parser, null);

View File

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.notifications;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.common.notifications.Level;
import java.util.Date;
import static org.hamcrest.Matchers.equalTo;
public class DataFrameAnalyticsAuditMessageTests extends AbstractXContentTestCase<DataFrameAnalyticsAuditMessage> {
public void testGetJobType() {
DataFrameAnalyticsAuditMessage message = createTestInstance();
assertThat(message.getJobType(), equalTo("data_frame_analytics"));
}
@Override
protected DataFrameAnalyticsAuditMessage doParseInstance(XContentParser parser) {
return DataFrameAnalyticsAuditMessage.PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected DataFrameAnalyticsAuditMessage createTestInstance() {
return new DataFrameAnalyticsAuditMessage(
randomBoolean() ? null : randomAlphaOfLength(10),
randomAlphaOfLengthBetween(1, 20),
randomFrom(Level.values()),
new Date(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20)
);
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.junit.After;
import org.junit.Before;
@ -184,7 +185,8 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
long totalModelSizeStatsBeforeDelete = client().prepareSearch("*")
.setQuery(QueryBuilders.termQuery("result_type", "model_size_stats"))
.get().getHits().getTotalHits().value;
long totalNotificationsCountBeforeDelete = client().prepareSearch(".ml-notifications").get().getHits().getTotalHits().value;
long totalNotificationsCountBeforeDelete =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value;
assertThat(totalModelSizeStatsBeforeDelete, greaterThan(0L));
assertThat(totalNotificationsCountBeforeDelete, greaterThan(0L));
@ -234,7 +236,8 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
long totalModelSizeStatsAfterDelete = client().prepareSearch("*")
.setQuery(QueryBuilders.termQuery("result_type", "model_size_stats"))
.get().getHits().getTotalHits().value;
long totalNotificationsCountAfterDelete = client().prepareSearch(".ml-notifications").get().getHits().getTotalHits().value;
long totalNotificationsCountAfterDelete =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value;
assertThat(totalModelSizeStatsAfterDelete, equalTo(totalModelSizeStatsBeforeDelete));
assertThat(totalNotificationsCountAfterDelete, greaterThanOrEqualTo(totalNotificationsCountBeforeDelete));

View File

@ -23,6 +23,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Operator;
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.core.ml.job.config.RuleScope;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.junit.After;
import java.io.IOException;
@ -186,7 +187,8 @@ public class DetectionRulesIT extends MlNativeAutodetectIntegTestCase {
// Wait until the notification that the filter was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
SearchResponse searchResponse =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX)
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()

View File

@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.junit.After;
import java.io.IOException;
@ -223,7 +224,8 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase {
// Wait until the notification that the process was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
SearchResponse searchResponse =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX)
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()
@ -298,7 +300,8 @@ public class ScheduledEventsIT extends MlNativeAutodetectIntegTestCase {
// Wait until the notification that the job was updated is indexed
assertBusy(() -> {
SearchResponse searchResponse = client().prepareSearch(".ml-notifications")
SearchResponse searchResponse =
client().prepareSearch(AuditorField.NOTIFICATIONS_INDEX)
.setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(QueryBuilders.boolQuery()

View File

@ -216,6 +216,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.NativeNormalizerProcess
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcessFactory;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
@ -470,6 +471,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
}
AnomalyDetectionAuditor anomalyDetectionAuditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor = new DataFrameAnalyticsAuditor(client, clusterService.getNodeName());
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
@ -592,6 +594,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
jobDataCountsPersister,
datafeedManager,
anomalyDetectionAuditor,
dataFrameAnalyticsAuditor,
new MlAssignmentNotifier(settings, anomalyDetectionAuditor, threadPool, client, clusterService),
memoryTracker,
analyticsProcessManager,
@ -906,8 +909,12 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
public static boolean allTemplatesInstalled(ClusterState clusterState) {
boolean allPresent = true;
List<String> templateNames = Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobResultsIndexPrefix());
List<String> templateNames =
Arrays.asList(
AuditorField.NOTIFICATIONS_INDEX,
MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
AnomalyDetectorsIndex.jobResultsIndexPrefix());
for (String templateName : templateNames) {
allPresent = allPresent && TemplateUtils.checkTemplateExistsAndVersionIsGTECurrentVersion(templateName, clusterState);
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.notifications;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.xpack.core.ml.notifications.DataFrameAnalyticsAuditMessage;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
public class DataFrameAnalyticsAuditor extends AbstractAuditor<DataFrameAnalyticsAuditMessage> {
public DataFrameAnalyticsAuditor(Client client, String nodeName) {
super(client, nodeName, AuditorField.NOTIFICATIONS_INDEX, ML_ORIGIN, DataFrameAnalyticsAuditMessage::new);
}
}

View File

@ -56,7 +56,7 @@ public class AnnotationIndexIT extends MlSingleNodeTestCase {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client(), "node_1");
auditor.info("whatever", "blah");
// Creating a document in the .ml-notifications index should cause .ml-annotations
// Creating a document in the .ml-notifications-000001 index should cause .ml-annotations
// to be created, as it should get created as soon as any other ML index exists
assertBusy(() -> {

View File

@ -87,7 +87,10 @@ public class XPackRestIT extends ESClientYamlSuiteTestCase {
private void waitForTemplates() throws Exception {
if (installTemplates()) {
List<String> templates = new ArrayList<>();
templates.addAll(Arrays.asList(AuditorField.NOTIFICATIONS_INDEX, MlMetaIndex.INDEX_NAME,
templates.addAll(
Arrays.asList(
AuditorField.NOTIFICATIONS_INDEX,
MlMetaIndex.INDEX_NAME,
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX,
AnomalyDetectorsIndex.jobResultsIndexPrefix(),
AnomalyDetectorsIndex.configIndexName()));

View File

@ -23,7 +23,8 @@ public final class XPackRestTestConstants {
// ML constants:
public static final String ML_META_INDEX_NAME = ".ml-meta";
public static final String AUDITOR_NOTIFICATIONS_INDEX = ".ml-notifications";
public static final String AUDITOR_NOTIFICATIONS_INDEX_LEGACY = ".ml-notifications";
public static final String AUDITOR_NOTIFICATIONS_INDEX = ".ml-notifications-000001";
public static final String CONFIG_INDEX = ".ml-config";
public static final String RESULTS_INDEX_PREFIX = ".ml-anomalies-";
public static final String STATE_INDEX_PREFIX = ".ml-state";
@ -32,8 +33,14 @@ public final class XPackRestTestConstants {
public static final List<String> ML_PRE_V660_TEMPLATES = Collections.unmodifiableList(Arrays.asList(
AUDITOR_NOTIFICATIONS_INDEX, ML_META_INDEX_NAME, STATE_INDEX_PREFIX, RESULTS_INDEX_PREFIX));
public static final List<String> ML_POST_V660_TEMPLATES = Collections.unmodifiableList(Arrays.asList(
AUDITOR_NOTIFICATIONS_INDEX, ML_META_INDEX_NAME, STATE_INDEX_PREFIX, RESULTS_INDEX_PREFIX, CONFIG_INDEX));
public static final List<String> ML_POST_V660_TEMPLATES =
Collections.unmodifiableList(Arrays.asList(
AUDITOR_NOTIFICATIONS_INDEX_LEGACY,
AUDITOR_NOTIFICATIONS_INDEX,
ML_META_INDEX_NAME,
STATE_INDEX_PREFIX,
RESULTS_INDEX_PREFIX,
CONFIG_INDEX));
// Data Frame constants:
public static final String DATA_FRAME_INTERNAL_INDEX = ".data-frame-internal-1";