[TEST] Refactor ML integration test framework

- Removes need to handle exception from action methods
- Clearly renames DatafeedJobIT to DatafeedJobsRestIT to distinguish
  from DatafeedJobsIT
- Refactors DatafeedJobsIT to reuse MlNativeAutodetectIntegTestCase

Original commit: elastic/x-pack-elasticsearch@5bd0c01391
This commit is contained in:
Dimitrios Athanasiou 2017-04-27 13:41:47 +01:00
parent ddf5fd68c2
commit 4c9b4132c9
5 changed files with 256 additions and 309 deletions

View File

@ -1,244 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.action;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksNodeService;
import org.elasticsearch.xpack.security.Security;
import org.junit.After;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.deleteAllDatafeeds;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.deleteAllJobs;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
import static org.hamcrest.Matchers.equalTo;
public class DatafeedJobsIT extends SecurityIntegTestCase {
@Override
protected Settings externalClusterClientSettings() {
Settings.Builder settings = Settings.builder()
.put(Security.USER_SETTING.getKey(), "elastic:changeme");
settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
settings.put(NetworkModule.TRANSPORT_TYPE_KEY, Security.NAME4);
return settings.build();
}
@After
public void cleanupWorkaround() throws Exception {
deleteAllDatafeeds(logger, client());
deleteAllJobs(logger, client());
}
public void ensureClusterStateConsistency() throws IOException {
if (cluster() != null && cluster().size() > 0) {
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE,
PersistentTasksCustomMetaData::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
StartDatafeedAction.DatafeedParams::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME, OpenJobAction.JobParams::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME,
PersistentTasksNodeService.Status::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream));
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
// remove local node reference
masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
String masterId = masterClusterState.nodes().getMasterNodeId();
for (Client client : cluster().getClients()) {
ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
// remove local node reference
localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
final Map<String, Object> localStateMap = convertToMap(localClusterState);
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
// Check that the non-master node has the same version of the cluster state as the master and
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
if (masterClusterState.version() == localClusterState.version() &&
masterId.equals(localClusterState.nodes().getMasterNodeId())) {
try {
assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
// but we can compare serialization sizes - they should be the same
assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
// Compare JSON serialization
assertNull("clusterstate JSON serialization does not match",
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
} catch (AssertionError error) {
logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
masterClusterState.toString(), localClusterState.toString());
throw error;
}
}
}
}
}
public void testLookbackOnly() throws Exception {
client().admin().indices().prepareCreate("data-1")
.addMapping("type", "time", "type=date")
.get();
long numDocs = randomIntBetween(32, 2048);
long now = System.currentTimeMillis();
long oneWeekAgo = now - 604800000;
long twoWeeksAgo = oneWeekAgo - 604800000;
indexDocs(logger, "data-1", numDocs, twoWeeksAgo, oneWeekAgo);
client().admin().indices().prepareCreate("data-2")
.addMapping("type", "time", "type=date")
.get();
ClusterHealthResponse r = client().admin().cluster().prepareHealth("data-1", "data-2").setWaitForYellowStatus().get();
long numDocs2 = randomIntBetween(32, 2048);
indexDocs(logger, "data-2", numDocs2, oneWeekAgo, now);
Job.Builder job = createScheduledJob("lookback-job");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId()));
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
});
List<String> t = new ArrayList<>(2);
t.add("data-1");
t.add("data-2");
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), t);
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(datafeedConfig);
PutDatafeedAction.Response putDatafeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).get();
assertTrue(putDatafeedResponse.isAcknowledged());
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedConfig.getId(), 0L);
startDatafeedRequest.getParams().setEndTime(now);
client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
}, 60, TimeUnit.SECONDS);
waitUntilJobIsClosed(job.getId());
}
public void testRealtime() throws Exception {
client().admin().indices().prepareCreate("data")
.addMapping("type", "time", "type=date")
.get();
long numDocs1 = randomIntBetween(32, 2048);
long now = System.currentTimeMillis();
long lastWeek = now - 604800000;
indexDocs(logger, "data", numDocs1, lastWeek, now);
Job.Builder job = createScheduledJob("realtime-job");
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
assertTrue(putJobResponse.isAcknowledged());
client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId()));
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
});
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data"));
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(datafeedConfig);
PutDatafeedAction.Response putDatafeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).get();
assertTrue(putDatafeedResponse.isAcknowledged());
StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedConfig.getId(), 0L);
StartDatafeedAction.Response startDatafeedResponse = client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
});
long numDocs2 = randomIntBetween(2, 64);
now = System.currentTimeMillis();
indexDocs(logger, "data", numDocs2, now + 5000, now + 6000);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
}, 30, TimeUnit.SECONDS);
StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedConfig.getId());
try {
StopDatafeedAction.Response stopJobResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).get();
assertTrue(stopJobResponse.isStopped());
} catch (Exception e) {
NodesHotThreadsResponse nodesHotThreadsResponse = client().admin().cluster().prepareNodesHotThreads().get();
int i = 0;
for (NodeHotThreads nodeHotThreads : nodesHotThreadsResponse.getNodes()) {
logger.info(i++ + ":\n" +nodeHotThreads.getHotThreads());
}
throw e;
}
assertBusy(() -> {
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
});
}
private void waitUntilJobIsClosed(String jobId) throws Exception {
assertBusy(() -> {
try {
GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).get();
assertThat(response.getResponse().results().get(0).getState(), equalTo(JobState.CLOSED));
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 30, TimeUnit.SECONDS);
}
}

View File

@ -0,0 +1,142 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.junit.After;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
import static org.hamcrest.Matchers.equalTo;
public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
@After
public void cleanup() throws Exception {
cleanUp();
}
public void testLookbackOnly() throws Exception {
client().admin().indices().prepareCreate("data-1")
.addMapping("type", "time", "type=date")
.get();
long numDocs = randomIntBetween(32, 2048);
long now = System.currentTimeMillis();
long oneWeekAgo = now - 604800000;
long twoWeeksAgo = oneWeekAgo - 604800000;
indexDocs(logger, "data-1", numDocs, twoWeeksAgo, oneWeekAgo);
client().admin().indices().prepareCreate("data-2")
.addMapping("type", "time", "type=date")
.get();
ClusterHealthResponse r = client().admin().cluster().prepareHealth("data-1", "data-2").setWaitForYellowStatus().get();
long numDocs2 = randomIntBetween(32, 2048);
indexDocs(logger, "data-2", numDocs2, oneWeekAgo, now);
Job.Builder job = createScheduledJob("lookback-job");
registerJob(job);
assertTrue(putJob(job).isAcknowledged());
openJob(job.getId());
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED);
});
List<String> t = new ArrayList<>(2);
t.add("data-1");
t.add("data-2");
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), t);
registerDatafeed(datafeedConfig);
assertTrue(putDatafeed(datafeedConfig).isAcknowledged());
startDatafeed(datafeedConfig.getId(), 0L, now);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
}, 60, TimeUnit.SECONDS);
waitUntilJobIsClosed(job.getId());
}
public void testRealtime() throws Exception {
client().admin().indices().prepareCreate("data")
.addMapping("type", "time", "type=date")
.get();
long numDocs1 = randomIntBetween(32, 2048);
long now = System.currentTimeMillis();
long lastWeek = now - 604800000;
indexDocs(logger, "data", numDocs1, lastWeek, now);
Job.Builder job = createScheduledJob("realtime-job");
assertTrue(putJob(job).isAcknowledged());
openJob(job.getId());
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse =
client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
});
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data"));
registerDatafeed(datafeedConfig);
assertTrue(putDatafeed(datafeedConfig).isAcknowledged());
startDatafeed(datafeedConfig.getId(), 0L, null);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
});
long numDocs2 = randomIntBetween(2, 64);
now = System.currentTimeMillis();
indexDocs(logger, "data", numDocs2, now + 5000, now + 6000);
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
}, 30, TimeUnit.SECONDS);
try {
StopDatafeedAction.Response stopJobResponse = stopDatafeed(datafeedConfig.getId());
assertTrue(stopJobResponse.isStopped());
} catch (Exception e) {
NodesHotThreadsResponse nodesHotThreadsResponse = client().admin().cluster().prepareNodesHotThreads().get();
int i = 0;
for (NodeHotThreads nodeHotThreads : nodesHotThreadsResponse.getNodes()) {
logger.info(i++ + ":\n" +nodeHotThreads.getHotThreads());
}
throw e;
}
assertBusy(() -> {
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
});
}
}

View File

@ -30,7 +30,7 @@ import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordTok
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
public class DatafeedJobIT extends ESRestTestCase { public class DatafeedJobsRestIT extends ESRestTestCase {
private static final String BASIC_AUTH_VALUE_ELASTIC = private static final String BASIC_AUTH_VALUE_ELASTIC =
basicAuthHeaderValue("elastic", new SecureString("changeme".toCharArray())); basicAuthHeaderValue("elastic", new SecureString("changeme".toCharArray()));

View File

@ -6,14 +6,22 @@
package org.elasticsearch.xpack.ml.integration; package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.SecurityIntegTestCase; import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.xpack.XPackSettings; import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.CloseJobAction; import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.ml.action.DeleteJobAction; import org.elasticsearch.xpack.ml.action.DeleteJobAction;
@ -32,21 +40,30 @@ import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateJobAction; import org.elasticsearch.xpack.ml.action.UpdateJobAction;
import org.elasticsearch.xpack.ml.action.util.PageParams; import org.elasticsearch.xpack.ml.action.util.PageParams;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.config.JobUpdate; import org.elasticsearch.xpack.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
import org.elasticsearch.xpack.ml.job.results.Bucket; import org.elasticsearch.xpack.ml.job.results.Bucket;
import org.elasticsearch.xpack.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
import org.elasticsearch.xpack.persistent.PersistentTaskParams;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksNodeService;
import org.elasticsearch.xpack.security.Security; import org.elasticsearch.xpack.security.Security;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
/** /**
@ -130,106 +147,99 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
return jobs; return jobs;
} }
protected void putJob(Job.Builder job) throws Exception { protected PutJobAction.Response putJob(Job.Builder job) {
PutJobAction.Request request = new PutJobAction.Request(job); PutJobAction.Request request = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, request).get(); return client().execute(PutJobAction.INSTANCE, request).actionGet();
} }
protected void openJob(String jobId) throws Exception { protected OpenJobAction.Response openJob(String jobId) {
OpenJobAction.Request request = new OpenJobAction.Request(jobId); OpenJobAction.Request request = new OpenJobAction.Request(jobId);
client().execute(OpenJobAction.INSTANCE, request).get(); return client().execute(OpenJobAction.INSTANCE, request).actionGet();
} }
protected void closeJob(String jobId) throws Exception { protected CloseJobAction.Response closeJob(String jobId) {
CloseJobAction.Request request = new CloseJobAction.Request(jobId); CloseJobAction.Request request = new CloseJobAction.Request(jobId);
client().execute(CloseJobAction.INSTANCE, request).get(); return client().execute(CloseJobAction.INSTANCE, request).actionGet();
} }
protected void flushJob(String jobId, boolean calcInterim) throws Exception { protected FlushJobAction.Response flushJob(String jobId, boolean calcInterim) {
FlushJobAction.Request request = new FlushJobAction.Request(jobId); FlushJobAction.Request request = new FlushJobAction.Request(jobId);
request.setCalcInterim(calcInterim); request.setCalcInterim(calcInterim);
client().execute(FlushJobAction.INSTANCE, request).get(); return client().execute(FlushJobAction.INSTANCE, request).actionGet();
} }
protected void updateJob(String jobId, JobUpdate update) throws Exception { protected PutJobAction.Response updateJob(String jobId, JobUpdate update) {
UpdateJobAction.Request request = new UpdateJobAction.Request(jobId, update); UpdateJobAction.Request request = new UpdateJobAction.Request(jobId, update);
client().execute(UpdateJobAction.INSTANCE, request).get(); return client().execute(UpdateJobAction.INSTANCE, request).actionGet();
} }
protected void deleteJob(String jobId) throws Exception { protected DeleteJobAction.Response deleteJob(String jobId) {
DeleteJobAction.Request request = new DeleteJobAction.Request(jobId); DeleteJobAction.Request request = new DeleteJobAction.Request(jobId);
client().execute(DeleteJobAction.INSTANCE, request).get(); return client().execute(DeleteJobAction.INSTANCE, request).actionGet();
} }
protected void putDatafeed(DatafeedConfig datafeed) throws Exception { protected PutDatafeedAction.Response putDatafeed(DatafeedConfig datafeed) {
PutDatafeedAction.Request request = new PutDatafeedAction.Request(datafeed); PutDatafeedAction.Request request = new PutDatafeedAction.Request(datafeed);
client().execute(PutDatafeedAction.INSTANCE, request).get(); return client().execute(PutDatafeedAction.INSTANCE, request).actionGet();
} }
protected void stopDatafeed(String datafeedId) throws Exception { protected StopDatafeedAction.Response stopDatafeed(String datafeedId) {
StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId); StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId);
client().execute(StopDatafeedAction.INSTANCE, request).get(); return client().execute(StopDatafeedAction.INSTANCE, request).actionGet();
} }
protected void deleteDatafeed(String datafeedId) throws Exception { protected DeleteDatafeedAction.Response deleteDatafeed(String datafeedId) {
DeleteDatafeedAction.Request request = new DeleteDatafeedAction.Request(datafeedId); DeleteDatafeedAction.Request request = new DeleteDatafeedAction.Request(datafeedId);
client().execute(DeleteDatafeedAction.INSTANCE, request).get(); return client().execute(DeleteDatafeedAction.INSTANCE, request).actionGet();
} }
protected void startDatafeed(String datafeedId, long start, long end) throws Exception { protected StartDatafeedAction.Response startDatafeed(String datafeedId, long start, Long end) {
StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedId, start); StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedId, start);
request.getParams().setEndTime(end); request.getParams().setEndTime(end);
client().execute(StartDatafeedAction.INSTANCE, request).get(); return client().execute(StartDatafeedAction.INSTANCE, request).actionGet();
} }
protected void waitUntilJobIsClosed(String jobId) throws Exception { protected void waitUntilJobIsClosed(String jobId) throws Exception {
assertBusy(() -> { assertBusy(() -> assertThat(getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)), 30, TimeUnit.SECONDS);
try {
assertThat(getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} }
protected List<GetJobsStatsAction.Response.JobStats> getJobStats(String jobId) throws Exception { protected List<GetJobsStatsAction.Response.JobStats> getJobStats(String jobId) {
GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId); GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).get(); GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet();
return response.getResponse().results(); return response.getResponse().results();
} }
protected List<Bucket> getBuckets(String jobId) throws Exception { protected List<Bucket> getBuckets(String jobId) {
GetBucketsAction.Request request = new GetBucketsAction.Request(jobId); GetBucketsAction.Request request = new GetBucketsAction.Request(jobId);
return getBuckets(request); return getBuckets(request);
} }
protected List<Bucket> getBuckets(GetBucketsAction.Request request) throws Exception { protected List<Bucket> getBuckets(GetBucketsAction.Request request) {
GetBucketsAction.Response response = client().execute(GetBucketsAction.INSTANCE, request).get(); GetBucketsAction.Response response = client().execute(GetBucketsAction.INSTANCE, request).actionGet();
return response.getBuckets().results(); return response.getBuckets().results();
} }
protected List<AnomalyRecord> getRecords(String jobId) throws Exception { protected List<AnomalyRecord> getRecords(String jobId) {
GetRecordsAction.Request request = new GetRecordsAction.Request(jobId); GetRecordsAction.Request request = new GetRecordsAction.Request(jobId);
return getRecords(request); return getRecords(request);
} }
protected List<AnomalyRecord> getRecords(GetRecordsAction.Request request) throws Exception { protected List<AnomalyRecord> getRecords(GetRecordsAction.Request request) {
GetRecordsAction.Response response = client().execute(GetRecordsAction.INSTANCE, request).get(); GetRecordsAction.Response response = client().execute(GetRecordsAction.INSTANCE, request).actionGet();
return response.getRecords().results(); return response.getRecords().results();
} }
protected List<ModelSnapshot> getModelSnapshots(String jobId) throws Exception { protected List<ModelSnapshot> getModelSnapshots(String jobId) {
GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null); GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
GetModelSnapshotsAction.Response response = client().execute(GetModelSnapshotsAction.INSTANCE, request).get(); GetModelSnapshotsAction.Response response = client().execute(GetModelSnapshotsAction.INSTANCE, request).actionGet();
return response.getPage().results(); return response.getPage().results();
} }
protected List<CategoryDefinition> getCategories(String jobId) throws Exception { protected List<CategoryDefinition> getCategories(String jobId) {
GetCategoriesAction.Request getCategoriesRequest = GetCategoriesAction.Request getCategoriesRequest =
new GetCategoriesAction.Request(jobId); new GetCategoriesAction.Request(jobId);
getCategoriesRequest.setPageParams(new PageParams()); getCategoriesRequest.setPageParams(new PageParams());
GetCategoriesAction.Response categoriesResponse = client().execute( GetCategoriesAction.Response categoriesResponse = client().execute(GetCategoriesAction.INSTANCE, getCategoriesRequest).actionGet();
GetCategoriesAction.INSTANCE, getCategoriesRequest).get();
return categoriesResponse.getResult().results(); return categoriesResponse.getResult().results();
} }
@ -242,7 +252,54 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
@Override @Override
protected void ensureClusterStateConsistency() throws IOException { protected void ensureClusterStateConsistency() throws IOException {
// this method in ESIntegTestCase is not plugin-friendly - it does not account for plugin NamedWritableRegistries if (cluster() != null && cluster().size() > 0) {
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE,
PersistentTasksCustomMetaData::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
StartDatafeedAction.DatafeedParams::new));
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME, OpenJobAction.JobParams::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME,
PersistentTasksNodeService.Status::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new));
entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream));
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
// remove local node reference
masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
String masterId = masterClusterState.nodes().getMasterNodeId();
for (Client client : cluster().getClients()) {
ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
// remove local node reference
localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
final Map<String, Object> localStateMap = convertToMap(localClusterState);
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
// Check that the non-master node has the same version of the cluster state as the master and
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
if (masterClusterState.version() == localClusterState.version() &&
masterId.equals(localClusterState.nodes().getMasterNodeId())) {
try {
assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
// but we can compare serialization sizes - they should be the same
assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
// Compare JSON serialization
assertNull("clusterstate JSON serialization does not match",
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
} catch (AssertionError error) {
logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
masterClusterState.toString(), localClusterState.toString());
throw error;
}
}
}
}
} }
protected static String createJsonRecord(Map<String, Object> keyValueMap) throws IOException { protected static String createJsonRecord(Map<String, Object> keyValueMap) throws IOException {

View File

@ -80,17 +80,13 @@ public class UpdateInterimResultsIT extends MlNativeAutodetectIntegTestCase {
// We might need to retry this while waiting for a refresh // We might need to retry this while waiting for a refresh
assertBusy(() -> { assertBusy(() -> {
try { List<Bucket> firstInterimBuckets = getInterimResults(job.getId());
List<Bucket> firstInterimBuckets = getInterimResults(job.getId()); assertThat(firstInterimBuckets.size(), equalTo(2));
assertThat(firstInterimBuckets.size(), equalTo(2)); assertThat(firstInterimBuckets.get(0).getTimestamp().getTime(), equalTo(1400039000000L));
assertThat(firstInterimBuckets.get(0).getTimestamp().getTime(), equalTo(1400039000000L)); assertThat(firstInterimBuckets.get(0).getRecordCount(), equalTo(0));
assertThat(firstInterimBuckets.get(0).getRecordCount(), equalTo(0)); assertThat(firstInterimBuckets.get(1).getTimestamp().getTime(), equalTo(1400040000000L));
assertThat(firstInterimBuckets.get(1).getTimestamp().getTime(), equalTo(1400040000000L)); assertThat(firstInterimBuckets.get(1).getRecordCount(), equalTo(1));
assertThat(firstInterimBuckets.get(1).getRecordCount(), equalTo(1)); assertThat(firstInterimBuckets.get(1).getRecords().get(0).getActual().get(0), equalTo(16.0));
assertThat(firstInterimBuckets.get(1).getRecords().get(0).getActual().get(0), equalTo(16.0));
} catch (Exception e) {
throw new RuntimeException(e);
}
}); });
// push 1 more record, flush (with interim), check same interim result // push 1 more record, flush (with interim), check same interim result
@ -99,16 +95,12 @@ public class UpdateInterimResultsIT extends MlNativeAutodetectIntegTestCase {
flushJob(job.getId(), true); flushJob(job.getId(), true);
assertBusy(() -> { assertBusy(() -> {
try { List<Bucket> secondInterimBuckets = getInterimResults(job.getId());
List<Bucket> secondInterimBuckets = getInterimResults(job.getId()); assertThat(secondInterimBuckets.get(0).getTimestamp().getTime(), equalTo(1400039000000L));
assertThat(secondInterimBuckets.get(0).getTimestamp().getTime(), equalTo(1400039000000L)); assertThat(secondInterimBuckets.get(0).getRecordCount(), equalTo(0));
assertThat(secondInterimBuckets.get(0).getRecordCount(), equalTo(0)); assertThat(secondInterimBuckets.get(1).getTimestamp().getTime(), equalTo(1400040000000L));
assertThat(secondInterimBuckets.get(1).getTimestamp().getTime(), equalTo(1400040000000L)); assertThat(secondInterimBuckets.get(1).getRecordCount(), equalTo(1));
assertThat(secondInterimBuckets.get(1).getRecordCount(), equalTo(1)); assertThat(secondInterimBuckets.get(1).getRecords().get(0).getActual().get(0), equalTo(16.0));
assertThat(secondInterimBuckets.get(1).getRecords().get(0).getActual().get(0), equalTo(16.0));
} catch (Exception e) {
throw new RuntimeException(e);
}
}); });
// push rest of data, close, verify no interim results // push rest of data, close, verify no interim results
@ -136,11 +128,11 @@ public class UpdateInterimResultsIT extends MlNativeAutodetectIntegTestCase {
return data.toString(); return data.toString();
} }
private List<Bucket> getInterimResults(String jobId) throws Exception { private List<Bucket> getInterimResults(String jobId) {
GetBucketsAction.Request request = new GetBucketsAction.Request(jobId); GetBucketsAction.Request request = new GetBucketsAction.Request(jobId);
request.setExpand(true); request.setExpand(true);
request.setPageParams(new PageParams(0, 1500)); request.setPageParams(new PageParams(0, 1500));
GetBucketsAction.Response response = client().execute(GetBucketsAction.INSTANCE, request).get(); GetBucketsAction.Response response = client().execute(GetBucketsAction.INSTANCE, request).actionGet();
assertThat(response.getBuckets().count(), lessThan(1500L)); assertThat(response.getBuckets().count(), lessThan(1500L));
List<Bucket> buckets = response.getBuckets().results(); List<Bucket> buckets = response.getBuckets().results();
assertThat(buckets.size(), greaterThan(0)); assertThat(buckets.size(), greaterThan(0));