Switch remaining ml tests to new style Requests (#33107)

In #29623 we added `Request` object flavored requests to the low-level
REST client and in #30315 we deprecated the old `performRequest`
variants. This commit changes all calls in the
`x-pack/plugin/ml/qa/native-multi-node-tests` and
`x-pack/plugin/ml/qa/single-node-tests` projects to use the new
versions; a before/after sketch of the pattern follows the change summary below.
Nik Everett 2018-08-24 16:36:40 -04:00 committed by GitHub
parent a9a66a09dc
commit 8bee6b3a92
3 changed files with 580 additions and 586 deletions
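
For context, a minimal before/after sketch of the migration pattern this commit applies throughout both test classes. The test class, index names, JSON body, query parameter, and credentials below are illustrative placeholders, not taken from the diff; only the client calls themselves (the deprecated multi-argument performRequest, and Request with setJsonEntity, addParameter, and RequestOptions) are the ones the diff adopts.

import java.io.IOException;
import java.util.Collections;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.test.rest.ESRestTestCase;

// Hypothetical test class, not part of this commit.
public class RequestStyleExampleIT extends ESRestTestCase {
    public void testOldAndNewRequestStyles() throws IOException {
        String authHeader = "Basic placeholder-credentials"; // illustrative

        // Old, deprecated style: method, endpoint, params, entity, and
        // headers are passed to performRequest as loose arguments.
        Response oldStyle = client().performRequest("put", "/example-index-old", Collections.emptyMap(),
                new StringEntity("{\"settings\":{}}", ContentType.APPLICATION_JSON),
                new BasicHeader("Authorization", authHeader));

        // New style: a single Request object carries the method, endpoint,
        // JSON body, query parameters, and (via RequestOptions) headers.
        Request request = new Request("PUT", "/example-index-new");
        request.setJsonEntity("{\"settings\":{}}");
        request.addParameter("master_timeout", "30s");
        RequestOptions.Builder options = request.getOptions().toBuilder();
        options.addHeader("Authorization", authHeader);
        request.setOptions(options);
        Response newStyle = client().performRequest(request);
    }
}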

org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java

@@ -5,9 +5,9 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
@@ -22,10 +22,7 @@ import org.elasticsearch.xpack.test.rest.XPackRestTestHelper;
import org.junit.After;
import org.junit.Before;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
@@ -36,6 +33,7 @@ import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class DatafeedJobsRestIT extends ESRestTestCase {
@@ -57,26 +55,24 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
}
private void setupDataAccessRole(String index) throws IOException {
String json = "{"
Request request = new Request("PUT", "/_xpack/security/role/test_data_access");
request.setJsonEntity("{"
+ " \"indices\" : ["
+ " { \"names\": [\"" + index + "\"], \"privileges\": [\"read\"] }"
+ " ]"
+ "}";
client().performRequest("put", "_xpack/security/role/test_data_access", Collections.emptyMap(),
new StringEntity(json, ContentType.APPLICATION_JSON));
+ "}");
client().performRequest(request);
}
private void setupUser(String user, List<String> roles) throws IOException {
String password = new String(SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING.getChars());
String json = "{"
Request request = new Request("PUT", "/_xpack/security/user/" + user);
request.setJsonEntity("{"
+ " \"password\" : \"" + password + "\","
+ " \"roles\" : [ " + roles.stream().map(unquoted -> "\"" + unquoted + "\"").collect(Collectors.joining(", ")) + " ]"
+ "}";
client().performRequest("put", "_xpack/security/user/" + user, Collections.emptyMap(),
new StringEntity(json, ContentType.APPLICATION_JSON));
+ "}");
client().performRequest(request);
}
@Before
@@ -92,7 +88,10 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
}
private void addAirlineData() throws IOException {
String mappings = "{"
StringBuilder bulk = new StringBuilder();
Request createEmptyAirlineDataRequest = new Request("PUT", "/airline-data-empty");
createEmptyAirlineDataRequest.setJsonEntity("{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"properties\": {"
@@ -102,12 +101,12 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data-empty", Collections.emptyMap(),
new StringEntity(mappings, ContentType.APPLICATION_JSON));
+ "}");
client().performRequest(createEmptyAirlineDataRequest);
// Create index with source = enabled, doc_values = enabled, stored = false + multi-field
mappings = "{"
Request createAirlineDataRequest = new Request("PUT", "/airline-data");
createAirlineDataRequest.setJsonEntity("{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"properties\": {"
@@ -123,18 +122,17 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings, ContentType.APPLICATION_JSON));
+ "}");
client().performRequest(createAirlineDataRequest);
client().performRequest("put", "airline-data/response/1", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data/response/2", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}",
ContentType.APPLICATION_JSON));
bulk.append("{\"index\": {\"_index\": \"airline-data\", \"_type\": \"response\", \"_id\": 1}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}\n");
bulk.append("{\"index\": {\"_index\": \"airline-data\", \"_type\": \"response\", \"_id\": 2}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}\n");
// Create index with source = enabled, doc_values = disabled (except time), stored = false
mappings = "{"
Request createAirlineDataDisabledDocValues = new Request("PUT", "/airline-data-disabled-doc-values");
createAirlineDataDisabledDocValues.setJsonEntity("{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"properties\": {"
@@ -144,19 +142,17 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data-disabled-doc-values", Collections.emptyMap(),
new StringEntity(mappings, ContentType.APPLICATION_JSON));
+ "}");
client().performRequest(createAirlineDataDisabledDocValues);
client().performRequest("put", "airline-data-disabled-doc-values/response/1", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-disabled-doc-values/response/2", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}",
ContentType.APPLICATION_JSON));
bulk.append("{\"index\": {\"_index\": \"airline-data-disabled-doc-values\", \"_type\": \"response\", \"_id\": 1}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}\n");
bulk.append("{\"index\": {\"_index\": \"airline-data-disabled-doc-values\", \"_type\": \"response\", \"_id\": 2}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}\n");
// Create index with source = disabled, doc_values = enabled (except time), stored = true
mappings = "{"
Request createAirlineDataDisabledSource = new Request("PUT", "/airline-data-disabled-source");
createAirlineDataDisabledSource.setJsonEntity("{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"_source\":{\"enabled\":false},"
@@ -167,19 +163,16 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data-disabled-source", Collections.emptyMap(),
new StringEntity(mappings, ContentType.APPLICATION_JSON));
+ "}");
client().performRequest("put", "airline-data-disabled-source/response/1", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-disabled-source/response/2", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}",
ContentType.APPLICATION_JSON));
bulk.append("{\"index\": {\"_index\": \"airline-data-disabled-source\", \"_type\": \"response\", \"_id\": 1}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}\n");
bulk.append("{\"index\": {\"_index\": \"airline-data-disabled-source\", \"_type\": \"response\", \"_id\": 2}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}\n");
// Create index with nested documents
mappings = "{"
Request createAirlineDataNested = new Request("PUT", "/nested-data");
createAirlineDataNested.setJsonEntity("{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"properties\": {"
@@ -187,18 +180,17 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "nested-data", Collections.emptyMap(), new StringEntity(mappings, ContentType.APPLICATION_JSON));
+ "}");
client().performRequest(createAirlineDataNested);
client().performRequest("put", "nested-data/response/1", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\", \"responsetime\":{\"millis\":135.22}}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "nested-data/response/2", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"responsetime\":{\"millis\":222.0}}",
ContentType.APPLICATION_JSON));
bulk.append("{\"index\": {\"_index\": \"nested-data\", \"_type\": \"response\", \"_id\": 1}}\n");
bulk.append("{\"time\":\"2016-06-01T00:00:00Z\", \"responsetime\":{\"millis\":135.22}}\n");
bulk.append("{\"index\": {\"_index\": \"nested-data\", \"_type\": \"response\", \"_id\": 2}}\n");
bulk.append("{\"time\":\"2016-06-01T01:59:00Z\",\"responsetime\":{\"millis\":222.0}}\n");
// Create index with multiple docs per time interval for aggregation testing
mappings = "{"
Request createAirlineDataAggs = new Request("PUT", "/airline-data-aggs");
createAirlineDataAggs.setJsonEntity("{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"properties\": {"
@@ -208,43 +200,33 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data-aggs", Collections.emptyMap(),
new StringEntity(mappings, ContentType.APPLICATION_JSON));
+ "}");
client().performRequest(createAirlineDataAggs);
client().performRequest("put", "airline-data-aggs/response/1", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":100.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/2", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"AAA\",\"responsetime\":200.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/3", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"BBB\",\"responsetime\":1000.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/4", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"BBB\",\"responsetime\":2000.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/5", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"AAA\",\"responsetime\":300.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/6", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"AAA\",\"responsetime\":400.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/7", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"BBB\",\"responsetime\":3000.0}",
ContentType.APPLICATION_JSON));
client().performRequest("put", "airline-data-aggs/response/8", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"BBB\",\"responsetime\":4000.0}",
ContentType.APPLICATION_JSON));
bulk.append("{\"index\": {\"_index\": \"airline-data-aggs\", \"_type\": \"response\", \"_id\": 1}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":100.0}\n");
bulk.append("{\"index\": {\"_index\": \"airline-data-aggs\", \"_type\": \"response\", \"_id\": 2}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"AAA\",\"responsetime\":200.0}\n");
bulk.append("{\"index\": {\"_index\": \"airline-data-aggs\", \"_type\": \"response\", \"_id\": 3}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"BBB\",\"responsetime\":1000.0}\n");
bulk.append("{\"index\": {\"_index\": \"airline-data-aggs\", \"_type\": \"response\", \"_id\": 4}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"BBB\",\"responsetime\":2000.0}\n");
bulk.append("{\"index\": {\"_index\": \"airline-data-aggs\", \"_type\": \"response\", \"_id\": 5}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"AAA\",\"responsetime\":300.0}\n");
bulk.append("{\"index\": {\"_index\": \"airline-data-aggs\", \"_type\": \"response\", \"_id\": 6}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"AAA\",\"responsetime\":400.0}\n");
bulk.append("{\"index\": {\"_index\": \"airline-data-aggs\", \"_type\": \"response\", \"_id\": 7}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"BBB\",\"responsetime\":3000.0}\n");
bulk.append("{\"index\": {\"_index\": \"airline-data-aggs\", \"_type\": \"response\", \"_id\": 8}}\n");
bulk.append("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"BBB\",\"responsetime\":4000.0}\n");
// Ensure all data is searchable
client().performRequest("post", "_refresh");
bulkIndex(bulk.toString());
}
private void addNetworkData(String index) throws IOException {
// Create index with source = enabled, doc_values = enabled, stored = false + multi-field
String mappings = "{"
Request createIndexRequest = new Request("PUT", index);
createIndexRequest.setJsonEntity("{"
+ " \"mappings\": {"
+ " \"doc\": {"
+ " \"properties\": {"
@@ -260,27 +242,25 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", index, Collections.emptyMap(), new StringEntity(mappings, ContentType.APPLICATION_JSON));
+ "}");;
client().performRequest(createIndexRequest);
StringBuilder bulk = new StringBuilder();
String docTemplate = "{\"timestamp\":%d,\"host\":\"%s\",\"network_bytes_out\":%d}";
Date date = new Date(1464739200735L);
for (int i=0; i<120; i++) {
for (int i = 0; i < 120; i++) {
long byteCount = randomNonNegativeLong();
String jsonDoc = String.format(Locale.ROOT, docTemplate, date.getTime(), "hostA", byteCount);
client().performRequest("post", index + "/doc", Collections.emptyMap(),
new StringEntity(jsonDoc, ContentType.APPLICATION_JSON));
bulk.append("{\"index\": {\"_index\": \"").append(index).append("\", \"_type\": \"doc\"}}\n");
bulk.append(String.format(Locale.ROOT, docTemplate, date.getTime(), "hostA", byteCount)).append('\n');
byteCount = randomNonNegativeLong();
jsonDoc = String.format(Locale.ROOT, docTemplate, date.getTime(), "hostB", byteCount);
client().performRequest("post", index + "/doc", Collections.emptyMap(),
new StringEntity(jsonDoc, ContentType.APPLICATION_JSON));
bulk.append("{\"index\": {\"_index\": \"").append(index).append("\", \"_type\": \"doc\"}}\n");
bulk.append(String.format(Locale.ROOT, docTemplate, date.getTime(), "hostB", byteCount)).append('\n');
date = new Date(date.getTime() + 10_000);
}
// Ensure all data is searchable
client().performRequest("post", "_refresh");
bulkIndex(bulk.toString());
}
public void testLookbackOnlyWithMixedTypes() throws Exception {
@@ -314,11 +294,21 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
public void testLookbackOnlyWithNestedFields() throws Exception {
String jobId = "test-lookback-only-with-nested-fields";
String job = "{\"description\":\"Nested job\", \"analysis_config\" : {\"bucket_span\":\"1h\",\"detectors\" :"
+ "[{\"function\":\"mean\",\"field_name\":\"responsetime.millis\"}]}, \"data_description\" : {\"time_field\":\"time\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job, ContentType.APPLICATION_JSON));
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"description\": \"Nested job\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"1h\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"responsetime.millis\"\n"
+ " }\n"
+ " ]\n"
+ " },"
+ " \"data_description\": {\"time_field\": \"time\"}\n"
+ "}");
client().performRequest(createJobRequest);
String datafeedId = jobId + "-datafeed";
new DatafeedBuilder(datafeedId, jobId, "nested-data", "response").build();
@@ -326,8 +316,9 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
Response jobStatsResponse = client().performRequest(
new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
@@ -340,14 +331,23 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
public void testInsufficientSearchPrivilegesOnPut() throws Exception {
String jobId = "privs-put-job";
String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":\"1h\","
+ "\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\","
+ "\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]},"
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId,
Collections.emptyMap(), new StringEntity(job, ContentType.APPLICATION_JSON));
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"description\": \"Aggs job\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"1h\",\n "
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"responsetime\",\n"
+ " \"by_field_name\":\"airline\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\" : {\"time_field\": \"time stamp\"}\n"
+ "}");
client().performRequest(createJobRequest);
String datafeedId = "datafeed-" + jobId;
// This should be disallowed, because even though the ml_admin user has permission to
@@ -365,14 +365,23 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
public void testInsufficientSearchPrivilegesOnPreview() throws Exception {
String jobId = "privs-preview-job";
String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":\"1h\","
+ "\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\","
+ "\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]},"
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId,
Collections.emptyMap(), new StringEntity(job, ContentType.APPLICATION_JSON));
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"description\": \"Aggs job\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"1h\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"responsetime\",\n"
+ " \"by_field_name\": \"airline\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\" : {\"time_field\": \"time stamp\"}\n"
+ "}");
client().performRequest(createJobRequest);
String datafeedId = "datafeed-" + jobId;
new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").build();
@@ -380,10 +389,11 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
// This should be disallowed, because ml_admin is trying to preview a datafeed created by
// by another user (x_pack_rest_user in this case) that will reveal the content of an index they
// don't have permission to search directly
ResponseException e = expectThrows(ResponseException.class, () ->
client().performRequest("get",
MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_preview",
new BasicHeader("Authorization", BASIC_AUTH_VALUE_ML_ADMIN)));
Request getFeed = new Request("GET", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_preview");
RequestOptions.Builder options = getFeed.getOptions().toBuilder();
options.addHeader("Authorization", BASIC_AUTH_VALUE_ML_ADMIN);
getFeed.setOptions(options);
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(getFeed));
assertThat(e.getMessage(),
containsString("[indices:data/read/field_caps] is unauthorized for user [ml_admin]"));
@@ -391,13 +401,23 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
public void testLookbackOnlyGivenAggregationsWithHistogram() throws Exception {
String jobId = "aggs-histogram-job";
String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":\"1h\","
+ "\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]},"
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job, ContentType.APPLICATION_JSON));
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"description\": \"Aggs job\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"1h\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"responsetime\",\n"
+ " \"by_field_name\": \"airline\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\"time_field\": \"time stamp\"}\n"
+ "}");
client().performRequest(createJobRequest);
String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"buckets\":{\"histogram\":{\"field\":\"time stamp\",\"interval\":3600000},"
@@ -410,8 +430,9 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
Response jobStatsResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":4"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":4"));
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
@@ -419,13 +440,23 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
public void testLookbackOnlyGivenAggregationsWithDateHistogram() throws Exception {
String jobId = "aggs-date-histogram-job";
String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":\"3600s\","
+ "\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]},"
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job, ContentType.APPLICATION_JSON));
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"description\": \"Aggs job\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"3600s\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"responsetime\",\n"
+ " \"by_field_name\": \"airline\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\"time_field\": \"time stamp\"}\n"
+ "}");
client().performRequest(createJobRequest);
String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"time stamp\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"1h\"},"
@@ -438,8 +469,9 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
Response jobStatsResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":4"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":4"));
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
@@ -447,13 +479,22 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
public void testLookbackUsingDerivativeAggWithLargerHistogramBucketThanDataRate() throws Exception {
String jobId = "derivative-agg-network-job";
String job = "{\"analysis_config\" :{\"bucket_span\":\"300s\","
+ "\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"bytes-delta\",\"by_field_name\":\"hostname\"}]},"
+ "\"data_description\" : {\"time_field\":\"timestamp\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job, ContentType.APPLICATION_JSON));
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"300s\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"bytes-delta\",\n"
+ " \"by_field_name\": \"hostname\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\"time_field\": \"timestamp\"}\n"
+ "}");
client().performRequest(createJobRequest);
String datafeedId = "datafeed-" + jobId;
String aggregations =
@@ -471,8 +512,9 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
Response jobStatsResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":40"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":40"));
assertThat(jobStatsResponseAsString, containsString("\"out_of_order_timestamp_count\":0"));
@@ -483,13 +525,22 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
public void testLookbackUsingDerivativeAggWithSmallerHistogramBucketThanDataRate() throws Exception {
String jobId = "derivative-agg-network-job";
String job = "{\"analysis_config\" :{\"bucket_span\":\"300s\","
+ "\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"bytes-delta\",\"by_field_name\":\"hostname\"}]},"
+ "\"data_description\" : {\"time_field\":\"timestamp\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job, ContentType.APPLICATION_JSON));
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"300s\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"bytes-delta\",\n"
+ " \"by_field_name\": \"hostname\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\"time_field\": \"timestamp\"}\n"
+ "}");
client().performRequest(createJobRequest);
String datafeedId = "datafeed-" + jobId;
String aggregations =
@@ -507,21 +558,31 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
Response jobStatsResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":240"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":240"));
}
public void testLookbackWithoutPermissions() throws Exception {
String jobId = "permission-test-network-job";
String job = "{\"analysis_config\" :{\"bucket_span\":\"300s\","
+ "\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"bytes-delta\",\"by_field_name\":\"hostname\"}]},"
+ "\"data_description\" : {\"time_field\":\"timestamp\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job, ContentType.APPLICATION_JSON));
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"300s\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"bytes-delta\",\n"
+ " \"by_field_name\": \"hostname\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\"time_field\": \"timestamp\"}\n"
+ "}");
client().performRequest(createJobRequest);
String datafeedId = "datafeed-" + jobId;
String aggregations =
@@ -545,29 +606,39 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
startDatafeedAndWaitUntilStopped(datafeedId, BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
Response jobStatsResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
// We expect that no data made it through to the job
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":0"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":0"));
// There should be a notification saying that there was a problem extracting data
client().performRequest("post", "_refresh");
Response notificationsResponse = client().performRequest("get", AuditorField.NOTIFICATIONS_INDEX + "/_search?q=job_id:" + jobId);
String notificationsResponseAsString = responseEntityToString(notificationsResponse);
client().performRequest(new Request("POST", "/_refresh"));
Response notificationsResponse = client().performRequest(
new Request("GET", AuditorField.NOTIFICATIONS_INDEX + "/_search?q=job_id:" + jobId));
String notificationsResponseAsString = EntityUtils.toString(notificationsResponse.getEntity());
assertThat(notificationsResponseAsString, containsString("\"message\":\"Datafeed is encountering errors extracting data: " +
"action [indices:data/read/search] is unauthorized for user [ml_admin_plus_data]\""));
}
public void testLookbackWithPipelineBucketAgg() throws Exception {
String jobId = "pipeline-bucket-agg-job";
String job = "{\"analysis_config\" :{\"bucket_span\":\"1h\","
+ "\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"percentile95_airlines_count\"}]},"
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
+ "}";
client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(),
new StringEntity(job, ContentType.APPLICATION_JSON));
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("{\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"1h\",\n"
+ " \"summary_count_field_name\": \"doc_count\",\n"
+ " \"detectors\": [\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"percentile95_airlines_count\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\"time_field\": \"time stamp\"}\n"
+ "}");
client().performRequest(createJobRequest);
String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"buckets\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"15m\"},"
@@ -582,8 +653,9 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
Response jobStatsResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
assertThat(jobStatsResponseAsString, containsString("\"input_field_count\":4"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":2"));
@@ -599,15 +671,15 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
new DatafeedBuilder(datafeedId, jobId, "airline-data", "response").build();
openJob(client(), jobId);
Response response = client().performRequest("post",
MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start?start=2016-06-01T00:00:00Z");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"started\":true}"));
Request startRequest = new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start");
startRequest.addParameter("start", "2016-06-01T00:00:00Z");
Response response = client().performRequest(startRequest);
assertThat(EntityUtils.toString(response.getEntity()), equalTo("{\"started\":true}"));
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String responseAsString = responseEntityToString(getJobResponse);
Response getJobResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
String responseAsString = EntityUtils.toString(getJobResponse.getEntity());
assertThat(responseAsString, containsString("\"processed_record_count\":2"));
assertThat(responseAsString, containsString("\"state\":\"opened\""));
} catch (Exception e1) {
@@ -619,9 +691,9 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
// test a model snapshot is present
assertBusy(() -> {
try {
Response getJobResponse = client().performRequest("get",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/model_snapshots");
String responseAsString = responseEntityToString(getJobResponse);
Response getJobResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/model_snapshots"));
String responseAsString = EntityUtils.toString(getJobResponse.getEntity());
assertThat(responseAsString, containsString("\"count\":1"));
} catch (Exception e1) {
throw new RuntimeException(e1);
@@ -629,25 +701,25 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
});
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId));
() -> client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId)));
response = e.getResponse();
assertThat(response.getStatusLine().getStatusCode(), equalTo(409));
assertThat(responseEntityToString(response), containsString("Cannot delete job [" + jobId + "] because datafeed [" + datafeedId
+ "] refers to it"));
assertThat(EntityUtils.toString(response.getEntity()),
containsString("Cannot delete job [" + jobId + "] because datafeed [" + datafeedId + "] refers to it"));
response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop");
response = client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop"));
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"stopped\":true}"));
assertThat(EntityUtils.toString(response.getEntity()), equalTo("{\"stopped\":true}"));
client().performRequest("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close");
client().performRequest(new Request("POST", "/_xpack/ml/anomaly_detectors/" + jobId + "/_close"));
response = client().performRequest("delete", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId);
response = client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId));
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
assertThat(EntityUtils.toString(response.getEntity()), equalTo("{\"acknowledged\":true}"));
response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
response = client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId));
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
assertThat(EntityUtils.toString(response.getEntity()), equalTo("{\"acknowledged\":true}"));
}
public void testForceDeleteWhileDatafeedIsRunning() throws Exception {
@@ -657,25 +729,26 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
new DatafeedBuilder(datafeedId, jobId, "airline-data", "response").build();
openJob(client(), jobId);
Response response = client().performRequest("post",
MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start?start=2016-06-01T00:00:00Z");
Request startRequest = new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start");
startRequest.addParameter("start", "2016-06-01T00:00:00Z");
Response response = client().performRequest(startRequest);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"started\":true}"));
assertThat(EntityUtils.toString(response.getEntity()), equalTo("{\"started\":true}"));
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("delete", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId));
() -> client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId)));
response = e.getResponse();
assertThat(response.getStatusLine().getStatusCode(), equalTo(409));
assertThat(responseEntityToString(response), containsString("Cannot delete datafeed [" + datafeedId
+ "] while its status is started"));
assertThat(EntityUtils.toString(response.getEntity()),
containsString("Cannot delete datafeed [" + datafeedId + "] while its status is started"));
response = client().performRequest("delete",
MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "?force=true");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(response), equalTo("{\"acknowledged\":true}"));
Request forceDeleteRequest = new Request("DELETE", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId);
forceDeleteRequest.addParameter("force", "true");
response = client().performRequest(forceDeleteRequest);
assertThat(EntityUtils.toString(response.getEntity()), equalTo("{\"acknowledged\":true}"));
expectThrows(ResponseException.class,
() -> client().performRequest("get", "/_xpack/ml/datafeeds/" + datafeedId));
() -> client().performRequest(new Request("GET", "/_xpack/ml/datafeeds/" + datafeedId)));
}
private class LookbackOnlyTestHelper {
@@ -727,9 +800,9 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
startDatafeedAndWaitUntilStopped(datafeedId);
waitUntilJobIsClosed(jobId);
Response jobStatsResponse = client().performRequest("get",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
Response jobStatsResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
String jobStatsResponseAsString = EntityUtils.toString(jobStatsResponse.getEntity());
if (shouldSucceedInput) {
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":2"));
} else {
@@ -748,16 +821,20 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
}
private void startDatafeedAndWaitUntilStopped(String datafeedId, String authHeader) throws Exception {
Response startDatafeedRequest = client().performRequest("post",
MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start?start=2016-06-01T00:00:00Z&end=2016-06-02T00:00:00Z",
new BasicHeader("Authorization", authHeader));
assertThat(startDatafeedRequest.getStatusLine().getStatusCode(), equalTo(200));
assertThat(responseEntityToString(startDatafeedRequest), equalTo("{\"started\":true}"));
Request request = new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start");
request.addParameter("start", "2016-06-01T00:00:00Z");
request.addParameter("end", "2016-06-02T00:00:00Z");
RequestOptions.Builder options = request.getOptions().toBuilder();
options.addHeader("Authorization", authHeader);
request.setOptions(options);
Response startDatafeedResponse = client().performRequest(request);
assertThat(EntityUtils.toString(startDatafeedResponse.getEntity()), equalTo("{\"started\":true}"));
assertBusy(() -> {
try {
Response datafeedStatsResponse = client().performRequest("get",
MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stats");
assertThat(responseEntityToString(datafeedStatsResponse), containsString("\"state\":\"stopped\""));
Response datafeedStatsResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stats"));
assertThat(EntityUtils.toString(datafeedStatsResponse.getEntity()),
containsString("\"state\":\"stopped\""));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -767,9 +844,9 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
private void waitUntilJobIsClosed(String jobId) throws Exception {
assertBusy(() -> {
try {
Response jobStatsResponse = client().performRequest("get",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
assertThat(responseEntityToString(jobStatsResponse), containsString("\"state\":\"closed\""));
Response jobStatsResponse = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
assertThat(EntityUtils.toString(jobStatsResponse.getEntity()), containsString("\"state\":\"closed\""));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -777,27 +854,30 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
}
private Response createJob(String id, String airlineVariant) throws Exception {
String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysis_config\" : {\n" + " \"bucket_span\":\"1h\",\n"
+ " \"detectors\" :[\n"
+ " {\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"" + airlineVariant + "\"}]\n"
+ " },\n" + " \"data_description\" : {\n"
+ " \"format\":\"xcontent\",\n"
+ " \"time_field\":\"time stamp\",\n" + " \"time_format\":\"yyyy-MM-dd'T'HH:mm:ssX\"\n" + " }\n"
+ "}";
return client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + id,
Collections.emptyMap(), new StringEntity(job, ContentType.APPLICATION_JSON));
}
private static String responseEntityToString(Response response) throws Exception {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));
}
Request request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + id);
request.setJsonEntity("{\n"
+ " \"description\": \"Analysis of response time by airline\",\n"
+ " \"analysis_config\": {\n"
+ " \"bucket_span\": \"1h\",\n"
+ " \"detectors\" :[\n"
+ " {\n"
+ " \"function\": \"mean\",\n"
+ " \"field_name\": \"responsetime\",\n"
+ " \"by_field_name\": \"" + airlineVariant + "\"\n"
+ " }\n"
+ " ]\n"
+ " },\n"
+ " \"data_description\": {\n"
+ " \"format\": \"xcontent\",\n"
+ " \"time_field\": \"time stamp\",\n"
+ " \"time_format\": \"yyyy-MM-dd'T'HH:mm:ssX\"\n"
+ " }\n"
+ "}");
return client().performRequest(request);
}
public static void openJob(RestClient client, String jobId) throws IOException {
Response response = client.performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
client.performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open"));
}
@After
@@ -850,17 +930,28 @@ public class DatafeedJobsRestIT extends ESRestTestCase {
}
Response build() throws IOException {
String datafeedConfig = "{"
Request request = new Request("PUT", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId);
request.setJsonEntity("{"
+ "\"job_id\": \"" + jobId + "\",\"indexes\":[\"" + index + "\"],\"types\":[\"" + type + "\"]"
+ (source ? ",\"_source\":true" : "")
+ (scriptedFields == null ? "" : ",\"script_fields\":" + scriptedFields)
+ (aggregations == null ? "" : ",\"aggs\":" + aggregations)
+ (chunkingTimespan == null ? "" :
",\"chunking_config\":{\"mode\":\"MANUAL\",\"time_span\":\"" + chunkingTimespan + "\"}")
+ "}";
return client().performRequest("put", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId, Collections.emptyMap(),
new StringEntity(datafeedConfig, ContentType.APPLICATION_JSON),
new BasicHeader("Authorization", authHeader));
+ "}");
RequestOptions.Builder options = request.getOptions().toBuilder();
options.addHeader("Authorization", authHeader);
request.setOptions(options);
return client().performRequest(request);
}
}
private void bulkIndex(String bulk) throws IOException {
Request bulkRequest = new Request("POST", "/_bulk");
bulkRequest.setJsonEntity(bulk);
bulkRequest.addParameter("refresh", "true");
bulkRequest.addParameter("pretty", null);
String bulkResponse = EntityUtils.toString(client().performRequest(bulkRequest).getEntity());
assertThat(bulkResponse, not(containsString("\"errors\" : true")));
}
}

org/elasticsearch/xpack/ml/integration/MlJobIT.java

@@ -5,8 +5,7 @@
*/
package org.elasticsearch.xpack.ml.integration;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
@@ -23,15 +22,10 @@ import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFiel
import org.elasticsearch.xpack.test.rest.XPackRestTestHelper;
import org.junit.After;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.hamcrest.Matchers.containsString;
@@ -55,15 +49,13 @@ public class MlJobIT extends ESRestTestCase {
public void testPutJob_GivenFarequoteConfig() throws Exception {
Response response = createFarequoteJob("given-farequote-config-job");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
String responseAsString = responseEntityToString(response);
String responseAsString = EntityUtils.toString(response.getEntity());
assertThat(responseAsString, containsString("\"job_id\":\"given-farequote-config-job\""));
}
public void testGetJob_GivenNoSuchJob() throws Exception {
ResponseException e = expectThrows(ResponseException.class,
() -> client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/non-existing-job/_stats"));
ResponseException e = expectThrows(ResponseException.class, () ->
client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/non-existing-job/_stats")));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
assertThat(e.getMessage(), containsString("No known job with id 'non-existing-job'"));
@@ -72,11 +64,9 @@ public class MlJobIT extends ESRestTestCase {
public void testGetJob_GivenJobExists() throws Exception {
createFarequoteJob("get-job_given-job-exists-job");
Response response = client().performRequest("get",
MachineLearning.BASE_PATH + "anomaly_detectors/get-job_given-job-exists-job/_stats");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
String responseAsString = responseEntityToString(response);
Response response = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/get-job_given-job-exists-job/_stats"));
String responseAsString = EntityUtils.toString(response.getEntity());
assertThat(responseAsString, containsString("\"count\":1"));
assertThat(responseAsString, containsString("\"job_id\":\"get-job_given-job-exists-job\""));
}
@@ -86,20 +76,16 @@ public class MlJobIT extends ESRestTestCase {
createFarequoteJob(jobId);
// Explicit _all
Response response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/_all");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":1"));
assertThat(responseAsString, containsString("\"job_id\":\"" + jobId + "\""));
String explicitAll = EntityUtils.toString(
client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/_all")).getEntity());
assertThat(explicitAll, containsString("\"count\":1"));
assertThat(explicitAll, containsString("\"job_id\":\"" + jobId + "\""));
// Implicit _all
response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":1"));
assertThat(responseAsString, containsString("\"job_id\":\"" + jobId + "\""));
String implicitAll = EntityUtils.toString(
client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors")).getEntity());
assertThat(implicitAll, containsString("\"count\":1"));
assertThat(implicitAll, containsString("\"job_id\":\"" + jobId + "\""));
}
public void testGetJobs_GivenMultipleJobs() throws Exception {
@@ -108,36 +94,37 @@ public class MlJobIT extends ESRestTestCase {
createFarequoteJob("given-multiple-jobs-job-3");
// Explicit _all
Response response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/_all");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":3"));
assertThat(responseAsString, containsString("\"job_id\":\"given-multiple-jobs-job-1\""));
assertThat(responseAsString, containsString("\"job_id\":\"given-multiple-jobs-job-2\""));
assertThat(responseAsString, containsString("\"job_id\":\"given-multiple-jobs-job-3\""));
String explicitAll = EntityUtils.toString(
client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/_all")).getEntity());
assertThat(explicitAll, containsString("\"count\":3"));
assertThat(explicitAll, containsString("\"job_id\":\"given-multiple-jobs-job-1\""));
assertThat(explicitAll, containsString("\"job_id\":\"given-multiple-jobs-job-2\""));
assertThat(explicitAll, containsString("\"job_id\":\"given-multiple-jobs-job-3\""));
// Implicit _all
response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":3"));
assertThat(responseAsString, containsString("\"job_id\":\"given-multiple-jobs-job-1\""));
assertThat(responseAsString, containsString("\"job_id\":\"given-multiple-jobs-job-2\""));
assertThat(responseAsString, containsString("\"job_id\":\"given-multiple-jobs-job-3\""));
String implicitAll = EntityUtils.toString(
client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors")).getEntity());
assertThat(implicitAll, containsString("\"count\":3"));
assertThat(implicitAll, containsString("\"job_id\":\"given-multiple-jobs-job-1\""));
assertThat(implicitAll, containsString("\"job_id\":\"given-multiple-jobs-job-2\""));
assertThat(implicitAll, containsString("\"job_id\":\"given-multiple-jobs-job-3\""));
}
private Response createFarequoteJob(String jobId) throws IOException {
String job = "{\n" + " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysis_config\" : {\n" + " \"bucket_span\": \"3600s\",\n"
Request request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
request.setJsonEntity(
"{\n"
+ " \"description\":\"Analysis of response time by airline\",\n"
+ " \"analysis_config\" : {\n"
+ " \"bucket_span\": \"3600s\",\n"
+ " \"detectors\" :[{\"function\":\"metric\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]\n"
+ " },\n" + " \"data_description\" : {\n" + " \"field_delimiter\":\",\",\n" + " " +
"\"time_field\":\"time\",\n"
+ " \"time_format\":\"yyyy-MM-dd HH:mm:ssX\"\n" + " }\n" + "}";
return client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId,
Collections.emptyMap(), new StringEntity(job, ContentType.APPLICATION_JSON));
+ " },\n" + " \"data_description\" : {\n"
+ " \"field_delimiter\":\",\",\n"
+ " \"time_field\":\"time\",\n"
+ " \"time_format\":\"yyyy-MM-dd HH:mm:ssX\"\n"
+ " }\n"
+ "}");
return client().performRequest(request);
}
public void testCantCreateJobWithSameID() throws Exception {
@@ -148,18 +135,14 @@ public class MlJobIT extends ESRestTestCase {
" \"data_description\": {},\n" +
" \"results_index_name\" : \"%s\"}";
String jobConfig = String.format(Locale.ROOT, jobTemplate, "index-1");
String jobId = "cant-create-job-with-same-id-job";
Response response = client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId ,
Collections.emptyMap(),
new StringEntity(jobConfig, ContentType.APPLICATION_JSON));
assertEquals(200, response.getStatusLine().getStatusCode());
Request createJob1 = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJob1.setJsonEntity(String.format(Locale.ROOT, jobTemplate, "index-1"));
client().performRequest(createJob1);
final String jobConfig2 = String.format(Locale.ROOT, jobTemplate, "index-2");
ResponseException e = expectThrows(ResponseException.class,
() ->client().performRequest("put", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId,
Collections.emptyMap(), new StringEntity(jobConfig2, ContentType.APPLICATION_JSON)));
Request createJob2 = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJob2.setJsonEntity(String.format(Locale.ROOT, jobTemplate, "index-2"));
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(createJob2));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(400));
assertThat(e.getMessage(), containsString("The job cannot be created with the Id '" + jobId + "'. The Id is already used."));
@@ -175,94 +158,78 @@ public class MlJobIT extends ESRestTestCase {
String jobId1 = "create-jobs-with-index-name-option-job-1";
String indexName = "non-default-index";
Request createJob1 = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1);
createJob1.setJsonEntity(String.format(Locale.ROOT, jobTemplate, indexName));
client().performRequest(createJob1);

String jobId2 = "create-jobs-with-index-name-option-job-2";
Request createJob2 = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2);
createJob2.setEntity(createJob1.getEntity());
client().performRequest(createJob2);
// With security enabled GET _aliases throws an index_not_found_exception
// if no aliases have been created. In multi-node tests the alias may not
// appear immediately so wait here.
assertBusy(() -> {
try {
String aliasesResponse = EntityUtils.toString(client().performRequest(new Request("GET", "/_aliases")).getEntity());
assertThat(aliasesResponse,
    containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName("custom-" + indexName) + "\":{\"aliases\":{"));
assertThat(aliasesResponse, containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId1)
    + "\":{\"filter\":{\"term\":{\"job_id\":{\"value\":\"" + jobId1 + "\",\"boost\":1.0}}}}"));
assertThat(aliasesResponse, containsString("\"" + AnomalyDetectorsIndex.resultsWriteAlias(jobId1) + "\":{}"));
assertThat(aliasesResponse, containsString("\"" + AnomalyDetectorsIndex.jobResultsAliasedName(jobId2)
    + "\":{\"filter\":{\"term\":{\"job_id\":{\"value\":\"" + jobId2 + "\",\"boost\":1.0}}}}"));
assertThat(aliasesResponse, containsString("\"" + AnomalyDetectorsIndex.resultsWriteAlias(jobId2) + "\":{}"));
} catch (ResponseException e) {
throw new AssertionError(e);
}
});
String responseAsString = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
assertThat(responseAsString,
containsString(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-" + indexName));
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId1))));
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId2))));
String id = String.format(Locale.ROOT, "%s_bucket_%s_%s", jobId1, "1234", 300);
Request createResultRequest = new Request("PUT", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/doc/" + id);
createResultRequest.setJsonEntity(String.format(Locale.ROOT,
    "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"bucket\", \"bucket_span\": \"%s\"}",
    jobId1, "1234", 1));
client().performRequest(createResultRequest);
id = String.format(Locale.ROOT, "%s_bucket_%s_%s", jobId1, "1236", 300);
response = client().performRequest("put", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/doc/" + id,
Collections.emptyMap(), new StringEntity(bucketResult, ContentType.APPLICATION_JSON));
assertEquals(201, response.getStatusLine().getStatusCode());
createResultRequest = new Request("PUT", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/doc/" + id);
createResultRequest.setJsonEntity(String.format(Locale.ROOT,
"{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"result_type\":\"bucket\", \"bucket_span\": \"%s\"}",
jobId1, "1236", 1));
client().performRequest(createResultRequest);
client().performRequest("post", "_refresh");
client().performRequest(new Request("POST", "/_refresh"));
response = client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1 + "/results/buckets");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
responseAsString = responseEntityToString(response);
responseAsString = EntityUtils.toString(client().performRequest(
new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1 + "/results/buckets")).getEntity());
assertThat(responseAsString, containsString("\"count\":2"));
response = client().performRequest("get", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/_search");
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
responseAsString = responseEntityToString(response);
responseAsString = EntityUtils.toString(client().performRequest(
new Request("GET", AnomalyDetectorsIndex.jobResultsAliasedName(jobId1) + "/_search")).getEntity());
assertThat(responseAsString, containsString("\"total\":2"));
response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1));
// check that indices still exist, but are empty and aliases are gone
response = client().performRequest("get", "_aliases");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
responseAsString = EntityUtils.toString(client().performRequest(new Request("GET", "/_aliases")).getEntity());
assertThat(responseAsString, not(containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId1))));
assertThat(responseAsString, containsString(AnomalyDetectorsIndex.jobResultsAliasedName(jobId2))); //job2 still exists
response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
responseAsString = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
assertThat(responseAsString, containsString(AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-" + indexName));
client().performRequest("post", "_refresh");
client().performRequest(new Request("POST", "/_refresh"));
response = client().performRequest("get", AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-" + indexName + "/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
responseAsString = EntityUtils.toString(client().performRequest(
new Request("GET", AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-" + indexName + "/_count")).getEntity());
assertThat(responseAsString, containsString("\"count\":0"));
}
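
// Sketch of the entity-reuse idiom used above: when two requests carry the same body, the
// HttpEntity built by setJsonEntity can be shared instead of re-serializing the JSON. The
// endpoints here are hypothetical and this helper is unused by the tests.
private void putSameBodyTwice(String json) throws IOException {
    Request first = new Request("PUT", "/example-index-1");
    first.setJsonEntity(json);            // builds an entity with ContentType.APPLICATION_JSON
    client().performRequest(first);
    Request second = new Request("PUT", "/example-index-2");
    second.setEntity(first.getEntity());  // reuses the same entity object; no copy needed
    client().performRequest(second);
}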
@@ -278,32 +245,27 @@ public class MlJobIT extends ESRestTestCase {
String byFieldName1 = "responsetime";
String jobId2 = "create-job-in-shared-index-updates-mapping-job-2";
String byFieldName2 = "cpu-usage";
Request createJob1Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1);
createJob1Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName1));
client().performRequest(createJob1Request);
// Check the index mapping contains the first by_field_name
response = client().performRequest("get", AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX
+ AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT + "/_mapping?pretty");
assertEquals(200, response.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(byFieldName1));
assertThat(responseAsString, not(containsString(byFieldName2)));
Request getResultsMappingRequest = new Request("GET",
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT + "/_mapping");
getResultsMappingRequest.addParameter("pretty", null);
String resultsMappingAfterJob1 = EntityUtils.toString(client().performRequest(getResultsMappingRequest).getEntity());
assertThat(resultsMappingAfterJob1, containsString(byFieldName1));
assertThat(resultsMappingAfterJob1, not(containsString(byFieldName2)));
Request createJob2Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2);
createJob2Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName2));
client().performRequest(createJob2Request);
// Check the index mapping now contains both fields
response = client().performRequest("get", AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX
+ AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT + "/_mapping?pretty");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(byFieldName1));
assertThat(responseAsString, containsString(byFieldName2));
String resultsMappingAfterJob2 = EntityUtils.toString(client().performRequest(getResultsMappingRequest).getEntity());
assertThat(resultsMappingAfterJob2, containsString(byFieldName1));
assertThat(resultsMappingAfterJob2, containsString(byFieldName2));
}
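
// Sketch: with Request objects, query-string flags move out of the endpoint string and into
// addParameter; a null value renders as a bare flag ("?pretty"). A built Request can also be
// replayed verbatim, as getResultsMappingRequest is above. The index name is hypothetical.
private String getPrettyMapping() throws IOException {
    Request request = new Request("GET", "/example-index/_mapping");
    request.addParameter("pretty", null);  // instead of hard-coding "?pretty" in the endpoint
    return EntityUtils.toString(client().performRequest(request).getEntity());
}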
public void testCreateJobInCustomSharedIndexUpdatesMapping() throws Exception {
@@ -318,32 +280,27 @@ public class MlJobIT extends ESRestTestCase {
String byFieldName1 = "responsetime";
String jobId2 = "create-job-in-custom-shared-index-updates-mapping-job-2";
String byFieldName2 = "cpu-usage";
Request createJob1Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1);
createJob1Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName1));
client().performRequest(createJob1Request);
// Check the index mapping contains the first by_field_name
response = client().performRequest("get",
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-shared-index" + "/_mapping?pretty");
assertEquals(200, response.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(byFieldName1));
assertThat(responseAsString, not(containsString(byFieldName2)));
Request getResultsMappingRequest = new Request("GET",
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-shared-index/_mapping");
getResultsMappingRequest.addParameter("pretty", null);
String resultsMappingAfterJob1 = EntityUtils.toString(client().performRequest(getResultsMappingRequest).getEntity());
assertThat(resultsMappingAfterJob1, containsString(byFieldName1));
assertThat(resultsMappingAfterJob1, not(containsString(byFieldName2)));
Request createJob2Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2);
createJob2Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName2));
client().performRequest(createJob2Request);
// Check the index mapping now contains both fields
response = client().performRequest("get",
AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + "custom-shared-index" + "/_mapping?pretty");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(byFieldName1));
assertThat(responseAsString, containsString(byFieldName2));
String resultsMappingAfterJob2 = EntityUtils.toString(client().performRequest(getResultsMappingRequest).getEntity());
assertThat(resultsMappingAfterJob2, containsString(byFieldName1));
assertThat(resultsMappingAfterJob2, containsString(byFieldName2));
}
public void testCreateJob_WithClashingFieldMappingsFails() throws Exception {
@@ -366,17 +323,14 @@ public class MlJobIT extends ESRestTestCase {
byFieldName1 = "response.time";
byFieldName2 = "response";
}
Request createJob1Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId1);
createJob1Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName1));
client().performRequest(createJob1Request);

Request createJob2Request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId2);
createJob2Request.setJsonEntity(String.format(Locale.ROOT, jobTemplate, byFieldName2));
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(createJob2Request));
assertThat(e.getMessage(),
containsString("This job would cause a mapping clash with existing field [response] - " +
"avoid the clash by assigning a dedicated results index"));
@@ -387,35 +341,27 @@ public class MlJobIT extends ESRestTestCase {
String indexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
createFarequoteJob(jobId);
Response response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(indexName));
String indicesBeforeDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
assertThat(indicesBeforeDelete, containsString(indexName));
response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId));
// check that the index still exists (it's shared by default)
response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(indexName));
String indicesAfterDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
assertThat(indicesAfterDelete, containsString(indexName));
assertBusy(() -> {
try {
Response r = client().performRequest("get", indexName + "/_count");
assertEquals(200, r.getStatusLine().getStatusCode());
String responseString = responseEntityToString(r);
assertThat(responseString, containsString("\"count\":0"));
String count = EntityUtils.toString(client().performRequest(new Request("GET", indexName + "/_count")).getEntity());
assertThat(count, containsString("\"count\":0"));
} catch (Exception e) {
fail(e.getMessage());
}
});
// check that the job itself is gone
expectThrows(ResponseException.class, () ->
client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")));
}
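
// Sketch of the error-path idiom: build the Request up front so that expectThrows wraps only
// the performRequest call. The 404 expectation assumes the usual "resource not found" reply
// for a missing job; the helper itself is illustrative and unused.
private void assertJobGone(String jobId) {
    Request statsRequest = new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
    ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(statsRequest));
    assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
}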
public void testDeleteJobAfterMissingIndex() throws Exception {
@@ -424,28 +370,22 @@ public class MlJobIT extends ESRestTestCase {
String indexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
createFarequoteJob(jobId);
Response response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(indexName));
String indicesBeforeDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
assertThat(indicesBeforeDelete, containsString(indexName));
// Manually delete the index so that we can test that deletion proceeds
// normally anyway
response = client().performRequest("delete", indexName);
assertEquals(200, response.getStatusLine().getStatusCode());
client().performRequest(new Request("DELETE", indexName));
response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId));
// check index was deleted
response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, not(containsString(aliasName)));
assertThat(responseAsString, not(containsString(indexName)));
String indicesAfterDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
assertThat(indicesAfterDelete, not(containsString(aliasName)));
assertThat(indicesAfterDelete, not(containsString(indexName)));
expectThrows(ResponseException.class, () ->
client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")));
}
public void testDeleteJobAfterMissingAliases() throws Exception {
@@ -460,11 +400,9 @@ public class MlJobIT extends ESRestTestCase {
// appear immediately so wait here.
assertBusy(() -> {
try {
Response aliasesResponse = client().performRequest(new Request("get", "_cat/aliases"));
assertEquals(200, aliasesResponse.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(aliasesResponse);
assertThat(responseAsString, containsString(readAliasName));
assertThat(responseAsString, containsString(writeAliasName));
String aliases = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/aliases")).getEntity());
assertThat(aliases, containsString(readAliasName));
assertThat(aliases, containsString(writeAliasName));
} catch (ResponseException e) {
throw new AssertionError(e);
}
@@ -472,17 +410,14 @@ public class MlJobIT extends ESRestTestCase {
// Manually delete the aliases so that we can test that deletion proceeds
// normally anyway
Response response = client().performRequest("delete", indexName + "/_alias/" + readAliasName);
assertEquals(200, response.getStatusLine().getStatusCode());
response = client().performRequest("delete", indexName + "/_alias/" + writeAliasName);
assertEquals(200, response.getStatusLine().getStatusCode());
client().performRequest(new Request("DELETE", indexName + "/_alias/" + readAliasName));
client().performRequest(new Request("DELETE", indexName + "/_alias/" + writeAliasName));
// check aliases were deleted
expectThrows(ResponseException.class, () -> client().performRequest("get", indexName + "/_alias/" + readAliasName));
expectThrows(ResponseException.class, () -> client().performRequest("get", indexName + "/_alias/" + writeAliasName));
expectThrows(ResponseException.class, () -> client().performRequest(new Request("GET", indexName + "/_alias/" + readAliasName)));
expectThrows(ResponseException.class, () -> client().performRequest(new Request("GET", indexName + "/_alias/" + writeAliasName)));
response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId));
}
public void testMultiIndexDelete() throws Exception {
@@ -490,86 +425,63 @@ public class MlJobIT extends ESRestTestCase {
String indexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT;
createFarequoteJob(jobId);
Response response = client().performRequest("put", indexName + "-001");
assertEquals(200, response.getStatusLine().getStatusCode());
client().performRequest(new Request("PUT", indexName + "-001"));
client().performRequest(new Request("PUT", indexName + "-002"));
response = client().performRequest("put", indexName + "-002");
assertEquals(200, response.getStatusLine().getStatusCode());
response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
String responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(indexName));
assertThat(responseAsString, containsString(indexName + "-001"));
assertThat(responseAsString, containsString(indexName + "-002"));
String indicesBeforeDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
assertThat(indicesBeforeDelete, containsString(indexName));
assertThat(indicesBeforeDelete, containsString(indexName + "-001"));
assertThat(indicesBeforeDelete, containsString(indexName + "-002"));
// Add some documents to each index to make sure the DBQ clears them out
Request createDoc0 = new Request("PUT", indexName + "/doc/" + 123);
createDoc0.setJsonEntity(String.format(Locale.ROOT,
    "{\"job_id\":\"%s\", \"timestamp\": \"%s\", \"bucket_span\":%d, \"result_type\":\"record\"}",
    jobId, 123, 1));
client().performRequest(createDoc0);
Request createDoc1 = new Request("PUT", indexName + "-001/doc/" + 123);
createDoc1.setEntity(createDoc0.getEntity());
client().performRequest(createDoc1);
Request createDoc2 = new Request("PUT", indexName + "-002/doc/" + 123);
createDoc2.setEntity(createDoc0.getEntity());
client().performRequest(createDoc2);
// Also index a few through the alias for the first job
client().performRequest("put", indexName + "/doc/" + 456,
Collections.singletonMap("refresh", "true"), new StringEntity(recordResult, ContentType.APPLICATION_JSON));
Request createDoc3 = new Request("PUT", indexName + "/doc/" + 456);
createDoc3.setEntity(createDoc0.getEntity());
client().performRequest(createDoc3);
client().performRequest("post", "_refresh");
client().performRequest(new Request("POST", "/_refresh"));
// check for the documents
response = client().performRequest("get", indexName+ "/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":2"));
response = client().performRequest("get", indexName + "-001/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":1"));
response = client().performRequest("get", indexName + "-002/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":1"));
assertThat(EntityUtils.toString(client().performRequest(new Request("GET", indexName+ "/_count")).getEntity()),
containsString("\"count\":2"));
assertThat(EntityUtils.toString(client().performRequest(new Request("GET", indexName+ "-001/_count")).getEntity()),
containsString("\"count\":1"));
assertThat(EntityUtils.toString(client().performRequest(new Request("GET", indexName+ "-002/_count")).getEntity()),
containsString("\"count\":1"));
// Delete
response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId));
client().performRequest("post", "_refresh");
client().performRequest(new Request("POST", "/_refresh"));
// check that the indices still exist but are empty
response = client().performRequest("get", "_cat/indices");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString(indexName));
assertThat(responseAsString, containsString(indexName + "-001"));
assertThat(responseAsString, containsString(indexName + "-002"));
String indicesAfterDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity());
assertThat(indicesAfterDelete, containsString(indexName));
assertThat(indicesAfterDelete, containsString(indexName + "-001"));
assertThat(indicesAfterDelete, containsString(indexName + "-002"));
response = client().performRequest("get", indexName + "/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":0"));
response = client().performRequest("get", indexName + "-001/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":0"));
response = client().performRequest("get", indexName + "-002/_count");
assertEquals(200, response.getStatusLine().getStatusCode());
responseAsString = responseEntityToString(response);
assertThat(responseAsString, containsString("\"count\":0"));
assertThat(EntityUtils.toString(client().performRequest(new Request("GET", indexName+ "/_count")).getEntity()),
containsString("\"count\":0"));
assertThat(EntityUtils.toString(client().performRequest(new Request("GET", indexName+ "-001/_count")).getEntity()),
containsString("\"count\":0"));
assertThat(EntityUtils.toString(client().performRequest(new Request("GET", indexName+ "-002/_count")).getEntity()),
containsString("\"count\":0"));
expectThrows(ResponseException.class, () ->
client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"));
client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats")));
}
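
// Sketch of the one-line count check used repeatedly above, with EntityUtils.toString standing
// in for the removed responseEntityToString helper. Illustrative only; no test calls it.
private void assertDocCount(String index, int expectedCount) throws IOException {
    String countResponse = EntityUtils.toString(client().performRequest(new Request("GET", index + "/_count")).getEntity());
    assertThat(countResponse, containsString("\"count\":" + expectedCount));
}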
public void testDelete_multipleRequest() throws Exception {
@@ -590,7 +502,7 @@ public class MlJobIT extends ESRestTestCase {
if (forceDelete) {
url += "?force=true";
}
Response response = client().performRequest("delete", url);
Response response = client().performRequest(new Request("DELETE", url));
responses.put(Thread.currentThread().getId(), response);
} catch (ResponseException re) {
responseExceptions.put(Thread.currentThread().getId(), re);
@@ -640,11 +552,12 @@ public class MlJobIT extends ESRestTestCase {
}
for (Response response : responses.values()) {
assertEquals(EntityUtils.toString(response.getEntity()), 200, response.getStatusLine().getStatusCode());
}
assertNotNull(recreationResponse.get());
assertEquals(EntityUtils.toString(recreationResponse.get().getEntity()),
    200, recreationResponse.get().getStatusLine().getStatusCode());
if (recreationException.get() != null) {
assertNull(recreationException.get().getMessage(), recreationException.get());
@@ -656,7 +569,7 @@ public class MlJobIT extends ESRestTestCase {
// but in the case that it does not the job that is recreated may get deleted.
// It is not an error if the job does not exist but the following assertions
// will fail in that case.
client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId));
// Check that the job aliases exist. These are the last thing to be deleted when a job is deleted, so
// if there's been a race between deletion and recreation these are what will be missing.
@@ -682,15 +595,8 @@ public class MlJobIT extends ESRestTestCase {
}
private String getAliases() throws IOException {
Response response = client().performRequest("get", "_aliases");
assertEquals(200, response.getStatusLine().getStatusCode());
return responseEntityToString(response);
}
private static String responseEntityToString(Response response) throws IOException {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));
}
Response response = client().performRequest(new Request("GET", "/_aliases"));
return EntityUtils.toString(response.getEntity());
}
@After


@@ -5,9 +5,8 @@
*/
package org.elasticsearch.xpack.ml.transforms;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
@@ -18,7 +17,6 @@ import org.elasticsearch.xpack.ml.utils.DomainSplitFunction;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -185,9 +183,10 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0);
createIndex("painless", settings.build());
client().performRequest("PUT", "painless/test/1", Collections.emptyMap(),
new StringEntity("{\"test\": \"test\"}", ContentType.APPLICATION_JSON));
client().performRequest("POST", "painless/_refresh");
Request createDoc = new Request("PUT", "/painless/test/1");
createDoc.setJsonEntity("{\"test\": \"test\"}");
createDoc.addParameter("refresh", "true");
client().performRequest(createDoc);
Pattern pattern = Pattern.compile("domain_split\":\\[(.*?),(.*?)\\]");
@@ -198,7 +197,9 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
String mapAsJson = Strings.toString(jsonBuilder().map(params));
logger.info("params={}", mapAsJson);
StringEntity body = new StringEntity("{\n" +
Request searchRequest = new Request("GET", "/painless/test/_search");
searchRequest.setJsonEntity(
"{\n" +
" \"query\" : {\n" +
" \"match_all\": {}\n" +
" },\n" +
@@ -212,10 +213,8 @@
" }\n" +
" }\n" +
" }\n" +
"}", ContentType.APPLICATION_JSON);
Response response = client().performRequest("GET", "painless/test/_search", Collections.emptyMap(), body);
String responseBody = EntityUtils.toString(response.getEntity());
"}");
String responseBody = EntityUtils.toString(client().performRequest(searchRequest).getEntity());
Matcher m = pattern.matcher(responseBody);
String actualSubDomain = "";
@@ -242,24 +241,23 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32966")
public void testHRDSplit() throws Exception {
// Create job
String job = "{\n" +
" \"description\":\"Domain splitting\",\n" +
" \"analysis_config\" : {\n" +
" \"bucket_span\":\"3600s\",\n" +
" \"detectors\" :[{\"function\":\"count\", \"by_field_name\" : \"domain_split\"}]\n" +
" },\n" +
" \"data_description\" : {\n" +
" \"field_delimiter\":\",\",\n" +
" \"time_field\":\"time\"\n" +
" \n" +
" }\n" +
" }";
client().performRequest("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/hrd-split-job", Collections.emptyMap(),
new StringEntity(job, ContentType.APPLICATION_JSON));
client().performRequest("POST", MachineLearning.BASE_PATH + "anomaly_detectors/hrd-split-job/_open");
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/hrd-split-job");
createJobRequest.setJsonEntity(
"{\n" +
" \"description\":\"Domain splitting\",\n" +
" \"analysis_config\" : {\n" +
" \"bucket_span\":\"3600s\",\n" +
" \"detectors\" :[{\"function\":\"count\", \"by_field_name\" : \"domain_split\"}]\n" +
" },\n" +
" \"data_description\" : {\n" +
" \"field_delimiter\":\",\",\n" +
" \"time_field\":\"time\"\n" +
" \n" +
" }\n" +
"}");
client().performRequest(createJobRequest);
client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/hrd-split-job/_open"));
// Create index to hold data
Settings.Builder settings = Settings.builder()
@@ -284,44 +282,43 @@ public class PainlessDomainSplitIT extends ESRestTestCase {
if (i == 64) {
// Anomaly has 100 docs, but we don't care about the value
for (int j = 0; j < 100; j++) {
client().performRequest("PUT", "painless/test/" + time.toDateTimeISO() + "_" + j,
Collections.emptyMap(),
new StringEntity("{\"domain\": \"" + "bar.bar.com\", \"time\": \"" + time.toDateTimeISO()
+ "\"}", ContentType.APPLICATION_JSON));
Request createDocRequest = new Request("PUT", "/painless/test/" + time.toDateTimeISO() + "_" + j);
createDocRequest.setJsonEntity("{\"domain\": \"" + "bar.bar.com\", \"time\": \"" + time.toDateTimeISO() + "\"}");
client().performRequest(createDocRequest);
}
} else {
// Non-anomalous values will be what's seen when the anomaly is reported
client().performRequest("PUT", "painless/test/" + time.toDateTimeISO(),
Collections.emptyMap(),
new StringEntity("{\"domain\": \"" + test.hostName + "\", \"time\": \"" + time.toDateTimeISO()
+ "\"}", ContentType.APPLICATION_JSON));
Request createDocRequest = new Request("PUT", "/painless/test/" + time.toDateTimeISO());
createDocRequest.setJsonEntity("{\"domain\": \"" + test.hostName + "\", \"time\": \"" + time.toDateTimeISO() + "\"}");
client().performRequest(createDocRequest);
}
}
client().performRequest("POST", "painless/_refresh");
client().performRequest(new Request("POST", "/painless/_refresh"));
// Create and start datafeed
String body = "{\n" +
" \"job_id\":\"hrd-split-job\",\n" +
" \"indexes\":[\"painless\"],\n" +
" \"types\":[\"test\"],\n" +
" \"script_fields\": {\n" +
" \"domain_split\": {\n" +
" \"script\": \"return domainSplit(doc['domain'].value, params);\"\n" +
" }\n" +
" }\n" +
" }";
Request createFeedRequest = new Request("PUT", MachineLearning.BASE_PATH + "datafeeds/hrd-split-datafeed");
createFeedRequest.setJsonEntity(
"{\n" +
" \"job_id\":\"hrd-split-job\",\n" +
" \"indexes\":[\"painless\"],\n" +
" \"types\":[\"test\"],\n" +
" \"script_fields\": {\n" +
" \"domain_split\": {\n" +
" \"script\": \"return domainSplit(doc['domain'].value, params);\"\n" +
" }\n" +
" }\n" +
"}");
client().performRequest("PUT", MachineLearning.BASE_PATH + "datafeeds/hrd-split-datafeed", Collections.emptyMap(),
new StringEntity(body, ContentType.APPLICATION_JSON));
client().performRequest("POST", MachineLearning.BASE_PATH + "datafeeds/hrd-split-datafeed/_start");
client().performRequest(createFeedRequest);
client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "datafeeds/hrd-split-datafeed/_start"));
boolean passed = awaitBusy(() -> {
try {
client().performRequest("POST", "/_refresh");
client().performRequest(new Request("POST", "/_refresh"));
Response response = client().performRequest("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/hrd-split-job/results/records");
Response response = client().performRequest(new Request("GET",
MachineLearning.BASE_PATH + "anomaly_detectors/hrd-split-job/results/records"));
String responseBody = EntityUtils.toString(response.getEntity());
if (responseBody.contains("\"count\":2")) {