[ml] Registered named writable entry for DatafeedState.

Also added a test to multi node qa module that tests the datafeeder, which should have caught this.

Original commit: elastic/x-pack-elasticsearch@89e4875f6c
This commit is contained in:
Martijn van Groningen 2017-02-15 21:36:33 +01:00
parent c8b5be186d
commit 6d79210f79
2 changed files with 87 additions and 22 deletions

View File

@ -73,6 +73,7 @@ import org.elasticsearch.xpack.ml.action.UpdateProcessAction;
import org.elasticsearch.xpack.ml.action.ValidateDetectorAction;
import org.elasticsearch.xpack.ml.action.ValidateJobConfigAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.metadata.MlInitializationService;
@ -218,7 +219,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
new NamedWriteableRegistry.Entry(NamedDiff.class, PersistentTasksInProgress.TYPE, PersistentTasksInProgress::readDiffFrom),
new NamedWriteableRegistry.Entry(PersistentActionRequest.class, StartDatafeedAction.NAME, StartDatafeedAction.Request::new),
new NamedWriteableRegistry.Entry(PersistentActionRequest.class, OpenJobAction.NAME, OpenJobAction.Request::new),
new NamedWriteableRegistry.Entry(Task.Status.class, JobState.NAME, JobState::fromStream)
new NamedWriteableRegistry.Entry(Task.Status.class, JobState.NAME, JobState::fromStream),
new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream)
);
}

View File

@ -12,37 +12,28 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.ml.MachineLearning;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.common.xcontent.XContentType.JSON;
public class MlBasicMultiNodeIT extends ESRestTestCase {
public void testBasics() throws Exception {
public void testMiniFarequote() throws Exception {
String jobId = "foo";
createFarequoteJob(jobId);
try {
Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open");
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response));
} catch (Exception e) {
Response response = client().performRequest("get", "/_cluster/nodes/hotthreads");
logger.warn("hot_threads:\n" + responseEntityToString(response));
throw e;
}
Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open");
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response));
String postData =
"{\"airline\":\"AAL\",\"responsetime\":\"132.2046\",\"sourcetype\":\"farequote\",\"time\":\"1403481600\"}\n" +
"{\"airline\":\"JZA\",\"responsetime\":\"990.4628\",\"sourcetype\":\"farequote\",\"time\":\"1403481700\"}";
Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data",
response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_data",
Collections.emptyMap(), new StringEntity(postData));
assertEquals(202, response.getStatusLine().getStatusCode());
Map<String, Object> responseBody = responseEntityToMap(response);
@ -83,6 +74,83 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
assertEquals(200, response.getStatusLine().getStatusCode());
}
public void testMiniFarequoteWithDatafeeder() throws Exception {
String mappings = "{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"properties\": {"
+ " \"time\": { \"type\":\"date\"},"
+ " \"airline\": { \"type\":\"keyword\"},"
+ " \"responsetime\": { \"type\":\"float\"}"
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "airline-data/response/1", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}"));
client().performRequest("put", "airline-data/response/2", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}"));
// Ensure all data is searchable
client().performRequest("post", "_refresh");
String jobId = "foo";
createFarequoteJob(jobId);
String datafeedId = "bar";
createDatafeed(datafeedId, jobId);
Response response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_open");
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("opened", true), responseEntityToMap(response));
response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start",
Collections.singletonMap("start", "0"));
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("started", true), responseEntityToMap(response));
assertBusy(() -> {
try {
Response statsResponse =
client().performRequest("get", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
assertEquals(200, statsResponse.getStatusLine().getStatusCode());
@SuppressWarnings("unchecked")
Map<String, Object> dataCountsDoc = (Map<String, Object>)
((Map)((List) responseEntityToMap(statsResponse).get("jobs")).get(0)).get("data_counts");
assertEquals(2, dataCountsDoc.get("input_record_count"));
assertEquals(2, dataCountsDoc.get("processed_record_count"));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
response = client().performRequest("post", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_stop");
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("acknowledged", true), responseEntityToMap(response));
response = client().performRequest("post", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_close");
assertEquals(200, response.getStatusLine().getStatusCode());
assertEquals(Collections.singletonMap("closed", true), responseEntityToMap(response));
response = client().performRequest("delete", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId);
assertEquals(200, response.getStatusLine().getStatusCode());
response = client().performRequest("delete", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
assertEquals(200, response.getStatusLine().getStatusCode());
}
private Response createDatafeed(String datafeedId, String jobId) throws Exception {
XContentBuilder xContentBuilder = jsonBuilder();
xContentBuilder.startObject();
xContentBuilder.field("job_id", jobId);
xContentBuilder.array("indexes", "airline-data");
xContentBuilder.array("types", "response");
xContentBuilder.field("_source", true);
xContentBuilder.endObject();
return client().performRequest("put", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId,
Collections.emptyMap(), new StringEntity(xContentBuilder.string()));
}
private Response createFarequoteJob(String jobId) throws Exception {
XContentBuilder xContentBuilder = jsonBuilder();
xContentBuilder.startObject();
@ -111,13 +179,8 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
Collections.emptyMap(), new StringEntity(xContentBuilder.string()));
}
private static Map<String, Object> responseEntityToMap(Response response) throws Exception {
private static Map<String, Object> responseEntityToMap(Response response) throws IOException {
return XContentHelper.convertToMap(JSON.xContent(), response.getEntity().getContent(), false);
}
private static String responseEntityToString(Response response) throws Exception {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));
}
}
}