From bcfc8442f91738cd3cb687e3ca7ba78bed6e32b4 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Wed, 28 Mar 2018 13:43:09 +0100 Subject: [PATCH] [ML] Preview of datafeed with aggs should respect chunking config (elastic/x-pack-elasticsearch#4241) While it makes sense to apply auto-chunking in order to limit the time range of the search for previewing datafeeds without aggs, the same is not the case when aggs are used. On the contrary, we should do the preview the same way it would be done if the datafeed ran, as this can reveal problems with regard to the datafeed configuration. In addition, by default datafeeds with aggs have a manual chunking config that limits the cost of each search. So, setting the chunking to auto in those cases may lead to the datafeed preview failing even though actually running the datafeed would work fine. Original commit: elastic/x-pack-elasticsearch@79e317efb2e3d9e963c5cc56d99eef350cd06e96 --- .../TransportPreviewDatafeedAction.java | 29 +++++++++++++---- .../TransportPreviewDatafeedActionTests.java | 32 +++++++++++++++++++ 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java index 2ee37816374..98ba2caa408 100644 --- a/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java +++ b/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedAction.java @@ -16,16 +16,16 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction; import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MlClientHelper; import 
org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; -import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import java.io.BufferedReader; import java.io.InputStream; @@ -61,16 +61,16 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction headers = threadPool.getThreadContext().getHeaders().entrySet().stream() .filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - datafeedWithAutoChunking.setHeaders(headers); + previewDatafeed.setHeaders(headers); // NB: this is using the client from the transport layer, NOT the internal client. // This is important because it means the datafeed search will fail if the user // requesting the preview doesn't have permission to search the relevant indices. 
- DataExtractorFactory.create(client, datafeedWithAutoChunking.build(), job, new ActionListener() { + DataExtractorFactory.create(client, previewDatafeed.build(), job, new ActionListener() { @Override public void onResponse(DataExtractorFactory dataExtractorFactory) { DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE); @@ -85,6 +85,23 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction listener) { try { diff --git a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedActionTests.java b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedActionTests.java index c63efb9a909..ab3fe083d5f 100644 --- a/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedActionTests.java +++ b/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportPreviewDatafeedActionTests.java @@ -6,8 +6,14 @@ package org.elasticsearch.xpack.ml.action; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction; +import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; @@ -17,10 +23,12 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.Optional; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static 
org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; @@ -59,6 +67,30 @@ public class TransportPreviewDatafeedActionTests extends ESTestCase { }).when(actionListener).onFailure(any()); } + public void testBuildPreviewDatafeed_GivenNoAggregations() { + DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder("no_aggs_feed", "job_foo"); + datafeed.setIndices(Collections.singletonList("my_index")); + datafeed.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1))); + + DatafeedConfig previewDatafeed = TransportPreviewDatafeedAction.buildPreviewDatafeed(datafeed.build()).build(); + + assertThat(previewDatafeed.getChunkingConfig(), equalTo(ChunkingConfig.newAuto())); + } + + public void testBuildPreviewDatafeed_GivenAggregations() { + DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder("no_aggs_feed", "job_foo"); + datafeed.setIndices(Collections.singletonList("my_index")); + MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time"); + datafeed.setAggregations(AggregatorFactories.builder().addAggregator( + AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time"))); + datafeed.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1))); + + DatafeedConfig previewDatafeed = TransportPreviewDatafeedAction.buildPreviewDatafeed(datafeed.build()).build(); + + assertThat(previewDatafeed.getChunkingConfig(), not(equalTo(ChunkingConfig.newAuto()))); + assertThat(previewDatafeed.getChunkingConfig(), equalTo(datafeed.build().getChunkingConfig())); + } + public void testPreviewDatafed_GivenEmptyStream() throws IOException { when(dataExtractor.next()).thenReturn(Optional.empty());