[ML] Add datafeed preview end-point (elastic/x-pack-elasticsearch#645)

Adds a preview end-point: `/_xpack/ml/datafeeds/{datafeed_id}/_preview`

The endpoint returns an array with the JSON records that will reach
writer to the c++ process. Thus, the preview can be used to verify
that a datafeed is configured correctly with regard to itself and
its corresponding job.

Original commit: elastic/x-pack-elasticsearch@1e0e9c906d
This commit is contained in:
Dimitris Athanasiou 2017-02-25 10:17:06 +00:00 committed by GitHub
parent d49930040e
commit 380f6d9bff
12 changed files with 776 additions and 92 deletions

View File

@ -56,6 +56,7 @@ import org.elasticsearch.xpack.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.ml.action.MlDeleteByQueryAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.PreviewDatafeedAction;
import org.elasticsearch.xpack.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.ml.action.PutFilterAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
@ -91,6 +92,7 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestDeleteDatafeedAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedStatsAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestGetDatafeedsAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestPreviewDatafeedAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestPutDatafeedAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestStartDatafeedAction;
import org.elasticsearch.xpack.ml.rest.datafeeds.RestStopDatafeedAction;
@ -345,6 +347,7 @@ public class MachineLearning extends Plugin implements ActionPlugin {
new RestPutDatafeedAction(settings, restController),
new RestUpdateDatafeedAction(settings, restController),
new RestDeleteDatafeedAction(settings, restController),
new RestPreviewDatafeedAction(settings, restController),
new RestStartDatafeedAction(settings, restController),
new RestStopDatafeedAction(settings, restController),
new RestDeleteModelSnapshotAction(settings, restController)
@ -383,6 +386,7 @@ public class MachineLearning extends Plugin implements ActionPlugin {
new ActionHandler<>(PutDatafeedAction.INSTANCE, PutDatafeedAction.TransportAction.class),
new ActionHandler<>(UpdateDatafeedAction.INSTANCE, UpdateDatafeedAction.TransportAction.class),
new ActionHandler<>(DeleteDatafeedAction.INSTANCE, DeleteDatafeedAction.TransportAction.class),
new ActionHandler<>(PreviewDatafeedAction.INSTANCE, PreviewDatafeedAction.TransportAction.class),
new ActionHandler<>(StartDatafeedAction.INSTANCE, StartDatafeedAction.TransportAction.class),
new ActionHandler<>(StopDatafeedAction.INSTANCE, StopDatafeedAction.TransportAction.class),
new ActionHandler<>(DeleteModelSnapshotAction.INSTANCE, DeleteModelSnapshotAction.TransportAction.class),

View File

@ -0,0 +1,248 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
public class PreviewDatafeedAction extends Action<PreviewDatafeedAction.Request, PreviewDatafeedAction.Response,
PreviewDatafeedAction.RequestBuilder> {
public static final PreviewDatafeedAction INSTANCE = new PreviewDatafeedAction();
public static final String NAME = "cluster:admin/ml/datafeeds/preview";
private PreviewDatafeedAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends ActionRequest implements ToXContent {
private String datafeedId;
Request() {
}
public Request(String datafeedId) {
setDatafeedId(datafeedId);
}
public String getDatafeedId() {
return datafeedId;
}
public final void setDatafeedId(String datafeedId) {
this.datafeedId = ExceptionsHelper.requireNonNull(datafeedId, DatafeedConfig.ID.getPreferredName());
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
datafeedId = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(datafeedId);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), datafeedId);
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(datafeedId);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return Objects.equals(datafeedId, other.datafeedId);
}
}
static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new Request());
}
}
public static class Response extends ActionResponse implements ToXContentObject {
private BytesReference preview;
Response() {
}
Response(BytesReference preview) {
this.preview = preview;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
preview = in.readBytesReference();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBytesReference(preview);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.rawValue(preview, XContentType.JSON);
return builder;
}
@Override
public int hashCode() {
return Objects.hash(preview);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return Objects.equals(preview, other.preview);
}
@Override
public final String toString() {
return Strings.toString(this);
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final Client client;
private final ClusterService clusterService;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client,
ClusterService clusterService) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.clusterService = clusterService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
MlMetadata mlMetadata = clusterService.state().getMetaData().custom(MlMetadata.TYPE);
DatafeedConfig datafeed = mlMetadata.getDatafeed(request.getDatafeedId());
if (datafeed == null) {
throw ExceptionsHelper.missingDatafeedException(request.getDatafeedId());
}
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
if (job == null) {
throw ExceptionsHelper.missingJobException(datafeed.getJobId());
}
DatafeedConfig.Builder datafeedWithAutoChunking = new DatafeedConfig.Builder(datafeed);
datafeedWithAutoChunking.setChunkingConfig(ChunkingConfig.newAuto());
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedWithAutoChunking.build(), job);
DataExtractor dataExtractor = dataExtractorFactory.newExtractor(0, System.currentTimeMillis());
threadPool.generic().execute(() -> previewDatafeed(dataExtractor, listener));
}
/** Visible for testing */
static void previewDatafeed(DataExtractor dataExtractor, ActionListener<Response> listener) {
try {
Optional<InputStream> inputStream = dataExtractor.next();
// DataExtractor returns single-line JSON but without newline characters between objects.
// Instead, it has a space between objects due to how JSON XContenetBuilder works.
// In order to return a proper JSON array from preview, we surround with square brackets and
// we stick in a comma between objects.
// Also, the stream is expected to be a single line but in case it is not, we join lines
// using space to ensure the comma insertion works correctly.
StringBuilder responseBuilder = new StringBuilder("[");
if (inputStream.isPresent()) {
try (BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream.get(), StandardCharsets.UTF_8))) {
responseBuilder.append(buffer.lines().collect(Collectors.joining(" ")).replace("} {", "},{"));
}
}
responseBuilder.append("]");
listener.onResponse(new Response(new BytesArray(responseBuilder.toString().getBytes(StandardCharsets.UTF_8))));
} catch (Exception e) {
listener.onFailure(e);
} finally {
dataExtractor.cancel();
}
}
}
}

View File

@ -186,17 +186,8 @@ public class DatafeedJobRunner extends AbstractComponent {
return holder;
}
DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) {
boolean isScrollSearch = datafeedConfig.hasAggregations() == false;
DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job)
: new AggregationDataExtractorFactory(client, datafeedConfig, job);
ChunkingConfig chunkingConfig = datafeedConfig.getChunkingConfig();
if (chunkingConfig == null) {
chunkingConfig = isScrollSearch ? ChunkingConfig.newAuto() : ChunkingConfig.newOff();
}
return chunkingConfig.isEnabled() ? new ChunkedDataExtractorFactory(client, datafeedConfig, job, dataExtractorFactory)
: dataExtractorFactory;
DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeed, Job job) {
return DataExtractorFactory.create(client, datafeed, job);
}
private static DataDescription buildDataDescription(Job job) {

View File

@ -5,6 +5,30 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.Job;
public interface DataExtractorFactory {
DataExtractor newExtractor(long start, long end);
/**
* Creates a {@code DataExtractorFactory} for the given datafeed-job combination.
*/
static DataExtractorFactory create(Client client, DatafeedConfig datafeedConfig, Job job) {
boolean isScrollSearch = datafeedConfig.hasAggregations() == false;
DataExtractorFactory dataExtractorFactory = isScrollSearch ? new ScrollDataExtractorFactory(client, datafeedConfig, job)
: new AggregationDataExtractorFactory(client, datafeedConfig, job);
ChunkingConfig chunkingConfig = datafeedConfig.getChunkingConfig();
if (chunkingConfig == null) {
chunkingConfig = isScrollSearch ? ChunkingConfig.newAuto() : ChunkingConfig.newOff();
}
return chunkingConfig.isEnabled() ? new ChunkedDataExtractorFactory(client, datafeedConfig, job, dataExtractorFactory)
: dataExtractorFactory;
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.rest.datafeeds;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.PreviewDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import java.io.IOException;
public class RestPreviewDatafeedAction extends BaseRestHandler {
public RestPreviewDatafeedAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.GET,
MachineLearning.BASE_PATH + "datafeeds/{" + DatafeedConfig.ID.getPreferredName() + "}/_preview", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String datafeedId = restRequest.param(DatafeedConfig.ID.getPreferredName());
PreviewDatafeedAction.Request request = new PreviewDatafeedAction.Request(datafeedId);
return channel -> client.execute(PreviewDatafeedAction.INSTANCE, request, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.xpack.ml.action.PreviewDatafeedAction.Request;
public class PreviewDatafeedActionRequestTests extends AbstractStreamableTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAsciiOfLength(10));
}
@Override
protected Request createBlankInstance() {
return new Request();
}
}

View File

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.junit.Before;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class PreviewDatafeedActionTests extends ESTestCase {
private DataExtractor dataExtractor;
private ActionListener<PreviewDatafeedAction.Response> actionListener;
private String capturedResponse;
private Exception capturedFailure;
@Before
public void setUpTests() {
dataExtractor = mock(DataExtractor.class);
actionListener = mock(ActionListener.class);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
PreviewDatafeedAction.Response response = (PreviewDatafeedAction.Response) invocationOnMock.getArguments()[0];
capturedResponse = response.toString();
return null;
}
}).when(actionListener).onResponse(any());
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
capturedFailure = (Exception) invocationOnMock.getArguments()[0];
return null;
}
}).when(actionListener).onFailure(any());
}
public void testPreviewDatafed_GivenEmptyStream() throws IOException {
when(dataExtractor.next()).thenReturn(Optional.empty());
PreviewDatafeedAction.TransportAction.previewDatafeed(dataExtractor, actionListener);
assertThat(capturedResponse, equalTo("[]"));
assertThat(capturedFailure, is(nullValue()));
verify(dataExtractor).cancel();
}
public void testPreviewDatafed_GivenNonEmptyStream() throws IOException {
String streamAsString = "{\"a\":1, \"b\":2} {\"c\":3, \"d\":4}\n{\"e\":5, \"f\":6}";
InputStream stream = new ByteArrayInputStream(streamAsString.getBytes(StandardCharsets.UTF_8));
when(dataExtractor.next()).thenReturn(Optional.of(stream));
PreviewDatafeedAction.TransportAction.previewDatafeed(dataExtractor, actionListener);
assertThat(capturedResponse, equalTo("[{\"a\":1, \"b\":2},{\"c\":3, \"d\":4},{\"e\":5, \"f\":6}]"));
assertThat(capturedFailure, is(nullValue()));
verify(dataExtractor).cancel();
}
public void testPreviewDatafed_GivenFailure() throws IOException {
doThrow(new RuntimeException("failed")).when(dataExtractor).next();
PreviewDatafeedAction.TransportAction.previewDatafeed(dataExtractor, actionListener);
assertThat(capturedResponse, is(nullValue()));
assertThat(capturedFailure.getMessage(), equalTo("failed"));
verify(dataExtractor).cancel();
}
}

View File

@ -20,8 +20,6 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.mock.orig.Mockito;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
@ -32,9 +30,6 @@ import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Detector;
@ -65,7 +60,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.ml.action.OpenJobActionTests.createJobTask;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
@ -272,81 +266,6 @@ public class DatafeedJobRunnerTests extends ESTestCase {
}
}
public void testCreateDataExtractorFactoryGivenDefaultScroll() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig datafeedConfig = createDatafeedConfig("datafeed1", "foo").build();
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class),
() -> currentTime, auditor);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig, jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenScrollWithAutoChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class),
() -> currentTime, auditor);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenScrollWithOffChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class),
() -> currentTime, auditor);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenAggregation() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(AggregationBuilders.avg("a")));
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class),
() -> currentTime, auditor);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setAggregations(AggregatorFactories.builder());
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DatafeedJobRunner runner = new DatafeedJobRunner(threadPool, client, clusterService, mock(JobProvider.class),
() -> currentTime, auditor);
DataExtractorFactory dataExtractorFactory = runner.createDataExtractorFactory(datafeedConfig.build(), jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}
public static DatafeedConfig.Builder createDatafeedConfig(String datafeedId, String jobId) {
DatafeedConfig.Builder datafeedConfig = new DatafeedConfig.Builder(datafeedId, jobId);
datafeedConfig.setIndexes(Arrays.asList("myIndex"));

View File

@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunnerTests;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.junit.Before;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Mockito.mock;
public class DataExtractorFactoryTests extends ESTestCase {
private Client client;
@Before
public void setUpTests() {
client = mock(Client.class);
}
public void testCreateDataExtractorFactoryGivenDefaultScroll() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo").build();
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig, jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenScrollWithAutoChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenScrollWithOffChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(ScrollDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenAggregation() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(AggregationBuilders.avg("a")));
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(AggregationDataExtractorFactory.class));
}
public void testCreateDataExtractorFactoryGivenDefaultAggregationWithAutoChunk() {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedJobRunnerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedJobRunnerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setAggregations(AggregatorFactories.builder());
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
DataExtractorFactory dataExtractorFactory = DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build());
assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class));
}
}

View File

@ -112,6 +112,7 @@ cluster:admin/ml/anomaly_detectors/results/buckets/get
cluster:admin/ml/anomaly_detectors/model_snapshots/get
cluster:admin/ml/anomaly_detectors/results/records/get
cluster:admin/ml/anomaly_detectors/results/influencers/get
cluster:admin/ml/datafeeds/preview
cluster:admin/ml/datafeeds/put
cluster:admin/ml/datafeeds/update
cluster:admin/ml/anomaly_detectors/model_snapshots/delete

View File

@ -0,0 +1,17 @@
{
"xpack.ml.preview_datafeed": {
"methods": [ "GET" ],
"url": {
"path": "/_xpack/ml/datafeeds/{datafeed_id}/_preview",
"paths": [ "/_xpack/ml/datafeeds/{datafeed_id}/_preview" ],
"parts": {
"datafeed_id": {
"type": "string",
"required": true,
"description": "The ID of the datafeed to preview"
}
}
},
"body": null
}
}

View File

@ -0,0 +1,234 @@
setup:
- do:
indices.create:
index: airline-data
body:
mappings:
response:
properties:
time:
type: date
airline:
type: keyword
responsetime:
type: float
- do:
index:
index: airline-data
type: response
id: 1
body: >
{
"time": "2017-02-18T00:00:00Z",
"airline": "foo",
"responsetime": 1.0
}
- do:
index:
index: airline-data
type: response
id: 2
body: >
{
"time": "2017-02-18T00:30:00Z",
"airline": "foo",
"responsetime": 1.0
}
- do:
index:
index: airline-data
type: response
id: 3
body: >
{
"time": "2017-02-18T01:00:00Z",
"airline": "bar",
"responsetime": 42.0
}
- do:
indices.refresh:
index: airline-data
---
"Test preview scroll datafeed":
- do:
xpack.ml.put_job:
job_id: scroll-job
body: >
{
"analysis_config" : {
"bucket_span":3600,
"detectors" :[{"function":"sum","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"time_field":"time"
}
}
- do:
xpack.ml.put_datafeed:
datafeed_id: scroll-feed
body: >
{
"job_id":"scroll-job",
"indexes":"airline-data",
"types":"response"
}
- do:
xpack.ml.preview_datafeed:
datafeed_id: scroll-feed
- length: { $body: 3 }
- match: { 0.time: 1487376000000 }
- match: { 0.airline: foo }
- match: { 0.responsetime: 1.0 }
- match: { 1.time: 1487377800000 }
- match: { 1.airline: foo }
- match: { 1.responsetime: 1.0 }
- match: { 2.time: 1487379600000 }
- match: { 2.airline: bar }
- match: { 2.responsetime: 42.0 }
---
"Test preview aggregation datafeed":
- do:
xpack.ml.put_job:
job_id: aggregation-job
body: >
{
"analysis_config" : {
"bucket_span":3600,
"summary_count_field_name": "doc_count",
"detectors" :[{"function":"sum","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"time_field":"time"
}
}
- do:
xpack.ml.put_datafeed:
datafeed_id: aggregation-feed
body: >
{
"job_id":"aggregation-job",
"indexes":"airline-data",
"types":"response",
"aggregations": {
"time": {
"histogram": {
"field": "time",
"interval": 3600000
},
"aggregations": {
"airline": {
"terms": {
"field": "airline",
"size": 100
},
"aggregations": {
"responsetime": {
"sum": {
"field": "responsetime"
}
}
}
}
}
}
}
}
- do:
xpack.ml.preview_datafeed:
datafeed_id: aggregation-feed
- length: { $body: 2 }
- match: { 0.time: 1.487376E12 }
- match: { 0.airline: foo }
- match: { 0.responsetime: 2.0 }
- match: { 0.doc_count: 2 }
- match: { 1.time: 1.4873796E12 }
- match: { 1.airline: bar }
- match: { 1.responsetime: 42.0 }
- match: { 1.doc_count: 1 }
---
"Test preview missing datafeed":
- do:
catch: missing
xpack.ml.preview_datafeed:
datafeed_id: missing-feed
---
"Test preview datafeed with unavailable index":
- do:
xpack.ml.put_job:
job_id: unavailable-job
body: >
{
"analysis_config" : {
"bucket_span":3600,
"detectors" :[{"function":"sum","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"time_field":"time"
}
}
- do:
xpack.ml.put_datafeed:
datafeed_id: unavailable-feed
body: >
{
"job_id":"unavailable-job",
"indexes":"unavailable-data",
"types":"response"
}
- do:
catch: missing
xpack.ml.preview_datafeed:
datafeed_id: unavailable-feed
---
"Test preview datafeed with query that matches nothing":
- do:
xpack.ml.put_job:
job_id: empty-job
body: >
{
"analysis_config" : {
"bucket_span":3600,
"detectors" :[{"function":"sum","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"time_field":"time"
}
}
- do:
xpack.ml.put_datafeed:
datafeed_id: empty-feed
body: >
{
"job_id":"empty-job",
"indexes":"airline-data",
"types":"response",
"query": {
"term": {"airline":"missing"}
}
}
- do:
xpack.ml.preview_datafeed:
datafeed_id: empty-feed
- length: { $body: 0 }