diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index c0991a7c9da..3410359ade3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -56,6 +56,7 @@ import org.elasticsearch.xpack.ml.action.GetRecordsAction; import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction; import org.elasticsearch.xpack.ml.action.OpenJobAction; import org.elasticsearch.xpack.ml.action.PostDataAction; +import org.elasticsearch.xpack.ml.action.PreviewDatafeedAction; import org.elasticsearch.xpack.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.ml.action.PutFilterAction; import org.elasticsearch.xpack.ml.action.PutJobAction; @@ -91,6 +92,7 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.rest.datafeeds.RestDeleteDatafeedAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedStatsAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedsAction; +import org.elasticsearch.xpack.ml.rest.datafeeds.RestPreviewDatafeedAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestPutDatafeedAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestStartDatafeedAction; import org.elasticsearch.xpack.ml.rest.datafeeds.RestStopDatafeedAction; @@ -345,6 +347,7 @@ public class MachineLearning extends Plugin implements ActionPlugin { new RestPutDatafeedAction(settings, restController), new RestUpdateDatafeedAction(settings, restController), new RestDeleteDatafeedAction(settings, restController), + new RestPreviewDatafeedAction(settings, restController), new RestStartDatafeedAction(settings, restController), new RestStopDatafeedAction(settings, restController), new RestDeleteModelSnapshotAction(settings, restController) @@ -383,6 +386,7 @@ public class MachineLearning extends Plugin implements ActionPlugin { new ActionHandler<>(PutDatafeedAction.INSTANCE, PutDatafeedAction.TransportAction.class), new ActionHandler<>(UpdateDatafeedAction.INSTANCE, UpdateDatafeedAction.TransportAction.class), new ActionHandler<>(DeleteDatafeedAction.INSTANCE, DeleteDatafeedAction.TransportAction.class), + new ActionHandler<>(PreviewDatafeedAction.INSTANCE, PreviewDatafeedAction.TransportAction.class), new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class), new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class), new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class), diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PreviewDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PreviewDatafeedAction.java new file mode 100644 index 00000000000..4796981beee --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/PreviewDatafeedAction.java @@ -0,0 +1,248 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +public class PreviewDatafeedAction extends Action { + + public static final PreviewDatafeedAction INSTANCE = new PreviewDatafeedAction(); + public static final String NAME = "cluster:admin/ml/datafeeds/preview"; + + private PreviewDatafeedAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends ActionRequest implements ToXContent { + + private String datafeedId; + + Request() { + } + + public Request(String datafeedId) { + setDatafeedId(datafeedId); + } + + public String getDatafeedId() { + return datafeedId; + } + + public final void setDatafeedId(String datafeedId) { + this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName()); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + datafeedId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(datafeedId); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId); + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(datafeedId); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Request other = (Request) obj; + return Objects.equals(datafeedId, other.datafeedId); + } + } + + static class RequestBuilder extends ActionRequestBuilder { + + RequestBuilder(ElasticsearchClient client) { + super(client, INSTANCE, new Request()); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + private BytesReference preview; + + Response() { + } + + Response(BytesReference preview) { + this.preview = preview; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + preview = in.readBytesReference(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBytesReference(preview); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.rawValue(preview, XContentType.JSON); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(preview); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + Response other = (Response) obj; + return Objects.equals(preview, other.preview); + } + + @Override + public final String toString() { + return Strings.toString(this); + } + } + + public static class TransportAction extends HandledTransportAction { + + private final Client client; + private final ClusterService clusterService; + + @Inject + public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client, + ClusterService clusterService) { + super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new); + this.client = client; + this.clusterService = clusterService; + } + + @Override + protected void doExecute(Request request, ActionListener listener) { + MlMetadata mlMetadata = clusterService.state().getMetaData().custom(MlMetadata.TYPE); + DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId()); + if (datafeed == null) { + throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId()); + } + Job job = mlMetadata.getJobs().get(datafeed.getJobId()); + if (job == null) { + throw ExceptionsHelper.missingJobException(datafeed.getJobId()); + } + DatafeedConfig.Builder datafeedWithAutoChunking = new DatafeedConfig.Builder(datafeed); + datafeedWithAutoChunking.setChunkingConfig(ChunkingConfig.newAuto()); + DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedWithAutoChunking.build(), job); + DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, System.currentTimeMillis()); + threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener)); + } + + /** Visible for testing */ + static void previewDatafeed(DataExtractor dataExtractor, ActionListener listener) { + try { + Optional inputStream = dataExtractor.next(); + // DataExtractor returns single-line JSON but without newline characters between objects. + // Instead, it has a space between objects due to how JSON XContenetBuilder works. + // In order to return a proper JSON array from preview, we surround with square brackets and + // we stick in a comma between objects. + // Also, the stream is expected to be a single line but in case it is not, we join lines + // using space to ensure the comma insertion works correctly. + StringBuilder responseBuilder = new StringBuilder("["); + if (inputStream.isPresent()) { + try (BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream.get(), StandardCharsets.UTF_8))) { + responseBuilder.append(buffer.lines().collect(Collectors.joining(" ")).replace("} {", "},{")); + } + } + responseBuilder.append("]"); + listener.onResponse(new Response(new BytesArray(responseBuilder.toString().getBytes(StandardCharsets.UTF_8)))); + } catch (Exception e) { + listener.onFailure(e); + } finally { + dataExtractor.cancel(); + } + } + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index e93a2675d22..24aa4b37c4a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -186,17 +186,8 @@ public class DatafeedJobRunner extends AbstractComponent { return holder; } - DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) { - boolean isScrollSearch = datafeedConfig.hasAggregations() == false; - DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job) - : new AggregationDataExtractorFactory(client, datafeedConfig, job); - ChunkingConfig chunkingConfig = datafeedConfig.getChunkingConfig(); - if (chunkingConfig == null) { - chunkingConfig = isScrollSearch ? ChunkingConfig.newAuto() : ChunkingConfig.newOff(); - } - - return chunkingConfig.isEnabled() ? new ChunkedDataExtractorFactory(client, datafeedConfig, job, dataExtractorFactory) - : dataExtractorFactory; + DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeed, Job job) { + return DataExtractorFactory.create(client, datafeed, job); } private static DataDescription buildDataDescription(Job job) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java index 6db53f812ab..dd4849df6e6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactory.java @@ -5,6 +5,30 @@ */ package org.elasticsearch.xpack.ml.datafeed.extractor; +import org.elasticsearch.client.Client; +import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; +import org.elasticsearch.xpack.ml.job.config.Job; + public interface DataExtractorFactory { DataExtractor newExtractor(long start, long end); + + /** + * Creates a {@code DataExtractorFactory} for the given datafeed-job combination. + */ + static DataExtractorFactory create(Client client, DatafeedConfig datafeedConfig, Job job) { + boolean isScrollSearch = datafeedConfig.hasAggregations() == false; + DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job) + : new AggregationDataExtractorFactory(client, datafeedConfig, job); + ChunkingConfig chunkingConfig = datafeedConfig.getChunkingConfig(); + if (chunkingConfig == null) { + chunkingConfig = isScrollSearch ? ChunkingConfig.newAuto() : ChunkingConfig.newOff(); + } + + return chunkingConfig.isEnabled() ? new ChunkedDataExtractorFactory(client, datafeedConfig, job, dataExtractorFactory) + : dataExtractorFactory; + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPreviewDatafeedAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPreviewDatafeedAction.java new file mode 100644 index 00000000000..1c105fbf745 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/rest/datafeeds/RestPreviewDatafeedAction.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.rest.datafeeds; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.action.PreviewDatafeedAction; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; + +import java.io.IOException; + +public class RestPreviewDatafeedAction extends BaseRestHandler { + + public RestPreviewDatafeedAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.GET, + MachineLearning.BASE_PATH + "datafeeds/{" + DatafeedConfig.ID.getPreferredName() + "}/_preview", this); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName()); + PreviewDatafeedAction.Request request = new PreviewDatafeedAction.Request(datafeedId); + return channel -> client.execute(PreviewDatafeedAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PreviewDatafeedActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PreviewDatafeedActionRequestTests.java new file mode 100644 index 00000000000..f5528637bdc --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PreviewDatafeedActionRequestTests.java @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.ml.action.PreviewDatafeedAction.Request; + +public class PreviewDatafeedActionRequestTests extends AbstractStreamableTestCase { + + @Override + protected Request createTestInstance() { + return new Request(randomAsciiOfLength(10)); + } + + @Override + protected Request createBlankInstance() { + return new Request(); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PreviewDatafeedActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PreviewDatafeedActionTests.java new file mode 100644 index 00000000000..92a43ac94ec --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/PreviewDatafeedActionTests.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; +import org.junit.Before; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class PreviewDatafeedActionTests extends ESTestCase { + + private DataExtractor dataExtractor; + private ActionListener actionListener; + private String capturedResponse; + private Exception capturedFailure; + + @Before + public void setUpTests() { + dataExtractor = mock(DataExtractor.class); + actionListener = mock(ActionListener.class); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + PreviewDatafeedAction.Response response = (PreviewDatafeedAction.Response) invocationOnMock.getArguments()[0]; + capturedResponse = response.toString(); + return null; + } + }).when(actionListener).onResponse(any()); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + capturedFailure = (Exception) invocationOnMock.getArguments()[0]; + return null; + } + }).when(actionListener).onFailure(any()); + } + + public void testPreviewDatafed_GivenEmptyStream() throws IOException { + when(dataExtractor.next()).thenReturn(Optional.empty()); + + PreviewDatafeedAction.TransportAction.previewDatafeed(dataExtractor, actionListener); + + assertThat(capturedResponse, equalTo("[]")); + assertThat(capturedFailure, is(nullValue())); + verify(dataExtractor).cancel(); + } + + public void testPreviewDatafed_GivenNonEmptyStream() throws IOException { + String streamAsString = "{\"a\":1, \"b\":2} {\"c\":3, \"d\":4}\n{\"e\":5, \"f\":6}"; + InputStream stream = new ByteArrayInputStream(streamAsString.getBytes(StandardCharsets.UTF_8)); + when(dataExtractor.next()).thenReturn(Optional.of(stream)); + + PreviewDatafeedAction.TransportAction.previewDatafeed(dataExtractor, actionListener); + + assertThat(capturedResponse, equalTo("[{\"a\":1, \"b\":2},{\"c\":3, \"d\":4},{\"e\":5, \"f\":6}]")); + assertThat(capturedFailure, is(nullValue())); + verify(dataExtractor).cancel(); + } + + public void testPreviewDatafed_GivenFailure() throws IOException { + doThrow(new RuntimeException("failed")).when(dataExtractor).next(); + + PreviewDatafeedAction.TransportAction.previewDatafeed(dataExtractor, actionListener); + + assertThat(capturedResponse, is(nullValue())); + assertThat(capturedFailure.getMessage(), equalTo("failed")); + verify(dataExtractor).cancel(); + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java index 8914540315c..1dcbdb84e76 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunnerTests.java @@ -20,8 +20,6 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.mock.orig.Mockito; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; @@ -32,9 +30,6 @@ import org.elasticsearch.xpack.ml.action.PostDataAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; -import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; -import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; -import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; import org.elasticsearch.xpack.ml.job.config.Detector; @@ -65,7 +60,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.function.Consumer; import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask; -import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; @@ -272,81 +266,6 @@ public class DatafeedJobRunnerTests extends ESTestCase { } } - public void testCreateDataExtractorFactoryGivenDefaultScroll() { - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setTimeField("time"); - Job.Builder jobBuilder = createDatafeedJob(); - jobBuilder.setDataDescription(dataDescription); - DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build(); - DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), - () -> currentTime, auditor); - - DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig, jobBuilder.build()); - - assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)); - } - - public void testCreateDataExtractorFactoryGivenScrollWithAutoChunk() { - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setTimeField("time"); - Job.Builder jobBuilder = createDatafeedJob(); - jobBuilder.setDataDescription(dataDescription); - DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo"); - datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); - DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), - () -> currentTime, auditor); - - DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build()); - - assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)); - } - - public void testCreateDataExtractorFactoryGivenScrollWithOffChunk() { - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setTimeField("time"); - Job.Builder jobBuilder = createDatafeedJob(); - jobBuilder.setDataDescription(dataDescription); - DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo"); - datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); - DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), - () -> currentTime, auditor); - - DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build()); - - assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class)); - } - - public void testCreateDataExtractorFactoryGivenAggregation() { - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setTimeField("time"); - Job.Builder jobBuilder = createDatafeedJob(); - jobBuilder.setDataDescription(dataDescription); - DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo"); - datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(AggregationBuilders.avg("a"))); - DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), - () -> currentTime, auditor); - - DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build()); - - assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class)); - } - - public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk() { - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setTimeField("time"); - Job.Builder jobBuilder = createDatafeedJob(); - jobBuilder.setDataDescription(dataDescription); - DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo"); - datafeedConfig.setAggregations(AggregatorFactories.builder()); - datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); - DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class), - () -> currentTime, auditor); - - DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build()); - - assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)); - } - public static DatafeedConfig.Builder createDatafeedConfig(String datafeedId, String jobId) { DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, jobId); datafeedConfig.setIndexes(Arrays.asList("myIndex")); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java new file mode 100644 index 00000000000..f7204af1752 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/DataExtractorFactoryTests.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed.extractor; + +import org.elasticsearch.client.Client; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorFactories; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests; +import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; +import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; +import org.elasticsearch.xpack.ml.job.config.DataDescription; +import org.elasticsearch.xpack.ml.job.config.Job; +import org.junit.Before; + +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; + +public class DataExtractorFactoryTests extends ESTestCase { + + private Client client; + + @Before + public void setUpTests() { + client = mock(Client.class); + } + + public void testCreateDataExtractorFactoryGivenDefaultScroll() { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + DatafeedConfig datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo").build(); + + DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig, jobBuilder.build()); + + assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)); + } + + public void testCreateDataExtractorFactoryGivenScrollWithAutoChunk() { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo"); + datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); + + DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build()); + + assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)); + } + + public void testCreateDataExtractorFactoryGivenScrollWithOffChunk() { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo"); + datafeedConfig.setChunkingConfig(ChunkingConfig.newOff()); + + DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build()); + + assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class)); + } + + public void testCreateDataExtractorFactoryGivenAggregation() { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo"); + datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(AggregationBuilders.avg("a"))); + + DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build()); + + assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class)); + } + + public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk() { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setTimeField("time"); + Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob(); + jobBuilder.setDataDescription(dataDescription); + DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo"); + datafeedConfig.setAggregations(AggregatorFactories.builder()); + datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto()); + + DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build()); + + assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)); + } +} \ No newline at end of file diff --git a/plugin/src/test/resources/org/elasticsearch/transport/actions b/plugin/src/test/resources/org/elasticsearch/transport/actions index 9c2c41bbfc0..6312d8aee0d 100644 --- a/plugin/src/test/resources/org/elasticsearch/transport/actions +++ b/plugin/src/test/resources/org/elasticsearch/transport/actions @@ -112,6 +112,7 @@ cluster:admin/ml/anomaly_detectors/results/buckets/get cluster:admin/ml/anomaly_detectors/model_snapshots/get cluster:admin/ml/anomaly_detectors/results/records/get cluster:admin/ml/anomaly_detectors/results/influencers/get +cluster:admin/ml/datafeeds/preview cluster:admin/ml/datafeeds/put cluster:admin/ml/datafeeds/update cluster:admin/ml/anomaly_detectors/model_snapshots/delete diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.ml.preview_datafeed.json b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.preview_datafeed.json new file mode 100644 index 00000000000..96b37267532 --- /dev/null +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.ml.preview_datafeed.json @@ -0,0 +1,17 @@ +{ + "xpack.ml.preview_datafeed": { + "methods": [ "GET" ], + "url": { + "path": "/_xpack/ml/datafeeds/{datafeed_id}/_preview", + "paths": [ "/_xpack/ml/datafeeds/{datafeed_id}/_preview" ], + "parts": { + "datafeed_id": { + "type": "string", + "required": true, + "description": "The ID of the datafeed to preview" + } + } + }, + "body": null + } +} diff --git a/plugin/src/test/resources/rest-api-spec/test/ml/preview_datafeed.yaml b/plugin/src/test/resources/rest-api-spec/test/ml/preview_datafeed.yaml new file mode 100644 index 00000000000..a79adaec3bd --- /dev/null +++ b/plugin/src/test/resources/rest-api-spec/test/ml/preview_datafeed.yaml @@ -0,0 +1,234 @@ +setup: + - do: + indices.create: + index: airline-data + body: + mappings: + response: + properties: + time: + type: date + airline: + type: keyword + responsetime: + type: float + + - do: + index: + index: airline-data + type: response + id: 1 + body: > + { + "time": "2017-02-18T00:00:00Z", + "airline": "foo", + "responsetime": 1.0 + } + + - do: + index: + index: airline-data + type: response + id: 2 + body: > + { + "time": "2017-02-18T00:30:00Z", + "airline": "foo", + "responsetime": 1.0 + } + + - do: + index: + index: airline-data + type: response + id: 3 + body: > + { + "time": "2017-02-18T01:00:00Z", + "airline": "bar", + "responsetime": 42.0 + } + + - do: + indices.refresh: + index: airline-data + +--- +"Test preview scroll datafeed": + + - do: + xpack.ml.put_job: + job_id: scroll-job + body: > + { + "analysis_config" : { + "bucket_span":3600, + "detectors" :[{"function":"sum","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "time_field":"time" + } + } + + - do: + xpack.ml.put_datafeed: + datafeed_id: scroll-feed + body: > + { + "job_id":"scroll-job", + "indexes":"airline-data", + "types":"response" + } + + - do: + xpack.ml.preview_datafeed: + datafeed_id: scroll-feed + - length: { $body: 3 } + - match: { 0.time: 1487376000000 } + - match: { 0.airline: foo } + - match: { 0.responsetime: 1.0 } + - match: { 1.time: 1487377800000 } + - match: { 1.airline: foo } + - match: { 1.responsetime: 1.0 } + - match: { 2.time: 1487379600000 } + - match: { 2.airline: bar } + - match: { 2.responsetime: 42.0 } + +--- +"Test preview aggregation datafeed": + + - do: + xpack.ml.put_job: + job_id: aggregation-job + body: > + { + "analysis_config" : { + "bucket_span":3600, + "summary_count_field_name": "doc_count", + "detectors" :[{"function":"sum","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "time_field":"time" + } + } + + - do: + xpack.ml.put_datafeed: + datafeed_id: aggregation-feed + body: > + { + "job_id":"aggregation-job", + "indexes":"airline-data", + "types":"response", + "aggregations": { + "time": { + "histogram": { + "field": "time", + "interval": 3600000 + }, + "aggregations": { + "airline": { + "terms": { + "field": "airline", + "size": 100 + }, + "aggregations": { + "responsetime": { + "sum": { + "field": "responsetime" + } + } + } + } + } + } + } + } + + - do: + xpack.ml.preview_datafeed: + datafeed_id: aggregation-feed + - length: { $body: 2 } + - match: { 0.time: 1.487376E12 } + - match: { 0.airline: foo } + - match: { 0.responsetime: 2.0 } + - match: { 0.doc_count: 2 } + - match: { 1.time: 1.4873796E12 } + - match: { 1.airline: bar } + - match: { 1.responsetime: 42.0 } + - match: { 1.doc_count: 1 } + +--- +"Test preview missing datafeed": + + - do: + catch: missing + xpack.ml.preview_datafeed: + datafeed_id: missing-feed + +--- +"Test preview datafeed with unavailable index": + + - do: + xpack.ml.put_job: + job_id: unavailable-job + body: > + { + "analysis_config" : { + "bucket_span":3600, + "detectors" :[{"function":"sum","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "time_field":"time" + } + } + + - do: + xpack.ml.put_datafeed: + datafeed_id: unavailable-feed + body: > + { + "job_id":"unavailable-job", + "indexes":"unavailable-data", + "types":"response" + } + + - do: + catch: missing + xpack.ml.preview_datafeed: + datafeed_id: unavailable-feed + +--- +"Test preview datafeed with query that matches nothing": + + - do: + xpack.ml.put_job: + job_id: empty-job + body: > + { + "analysis_config" : { + "bucket_span":3600, + "detectors" :[{"function":"sum","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "time_field":"time" + } + } + + - do: + xpack.ml.put_datafeed: + datafeed_id: empty-feed + body: > + { + "job_id":"empty-job", + "indexes":"airline-data", + "types":"response", + "query": { + "term": {"airline":"missing"} + } + } + + - do: + xpack.ml.preview_datafeed: + datafeed_id: empty-feed + - length: { $body: 0 }