ML: Add support for rollup Indexes in Datafeeds (#34654)

* Adding rollup support for datafeeds

* Fixing tests and adjusting formatting

* minor formatting chagne

* fixing some syntax and removing redundancies

* Refactoring and fixing failing test

* Refactoring, adding paranoid null check

* Moving rollup into the aggregation package

* making AggregationToJsonProcessor package private again

* Addressing test failure

* Fixing validations, chunking

* Addressing failing test

* rolling back RollupJobCaps changes

* Adding comment and cleaning up test

* Addressing review comments and test failures

* Moving builder logic into separate methods

* Addressing PR comments, adding test for rollup permissions

* Fixing test failure

* Adding rollup priv check on datafeed put

* Handling missing index when getting caps

* Fixing unused import
This commit is contained in:
Benjamin Trent 2018-11-01 10:02:24 -05:00 committed by GitHub
parent 7b13d0592f
commit 2fadec5c3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1280 additions and 223 deletions

View File

@ -422,9 +422,9 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public static class Builder { public static class Builder {
public static final int DEFAULT_AGGREGATION_CHUNKING_BUCKETS = 1000;
private static final TimeValue MIN_DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(1); private static final TimeValue MIN_DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(1);
private static final TimeValue MAX_DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(2); private static final TimeValue MAX_DEFAULT_QUERY_DELAY = TimeValue.timeValueMinutes(2);
private static final int DEFAULT_AGGREGATION_CHUNKING_BUCKETS = 1000;
private String id; private String id;
private String jobId; private String jobId;

View File

@ -139,7 +139,7 @@ public final class ExtractorUtils {
} }
} }
static long validateAndGetCalendarInterval(String calendarInterval) { public static long validateAndGetCalendarInterval(String calendarInterval) {
TimeValue interval; TimeValue interval;
DateTimeUnit dateTimeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(calendarInterval); DateTimeUnit dateTimeUnit = DateHistogramAggregationBuilder.DATE_FIELD_UNITS.get(calendarInterval);
if (dateTimeUnit != null) { if (dateTimeUnit != null) {

View File

@ -25,7 +25,11 @@ public class RollupSearchAction extends Action<SearchResponse> {
return new SearchResponse(); return new SearchResponse();
} }
static class RequestBuilder extends ActionRequestBuilder<SearchRequest, SearchResponse> { public static class RequestBuilder extends ActionRequestBuilder<SearchRequest, SearchResponse> {
public RequestBuilder(ElasticsearchClient client, SearchRequest searchRequest) {
super(client, INSTANCE, searchRequest);
}
RequestBuilder(ElasticsearchClient client) { RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new SearchRequest()); super(client, INSTANCE, new SearchRequest());
} }

View File

@ -17,6 +17,7 @@ import org.elasticsearch.test.SecuritySettingsSourceField;
import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner; import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner;
import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.core.ml.notifications.AuditorField;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.MachineLearning;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -27,6 +28,7 @@ import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
@ -63,6 +65,16 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
client().performRequest(request); client().performRequest(request);
} }
private void setupFullAccessRole(String index) throws IOException {
Request request = new Request("PUT", "/_xpack/security/role/test_data_access");
request.setJsonEntity("{"
+ " \"indices\" : ["
+ " { \"names\": [\"" + index + "\"], \"privileges\": [\"all\"] }"
+ " ]"
+ "}");
client().performRequest(request);
}
private void setupUser(String user, List<String> roles) throws IOException { private void setupUser(String user, List<String> roles) throws IOException {
String password = new String(SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING.getChars()); String password = new String(SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING.getChars());
@ -359,7 +371,75 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
assertThat(e.getMessage(), containsString("Cannot create datafeed")); assertThat(e.getMessage(), containsString("Cannot create datafeed"));
assertThat(e.getMessage(), assertThat(e.getMessage(),
containsString("user ml_admin lacks permissions on the indices to be searched")); containsString("user ml_admin lacks permissions on the indices"));
}
public void testInsufficientSearchPrivilegesOnPutWithRollup() throws Exception {
setupDataAccessRole("airline-data-aggs-rollup");
String jobId = "privs-put-job-rollup";
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"description\": \"Aggs job\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"1h\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"responsetime\",\n"
+ " \"by_field_name\": \"airline\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\"time_field\": \"time stamp\"}\n"
+ "}");
client().performRequest(createJobRequest);
String rollupJobId = "rollup-" + jobId;
Request createRollupRequest = new Request("PUT", "/_xpack/rollup/job/" + rollupJobId);
createRollupRequest.setJsonEntity("{\n"
+ "\"index_pattern\": \"airline-data-aggs\",\n"
+ " \"rollup_index\": \"airline-data-aggs-rollup\",\n"
+ " \"cron\": \"*/30 * * * * ?\",\n"
+ " \"page_size\" :1000,\n"
+ " \"groups\" : {\n"
+ " \"date_histogram\": {\n"
+ " \"field\": \"time stamp\",\n"
+ " \"interval\": \"2m\",\n"
+ " \"delay\": \"7d\"\n"
+ " },\n"
+ " \"terms\": {\n"
+ " \"fields\": [\"airline\"]\n"
+ " }"
+ " },\n"
+ " \"metrics\": [\n"
+ " {\n"
+ " \"field\": \"responsetime\",\n"
+ " \"metrics\": [\"avg\",\"min\",\"max\",\"sum\"]\n"
+ " },\n"
+ " {\n"
+ " \"field\": \"time stamp\",\n"
+ " \"metrics\": [\"min\",\"max\"]\n"
+ " }\n"
+ " ]\n"
+ "}");
client().performRequest(createRollupRequest);
String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"buckets\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":3600000},"
+ "\"aggregations\":{"
+ "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
+ "\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}";
ResponseException e = expectThrows(ResponseException.class, () ->
new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup", "doc")
.setAggregations(aggregations)
.setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS) //want to search, but no admin access
.build());
assertThat(e.getMessage(), containsString("Cannot create datafeed"));
assertThat(e.getMessage(),
containsString("user ml_admin_plus_data lacks permissions on the indices"));
} }
public void testInsufficientSearchPrivilegesOnPreview() throws Exception { public void testInsufficientSearchPrivilegesOnPreview() throws Exception {
@ -615,7 +695,7 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
// There should be a notification saying that there was a problem extracting data // There should be a notification saying that there was a problem extracting data
client().performRequest(new Request("POST", "/_refresh")); client().performRequest(new Request("POST", "/_refresh"));
Response notificationsResponse = client().performRequest( Response notificationsResponse = client().performRequest(
new Request("GET", AuditorField.NOTIFICATIONS_INDEX + "/_search?q=job_id:" + jobId)); new Request("GET", AuditorField.NOTIFICATIONS_INDEX + "/_search?size=1000&q=job_id:" + jobId));
String notificationsResponseAsString = EntityUtils.toString(notificationsResponse.getEntity()); String notificationsResponseAsString = EntityUtils.toString(notificationsResponse.getEntity());
assertThat(notificationsResponseAsString, containsString("\"message\":\"Datafeed is encountering errors extracting data: " + assertThat(notificationsResponseAsString, containsString("\"message\":\"Datafeed is encountering errors extracting data: " +
"action [indices:data/read/search] is unauthorized for user [ml_admin_plus_data]\"")); "action [indices:data/read/search] is unauthorized for user [ml_admin_plus_data]\""));
@ -663,6 +743,171 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0")); assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
} }
public void testLookbackOnlyGivenAggregationsWithHistogramAndRollupIndex() throws Exception {
String jobId = "aggs-histogram-rollup-job";
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"description\": \"Aggs job\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"1h\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"responsetime\",\n"
+ " \"by_field_name\": \"airline\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\"time_field\": \"time stamp\"}\n"
+ "}");
client().performRequest(createJobRequest);
String rollupJobId = "rollup-" + jobId;
Request createRollupRequest = new Request("PUT", "/_xpack/rollup/job/" + rollupJobId);
createRollupRequest.setJsonEntity("{\n"
+ "\"index_pattern\": \"airline-data-aggs\",\n"
+ " \"rollup_index\": \"airline-data-aggs-rollup\",\n"
+ " \"cron\": \"*/30 * * * * ?\",\n"
+ " \"page_size\" :1000,\n"
+ " \"groups\" : {\n"
+ " \"date_histogram\": {\n"
+ " \"field\": \"time stamp\",\n"
+ " \"interval\": \"2m\",\n"
+ " \"delay\": \"7d\"\n"
+ " },\n"
+ " \"terms\": {\n"
+ " \"fields\": [\"airline\"]\n"
+ " }"
+ " },\n"
+ " \"metrics\": [\n"
+ " {\n"
+ " \"field\": \"responsetime\",\n"
+ " \"metrics\": [\"avg\",\"min\",\"max\",\"sum\"]\n"
+ " },\n"
+ " {\n"
+ " \"field\": \"time stamp\",\n"
+ " \"metrics\": [\"min\",\"max\"]\n"
+ " }\n"
+ " ]\n"
+ "}");
client().performRequest(createRollupRequest);
client().performRequest(new Request("POST", "/_xpack/rollup/job/" + rollupJobId + "/_start"));
assertBusy(() -> {
Response getRollup = client().performRequest(new Request("GET", "/_xpack/rollup/job/" + rollupJobId));
String body = EntityUtils.toString(getRollup.getEntity());
assertThat(body, containsString("\"job_state\":\"started\""));
assertThat(body, containsString("\"rollups_indexed\":4"));
}, 60, TimeUnit.SECONDS);
client().performRequest(new Request("POST", "/_xpack/rollup/job/" + rollupJobId + "/_stop"));
assertBusy(() -> {
Response getRollup = client().performRequest(new Request("GET", "/_xpack/rollup/job/" + rollupJobId));
assertThat(EntityUtils.toString(getRollup.getEntity()), containsString("\"job_state\":\"stopped\""));
}, 60, TimeUnit.SECONDS);
final Request refreshRollupIndex = new Request("POST", "airline-data-aggs-rollup/_refresh");
client().performRequest(refreshRollupIndex);
String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"buckets\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":3600000},"
+ "\"aggregations\":{"
+ "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
+ "\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}";
new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup", "response").setAggregations(aggregations).build();
openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
}
public void testLookbackWithoutPermissionsAndRollup() throws Exception {
setupFullAccessRole("airline-data-aggs-rollup");
String jobId = "rollup-permission-test-network-job";
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"description\": \"Aggs job\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"1h\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"responsetime\",\n"
+ " \"by_field_name\": \"airline\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\"time_field\": \"time stamp\"}\n"
+ "}");
client().performRequest(createJobRequest);
String rollupJobId = "rollup-" + jobId;
Request createRollupRequest = new Request("PUT", "/_xpack/rollup/job/" + rollupJobId);
createRollupRequest.setJsonEntity("{\n"
+ "\"index_pattern\": \"airline-data-aggs\",\n"
+ " \"rollup_index\": \"airline-data-aggs-rollup\",\n"
+ " \"cron\": \"*/30 * * * * ?\",\n"
+ " \"page_size\" :1000,\n"
+ " \"groups\" : {\n"
+ " \"date_histogram\": {\n"
+ " \"field\": \"time stamp\",\n"
+ " \"interval\": \"2m\",\n"
+ " \"delay\": \"7d\"\n"
+ " },\n"
+ " \"terms\": {\n"
+ " \"fields\": [\"airline\"]\n"
+ " }"
+ " },\n"
+ " \"metrics\": [\n"
+ " {\n"
+ " \"field\": \"responsetime\",\n"
+ " \"metrics\": [\"avg\",\"min\",\"max\",\"sum\"]\n"
+ " },\n"
+ " {\n"
+ " \"field\": \"time stamp\",\n"
+ " \"metrics\": [\"min\",\"max\"]\n"
+ " }\n"
+ " ]\n"
+ "}");
client().performRequest(createRollupRequest);
String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"buckets\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":3600000},"
+ "\"aggregations\":{"
+ "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
+ "\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}";
// At the time we create the datafeed the user can access the network-data index that we have access to
new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup", "doc")
.setAggregations(aggregations)
.setChunkingTimespan("300s")
.setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS)
.build();
// Change the role so that the user can no longer access network-data
setupFullAccessRole("some-other-data");
openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId, BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS);
waitUntilJobIsClosed(jobId);
// There should be a notification saying that there was a problem extracting data
client().performRequest(new Request("POST", "/_refresh"));
Response notificationsResponse = client().performRequest(
new Request("GET", AuditorField.NOTIFICATIONS_INDEX + "/_search?size=1000&q=job_id:" + jobId));
String notificationsResponseAsString = EntityUtils.toString(notificationsResponse.getEntity());
assertThat(notificationsResponseAsString, containsString("\"message\":\"Datafeed is encountering errors extracting data: " +
"action [indices:admin/xpack/rollup/search] is unauthorized for user [ml_admin_plus_data]\""));
}
public void testRealtime() throws Exception { public void testRealtime() throws Exception {
String jobId = "job-realtime-1"; String jobId = "job-realtime-1";
createJob(jobId, "airline"); createJob(jobId, "airline");
@ -882,7 +1127,8 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
@After @After
public void clearMlState() throws Exception { public void clearMlState() throws Exception {
new MlRestTestStateCleaner(logger, adminClient()).clearMlMetadata(); new MlRestTestStateCleaner(logger, adminClient()).clearMlMetadata();
ESRestTestCase.waitForPendingTasks(adminClient()); // Don't check rollup jobs because we clear them in the superclass.
waitForPendingTasks(adminClient(), taskName -> taskName.startsWith(RollupJob.NAME));
} }
private static class DatafeedBuilder { private static class DatafeedBuilder {

View File

@ -22,6 +22,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
@ -32,6 +33,8 @@ import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
import org.elasticsearch.xpack.core.security.SecurityContext; import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesAction;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest; import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
@ -42,6 +45,9 @@ import org.elasticsearch.xpack.core.security.support.Exceptions;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDatafeedAction.Request, PutDatafeedAction.Response> { public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDatafeedAction.Request, PutDatafeedAction.Response> {
private final XPackLicenseState licenseState; private final XPackLicenseState licenseState;
@ -78,23 +84,48 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
// If security is enabled only create the datafeed if the user requesting creation has // If security is enabled only create the datafeed if the user requesting creation has
// permission to read the indices the datafeed is going to read from // permission to read the indices the datafeed is going to read from
if (licenseState.isAuthAllowed()) { if (licenseState.isAuthAllowed()) {
final String[] indices = request.getDatafeed().getIndices().toArray(new String[0]);
final String username = securityContext.getUser().principal(); final String username = securityContext.getUser().principal();
final HasPrivilegesRequest privRequest = new HasPrivilegesRequest();
privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
privRequest.username(username);
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
final RoleDescriptor.IndicesPrivileges.Builder indicesPrivilegesBuilder = RoleDescriptor.IndicesPrivileges.builder()
.indices(indices);
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap( ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
r -> handlePrivsResponse(username, request, r, listener), r -> handlePrivsResponse(username, request, r, listener),
listener::onFailure); listener::onFailure);
HasPrivilegesRequest privRequest = new HasPrivilegesRequest(); ActionListener<GetRollupIndexCapsAction.Response> getRollupIndexCapsActionHandler = ActionListener.wrap(
privRequest.username(username); response -> {
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY); if (response.getJobs().isEmpty()) { // This means no rollup indexes are in the config
// We just check for permission to use the search action. In reality we'll also indicesPrivilegesBuilder.privileges(SearchAction.NAME);
// use the scroll action, but that's considered an implementation detail. } else {
privRequest.indexPrivileges(RoleDescriptor.IndicesPrivileges.builder() indicesPrivilegesBuilder.privileges(SearchAction.NAME, RollupSearchAction.NAME);
.indices(request.getDatafeed().getIndices().toArray(new String[0])) }
.privileges(SearchAction.NAME) privRequest.indexPrivileges(indicesPrivilegesBuilder.build());
.build());
privRequest.applicationPrivileges(new RoleDescriptor.ApplicationResourcePrivileges[0]);
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
},
e -> {
if (e instanceof IndexNotFoundException) {
indicesPrivilegesBuilder.privileges(SearchAction.NAME);
privRequest.indexPrivileges(indicesPrivilegesBuilder.build());
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
} else {
listener.onFailure(e);
}
}
);
executeAsyncWithOrigin(client,
ML_ORIGIN,
GetRollupIndexCapsAction.INSTANCE,
new GetRollupIndexCapsAction.Request(indices),
getRollupIndexCapsActionHandler);
} else { } else {
putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener); putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener);
} }
@ -115,8 +146,7 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDat
builder.endObject(); builder.endObject();
listener.onFailure(Exceptions.authorizationError("Cannot create datafeed [{}]" + listener.onFailure(Exceptions.authorizationError("Cannot create datafeed [{}]" +
" because user {} lacks permissions on the indices to be" + " because user {} lacks permissions on the indices: {}",
" searched: {}",
request.getDatafeed().getId(), username, Strings.toString(builder))); request.getDatafeed().getId(), username, Strings.toString(builder)));
} }
} }

View File

@ -5,14 +5,19 @@
*/ */
package org.elasticsearch.xpack.ml.datafeed.extractor; package org.elasticsearch.xpack.ml.datafeed.extractor;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
import org.elasticsearch.xpack.core.ml.job.config.Job;
public interface DataExtractorFactory { public interface DataExtractorFactory {
DataExtractor newExtractor(long start, long end); DataExtractor newExtractor(long start, long end);
@ -27,11 +32,39 @@ public interface DataExtractorFactory {
, listener::onFailure , listener::onFailure
); );
boolean isScrollSearch = datafeed.hasAggregations() == false; ActionListener<GetRollupIndexCapsAction.Response> getRollupIndexCapsActionHandler = ActionListener.wrap(
if (isScrollSearch) { response -> {
ScrollDataExtractorFactory.create(client, datafeed, job, factoryHandler); if (response.getJobs().isEmpty()) { // This means no rollup indexes are in the config
} else { if (datafeed.hasAggregations()) {
factoryHandler.onResponse(new AggregationDataExtractorFactory(client, datafeed, job)); factoryHandler.onResponse(new AggregationDataExtractorFactory(client, datafeed, job));
} else {
ScrollDataExtractorFactory.create(client, datafeed, job, factoryHandler);
}
} else {
if (datafeed.hasAggregations()) { // Rollup indexes require aggregations
RollupDataExtractorFactory.create(client, datafeed, job, response.getJobs(), factoryHandler);
} else {
listener.onFailure(new IllegalArgumentException("Aggregations are required when using Rollup indices"));
} }
} }
},
e -> {
if (e instanceof IndexNotFoundException) {
listener.onFailure(new ResourceNotFoundException("datafeed [" + datafeed.getId()
+ "] cannot retrieve data because index " + ((IndexNotFoundException)e).getIndex() + " does not exist"));
} else {
listener.onFailure(e);
}
}
);
GetRollupIndexCapsAction.Request request = new GetRollupIndexCapsAction.Request(datafeed.getIndices().toArray(new String[0]));
ClientHelper.executeAsyncWithOrigin(
client,
ClientHelper.ML_ORIGIN,
GetRollupIndexCapsAction.INSTANCE,
request,
getRollupIndexCapsActionHandler);
}
} }

View File

@ -0,0 +1,167 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Abstract class for aggregated data extractors, e.g. {@link RollupDataExtractor}
*
* @param <T> The request builder type for getting data from ElasticSearch
*/
abstract class AbstractAggregationDataExtractor<T extends ActionRequestBuilder<SearchRequest, SearchResponse>>
implements DataExtractor {
private static final Logger LOGGER = LogManager.getLogger(AbstractAggregationDataExtractor.class);
/**
* The number of key-value pairs written in each batch to process.
* This has to be a number that is small enough to allow for responsive
* cancelling and big enough to not cause overhead by calling the
* post data action too often. The value of 1000 was determined via
* such testing.
*/
private static int BATCH_KEY_VALUE_PAIRS = 1000;
protected final Client client;
protected final AggregationDataExtractorContext context;
private boolean hasNext;
private boolean isCancelled;
private AggregationToJsonProcessor aggregationToJsonProcessor;
private ByteArrayOutputStream outputStream;
AbstractAggregationDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) {
this.client = Objects.requireNonNull(client);
context = Objects.requireNonNull(dataExtractorContext);
hasNext = true;
isCancelled = false;
outputStream = new ByteArrayOutputStream();
}
@Override
public boolean hasNext() {
return hasNext;
}
@Override
public boolean isCancelled() {
return isCancelled;
}
@Override
public void cancel() {
LOGGER.debug("[{}] Data extractor received cancel request", context.jobId);
isCancelled = true;
hasNext = false;
}
@Override
public Optional<InputStream> next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
if (aggregationToJsonProcessor == null) {
Aggregations aggs = search();
if (aggs == null) {
hasNext = false;
return Optional.empty();
}
initAggregationProcessor(aggs);
}
return Optional.ofNullable(processNextBatch());
}
private Aggregations search() throws IOException {
LOGGER.debug("[{}] Executing aggregated search", context.jobId);
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest(buildBaseSearchSource()));
LOGGER.debug("[{}] Search response was obtained", context.jobId);
ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
return validateAggs(searchResponse.getAggregations());
}
private void initAggregationProcessor(Aggregations aggs) throws IOException {
aggregationToJsonProcessor = new AggregationToJsonProcessor(context.timeField, context.fields, context.includeDocCount,
context.start);
aggregationToJsonProcessor.process(aggs);
}
protected SearchResponse executeSearchRequest(T searchRequestBuilder) {
return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get);
}
private SearchSourceBuilder buildBaseSearchSource() {
// For derivative aggregations the first bucket will always be null
// so query one extra histogram bucket back and hope there is data
// in that bucket
long histogramSearchStartTime = Math.max(0, context.start - ExtractorUtils.getHistogramIntervalMillis(context.aggs));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.size(0)
.query(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, histogramSearchStartTime, context.end));
context.aggs.getAggregatorFactories().forEach(searchSourceBuilder::aggregation);
context.aggs.getPipelineAggregatorFactories().forEach(searchSourceBuilder::aggregation);
return searchSourceBuilder;
}
protected abstract T buildSearchRequest(SearchSourceBuilder searchRequestBuilder);
private Aggregations validateAggs(@Nullable Aggregations aggs) {
if (aggs == null) {
return null;
}
List<Aggregation> aggsAsList = aggs.asList();
if (aggsAsList.isEmpty()) {
return null;
}
if (aggsAsList.size() > 1) {
throw new IllegalArgumentException("Multiple top level aggregations not supported; found: "
+ aggsAsList.stream().map(Aggregation::getName).collect(Collectors.toList()));
}
return aggs;
}
private InputStream processNextBatch() throws IOException {
outputStream.reset();
hasNext = aggregationToJsonProcessor.writeDocs(BATCH_KEY_VALUE_PAIRS, outputStream);
return new ByteArrayInputStream(outputStream.toByteArray());
}
protected long getHistogramInterval() {
return ExtractorUtils.getHistogramIntervalMillis(context.aggs);
}
public AggregationDataExtractorContext getContext() {
return context;
}
}

View File

@ -5,28 +5,10 @@
*/ */
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation; package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/** /**
* An implementation that extracts data from elasticsearch using search with aggregations on a client. * An implementation that extracts data from elasticsearch using search with aggregations on a client.
@ -34,132 +16,19 @@ import java.util.stream.Collectors;
* stored and they are then processed in batches. Cancellation is supported between batches. * stored and they are then processed in batches. Cancellation is supported between batches.
* Note that this class is NOT thread-safe. * Note that this class is NOT thread-safe.
*/ */
class AggregationDataExtractor implements DataExtractor { class AggregationDataExtractor extends AbstractAggregationDataExtractor<SearchRequestBuilder> {
private static final Logger LOGGER = LogManager.getLogger(AggregationDataExtractor.class);
/**
* The number of key-value pairs written in each batch to process.
* This has to be a number that is small enough to allow for responsive
* cancelling and big enough to not cause overhead by calling the
* post data action too often. The value of 1000 was determined via
* such testing.
*/
private static int BATCH_KEY_VALUE_PAIRS = 1000;
private final Client client;
private final AggregationDataExtractorContext context;
private boolean hasNext;
private boolean isCancelled;
private AggregationToJsonProcessor aggregationToJsonProcessor;
private ByteArrayOutputStream outputStream;
AggregationDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) { AggregationDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) {
this.client = Objects.requireNonNull(client); super(client, dataExtractorContext);
context = Objects.requireNonNull(dataExtractorContext);
hasNext = true;
isCancelled = false;
outputStream = new ByteArrayOutputStream();
} }
@Override @Override
public boolean hasNext() { protected SearchRequestBuilder buildSearchRequest(SearchSourceBuilder searchSourceBuilder) {
return hasNext;
}
@Override return new SearchRequestBuilder(client, SearchAction.INSTANCE)
public boolean isCancelled() { .setSource(searchSourceBuilder)
return isCancelled;
}
@Override
public void cancel() {
LOGGER.trace("[{}] Data extractor received cancel request", context.jobId);
isCancelled = true;
hasNext = false;
}
@Override
public Optional<InputStream> next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
if (aggregationToJsonProcessor == null) {
Aggregations aggs = search();
if (aggs == null) {
hasNext = false;
return Optional.empty();
}
initAggregationProcessor(aggs);
}
return Optional.ofNullable(processNextBatch());
}
private Aggregations search() throws IOException {
LOGGER.debug("[{}] Executing aggregated search", context.jobId);
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest());
LOGGER.debug("[{}] Search response was obtained", context.jobId);
ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
return validateAggs(searchResponse.getAggregations());
}
private void initAggregationProcessor(Aggregations aggs) throws IOException {
aggregationToJsonProcessor = new AggregationToJsonProcessor(context.timeField, context.fields, context.includeDocCount,
context.start);
aggregationToJsonProcessor.process(aggs);
}
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get);
}
private SearchRequestBuilder buildSearchRequest() {
// For derivative aggregations the first bucket will always be null
// so query one extra histogram bucket back and hope there is data
// in that bucket
long histogramSearchStartTime = Math.max(0, context.start - getHistogramInterval());
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
.setIndices(context.indices) .setIndices(context.indices)
.setTypes(context.types) .setTypes(context.types);
.setSize(0)
.setQuery(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, histogramSearchStartTime, context.end));
context.aggs.getAggregatorFactories().forEach(searchRequestBuilder::addAggregation);
context.aggs.getPipelineAggregatorFactories().forEach(searchRequestBuilder::addAggregation);
return searchRequestBuilder;
} }
private Aggregations validateAggs(@Nullable Aggregations aggs) {
if (aggs == null) {
return null;
}
List<Aggregation> aggsAsList = aggs.asList();
if (aggsAsList.isEmpty()) {
return null;
}
if (aggsAsList.size() > 1) {
throw new IllegalArgumentException("Multiple top level aggregations not supported; found: "
+ aggsAsList.stream().map(Aggregation::getName).collect(Collectors.toList()));
}
return aggs;
}
private InputStream processNextBatch() throws IOException {
outputStream.reset();
hasNext = aggregationToJsonProcessor.writeDocs(BATCH_KEY_VALUE_PAIRS, outputStream);
return new ByteArrayInputStream(outputStream.toByteArray());
}
private long getHistogramInterval() {
return ExtractorUtils.getHistogramIntervalMillis(context.aggs);
}
AggregationDataExtractorContext getContext() {
return context;
}
} }

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
/**
* An implementation that extracts data from elasticsearch using search with aggregations against rollup indexes on a client.
* The first time {@link #next()} is called, the search is executed. The result aggregations are
* stored and they are then processed in batches. Cancellation is supported between batches.
* Note that this class is NOT thread-safe.
*/
class RollupDataExtractor extends AbstractAggregationDataExtractor<RollupSearchAction.RequestBuilder> {
RollupDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) {
super(client, dataExtractorContext);
}
@Override
protected RollupSearchAction.RequestBuilder buildSearchRequest(SearchSourceBuilder searchSourceBuilder) {
SearchRequest searchRequest = new SearchRequest().indices(context.indices).source(searchSourceBuilder);
return new RollupSearchAction.RequestBuilder(client, searchRequest);
}
}

View File

@ -0,0 +1,218 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
import org.elasticsearch.xpack.core.rollup.action.RollableIndexCaps;
import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps.RollupFieldCaps;
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils.getHistogramAggregation;
import static org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils.getHistogramIntervalMillis;
import static org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils.validateAndGetCalendarInterval;
public class RollupDataExtractorFactory implements DataExtractorFactory {
private final Client client;
private final DatafeedConfig datafeedConfig;
private final Job job;
private RollupDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job) {
this.client = Objects.requireNonNull(client);
this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
this.job = Objects.requireNonNull(job);
}
@Override
public DataExtractor newExtractor(long start, long end) {
long histogramInterval = datafeedConfig.getHistogramIntervalMillis();
AggregationDataExtractorContext dataExtractorContext = new AggregationDataExtractorContext(
job.getId(),
job.getDataDescription().getTimeField(),
job.getAnalysisConfig().analysisFields(),
datafeedConfig.getIndices(),
datafeedConfig.getTypes(),
datafeedConfig.getQuery(),
datafeedConfig.getAggregations(),
Intervals.alignToCeil(start, histogramInterval),
Intervals.alignToFloor(end, histogramInterval),
job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT),
datafeedConfig.getHeaders());
return new RollupDataExtractor(client, dataExtractorContext);
}
public static void create(Client client,
DatafeedConfig datafeed,
Job job,
Map<String, RollableIndexCaps> rollupJobsWithCaps,
ActionListener<DataExtractorFactory> listener) {
final AggregationBuilder datafeedHistogramAggregation = getHistogramAggregation(
datafeed.getAggregations().getAggregatorFactories());
if ((datafeedHistogramAggregation instanceof DateHistogramAggregationBuilder) == false) {
listener.onFailure(
new IllegalArgumentException("Rollup requires that the datafeed configuration use a [date_histogram] aggregation," +
" not a [histogram] aggregation over the time field."));
return;
}
final String timeField = ((ValuesSourceAggregationBuilder) datafeedHistogramAggregation).field();
Set<ParsedRollupCaps> rollupCapsSet = rollupJobsWithCaps.values()
.stream()
.flatMap(rollableIndexCaps -> rollableIndexCaps.getJobCaps().stream())
.map(rollupJobCaps -> ParsedRollupCaps.fromJobFieldCaps(rollupJobCaps.getFieldCaps(), timeField))
.collect(Collectors.toSet());
final long datafeedInterval = getHistogramIntervalMillis(datafeedHistogramAggregation);
List<ParsedRollupCaps> validIntervalCaps = rollupCapsSet.stream()
.filter(rollupCaps -> validInterval(datafeedInterval, rollupCaps))
.collect(Collectors.toList());
if (validIntervalCaps.isEmpty()) {
listener.onFailure(
new IllegalArgumentException(
"Rollup capabilities do not have a [date_histogram] aggregation with an interval " +
"that is a multiple of the datafeed's interval.")
);
return;
}
final List<ValuesSourceAggregationBuilder> flattenedAggs = new ArrayList<>();
flattenAggregations(datafeed.getAggregations().getAggregatorFactories(), datafeedHistogramAggregation, flattenedAggs);
if (validIntervalCaps.stream().noneMatch(rollupJobConfig -> hasAggregations(rollupJobConfig, flattenedAggs))) {
listener.onFailure(
new IllegalArgumentException("Rollup capabilities do not support all the datafeed aggregations at the desired interval.")
);
return;
}
listener.onResponse(new RollupDataExtractorFactory(client, datafeed, job));
}
private static boolean validInterval(long datafeedInterval, ParsedRollupCaps rollupJobGroupConfig) {
if (rollupJobGroupConfig.hasDatehistogram() == false) {
return false;
}
if ("UTC".equalsIgnoreCase(rollupJobGroupConfig.getTimezone()) == false) {
return false;
}
try {
long jobInterval = validateAndGetCalendarInterval(rollupJobGroupConfig.getInterval());
return datafeedInterval % jobInterval == 0;
} catch (ElasticsearchStatusException exception) {
return false;
}
}
private static void flattenAggregations(final Collection<AggregationBuilder> datafeedAggregations,
final AggregationBuilder datafeedHistogramAggregation,
final List<ValuesSourceAggregationBuilder> flattenedAggregations) {
for (AggregationBuilder aggregationBuilder : datafeedAggregations) {
if (aggregationBuilder.equals(datafeedHistogramAggregation) == false) {
flattenedAggregations.add((ValuesSourceAggregationBuilder)aggregationBuilder);
}
flattenAggregations(aggregationBuilder.getSubAggregations(), datafeedHistogramAggregation, flattenedAggregations);
}
}
private static boolean hasAggregations(ParsedRollupCaps rollupCaps, List<ValuesSourceAggregationBuilder> datafeedAggregations) {
for (ValuesSourceAggregationBuilder aggregationBuilder : datafeedAggregations) {
String type = aggregationBuilder.getType();
String field = aggregationBuilder.field();
if (aggregationBuilder instanceof TermsAggregationBuilder) {
if (rollupCaps.supportedTerms.contains(field) == false) {
return false;
}
} else {
if (rollupCaps.supportedMetrics.contains(field + "_" + type) == false) {
return false;
}
}
}
return true;
}
private static class ParsedRollupCaps {
private final Set<String> supportedMetrics;
private final Set<String> supportedTerms;
private final Map<String, Object> datehistogramAgg;
private static final List<String> aggsToIgnore =
Arrays.asList(HistogramAggregationBuilder.NAME, DateHistogramAggregationBuilder.NAME);
private static ParsedRollupCaps fromJobFieldCaps(Map<String, RollupFieldCaps> rollupFieldCaps, String timeField) {
Map<String, Object> datehistogram = null;
RollupFieldCaps timeFieldCaps = rollupFieldCaps.get(timeField);
if (timeFieldCaps != null) {
for(Map<String, Object> agg : timeFieldCaps.getAggs()) {
if (agg.get("agg").equals(DateHistogramAggregationBuilder.NAME)) {
datehistogram = agg;
}
}
}
Set<String> supportedMetrics = new HashSet<>();
Set<String> supportedTerms = new HashSet<>();
rollupFieldCaps.forEach((field, fieldCaps) -> {
fieldCaps.getAggs().forEach(agg -> {
String type = (String)agg.get("agg");
if (type.equals(TermsAggregationBuilder.NAME)) {
supportedTerms.add(field);
} else if (aggsToIgnore.contains(type) == false) {
supportedMetrics.add(field + "_" + type);
}
});
});
return new ParsedRollupCaps(supportedMetrics, supportedTerms, datehistogram);
}
private ParsedRollupCaps(Set<String> supportedMetrics, Set<String> supportedTerms, Map<String, Object> datehistogramAgg) {
this.supportedMetrics = supportedMetrics;
this.supportedTerms = supportedTerms;
this.datehistogramAgg = datehistogramAgg;
}
private String getInterval() {
if (datehistogramAgg == null) {
return null;
}
return (String)datehistogramAgg.get(DateHistogramGroupConfig.INTERVAL);
}
private String getTimezone() {
if (datehistogramAgg == null) {
return null;
}
return (String)datehistogramAgg.get(DateHistogramGroupConfig.TIME_ZONE);
}
private boolean hasDatehistogram() {
return datehistogramAgg != null;
}
}
}

View File

@ -7,7 +7,9 @@ package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -15,10 +17,14 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.metrics.Max; import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.aggregations.metrics.Min; import org.elasticsearch.search.aggregations.metrics.Min;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -43,6 +49,13 @@ import java.util.Optional;
*/ */
public class ChunkedDataExtractor implements DataExtractor { public class ChunkedDataExtractor implements DataExtractor {
private interface DataSummary {
long estimateChunk();
boolean hasData();
long earliestTime();
long getDataTimeSpread();
}
private static final Logger LOGGER = LogManager.getLogger(ChunkedDataExtractor.class); private static final Logger LOGGER = LogManager.getLogger(ChunkedDataExtractor.class);
private static final String EARLIEST_TIME = "earliest_time"; private static final String EARLIEST_TIME = "earliest_time";
@ -54,6 +67,7 @@ public class ChunkedDataExtractor implements DataExtractor {
private final Client client; private final Client client;
private final DataExtractorFactory dataExtractorFactory; private final DataExtractorFactory dataExtractorFactory;
private final ChunkedDataExtractorContext context; private final ChunkedDataExtractorContext context;
private final DataSummaryFactory dataSummaryFactory;
private long currentStart; private long currentStart;
private long currentEnd; private long currentEnd;
private long chunkSpan; private long chunkSpan;
@ -67,6 +81,7 @@ public class ChunkedDataExtractor implements DataExtractor {
this.currentStart = context.start; this.currentStart = context.start;
this.currentEnd = context.start; this.currentEnd = context.start;
this.isCancelled = false; this.isCancelled = false;
this.dataSummaryFactory = new DataSummaryFactory();
} }
@Override @Override
@ -93,48 +108,21 @@ public class ChunkedDataExtractor implements DataExtractor {
} }
private void setUpChunkedSearch() throws IOException { private void setUpChunkedSearch() throws IOException {
DataSummary dataSummary = requestDataSummary(); DataSummary dataSummary = dataSummaryFactory.buildDataSummary();
if (dataSummary.totalHits > 0) { if (dataSummary.hasData()) {
currentStart = context.timeAligner.alignToFloor(dataSummary.earliestTime); currentStart = context.timeAligner.alignToFloor(dataSummary.earliestTime());
currentEnd = currentStart; currentEnd = currentStart;
chunkSpan = context.chunkSpan == null ? dataSummary.estimateChunk() : context.chunkSpan.getMillis(); chunkSpan = context.chunkSpan == null ? dataSummary.estimateChunk() : context.chunkSpan.getMillis();
chunkSpan = context.timeAligner.alignToCeil(chunkSpan); chunkSpan = context.timeAligner.alignToCeil(chunkSpan);
LOGGER.debug("[{}]Chunked search configured: totalHits = {}, dataTimeSpread = {} ms, chunk span = {} ms", LOGGER.debug("[{}]Chunked search configured: kind = {}, dataTimeSpread = {} ms, chunk span = {} ms",
context.jobId, dataSummary.totalHits, dataSummary.getDataTimeSpread(), chunkSpan); context.jobId, dataSummary.getClass().getSimpleName(), dataSummary.getDataTimeSpread(), chunkSpan);
} else { } else {
// search is over // search is over
currentEnd = context.end; currentEnd = context.end;
} }
} }
private DataSummary requestDataSummary() throws IOException { protected SearchResponse executeSearchRequest(ActionRequestBuilder<SearchRequest, SearchResponse> searchRequestBuilder) {
SearchRequestBuilder searchRequestBuilder = new SearchRequestBuilder(client, SearchAction.INSTANCE)
.setSize(0)
.setIndices(context.indices)
.setTypes(context.types)
.setQuery(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, currentStart, context.end))
.addAggregation(AggregationBuilders.min(EARLIEST_TIME).field(context.timeField))
.addAggregation(AggregationBuilders.max(LATEST_TIME).field(context.timeField));
SearchResponse response = executeSearchRequest(searchRequestBuilder);
LOGGER.debug("[{}] Data summary response was obtained", context.jobId);
ExtractorUtils.checkSearchWasSuccessful(context.jobId, response);
Aggregations aggregations = response.getAggregations();
long earliestTime = 0;
long latestTime = 0;
long totalHits = response.getHits().getTotalHits();
if (totalHits > 0) {
Min min = aggregations.get(EARLIEST_TIME);
earliestTime = (long) min.getValue();
Max max = aggregations.get(LATEST_TIME);
latestTime = (long) max.getValue();
}
return new DataSummary(earliestTime, latestTime, totalHits);
}
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get); return ClientHelper.executeWithHeaders(context.headers, ClientHelper.ML_ORIGIN, client, searchRequestBuilder::get);
} }
@ -182,19 +170,101 @@ public class ChunkedDataExtractor implements DataExtractor {
isCancelled = true; isCancelled = true;
} }
private class DataSummary { ChunkedDataExtractorContext getContext() {
return context;
}
private class DataSummaryFactory {
/**
* If there are aggregations, an AggregatedDataSummary object is created. It returns a ScrollingDataSummary otherwise.
*
* By default a DatafeedConfig with aggregations, should already have a manual ChunkingConfig created.
* However, the end user could have specifically set the ChunkingConfig to AUTO, which would not really work for aggregations.
* So, if we need to gather an appropriate chunked time for aggregations, we can utilize the AggregatedDataSummary
*
* @return DataSummary object
* @throws IOException when timefield range search fails
*/
private DataSummary buildDataSummary() throws IOException {
return context.hasAggregations ? newAggregatedDataSummary() : newScrolledDataSummary();
}
private DataSummary newScrolledDataSummary() throws IOException {
SearchRequestBuilder searchRequestBuilder = rangeSearchRequest().setTypes(context.types);
SearchResponse response = executeSearchRequest(searchRequestBuilder);
LOGGER.debug("[{}] Scrolling Data summary response was obtained", context.jobId);
ExtractorUtils.checkSearchWasSuccessful(context.jobId, response);
Aggregations aggregations = response.getAggregations();
long earliestTime = 0;
long latestTime = 0;
long totalHits = response.getHits().getTotalHits();
if (totalHits > 0) {
Min min = aggregations.get(EARLIEST_TIME);
earliestTime = (long) min.getValue();
Max max = aggregations.get(LATEST_TIME);
latestTime = (long) max.getValue();
}
return new ScrolledDataSummary(earliestTime, latestTime, totalHits);
}
private DataSummary newAggregatedDataSummary() throws IOException {
// TODO: once RollupSearchAction is changed from indices:admin* to indices:data/read/* this branch is not needed
ActionRequestBuilder<SearchRequest, SearchResponse> searchRequestBuilder =
dataExtractorFactory instanceof RollupDataExtractorFactory ? rollupRangeSearchRequest() : rangeSearchRequest();
SearchResponse response = executeSearchRequest(searchRequestBuilder);
LOGGER.debug("[{}] Aggregating Data summary response was obtained", context.jobId);
ExtractorUtils.checkSearchWasSuccessful(context.jobId, response);
Aggregations aggregations = response.getAggregations();
Min min = aggregations.get(EARLIEST_TIME);
Max max = aggregations.get(LATEST_TIME);
return new AggregatedDataSummary(min.getValue(), max.getValue(), context.histogramInterval);
}
private SearchSourceBuilder rangeSearchBuilder() {
return new SearchSourceBuilder()
.size(0)
.query(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, currentStart, context.end))
.aggregation(AggregationBuilders.min(EARLIEST_TIME).field(context.timeField))
.aggregation(AggregationBuilders.max(LATEST_TIME).field(context.timeField));
}
private SearchRequestBuilder rangeSearchRequest() {
return new SearchRequestBuilder(client, SearchAction.INSTANCE)
.setIndices(context.indices)
.setSource(rangeSearchBuilder());
}
private RollupSearchAction.RequestBuilder rollupRangeSearchRequest() {
SearchRequest searchRequest = new SearchRequest().indices(context.indices).source(rangeSearchBuilder());
return new RollupSearchAction.RequestBuilder(client, searchRequest);
}
}
private class ScrolledDataSummary implements DataSummary {
private long earliestTime; private long earliestTime;
private long latestTime; private long latestTime;
private long totalHits; private long totalHits;
private DataSummary(long earliestTime, long latestTime, long totalHits) { private ScrolledDataSummary(long earliestTime, long latestTime, long totalHits) {
this.earliestTime = earliestTime; this.earliestTime = earliestTime;
this.latestTime = latestTime; this.latestTime = latestTime;
this.totalHits = totalHits; this.totalHits = totalHits;
} }
private long getDataTimeSpread() { @Override
public long earliestTime() {
return earliestTime;
}
@Override
public long getDataTimeSpread() {
return latestTime - earliestTime; return latestTime - earliestTime;
} }
@ -206,7 +276,8 @@ public class ChunkedDataExtractor implements DataExtractor {
* However, assuming this as the chunk span may often lead to half-filled pages or empty searches. * However, assuming this as the chunk span may often lead to half-filled pages or empty searches.
* It is beneficial to take a multiple of that. Based on benchmarking, we set this to 10x. * It is beneficial to take a multiple of that. Based on benchmarking, we set this to 10x.
*/ */
private long estimateChunk() { @Override
public long estimateChunk() {
long dataTimeSpread = getDataTimeSpread(); long dataTimeSpread = getDataTimeSpread();
if (totalHits <= 0 || dataTimeSpread <= 0) { if (totalHits <= 0 || dataTimeSpread <= 0) {
return context.end - currentEnd; return context.end - currentEnd;
@ -214,9 +285,46 @@ public class ChunkedDataExtractor implements DataExtractor {
long estimatedChunk = 10 * (context.scrollSize * getDataTimeSpread()) / totalHits; long estimatedChunk = 10 * (context.scrollSize * getDataTimeSpread()) / totalHits;
return Math.max(estimatedChunk, MIN_CHUNK_SPAN); return Math.max(estimatedChunk, MIN_CHUNK_SPAN);
} }
@Override
public boolean hasData() {
return totalHits > 0;
}
} }
ChunkedDataExtractorContext getContext() { private class AggregatedDataSummary implements DataSummary {
return context;
private final double earliestTime;
private final double latestTime;
private final long histogramIntervalMillis;
private AggregatedDataSummary(double earliestTime, double latestTime, long histogramInterval) {
this.earliestTime = earliestTime;
this.latestTime = latestTime;
this.histogramIntervalMillis = histogramInterval;
}
/**
* This heuristic is a direct copy of the manual chunking config auto-creation done in {@link DatafeedConfig.Builder}
*/
@Override
public long estimateChunk() {
return DatafeedConfig.Builder.DEFAULT_AGGREGATION_CHUNKING_BUCKETS * histogramIntervalMillis;
}
@Override
public boolean hasData() {
return (Double.isInfinite(earliestTime) || Double.isInfinite(latestTime)) == false;
}
@Override
public long earliestTime() {
return (long)earliestTime;
}
@Override
public long getDataTimeSpread() {
return (long)latestTime - (long)earliestTime;
}
} }
} }

View File

@ -31,10 +31,13 @@ class ChunkedDataExtractorContext {
final TimeValue chunkSpan; final TimeValue chunkSpan;
final TimeAligner timeAligner; final TimeAligner timeAligner;
final Map<String, String> headers; final Map<String, String> headers;
final boolean hasAggregations;
final Long histogramInterval;
ChunkedDataExtractorContext(String jobId, String timeField, List<String> indices, List<String> types, ChunkedDataExtractorContext(String jobId, String timeField, List<String> indices, List<String> types,
QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan, QueryBuilder query, int scrollSize, long start, long end, @Nullable TimeValue chunkSpan,
TimeAligner timeAligner, Map<String, String> headers) { TimeAligner timeAligner, Map<String, String> headers, boolean hasAggregations,
@Nullable Long histogramInterval) {
this.jobId = Objects.requireNonNull(jobId); this.jobId = Objects.requireNonNull(jobId);
this.timeField = Objects.requireNonNull(timeField); this.timeField = Objects.requireNonNull(timeField);
this.indices = indices.toArray(new String[indices.size()]); this.indices = indices.toArray(new String[indices.size()]);
@ -46,5 +49,7 @@ class ChunkedDataExtractorContext {
this.chunkSpan = chunkSpan; this.chunkSpan = chunkSpan;
this.timeAligner = Objects.requireNonNull(timeAligner); this.timeAligner = Objects.requireNonNull(timeAligner);
this.headers = headers; this.headers = headers;
this.hasAggregations = hasAggregations;
this.histogramInterval = histogramInterval;
} }
} }

View File

@ -42,7 +42,10 @@ public class ChunkedDataExtractorFactory implements DataExtractorFactory {
timeAligner.alignToFloor(end), timeAligner.alignToFloor(end),
datafeedConfig.getChunkingConfig().getTimeSpan(), datafeedConfig.getChunkingConfig().getTimeSpan(),
timeAligner, timeAligner,
datafeedConfig.getHeaders()); datafeedConfig.getHeaders(),
datafeedConfig.hasAggregations(),
datafeedConfig.hasAggregations() ? datafeedConfig.getHistogramIntervalMillis() : null
);
return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext); return new ChunkedDataExtractor(client, dataExtractorFactory, dataExtractorContext);
} }

View File

@ -5,15 +5,29 @@
*/ */
package org.elasticsearch.xpack.ml; package org.elasticsearch.xpack.ml;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.LicenseService; import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.core.ssl.SSLService; import org.elasticsearch.xpack.core.ssl.SSLService;
import org.elasticsearch.xpack.monitoring.Monitoring; import org.elasticsearch.xpack.monitoring.Monitoring;
import org.elasticsearch.xpack.security.Security; import org.elasticsearch.xpack.security.Security;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
public class LocalStateMachineLearning extends LocalStateCompositeXPackPlugin { public class LocalStateMachineLearning extends LocalStateCompositeXPackPlugin {
@ -50,6 +64,38 @@ public class LocalStateMachineLearning extends LocalStateCompositeXPackPlugin {
@Override @Override
protected XPackLicenseState getLicenseState() { return thisVar.getLicenseState(); } protected XPackLicenseState getLicenseState() { return thisVar.getLicenseState(); }
}); });
} plugins.add(new MockedRollupPlugin());
} }
/**
* This is only required as we now have to have the GetRollupIndexCapsAction as a valid action in our node.
* The MachineLearningLicenseTests attempt to create a datafeed referencing this LocalStateMachineLearning object.
* Consequently, we need to be able to take this rollup action (response does not matter)
* as the datafeed extractor now depends on it.
*/
public static class MockedRollupPlugin extends Plugin implements ActionPlugin {
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Collections.singletonList(
new ActionHandler<>(GetRollupIndexCapsAction.INSTANCE, MockedRollupIndexCapsTransport.class)
);
}
public static class MockedRollupIndexCapsTransport
extends TransportAction<GetRollupIndexCapsAction.Request, GetRollupIndexCapsAction.Response> {
@Inject
public MockedRollupIndexCapsTransport(Settings settings, TransportService transportService) {
super(settings, GetRollupIndexCapsAction.NAME, new ActionFilters(new HashSet<>()), transportService.getTaskManager());
}
@Override
protected void doExecute(Task task,
GetRollupIndexCapsAction.Request request,
ActionListener<GetRollupIndexCapsAction.Response> listener) {
listener.onResponse(new GetRollupIndexCapsAction.Response());
}
}
}
}

View File

@ -14,23 +14,39 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig; import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction;
import org.elasticsearch.xpack.core.rollup.action.RollableIndexCaps;
import org.elasticsearch.xpack.core.rollup.action.RollupJobCaps;
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
import org.elasticsearch.xpack.core.rollup.job.MetricConfig;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests; import org.elasticsearch.xpack.ml.datafeed.DatafeedManagerTests;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory; import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.junit.Before; import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
@ -41,6 +57,7 @@ import static org.mockito.Mockito.when;
public class DataExtractorFactoryTests extends ESTestCase { public class DataExtractorFactoryTests extends ESTestCase {
private FieldCapabilitiesResponse fieldsCapabilities; private FieldCapabilitiesResponse fieldsCapabilities;
private GetRollupIndexCapsAction.Response getRollupIndexResponse;
private Client client; private Client client;
@ -54,12 +71,22 @@ public class DataExtractorFactoryTests extends ESTestCase {
givenAggregatableField("time", "date"); givenAggregatableField("time", "date");
givenAggregatableField("field", "keyword"); givenAggregatableField("field", "keyword");
getRollupIndexResponse = mock(GetRollupIndexCapsAction.Response.class);
when(getRollupIndexResponse.getJobs()).thenReturn(new HashMap<>());
doAnswer(invocationMock -> { doAnswer(invocationMock -> {
@SuppressWarnings("raw_types") @SuppressWarnings("raw_types")
ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; ActionListener listener = (ActionListener) invocationMock.getArguments()[2];
listener.onResponse(fieldsCapabilities); listener.onResponse(fieldsCapabilities);
return null; return null;
}).when(client).execute(same(FieldCapabilitiesAction.INSTANCE), any(), any()); }).when(client).execute(same(FieldCapabilitiesAction.INSTANCE), any(), any());
doAnswer(invocationMock -> {
@SuppressWarnings("raw_types")
ActionListener listener = (ActionListener) invocationMock.getArguments()[2];
listener.onResponse(getRollupIndexResponse);
return null;
}).when(client).execute(same(GetRollupIndexCapsAction.INSTANCE), any(), any());
} }
public void testCreateDataExtractorFactoryGivenDefaultScroll() { public void testCreateDataExtractorFactoryGivenDefaultScroll() {
@ -165,6 +192,162 @@ public class DataExtractorFactoryTests extends ESTestCase {
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener); DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener);
} }
public void testCreateDataExtractorFactoryGivenRollupAndValidAggregation() {
givenAggregatableRollup("myField", "max", 5, "termField");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField");
TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField);
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(RollupDataExtractorFactory.class)),
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener);
}
public void testCreateDataExtractorFactoryGivenRollupAndValidAggregationAndAutoChunk() {
givenAggregatableRollup("myField", "max", 5, "termField");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newAuto());
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField");
TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField);
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> assertThat(dataExtractorFactory, instanceOf(ChunkedDataExtractorFactory.class)),
e -> fail()
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener);
}
public void testCreateDataExtractorFactoryGivenRollupButNoAggregations() {
givenAggregatableRollup("myField", "max", 5);
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> fail(),
e -> {
assertThat(e.getMessage(), equalTo("Aggregations are required when using Rollup indices"));
assertThat(e, instanceOf(IllegalArgumentException.class));
}
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener);
}
public void testCreateDataExtractorFactoryGivenRollupWithBadInterval() {
givenAggregatableRollup("myField", "max", 7, "termField");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField");
TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField);
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> fail(),
e -> {
assertThat(e.getMessage(),
containsString("Rollup capabilities do not have a [date_histogram] aggregation with an interval " +
"that is a multiple of the datafeed's interval."));
assertThat(e, instanceOf(IllegalArgumentException.class));
}
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener);
}
public void testCreateDataExtractorFactoryGivenRollupMissingTerms() {
givenAggregatableRollup("myField", "max", 5);
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("myField");
TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField);
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> fail(),
e -> {
assertThat(e.getMessage(),
containsString("Rollup capabilities do not support all the datafeed aggregations at the desired interval."));
assertThat(e, instanceOf(IllegalArgumentException.class));
}
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener);
}
public void testCreateDataExtractorFactoryGivenRollupMissingMetric() {
givenAggregatableRollup("myField", "max", 5, "termField");
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeField("time");
Job.Builder jobBuilder = DatafeedManagerTests.createDatafeedJob();
jobBuilder.setDataDescription(dataDescription);
DatafeedConfig.Builder datafeedConfig = DatafeedManagerTests.createDatafeedConfig("datafeed1", "foo");
datafeedConfig.setChunkingConfig(ChunkingConfig.newOff());
MaxAggregationBuilder maxTime = AggregationBuilders.max("time").field("time");
MaxAggregationBuilder myField = AggregationBuilders.max("myField").field("otherField");
TermsAggregationBuilder myTerm = AggregationBuilders.terms("termAgg").field("termField").subAggregation(myField);
datafeedConfig.setAggregations(AggregatorFactories.builder().addAggregator(
AggregationBuilders.dateHistogram("time").interval(600_000).subAggregation(maxTime).subAggregation(myTerm).field("time")));
ActionListener<DataExtractorFactory> listener = ActionListener.wrap(
dataExtractorFactory -> fail(),
e -> {
assertThat(e.getMessage(),
containsString("Rollup capabilities do not support all the datafeed aggregations at the desired interval."));
assertThat(e, instanceOf(IllegalArgumentException.class));
}
);
DataExtractorFactory.create(client, datafeedConfig.build(), jobBuilder.build(new Date()), listener);
}
private void givenAggregatableRollup(String field, String type, int minuteInterval, String... groupByTerms) {
List<MetricConfig> metricConfigs = Arrays.asList(new MetricConfig(field, Collections.singletonList(type)),
new MetricConfig("time", Arrays.asList("min", "max")));
TermsGroupConfig termsGroupConfig = null;
if (groupByTerms.length > 0) {
termsGroupConfig = new TermsGroupConfig(groupByTerms);
}
RollupJobConfig rollupJobConfig = new RollupJobConfig("rollupJob1",
"myIndexes*",
"myIndex_rollup",
"*/30 * * * * ?",
300,
new GroupConfig(
new DateHistogramGroupConfig("time", DateHistogramInterval.minutes(minuteInterval)), null, termsGroupConfig),
metricConfigs,
null);
RollupJobCaps rollupJobCaps = new RollupJobCaps(rollupJobConfig);
RollableIndexCaps rollableIndexCaps = new RollableIndexCaps("myIndex_rollup", Collections.singletonList(rollupJobCaps));
Map<String, RollableIndexCaps> jobs = new HashMap<>(1);
jobs.put("rollupJob1", rollableIndexCaps);
when(getRollupIndexResponse.getJobs()).thenReturn(jobs);
}
private void givenAggregatableField(String field, String type) { private void givenAggregatableField(String field, String type) {
FieldCapabilities fieldCaps = mock(FieldCapabilities.class); FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
when(fieldCaps.isSearchable()).thenReturn(true); when(fieldCaps.isSearchable()).thenReturn(true);

View File

@ -46,7 +46,7 @@ import static org.mockito.Mockito.when;
public class AggregationDataExtractorTests extends ESTestCase { public class AggregationDataExtractorTests extends ESTestCase {
private Client client; private Client testClient;
private List<SearchRequestBuilder> capturedSearchRequests; private List<SearchRequestBuilder> capturedSearchRequests;
private String jobId; private String jobId;
private String timeField; private String timeField;
@ -61,7 +61,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
private SearchResponse nextResponse; private SearchResponse nextResponse;
TestDataExtractor(long start, long end) { TestDataExtractor(long start, long end) {
super(client, createContext(start, end)); super(testClient, createContext(start, end));
} }
@Override @Override
@ -77,7 +77,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
@Before @Before
public void setUpTests() { public void setUpTests() {
client = mock(Client.class); testClient = mock(Client.class);
capturedSearchRequests = new ArrayList<>(); capturedSearchRequests = new ArrayList<>();
jobId = "test-job"; jobId = "test-job";
timeField = "time"; timeField = "time";

View File

@ -5,7 +5,8 @@
*/ */
package org.elasticsearch.xpack.ml.datafeed.extractor.chunked; package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -44,7 +45,7 @@ import static org.mockito.Mockito.when;
public class ChunkedDataExtractorTests extends ESTestCase { public class ChunkedDataExtractorTests extends ESTestCase {
private Client client; private Client client;
private List<SearchRequestBuilder> capturedSearchRequests; private List<SearchRequest> capturedSearchRequests;
private String jobId; private String jobId;
private String timeField; private String timeField;
private List<String> types; private List<String> types;
@ -62,9 +63,13 @@ public class ChunkedDataExtractorTests extends ESTestCase {
super(client, dataExtractorFactory, createContext(start, end)); super(client, dataExtractorFactory, createContext(start, end));
} }
TestDataExtractor(long start, long end, boolean hasAggregations, Long histogramInterval) {
super(client, dataExtractorFactory, createContext(start, end, hasAggregations, histogramInterval));
}
@Override @Override
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) { protected SearchResponse executeSearchRequest(ActionRequestBuilder<SearchRequest, SearchResponse> searchRequestBuilder) {
capturedSearchRequests.add(searchRequestBuilder); capturedSearchRequests.add(searchRequestBuilder.request());
return nextResponse; return nextResponse;
} }
@ -136,6 +141,89 @@ public class ChunkedDataExtractorTests extends ESTestCase {
assertThat(searchRequest, not(containsString("\"sort\""))); assertThat(searchRequest, not(containsString("\"sort\"")));
} }
public void testExtractionGivenSpecifiedChunkAndAggs() throws IOException {
chunkSpan = TimeValue.timeValueSeconds(1);
TestDataExtractor extractor = new TestDataExtractor(1000L, 2300L, true, 1000L);
extractor.setNextResponse(createSearchResponse(0L, 1000L, 2200L));
InputStream inputStream1 = mock(InputStream.class);
InputStream inputStream2 = mock(InputStream.class);
InputStream inputStream3 = mock(InputStream.class);
DataExtractor subExtactor1 = new StubSubExtractor(inputStream1, inputStream2);
when(dataExtractorFactory.newExtractor(1000L, 2000L)).thenReturn(subExtactor1);
DataExtractor subExtactor2 = new StubSubExtractor(inputStream3);
when(dataExtractorFactory.newExtractor(2000L, 2300L)).thenReturn(subExtactor2);
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream1, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream2, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream3, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertThat(extractor.next().isPresent(), is(false));
verify(dataExtractorFactory).newExtractor(1000L, 2000L);
verify(dataExtractorFactory).newExtractor(2000L, 2300L);
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
assertThat(capturedSearchRequests.size(), equalTo(1));
String searchRequest = capturedSearchRequests.get(0).toString().replaceAll("\\s", "");
assertThat(searchRequest, containsString("\"size\":0"));
assertThat(searchRequest, containsString("\"query\":{\"bool\":{\"filter\":[{\"match_all\":{\"boost\":1.0}}," +
"{\"range\":{\"time\":{\"from\":1000,\"to\":2300,\"include_lower\":true,\"include_upper\":false," +
"\"format\":\"epoch_millis\",\"boost\":1.0}}}]"));
assertThat(searchRequest, containsString("\"aggregations\":{\"earliest_time\":{\"min\":{\"field\":\"time\"}}," +
"\"latest_time\":{\"max\":{\"field\":\"time\"}}}}"));
assertThat(searchRequest, not(containsString("\"sort\"")));
}
public void testExtractionGivenAutoChunkAndAggs() throws IOException {
chunkSpan = null;
TestDataExtractor extractor = new TestDataExtractor(100_000L, 450_000L, true, 200L);
extractor.setNextResponse(createSearchResponse(0L, 100_000L, 400_000L));
InputStream inputStream1 = mock(InputStream.class);
InputStream inputStream2 = mock(InputStream.class);
// 200 * 1_000 == 200_000
DataExtractor subExtactor1 = new StubSubExtractor(inputStream1);
when(dataExtractorFactory.newExtractor(100_000L, 300_000L)).thenReturn(subExtactor1);
DataExtractor subExtactor2 = new StubSubExtractor(inputStream2);
when(dataExtractorFactory.newExtractor(300_000L, 450_000L)).thenReturn(subExtactor2);
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream1, extractor.next().get());
assertThat(extractor.hasNext(), is(true));
assertEquals(inputStream2, extractor.next().get());
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
verify(dataExtractorFactory).newExtractor(100_000L, 300_000L);
verify(dataExtractorFactory).newExtractor(300_000L, 450_000L);
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
assertThat(capturedSearchRequests.size(), equalTo(1));
}
public void testExtractionGivenAutoChunkAndAggsAndNoData() throws IOException {
chunkSpan = null;
TestDataExtractor extractor = new TestDataExtractor(100L, 500L, true, 200L);
extractor.setNextResponse(createNullSearchResponse());
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
Mockito.verifyNoMoreInteractions(dataExtractorFactory);
assertThat(capturedSearchRequests.size(), equalTo(1));
}
public void testExtractionGivenAutoChunkAndScrollSize1000() throws IOException { public void testExtractionGivenAutoChunkAndScrollSize1000() throws IOException {
chunkSpan = null; chunkSpan = null;
scrollSize = 1000; scrollSize = 1000;
@ -430,6 +518,27 @@ public class ChunkedDataExtractorTests extends ESTestCase {
return searchResponse; return searchResponse;
} }
private SearchResponse createNullSearchResponse() {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
SearchHit[] hits = new SearchHit[0];
SearchHits searchHits = new SearchHits(hits, 0, 1);
when(searchResponse.getHits()).thenReturn(searchHits);
List<Aggregation> aggs = new ArrayList<>();
Min min = mock(Min.class);
when(min.getValue()).thenReturn(Double.POSITIVE_INFINITY);
when(min.getName()).thenReturn("earliest_time");
aggs.add(min);
Max max = mock(Max.class);
when(max.getValue()).thenReturn(Double.POSITIVE_INFINITY);
when(max.getName()).thenReturn("latest_time");
aggs.add(max);
Aggregations aggregations = new Aggregations(aggs) {};
when(searchResponse.getAggregations()).thenReturn(aggregations);
return searchResponse;
}
private SearchResponse createErrorResponse() { private SearchResponse createErrorResponse() {
SearchResponse searchResponse = mock(SearchResponse.class); SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR); when(searchResponse.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR);
@ -445,8 +554,12 @@ public class ChunkedDataExtractorTests extends ESTestCase {
} }
private ChunkedDataExtractorContext createContext(long start, long end) { private ChunkedDataExtractorContext createContext(long start, long end) {
return createContext(start, end, false, null);
}
private ChunkedDataExtractorContext createContext(long start, long end, boolean hasAggregations, Long histogramInterval) {
return new ChunkedDataExtractorContext(jobId, timeField, indices, types, query, scrollSize, start, end, chunkSpan, return new ChunkedDataExtractorContext(jobId, timeField, indices, types, query, scrollSize, start, end, chunkSpan,
ChunkedDataExtractorFactory.newIdentityTimeAligner(), Collections.emptyMap()); ChunkedDataExtractorFactory.newIdentityTimeAligner(), Collections.emptyMap(), hasAggregations, histogramInterval);
} }
private static class StubSubExtractor implements DataExtractor { private static class StubSubExtractor implements DataExtractor {