[7.x][ML] Parse and report memory usage for DF Analytics (#52778) (#52980)

Adds reporting of memory usage for data frame analytics jobs.
This commit introduces a new index pattern `.ml-stats-*` whose
first concrete index will be `.ml-stats-000001`. This index serves
to store instrumentation information for those jobs.

Backport of #52778 and #52958
This commit is contained in:
Dimitris Athanasiou 2020-02-29 13:03:40 +02:00 committed by GitHub
parent d9463983af
commit 85b4e45093
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 931 additions and 218 deletions

View File

@ -44,6 +44,7 @@ public class DataFrameAnalyticsStats {
static final ParseField STATE = new ParseField("state");
static final ParseField FAILURE_REASON = new ParseField("failure_reason");
static final ParseField PROGRESS = new ParseField("progress");
static final ParseField MEMORY_USAGE = new ParseField("memory_usage");
static final ParseField NODE = new ParseField("node");
static final ParseField ASSIGNMENT_EXPLANATION = new ParseField("assignment_explanation");
@ -55,8 +56,9 @@ public class DataFrameAnalyticsStats {
(DataFrameAnalyticsState) args[1],
(String) args[2],
(List<PhaseProgress>) args[3],
(NodeAttributes) args[4],
(String) args[5]));
(MemoryUsage) args[4],
(NodeAttributes) args[5],
(String) args[6]));
static {
PARSER.declareString(constructorArg(), ID);
@ -68,6 +70,7 @@ public class DataFrameAnalyticsStats {
}, STATE, ObjectParser.ValueType.STRING);
PARSER.declareString(optionalConstructorArg(), FAILURE_REASON);
PARSER.declareObjectArray(optionalConstructorArg(), PhaseProgress.PARSER, PROGRESS);
PARSER.declareObject(optionalConstructorArg(), MemoryUsage.PARSER, MEMORY_USAGE);
PARSER.declareObject(optionalConstructorArg(), NodeAttributes.PARSER, NODE);
PARSER.declareString(optionalConstructorArg(), ASSIGNMENT_EXPLANATION);
}
@ -76,16 +79,18 @@ public class DataFrameAnalyticsStats {
private final DataFrameAnalyticsState state;
private final String failureReason;
private final List<PhaseProgress> progress;
private final MemoryUsage memoryUsage;
private final NodeAttributes node;
private final String assignmentExplanation;
public DataFrameAnalyticsStats(String id, DataFrameAnalyticsState state, @Nullable String failureReason,
@Nullable List<PhaseProgress> progress, @Nullable NodeAttributes node,
@Nullable String assignmentExplanation) {
@Nullable List<PhaseProgress> progress, @Nullable MemoryUsage memoryUsage,
@Nullable NodeAttributes node, @Nullable String assignmentExplanation) {
this.id = id;
this.state = state;
this.failureReason = failureReason;
this.progress = progress;
this.memoryUsage = memoryUsage;
this.node = node;
this.assignmentExplanation = assignmentExplanation;
}
@ -106,6 +111,11 @@ public class DataFrameAnalyticsStats {
return progress;
}
@Nullable
public MemoryUsage getMemoryUsage() {
return memoryUsage;
}
public NodeAttributes getNode() {
return node;
}
@ -124,13 +134,14 @@ public class DataFrameAnalyticsStats {
&& Objects.equals(state, other.state)
&& Objects.equals(failureReason, other.failureReason)
&& Objects.equals(progress, other.progress)
&& Objects.equals(memoryUsage, other.memoryUsage)
&& Objects.equals(node, other.node)
&& Objects.equals(assignmentExplanation, other.assignmentExplanation);
}
@Override
public int hashCode() {
return Objects.hash(id, state, failureReason, progress, node, assignmentExplanation);
return Objects.hash(id, state, failureReason, progress, memoryUsage, node, assignmentExplanation);
}
@Override
@ -140,6 +151,7 @@ public class DataFrameAnalyticsStats {
.add("state", state)
.add("failureReason", failureReason)
.add("progress", progress)
.add("memoryUsage", memoryUsage)
.add("node", node)
.add("assignmentExplanation", assignmentExplanation)
.toString();

View File

@ -0,0 +1,88 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml.dataframe;
import org.elasticsearch.client.common.TimeUtil;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.internal.ToStringBuilder;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.time.Instant;
import java.util.Objects;
/**
 * Client-side view of the memory usage reported for a data frame analytics job:
 * a timestamp together with the peak number of bytes used.
 */
public class MemoryUsage implements ToXContentObject {

    static final ParseField TIMESTAMP = new ParseField("timestamp");
    static final ParseField PEAK_USAGE_BYTES = new ParseField("peak_usage_bytes");

    /** Parser that ignores unknown fields; both {@code timestamp} and {@code peak_usage_bytes} are required. */
    public static final ConstructingObjectParser<MemoryUsage, Void> PARSER = new ConstructingObjectParser<>(
        "analytics_memory_usage",
        true,
        args -> {
            Instant parsedTimestamp = (Instant) args[0];
            long parsedPeakUsageBytes = (long) args[1];
            return new MemoryUsage(parsedTimestamp, parsedPeakUsageBytes);
        });

    static {
        // Declaration order defines the positions of the constructor arguments used above.
        PARSER.declareField(
            ConstructingObjectParser.constructorArg(),
            parser -> TimeUtil.parseTimeFieldToInstant(parser, TIMESTAMP.getPreferredName()),
            TIMESTAMP,
            ObjectParser.ValueType.VALUE);
        PARSER.declareLong(ConstructingObjectParser.constructorArg(), PEAK_USAGE_BYTES);
    }

    private final Instant timestamp;
    private final long peakUsageBytes;

    /**
     * @param timestamp      when the usage was recorded; must not be {@code null}
     * @param peakUsageBytes the peak memory usage in bytes
     * @throws NullPointerException if {@code timestamp} is {@code null}
     */
    public MemoryUsage(Instant timestamp, long peakUsageBytes) {
        Objects.requireNonNull(timestamp);
        // Keep millisecond precision only, which is what toXContent writes out.
        this.timestamp = Instant.ofEpochMilli(timestamp.toEpochMilli());
        this.peakUsageBytes = peakUsageBytes;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        final String timestampField = TIMESTAMP.getPreferredName();
        final String readableTimestampField = timestampField + "_string";

        builder.startObject();
        builder.timeField(timestampField, readableTimestampField, timestamp.toEpochMilli());
        builder.field(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes);
        builder.endObject();
        return builder;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final MemoryUsage that = (MemoryUsage) o;
        return peakUsageBytes == that.peakUsageBytes
            && Objects.equals(timestamp, that.timestamp);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, peakUsageBytes);
    }

    /** Renders both fields; note that the timestamp is shown in epoch seconds here. */
    @Override
    public String toString() {
        ToStringBuilder description = new ToStringBuilder(getClass());
        description.add(TIMESTAMP.getPreferredName(), timestamp.getEpochSecond());
        description.add(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes);
        return description.toString();
    }
}

View File

@ -1536,6 +1536,7 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
assertThat(progress.get(1), equalTo(new PhaseProgress("loading_data", 0)));
assertThat(progress.get(2), equalTo(new PhaseProgress("analyzing", 0)));
assertThat(progress.get(3), equalTo(new PhaseProgress("writing_results", 0)));
assertThat(stats.getMemoryUsage(), is(nullValue()));
}
public void testStartDataFrameAnalyticsConfig() throws Exception {

View File

@ -47,6 +47,7 @@ public class DataFrameAnalyticsStatsTests extends ESTestCase {
randomFrom(DataFrameAnalyticsState.values()),
randomBoolean() ? null : randomAlphaOfLength(10),
randomBoolean() ? null : createRandomProgress(),
randomBoolean() ? null : MemoryUsageTests.createRandom(),
randomBoolean() ? null : NodeAttributesTests.createRandom(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 20));
}
@ -70,6 +71,9 @@ public class DataFrameAnalyticsStatsTests extends ESTestCase {
if (stats.getProgress() != null) {
builder.field(DataFrameAnalyticsStats.PROGRESS.getPreferredName(), stats.getProgress());
}
if (stats.getMemoryUsage() != null) {
builder.field(DataFrameAnalyticsStats.MEMORY_USAGE.getPreferredName(), stats.getMemoryUsage());
}
if (stats.getNode() != null) {
builder.field(DataFrameAnalyticsStats.NODE.getPreferredName(), stats.getNode());
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml.dataframe;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.time.Instant;
/**
 * XContent round-trip tests for the client-side {@link MemoryUsage}.
 */
public class MemoryUsageTests extends AbstractXContentTestCase<MemoryUsage> {

    /**
     * Builds a {@link MemoryUsage} stamped with the current time and carrying a random,
     * non-negative peak usage. Shared with other tests that embed a memory usage object.
     */
    public static MemoryUsage createRandom() {
        final Instant timestamp = Instant.now();
        final long peakUsageBytes = randomNonNegativeLong();
        return new MemoryUsage(timestamp, peakUsageBytes);
    }

    @Override
    protected MemoryUsage createTestInstance() {
        return createRandom();
    }

    @Override
    protected MemoryUsage doParseInstance(XContentParser parser) throws IOException {
        final MemoryUsage parsed = MemoryUsage.PARSER.apply(parser, null);
        return parsed;
    }

    /** The parser under test is declared to ignore unknown fields. */
    @Override
    protected boolean supportsUnknownFields() {
        return true;
    }
}

View File

@ -449,13 +449,25 @@ sorted by the `id` value in ascending order.
`progress`:::
(array) The progress report of the {dfanalytics-job} by phase.
`phase`:::
`phase`::::
(string) Defines the phase of the {dfanalytics-job}. Possible phases:
`reindexing`, `loading_data`, `analyzing`, and `writing_results`.
`progress_percent`:::
`progress_percent`::::
(integer) The progress that the {dfanalytics-job} has made expressed in
percentage.
`memory_usage`:::
(Optional, Object) An object describing memory usage of the analytics.
It will be present only after the job has started and memory usage has
been reported.
`timestamp`::::
(date) The timestamp when memory usage was calculated.
`peak_usage_bytes`::::
(long) The number of bytes used at the highest peak of memory usage.
end::data-frame-analytics-stats[]
tag::datafeed-id[]

View File

@ -22,6 +22,10 @@ public final class TimeUtils {
// Do nothing
}
/**
* @deprecated Please use {@link #parseTimeFieldToInstant(XContentParser, String)} instead.
*/
@Deprecated
public static Date parseTimeField(XContentParser parser, String fieldName) throws IOException {
if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return new Date(parser.longValue());
@ -36,7 +40,7 @@ public final class TimeUtils {
if (parser.currentToken() == XContentParser.Token.VALUE_NUMBER) {
return Instant.ofEpochMilli(parser.longValue());
} else if (parser.currentToken() == XContentParser.Token.VALUE_STRING) {
return Instant.ofEpochMilli(dateStringToEpoch(parser.text()));
return Instant.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(parser.text()));
}
throw new IllegalArgumentException(
"unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]");
@ -54,6 +58,7 @@ public final class TimeUtils {
* @return The epoch time in milliseconds or -1 if the date cannot be
* parsed.
*/
@Deprecated
public static long dateStringToEpoch(String date) {
try {
long epoch = Long.parseLong(date);

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import java.util.Collections;
/**
 * Names and helpers for the indices in which ML stores various stats about the users' jobs.
 */
public class MlStatsIndex {

    public static final String TEMPLATE_NAME = ".ml-stats";

    private static final String MAPPINGS_VERSION_VARIABLE = "xpack.ml.version";

    // Static helpers only; never instantiated.
    private MlStatsIndex() {}

    /**
     * Loads the stats index mappings using {@link MapperService#SINGLE_MAPPING_NAME} as the mapping type.
     */
    public static String mapping() {
        final String defaultMappingType = MapperService.SINGLE_MAPPING_NAME;
        return mapping(defaultMappingType);
    }

    /**
     * Loads the stats index mappings resource, substituting the current version and
     * the given mapping type into the template.
     *
     * @param mappingType value substituted for the {@code xpack.ml.mapping_type} variable
     */
    public static String mapping(String mappingType) {
        final String mappingsResource = "/org/elasticsearch/xpack/core/ml/stats_index_mappings.json";
        final String currentVersion = Version.CURRENT.toString();
        return TemplateUtils.loadTemplate(
            mappingsResource,
            currentVersion,
            MAPPINGS_VERSION_VARIABLE,
            Collections.singletonMap("xpack.ml.mapping_type", mappingType));
    }

    /** The pattern matched by the stats indices: the template name followed by {@code -*}. */
    public static String indexPattern() {
        final String suffixWildcard = "-*";
        return TEMPLATE_NAME + suffixWildcard;
    }

    /** The alias through which stats documents are written. */
    public static String writeAlias() {
        return ".ml-stats-write";
    }

    /**
     * Creates the first concrete .ml-stats-000001 index (if necessary) and
     * the .ml-stats-write alias for that index.
     * The listener is notified with a boolean indicating whether the index was created because of this call;
     * unless there is a failure, the index and alias should be present after this method returns.
     */
    public static void createStatsIndexAndAliasIfNecessary(Client client, ClusterState state, IndexNameExpressionResolver resolver,
                                                           ActionListener<Boolean> listener) {
        final String statsWriteAlias = writeAlias();
        MlIndexAndAlias.createIndexAndAliasIfNecessary(client, state, resolver, TEMPLATE_NAME, statsWriteAlias, listener);
    }
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
@ -163,17 +164,21 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
*/
private final List<PhaseProgress> progress;
@Nullable
private final MemoryUsage memoryUsage;
@Nullable
private final DiscoveryNode node;
@Nullable
private final String assignmentExplanation;
public Stats(String id, DataFrameAnalyticsState state, @Nullable String failureReason, List<PhaseProgress> progress,
@Nullable DiscoveryNode node, @Nullable String assignmentExplanation) {
@Nullable MemoryUsage memoryUsage, @Nullable DiscoveryNode node, @Nullable String assignmentExplanation) {
this.id = Objects.requireNonNull(id);
this.state = Objects.requireNonNull(state);
this.failureReason = failureReason;
this.progress = Objects.requireNonNull(progress);
this.memoryUsage = memoryUsage;
this.node = node;
this.assignmentExplanation = assignmentExplanation;
}
@ -187,6 +192,11 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
} else {
progress = in.readList(PhaseProgress::new);
}
if (in.getVersion().onOrAfter(Version.V_7_7_0)) {
memoryUsage = in.readOptionalWriteable(MemoryUsage::new);
} else {
memoryUsage = null;
}
node = in.readOptionalWriteable(DiscoveryNode::new);
assignmentExplanation = in.readOptionalString();
}
@ -240,6 +250,11 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
return progress;
}
@Nullable
public MemoryUsage getMemoryUsage() {
return memoryUsage;
}
public DiscoveryNode getNode() {
return node;
}
@ -267,6 +282,9 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
if (progress != null) {
builder.field("progress", progress);
}
if (memoryUsage != null) {
builder.field("memory_usage", memoryUsage);
}
if (node != null) {
builder.startObject("node");
builder.field("id", node.getId());
@ -297,6 +315,9 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
} else {
out.writeList(progress);
}
if (out.getVersion().onOrAfter(Version.V_7_7_0)) {
out.writeOptionalWriteable(memoryUsage);
}
out.writeOptionalWriteable(node);
out.writeOptionalString(assignmentExplanation);
}
@ -329,7 +350,7 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
@Override
public int hashCode() {
return Objects.hash(id, state, failureReason, progress, node, assignmentExplanation);
return Objects.hash(id, state, failureReason, progress, memoryUsage, node, assignmentExplanation);
}
@Override
@ -345,6 +366,7 @@ public class GetDataFrameAnalyticsStatsAction extends ActionType<GetDataFrameAna
&& Objects.equals(this.state, other.state)
&& Objects.equals(this.failureReason, other.failureReason)
&& Objects.equals(this.progress, other.progress)
&& Objects.equals(this.memoryUsage, other.memoryUsage)
&& Objects.equals(this.node, other.node)
&& Objects.equals(this.assignmentExplanation, other.assignmentExplanation);
}

View File

@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.dataframe.stats;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import java.io.IOException;
import java.time.Instant;
import java.util.Objects;
/**
 * Memory usage reported for a data frame analytics job: the job id, the timestamp of the
 * report and the peak number of bytes used. Instances can be sent over the wire and
 * rendered as XContent.
 */
public class MemoryUsage implements Writeable, ToXContentObject {

    public static final String TYPE_VALUE = "analytics_memory_usage";

    public static final ParseField TYPE = new ParseField("type");
    public static final ParseField JOB_ID = new ParseField("job_id");
    public static final ParseField TIMESTAMP = new ParseField("timestamp");
    public static final ParseField PEAK_USAGE_BYTES = new ParseField("peak_usage_bytes");

    // The parsers are built after the fields above because createParser reads them.
    public static final ConstructingObjectParser<MemoryUsage, Void> STRICT_PARSER = createParser(false);
    public static final ConstructingObjectParser<MemoryUsage, Void> LENIENT_PARSER = createParser(true);

    /**
     * Builds a parser for this type.
     *
     * @param ignoreUnknownFields whether fields that are not declared below are tolerated
     */
    private static ConstructingObjectParser<MemoryUsage, Void> createParser(boolean ignoreUnknownFields) {
        ConstructingObjectParser<MemoryUsage, Void> memoryUsageParser = new ConstructingObjectParser<>(
            TYPE_VALUE,
            ignoreUnknownFields,
            args -> {
                String parsedJobId = (String) args[0];
                Instant parsedTimestamp = (Instant) args[1];
                long parsedPeakUsageBytes = (long) args[2];
                return new MemoryUsage(parsedJobId, parsedTimestamp, parsedPeakUsageBytes);
            });

        // The type field is accepted but its value is discarded.
        memoryUsageParser.declareString((bucket, s) -> {}, TYPE);
        // Constructor arguments, in the positional order used by the builder lambda above.
        memoryUsageParser.declareString(ConstructingObjectParser.constructorArg(), JOB_ID);
        memoryUsageParser.declareField(
            ConstructingObjectParser.constructorArg(),
            p -> TimeUtils.parseTimeFieldToInstant(p, TIMESTAMP.getPreferredName()),
            TIMESTAMP,
            ObjectParser.ValueType.VALUE);
        memoryUsageParser.declareLong(ConstructingObjectParser.constructorArg(), PEAK_USAGE_BYTES);
        return memoryUsageParser;
    }

    private final String jobId;
    private final Instant timestamp;
    private final long peakUsageBytes;

    /**
     * @param jobId          id of the job the usage belongs to; must not be {@code null}
     * @param timestamp      when the usage was reported; must not be {@code null}
     * @param peakUsageBytes the peak memory usage in bytes
     */
    public MemoryUsage(String jobId, Instant timestamp, long peakUsageBytes) {
        this.jobId = Objects.requireNonNull(jobId);
        // The timestamp is stored with millisecond granularity, so it is truncated here to keep
        // the in-memory value identical to what toXContent writes.
        final Instant checkedTimestamp = ExceptionsHelper.requireNonNull(timestamp, TIMESTAMP);
        this.timestamp = Instant.ofEpochMilli(checkedTimestamp.toEpochMilli());
        this.peakUsageBytes = peakUsageBytes;
    }

    /** Reads an instance from the stream in the same field order that {@link #writeTo} uses. */
    public MemoryUsage(StreamInput in) throws IOException {
        this.jobId = in.readString();
        this.timestamp = in.readInstant();
        this.peakUsageBytes = in.readVLong();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(jobId);
        out.writeInstant(timestamp);
        out.writeVLong(peakUsageBytes);
    }

    /**
     * Writes the timestamp and peak usage; the type and job id are added only when the
     * {@link ToXContentParams#FOR_INTERNAL_STORAGE} param is set to true.
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        final boolean forInternalStorage = params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false);
        final String timestampField = TIMESTAMP.getPreferredName();

        builder.startObject();
        if (forInternalStorage) {
            builder.field(TYPE.getPreferredName(), TYPE_VALUE);
            builder.field(JOB_ID.getPreferredName(), jobId);
        }
        builder.timeField(timestampField, timestampField + "_string", timestamp.toEpochMilli());
        builder.field(PEAK_USAGE_BYTES.getPreferredName(), peakUsageBytes);
        builder.endObject();
        return builder;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final MemoryUsage that = (MemoryUsage) o;
        return peakUsageBytes == that.peakUsageBytes
            && Objects.equals(jobId, that.jobId)
            && Objects.equals(timestamp, that.timestamp);
    }

    @Override
    public int hashCode() {
        return Objects.hash(jobId, timestamp, peakUsageBytes);
    }

    @Override
    public String toString() {
        return Strings.toString(this);
    }

    /**
     * Builds a document id from the prefix for the given job id followed by this
     * instance's timestamp in epoch milliseconds.
     *
     * @param jobId the job id used for the prefix (the argument is used, not this instance's own job id)
     */
    public String documentId(String jobId) {
        final long timestampMillis = timestamp.toEpochMilli();
        return documentIdPrefix(jobId) + timestampMillis;
    }

    /** Returns the common prefix of the document ids of all memory usage documents of the given job. */
    public static String documentIdPrefix(String jobId) {
        return TYPE_VALUE + "_" + jobId + "_";
    }
}

View File

@ -5,30 +5,19 @@
*/
package org.elasticsearch.xpack.core.ml.job.persistence;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
import org.elasticsearch.xpack.core.template.TemplateUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.regex.Pattern;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
/**
* Methods for handling index naming related functions
*/
@ -120,61 +109,8 @@ public final class AnomalyDetectorsIndex {
*/
public static void createStateIndexAndAliasIfNecessary(Client client, ClusterState state, IndexNameExpressionResolver resolver,
final ActionListener<Boolean> finalListener) {
if (state.getMetaData().getAliasAndIndexLookup().containsKey(jobStateIndexWriteAlias())) {
finalListener.onResponse(false);
return;
}
final ActionListener<String> createAliasListener = ActionListener.wrap(
concreteIndexName -> {
final IndicesAliasesRequest request = client.admin()
.indices()
.prepareAliases()
.addAlias(concreteIndexName, jobStateIndexWriteAlias())
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
request,
ActionListener.<AcknowledgedResponse>wrap(
resp -> finalListener.onResponse(resp.isAcknowledged()),
finalListener::onFailure),
client.admin().indices()::aliases);
},
finalListener::onFailure
);
String[] stateIndices = resolver.concreteIndexNames(state,
IndicesOptions.lenientExpandOpen(),
jobStateIndexPattern());
if (stateIndices.length > 0) {
String latestStateIndex = Arrays.stream(stateIndices).max(STATE_INDEX_NAME_COMPARATOR).get();
createAliasListener.onResponse(latestStateIndex);
} else {
// The initial index name must be suitable for rollover functionality.
String initialJobStateIndex = AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001";
CreateIndexRequest createIndexRequest = client.admin()
.indices()
.prepareCreate(initialJobStateIndex)
.addAlias(new Alias(jobStateIndexWriteAlias()))
.request();
executeAsyncWithOrigin(client.threadPool().getThreadContext(),
ML_ORIGIN,
createIndexRequest,
ActionListener.<CreateIndexResponse>wrap(
createIndexResponse -> finalListener.onResponse(true),
createIndexFailure -> {
// If it was created between our last check, and this request being handled, we should add the alias
// Adding an alias that already exists is idempotent. So, no need to double check if the alias exists
// as well.
if (ExceptionsHelper.unwrapCause(createIndexFailure) instanceof ResourceAlreadyExistsException) {
createAliasListener.onResponse(initialJobStateIndex);
} else {
finalListener.onFailure(createIndexFailure);
}
}),
client.admin().indices()::create);
}
MlIndexAndAlias.createIndexAndAliasIfNecessary(client, state, resolver,
AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX, AnomalyDetectorsIndex.jobStateIndexWriteAlias(), finalListener);
}
public static String resultsMapping() {

View File

@ -0,0 +1,119 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.utils;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import java.util.Arrays;
import java.util.Comparator;
import java.util.regex.Pattern;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
/**
 * Utilities for creating an ML index, named with a 6-digit suffix so that it is ready for rollover,
 * together with its alias.
 */
public final class MlIndexAndAlias {

    private static final Pattern HAS_SIX_DIGIT_SUFFIX = Pattern.compile("\\d{6}");

    /**
     * Orders index names so that the "latest" index is the maximum: names whose last dash-separated
     * part is exactly six digits sort after names without such a suffix and are ordered among
     * themselves by that suffix; names without the suffix are ordered by their full name.
     */
    // Visible for testing
    static final Comparator<String> INDEX_NAME_COMPARATOR = (index1, index2) -> {
        String suffix1 = lastDashSeparatedPart(index1);
        String suffix2 = lastDashSeparatedPart(index2);
        boolean numbered1 = HAS_SIX_DIGIT_SUFFIX.matcher(suffix1).matches();
        boolean numbered2 = HAS_SIX_DIGIT_SUFFIX.matcher(suffix2).matches();

        if (numbered1 != numbered2) {
            return Boolean.compare(numbered1, numbered2);
        }
        // Either both names carry a six-digit suffix or neither does.
        return numbered1 ? suffix1.compareTo(suffix2) : index1.compareTo(index2);
    };

    private MlIndexAndAlias() {}

    /** Returns the last element obtained by splitting the given index name on dashes. */
    private static String lastDashSeparatedPart(String indexName) {
        String[] parts = indexName.split("-");
        return parts[parts.length - 1];
    }

    /**
     * Ensures that {@code alias} exists, creating the first index if that is needed.
     * <ul>
     *   <li>If the alias is already present in the cluster state, nothing is done and the listener receives {@code false}.</li>
     *   <li>If indices matching {@code indexPatternPrefix} already exist, the alias is added to the one
     *       with the highest suffix.</li>
     *   <li>Otherwise the index named {@code indexPatternPrefix} followed by "-000001" is created with the alias.</li>
     * </ul>
     * The listener is notified with a {@code boolean} that informs whether the index or the alias were created.
     */
    public static void createIndexAndAliasIfNecessary(Client client, ClusterState clusterState, IndexNameExpressionResolver resolver,
                                                      String indexPatternPrefix, String alias, ActionListener<Boolean> listener) {
        boolean aliasAlreadyExists = clusterState.getMetaData().getAliasAndIndexLookup().containsKey(alias);
        if (aliasAlreadyExists) {
            listener.onResponse(false);
            return;
        }

        // Given a concrete index name, points the alias at it and reports whether that was acknowledged.
        final ActionListener<String> addAliasToIndex = ActionListener.wrap(
            indexName -> {
                final IndicesAliasesRequest aliasesRequest = client.admin()
                    .indices()
                    .prepareAliases()
                    .addAlias(indexName, alias)
                    .request();
                executeAsyncWithOrigin(client.threadPool().getThreadContext(),
                    ML_ORIGIN,
                    aliasesRequest,
                    ActionListener.<AcknowledgedResponse>wrap(
                        aliasResponse -> listener.onResponse(aliasResponse.isAcknowledged()),
                        listener::onFailure),
                    client.admin().indices()::aliases);
            },
            listener::onFailure
        );

        String[] existingIndices = resolver.concreteIndexNames(clusterState,
            IndicesOptions.lenientExpandOpen(), indexPatternPrefix + "*");

        if (existingIndices.length > 0) {
            // Indices exist but the alias does not: attach the alias to the latest index.
            String latestIndex = Arrays.stream(existingIndices).max(INDEX_NAME_COMPARATOR).get();
            addAliasToIndex.onResponse(latestIndex);
            return;
        }

        // The initial index name must be suitable for rollover functionality.
        final String firstConcreteIndex = indexPatternPrefix + "-000001";
        CreateIndexRequest createIndexRequest = client.admin()
            .indices()
            .prepareCreate(firstConcreteIndex)
            .addAlias(new Alias(alias))
            .request();
        executeAsyncWithOrigin(client.threadPool().getThreadContext(),
            ML_ORIGIN,
            createIndexRequest,
            ActionListener.<CreateIndexResponse>wrap(
                createIndexResponse -> listener.onResponse(true),
                createIndexFailure -> {
                    // The index may have been created between our check and this request being handled.
                    // In that case add the alias; adding an alias that already exists is idempotent,
                    // so there is no need to check for the alias again.
                    boolean indexAlreadyExists =
                        ExceptionsHelper.unwrapCause(createIndexFailure) instanceof ResourceAlreadyExistsException;
                    if (indexAlreadyExists) {
                        addAliasToIndex.onResponse(firstConcreteIndex);
                    } else {
                        listener.onFailure(createIndexFailure);
                    }
                }),
            client.admin().indices()::create);
    }
}

View File

@ -0,0 +1,21 @@
{
"${xpack.ml.mapping_type}": {
"_meta": {
"version" : "${xpack.ml.version}"
},
"properties" : {
"type" : {
"type" : "keyword"
},
"job_id" : {
"type" : "keyword"
},
"timestamp" : {
"type" : "date"
},
"peak_usage_bytes" : {
"type" : "long"
}
}
}
}

View File

@ -0,0 +1,15 @@
{
"order" : 0,
"version" : ${xpack.ml.version.id},
"index_patterns" : [
".ml-stats-*"
],
"settings": {
"index" : {
"number_of_shards" : "1",
"auto_expand_replicas" : "0-1",
"hidden": true
}
},
"mappings" : ${xpack.ml.stats.mappings}
}

View File

@ -11,6 +11,8 @@ import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction.Response;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsageTests;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import java.util.ArrayList;
@ -27,8 +29,9 @@ public class GetDataFrameAnalyticsStatsActionResponseTests extends AbstractWireS
List<PhaseProgress> progress = new ArrayList<>(progressSize);
IntStream.of(progressSize).forEach(progressIndex -> progress.add(
new PhaseProgress(randomAlphaOfLength(10), randomIntBetween(0, 100))));
MemoryUsage memoryUsage = randomBoolean() ? null : MemoryUsageTests.createRandom();
Response.Stats stats = new Response.Stats(DataFrameAnalyticsConfigTests.randomValidId(),
randomFrom(DataFrameAnalyticsState.values()), failureReason, progress, null, randomAlphaOfLength(20));
randomFrom(DataFrameAnalyticsState.values()), failureReason, progress, memoryUsage, null, randomAlphaOfLength(20));
analytics.add(stats);
}
return new Response(new QueryPage<>(analytics, analytics.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD));

View File

@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.dataframe.stats;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.junit.Before;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
/**
 * Serialization round-trip tests for {@link MemoryUsage}. Before each test a coin flip decides
 * whether parsing goes through the lenient or the strict parser.
 */
public class MemoryUsageTests extends AbstractSerializingTestCase<MemoryUsage> {

    /** {@code true} when the current test parses with the lenient parser. */
    private boolean lenient;

    /** Creates a random instance; public and static so that other tests can reuse it. */
    public static MemoryUsage createRandom() {
        return new MemoryUsage(randomAlphaOfLength(10), Instant.now(), randomNonNegativeLong());
    }

    @Before
    public void chooseStrictOrLenient() {
        lenient = randomBoolean();
    }

    @Override
    protected MemoryUsage createTestInstance() {
        return createRandom();
    }

    @Override
    protected Writeable.Reader<MemoryUsage> instanceReader() {
        return MemoryUsage::new;
    }

    @Override
    protected MemoryUsage doParseInstance(XContentParser parser) throws IOException {
        if (lenient) {
            return MemoryUsage.LENIENT_PARSER.parse(parser, null);
        }
        return MemoryUsage.STRICT_PARSER.parse(parser, null);
    }

    @Override
    protected boolean supportsUnknownFields() {
        // Unknown fields are only reported as supported when the lenient parser was chosen.
        return lenient;
    }

    @Override
    protected ToXContent.Params getToXContentParams() {
        // Serialize in the "for internal storage" form.
        return new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"));
    }
}

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.job.persistence;
package org.elasticsearch.xpack.core.ml.utils;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -56,11 +56,12 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class AnomalyDetectorsIndexTests extends ESTestCase {
public class MlIndexAndAliasTests extends ESTestCase {
private static final String LEGACY_ML_STATE = ".ml-state";
private static final String INITIAL_ML_STATE = ".ml-state-000001";
private static final String ML_STATE_WRITE_ALIAS = ".ml-state-write";
private static final String TEST_INDEX_PREFIX = "test";
private static final String TEST_INDEX_ALIAS = "test-alias";
private static final String LEGACY_INDEX_WITHOUT_SUFFIX = TEST_INDEX_PREFIX;
private static final String FIRST_CONCRETE_INDEX = "test-000001";
private ThreadPool threadPool;
private IndicesAdminClient indicesAdminClient;
@ -77,9 +78,9 @@ public class AnomalyDetectorsIndexTests extends ESTestCase {
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
indicesAdminClient = mock(IndicesAdminClient.class);
when(indicesAdminClient.prepareCreate(INITIAL_ML_STATE))
.thenReturn(new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE, INITIAL_ML_STATE));
doAnswer(withResponse(new CreateIndexResponse(true, true, INITIAL_ML_STATE))).when(indicesAdminClient).create(any(), any());
when(indicesAdminClient.prepareCreate(FIRST_CONCRETE_INDEX))
.thenReturn(new CreateIndexRequestBuilder(client, CreateIndexAction.INSTANCE, FIRST_CONCRETE_INDEX));
doAnswer(withResponse(new CreateIndexResponse(true, true, FIRST_CONCRETE_INDEX))).when(indicesAdminClient).create(any(), any());
when(indicesAdminClient.prepareAliases()).thenReturn(new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE));
doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).aliases(any(), any());
@ -103,31 +104,31 @@ public class AnomalyDetectorsIndexTests extends ESTestCase {
public void testCreateStateIndexAndAliasIfNecessary_CleanState() {
ClusterState clusterState = createClusterState(Collections.emptyMap());
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, new IndexNameExpressionResolver(), finalListener);
createIndexAndAliasIfNecessary(clusterState);
InOrder inOrder = inOrder(indicesAdminClient, finalListener);
inOrder.verify(indicesAdminClient).prepareCreate(INITIAL_ML_STATE);
inOrder.verify(indicesAdminClient).prepareCreate(FIRST_CONCRETE_INDEX);
inOrder.verify(indicesAdminClient).create(createRequestCaptor.capture(), any());
inOrder.verify(finalListener).onResponse(true);
CreateIndexRequest createRequest = createRequestCaptor.getValue();
assertThat(createRequest.index(), equalTo(INITIAL_ML_STATE));
assertThat(createRequest.aliases(), equalTo(Collections.singleton(new Alias(ML_STATE_WRITE_ALIAS))));
assertThat(createRequest.index(), equalTo(FIRST_CONCRETE_INDEX));
assertThat(createRequest.aliases(), equalTo(Collections.singleton(new Alias(TEST_INDEX_ALIAS))));
}
private void assertNoClientInteractionsWhenWriteAliasAlreadyExists(String indexName) {
ClusterState clusterState = createClusterState(Collections.singletonMap(indexName, createIndexMetaDataWithAlias(indexName)));
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, new IndexNameExpressionResolver(), finalListener);
createIndexAndAliasIfNecessary(clusterState);
verify(finalListener).onResponse(false);
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtLegacyStateIndex() {
assertNoClientInteractionsWhenWriteAliasAlreadyExists(LEGACY_ML_STATE);
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtLegacyIndex() {
assertNoClientInteractionsWhenWriteAliasAlreadyExists(LEGACY_INDEX_WITHOUT_SUFFIX);
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtInitialStateIndex() {
assertNoClientInteractionsWhenWriteAliasAlreadyExists(INITIAL_ML_STATE);
assertNoClientInteractionsWhenWriteAliasAlreadyExists(FIRST_CONCRETE_INDEX);
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasAlreadyExistsAndPointsAtSubsequentStateIndex() {
@ -141,8 +142,8 @@ public class AnomalyDetectorsIndexTests extends ESTestCase {
private void assertMlStateWriteAliasAddedToMostRecentMlStateIndex(List<String> existingIndexNames, String expectedWriteIndexName) {
ClusterState clusterState =
createClusterState(
existingIndexNames.stream().collect(toMap(Function.identity(), AnomalyDetectorsIndexTests::createIndexMetaData)));
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, new IndexNameExpressionResolver(), finalListener);
existingIndexNames.stream().collect(toMap(Function.identity(), MlIndexAndAliasTests::createIndexMetaData)));
createIndexAndAliasIfNecessary(clusterState);
InOrder inOrder = inOrder(indicesAdminClient, finalListener);
inOrder.verify(indicesAdminClient).prepareAliases();
@ -152,54 +153,59 @@ public class AnomalyDetectorsIndexTests extends ESTestCase {
IndicesAliasesRequest indicesAliasesRequest = aliasesRequestCaptor.getValue();
assertThat(
indicesAliasesRequest.getAliasActions(),
contains(AliasActions.add().alias(ML_STATE_WRITE_ALIAS).index(expectedWriteIndexName)));
contains(AliasActions.add().alias(TEST_INDEX_ALIAS).index(expectedWriteIndexName)));
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButLegacyStateIndexExists() {
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButLegacyIndexExists() {
assertMlStateWriteAliasAddedToMostRecentMlStateIndex(
Arrays.asList(LEGACY_ML_STATE), LEGACY_ML_STATE);
Arrays.asList(LEGACY_INDEX_WITHOUT_SUFFIX), LEGACY_INDEX_WITHOUT_SUFFIX);
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButInitialStateIndexExists() {
assertMlStateWriteAliasAddedToMostRecentMlStateIndex(
Arrays.asList(INITIAL_ML_STATE), INITIAL_ML_STATE);
Arrays.asList(FIRST_CONCRETE_INDEX), FIRST_CONCRETE_INDEX);
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButSubsequentStateIndicesExist() {
assertMlStateWriteAliasAddedToMostRecentMlStateIndex(
Arrays.asList(".ml-state-000003", ".ml-state-000040", ".ml-state-000500"), ".ml-state-000500");
Arrays.asList("test-000003", "test-000040", "test-000500"), "test-000500");
}
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButBothLegacyAndNewStateIndicesDoExist() {
public void testCreateStateIndexAndAliasIfNecessary_WriteAliasDoesNotExistButBothLegacyAndNewIndicesExist() {
assertMlStateWriteAliasAddedToMostRecentMlStateIndex(
Arrays.asList(LEGACY_ML_STATE, ".ml-state-000003", ".ml-state-000040", ".ml-state-000500"), ".ml-state-000500");
Arrays.asList(LEGACY_INDEX_WITHOUT_SUFFIX, "test-000003", "test-000040", "test-000500"), "test-000500");
}
public void testStateIndexNameComparator() {
Comparator<String> comparator = AnomalyDetectorsIndex.STATE_INDEX_NAME_COMPARATOR;
public void testIndexNameComparator() {
Comparator<String> comparator = MlIndexAndAlias.INDEX_NAME_COMPARATOR;
assertThat(
Stream.of(".ml-state-000001").max(comparator).get(),
equalTo(".ml-state-000001"));
Stream.of("test-000001").max(comparator).get(),
equalTo("test-000001"));
assertThat(
Stream.of(".ml-state-000002", ".ml-state-000001").max(comparator).get(),
equalTo(".ml-state-000002"));
Stream.of("test-000002", "test-000001").max(comparator).get(),
equalTo("test-000002"));
assertThat(
Stream.of(".ml-state-000003", ".ml-state-000040", ".ml-state-000500").max(comparator).get(),
equalTo(".ml-state-000500"));
Stream.of("test-000003", "test-000040", "test-000500").max(comparator).get(),
equalTo("test-000500"));
assertThat(
Stream.of(".ml-state-000042", ".ml-state-000049", ".ml-state-000038").max(comparator).get(),
equalTo(".ml-state-000049"));
Stream.of("test-000042", "test-000049", "test-000038").max(comparator).get(),
equalTo("test-000049"));
assertThat(
Stream.of(".ml-state", ".ml-state-000003", ".ml-state-000040", ".ml-state-000500").max(comparator).get(),
equalTo(".ml-state-000500"));
Stream.of("test", "test-000003", "test-000040", "test-000500").max(comparator).get(),
equalTo("test-000500"));
assertThat(
Stream.of(".reindexed-6-ml-state", ".ml-state-000042").max(comparator).get(),
equalTo(".ml-state-000042"));
Stream.of(".reindexed-6-test", "test-000042").max(comparator).get(),
equalTo("test-000042"));
assertThat(
Stream.of(".a-000002", ".b-000001").max(comparator).get(),
equalTo(".a-000002"));
}
/**
 * Invokes {@code MlIndexAndAlias.createIndexAndAliasIfNecessary} for the given cluster state,
 * using the test index prefix, the test alias and the shared final listener.
 */
private void createIndexAndAliasIfNecessary(ClusterState clusterState) {
    IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
    MlIndexAndAlias.createIndexAndAliasIfNecessary(
        client, clusterState, resolver, TEST_INDEX_PREFIX, TEST_INDEX_ALIAS, finalListener);
}
@SuppressWarnings("unchecked")
private static <Response> Answer<Response> withResponse(Response response) {
return invocationOnMock -> {
@ -234,7 +240,7 @@ public class AnomalyDetectorsIndexTests extends ESTestCase {
IndexMetaData.Builder builder = IndexMetaData.builder(indexName)
.settings(settings);
if (withAlias) {
builder.putAlias(AliasMetaData.builder(ML_STATE_WRITE_ALIAS).build());
builder.putAlias(AliasMetaData.builder(TEST_INDEX_ALIAS).build());
}
return builder.build();
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MlConfigIndex;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields;
@ -56,6 +57,8 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
ROOT_RESOURCE_PATH + "inference_index_template.json", Version.CURRENT.id, VERSION_PATTERN,
Collections.singletonMap(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id)));
private static final IndexTemplateConfig STATS_TEMPLATE = statsTemplate();
private static IndexTemplateConfig configTemplate() {
Map<String, String> variables = new HashMap<>();
variables.put(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id));
@ -80,6 +83,17 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
variables);
}
/**
 * Builds the template config for the ML stats index from {@code stats_index_template.json}.
 * Two placeholders are substituted: the version id and the stats mappings.
 */
private static IndexTemplateConfig statsTemplate() {
    String versionId = String.valueOf(Version.CURRENT.id);
    String statsMappings = MlStatsIndex.mapping();

    Map<String, String> substitutions = new HashMap<>();
    substitutions.put(VERSION_ID_PATTERN, versionId);
    substitutions.put("xpack.ml.stats.mappings", statsMappings);

    String templateResource = ROOT_RESOURCE_PATH + "stats_index_template.json";
    return new IndexTemplateConfig(
        MlStatsIndex.TEMPLATE_NAME, templateResource, Version.CURRENT.id, VERSION_PATTERN, substitutions);
}
public MlIndexTemplateRegistry(Settings nodeSettings, ClusterService clusterService, ThreadPool threadPool, Client client,
NamedXContentRegistry xContentRegistry) {
super(nodeSettings, clusterService, threadPool, client, xContentRegistry);
@ -98,7 +112,8 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry {
CONFIG_TEMPLATE,
INFERENCE_TEMPLATE,
META_TEMPLATE,
NOTIFICATIONS_TEMPLATE
NOTIFICATIONS_TEMPLATE,
STATS_TEMPLATE
);
}

View File

@ -7,7 +7,6 @@ package org.elasticsearch.xpack.ml.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
@ -22,20 +21,19 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
@ -43,20 +41,22 @@ import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction.R
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask;
import org.elasticsearch.xpack.ml.dataframe.StoredProgress;
import org.elasticsearch.xpack.ml.dataframe.stats.ProgressTracker;
import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder;
import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
@ -98,16 +98,20 @@ public class TransportGetDataFrameAnalyticsStatsAction
ActionListener<QueryPage<Stats>> listener) {
logger.debug("Get stats for running task [{}]", task.getParams().getId());
ActionListener<List<PhaseProgress>> progressListener = ActionListener.wrap(
progress -> {
Stats stats = buildStats(task.getParams().getId(), progress);
ActionListener<StatsHolder> statsHolderListener = ActionListener.wrap(
statsHolder -> {
Stats stats = buildStats(
task.getParams().getId(),
statsHolder.getProgressTracker().report(),
statsHolder.getMemoryUsage()
);
listener.onResponse(new QueryPage<>(Collections.singletonList(stats), 1,
GetDataFrameAnalyticsAction.Response.RESULTS_FIELD));
}, listener::onFailure
);
ActionListener<Void> reindexingProgressListener = ActionListener.wrap(
aVoid -> progressListener.onResponse(task.getStatsHolder().getProgressTracker().report()),
aVoid -> statsHolderListener.onResponse(task.getStatsHolder()),
listener::onFailure
);
@ -157,22 +161,25 @@ public class TransportGetDataFrameAnalyticsStatsAction
return;
}
searchStoredProgresses(stoppedTasksIds, ActionListener.wrap(
storedProgresses -> {
List<Stats> stoppedStats = new ArrayList<>(stoppedTasksIds.size());
AtomicInteger counter = new AtomicInteger(stoppedTasksIds.size());
AtomicArray<Stats> jobStats = new AtomicArray<>(stoppedTasksIds.size());
for (int i = 0; i < stoppedTasksIds.size(); i++) {
String configId = stoppedTasksIds.get(i);
StoredProgress storedProgress = storedProgresses.get(i);
stoppedStats.add(buildStats(configId, storedProgress.get()));
}
final int slot = i;
String jobId = stoppedTasksIds.get(i);
searchStats(jobId, ActionListener.wrap(
stats -> {
jobStats.set(slot, stats);
if (counter.decrementAndGet() == 0) {
List<Stats> allTasksStats = new ArrayList<>(runningTasksResponse.getResponse().results());
allTasksStats.addAll(stoppedStats);
allTasksStats.addAll(jobStats.asList());
Collections.sort(allTasksStats, Comparator.comparing(Stats::getId));
listener.onResponse(new GetDataFrameAnalyticsStatsAction.Response(new QueryPage<>(
allTasksStats, allTasksStats.size(), GetDataFrameAnalyticsAction.Response.RESULTS_FIELD)));
}
},
listener::onFailure
));
listener::onFailure)
);
}
}
static List<String> determineStoppedTasksIds(List<String> expandedIds, List<Stats> runningTasksStats) {
@ -180,19 +187,15 @@ public class TransportGetDataFrameAnalyticsStatsAction
return expandedIds.stream().filter(id -> startedTasksIds.contains(id) == false).collect(Collectors.toList());
}
private void searchStoredProgresses(List<String> configIds, ActionListener<List<StoredProgress>> listener) {
private void searchStats(String configId, ActionListener<Stats> listener) {
RetrievedStatsHolder retrievedStatsHolder = new RetrievedStatsHolder();
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
for (String configId : configIds) {
SearchRequest searchRequest = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
searchRequest.source().size(1);
searchRequest.source().query(QueryBuilders.idsQuery().addIds(StoredProgress.documentId(configId)));
multiSearchRequest.add(searchRequest);
}
multiSearchRequest.add(buildStoredProgressSearch(configId));
multiSearchRequest.add(buildMemoryUsageSearch(configId));
executeAsyncWithOrigin(client, ML_ORIGIN, MultiSearchAction.INSTANCE, multiSearchRequest, ActionListener.wrap(
multiSearchResponse -> {
List<StoredProgress> progresses = new ArrayList<>(configIds.size());
for (MultiSearchResponse.Item itemResponse : multiSearchResponse.getResponses()) {
if (itemResponse.isFailure()) {
listener.onFailure(ExceptionsHelper.serverError(itemResponse.getFailureMessage(), itemResponse.getFailure()));
@ -200,32 +203,60 @@ public class TransportGetDataFrameAnalyticsStatsAction
} else {
SearchHit[] hits = itemResponse.getResponse().getHits().getHits();
if (hits.length == 0) {
progresses.add(new StoredProgress(new ProgressTracker().report()));
// Not found
} else if (hits.length == 1) {
parseHit(hits[0], configId, retrievedStatsHolder);
} else {
progresses.add(parseStoredProgress(hits[0]));
throw ExceptionsHelper.serverError("Found [" + hits.length + "] hits when just one was requested");
}
}
}
listener.onResponse(progresses);
listener.onResponse(buildStats(configId,
retrievedStatsHolder.progress.get(),
retrievedStatsHolder.memoryUsage
));
},
e -> listener.onFailure(ExceptionsHelper.serverError("Error searching for stored progresses", e))
e -> listener.onFailure(ExceptionsHelper.serverError("Error searching for stats", e))
));
}
private StoredProgress parseStoredProgress(SearchHit hit) {
BytesReference source = hit.getSourceRef();
try (InputStream stream = source.streamInput();
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
StoredProgress storedProgress = StoredProgress.PARSER.apply(parser, null);
return storedProgress;
} catch (IOException e) {
logger.error(new ParameterizedMessage("failed to parse progress from doc with it [{}]", hit.getId()), e);
return new StoredProgress(Collections.emptyList());
/**
 * Builds the search that looks up the stored progress document of a job by its document id.
 *
 * @param configId id of the data frame analytics job
 * @return a request against the job state index pattern that asks for at most one hit
 */
private static SearchRequest buildStoredProgressSearch(String configId) {
    SearchRequest progressSearch = new SearchRequest(AnomalyDetectorsIndex.jobStateIndexPattern());
    progressSearch.indicesOptions(IndicesOptions.lenientExpandOpen());
    progressSearch.source()
        .size(1)
        .query(QueryBuilders.idsQuery().addIds(StoredProgress.documentId(configId)));
    return progressSearch;
}
/**
 * Builds the search that fetches the most recent memory usage document of a job.
 *
 * @param configId id of the data frame analytics job
 * @return a request against the stats index pattern, filtered by job id and document type,
 *         sorted by timestamp descending and limited to a single hit
 */
private static SearchRequest buildMemoryUsageSearch(String configId) {
    SearchRequest searchRequest = new SearchRequest(MlStatsIndex.indexPattern());
    searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
    searchRequest.source().size(1);
    QueryBuilder query = QueryBuilders.boolQuery()
        .filter(QueryBuilders.termQuery(MemoryUsage.JOB_ID.getPreferredName(), configId))
        .filter(QueryBuilders.termQuery(MemoryUsage.TYPE.getPreferredName(), MemoryUsage.TYPE_VALUE));
    searchRequest.source().query(query);
    // Register exactly one sort. Every sort on the timestamp must carry an unmapped type:
    // a plain field sort on the same field makes the whole search fail on an unmapped index.
    searchRequest.source().sort(SortBuilders.fieldSort(MemoryUsage.TIMESTAMP.getPreferredName()).order(SortOrder.DESC)
        // We need this for the search not to fail when there are no mappings yet in the index
        .unmappedType("long"));
    return searchRequest;
}
/**
 * Parses a search hit into the matching slot of the given holder, chosen by the hit's document id.
 *
 * @param hit the hit to parse
 * @param configId id of the job the hit is expected to belong to
 * @param retrievedStatsHolder receives the parsed progress or memory usage
 */
private static void parseHit(SearchHit hit, String configId, RetrievedStatsHolder retrievedStatsHolder) {
    final String docId = hit.getId();

    // Exact match on the stored progress document id
    if (StoredProgress.documentId(configId).equals(docId)) {
        retrievedStatsHolder.progress = MlParserUtils.parse(hit, StoredProgress.PARSER);
        return;
    }

    // Memory usage documents are recognised by their id prefix
    if (docId.startsWith(MemoryUsage.documentIdPrefix(configId))) {
        retrievedStatsHolder.memoryUsage = MlParserUtils.parse(hit, MemoryUsage.LENIENT_PARSER);
        return;
    }

    throw ExceptionsHelper.serverError("unexpected doc id [" + docId + "]");
}
private GetDataFrameAnalyticsStatsAction.Response.Stats buildStats(String concreteAnalyticsId, List<PhaseProgress> progress) {
private GetDataFrameAnalyticsStatsAction.Response.Stats buildStats(String concreteAnalyticsId,
List<PhaseProgress> progress,
MemoryUsage memoryUsage) {
ClusterState clusterState = clusterService.state();
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
PersistentTasksCustomMetaData.PersistentTask<?> analyticsTask = MlTasks.getDataFrameAnalyticsTask(concreteAnalyticsId, tasks);
@ -242,6 +273,19 @@ public class TransportGetDataFrameAnalyticsStatsAction
assignmentExplanation = analyticsTask.getAssignment().getExplanation();
}
return new GetDataFrameAnalyticsStatsAction.Response.Stats(
concreteAnalyticsId, analyticsState, failureReason, progress, node, assignmentExplanation);
concreteAnalyticsId,
analyticsState,
failureReason,
progress,
memoryUsage,
node,
assignmentExplanation
);
}
/** Mutable holder for the stats documents retrieved for a single job. */
private static class RetrievedStatsHolder {

    /** Stays {@code null} unless a memory usage value is assigned. */
    private volatile MemoryUsage memoryUsage;

    /** Starts out as the report of a fresh {@link ProgressTracker}. */
    private volatile StoredProgress progress = new StoredProgress(new ProgressTracker().report());
}
}

View File

@ -30,11 +30,13 @@ import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.dataframe.extractor.DataFrameDataExtractorFactory;
import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfigProvider;
@ -102,15 +104,35 @@ public class DataFrameAnalyticsManager {
);
// Retrieve configuration
ActionListener<Boolean> stateAliasListener = ActionListener.wrap(
ActionListener<Boolean> statsIndexListener = ActionListener.wrap(
aBoolean -> configProvider.get(task.getParams().getId(), configListener),
configListener::onFailure
);
// Make sure the stats index and alias exist
ActionListener<Boolean> stateAliasListener = ActionListener.wrap(
aBoolean -> createStatsIndexAndUpdateMappingsIfNecessary(clusterState, statsIndexListener),
configListener::onFailure
);
// Make sure the state index and alias exist
AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary(client, clusterState, expressionResolver, stateAliasListener);
}
/**
 * Creates the stats index and alias if necessary and, once that has completed successfully,
 * adds the stats mappings to the write alias if they are missing.
 *
 * @param clusterState the cluster state passed on to both steps
 * @param listener notified by the mapping step, or with the failure of the index creation step
 */
private void createStatsIndexAndUpdateMappingsIfNecessary(ClusterState clusterState, ActionListener<Boolean> listener) {
    // Second step: runs only after the index/alias step has responded
    ActionListener<Boolean> addMappingsStep = ActionListener.wrap(
        ignored -> {
            String statsWriteAlias = MlStatsIndex.writeAlias();
            ElasticsearchMappings.addDocMappingIfMissing(
                statsWriteAlias, MlStatsIndex::mapping, client, clusterState, listener);
        },
        listener::onFailure
    );

    // First step: index and alias
    MlStatsIndex.createStatsIndexAndAliasIfNecessary(client, clusterState, expressionResolver, addMappingsStep);
}
private void executeStartingJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config) {
DataFrameAnalyticsTaskState reindexingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.REINDEXING,
task.getAllocationId(), null);

View File

@ -426,7 +426,8 @@ public class AnalyticsProcessManager {
DataFrameRowsJoiner dataFrameRowsJoiner =
new DataFrameRowsJoiner(config.getId(), dataExtractorFactory.newExtractor(true), resultsPersisterService);
return new AnalyticsResultProcessor(
config, dataFrameRowsJoiner, task.getStatsHolder(), trainedModelProvider, auditor, dataExtractor.get().getFieldNames());
config, dataFrameRowsJoiner, task.getStatsHolder(), trainedModelProvider, auditor, resultsPersisterService,
dataExtractor.get().getFieldNames());
}
}
}

View File

@ -11,24 +11,32 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.license.License;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.core.security.user.XPackUser;
import org.elasticsearch.xpack.ml.dataframe.process.results.AnalyticsResult;
import org.elasticsearch.xpack.ml.dataframe.process.results.RowResults;
import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder;
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
@ -36,6 +44,7 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static java.util.stream.Collectors.toList;
@ -60,6 +69,7 @@ public class AnalyticsResultProcessor {
private final StatsHolder statsHolder;
private final TrainedModelProvider trainedModelProvider;
private final DataFrameAnalyticsAuditor auditor;
private final ResultsPersisterService resultsPersisterService;
private final List<String> fieldNames;
private final CountDownLatch completionLatch = new CountDownLatch(1);
private volatile String failure;
@ -67,12 +77,14 @@ public class AnalyticsResultProcessor {
public AnalyticsResultProcessor(DataFrameAnalyticsConfig analytics, DataFrameRowsJoiner dataFrameRowsJoiner,
StatsHolder statsHolder, TrainedModelProvider trainedModelProvider,
DataFrameAnalyticsAuditor auditor, List<String> fieldNames) {
DataFrameAnalyticsAuditor auditor, ResultsPersisterService resultsPersisterService,
List<String> fieldNames) {
this.analytics = Objects.requireNonNull(analytics);
this.dataFrameRowsJoiner = Objects.requireNonNull(dataFrameRowsJoiner);
this.statsHolder = Objects.requireNonNull(statsHolder);
this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider);
this.auditor = Objects.requireNonNull(auditor);
this.resultsPersisterService = Objects.requireNonNull(resultsPersisterService);
this.fieldNames = Collections.unmodifiableList(Objects.requireNonNull(fieldNames));
}
@ -148,6 +160,11 @@ public class AnalyticsResultProcessor {
if (inferenceModelBuilder != null) {
createAndIndexInferenceModel(inferenceModelBuilder);
}
MemoryUsage memoryUsage = result.getMemoryUsage();
if (memoryUsage != null) {
statsHolder.setMemoryUsage(memoryUsage);
indexStatsResult(memoryUsage, memoryUsage::documentId);
}
}
private void createAndIndexInferenceModel(TrainedModelDefinition.Builder inferenceModel) {
@ -224,4 +241,23 @@ public class AnalyticsResultProcessor {
failure = "error processing results; " + e.getMessage();
auditor.error(analytics.getId(), "Error processing results; " + e.getMessage());
}
/**
 * Indexes a stats document through the stats write alias. Failures are logged, and failure
 * messages passed to the error callback are audited, instead of being propagated to the caller.
 *
 * @param result the stats object to persist
 * @param docIdSupplier maps the job id to the id of the document to write
 */
private void indexStatsResult(ToXContentObject result, Function<String, String> docIdSupplier) {
    final String jobId = analytics.getId();
    try {
        ToXContent.MapParams storageParams =
            new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"));
        resultsPersisterService.indexWithRetry(
            jobId,
            MlStatsIndex.writeAlias(),
            result,
            storageParams,
            WriteRequest.RefreshPolicy.IMMEDIATE,
            docIdSupplier.apply(jobId),
            () -> true,
            errorMsg -> auditor.error(
                jobId, "failed to persist result with id [" + docIdSupplier.apply(jobId) + "]; " + errorMsg)
        );
    } catch (IOException ioe) {
        LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", jobId), ioe);
    } catch (Exception e) {
        LOGGER.error(() -> new ParameterizedMessage("[{}] Failed indexing stats result", jobId), e);
    }
}
}

View File

@ -5,11 +5,13 @@
*/
package org.elasticsearch.xpack.ml.dataframe.process.results;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition;
import java.io.IOException;
@ -23,29 +25,36 @@ public class AnalyticsResult implements ToXContentObject {
public static final ParseField TYPE = new ParseField("analytics_result");
public static final ParseField PROGRESS_PERCENT = new ParseField("progress_percent");
public static final ParseField INFERENCE_MODEL = new ParseField("inference_model");
private static final ParseField PROGRESS_PERCENT = new ParseField("progress_percent");
private static final ParseField INFERENCE_MODEL = new ParseField("inference_model");
private static final ParseField ANALYTICS_MEMORY_USAGE = new ParseField("analytics_memory_usage");
public static final ConstructingObjectParser<AnalyticsResult, Void> PARSER = new ConstructingObjectParser<>(TYPE.getPreferredName(),
a -> new AnalyticsResult((RowResults) a[0], (Integer) a[1], (TrainedModelDefinition.Builder) a[2]));
a -> new AnalyticsResult((RowResults) a[0], (Integer) a[1], (TrainedModelDefinition.Builder) a[2], (MemoryUsage) a[3]));
static {
PARSER.declareObject(optionalConstructorArg(), RowResults.PARSER, RowResults.TYPE);
PARSER.declareInt(optionalConstructorArg(), PROGRESS_PERCENT);
// TODO change back to STRICT_PARSER once native side is aligned
PARSER.declareObject(optionalConstructorArg(), TrainedModelDefinition.LENIENT_PARSER, INFERENCE_MODEL);
PARSER.declareObject(optionalConstructorArg(), MemoryUsage.STRICT_PARSER, ANALYTICS_MEMORY_USAGE);
}
private final RowResults rowResults;
private final Integer progressPercent;
private final TrainedModelDefinition.Builder inferenceModelBuilder;
private final TrainedModelDefinition inferenceModel;
private final MemoryUsage memoryUsage;
/**
 * Creates a result from its individual parts. Every part is optional and may be {@code null};
 * the parser declares each of them as an optional constructor argument.
 *
 * @param rowResults the row results carried by this result, if any
 * @param progressPercent the reported progress percentage, if any
 * @param inferenceModelBuilder builder of the inference model carried by this result, if any
 * @param memoryUsage the memory usage carried by this result, if any
 */
public AnalyticsResult(RowResults rowResults, Integer progressPercent, TrainedModelDefinition.Builder inferenceModelBuilder) {
public AnalyticsResult(@Nullable RowResults rowResults,
@Nullable Integer progressPercent,
@Nullable TrainedModelDefinition.Builder inferenceModelBuilder,
@Nullable MemoryUsage memoryUsage) {
this.rowResults = rowResults;
this.progressPercent = progressPercent;
this.inferenceModelBuilder = inferenceModelBuilder;
// Keep a built copy of the model alongside the builder; equals/hashCode and toXContent use the built form.
this.inferenceModel = inferenceModelBuilder == null ? null : inferenceModelBuilder.build();
this.memoryUsage = memoryUsage;
}
public RowResults getRowResults() {
@ -60,6 +69,10 @@ public class AnalyticsResult implements ToXContentObject {
return inferenceModelBuilder;
}
/**
 * @return the memory usage carried by this result, or {@code null} when none was supplied
 */
public MemoryUsage getMemoryUsage() {
    return this.memoryUsage;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -74,6 +87,9 @@ public class AnalyticsResult implements ToXContentObject {
inferenceModel,
new ToXContent.MapParams(Collections.singletonMap(FOR_INTERNAL_STORAGE, "true")));
}
if (memoryUsage != null) {
builder.field(ANALYTICS_MEMORY_USAGE.getPreferredName(), memoryUsage, params);
}
builder.endObject();
return builder;
}
@ -90,11 +106,12 @@ public class AnalyticsResult implements ToXContentObject {
AnalyticsResult that = (AnalyticsResult) other;
return Objects.equals(rowResults, that.rowResults)
&& Objects.equals(progressPercent, that.progressPercent)
&& Objects.equals(inferenceModel, that.inferenceModel);
&& Objects.equals(inferenceModel, that.inferenceModel)
&& Objects.equals(memoryUsage, that.memoryUsage);
}
// Hashes the built inference model rather than its builder, matching the fields compared in equals.
@Override
public int hashCode() {
return Objects.hash(rowResults, progressPercent, inferenceModel);
return Objects.hash(rowResults, progressPercent, inferenceModel, memoryUsage);
}
}

View File

@ -5,15 +5,33 @@
*/
package org.elasticsearch.xpack.ml.dataframe.stats;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsage;
import java.util.concurrent.atomic.AtomicReference;
/**
* Holds data frame analytics stats in memory so that they may be retrieved
* from the get stats api for started jobs efficiently.
*/
public class StatsHolder {
private final ProgressTracker progressTracker = new ProgressTracker();
private final ProgressTracker progressTracker;
// Most recently stored memory usage; the reference holds null until setMemoryUsage is first called.
private final AtomicReference<MemoryUsage> memoryUsageHolder;
/**
 * Creates a holder with a fresh progress tracker and no memory usage recorded yet.
 */
public StatsHolder() {
progressTracker = new ProgressTracker();
memoryUsageHolder = new AtomicReference<>();
}
/**
 * @return the progress tracker owned by this holder
 */
public ProgressTracker getProgressTracker() {
return progressTracker;
}
/**
 * Replaces the stored memory usage with the given value.
 *
 * @param memoryUsage the memory usage to store
 */
public void setMemoryUsage(MemoryUsage memoryUsage) {
memoryUsageHolder.set(memoryUsage);
}
/**
 * @return the memory usage last stored via {@link #setMemoryUsage}, or {@code null} if none has been stored
 */
public MemoryUsage getMemoryUsage() {
return memoryUsageHolder.get();
}
}

View File

@ -113,6 +113,7 @@ import org.elasticsearch.xpack.ml.job.categorization.GrokPatternCreator;
import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils;
import java.io.IOException;
import java.io.InputStream;
@ -512,7 +513,7 @@ public class JobResultsProvider {
}
SearchHit hit = hits.getHits()[0];
try {
DatafeedTimingStats timingStats = parseSearchHit(hit, DatafeedTimingStats.PARSER);
DatafeedTimingStats timingStats = MlParserUtils.parse(hit, DatafeedTimingStats.PARSER);
timingStatsByJobId.put(jobId, timingStats);
} catch (Exception e) {
listener.onFailure(e);
@ -644,41 +645,24 @@ public class JobResultsProvider {
SearchHit hit) {
String hitId = hit.getId();
if (DataCounts.documentId(jobId).equals(hitId)) {
paramsBuilder.setDataCounts(parseSearchHit(hit, DataCounts.PARSER));
paramsBuilder.setDataCounts(MlParserUtils.parse(hit, DataCounts.PARSER));
} else if (TimingStats.documentId(jobId).equals(hitId)) {
paramsBuilder.setTimingStats(parseSearchHit(hit, TimingStats.PARSER));
paramsBuilder.setTimingStats(MlParserUtils.parse(hit, TimingStats.PARSER));
} else if (hitId.startsWith(ModelSizeStats.documentIdPrefix(jobId))) {
ModelSizeStats.Builder modelSizeStats = parseSearchHit(hit, ModelSizeStats.LENIENT_PARSER);
ModelSizeStats.Builder modelSizeStats = MlParserUtils.parse(hit, ModelSizeStats.LENIENT_PARSER);
paramsBuilder.setModelSizeStats(modelSizeStats == null ? null : modelSizeStats.build());
} else if (hitId.startsWith(ModelSnapshot.documentIdPrefix(jobId))) {
ModelSnapshot.Builder modelSnapshot = parseSearchHit(hit, ModelSnapshot.LENIENT_PARSER);
ModelSnapshot.Builder modelSnapshot = MlParserUtils.parse(hit, ModelSnapshot.LENIENT_PARSER);
paramsBuilder.setModelSnapshot(modelSnapshot == null ? null : modelSnapshot.build());
} else if (Quantiles.documentId(jobId).equals(hit.getId())) {
paramsBuilder.setQuantiles(parseSearchHit(hit, Quantiles.LENIENT_PARSER));
paramsBuilder.setQuantiles(MlParserUtils.parse(hit, Quantiles.LENIENT_PARSER));
} else if (hitId.startsWith(MlFilter.DOCUMENT_ID_PREFIX)) {
paramsBuilder.addFilter(parseSearchHit(hit, MlFilter.LENIENT_PARSER).build());
paramsBuilder.addFilter(MlParserUtils.parse(hit, MlFilter.LENIENT_PARSER).build());
} else {
throw new IllegalStateException("Unexpected Id [" + hitId + "]");
}
}
/**
 * Parses the source of a search hit, read as JSON, into an object of type T.
 *
 * @param hit the search hit whose source is parsed
 * @param objectParser parser for the object of type T; it is invoked with a {@code null} context
 * @return the value of T parsed from the search hit
 * @throws ElasticsearchException if reading the hit's source fails
 */
private static <T, U> T parseSearchHit(SearchHit hit, BiFunction<XContentParser, U, T> objectParser) {
    final BytesReference hitSource = hit.getSourceRef();
    try (InputStream in = hitSource.streamInput();
         XContentParser hitParser = XContentFactory.xContent(XContentType.JSON)
             .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, in)) {
        return objectParser.apply(hitParser, null);
    } catch (IOException ioe) {
        // Surface I/O problems as a parse failure that names the offending document.
        throw new ElasticsearchParseException("failed to parse " + hit.getId(), ioe);
    }
}
/**
* Search for buckets with the parameters in the {@link BucketsQueryBuilder}
* Uses the internal client, so runs as the _xpack user
@ -1124,7 +1108,7 @@ public class JobResultsProvider {
handler.accept(new Result<>(null, notFoundSupplier.get()));
} else if (hits.length == 1) {
try {
T result = parseSearchHit(hits[0], objectParser);
T result = MlParserUtils.parse(hits[0], objectParser);
handler.accept(new Result<>(hits[0].getIndex(), result));
} catch (Exception e) {
errorHandler.accept(e);
@ -1268,7 +1252,7 @@ public class JobResultsProvider {
SearchHit[] hits = response.getHits().getHits();
try {
for (SearchHit hit : hits) {
ScheduledEvent.Builder event = parseSearchHit(hit, ScheduledEvent.LENIENT_PARSER);
ScheduledEvent.Builder event = MlParserUtils.parse(hit, ScheduledEvent.LENIENT_PARSER);
event.eventId(hit.getId());
events.add(event.build());
@ -1400,7 +1384,7 @@ public class JobResultsProvider {
SearchHit[] hits = response.getHits().getHits();
try {
for (SearchHit hit : hits) {
calendars.add(parseSearchHit(hit, Calendar.LENIENT_PARSER).build());
calendars.add(MlParserUtils.parse(hit, Calendar.LENIENT_PARSER).build());
}
listener.onResponse(new QueryPage<>(calendars, response.getHits().getTotalHits().value,
Calendar.RESULTS_FIELD));

View File

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.utils.persistence;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.BiFunction;
/**
 * Static helper for parsing objects out of search hits.
 */
public final class MlParserUtils {

    // Static-only holder; never instantiated.
    private MlParserUtils() {}

    /**
     * Parses the source of a search hit, read as JSON, into an object of type T.
     *
     * @param hit the search hit whose source is parsed
     * @param objectParser parser for the object of type T; it is invoked with a {@code null} context
     * @return the value of T parsed from the search hit
     * @throws ElasticsearchException if reading the hit's source fails
     */
    public static <T, U> T parse(SearchHit hit, BiFunction<XContentParser, U, T> objectParser) {
        final BytesReference hitSource = hit.getSourceRef();
        try (InputStream in = hitSource.streamInput();
             XContentParser hitParser = XContentFactory.xContent(XContentType.JSON)
                 .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, in)) {
            return objectParser.apply(hitParser, null);
        } catch (IOException ioe) {
            // Surface I/O problems as a parse failure that names the offending document.
            throw new ElasticsearchParseException("failed to parse " + hit.getId(), ioe);
        }
    }
}

View File

@ -53,7 +53,7 @@ public class AnalyticsProcessManagerTests extends ESTestCase {
private static final String CONFIG_ID = "config-id";
private static final int NUM_ROWS = 100;
private static final int NUM_COLS = 4;
private static final AnalyticsResult PROCESS_RESULT = new AnalyticsResult(null, null, null);
private static final AnalyticsResult PROCESS_RESULT = new AnalyticsResult(null, null, null, null);
private Client client;
private DataFrameAnalyticsAuditor auditor;

View File

@ -27,6 +27,7 @@ import org.elasticsearch.xpack.ml.dataframe.stats.StatsHolder;
import org.elasticsearch.xpack.ml.extractor.ExtractedFields;
import org.elasticsearch.xpack.ml.inference.persistence.TrainedModelProvider;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
@ -61,6 +62,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
private StatsHolder statsHolder = new StatsHolder();
private TrainedModelProvider trainedModelProvider;
private DataFrameAnalyticsAuditor auditor;
private ResultsPersisterService resultsPersisterService;
private DataFrameAnalyticsConfig analyticsConfig;
@Before
@ -70,6 +72,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
dataFrameRowsJoiner = mock(DataFrameRowsJoiner.class);
trainedModelProvider = mock(TrainedModelProvider.class);
auditor = mock(DataFrameAnalyticsAuditor.class);
resultsPersisterService = mock(ResultsPersisterService.class);
analyticsConfig = new DataFrameAnalyticsConfig.Builder()
.setId(JOB_ID)
.setDescription(JOB_DESCRIPTION)
@ -93,7 +96,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
public void testProcess_GivenEmptyResults() {
givenDataFrameRows(2);
givenProcessResults(Arrays.asList(new AnalyticsResult(null, 50, null), new AnalyticsResult(null, 100, null)));
givenProcessResults(Arrays.asList(new AnalyticsResult(null, 50, null, null), new AnalyticsResult(null, 100, null, null)));
AnalyticsResultProcessor resultProcessor = createResultProcessor();
resultProcessor.process(process);
@ -108,7 +111,8 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
givenDataFrameRows(2);
RowResults rowResults1 = mock(RowResults.class);
RowResults rowResults2 = mock(RowResults.class);
givenProcessResults(Arrays.asList(new AnalyticsResult(rowResults1, 50, null), new AnalyticsResult(rowResults2, 100, null)));
givenProcessResults(Arrays.asList(new AnalyticsResult(rowResults1, 50, null, null),
new AnalyticsResult(rowResults2, 100, null, null)));
AnalyticsResultProcessor resultProcessor = createResultProcessor();
resultProcessor.process(process);
@ -125,7 +129,8 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
givenDataFrameRows(2);
RowResults rowResults1 = mock(RowResults.class);
RowResults rowResults2 = mock(RowResults.class);
givenProcessResults(Arrays.asList(new AnalyticsResult(rowResults1, 50, null), new AnalyticsResult(rowResults2, 100, null)));
givenProcessResults(Arrays.asList(new AnalyticsResult(rowResults1, 50, null, null),
new AnalyticsResult(rowResults2, 100, null, null)));
doThrow(new RuntimeException("some failure")).when(dataFrameRowsJoiner).processRowResults(any(RowResults.class));
@ -155,7 +160,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
List<String> expectedFieldNames = Arrays.asList("foo", "bar", "baz");
TrainedModelDefinition.Builder inferenceModel = TrainedModelDefinitionTests.createRandomBuilder();
givenProcessResults(Arrays.asList(new AnalyticsResult(null, null, inferenceModel)));
givenProcessResults(Arrays.asList(new AnalyticsResult(null, null, inferenceModel, null)));
AnalyticsResultProcessor resultProcessor = createResultProcessor(expectedFieldNames);
resultProcessor.process(process);
@ -199,7 +204,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
}).when(trainedModelProvider).storeTrainedModel(any(TrainedModelConfig.class), any(ActionListener.class));
TrainedModelDefinition.Builder inferenceModel = TrainedModelDefinitionTests.createRandomBuilder();
givenProcessResults(Arrays.asList(new AnalyticsResult(null, null, inferenceModel)));
givenProcessResults(Arrays.asList(new AnalyticsResult(null, null, inferenceModel, null)));
AnalyticsResultProcessor resultProcessor = createResultProcessor();
resultProcessor.process(process);
@ -232,6 +237,6 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
private AnalyticsResultProcessor createResultProcessor(List<String> fieldNames) {
return new AnalyticsResultProcessor(
analyticsConfig, dataFrameRowsJoiner, statsHolder, trainedModelProvider, auditor, fieldNames);
analyticsConfig, dataFrameRowsJoiner, statsHolder, trainedModelProvider, auditor, resultsPersisterService, fieldNames);
}
}

View File

@ -7,12 +7,15 @@ package org.elasticsearch.xpack.ml.dataframe.process.results;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractXContentTestCase;
import org.elasticsearch.xpack.core.ml.dataframe.stats.MemoryUsageTests;
import org.elasticsearch.xpack.core.ml.inference.MlInferenceNamedXContentProvider;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinitionTests;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import java.util.ArrayList;
import java.util.Collections;
@ -42,7 +45,7 @@ public class AnalyticsResultTests extends AbstractXContentTestCase<AnalyticsResu
if (randomBoolean()) {
inferenceModel = TrainedModelDefinitionTests.createRandomBuilder();
}
return new AnalyticsResult(rowResults, progressPercent, inferenceModel);
return new AnalyticsResult(rowResults, progressPercent, inferenceModel, MemoryUsageTests.createRandom());
}
@Override
@ -50,6 +53,11 @@ public class AnalyticsResultTests extends AbstractXContentTestCase<AnalyticsResu
return AnalyticsResult.PARSER.apply(parser, null);
}
// Test instances are serialised with the FOR_INTERNAL_STORAGE flag switched on.
@Override
protected ToXContent.Params getToXContentParams() {
    final ToXContent.Params internalStorageParams =
        new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"));
    return internalStorageParams;
}
@Override
protected boolean supportsUnknownFields() {
return false;