This commit is contained in:
parent
e0d4544ef6
commit
53f409e5ae
|
@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.Result;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -38,7 +39,7 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
|
|||
private static ConstructingObjectParser<DatafeedTimingStats, Void> createParser() {
|
||||
ConstructingObjectParser<DatafeedTimingStats, Void> parser =
|
||||
new ConstructingObjectParser<>(
|
||||
"datafeed_timing_stats",
|
||||
TYPE.getPreferredName(),
|
||||
true,
|
||||
args -> {
|
||||
String jobId = (String) args[0];
|
||||
|
@ -128,6 +129,9 @@ public class DatafeedTimingStats implements ToXContentObject, Writeable {
|
|||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.startObject();
|
||||
if (params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false)) {
|
||||
builder.field(Result.RESULT_TYPE.getPreferredName(), TYPE.getPreferredName());
|
||||
}
|
||||
builder.field(JOB_ID.getPreferredName(), jobId);
|
||||
builder.field(SEARCH_COUNT.getPreferredName(), searchCount);
|
||||
builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
|
||||
|
|
|
@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
|||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.Result;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -195,6 +196,9 @@ public class TimingStats implements ToXContentObject, Writeable {
|
|||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
if (params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false)) {
|
||||
builder.field(Result.RESULT_TYPE.getPreferredName(), TYPE.getPreferredName());
|
||||
}
|
||||
builder.field(Job.ID.getPreferredName(), jobId);
|
||||
builder.field(BUCKET_COUNT.getPreferredName(), bucketCount);
|
||||
if (params.paramAsBoolean(ToXContentParams.INCLUDE_CALCULATED_FIELDS, false)) {
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.elasticsearch.xpack.core.ml.job.results.Forecast;
|
|||
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.Influencer;
|
||||
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
@ -130,7 +131,11 @@ public class JobResultsPersister {
|
|||
* @return this
|
||||
*/
|
||||
public Builder persistTimingStats(TimingStats timingStats) {
|
||||
indexResult(TimingStats.documentId(timingStats.getJobId()), timingStats, TimingStats.TYPE.getPreferredName());
|
||||
indexResult(
|
||||
TimingStats.documentId(timingStats.getJobId()),
|
||||
timingStats,
|
||||
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
|
||||
TimingStats.TYPE.getPreferredName());
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -185,7 +190,11 @@ public class JobResultsPersister {
|
|||
}
|
||||
|
||||
private void indexResult(String id, ToXContent resultDoc, String resultType) {
|
||||
try (XContentBuilder content = toXContentBuilder(resultDoc)) {
|
||||
indexResult(id, resultDoc, ToXContent.EMPTY_PARAMS, resultType);
|
||||
}
|
||||
|
||||
private void indexResult(String id, ToXContent resultDoc, ToXContent.Params params, String resultType) {
|
||||
try (XContentBuilder content = toXContentBuilder(resultDoc, params)) {
|
||||
bulkRequest.add(new IndexRequest(indexName).id(id).source(content));
|
||||
} catch (IOException e) {
|
||||
logger.error(new ParameterizedMessage("[{}] Error serialising {}", jobId, resultType), e);
|
||||
|
@ -335,14 +344,18 @@ public class JobResultsPersister {
|
|||
public IndexResponse persistDatafeedTimingStats(DatafeedTimingStats timingStats, WriteRequest.RefreshPolicy refreshPolicy) {
|
||||
String jobId = timingStats.getJobId();
|
||||
logger.trace("[{}] Persisting datafeed timing stats", jobId);
|
||||
Persistable persistable = new Persistable(jobId, timingStats, DatafeedTimingStats.documentId(timingStats.getJobId()));
|
||||
Persistable persistable = new Persistable(
|
||||
jobId,
|
||||
timingStats,
|
||||
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true")),
|
||||
DatafeedTimingStats.documentId(timingStats.getJobId()));
|
||||
persistable.setRefreshPolicy(refreshPolicy);
|
||||
return persistable.persist(AnomalyDetectorsIndex.resultsWriteAlias(jobId)).actionGet();
|
||||
}
|
||||
|
||||
private XContentBuilder toXContentBuilder(ToXContent obj) throws IOException {
|
||||
private static XContentBuilder toXContentBuilder(ToXContent obj, ToXContent.Params params) throws IOException {
|
||||
XContentBuilder builder = jsonBuilder();
|
||||
obj.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
obj.toXContent(builder, params);
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
@ -350,12 +363,18 @@ public class JobResultsPersister {
|
|||
|
||||
private final String jobId;
|
||||
private final ToXContent object;
|
||||
private final ToXContent.Params params;
|
||||
private final String id;
|
||||
private WriteRequest.RefreshPolicy refreshPolicy;
|
||||
|
||||
Persistable(String jobId, ToXContent object, String id) {
|
||||
this(jobId, object, ToXContent.EMPTY_PARAMS, id);
|
||||
}
|
||||
|
||||
Persistable(String jobId, ToXContent object, ToXContent.Params params, String id) {
|
||||
this.jobId = jobId;
|
||||
this.object = object;
|
||||
this.params = params;
|
||||
this.id = id;
|
||||
this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
|
||||
}
|
||||
|
@ -373,7 +392,7 @@ public class JobResultsPersister {
|
|||
void persist(String indexName, ActionListener<IndexResponse> listener) {
|
||||
logCall(indexName);
|
||||
|
||||
try (XContentBuilder content = toXContentBuilder(object)) {
|
||||
try (XContentBuilder content = toXContentBuilder(object, params)) {
|
||||
IndexRequest indexRequest = new IndexRequest(indexName).id(id).source(content).setRefreshPolicy(refreshPolicy);
|
||||
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, indexRequest, listener, client::index);
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -219,6 +219,7 @@ public class JobResultsPersisterTests extends ESTestCase {
|
|||
assertThat(indexRequest.index(), equalTo(".ml-anomalies-.write-foo"));
|
||||
assertThat(indexRequest.id(), equalTo("foo_timing_stats"));
|
||||
Map<String, Object> expectedSourceAsMap = new HashMap<>();
|
||||
expectedSourceAsMap.put("result_type", "timing_stats");
|
||||
expectedSourceAsMap.put("job_id", "foo");
|
||||
expectedSourceAsMap.put("bucket_count", 7);
|
||||
expectedSourceAsMap.put("minimum_bucket_processing_time_ms", 1.0);
|
||||
|
@ -255,6 +256,7 @@ public class JobResultsPersisterTests extends ESTestCase {
|
|||
assertThat(indexRequest.id(), equalTo("foo_datafeed_timing_stats"));
|
||||
assertThat(indexRequest.getRefreshPolicy(), equalTo(WriteRequest.RefreshPolicy.IMMEDIATE));
|
||||
Map<String, Object> expectedSourceAsMap = new HashMap<>();
|
||||
expectedSourceAsMap.put("result_type", "datafeed_timing_stats");
|
||||
expectedSourceAsMap.put("job_id", "foo");
|
||||
expectedSourceAsMap.put("search_count", 6);
|
||||
expectedSourceAsMap.put("bucket_count", 66);
|
||||
|
|
Loading…
Reference in New Issue