[7.x] [ML] Add DatafeedTimingStats to datafeed GetDatafeedStatsAction.Response (#43045) (#44118)

Przemysław Witek 2019-07-10 11:51:44 +02:00 committed by GitHub
parent 853ddb5a07
commit 44781e415e
53 changed files with 1728 additions and 237 deletions


@@ -41,9 +41,12 @@ public class DatafeedStats implements ToXContentObject {
private final NodeAttributes node;
@Nullable
private final String assignmentExplanation;
@Nullable
private final DatafeedTimingStats timingStats;
public static final ParseField ASSIGNMENT_EXPLANATION = new ParseField("assignment_explanation");
public static final ParseField NODE = new ParseField("node");
public static final ParseField TIMING_STATS = new ParseField("timing_stats");
public static final ConstructingObjectParser<DatafeedStats, Void> PARSER = new ConstructingObjectParser<>("datafeed_stats",
true,
@@ -52,7 +55,8 @@ public class DatafeedStats implements ToXContentObject {
DatafeedState datafeedState = DatafeedState.fromString((String)a[1]);
NodeAttributes nodeAttributes = (NodeAttributes)a[2];
String assignmentExplanation = (String)a[3];
return new DatafeedStats(datafeedId, datafeedState, nodeAttributes, assignmentExplanation);
DatafeedTimingStats timingStats = (DatafeedTimingStats)a[4];
return new DatafeedStats(datafeedId, datafeedState, nodeAttributes, assignmentExplanation, timingStats);
} );
static {
@@ -60,14 +64,16 @@ public class DatafeedStats implements ToXContentObject {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedState.STATE);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), NodeAttributes.PARSER, NODE);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ASSIGNMENT_EXPLANATION);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), DatafeedTimingStats.PARSER, TIMING_STATS);
}
public DatafeedStats(String datafeedId, DatafeedState datafeedState, @Nullable NodeAttributes node,
@Nullable String assignmentExplanation) {
@Nullable String assignmentExplanation, @Nullable DatafeedTimingStats timingStats) {
this.datafeedId = Objects.requireNonNull(datafeedId);
this.datafeedState = Objects.requireNonNull(datafeedState);
this.node = node;
this.assignmentExplanation = assignmentExplanation;
this.timingStats = timingStats;
}
public String getDatafeedId() {
@@ -86,6 +92,10 @@ public class DatafeedStats implements ToXContentObject {
return assignmentExplanation;
}
public DatafeedTimingStats getDatafeedTimingStats() {
return timingStats;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
@@ -110,13 +120,16 @@ public class DatafeedStats implements ToXContentObject {
if (assignmentExplanation != null) {
builder.field(ASSIGNMENT_EXPLANATION.getPreferredName(), assignmentExplanation);
}
if (timingStats != null) {
builder.field(TIMING_STATS.getPreferredName(), timingStats);
}
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(datafeedId, datafeedState.toString(), node, assignmentExplanation);
return Objects.hash(datafeedId, datafeedState.toString(), node, assignmentExplanation, timingStats);
}
@Override
@@ -131,6 +144,7 @@ public class DatafeedStats implements ToXContentObject {
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(this.datafeedState, other.datafeedState) &&
Objects.equals(this.node, other.node) &&
Objects.equals(this.assignmentExplanation, other.assignmentExplanation);
Objects.equals(this.assignmentExplanation, other.assignmentExplanation) &&
Objects.equals(this.timingStats, other.timingStats);
}
}
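
For orientation, a minimal sketch of reading the new field through the high-level REST client (not part of this commit; the request/response class names and the datafeedStats() accessor are assumptions based on the client's ML naming conventions, while getDatafeedTimingStats() is the getter added above):

// Hypothetical usage sketch; client construction elided, names per the assumptions above.
GetDatafeedStatsRequest request = new GetDatafeedStatsRequest("my-datafeed");
GetDatafeedStatsResponse response = client.machineLearning().getDatafeedStats(request, RequestOptions.DEFAULT);
DatafeedTimingStats timingStats = response.datafeedStats().get(0).getDatafeedTimingStats();
if (timingStats != null) { // the field is @Nullable
    double avgSearchTimeMs = timingStats.getTotalSearchTimeMs() / Math.max(1L, timingStats.getSearchCount());
    System.out.printf("searches=%d, avg=%.1f ms%n", timingStats.getSearchCount(), avgSearchTimeMs);
}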


@@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml.datafeed;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class DatafeedTimingStats implements ToXContentObject {
public static final ParseField JOB_ID = new ParseField("job_id");
public static final ParseField SEARCH_COUNT = new ParseField("search_count");
public static final ParseField TOTAL_SEARCH_TIME_MS = new ParseField("total_search_time_ms");
public static final ParseField TYPE = new ParseField("datafeed_timing_stats");
public static final ConstructingObjectParser<DatafeedTimingStats, Void> PARSER = createParser();
private static ConstructingObjectParser<DatafeedTimingStats, Void> createParser() {
ConstructingObjectParser<DatafeedTimingStats, Void> parser =
new ConstructingObjectParser<>(
"datafeed_timing_stats",
true,
args -> {
String jobId = (String) args[0];
Long searchCount = (Long) args[1];
Double totalSearchTimeMs = (Double) args[2];
return new DatafeedTimingStats(jobId, getOrDefault(searchCount, 0L), getOrDefault(totalSearchTimeMs, 0.0));
});
parser.declareString(constructorArg(), JOB_ID);
parser.declareLong(optionalConstructorArg(), SEARCH_COUNT);
parser.declareDouble(optionalConstructorArg(), TOTAL_SEARCH_TIME_MS);
return parser;
}
private final String jobId;
private long searchCount;
private double totalSearchTimeMs;
public DatafeedTimingStats(String jobId, long searchCount, double totalSearchTimeMs) {
this.jobId = Objects.requireNonNull(jobId);
this.searchCount = searchCount;
this.totalSearchTimeMs = totalSearchTimeMs;
}
public String getJobId() {
return jobId;
}
public long getSearchCount() {
return searchCount;
}
public double getTotalSearchTimeMs() {
return totalSearchTimeMs;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(JOB_ID.getPreferredName(), jobId);
builder.field(SEARCH_COUNT.getPreferredName(), searchCount);
builder.field(TOTAL_SEARCH_TIME_MS.getPreferredName(), totalSearchTimeMs);
builder.endObject();
return builder;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
DatafeedTimingStats other = (DatafeedTimingStats) obj;
return Objects.equals(this.jobId, other.jobId)
&& this.searchCount == other.searchCount
&& this.totalSearchTimeMs == other.totalSearchTimeMs;
}
@Override
public int hashCode() {
return Objects.hash(jobId, searchCount, totalSearchTimeMs);
}
@Override
public String toString() {
return Strings.toString(this);
}
private static <T> T getOrDefault(@Nullable T value, T defaultValue) {
return value != null ? value : defaultValue;
}
}
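
As a quick illustration (not part of the diff): serializing an instance of the class above with Strings.toString, which also backs its toString(), produces exactly the field layout the parser accepts.

DatafeedTimingStats stats = new DatafeedTimingStats("my-job-id", 20, 120.5);
String json = Strings.toString(stats);
// json is {"job_id":"my-job-id","search_count":20,"total_search_time_ms":120.5}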


@@ -50,7 +50,8 @@ public class DatafeedStatsTests extends AbstractXContentTestCase<DatafeedStats>
attributes);
}
String assignmentReason = randomBoolean() ? randomAlphaOfLength(10) : null;
return new DatafeedStats(datafeedId, datafeedState, nodeAttributes, assignmentReason);
DatafeedTimingStats timingStats = DatafeedTimingStatsTests.createRandomInstance();
return new DatafeedStats(datafeedId, datafeedState, nodeAttributes, assignmentReason, timingStats);
}
@Override


@@ -0,0 +1,92 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml.datafeed;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.equalTo;
public class DatafeedTimingStatsTests extends AbstractXContentTestCase<DatafeedTimingStats> {
private static final String JOB_ID = "my-job-id";
public static DatafeedTimingStats createRandomInstance() {
return new DatafeedTimingStats(randomAlphaOfLength(10), randomLong(), randomDouble());
}
@Override
protected DatafeedTimingStats createTestInstance() {
return createRandomInstance();
}
@Override
protected DatafeedTimingStats doParseInstance(XContentParser parser) throws IOException {
return DatafeedTimingStats.PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
public void testParse_OptionalFieldsAbsent() throws IOException {
String json = "{\"job_id\": \"my-job-id\"}";
try (XContentParser parser =
XContentFactory.xContent(XContentType.JSON).createParser(
xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
DatafeedTimingStats stats = DatafeedTimingStats.PARSER.apply(parser, null);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getSearchCount(), equalTo(0L));
assertThat(stats.getTotalSearchTimeMs(), equalTo(0.0));
}
}
public void testEquals() {
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0);
assertTrue(stats1.equals(stats1));
assertTrue(stats1.equals(stats2));
assertFalse(stats2.equals(stats3));
}
public void testHashCode() {
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0);
assertEquals(stats1.hashCode(), stats1.hashCode());
assertEquals(stats1.hashCode(), stats2.hashCode());
assertNotEquals(stats2.hashCode(), stats3.hashCode());
}
public void testConstructorAndGetters() {
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 123.456);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getSearchCount(), equalTo(5L));
assertThat(stats.getTotalSearchTimeMs(), equalTo(123.456));
}
}


@@ -143,3 +143,10 @@ update their values:
`started`::: The {dfeed} is actively receiving data.
`stopped`::: The {dfeed} is stopped and will not receive data until it is
re-started.
`timing_stats`::
(object) An object that provides statistical information about the timing of this datafeed. +
`job_id`::: A string that uniquely identifies the job this datafeed is associated with.
`search_count`::: The number of searches performed by this datafeed.
`total_search_time_ms`::: The total time the datafeed spent searching, in milliseconds.
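
For context, these fields appear in the response of the get datafeed statistics endpoint; a request along the following lines returns the example shown in the next file (the datafeed name here is an assumption chosen to match the example job):

GET _ml/datafeeds/datafeed-total-requests/_stats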


@@ -90,7 +90,12 @@ The API returns the following results:
"ml.max_open_jobs": "20"
}
},
"assignment_explanation": ""
"assignment_explanation": "",
"timing_stats": {
"job_id": "job-total-requests",
"search_count": 20,
"total_search_time_ms": 120.5
}
}
]
}


@@ -23,12 +23,15 @@ import org.elasticsearch.xpack.core.action.AbstractGetResourcesResponse;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.Version.V_7_4_0;
public class GetDatafeedsStatsAction extends StreamableResponseActionType<GetDatafeedsStatsAction.Response> {
public static final GetDatafeedsStatsAction INSTANCE = new GetDatafeedsStatsAction();
@@ -133,13 +136,16 @@ public class GetDatafeedsStatsAction extends StreamableResponseActionType<GetDat
private DiscoveryNode node;
@Nullable
private String assignmentExplanation;
@Nullable
private DatafeedTimingStats timingStats;
public DatafeedStats(String datafeedId, DatafeedState datafeedState, @Nullable DiscoveryNode node,
@Nullable String assignmentExplanation) {
@Nullable String assignmentExplanation, @Nullable DatafeedTimingStats timingStats) {
this.datafeedId = Objects.requireNonNull(datafeedId);
this.datafeedState = Objects.requireNonNull(datafeedState);
this.node = node;
this.assignmentExplanation = assignmentExplanation;
this.timingStats = timingStats;
}
DatafeedStats(StreamInput in) throws IOException {
@@ -147,6 +153,11 @@ public class GetDatafeedsStatsAction extends StreamableResponseActionType<GetDat
datafeedState = DatafeedState.fromStream(in);
node = in.readOptionalWriteable(DiscoveryNode::new);
assignmentExplanation = in.readOptionalString();
if (in.getVersion().onOrAfter(V_7_4_0)) {
timingStats = in.readOptionalWriteable(DatafeedTimingStats::new);
} else {
timingStats = null;
}
}
public String getDatafeedId() {
@@ -165,6 +176,10 @@ public class GetDatafeedsStatsAction extends StreamableResponseActionType<GetDat
return assignmentExplanation;
}
public DatafeedTimingStats getTimingStats() {
return timingStats;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@@ -189,6 +204,9 @@ public class GetDatafeedsStatsAction extends StreamableResponseActionType<GetDat
if (assignmentExplanation != null) {
builder.field("assignment_explanation", assignmentExplanation);
}
if (timingStats != null) {
builder.field("timing_stats", timingStats);
}
builder.endObject();
return builder;
}
@@ -199,11 +217,14 @@ public class GetDatafeedsStatsAction extends StreamableResponseActionType<GetDat
datafeedState.writeTo(out);
out.writeOptionalWriteable(node);
out.writeOptionalString(assignmentExplanation);
if (out.getVersion().onOrAfter(V_7_4_0)) {
out.writeOptionalWriteable(timingStats);
}
}
@Override
public int hashCode() {
return Objects.hash(datafeedId, datafeedState, node, assignmentExplanation);
return Objects.hash(datafeedId, datafeedState, node, assignmentExplanation, timingStats);
}
@Override
@@ -215,10 +236,11 @@ public class GetDatafeedsStatsAction extends StreamableResponseActionType<GetDat
return false;
}
DatafeedStats other = (DatafeedStats) obj;
return Objects.equals(datafeedId, other.datafeedId) &&
return Objects.equals(this.datafeedId, other.datafeedId) &&
Objects.equals(this.datafeedState, other.datafeedState) &&
Objects.equals(this.node, other.node) &&
Objects.equals(this.assignmentExplanation, other.assignmentExplanation);
Objects.equals(this.assignmentExplanation, other.assignmentExplanation) &&
Objects.equals(this.timingStats, other.timingStats);
}
}
@@ -237,5 +259,4 @@ public class GetDatafeedsStatsAction extends StreamableResponseActionType<GetDat
return DatafeedStats::new;
}
}
}


@@ -0,0 +1,142 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.datafeed;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class DatafeedTimingStats implements ToXContentObject, Writeable {
public static final ParseField JOB_ID = new ParseField("job_id");
public static final ParseField SEARCH_COUNT = new ParseField("search_count");
public static final ParseField TOTAL_SEARCH_TIME_MS = new ParseField("total_search_time_ms");
public static final ParseField TYPE = new ParseField("datafeed_timing_stats");
public static final ConstructingObjectParser<DatafeedTimingStats, Void> PARSER = createParser();
private static ConstructingObjectParser<DatafeedTimingStats, Void> createParser() {
ConstructingObjectParser<DatafeedTimingStats, Void> parser =
new ConstructingObjectParser<>(
"datafeed_timing_stats",
true,
args -> {
String jobId = (String) args[0];
Long searchCount = (Long) args[1];
Double totalSearchTimeMs = (Double) args[2];
return new DatafeedTimingStats(jobId, getOrDefault(searchCount, 0L), getOrDefault(totalSearchTimeMs, 0.0));
});
parser.declareString(constructorArg(), JOB_ID);
parser.declareLong(optionalConstructorArg(), SEARCH_COUNT);
parser.declareDouble(optionalConstructorArg(), TOTAL_SEARCH_TIME_MS);
return parser;
}
public static String documentId(String jobId) {
return jobId + "_datafeed_timing_stats";
}
private final String jobId;
private long searchCount;
private double totalSearchTimeMs;
public DatafeedTimingStats(String jobId, long searchCount, double totalSearchTimeMs) {
this.jobId = Objects.requireNonNull(jobId);
this.searchCount = searchCount;
this.totalSearchTimeMs = totalSearchTimeMs;
}
public DatafeedTimingStats(String jobId) {
this(jobId, 0, 0);
}
public DatafeedTimingStats(StreamInput in) throws IOException {
jobId = in.readString();
searchCount = in.readLong();
totalSearchTimeMs = in.readDouble();
}
public DatafeedTimingStats(DatafeedTimingStats other) {
this(other.jobId, other.searchCount, other.totalSearchTimeMs);
}
public String getJobId() {
return jobId;
}
public long getSearchCount() {
return searchCount;
}
public double getTotalSearchTimeMs() {
return totalSearchTimeMs;
}
public void incrementTotalSearchTimeMs(double searchTimeMs) {
this.searchCount++;
this.totalSearchTimeMs += searchTimeMs;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(jobId);
out.writeLong(searchCount);
out.writeDouble(totalSearchTimeMs);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(JOB_ID.getPreferredName(), jobId);
builder.field(SEARCH_COUNT.getPreferredName(), searchCount);
builder.field(TOTAL_SEARCH_TIME_MS.getPreferredName(), totalSearchTimeMs);
builder.endObject();
return builder;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
DatafeedTimingStats other = (DatafeedTimingStats) obj;
return Objects.equals(this.jobId, other.jobId)
&& this.searchCount == other.searchCount
&& this.totalSearchTimeMs == other.totalSearchTimeMs;
}
@Override
public int hashCode() {
return Objects.hash(jobId, searchCount, totalSearchTimeMs);
}
@Override
public String toString() {
return Strings.toString(this);
}
private static <T> T getOrDefault(@Nullable T value, T defaultValue) {
return value != null ? value : defaultValue;
}
}
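
A short sketch (illustrative, not from the commit) of how the mutable counters above accumulate:

DatafeedTimingStats stats = new DatafeedTimingStats("job-1"); // starts with search_count = 0, total_search_time_ms = 0.0
stats.incrementTotalSearchTimeMs(42.5);                       // one search that took 42.5 ms
stats.incrementTotalSearchTimeMs(7.5);                        // a second search
// Now getSearchCount() == 2 and getTotalSearchTimeMs() == 50.0; the stats document
// would be persisted under documentId("job-1"), i.e. "job-1_datafeed_timing_stats".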


@@ -426,57 +426,69 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
}
public void setId(String datafeedId) {
public Builder setId(String datafeedId) {
id = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
return this;
}
public void setJobId(String jobId) {
public Builder setJobId(String jobId) {
this.jobId = jobId;
return this;
}
public void setIndices(List<String> indices) {
public Builder setIndices(List<String> indices) {
this.indices = indices;
return this;
}
public void setQueryDelay(TimeValue queryDelay) {
public Builder setQueryDelay(TimeValue queryDelay) {
this.queryDelay = queryDelay;
return this;
}
public void setFrequency(TimeValue frequency) {
public Builder setFrequency(TimeValue frequency) {
this.frequency = frequency;
return this;
}
public void setQuery(QueryProvider queryProvider) {
public Builder setQuery(QueryProvider queryProvider) {
this.queryProvider = queryProvider;
return this;
}
private void setAggregationsSafe(AggProvider aggProvider) {
private Builder setAggregationsSafe(AggProvider aggProvider) {
if (this.aggProvider != null) {
throw ExceptionsHelper.badRequestException("Found two aggregation definitions: [aggs] and [aggregations]");
}
setAggregations(aggProvider);
return this;
}
public void setAggregations(AggProvider aggProvider) {
public Builder setAggregations(AggProvider aggProvider) {
this.aggProvider = aggProvider;
return this;
}
public void setScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
public Builder setScriptFields(List<SearchSourceBuilder.ScriptField> scriptFields) {
List<SearchSourceBuilder.ScriptField> sorted = new ArrayList<>(scriptFields);
sorted.sort(Comparator.comparing(SearchSourceBuilder.ScriptField::fieldName));
this.scriptFields = sorted;
return this;
}
public void setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) {
public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) {
this.delayedDataCheckConfig = delayedDataCheckConfig;
return this;
}
public void setScrollSize(int scrollSize) {
public Builder setScrollSize(int scrollSize) {
this.scrollSize = scrollSize;
return this;
}
public void setChunkingConfig(ChunkingConfig chunkingConfig) {
public Builder setChunkingConfig(ChunkingConfig chunkingConfig) {
this.chunkingConfig = chunkingConfig;
return this;
}
public DatafeedUpdate build() {


@@ -25,6 +25,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
@@ -510,6 +511,7 @@ public class ElasticsearchMappings {
addCategoryDefinitionMapping(builder);
addDataCountsMapping(builder);
addTimingStatsExceptBucketCountMapping(builder);
addDatafeedTimingStats(builder);
addModelSnapshotMapping(builder);
addTermFields(builder, extraTermFields);
@@ -928,6 +930,21 @@ public class ElasticsearchMappings {
.endObject();
}
/**
* {@link DatafeedTimingStats} mapping.
*
* @throws IOException On builder write error
*/
private static void addDatafeedTimingStats(XContentBuilder builder) throws IOException {
builder
.startObject(DatafeedTimingStats.SEARCH_COUNT.getPreferredName())
.field(TYPE, LONG)
.endObject()
.startObject(DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName())
.field(TYPE, DOUBLE)
.endObject();
}
/**
* Create the Elasticsearch mapping for {@linkplain CategoryDefinition}.
* The '_all' field is disabled as the document isn't meant to be searched.


@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.ml.job.results;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
@@ -185,6 +186,9 @@ public final class ReservedFieldNames {
TimingStats.AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
TimingStats.EXPONENTIAL_AVG_BUCKET_PROCESSING_TIME_MS.getPreferredName(),
DatafeedTimingStats.SEARCH_COUNT.getPreferredName(),
DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(),
GetResult._ID,
GetResult._INDEX,
GetResult._TYPE


@@ -18,6 +18,8 @@ import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction.Response;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStatsTests;
import java.io.IOException;
import java.net.InetAddress;
@@ -43,16 +45,13 @@ public class GetDatafeedStatsActionResponseTests extends AbstractStreamableTestC
for (int j = 0; j < listSize; j++) {
String datafeedId = randomAlphaOfLength(10);
DatafeedState datafeedState = randomFrom(DatafeedState.values());
DiscoveryNode node = null;
if (randomBoolean()) {
node = new DiscoveryNode("_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT);
}
String explanation = null;
if (randomBoolean()) {
explanation = randomAlphaOfLength(3);
}
Response.DatafeedStats datafeedStats = new Response.DatafeedStats(datafeedId, datafeedState, node, explanation);
DiscoveryNode node =
randomBoolean()
? null
: new DiscoveryNode("_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT);
String explanation = randomBoolean() ? null : randomAlphaOfLength(3);
DatafeedTimingStats timingStats = randomBoolean() ? null : DatafeedTimingStatsTests.createRandom();
Response.DatafeedStats datafeedStats = new Response.DatafeedStats(datafeedId, datafeedState, node, explanation, timingStats);
datafeedStatsList.add(datafeedStats);
}
@@ -78,7 +77,9 @@ public class GetDatafeedStatsActionResponseTests extends AbstractStreamableTestC
Collections.emptySet(),
Version.CURRENT);
Response.DatafeedStats stats = new Response.DatafeedStats("df-id", DatafeedState.STARTED, node, null);
DatafeedTimingStats timingStats = new DatafeedTimingStats("my-job-id", 5, 123.456);
Response.DatafeedStats stats = new Response.DatafeedStats("df-id", DatafeedState.STARTED, node, null, timingStats);
XContentType xContentType = randomFrom(XContentType.values());
BytesReference bytes;
@@ -89,10 +90,11 @@ public class GetDatafeedStatsActionResponseTests extends AbstractStreamableTestC
Map<String, Object> dfStatsMap = XContentHelper.convertToMap(bytes, randomBoolean(), xContentType).v2();
assertThat(dfStatsMap.size(), is(equalTo(3)));
assertThat(dfStatsMap.size(), is(equalTo(4)));
assertThat(dfStatsMap, hasEntry("datafeed_id", "df-id"));
assertThat(dfStatsMap, hasEntry("state", "started"));
assertThat(dfStatsMap, hasKey("node"));
assertThat(dfStatsMap, hasKey("timing_stats"));
Map<String, Object> nodeMap = (Map<String, Object>) dfStatsMap.get("node");
assertThat(nodeMap, hasEntry("id", "df-node-id"));
@@ -105,5 +107,11 @@ public class GetDatafeedStatsActionResponseTests extends AbstractStreamableTestC
assertThat(nodeAttributes.size(), is(equalTo(2)));
assertThat(nodeAttributes, hasEntry("ml.enabled", "true"));
assertThat(nodeAttributes, hasEntry("ml.max_open_jobs", "5"));
Map<String, Object> timingStatsMap = (Map<String, Object>) dfStatsMap.get("timing_stats");
assertThat(timingStatsMap.size(), is(equalTo(3)));
assertThat(timingStatsMap, hasEntry("job_id", "my-job-id"));
assertThat(timingStatsMap, hasEntry("search_count", 5));
assertThat(timingStatsMap, hasEntry("total_search_time_ms", 123.456));
}
}


@@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.datafeed;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.equalTo;
public class DatafeedTimingStatsTests extends AbstractSerializingTestCase<DatafeedTimingStats> {
private static final String JOB_ID = "my-job-id";
public static DatafeedTimingStats createRandom() {
return new DatafeedTimingStats(randomAlphaOfLength(10), randomLong(), randomDouble());
}
@Override
protected DatafeedTimingStats createTestInstance() {
return createRandom();
}
@Override
protected Writeable.Reader<DatafeedTimingStats> instanceReader() {
return DatafeedTimingStats::new;
}
@Override
protected DatafeedTimingStats doParseInstance(XContentParser parser) {
return DatafeedTimingStats.PARSER.apply(parser, null);
}
@Override
protected DatafeedTimingStats mutateInstance(DatafeedTimingStats instance) throws IOException {
String jobId = instance.getJobId();
long searchCount = instance.getSearchCount();
double totalSearchTimeMs = instance.getTotalSearchTimeMs();
return new DatafeedTimingStats(
jobId + randomAlphaOfLength(5),
searchCount + 1,
totalSearchTimeMs + randomDoubleBetween(1.0, 100.0, true));
}
public void testParse_OptionalFieldsAbsent() throws IOException {
String json = "{\"job_id\": \"my-job-id\"}";
try (XContentParser parser =
XContentFactory.xContent(XContentType.JSON).createParser(
xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
DatafeedTimingStats stats = DatafeedTimingStats.PARSER.apply(parser, null);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getSearchCount(), equalTo(0L));
assertThat(stats.getTotalSearchTimeMs(), equalTo(0.0));
}
}
public void testEquals() {
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0);
assertTrue(stats1.equals(stats1));
assertTrue(stats1.equals(stats2));
assertFalse(stats2.equals(stats3));
}
public void testHashCode() {
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
DatafeedTimingStats stats2 = new DatafeedTimingStats(JOB_ID, 5, 100.0);
DatafeedTimingStats stats3 = new DatafeedTimingStats(JOB_ID, 5, 200.0);
assertEquals(stats1.hashCode(), stats1.hashCode());
assertEquals(stats1.hashCode(), stats2.hashCode());
assertNotEquals(stats2.hashCode(), stats3.hashCode());
}
public void testConstructorsAndGetters() {
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 123.456);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getSearchCount(), equalTo(5L));
assertThat(stats.getTotalSearchTimeMs(), equalTo(123.456));
stats = new DatafeedTimingStats(JOB_ID);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getSearchCount(), equalTo(0L));
assertThat(stats.getTotalSearchTimeMs(), equalTo(0.0));
}
public void testCopyConstructor() {
DatafeedTimingStats stats1 = new DatafeedTimingStats(JOB_ID, 5, 123.456);
DatafeedTimingStats stats2 = new DatafeedTimingStats(stats1);
assertThat(stats2.getJobId(), equalTo(JOB_ID));
assertThat(stats2.getSearchCount(), equalTo(5L));
assertThat(stats2.getTotalSearchTimeMs(), equalTo(123.456));
}
public void testIncrementTotalSearchTimeMs() {
DatafeedTimingStats stats = new DatafeedTimingStats(JOB_ID, 5, 100.0);
stats.incrementTotalSearchTimeMs(200.0);
assertThat(stats.getJobId(), equalTo(JOB_ID));
assertThat(stats.getSearchCount(), equalTo(6L));
assertThat(stats.getTotalSearchTimeMs(), equalTo(300.0));
}
public void testDocumentId() {
assertThat(DatafeedTimingStats.documentId("my-job-id"), equalTo("my-job-id_datafeed_timing_stats"));
}
}


@ -23,6 +23,7 @@ import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
@@ -82,6 +83,7 @@ public class ElasticsearchMappingsTests extends ESTestCase {
overridden.add(ModelSnapshot.TYPE.getPreferredName());
overridden.add(Quantiles.TYPE.getPreferredName());
overridden.add(TimingStats.TYPE.getPreferredName());
overridden.add(DatafeedTimingStats.TYPE.getPreferredName());
Set<String> expected = collectResultsDocFieldNames();
expected.removeAll(overridden);


@@ -21,12 +21,17 @@ import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.hamcrest.Matcher;
import org.junit.After;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -39,6 +44,7 @@ import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCoun
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
@@ -49,8 +55,8 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
public void testLookbackOnly() throws Exception {
client().admin().indices().prepareCreate("data-1")
.addMapping("type", "time", "type=date")
.get();
.addMapping("type", "time", "type=date")
.get();
long numDocs = randomIntBetween(32, 2048);
long now = System.currentTimeMillis();
long oneWeekAgo = now - 604800000;
@@ -58,8 +64,8 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
indexDocs(logger, "data-1", numDocs, twoWeeksAgo, oneWeekAgo);
client().admin().indices().prepareCreate("data-2")
.addMapping("type", "time", "type=date")
.get();
.addMapping("type", "time", "type=date")
.get();
client().admin().cluster().prepareHealth("data-1", "data-2").setWaitForYellowStatus().get();
long numDocs2 = randomIntBetween(32, 2048);
indexDocs(logger, "data-2", numDocs2, oneWeekAgo, now);
@@ -92,6 +98,122 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
waitUntilJobIsClosed(job.getId());
}
public void testDatafeedTimingStats_DatafeedRecreated() throws Exception {
client().admin().indices().prepareCreate("data")
.addMapping("type", "time", "type=date")
.get();
long numDocs = randomIntBetween(32, 2048);
Instant now = Instant.now();
indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli());
Job.Builder job = createScheduledJob("lookback-job");
String datafeedId = "lookback-datafeed";
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Arrays.asList("data"));
registerJob(job);
putJob(job);
for (int i = 0; i < 2; ++i) {
openJob(job.getId());
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
// Datafeed did not do anything yet, hence search_count is equal to 0.
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), equalTo(0L));
startDatafeed(datafeedId, 0L, now.toEpochMilli());
assertBusy(() -> {
assertThat(getDataCounts(job.getId()).getProcessedRecordCount(), equalTo(numDocs));
// Datafeed processed numDocs documents so search_count must be greater than 0.
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), greaterThan(0L));
}, 60, TimeUnit.SECONDS);
deleteDatafeed(datafeedId);
waitUntilJobIsClosed(job.getId());
}
}
public void testDatafeedTimingStats_DatafeedJobIdUpdated() throws Exception {
client().admin().indices().prepareCreate("data")
.addMapping("type", "time", "type=date")
.get();
long numDocs = randomIntBetween(32, 2048);
Instant now = Instant.now();
indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli());
Job.Builder jobA = createScheduledJob("lookback-job");
Job.Builder jobB = createScheduledJob("other-lookback-job");
for (Job.Builder job : Arrays.asList(jobA, jobB)) {
registerJob(job);
putJob(job);
}
String datafeedId = "lookback-datafeed";
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, jobA.getId(), Arrays.asList("data"));
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
for (Job.Builder job : Arrays.asList(jobA, jobB, jobA)) {
openJob(job.getId());
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
// Bind datafeedId to the current job in the list; the timing stats are wiped out by the update.
updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(job.getId()).build());
// Datafeed did not do anything yet, hence search_count is equal to 0.
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), equalTo(0L));
startDatafeed(datafeedId, 0L, now.toEpochMilli());
assertBusy(() -> {
assertThat(getDataCounts(job.getId()).getProcessedRecordCount(), equalTo(numDocs));
// Datafeed processed numDocs documents so search_count must be greater than 0.
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), greaterThan(0L));
}, 60, TimeUnit.SECONDS);
waitUntilJobIsClosed(job.getId());
}
}
public void testDatafeedTimingStats_QueryDelayUpdated_TimingStatsNotReset() throws Exception {
client().admin().indices().prepareCreate("data")
.addMapping("type", "time", "type=date")
.get();
long numDocs = randomIntBetween(32, 2048);
Instant now = Instant.now();
indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli());
Job.Builder job = createScheduledJob("lookback-job");
registerJob(job);
putJob(job);
String datafeedId = "lookback-datafeed";
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Arrays.asList("data"));
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
openJob(job.getId());
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
// Datafeed did not do anything yet, hence search_count is equal to 0.
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), equalTo(0L));
startDatafeed(datafeedId, 0L, now.toEpochMilli());
assertBusy(() -> {
assertThat(getDataCounts(job.getId()).getProcessedRecordCount(), equalTo(numDocs));
// Datafeed processed numDocs documents so search_count must be greater than 0.
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), greaterThan(0L));
}, 60, TimeUnit.SECONDS);
waitUntilJobIsClosed(job.getId());
// Change something other than jobId (here: queryDelay).
updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setQueryDelay(TimeValue.timeValueSeconds(777)).build());
// search_count is still greater than 0 (i.e. it has not been reset by the datafeed update).
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), greaterThan(0L));
}
private void assertDatafeedStats(String datafeedId, DatafeedState state, String jobId, Matcher<Long> searchCountMatcher) {
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results(), hasSize(1));
GetDatafeedsStatsAction.Response.DatafeedStats stats = response.getResponse().results().get(0);
assertThat(stats.getDatafeedState(), equalTo(state));
assertThat(stats.getTimingStats().getJobId(), equalTo(jobId));
assertThat(stats.getTimingStats().getSearchCount(), searchCountMatcher);
}
public void testRealtime() throws Exception {
String jobId = "realtime-job";
String datafeedId = jobId + "-datafeed";


@@ -43,11 +43,13 @@ import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
@@ -175,6 +177,11 @@ abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {
return client().execute(StopDatafeedAction.INSTANCE, request).actionGet();
}
protected PutDatafeedAction.Response updateDatafeed(DatafeedUpdate update) {
UpdateDatafeedAction.Request request = new UpdateDatafeedAction.Request(update);
return client().execute(UpdateDatafeedAction.INSTANCE, request).actionGet();
}
protected AcknowledgedResponse deleteDatafeed(String datafeedId) {
DeleteDatafeedAction.Request request = new DeleteDatafeedAction.Request(datafeedId);
return client().execute(DeleteDatafeedAction.INSTANCE, request).actionGet();


@@ -447,12 +447,15 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
Auditor auditor = new Auditor(client, clusterService.getNodeName());
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
JobConfigProvider jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(client, clusterService, threadPool);
JobManager jobManager = new JobManager(env,
settings,
jobResultsProvider,
jobResultsPersister,
clusterService,
auditor,
threadPool,
@@ -463,9 +466,6 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
// special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager if ML is enabled
JobManagerHolder jobManagerHolder = new JobManagerHolder(jobManager);
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
NativeStorageProvider nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));
AutodetectProcessFactory autodetectProcessFactory;
@@ -509,8 +509,16 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
xContentRegistry, auditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister,
autodetectProcessFactory, normalizerFactory, nativeStorageProvider);
this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
auditor, System::currentTimeMillis);
DatafeedJobBuilder datafeedJobBuilder =
new DatafeedJobBuilder(
client,
xContentRegistry,
auditor,
System::currentTimeMillis,
jobConfigProvider,
jobResultsProvider,
datafeedConfigProvider,
jobResultsPersister);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor, autodetectProcessManager);
this.datafeedManager.set(datafeedManager);
@@ -541,6 +549,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
return Arrays.asList(
mlLifeCycleService,
jobResultsProvider,
jobResultsPersister,
jobConfigProvider,
datafeedConfigProvider,
jobManager,


@@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import java.io.IOException;
@@ -143,10 +144,26 @@ public class TransportDeleteDatafeedAction extends TransportMasterNodeAction<Del
return;
}
datafeedConfigProvider.deleteDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
deleteResponse -> listener.onResponse(new AcknowledgedResponse(true)),
listener::onFailure
));
String datafeedId = request.getDatafeedId();
datafeedConfigProvider.getDatafeedConfig(
datafeedId,
ActionListener.wrap(
datafeedConfigBuilder -> {
String jobId = datafeedConfigBuilder.build().getJobId();
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, jobId);
jobDataDeleter.deleteDatafeedTimingStats(
ActionListener.wrap(
unused1 -> {
datafeedConfigProvider.deleteDatafeedConfig(
datafeedId,
ActionListener.wrap(
unused2 -> listener.onResponse(new AcknowledgedResponse(true)),
listener::onFailure));
},
listener::onFailure));
},
listener::onFailure));
}
@Override


@@ -23,7 +23,9 @@ import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import java.util.List;
import java.util.stream.Collectors;
@@ -32,15 +34,17 @@ public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAct
GetDatafeedsStatsAction.Response> {
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsProvider jobResultsProvider;
@Inject
public TransportGetDatafeedsStatsAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
DatafeedConfigProvider datafeedConfigProvider) {
DatafeedConfigProvider datafeedConfigProvider, JobResultsProvider jobResultsProvider) {
super(GetDatafeedsStatsAction.NAME, transportService, clusterService, threadPool, actionFilters,
GetDatafeedsStatsAction.Request::new, indexNameExpressionResolver);
this.datafeedConfigProvider = datafeedConfigProvider;
this.jobResultsProvider = jobResultsProvider;
}
@Override
@@ -58,22 +62,46 @@ public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAct
ActionListener<GetDatafeedsStatsAction.Response> listener) throws Exception {
logger.debug("Get stats for datafeed '{}'", request.getDatafeedId());
datafeedConfigProvider.expandDatafeedIds(request.getDatafeedId(), request.allowNoDatafeeds(), ActionListener.wrap(
expandedDatafeedIds -> {
PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
List<GetDatafeedsStatsAction.Response.DatafeedStats> results = expandedDatafeedIds.stream()
.map(datafeedId -> getDatafeedStats(datafeedId, state, tasksInProgress))
datafeedConfigProvider.expandDatafeedConfigs(
request.getDatafeedId(),
request.allowNoDatafeeds(),
ActionListener.wrap(
datafeedBuilders -> {
List<String> jobIds =
datafeedBuilders.stream()
.map(DatafeedConfig.Builder::build)
.map(DatafeedConfig::getJobId)
.collect(Collectors.toList());
QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage = new QueryPage<>(results, results.size(),
DatafeedConfig.RESULTS_FIELD);
listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage));
jobResultsProvider.datafeedTimingStats(
jobIds,
timingStatsByJobId -> {
PersistentTasksCustomMetaData tasksInProgress = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
List<GetDatafeedsStatsAction.Response.DatafeedStats> results =
datafeedBuilders.stream()
.map(DatafeedConfig.Builder::build)
.map(
datafeed -> getDatafeedStats(
datafeed.getId(),
state,
tasksInProgress,
datafeed.getJobId(),
timingStatsByJobId.get(datafeed.getJobId())))
.collect(Collectors.toList());
QueryPage<GetDatafeedsStatsAction.Response.DatafeedStats> statsPage =
new QueryPage<>(results, results.size(), DatafeedConfig.RESULTS_FIELD);
listener.onResponse(new GetDatafeedsStatsAction.Response(statsPage));
},
listener::onFailure);
},
listener::onFailure
));
listener::onFailure)
);
}
private static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats(String datafeedId, ClusterState state,
PersistentTasksCustomMetaData tasks) {
private static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats(String datafeedId,
ClusterState state,
PersistentTasksCustomMetaData tasks,
String jobId,
DatafeedTimingStats timingStats) {
PersistentTasksCustomMetaData.PersistentTask<?> task = MlTasks.getDatafeedTask(datafeedId, tasks);
DatafeedState datafeedState = MlTasks.getDatafeedState(datafeedId, tasks);
DiscoveryNode node = null;
@@ -82,7 +110,10 @@ public class TransportGetDatafeedsStatsAction extends TransportMasterNodeReadAct
node = state.nodes().get(task.getExecutorNode());
explanation = task.getAssignment().getExplanation();
}
return new GetDatafeedsStatsAction.Response.DatafeedStats(datafeedId, datafeedState, node, explanation);
if (timingStats == null) {
timingStats = new DatafeedTimingStats(jobId);
}
return new GetDatafeedsStatsAction.Response.DatafeedStats(datafeedId, datafeedState, node, explanation, timingStats);
}
@Override


@@ -20,9 +20,12 @@ import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import java.io.BufferedReader;
import java.io.InputStream;
@@ -39,56 +42,68 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
private final Client client;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsProvider jobResultsProvider;
private final JobResultsPersister jobResultsPersister;
private final NamedXContentRegistry xContentRegistry;
@Inject
public TransportPreviewDatafeedAction(ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Client client, JobConfigProvider jobConfigProvider,
DatafeedConfigProvider datafeedConfigProvider, NamedXContentRegistry xContentRegistry) {
DatafeedConfigProvider datafeedConfigProvider, JobResultsProvider jobResultsProvider,
JobResultsPersister jobResultsPersister, NamedXContentRegistry xContentRegistry) {
super(PreviewDatafeedAction.NAME, transportService, actionFilters,
(Supplier<PreviewDatafeedAction.Request>) PreviewDatafeedAction.Request::new);
this.threadPool = threadPool;
this.client = client;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.jobResultsProvider = jobResultsProvider;
this.jobResultsPersister = jobResultsPersister;
this.xContentRegistry = xContentRegistry;
}
@Override
protected void doExecute(Task task, PreviewDatafeedAction.Request request, ActionListener<PreviewDatafeedAction.Response> listener) {
datafeedConfigProvider.getDatafeedConfig(request.getDatafeedId(), ActionListener.wrap(
datafeedConfigBuilder -> {
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
jobConfigProvider.getJob(datafeedConfig.getJobId(), ActionListener.wrap(
jobBuilder -> {
DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeedConfig);
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
previewDatafeed.setHeaders(headers);
datafeedConfigBuilder -> {
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
jobConfigProvider.getJob(datafeedConfig.getJobId(), ActionListener.wrap(
jobBuilder -> {
DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeedConfig);
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> ClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
previewDatafeed.setHeaders(headers);
jobResultsProvider.datafeedTimingStats(
jobBuilder.getId(),
timingStats -> {
// NB: this is using the client from the transport layer, NOT the internal client.
// This is important because it means the datafeed search will fail if the user
// requesting the preview doesn't have permission to search the relevant indices.
DataExtractorFactory.create(client, previewDatafeed.build(), jobBuilder.build(), xContentRegistry,
new ActionListener<DataExtractorFactory>() {
@Override
public void onResponse(DataExtractorFactory dataExtractorFactory) {
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
}
DataExtractorFactory.create(
client,
previewDatafeed.build(),
jobBuilder.build(),
xContentRegistry,
new DatafeedTimingStatsReporter(timingStats, jobResultsPersister),
new ActionListener<DataExtractorFactory>() {
@Override
public void onResponse(DataExtractorFactory dataExtractorFactory) {
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
},
listener::onFailure
));
},
listener::onFailure
));
listener::onFailure);
},
listener::onFailure));
},
listener::onFailure));
}
/** Visible for testing */


@@ -43,6 +43,7 @@ import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -50,9 +51,11 @@ import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import java.io.IOException;
@ -79,6 +82,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
private final PersistentTasksService persistentTasksService;
private final JobConfigProvider jobConfigProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsPersister jobResultsPersister;
private final Auditor auditor;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
private final NamedXContentRegistry xContentRegistry;
@ -89,7 +93,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
PersistentTasksService persistentTasksService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Client client, JobConfigProvider jobConfigProvider, DatafeedConfigProvider datafeedConfigProvider,
Auditor auditor, NamedXContentRegistry xContentRegistry) {
JobResultsPersister jobResultsPersister, Auditor auditor, NamedXContentRegistry xContentRegistry) {
super(StartDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
StartDatafeedAction.Request::new);
this.licenseState = licenseState;
@ -97,6 +101,7 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
this.client = client;
this.jobConfigProvider = jobConfigProvider;
this.datafeedConfigProvider = datafeedConfigProvider;
this.jobResultsPersister = jobResultsPersister;
this.auditor = auditor;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.xContentRegistry = xContentRegistry;
@ -241,14 +246,22 @@ public class TransportStartDatafeedAction extends TransportMasterNodeAction<Star
datafeedConfigProvider.getDatafeedConfig(params.getDatafeedId(), datafeedListener);
}
/** Creates a {@link DataExtractorFactory} solely for the purpose of validation, i.e. verifying that it can be created. */
private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeedAction.DatafeedParams params,
ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>>
listener) {
DataExtractorFactory.create(client, datafeed, job, xContentRegistry, ActionListener.wrap(
dataExtractorFactory ->
persistentTasksService.sendStartRequest(MlTasks.datafeedTaskId(params.getDatafeedId()),
MlTasks.DATAFEED_TASK_NAME, params, listener)
, listener::onFailure));
DataExtractorFactory.create(
client,
datafeed,
job,
xContentRegistry,
// Create a fake {@link DatafeedTimingStatsReporter} so that a search API call is not needed.
new DatafeedTimingStatsReporter(new DatafeedTimingStats(job.getId()), jobResultsPersister),
ActionListener.wrap(
unused ->
persistentTasksService.sendStartRequest(
MlTasks.datafeedTaskId(params.getDatafeedId()), MlTasks.DATAFEED_TASK_NAME, params, listener),
listener::onFailure));
}
@Override

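One detail worth spelling out in the hunk above: createDataExtractor builds the factory purely to validate it, so the reporter is seeded with a fresh, zeroed DatafeedTimingStats instead of stats fetched via a search. A simplified model of why that is safe, using plain stand-in types rather than the Elasticsearch classes, assuming (as the reporter's contract suggests) that persistence only ever happens from reportSearchDuration:

public class ValidationOnlyReporterSketch {

    // Stand-in for DatafeedTimingStats: just the accumulated search time.
    static final class Stats {
        double totalSearchTimeMs;
    }

    static final class Reporter {
        private final Stats current;

        Reporter(Stats seed) {
            this.current = seed;
        }

        void reportSearchDuration(long millis) {
            current.totalSearchTimeMs += millis;
            // persistence would be triggered here; validation never runs a search,
            // so a zeroed seed can never clobber the persisted document
        }
    }

    public static void main(String[] args) {
        Reporter reporter = new Reporter(new Stats()); // zeroed seed, no search round-trip needed
        System.out.println("factory validated without touching the results index: " + reporter);
    }
}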
View File

@ -18,6 +18,7 @@ import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -30,12 +31,14 @@ import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
import java.util.Collections;
import java.util.Map;
public class TransportUpdateDatafeedAction extends TransportMasterNodeAction<UpdateDatafeedAction.Request, PutDatafeedAction.Response> {
private final Client client;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobConfigProvider jobConfigProvider;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;
@ -48,9 +51,10 @@ public class TransportUpdateDatafeedAction extends TransportMasterNodeAction<Upd
super(UpdateDatafeedAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, UpdateDatafeedAction.Request::new);
datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.client = client;
this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
this.jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
}
@Override
@ -85,21 +89,42 @@ public class TransportUpdateDatafeedAction extends TransportMasterNodeAction<Upd
String datafeedId = request.getUpdate().getId();
CheckedConsumer<Boolean, Exception> updateConsumer = ok -> {
datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers,
CheckedConsumer<BulkByScrollResponse, Exception> updateConsumer =
unused -> {
datafeedConfigProvider.updateDatefeedConfig(
datafeedId,
request.getUpdate(),
headers,
jobConfigProvider::validateDatafeedJob,
ActionListener.wrap(
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
listener::onFailure
));
};
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
listener::onFailure));
};
CheckedConsumer<Boolean, Exception> deleteTimingStatsAndUpdateConsumer =
unused -> {
datafeedConfigProvider.getDatafeedConfig(
datafeedId,
ActionListener.wrap(
datafeedConfigBuilder -> {
String jobId = datafeedConfigBuilder.build().getJobId();
if (jobId.equals(request.getUpdate().getJobId())) {
// Datafeed's jobId didn't change, no point in deleting datafeed timing stats.
updateConsumer.accept(null);
} else {
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, jobId);
jobDataDeleter.deleteDatafeedTimingStats(ActionListener.wrap(updateConsumer, listener::onFailure));
}
},
listener::onFailure));
};
if (request.getUpdate().getJobId() != null) {
checkJobDoesNotHaveADifferentDatafeed(request.getUpdate().getJobId(), datafeedId,
ActionListener.wrap(updateConsumer, listener::onFailure));
checkJobDoesNotHaveADifferentDatafeed(
request.getUpdate().getJobId(), datafeedId, ActionListener.wrap(deleteTimingStatsAndUpdateConsumer, listener::onFailure));
} else {
updateConsumer.accept(Boolean.TRUE);
updateConsumer.accept(null);
}
}

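The consumer chain above reduces to one decision: run the delete-by-query for the old job's timing stats only when the update actually re-points the datafeed at a different job. A standalone sketch of that branch is below; the real code sequences these steps through ActionListener callbacks rather than direct calls.

import java.util.Objects;

public class UpdateBranchSketch {

    static void updateDatafeed(String currentJobId, String updatedJobId,
                               Runnable deleteOldTimingStats, Runnable applyUpdate) {
        if (updatedJobId == null || Objects.equals(currentJobId, updatedJobId)) {
            applyUpdate.run(); // jobId unchanged: nothing to clean up
        } else {
            deleteOldTimingStats.run(); // stats indexed under the old job would otherwise go stale
            applyUpdate.run();
        }
    }

    public static void main(String[] args) {
        updateDatafeed("job-a", "job-b",
            () -> System.out.println("delete timing stats for job-a"),
            () -> System.out.println("apply datafeed update"));
    }
}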
View File

@ -8,12 +8,12 @@ package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
@ -25,6 +25,7 @@ import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
@ -37,36 +38,28 @@ import java.util.function.Supplier;
public class DatafeedJobBuilder {
private final Client client;
private final Settings settings;
private final NamedXContentRegistry xContentRegistry;
private final Auditor auditor;
private final Supplier<Long> currentTimeSupplier;
private final JobConfigProvider jobConfigProvider;
private final JobResultsProvider jobResultsProvider;
private final DatafeedConfigProvider datafeedConfigProvider;
private final JobResultsPersister jobResultsPersister;
public DatafeedJobBuilder(Client client, Settings settings, NamedXContentRegistry xContentRegistry,
Auditor auditor, Supplier<Long> currentTimeSupplier) {
public DatafeedJobBuilder(Client client, NamedXContentRegistry xContentRegistry, Auditor auditor, Supplier<Long> currentTimeSupplier,
JobConfigProvider jobConfigProvider, JobResultsProvider jobResultsProvider,
DatafeedConfigProvider datafeedConfigProvider, JobResultsPersister jobResultsPersister) {
this.client = client;
this.settings = Objects.requireNonNull(settings);
this.xContentRegistry = Objects.requireNonNull(xContentRegistry);
this.auditor = Objects.requireNonNull(auditor);
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
this.jobConfigProvider = Objects.requireNonNull(jobConfigProvider);
this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
this.datafeedConfigProvider = Objects.requireNonNull(datafeedConfigProvider);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
}
void build(String datafeedId, ActionListener<DatafeedJob> listener) {
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
JobConfigProvider jobConfigProvider = new JobConfigProvider(client, xContentRegistry);
DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
build(datafeedId, jobResultsProvider, jobConfigProvider, datafeedConfigProvider, listener);
}
/**
* For testing only.
* Use {@link #build(String, ActionListener)} instead
*/
void build(String datafeedId, JobResultsProvider jobResultsProvider, JobConfigProvider jobConfigProvider,
DatafeedConfigProvider datafeedConfigProvider, ActionListener<DatafeedJob> listener) {
AtomicReference<Job> jobHolder = new AtomicReference<>();
AtomicReference<DatafeedConfig> datafeedConfigHolder = new AtomicReference<>();
@ -98,11 +91,21 @@ public class DatafeedJobBuilder {
);
// Create data extractor factory
Consumer<DatafeedTimingStats> datafeedTimingStatsHandler = timingStats -> {
DataExtractorFactory.create(
client,
datafeedConfigHolder.get(),
jobHolder.get(),
xContentRegistry,
new DatafeedTimingStatsReporter(timingStats, jobResultsPersister),
dataExtractorFactoryHandler);
};
Consumer<DataCounts> dataCountsHandler = dataCounts -> {
if (dataCounts.getLatestRecordTimeStamp() != null) {
context.latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();
}
DataExtractorFactory.create(client, datafeedConfigHolder.get(), jobHolder.get(), xContentRegistry, dataExtractorFactoryHandler);
jobResultsProvider.datafeedTimingStats(jobHolder.get().getId(), datafeedTimingStatsHandler, listener::onFailure);
};
// Collect data counts
@ -118,7 +121,8 @@ public class DatafeedJobBuilder {
Consumer<String> jobIdConsumer = jobId -> {
BucketsQueryBuilder latestBucketQuery = new BucketsQueryBuilder()
.sortField(Result.TIMESTAMP.getPreferredName())
.sortDescending(true).size(1)
.sortDescending(true)
.size(1)
.includeInterim(false);
jobResultsProvider.bucketsViaInternalClient(jobId, latestBucketQuery, bucketsHandler, e -> {
if (e instanceof ResourceNotFoundException) {

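The rewritten builder threads three asynchronous lookups into one pipeline: the data counts establish the latest record time, the persisted timing stats seed the reporter, and only then is the extractor factory created. A compact model of that chaining style, using plain java.util.function types in place of the ActionListener machinery:

import java.util.function.Consumer;

public class HandlerChainSketch {

    // Each stage hands its result to a continuation; in the real code a shared
    // error handler short-circuits the chain on failure.
    static void lookupDataCounts(Consumer<Long> next) {
        next.accept(1_562_000_000_000L); // latest record timestamp, example value
    }

    static void lookupTimingStats(Consumer<Double> next) {
        next.accept(12_345.0); // persisted total search time in ms, example value
    }

    public static void main(String[] args) {
        lookupDataCounts(latestRecordTimeMs ->
            lookupTimingStats(totalSearchTimeMs ->
                System.out.println("create extractor factory: latestRecordTimeMs=" + latestRecordTimeMs
                    + ", reporter seeded with " + totalSearchTimeMs + "ms")));
    }
}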
View File

@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import java.util.Objects;
/**
* {@link DatafeedTimingStatsReporter} handles the logic of persisting {@link DatafeedTimingStats} when they have changed significantly
* since the last time they were persisted.
*
* This class is not thread-safe.
*/
public class DatafeedTimingStatsReporter {
/** Persisted timing stats. May be stale. */
private DatafeedTimingStats persistedTimingStats;
/** Current timing stats. */
private volatile DatafeedTimingStats currentTimingStats;
/** Object used to persist current timing stats. */
private final JobResultsPersister jobResultsPersister;
public DatafeedTimingStatsReporter(DatafeedTimingStats timingStats, JobResultsPersister jobResultsPersister) {
Objects.requireNonNull(timingStats);
this.persistedTimingStats = new DatafeedTimingStats(timingStats);
this.currentTimingStats = new DatafeedTimingStats(timingStats);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
}
public DatafeedTimingStats getCurrentTimingStats() {
return new DatafeedTimingStats(currentTimingStats);
}
/**
* Reports how long the search request execution took.
*/
public void reportSearchDuration(TimeValue searchDuration) {
if (searchDuration == null) {
return;
}
currentTimingStats.incrementTotalSearchTimeMs(searchDuration.millis());
if (differSignificantly(currentTimingStats, persistedTimingStats)) {
// TODO: Consider changing refresh policy to NONE here and only do IMMEDIATE on datafeed _stop action
flush(WriteRequest.RefreshPolicy.IMMEDIATE);
}
}
private void flush(WriteRequest.RefreshPolicy refreshPolicy) {
persistedTimingStats = new DatafeedTimingStats(currentTimingStats);
jobResultsPersister.persistDatafeedTimingStats(persistedTimingStats, refreshPolicy);
}
/**
* Returns true if the given stats objects differ from each other by more than 10%, or by more than 10 seconds in absolute terms, for at least one of the statistics.
*/
public static boolean differSignificantly(DatafeedTimingStats stats1, DatafeedTimingStats stats2) {
return differSignificantly(stats1.getTotalSearchTimeMs(), stats2.getTotalSearchTimeMs());
}
/**
* Returns {@code true} if one of the ratios { value1 / value2, value2 / value1 } is smaller than MIN_VALID_RATIO,
* or if the absolute difference between { value1, value2 } is greater than MAX_VALID_ABS_DIFFERENCE_MS.
* This can be interpreted as the values { value1, value2 } differing significantly from each other.
*/
private static boolean differSignificantly(double value1, double value2) {
return (value2 / value1 < MIN_VALID_RATIO)
|| (value1 / value2 < MIN_VALID_RATIO)
|| Math.abs(value1 - value2) > MAX_VALID_ABS_DIFFERENCE_MS;
}
/**
* Minimum ratio of values that is interpreted as values being similar.
* If the values' ratio is less than MIN_VALID_RATIO, the values are interpreted as significantly different.
*/
private static final double MIN_VALID_RATIO = 0.9;
/**
* Maximum absolute difference of values that is interpreted as values being similar.
* If the values' absolute difference is greater than MAX_VALID_ABS_DIFFERENCE_MS, the values are interpreted as significantly different.
*/
private static final double MAX_VALID_ABS_DIFFERENCE_MS = 10000.0; // 10s
}

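To make the two thresholds concrete, here is a standalone re-implementation of the significance test with the same constants, evaluated on boundary values that the unit tests later in this commit also exercise:

public class DifferSignificantlySketch {

    private static final double MIN_VALID_RATIO = 0.9;
    private static final double MAX_VALID_ABS_DIFFERENCE_MS = 10000.0; // 10s

    static boolean differSignificantly(double value1, double value2) {
        return (value2 / value1 < MIN_VALID_RATIO)
            || (value1 / value2 < MIN_VALID_RATIO)
            || Math.abs(value1 - value2) > MAX_VALID_ABS_DIFFERENCE_MS;
    }

    public static void main(String[] args) {
        System.out.println(differSignificantly(1000.0, 1100.0));     // false: within 10% and within 10s
        System.out.println(differSignificantly(1000.0, 1120.0));     // true: 1000/1120 < 0.9
        System.out.println(differSignificantly(100000.0, 110000.0)); // false: ratio ok, diff is exactly 10s, not greater
        System.out.println(differSignificantly(100000.0, 110001.0)); // true: absolute difference exceeds 10s
    }
}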
View File

@ -16,9 +16,10 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
public interface DataExtractorFactory {
@ -31,10 +32,11 @@ public interface DataExtractorFactory {
DatafeedConfig datafeed,
Job job,
NamedXContentRegistry xContentRegistry,
DatafeedTimingStatsReporter timingStatsReporter,
ActionListener<DataExtractorFactory> listener) {
ActionListener<DataExtractorFactory> factoryHandler = ActionListener.wrap(
factory -> listener.onResponse(datafeed.getChunkingConfig().isEnabled()
? new ChunkedDataExtractorFactory(client, datafeed, job, xContentRegistry, factory) : factory)
? new ChunkedDataExtractorFactory(client, datafeed, job, xContentRegistry, factory, timingStatsReporter) : factory)
, listener::onFailure
);
@ -42,13 +44,15 @@ public interface DataExtractorFactory {
response -> {
if (response.getJobs().isEmpty()) { // This means no rollup indexes are in the config
if (datafeed.hasAggregations()) {
factoryHandler.onResponse(new AggregationDataExtractorFactory(client, datafeed, job, xContentRegistry));
factoryHandler.onResponse(
new AggregationDataExtractorFactory(client, datafeed, job, xContentRegistry, timingStatsReporter));
} else {
ScrollDataExtractorFactory.create(client, datafeed, job, xContentRegistry, factoryHandler);
ScrollDataExtractorFactory.create(client, datafeed, job, xContentRegistry, timingStatsReporter, factoryHandler);
}
} else {
if (datafeed.hasAggregations()) { // Rollup indexes require aggregations
RollupDataExtractorFactory.create(client, datafeed, job, response.getJobs(), xContentRegistry, factoryHandler);
RollupDataExtractorFactory.create(
client, datafeed, job, response.getJobs(), xContentRegistry, timingStatsReporter, factoryHandler);
} else {
listener.onFailure(new IllegalArgumentException("Aggregations are required when using Rollup indices"));
}

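The branching in the create method above reduces to a small decision table: rollup-capable indices demand aggregations, plain aggregation and scroll extraction are the two base cases, and a chunked factory wraps whichever base was chosen when chunking is enabled (the wrapping step is omitted from this sketch). A standalone rendering of that table:

public class FactorySelectionSketch {

    enum Extractor { AGGREGATION, ROLLUP, SCROLL }

    static Extractor choose(boolean hasRollupIndices, boolean hasAggregations) {
        if (hasRollupIndices) {
            if (hasAggregations == false) {
                throw new IllegalArgumentException("Aggregations are required when using Rollup indices");
            }
            return Extractor.ROLLUP;
        }
        return hasAggregations ? Extractor.AGGREGATION : Extractor.SCROLL;
    }

    public static void main(String[] args) {
        System.out.println(choose(false, true));  // AGGREGATION
        System.out.println(choose(false, false)); // SCROLL
        System.out.println(choose(true, true));   // ROLLUP
    }
}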
View File

@ -18,6 +18,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -50,17 +51,20 @@ abstract class AbstractAggregationDataExtractor<T extends ActionRequestBuilder<S
protected final Client client;
protected final AggregationDataExtractorContext context;
private final DatafeedTimingStatsReporter timingStatsReporter;
private boolean hasNext;
private boolean isCancelled;
private AggregationToJsonProcessor aggregationToJsonProcessor;
private ByteArrayOutputStream outputStream;
AbstractAggregationDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) {
AbstractAggregationDataExtractor(
Client client, AggregationDataExtractorContext dataExtractorContext, DatafeedTimingStatsReporter timingStatsReporter) {
this.client = Objects.requireNonNull(client);
context = Objects.requireNonNull(dataExtractorContext);
hasNext = true;
isCancelled = false;
outputStream = new ByteArrayOutputStream();
this.context = Objects.requireNonNull(dataExtractorContext);
this.timingStatsReporter = Objects.requireNonNull(timingStatsReporter);
this.hasNext = true;
this.isCancelled = false;
this.outputStream = new ByteArrayOutputStream();
}
@Override
@ -107,6 +111,7 @@ abstract class AbstractAggregationDataExtractor<T extends ActionRequestBuilder<S
LOGGER.debug("[{}] Executing aggregated search", context.jobId);
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest(buildBaseSearchSource()));
LOGGER.debug("[{}] Search response was obtained", context.jobId);
timingStatsReporter.reportSearchDuration(searchResponse.getTook());
ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
return validateAggs(searchResponse.getAggregations());
}

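The instrumentation added above follows the same pattern in every extractor: execute the search, report searchResponse.getTook() to the reporter, then validate the response. A minimal plain-JDK model of that hook point:

import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

public class SearchTimingSketch {

    // search stands in for executeSearchRequest(...); its result models getTook().millis().
    static long executeInstrumented(LongSupplier search, LongConsumer reportSearchDuration) {
        long tookMs = search.getAsLong();
        reportSearchDuration.accept(tookMs); // reported before response validation can throw
        return tookMs;
    }

    public static void main(String[] args) {
        executeInstrumented(() -> 42L, tookMs -> System.out.println("reported " + tookMs + "ms"));
    }
}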
View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
/**
* An implementation that extracts data from Elasticsearch using search with aggregations on a client.
@ -18,8 +19,9 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
*/
class AggregationDataExtractor extends AbstractAggregationDataExtractor<SearchRequestBuilder> {
AggregationDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) {
super(client, dataExtractorContext);
AggregationDataExtractor(
Client client, AggregationDataExtractorContext dataExtractorContext, DatafeedTimingStatsReporter timingStatsReporter) {
super(client, dataExtractorContext, timingStatsReporter);
}
@Override

View File

@ -9,9 +9,10 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import java.util.Objects;
@ -21,12 +22,19 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory {
private final DatafeedConfig datafeedConfig;
private final Job job;
private final NamedXContentRegistry xContentRegistry;
private final DatafeedTimingStatsReporter timingStatsReporter;
public AggregationDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, NamedXContentRegistry xContentRegistry) {
public AggregationDataExtractorFactory(
Client client,
DatafeedConfig datafeedConfig,
Job job,
NamedXContentRegistry xContentRegistry,
DatafeedTimingStatsReporter timingStatsReporter) {
this.client = Objects.requireNonNull(client);
this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
this.job = Objects.requireNonNull(job);
this.xContentRegistry = xContentRegistry;
this.timingStatsReporter = Objects.requireNonNull(timingStatsReporter);
}
@Override
@ -43,6 +51,6 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory {
Intervals.alignToFloor(end, histogramInterval),
job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT),
datafeedConfig.getHeaders());
return new AggregationDataExtractor(client, dataExtractorContext);
return new AggregationDataExtractor(client, dataExtractorContext, timingStatsReporter);
}
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
/**
* An implementation that extracts data from Elasticsearch using search with aggregations against rollup indices on a client.
@ -18,8 +19,9 @@ import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
*/
class RollupDataExtractor extends AbstractAggregationDataExtractor<RollupSearchAction.RequestBuilder> {
RollupDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) {
super(client, dataExtractorContext);
RollupDataExtractor(
Client client, AggregationDataExtractorContext dataExtractorContext, DatafeedTimingStatsReporter timingStatsReporter) {
super(client, dataExtractorContext, timingStatsReporter);
}
@Override
@ -28,5 +30,4 @@ class RollupDataExtractor extends AbstractAggregationDataExtractor<RollupSearchA
return new RollupSearchAction.RequestBuilder(client, searchRequest);
}
}

View File

@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.ml.utils.Intervals;
import org.elasticsearch.xpack.core.rollup.action.RollableIndexCaps;
import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps.RollupFieldCaps;
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import java.time.ZoneId;
@ -45,12 +46,19 @@ public class RollupDataExtractorFactory implements DataExtractorFactory {
private final DatafeedConfig datafeedConfig;
private final Job job;
private final NamedXContentRegistry xContentRegistry;
private final DatafeedTimingStatsReporter timingStatsReporter;
private RollupDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, NamedXContentRegistry xContentRegistry) {
private RollupDataExtractorFactory(
Client client,
DatafeedConfig datafeedConfig,
Job job,
NamedXContentRegistry xContentRegistry,
DatafeedTimingStatsReporter timingStatsReporter) {
this.client = Objects.requireNonNull(client);
this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
this.job = Objects.requireNonNull(job);
this.xContentRegistry = xContentRegistry;
this.timingStatsReporter = Objects.requireNonNull(timingStatsReporter);
}
@Override
@ -67,7 +75,7 @@ public class RollupDataExtractorFactory implements DataExtractorFactory {
Intervals.alignToFloor(end, histogramInterval),
job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT),
datafeedConfig.getHeaders());
return new RollupDataExtractor(client, dataExtractorContext);
return new RollupDataExtractor(client, dataExtractorContext, timingStatsReporter);
}
public static void create(Client client,
@ -75,6 +83,7 @@ public class RollupDataExtractorFactory implements DataExtractorFactory {
Job job,
Map<String, RollableIndexCaps> rollupJobsWithCaps,
NamedXContentRegistry xContentRegistry,
DatafeedTimingStatsReporter timingStatsReporter,
ActionListener<DataExtractorFactory> listener) {
final AggregationBuilder datafeedHistogramAggregation = getHistogramAggregation(
@ -119,7 +128,7 @@ public class RollupDataExtractorFactory implements DataExtractorFactory {
return;
}
listener.onResponse(new RollupDataExtractorFactory(client, datafeed, job, xContentRegistry));
listener.onResponse(new RollupDataExtractorFactory(client, datafeed, job, xContentRegistry, timingStatsReporter));
}
private static boolean validInterval(long datafeedInterval, ParsedRollupCaps rollupJobGroupConfig) {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory;
@ -68,16 +69,22 @@ public class ChunkedDataExtractor implements DataExtractor {
private final DataExtractorFactory dataExtractorFactory;
private final ChunkedDataExtractorContext context;
private final DataSummaryFactory dataSummaryFactory;
private final DatafeedTimingStatsReporter timingStatsReporter;
private long currentStart;
private long currentEnd;
private long chunkSpan;
private boolean isCancelled;
private DataExtractor currentExtractor;
public ChunkedDataExtractor(Client client, DataExtractorFactory dataExtractorFactory, ChunkedDataExtractorContext context) {
public ChunkedDataExtractor(
Client client,
DataExtractorFactory dataExtractorFactory,
ChunkedDataExtractorContext context,
DatafeedTimingStatsReporter timingStatsReporter) {
this.client = Objects.requireNonNull(client);
this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory);
this.context = Objects.requireNonNull(context);
this.timingStatsReporter = Objects.requireNonNull(timingStatsReporter);
this.currentStart = context.start;
this.currentEnd = context.start;
this.isCancelled = false;
@ -198,15 +205,16 @@ public class ChunkedDataExtractor implements DataExtractor {
private DataSummary newScrolledDataSummary() throws IOException {
SearchRequestBuilder searchRequestBuilder = rangeSearchRequest();
SearchResponse response = executeSearchRequest(searchRequestBuilder);
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
LOGGER.debug("[{}] Scrolling Data summary response was obtained", context.jobId);
timingStatsReporter.reportSearchDuration(searchResponse.getTook());
ExtractorUtils.checkSearchWasSuccessful(context.jobId, response);
ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
Aggregations aggregations = response.getAggregations();
Aggregations aggregations = searchResponse.getAggregations();
long earliestTime = 0;
long latestTime = 0;
long totalHits = response.getHits().getTotalHits().value;
long totalHits = searchResponse.getHits().getTotalHits().value;
if (totalHits > 0) {
Min min = aggregations.get(EARLIEST_TIME);
earliestTime = (long) min.getValue();
@ -220,12 +228,13 @@ public class ChunkedDataExtractor implements DataExtractor {
// TODO: once RollupSearchAction is changed from indices:admin* to indices:data/read/* this branch is not needed
ActionRequestBuilder<SearchRequest, SearchResponse> searchRequestBuilder =
dataExtractorFactory instanceof RollupDataExtractorFactory ? rollupRangeSearchRequest() : rangeSearchRequest();
SearchResponse response = executeSearchRequest(searchRequestBuilder);
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
LOGGER.debug("[{}] Aggregating Data summary response was obtained", context.jobId);
timingStatsReporter.reportSearchDuration(searchResponse.getTook());
ExtractorUtils.checkSearchWasSuccessful(context.jobId, response);
ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
Aggregations aggregations = response.getAggregations();
Aggregations aggregations = searchResponse.getAggregations();
Min min = aggregations.get(EARLIEST_TIME);
Max max = aggregations.get(LATEST_TIME);
return new AggregatedDataSummary(min.getValue(), max.getValue(), context.histogramInterval);

View File

@ -9,6 +9,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
@ -22,17 +23,20 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory {
private final Job job;
private final DataExtractorFactory dataExtractorFactory;
private final NamedXContentRegistry xContentRegistry;
private final DatafeedTimingStatsReporter timingStatsReporter;
public ChunkedDataExtractorFactory(Client client,
DatafeedConfig datafeedConfig,
Job job,
NamedXContentRegistry xContentRegistry,
DataExtractorFactory dataExtractorFactory) {
DataExtractorFactory dataExtractorFactory,
DatafeedTimingStatsReporter timingStatsReporter) {
this.client = Objects.requireNonNull(client);
this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
this.job = Objects.requireNonNull(job);
this.dataExtractorFactory = Objects.requireNonNull(dataExtractorFactory);
this.xContentRegistry = xContentRegistry;
this.timingStatsReporter = Objects.requireNonNull(timingStatsReporter);
}
@Override
@ -52,7 +56,7 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory {
datafeedConfig.hasAggregations(),
datafeedConfig.hasAggregations() ? datafeedConfig.getHistogramIntervalMillis(xContentRegistry) : null
);
return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext);
return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext, timingStatsReporter);
}
private ChunkedDataExtractorContext.TimeAligner newTimeAligner() {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import java.io.ByteArrayInputStream;
@ -47,6 +48,7 @@ class ScrollDataExtractor implements DataExtractor {
private final Client client;
private final ScrollDataExtractorContext context;
private final DatafeedTimingStatsReporter timingStatsReporter;
private String scrollId;
private boolean isCancelled;
private boolean hasNext;
@ -54,9 +56,10 @@ class ScrollDataExtractor implements DataExtractor {
protected Long lastTimestamp;
private boolean searchHasShardFailure;
ScrollDataExtractor(Client client, ScrollDataExtractorContext dataExtractorContext) {
ScrollDataExtractor(Client client, ScrollDataExtractorContext dataExtractorContext, DatafeedTimingStatsReporter timingStatsReporter) {
this.client = Objects.requireNonNull(client);
context = Objects.requireNonNull(dataExtractorContext);
this.context = Objects.requireNonNull(dataExtractorContext);
this.timingStatsReporter = Objects.requireNonNull(timingStatsReporter);
hasNext = true;
searchHasShardFailure = false;
}
@ -109,6 +112,7 @@ class ScrollDataExtractor implements DataExtractor {
LOGGER.debug("[{}] Initializing scroll", context.jobId);
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest(startTimestamp));
LOGGER.debug("[{}] Search response was obtained", context.jobId);
timingStatsReporter.reportSearchDuration(searchResponse.getTook());
return processSearchResponse(searchResponse);
}
@ -188,12 +192,14 @@ class ScrollDataExtractor implements DataExtractor {
if (searchHasShardFailure == false) {
LOGGER.debug("[{}] Reinitializing scroll due to SearchPhaseExecutionException", context.jobId);
markScrollAsErrored();
searchResponse = executeSearchRequest(buildSearchRequest(lastTimestamp == null ? context.start : lastTimestamp));
searchResponse =
executeSearchRequest(buildSearchRequest(lastTimestamp == null ? context.start : lastTimestamp));
} else {
throw searchExecutionException;
}
}
LOGGER.debug("[{}] Search response was obtained", context.jobId);
timingStatsReporter.reportSearchDuration(searchResponse.getTook());
return processSearchResponse(searchResponse);
}
@ -209,7 +215,7 @@ class ScrollDataExtractor implements DataExtractor {
protected SearchResponse executeSearchScrollRequest(String scrollId) {
return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client,
() -> new SearchScrollRequestBuilder(client, SearchScrollAction.INSTANCE)
.setScroll(SCROLL_TIMEOUT)
.setScrollId(scrollId)
.get());

View File

@ -19,6 +19,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields;
@ -31,14 +32,16 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
private final Job job;
private final TimeBasedExtractedFields extractedFields;
private final NamedXContentRegistry xContentRegistry;
private final DatafeedTimingStatsReporter timingStatsReporter;
private ScrollDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job, TimeBasedExtractedFields extractedFields,
NamedXContentRegistry xContentRegistry) {
NamedXContentRegistry xContentRegistry, DatafeedTimingStatsReporter timingStatsReporter) {
this.client = Objects.requireNonNull(client);
this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
this.job = Objects.requireNonNull(job);
this.extractedFields = Objects.requireNonNull(extractedFields);
this.xContentRegistry = xContentRegistry;
this.timingStatsReporter = Objects.requireNonNull(timingStatsReporter);
}
@Override
@ -53,30 +56,33 @@ public class ScrollDataExtractorFactory implements DataExtractorFactory {
start,
end,
datafeedConfig.getHeaders());
return new ScrollDataExtractor(client, dataExtractorContext);
return new ScrollDataExtractor(client, dataExtractorContext, timingStatsReporter);
}
public static void create(Client client,
DatafeedConfig datafeed,
Job job,
NamedXContentRegistry xContentRegistry,
ActionListener<DataExtractorFactory> listener ) {
DatafeedTimingStatsReporter timingStatsReporter,
ActionListener<DataExtractorFactory> listener) {
// Step 2. Construct the factory and notify the listener
ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesHandler = ActionListener.wrap(
fieldCapabilitiesResponse -> {
TimeBasedExtractedFields extractedFields = TimeBasedExtractedFields.build(job, datafeed, fieldCapabilitiesResponse);
listener.onResponse(new ScrollDataExtractorFactory(client, datafeed, job, extractedFields, xContentRegistry));
}, e -> {
if (e instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId()
+ "] cannot retrieve data because index " + ((IndexNotFoundException) e).getIndex() + " does not exist"));
} else if (e instanceof IllegalArgumentException) {
listener.onFailure(ExceptionsHelper.badRequestException("[" + datafeed.getId() + "] " + e.getMessage()));
} else {
listener.onFailure(e);
}
fieldCapabilitiesResponse -> {
TimeBasedExtractedFields extractedFields = TimeBasedExtractedFields.build(job, datafeed, fieldCapabilitiesResponse);
listener.onResponse(
new ScrollDataExtractorFactory(client, datafeed, job, extractedFields, xContentRegistry, timingStatsReporter));
},
e -> {
if (e instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId()
+ "] cannot retrieve data because index " + ((IndexNotFoundException) e).getIndex() + " does not exist"));
} else if (e instanceof IllegalArgumentException) {
listener.onFailure(ExceptionsHelper.badRequestException("[" + datafeed.getId() + "] " + e.getMessage()));
} else {
listener.onFailure(e);
}
}
);
// Step 1. Get the field capabilities necessary to work out how to extract the fields

View File

@ -87,6 +87,7 @@ public class JobManager {
private final Environment environment;
private final JobResultsProvider jobResultsProvider;
private final JobResultsPersister jobResultsPersister;
private final ClusterService clusterService;
private final Auditor auditor;
private final Client client;
@ -101,10 +102,11 @@ public class JobManager {
* Create a JobManager
*/
public JobManager(Environment environment, Settings settings, JobResultsProvider jobResultsProvider,
ClusterService clusterService, Auditor auditor, ThreadPool threadPool,
JobResultsPersister jobResultsPersister, ClusterService clusterService, Auditor auditor, ThreadPool threadPool,
Client client, UpdateJobProcessNotifier updateJobProcessNotifier, NamedXContentRegistry xContentRegistry) {
this.environment = environment;
this.jobResultsProvider = Objects.requireNonNull(jobResultsProvider);
this.jobResultsPersister = Objects.requireNonNull(jobResultsPersister);
this.clusterService = Objects.requireNonNull(clusterService);
this.auditor = Objects.requireNonNull(auditor);
this.client = Objects.requireNonNull(client);
@ -573,12 +575,11 @@ public class JobManager {
ModelSnapshot modelSnapshot) {
final ModelSizeStats modelSizeStats = modelSnapshot.getModelSizeStats();
final JobResultsPersister persister = new JobResultsPersister(client);
// Step 3. After the model size stats is persisted, also persist the snapshot's quantiles and respond
// -------
CheckedConsumer<IndexResponse, Exception> modelSizeStatsResponseHandler = response -> {
persister.persistQuantiles(modelSnapshot.getQuantiles(), WriteRequest.RefreshPolicy.IMMEDIATE,
jobResultsPersister.persistQuantiles(modelSnapshot.getQuantiles(), WriteRequest.RefreshPolicy.IMMEDIATE,
ActionListener.wrap(quantilesResponse -> {
// The quantiles can be large, and totally dominate the output -
// it's clearer to remove them as they are not necessary for the revert op
@ -593,7 +594,7 @@ public class JobManager {
CheckedConsumer<Boolean, Exception> updateHandler = response -> {
if (response) {
ModelSizeStats revertedModelSizeStats = new ModelSizeStats.Builder(modelSizeStats).setLogTime(new Date()).build();
persister.persistModelSizeStats(revertedModelSizeStats, WriteRequest.RefreshPolicy.IMMEDIATE, ActionListener.wrap(
jobResultsPersister.persistModelSizeStats(revertedModelSizeStats, WriteRequest.RefreshPolicy.IMMEDIATE, ActionListener.wrap(
modelSizeStatsResponseHandler, actionListener::onFailure));
}
};

View File

@ -21,6 +21,7 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.results.Result;
@ -121,6 +122,20 @@ public class JobDataDeleter {
LOGGER.error("[" + jobId + "] An error occurred while deleting interim results", e);
}
}
/**
* Delete the datafeed timing stats document from all the job results indices
*
* @param listener Response listener
*/
public void deleteDatafeedTimingStats(ActionListener<BulkByScrollResponse> listener) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
.setRefresh(true)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(new IdsQueryBuilder().addIds(DatafeedTimingStats.documentId(jobId)));
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, deleteByQueryRequest, listener);
}
// Wrapper to ensure safety
private static class DeleteByQueryHolder {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
@ -325,6 +326,20 @@ public class JobResultsPersister {
}
}
/**
* Persist datafeed timing stats
*
* @param timingStats datafeed timing stats to persist
* @param refreshPolicy refresh policy to apply
*/
public IndexResponse persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy) {
String jobId = timingStats.getJobId();
logger.trace("[{}] Persisting datafeed timing stats", jobId);
Persistable persistable = new Persistable(jobId, timingStats, DatafeedTimingStats.documentId(timingStats.getJobId()));
persistable.setRefreshPolicy(refreshPolicy);
return persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId)).actionGet();
}
private XContentBuilder toXContentBuilder(ToXContent obj) throws IOException {
XContentBuilder builder = jsonBuilder();
obj.toXContent(builder, ToXContent.EMPTY_PARAMS);

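Both this persister method and the deleter in the previous file rely on the timing stats living at a single, deterministic document id per job: persisting overwrites the document in place, and the ids query targets exactly one document. The id format below is purely illustrative; the real scheme is whatever DatafeedTimingStats.documentId() defines.

public class DocIdSketch {

    // Hypothetical id format, shown only to illustrate the one-document-per-job invariant.
    static String datafeedTimingStatsDocId(String jobId) {
        return jobId + "_datafeed_timing_stats";
    }

    public static void main(String[] args) {
        // Re-persisting under the same id updates rather than duplicates the document.
        System.out.println(datafeedTimingStatsDocId("my-job"));
    }
}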
View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
@ -81,6 +82,7 @@ import org.elasticsearch.xpack.core.ml.action.GetInfluencersAction;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
@ -116,6 +118,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@ -428,18 +431,101 @@ public class JobResultsProvider {
searchSingleResult(
jobId,
TimingStats.TYPE.getPreferredName(),
createTimingStatsSearch(indexName, jobId),
createLatestTimingStatsSearch(indexName, jobId),
TimingStats.PARSER,
result -> handler.accept(result.result),
errorHandler,
() -> new TimingStats(jobId));
}
private SearchRequestBuilder createTimingStatsSearch(String indexName, String jobId) {
private SearchRequestBuilder createLatestTimingStatsSearch(String indexName, String jobId) {
return client.prepareSearch(indexName)
.setSize(1)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(QueryBuilders.idsQuery().addIds(TimingStats.documentId(jobId)));
.setQuery(QueryBuilders.idsQuery().addIds(TimingStats.documentId(jobId)))
.addSort(SortBuilders.fieldSort(TimingStats.BUCKET_COUNT.getPreferredName()).order(SortOrder.DESC));
}
public void datafeedTimingStats(List<String> jobIds, Consumer<Map<String, DatafeedTimingStats>> handler,
Consumer<Exception> errorHandler) {
if (jobIds.isEmpty()) {
handler.accept(Collections.emptyMap());
return;
}
MultiSearchRequestBuilder msearchRequestBuilder = client.prepareMultiSearch();
for (String jobId : jobIds) {
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
msearchRequestBuilder.add(createLatestDatafeedTimingStatsSearch(indexName, jobId));
}
MultiSearchRequest msearchRequest = msearchRequestBuilder.request();
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ML_ORIGIN,
msearchRequest,
ActionListener.<MultiSearchResponse>wrap(
msearchResponse -> {
Map<String, DatafeedTimingStats> timingStatsByJobId = new HashMap<>();
for (int i = 0; i < msearchResponse.getResponses().length; i++) {
String jobId = jobIds.get(i);
MultiSearchResponse.Item itemResponse = msearchResponse.getResponses()[i];
if (itemResponse.isFailure()) {
errorHandler.accept(itemResponse.getFailure());
} else {
SearchResponse searchResponse = itemResponse.getResponse();
ShardSearchFailure[] shardFailures = searchResponse.getShardFailures();
int unavailableShards = searchResponse.getTotalShards() - searchResponse.getSuccessfulShards();
if (shardFailures != null && shardFailures.length > 0) {
LOGGER.error("[{}] Search request returned shard failures: {}", jobId, Arrays.toString(shardFailures));
errorHandler.accept(
new ElasticsearchException(ExceptionsHelper.shardFailuresToErrorMsg(jobId, shardFailures)));
} else if (unavailableShards > 0) {
errorHandler.accept(
new ElasticsearchException(
"[" + jobId + "] Search request encountered [" + unavailableShards + "] unavailable shards"));
} else {
SearchHits hits = searchResponse.getHits();
long hitsCount = hits.getHits().length;
if (hitsCount == 0) {
SearchRequest searchRequest = msearchRequest.requests().get(i);
LOGGER.debug("Found 0 hits for [{}]", new Object[]{searchRequest.indices()});
} else if (hitsCount > 1) {
SearchRequest searchRequest = msearchRequest.requests().get(i);
LOGGER.debug("Found multiple hits for [{}]", new Object[]{searchRequest.indices()});
} else {
assert hitsCount == 1;
SearchHit hit = hits.getHits()[0];
DatafeedTimingStats timingStats = parseSearchHit(hit, DatafeedTimingStats.PARSER, errorHandler);
timingStatsByJobId.put(jobId, timingStats);
}
}
}
}
handler.accept(timingStatsByJobId);
},
errorHandler
),
client::multiSearch);
}
public void datafeedTimingStats(String jobId, Consumer<DatafeedTimingStats> handler, Consumer<Exception> errorHandler) {
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
searchSingleResult(
jobId,
DatafeedTimingStats.TYPE.getPreferredName(),
createLatestDatafeedTimingStatsSearch(indexName, jobId),
DatafeedTimingStats.PARSER,
result -> handler.accept(result.result),
errorHandler,
() -> new DatafeedTimingStats(jobId));
}
private SearchRequestBuilder createLatestDatafeedTimingStatsSearch(String indexName, String jobId) {
return client.prepareSearch(indexName)
.setSize(1)
.setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setQuery(QueryBuilders.idsQuery().addIds(DatafeedTimingStats.documentId(jobId)))
.addSort(SortBuilders.fieldSort(DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName()).order(SortOrder.DESC));
}
public void getAutodetectParams(Job job, Consumer<AutodetectParams> consumer, Consumer<Exception> errorHandler) {
@ -468,7 +554,7 @@ public class JobResultsProvider {
MultiSearchRequestBuilder msearch = client.prepareMultiSearch()
.add(createLatestDataCountsSearch(resultsIndex, jobId))
.add(createLatestModelSizeStatsSearch(resultsIndex))
.add(createTimingStatsSearch(resultsIndex, jobId))
.add(createLatestTimingStatsSearch(resultsIndex, jobId))
// These next two document IDs never need to be the legacy ones due to the rule
// that you cannot open a 5.4 job in a subsequent version of the product
.add(createDocIdSearch(resultsIndex, ModelSnapshot.documentId(jobId, job.getModelSnapshotId())))
@ -525,7 +611,7 @@ public class JobResultsProvider {
.setRouting(id);
}
private void parseAutodetectParamSearchHit(String jobId, AutodetectParams.Builder paramsBuilder, SearchHit hit,
private static void parseAutodetectParamSearchHit(String jobId, AutodetectParams.Builder paramsBuilder, SearchHit hit,
Consumer<Exception> errorHandler) {
String hitId = hit.getId();
if (DataCounts.documentId(jobId).equals(hitId)) {
@ -547,7 +633,7 @@ public class JobResultsProvider {
}
}
private <T, U> T parseSearchHit(SearchHit hit, BiFunction<XContentParser, U, T> objectParser,
private static <T, U> T parseSearchHit(SearchHit hit, BiFunction<XContentParser, U, T> objectParser,
Consumer<Exception> errorHandler) {
BytesReference source = hit.getSourceRef();
try (InputStream stream = source.streamInput();

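The multi-job lookup above depends on the multi-search response items coming back in the same order as the requests, which is what lets item i be paired with jobIds.get(i). A simplified, order-preserving model with plain JDK types in place of the msearch API:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class MsearchFanOutSketch {

    static Map<String, Double> timingStatsByJobId(List<String> jobIds, Function<String, Double> searchOneJob) {
        Map<String, Double> result = new HashMap<>();
        for (int i = 0; i < jobIds.size(); i++) {
            String jobId = jobIds.get(i); // position i pairs the request with its response
            result.put(jobId, searchOneJob.apply(jobId));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(timingStatsByJobId(List.of("job-a", "job-b"), jobId -> 123.0));
    }
}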
View File

@ -20,9 +20,9 @@ public class TimingStatsReporter {
/** Persisted timing stats. May be stale. */
private TimingStats persistedTimingStats;
/** Current timing stats. */
private TimingStats currentTimingStats;
private volatile TimingStats currentTimingStats;
/** Object used to persist current timing stats. */
private JobResultsPersister.Builder bulkResultsPersister;
private final JobResultsPersister.Builder bulkResultsPersister;
public TimingStatsReporter(TimingStats timingStats, JobResultsPersister.Builder jobResultsPersister) {
Objects.requireNonNull(timingStats);
@ -50,7 +50,7 @@ public class TimingStatsReporter {
flush();
}
public void flush() {
private void flush() {
persistedTimingStats = new TimingStats(currentTimingStats);
bulkResultsPersister.persistTimingStats(persistedTimingStats);
}

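The hunk above makes currentTimingStats volatile and the bulk persister final. A common motivation for the volatile, sketched here under that assumption with plain JDK types, is that a stats snapshot written by the reporting thread must be visible to whichever thread asks for the current stats:

public class VolatilePublishSketch {

    static final class Stats {
        final long bucketCount;
        Stats(long bucketCount) { this.bucketCount = bucketCount; }
    }

    private volatile Stats current = new Stats(0);

    void update(long newCount) {
        current = new Stats(newCount); // publish a fresh immutable snapshot
    }

    Stats read() {
        return current; // guaranteed to see the most recently published snapshot
    }

    public static void main(String[] args) {
        VolatilePublishSketch sketch = new VolatilePublishSketch();
        sketch.update(5);
        System.out.println(sketch.read().bucketCount); // 5
    }
}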
View File

@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
@ -48,6 +49,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
private JobResultsProvider jobResultsProvider;
private JobConfigProvider jobConfigProvider;
private DatafeedConfigProvider datafeedConfigProvider;
private JobResultsPersister jobResultsPersister;
private DatafeedJobBuilder datafeedJobBuilder;
@ -60,7 +62,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
when(client.settings()).thenReturn(Settings.EMPTY);
auditor = mock(Auditor.class);
taskHandler = mock(Consumer.class);
datafeedJobBuilder = new DatafeedJobBuilder(client, Settings.EMPTY, xContentRegistry(), auditor, System::currentTimeMillis);
jobResultsPersister = mock(JobResultsPersister.class);
jobResultsProvider = mock(JobResultsProvider.class);
Mockito.doAnswer(invocationOnMock -> {
@ -80,6 +82,16 @@ public class DatafeedJobBuilderTests extends ESTestCase {
jobConfigProvider = mock(JobConfigProvider.class);
datafeedConfigProvider = mock(DatafeedConfigProvider.class);
datafeedJobBuilder =
new DatafeedJobBuilder(
client,
xContentRegistry(),
auditor,
System::currentTimeMillis,
jobConfigProvider,
jobResultsProvider,
datafeedConfigProvider,
jobResultsPersister);
}
public void testBuild_GivenScrollDatafeedAndNewJob() throws Exception {
@ -103,7 +115,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
givenJob(jobBuilder);
givenDatafeed(datafeed);
datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler);
datafeedJobBuilder.build("datafeed1", datafeedJobHandler);
assertBusy(() -> wasHandlerCalled.get());
}
@ -131,7 +143,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
givenJob(jobBuilder);
givenDatafeed(datafeed);
datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler);
datafeedJobBuilder.build("datafeed1", datafeedJobHandler);
assertBusy(() -> wasHandlerCalled.get());
}
@ -159,7 +171,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
givenJob(jobBuilder);
givenDatafeed(datafeed);
datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider, datafeedJobHandler);
datafeedJobBuilder.build("datafeed1", datafeedJobHandler);
assertBusy(() -> wasHandlerCalled.get());
}
@ -184,8 +196,7 @@ public class DatafeedJobBuilderTests extends ESTestCase {
givenJob(jobBuilder);
givenDatafeed(datafeed);
datafeedJobBuilder.build("datafeed1", jobResultsProvider, jobConfigProvider, datafeedConfigProvider,
ActionListener.wrap(datafeedJob -> fail(), taskHandler));
datafeedJobBuilder.build("datafeed1", ActionListener.wrap(datafeedJob -> fail(), taskHandler));
verify(taskHandler).accept(error);
}

View File

@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.junit.Before;
import org.mockito.InOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoMoreInteractions;
public class DatafeedTimingStatsReporterTests extends ESTestCase {
private static final String JOB_ID = "my-job-id";
private static final TimeValue ONE_SECOND = TimeValue.timeValueSeconds(1);
private JobResultsPersister jobResultsPersister;
@Before
public void setUpTests() {
jobResultsPersister = mock(JobResultsPersister.class);
}
public void testReportSearchDuration() {
DatafeedTimingStatsReporter timingStatsReporter =
new DatafeedTimingStatsReporter(new DatafeedTimingStats(JOB_ID, 3, 10000.0), jobResultsPersister);
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 3, 10000.0)));
timingStatsReporter.reportSearchDuration(ONE_SECOND);
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 4, 11000.0)));
timingStatsReporter.reportSearchDuration(ONE_SECOND);
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 5, 12000.0)));
timingStatsReporter.reportSearchDuration(ONE_SECOND);
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 6, 13000.0)));
timingStatsReporter.reportSearchDuration(ONE_SECOND);
assertThat(timingStatsReporter.getCurrentTimingStats(), equalTo(new DatafeedTimingStats(JOB_ID, 7, 14000.0)));
InOrder inOrder = inOrder(jobResultsPersister);
inOrder.verify(jobResultsPersister).persistDatafeedTimingStats(
new DatafeedTimingStats(JOB_ID, 5, 12000.0), RefreshPolicy.IMMEDIATE);
inOrder.verify(jobResultsPersister).persistDatafeedTimingStats(
new DatafeedTimingStats(JOB_ID, 7, 14000.0), RefreshPolicy.IMMEDIATE);
verifyNoMoreInteractions(jobResultsPersister);
}
public void testTimingStatsDifferSignificantly() {
assertThat(
DatafeedTimingStatsReporter.differSignificantly(
new DatafeedTimingStats(JOB_ID, 5, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 1000.0)),
is(false));
assertThat(
DatafeedTimingStatsReporter.differSignificantly(
new DatafeedTimingStats(JOB_ID, 5, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 1100.0)),
is(false));
assertThat(
DatafeedTimingStatsReporter.differSignificantly(
new DatafeedTimingStats(JOB_ID, 5, 1000.0), new DatafeedTimingStats(JOB_ID, 5, 1120.0)),
is(true));
assertThat(
DatafeedTimingStatsReporter.differSignificantly(
new DatafeedTimingStats(JOB_ID, 5, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 11000.0)),
is(false));
assertThat(
DatafeedTimingStatsReporter.differSignificantly(
new DatafeedTimingStats(JOB_ID, 5, 10000.0), new DatafeedTimingStats(JOB_ID, 5, 11200.0)),
is(true));
assertThat(
DatafeedTimingStatsReporter.differSignificantly(
new DatafeedTimingStats(JOB_ID, 5, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 110000.0)),
is(false));
assertThat(
DatafeedTimingStatsReporter.differSignificantly(
new DatafeedTimingStats(JOB_ID, 5, 100000.0), new DatafeedTimingStats(JOB_ID, 5, 110001.0)),
is(true));
}
}
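
Taken together, these tests pin down two behaviours: reportSearchDuration folds each search's duration into a running DatafeedTimingStats, and the reporter writes to the index only when the running stats drift far enough from the last persisted version; the expectations are consistent with a 10% threshold on total_search_time_ms. A sketch of that logic under those assumptions, not the actual implementation; the copy constructor and increment method on DatafeedTimingStats are assumed here:

public class DatafeedTimingStatsReporterSketch {

    private final JobResultsPersister persister;
    private DatafeedTimingStats persistedStats;  // last version written to the results index
    private DatafeedTimingStats currentStats;    // running totals

    public DatafeedTimingStatsReporterSketch(DatafeedTimingStats initial, JobResultsPersister persister) {
        this.persister = persister;
        this.persistedStats = new DatafeedTimingStats(initial);  // assumed copy constructor
        this.currentStats = new DatafeedTimingStats(initial);
    }

    public void reportSearchDuration(TimeValue duration) {
        currentStats.incrementTotalSearchTimeMs(duration.millis());  // assumed to also bump search_count
        if (differSignificantly(currentStats, persistedStats)) {
            persister.persistDatafeedTimingStats(currentStats, RefreshPolicy.IMMEDIATE);
            persistedStats = new DatafeedTimingStats(currentStats);
        }
    }

    // True when either total search time exceeds the other by more than 10%;
    // the threshold is inferred from testTimingStatsDifferSignificantly above.
    public static boolean differSignificantly(DatafeedTimingStats stats1, DatafeedTimingStats stats2) {
        double a = stats1.getTotalSearchTimeMs();
        double b = stats2.getTotalSearchTimeMs();
        return a > b * 1.1 || b > a * 1.1;
    }
}

Against testReportSearchDuration this reproduces the verified writes: starting from (3, 10000.0), the totals 11000.0 and 13000.0 stay within 10% of the last persisted value, while 12000.0 and 14000.0 cross it, giving exactly the two persistDatafeedTimingStats calls the InOrder verifier expects.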

View File

@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.rollup.job.MetricConfig;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory;
@ -62,6 +63,7 @@ public class DataExtractorFactoryTests extends ESTestCase {
private GetRollupIndexCapsAction.Response getRollupIndexResponse;
private Client client;
private DatafeedTimingStatsReporter timingStatsReporter;
@Override
protected NamedXContentRegistry xContentRegistry() {
@ -72,6 +74,7 @@ public class DataExtractorFactoryTests extends ESTestCase {
@Before
public void setUpTests() {
client = mock(Client.class);
timingStatsReporter = mock(DatafeedTimingStatsReporter.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
@ -109,7 +112,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig, jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig, jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
public void testCreateDataExtractorFactoryGivenScrollWithAutoChunk() {
@ -125,7 +129,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
public void testCreateDataExtractorFactoryGivenScrollWithOffChunk() {
@ -141,7 +146,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
public void testCreateDataExtractorFactoryGivenDefaultAggregation() {
@ -159,7 +165,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
public void testCreateDataExtractorFactoryGivenAggregationWithOffChunk() {
@ -178,7 +185,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk() {
@ -197,7 +205,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
public void testCreateDataExtractorFactoryGivenRollupAndValidAggregation() {
@ -220,7 +229,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
},
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
public void testCreateDataExtractorFactoryGivenRollupAndRemoteIndex() {
@ -246,7 +256,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
},
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
// Test with remote index, aggregation, and chunking
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
@ -254,7 +265,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)),
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
// Test with remote index, no aggregation, and no chunking
datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
@ -266,7 +278,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
// Test with remote index, no aggregation, and chunking
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
@ -274,7 +287,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)),
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
public void testCreateDataExtractorFactoryGivenRollupAndValidAggregationAndAutoChunk() {
@ -297,7 +311,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
},
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
public void testCreateDataExtractorFactoryGivenRollupButNoAggregations() {
@ -317,7 +332,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
}
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
public void testCreateDataExtractorFactoryGivenRollupWithBadInterval() {
@ -343,7 +359,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
}
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
public void testCreateDataExtractorFactoryGivenRollupMissingTerms() {
@ -368,7 +385,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
}
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
public void testCreateDataExtractorFactoryGivenRollupMissingMetric() {
@ -393,7 +411,8 @@ public class DataExtractorFactoryTests extends ESTestCase {
assertWarnings("[interval] on [date_histogram] is deprecated, use [fixed_interval] or [calendar_interval] in the future.");
}
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), listener);
DataExtractorFactory.create(
client, datafeedConfig.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter, listener);
}
private void givenAggregatableRollup(String field, String type, int minuteInterval, String... groupByTerms) {
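
Every call site in this file changes identically: DataExtractorFactory.create(...) gains a DatafeedTimingStatsReporter between the registry and the listener, so whichever concrete extractor the factory builds can report its search timings. The consolidated shape, using the same names as the tests above:

DataExtractorFactory.create(
    client,
    datafeedConfig.build(),
    jobBuilder.build(new Date()),
    xContentRegistry(),
    timingStatsReporter,          // new argument, threaded through to the extractor
    ActionListener.wrap(
        dataExtractorFactory -> { /* assert on or use the factory */ },
        e -> fail()));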

View File

@ -17,6 +17,7 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.junit.Before;
import java.util.Arrays;
@ -29,10 +30,12 @@ import static org.mockito.Mockito.mock;
public class AggregationDataExtractorFactoryTests extends ESTestCase {
private Client client;
private DatafeedTimingStatsReporter timingStatsReporter;
@Before
public void setUpMocks() {
client = mock(Client.class);
timingStatsReporter = mock(DatafeedTimingStatsReporter.class);
}
@Override
@ -76,6 +79,7 @@ public class AggregationDataExtractorFactoryTests extends ESTestCase {
DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobBuilder.getId());
datafeedConfigBuilder.setParsedAggregations(aggs);
datafeedConfigBuilder.setIndices(Arrays.asList("my_index"));
return new AggregationDataExtractorFactory(client, datafeedConfigBuilder.build(), jobBuilder.build(new Date()), xContentRegistry());
return new AggregationDataExtractorFactory(
client, datafeedConfigBuilder.build(), jobBuilder.build(new Date()), xContentRegistry(), timingStatsReporter);
}
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
@ -17,7 +18,10 @@ import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.junit.Before;
import java.io.BufferedReader;
@ -54,13 +58,14 @@ public class AggregationDataExtractorTests extends ESTestCase {
private List<String> indices;
private QueryBuilder query;
private AggregatorFactories.Builder aggs;
private DatafeedTimingStatsReporter timingStatsReporter;
private class TestDataExtractor extends AggregationDataExtractor {
private SearchResponse nextResponse;
TestDataExtractor(long start, long end) {
super(testClient, createContext(start, end));
super(testClient, createContext(start, end), timingStatsReporter);
}
@Override
@ -88,6 +93,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
.addAggregator(AggregationBuilders.histogram("time").field("time").interval(1000).subAggregation(
AggregationBuilders.terms("airline").field("airline").subAggregation(
AggregationBuilders.avg("responsetime").field("responsetime"))));
timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(JobResultsPersister.class));
}
public void testExtraction() throws IOException {
@ -284,6 +290,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getScrollId()).thenReturn(randomAlphaOfLength(1000));
when(searchResponse.getAggregations()).thenReturn(aggregations);
when(searchResponse.getTook()).thenReturn(TimeValue.timeValueMillis(randomNonNegativeLong()));
return searchResponse;
}
@ -306,6 +313,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getSuccessfulShards()).thenReturn(3);
when(searchResponse.getTotalShards()).thenReturn(3 + unavailableShards);
when(searchResponse.getTook()).thenReturn(TimeValue.timeValueMillis(randomNonNegativeLong()));
return searchResponse;
}
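
The new getTook() stubs are the hook for timing: every fabricated SearchResponse now carries a took value, which suggests the extractor reports the server-measured duration of each search rather than timing the call itself. A sketch of that pattern; executeSearchRequest stands in for however the extractor actually issues the search:

// Sketch: report the server-side duration straight off the response.
SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
timingStatsReporter.reportSearchDuration(searchResponse.getTook());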

View File

@ -13,6 +13,7 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
@ -31,6 +32,7 @@ public class ChunkedDataExtractorFactoryTests extends ESTestCase {
private Client client;
private DataExtractorFactory dataExtractorFactory;
private DatafeedTimingStatsReporter timingStatsReporter;
@Override
protected NamedXContentRegistry xContentRegistry() {
@ -42,6 +44,7 @@ public class ChunkedDataExtractorFactoryTests extends ESTestCase {
public void setUpMocks() {
client = mock(Client.class);
dataExtractorFactory = mock(DataExtractorFactory.class);
timingStatsReporter = mock(DatafeedTimingStatsReporter.class);
}
public void testNewExtractor_GivenAlignedTimes() {
@ -103,7 +106,12 @@ public class ChunkedDataExtractorFactoryTests extends ESTestCase {
DatafeedConfig.Builder datafeedConfigBuilder = new DatafeedConfig.Builder("foo-feed", jobBuilder.getId());
datafeedConfigBuilder.setParsedAggregations(aggs);
datafeedConfigBuilder.setIndices(Arrays.asList("my_index"));
return new ChunkedDataExtractorFactory(client, datafeedConfigBuilder.build(), jobBuilder.build(new Date()),
xContentRegistry(), dataExtractorFactory);
return new ChunkedDataExtractorFactory(
client,
datafeedConfigBuilder.build(),
jobBuilder.build(new Date()),
xContentRegistry(),
dataExtractorFactory,
timingStatsReporter);
}
}

View File

@ -23,8 +23,11 @@ import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.aggregations.metrics.Min;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.junit.Before;
import java.io.IOException;
@ -54,17 +57,18 @@ public class ChunkedDataExtractorTests extends ESTestCase {
private int scrollSize;
private TimeValue chunkSpan;
private DataExtractorFactory dataExtractorFactory;
private DatafeedTimingStatsReporter timingStatsReporter;
private class TestDataExtractor extends ChunkedDataExtractor {
private SearchResponse nextResponse;
TestDataExtractor(long start, long end) {
super(client, dataExtractorFactory, createContext(start, end));
super(client, dataExtractorFactory, createContext(start, end), timingStatsReporter);
}
TestDataExtractor(long start, long end, boolean hasAggregations, Long histogramInterval) {
super(client, dataExtractorFactory, createContext(start, end, hasAggregations, histogramInterval));
super(client, dataExtractorFactory, createContext(start, end, hasAggregations, histogramInterval), timingStatsReporter);
}
@Override
@ -89,6 +93,7 @@ public class ChunkedDataExtractorTests extends ESTestCase {
scrollSize = 1000;
chunkSpan = null;
dataExtractorFactory = mock(DataExtractorFactory.class);
timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(JobResultsPersister.class));
}
public void testExtractionGivenNoData() throws IOException {

View File

@ -17,6 +17,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@ -28,8 +29,11 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.ExtractedField;
import org.elasticsearch.xpack.ml.datafeed.extractor.fields.TimeBasedExtractedFields;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
@ -71,6 +75,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
private int scrollSize;
private long initScrollStartTime;
private ActionFuture<ClearScrollResponse> clearScrollFuture;
private DatafeedTimingStatsReporter timingStatsReporter;
private class TestDataExtractor extends ScrollDataExtractor {
@ -81,7 +86,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
}
TestDataExtractor(ScrollDataExtractorContext context) {
super(client, context);
super(client, context, timingStatsReporter);
}
@Override
@ -140,6 +145,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
clearScrollFuture = mock(ActionFuture.class);
capturedClearScrollRequests = ArgumentCaptor.forClass(ClearScrollRequest.class);
when(client.execute(same(ClearScrollAction.INSTANCE), capturedClearScrollRequests.capture())).thenReturn(clearScrollFuture);
timingStatsReporter = new DatafeedTimingStatsReporter(new DatafeedTimingStats(jobId), mock(JobResultsPersister.class));
}
public void testSinglePageExtraction() throws IOException {
@ -506,6 +512,7 @@ public class ScrollDataExtractorTests extends ESTestCase {
SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[0]),
new TotalHits(hits.size(), TotalHits.Relation.EQUAL_TO), 1);
when(searchResponse.getHits()).thenReturn(searchHits);
when(searchResponse.getTook()).thenReturn(TimeValue.timeValueMillis(randomNonNegativeLong()));
return searchResponse;
}

View File

@ -58,6 +58,7 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzerTests;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.UpdateParams;
@ -103,6 +104,7 @@ public class JobManagerTests extends ESTestCase {
private ClusterService clusterService;
private ThreadPool threadPool;
private JobResultsProvider jobResultsProvider;
private JobResultsPersister jobResultsPersister;
private Auditor auditor;
private UpdateJobProcessNotifier updateJobProcessNotifier;
@ -123,6 +125,7 @@ public class JobManagerTests extends ESTestCase {
givenClusterSettings(settings);
jobResultsProvider = mock(JobResultsProvider.class);
jobResultsPersister = mock(JobResultsPersister.class);
auditor = mock(Auditor.class);
updateJobProcessNotifier = mock(UpdateJobProcessNotifier.class);
@ -593,8 +596,17 @@ public class JobManagerTests extends ESTestCase {
}
private JobManager createJobManager(Client client) {
return new JobManager(environment, environment.settings(), jobResultsProvider, clusterService,
auditor, threadPool, client, updateJobProcessNotifier, xContentRegistry());
return new JobManager(
environment,
environment.settings(),
jobResultsProvider,
jobResultsPersister,
clusterService,
auditor,
threadPool,
client,
updateJobProcessNotifier,
xContentRegistry());
}
private ClusterState createClusterState() {

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import static org.hamcrest.Matchers.arrayContaining;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class JobDataDeleterTests extends ESTestCase {
private static final String JOB_ID = "my-job-id";
private Client client;
@Before
public void setUpTests() {
client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
}
public void testDeleteDatafeedTimingStats() {
JobDataDeleter jobDataDeleter = new JobDataDeleter(client, JOB_ID);
jobDataDeleter.deleteDatafeedTimingStats(ActionListener.wrap(
deleteResponse -> {},
e -> fail(e.toString())
));
ArgumentCaptor<DeleteByQueryRequest> deleteRequestCaptor = ArgumentCaptor.forClass(DeleteByQueryRequest.class);
verify(client).threadPool();
verify(client).execute(eq(DeleteByQueryAction.INSTANCE), deleteRequestCaptor.capture(), any(ActionListener.class));
verifyNoMoreInteractions(client);
DeleteByQueryRequest deleteRequest = deleteRequestCaptor.getValue();
assertThat(deleteRequest.indices(), arrayContaining(AnomalyDetectorsIndex.jobResultsAliasedName(JOB_ID)));
}
}
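
The assertions fix the request shape: a single DeleteByQueryRequest aimed at the job's results alias. A sketch of a deleteDatafeedTimingStats consistent with this test; targeting the per-job timing-stats document through an ids query is an assumption (the query itself is not asserted here), and DatafeedTimingStats.documentId is an assumed helper:

// Sketch only: remove the one timing-stats document kept per job.
public void deleteDatafeedTimingStats(ActionListener<BulkByScrollResponse> listener) {
    DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobResultsAliasedName(jobId));
    request.setQuery(QueryBuilders.idsQuery().addIds(DatafeedTimingStats.documentId(jobId)));  // assumed id scheme
    client.execute(DeleteByQueryAction.INSTANCE, request, listener);
}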

View File

@ -6,15 +6,19 @@
package org.elasticsearch.xpack.ml.job.persistence;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.TimingStats;
import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.core.ml.job.results.Bucket;
@ -33,6 +37,7 @@ import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -226,6 +231,38 @@ public class JobResultsPersisterTests extends ESTestCase {
verifyNoMoreInteractions(client);
}
public void testPersistDatafeedTimingStats() {
Client client = mockClient(ArgumentCaptor.forClass(BulkRequest.class));
doAnswer(
invocationOnMock -> {
// Take the listener passed to client::index as the 2nd argument
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1];
// Handle the response on the listener
listener.onResponse(new IndexResponse());
return null;
})
.when(client).index(any(), any(ActionListener.class));
JobResultsPersister persister = new JobResultsPersister(client);
DatafeedTimingStats timingStats = new DatafeedTimingStats("foo", 6, 666.0);
persister.persistDatafeedTimingStats(timingStats, WriteRequest.RefreshPolicy.IMMEDIATE);
ArgumentCaptor<IndexRequest> indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
verify(client, times(1)).index(indexRequestCaptor.capture(), any(ActionListener.class));
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertThat(indexRequest.index(), equalTo(".ml-anomalies-.write-foo"));
assertThat(indexRequest.id(), equalTo("foo_datafeed_timing_stats"));
assertThat(indexRequest.getRefreshPolicy(), equalTo(WriteRequest.RefreshPolicy.IMMEDIATE));
Map<String, Object> expectedSourceAsMap = new HashMap<>();
expectedSourceAsMap.put("job_id", "foo");
expectedSourceAsMap.put("search_count", 6);
expectedSourceAsMap.put("total_search_time_ms", 666.0);
assertThat(indexRequest.sourceAsMap(), equalTo(expectedSourceAsMap));
verify(client, times(1)).threadPool();
verifyNoMoreInteractions(client);
}
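
The captured IndexRequest spells out the storage scheme: one timing-stats document per job, stored in the job's results write alias under an id derived from the job id. As a sketch, the write the test verifies could be assembled like this (field names mirror expectedSourceAsMap):

// Sketch of the verified write; values are those from the test above.
Map<String, Object> source = new HashMap<>();
source.put("job_id", "foo");
source.put("search_count", 6);
source.put("total_search_time_ms", 666.0);

IndexRequest indexRequest = new IndexRequest(".ml-anomalies-.write-foo")  // results write alias for job "foo"
    .id("foo_datafeed_timing_stats")                                      // fixed per-job document id
    .source(source);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);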
@SuppressWarnings({"unchecked"})
private Client mockClient(ArgumentCaptor<BulkRequest> captor) {
Client client = mock(Client.class);

View File

@ -11,7 +11,9 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.search.MultiSearchAction;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchRequestBuilder;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
@ -42,6 +44,7 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
@ -67,7 +70,9 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
@ -75,6 +80,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class JobResultsProviderTests extends ESTestCase {
@ -880,6 +886,121 @@ public class JobResultsProviderTests extends ESTestCase {
verifyNoMoreInteractions(client);
}
public void testDatafeedTimingStats_EmptyJobList() {
Client client = getBasicMockedClient();
JobResultsProvider provider = createProvider(client);
provider.datafeedTimingStats(
Arrays.asList(),
statsByJobId -> assertThat(statsByJobId, anEmptyMap()),
e -> { throw new AssertionError(); });
verifyZeroInteractions(client);
}
public void testDatafeedTimingStats_MultipleDocumentsAtOnce() throws IOException {
Map<String, Object> sourceFooMap = new HashMap<>();
sourceFooMap.put(Job.ID.getPreferredName(), "foo");
sourceFooMap.put(DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 6);
sourceFooMap.put(DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 666.0);
Map<String, Object> sourceBarMap = new HashMap<>();
sourceBarMap.put(Job.ID.getPreferredName(), "bar");
sourceBarMap.put(DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 7);
sourceBarMap.put(DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 777.0);
List<Map<String, Object>> sourceFoo = Arrays.asList(sourceFooMap);
List<Map<String, Object>> sourceBar = Arrays.asList(sourceBarMap);
SearchResponse responseFoo = createSearchResponse(sourceFoo);
SearchResponse responseBar = createSearchResponse(sourceBar);
MultiSearchResponse multiSearchResponse = new MultiSearchResponse(
new MultiSearchResponse.Item[]{
new MultiSearchResponse.Item(responseFoo, null),
new MultiSearchResponse.Item(responseBar, null)},
randomNonNegativeLong());
Client client = getBasicMockedClient();
when(client.prepareMultiSearch()).thenReturn(new MultiSearchRequestBuilder(client, MultiSearchAction.INSTANCE));
doAnswer(invocationOnMock -> {
MultiSearchRequest multiSearchRequest = (MultiSearchRequest) invocationOnMock.getArguments()[0];
assertThat(multiSearchRequest.requests(), hasSize(2));
assertThat(multiSearchRequest.requests().get(0).source().query().getName(), equalTo("ids"));
assertThat(multiSearchRequest.requests().get(1).source().query().getName(), equalTo("ids"));
@SuppressWarnings("unchecked")
ActionListener<MultiSearchResponse> actionListener = (ActionListener<MultiSearchResponse>) invocationOnMock.getArguments()[1];
actionListener.onResponse(multiSearchResponse);
return null;
}).when(client).multiSearch(any(), any());
when(client.prepareSearch(AnomalyDetectorsIndex.jobResultsAliasedName("foo")))
.thenReturn(
new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(AnomalyDetectorsIndex.jobResultsAliasedName("foo")));
when(client.prepareSearch(AnomalyDetectorsIndex.jobResultsAliasedName("bar")))
.thenReturn(
new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(AnomalyDetectorsIndex.jobResultsAliasedName("bar")));
Map<String, DatafeedTimingStats> expectedStatsByJobId = new HashMap<>();
expectedStatsByJobId.put("foo", new DatafeedTimingStats("foo", 6, 666.0));
expectedStatsByJobId.put("bar", new DatafeedTimingStats("bar", 7, 777.0));
JobResultsProvider provider = createProvider(client);
provider.datafeedTimingStats(
Arrays.asList("foo", "bar"),
statsByJobId -> assertThat(statsByJobId, equalTo(expectedStatsByJobId)),
e -> { throw new AssertionError(); });
verify(client).threadPool();
verify(client).prepareMultiSearch();
verify(client).multiSearch(any(MultiSearchRequest.class), any(ActionListener.class));
verify(client).prepareSearch(AnomalyDetectorsIndex.jobResultsAliasedName("foo"));
verify(client).prepareSearch(AnomalyDetectorsIndex.jobResultsAliasedName("bar"));
verifyNoMoreInteractions(client);
}
public void testDatafeedTimingStats_Ok() throws IOException {
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName("foo");
Map<String, Object> sourceFooMap = new HashMap<>();
sourceFooMap.put(Job.ID.getPreferredName(), "foo");
sourceFooMap.put(DatafeedTimingStats.SEARCH_COUNT.getPreferredName(), 6);
sourceFooMap.put(DatafeedTimingStats.TOTAL_SEARCH_TIME_MS.getPreferredName(), 666.0);
List<Map<String, Object>> source = Arrays.asList(sourceFooMap);
SearchResponse response = createSearchResponse(source);
Client client = getMockedClient(
queryBuilder -> assertThat(queryBuilder.getName(), equalTo("ids")),
response);
when(client.prepareSearch(indexName)).thenReturn(new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(indexName));
JobResultsProvider provider = createProvider(client);
provider.datafeedTimingStats(
"foo",
stats -> assertThat(stats, equalTo(new DatafeedTimingStats("foo", 6, 666.0))),
e -> { throw new AssertionError(); });
verify(client).prepareSearch(indexName);
verify(client).threadPool();
verify(client).search(any(SearchRequest.class), any(ActionListener.class));
verifyNoMoreInteractions(client);
}
public void testDatafeedTimingStats_NotFound() throws IOException {
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName("foo");
List<Map<String, Object>> source = new ArrayList<>();
SearchResponse response = createSearchResponse(source);
Client client = getMockedClient(
queryBuilder -> assertThat(queryBuilder.getName(), equalTo("ids")),
response);
when(client.prepareSearch(indexName)).thenReturn(new SearchRequestBuilder(client, SearchAction.INSTANCE).setIndices(indexName));
JobResultsProvider provider = createProvider(client);
provider.datafeedTimingStats(
"foo",
stats -> assertThat(stats, equalTo(new DatafeedTimingStats("foo"))),
e -> { throw new AssertionError(); });
verify(client).prepareSearch(indexName);
verify(client).threadPool();
verify(client).search(any(SearchRequest.class), any(ActionListener.class));
verifyNoMoreInteractions(client);
}
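
Across the three cases the lookup shape stays constant: an ids query per job against that job's results alias, batched into one multi-search, with a missing document falling back to zeroed stats (the NotFound case expects new DatafeedTimingStats("foo")). A sketch of the fan-out; DatafeedTimingStats.documentId is an assumed helper, since only the query name "ids" is asserted above:

// Sketch: one search per job, batched into a single multi-search request.
MultiSearchRequestBuilder msearch = client.prepareMultiSearch();
for (String jobId : jobIds) {
    msearch.add(
        client.prepareSearch(AnomalyDetectorsIndex.jobResultsAliasedName(jobId))
            .setQuery(QueryBuilders.idsQuery().addIds(DatafeedTimingStats.documentId(jobId)))
            .setSize(1));
}
// Each hit parses into DatafeedTimingStats; an empty response maps to new DatafeedTimingStats(jobId).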
private Bucket createBucketAtEpochTime(long epoch) {
return new Bucket("foo", new Date(epoch), 123);
}
@ -910,11 +1031,16 @@ public class JobResultsProviderTests extends ESTestCase {
return response;
}
private Client getMockedClient(Consumer<QueryBuilder> queryBuilderConsumer, SearchResponse response) {
private Client getBasicMockedClient() {
Client client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
return client;
}
private Client getMockedClient(Consumer<QueryBuilder> queryBuilderConsumer, SearchResponse response) {
Client client = getBasicMockedClient();
doAnswer(invocationOnMock -> {
MultiSearchRequest multiSearchRequest = (MultiSearchRequest) invocationOnMock.getArguments()[0];
queryBuilderConsumer.accept(multiSearchRequest.requests().get(0).source().query());

View File

@ -55,7 +55,7 @@ public class TimingStatsReporterTests extends ESTestCase {
inOrder.verifyNoMoreInteractions();
}
public void testFlush() {
public void testFinishReporting() {
TimingStatsReporter reporter = new TimingStatsReporter(new TimingStats(JOB_ID), bulkResultsPersister);
assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID)));
@ -68,7 +68,7 @@ public class TimingStatsReporterTests extends ESTestCase {
reporter.reportBucketProcessingTime(10);
assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0)));
reporter.flush();
reporter.finishReporting();
assertThat(reporter.getCurrentTimingStats(), equalTo(new TimingStats(JOB_ID, 3, 10.0, 10.0, 10.0, 10.0)));
InOrder inOrder = inOrder(bulkResultsPersister);

View File

@ -126,16 +126,22 @@ setup:
- do:
ml.get_datafeed_stats:
datafeed_id: datafeed-1
- match: { datafeeds.0.datafeed_id: "datafeed-1"}
- match: { datafeeds.0.state: "stopped"}
- match: { datafeeds.0.datafeed_id: "datafeed-1"}
- match: { datafeeds.0.state: "stopped"}
- is_false: datafeeds.0.node
- match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-1" }
- match: { datafeeds.0.timing_stats.search_count: 0 }
- match: { datafeeds.0.timing_stats.total_search_time_ms: 0.0 }
- do:
ml.get_datafeed_stats:
datafeed_id: datafeed-2
- match: { datafeeds.0.datafeed_id: "datafeed-2"}
- match: { datafeeds.0.state: "stopped"}
- match: { datafeeds.0.datafeed_id: "datafeed-2"}
- match: { datafeeds.0.state: "stopped"}
- is_false: datafeeds.0.node
- match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-2" }
- match: { datafeeds.0.timing_stats.search_count: 0 }
- match: { datafeeds.0.timing_stats.total_search_time_ms: 0.0 }
---
"Test get stats for started datafeed":
@ -146,8 +152,8 @@ setup:
- do:
ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
datafeed_id: "datafeed-1"
start: 0
- do:
ml.get_datafeed_stats:
@ -158,6 +164,42 @@ setup:
- is_true: datafeeds.0.node.transport_address
- match: { datafeeds.0.node.attributes.ml\.max_open_jobs: "20"}
---
"Test get stats for started datafeed contains timing stats":
- do:
ml.open_job:
job_id: get-datafeed-stats-1
- do:
ml.start_datafeed:
datafeed_id: "datafeed-1"
start: 0
- do:
ml.get_datafeed_stats:
datafeed_id: datafeed-1
- match: { datafeeds.0.datafeed_id: "datafeed-1"}
- match: { datafeeds.0.state: "started"}
- match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-1"}
- match: { datafeeds.0.timing_stats.search_count: 0}
- match: { datafeeds.0.timing_stats.total_search_time_ms: 0.0}
- do:
ml.stop_datafeed:
datafeed_id: "datafeed-1"
- match: { stopped: true}
- do:
ml.get_datafeed_stats:
datafeed_id: datafeed-1
- match: { datafeeds.0.datafeed_id: "datafeed-1"}
- match: { datafeeds.0.state: "stopped"}
- match: { datafeeds.0.timing_stats.job_id: "get-datafeed-stats-1"}
# TODO: Change "gte 0" to "match 1" once https://github.com/elastic/elasticsearch/issues/44132 is fixed
- gte: { datafeeds.0.timing_stats.search_count: 0}
- gte: { datafeeds.0.timing_stats.total_search_time_ms: 0.0}
---
"Test implicit get all datafeed stats given started datafeeds":
@ -167,8 +209,8 @@ setup:
- do:
ml.start_datafeed:
"datafeed_id": "datafeed-1"
"start": 0
datafeed_id: "datafeed-1"
start: 0
- do:
ml.open_job:
@ -176,8 +218,8 @@ setup:
- do:
ml.start_datafeed:
"datafeed_id": "datafeed-2"
"start": 0
datafeed_id: "datafeed-2"
start: 0
- do:
ml.get_datafeed_stats: {}