[ML] Preview of datafeed with aggs should respect chunking config (elastic/x-pack-elasticsearch#4241)

While it makes sense to apply auto-chunking in order to limit
the time range of the search for previewing datafeeds without aggs,
the same is not the case when aggs are used. In contrary, we should
do the preview the same way it would be if the datafeed run, as this
can reveal problems with regard to the datafeed configuration.

In addition, by default datafeeds with aggs have a manual chunking config
that limits the cost of each search. So, setting the chunking to auto
in those cases may lead to the datafeed preview failing even though
actually running the datafeed would work fine.

Original commit: elastic/x-pack-elasticsearch@79e317efb2
This commit is contained in:
Dimitris Athanasiou 2018-03-28 13:43:09 +01:00 committed by GitHub
parent bee81758c5
commit bcfc8442f9
2 changed files with 55 additions and 6 deletions

View File

@ -16,16 +16,16 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
import org.elasticsearch.xpack.core.ml.MLMetadataField; import org.elasticsearch.xpack.core.ml.MLMetadataField;
import org.elasticsearch.xpack.core.ml.MlClientHelper; import org.elasticsearch.xpack.core.ml.MlClientHelper;
import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.InputStream; import java.io.InputStream;
@ -61,16 +61,16 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
if (job == null) { if (job == null) {
throw ExceptionsHelper.missingJobException(datafeed.getJobId()); throw ExceptionsHelper.missingJobException(datafeed.getJobId());
} }
DatafeedConfig.Builder datafeedWithAutoChunking = new DatafeedConfig.Builder(datafeed);
datafeedWithAutoChunking.setChunkingConfig(ChunkingConfig.newAuto()); DatafeedConfig.Builder previewDatafeed = buildPreviewDatafeed(datafeed);
Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream() Map<String, String> headers = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey())) .filter(e -> MlClientHelper.SECURITY_HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
datafeedWithAutoChunking.setHeaders(headers); previewDatafeed.setHeaders(headers);
// NB: this is using the client from the transport layer, NOT the internal client. // NB: this is using the client from the transport layer, NOT the internal client.
// This is important because it means the datafeed search will fail if the user // This is important because it means the datafeed search will fail if the user
// requesting the preview doesn't have permission to search the relevant indices. // requesting the preview doesn't have permission to search the relevant indices.
DataExtractorFactory.create(client, datafeedWithAutoChunking.build(), job, new ActionListener<DataExtractorFactory>() { DataExtractorFactory.create(client, previewDatafeed.build(), job, new ActionListener<DataExtractorFactory>() {
@Override @Override
public void onResponse(DataExtractorFactory dataExtractorFactory) { public void onResponse(DataExtractorFactory dataExtractorFactory) {
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE); DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, Long.MAX_VALUE);
@ -85,6 +85,23 @@ public class TransportPreviewDatafeedAction extends HandledTransportAction<Previ
} }
/** Visible for testing */
static DatafeedConfig.Builder buildPreviewDatafeed(DatafeedConfig datafeed) {
// Since we only want a preview, it's worth limiting the cost
// of the search in the case of non-aggregated datafeeds.
// We do so by setting auto-chunking. This ensures to find
// a sensible time range with enough data to preview.
// When aggregations are present, it's best to comply with
// what the datafeed is set to do as it can reveal problems with
// the datafeed config (e.g. a chunking config that would hit circuit-breakers).
DatafeedConfig.Builder previewDatafeed = new DatafeedConfig.Builder(datafeed);
if (datafeed.hasAggregations() == false) {
previewDatafeed.setChunkingConfig(ChunkingConfig.newAuto());
}
return previewDatafeed;
}
/** Visible for testing */ /** Visible for testing */
static void previewDatafeed(DataExtractor dataExtractor, ActionListener<PreviewDatafeedAction.Response> listener) { static void previewDatafeed(DataExtractor dataExtractor, ActionListener<PreviewDatafeedAction.Response> listener) {
try { try {

View File

@ -6,8 +6,14 @@
package org.elasticsearch.xpack.ml.action; package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PreviewDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.junit.Before; import org.junit.Before;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
@ -17,10 +23,12 @@ import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional; import java.util.Optional;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
@ -59,6 +67,30 @@ public class TransportPreviewDatafeedActionTests extends ESTestCase {
}).when(actionListener).onFailure(any()); }).when(actionListener).onFailure(any());
} }
public void testBuildPreviewDatafeed_GivenNoAggregations() {
DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder("no_aggs_feed", "job_foo");
datafeed.setIndices(Collections.singletonList("my_index"));
datafeed.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1)));
DatafeedConfig previewDatafeed = TransportPreviewDatafeedAction.buildPreviewDatafeed(datafeed.build()).build();
assertThat(previewDatafeed.getChunkingConfig(), equalTo(ChunkingConfig.newAuto()));
}
public void testBuildPreviewDatafeed_GivenAggregations() {
DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder("no_aggs_feed", "job_foo");
datafeed.setIndices(Collections.singletonList("my_index"));
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
datafeed.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.histogram("time").interval(300000).subAggregation(maxTime).field("time")));
datafeed.setChunkingConfig(ChunkingConfig.newManual(TimeValue.timeValueHours(1)));
DatafeedConfig previewDatafeed = TransportPreviewDatafeedAction.buildPreviewDatafeed(datafeed.build()).build();
assertThat(previewDatafeed.getChunkingConfig(), not(equalTo(ChunkingConfig.newAuto())));
assertThat(previewDatafeed.getChunkingConfig(), equalTo(datafeed.build().getChunkingConfig()));
}
public void testPreviewDatafed_GivenEmptyStream() throws IOException { public void testPreviewDatafed_GivenEmptyStream() throws IOException {
when(dataExtractor.next()).thenReturn(Optional.empty()); when(dataExtractor.next()).thenReturn(Optional.empty());