From 4c9b4132c9f7639780e4eab8e7832ad324f5c7ef Mon Sep 17 00:00:00 2001
From: Dimitrios Athanasiou
Date: Thu, 27 Apr 2017 13:41:47 +0100
Subject: [PATCH] [TEST] Refactor ML integration test framework

- Removes the need to handle exceptions from action helper methods
- Renames DatafeedJobIT to DatafeedJobsRestIT to clearly distinguish it from DatafeedJobsIT
- Refactors DatafeedJobsIT to reuse MlNativeAutodetectIntegTestCase

Original commit: elastic/x-pack-elasticsearch@5bd0c013912166bc79baaa9e3d6f2832cfa01057
---
 .../xpack/ml/action/DatafeedJobsIT.java       | 244 ------------------
 .../xpack/ml/integration/DatafeedJobsIT.java  | 142 ++++++++++
 ...feedJobIT.java => DatafeedJobsRestIT.java} |   2 +-
 .../MlNativeAutodetectIntegTestCase.java      | 139 +++++++---
 .../integration/UpdateInterimResultsIT.java   |  38 ++-
 5 files changed, 256 insertions(+), 309 deletions(-)
 delete mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java
 create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java
 rename plugin/src/test/java/org/elasticsearch/xpack/ml/integration/{DatafeedJobIT.java => DatafeedJobsRestIT.java} (99%)

diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java
deleted file mode 100644
index b4018f26a27..00000000000
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/action/DatafeedJobsIT.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-package org.elasticsearch.xpack.ml.action;
-
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
-import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.cluster.ClusterModule;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.network.NetworkModule;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.search.SearchModule;
-import org.elasticsearch.tasks.Task;
-import org.elasticsearch.test.SecurityIntegTestCase;
-import org.elasticsearch.xpack.XPackSettings;
-import org.elasticsearch.xpack.ml.MlMetadata;
-import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
-import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
-import org.elasticsearch.xpack.ml.job.config.Job;
-import org.elasticsearch.xpack.ml.job.config.JobState;
-import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
-import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
-import org.elasticsearch.xpack.persistent.PersistentTaskParams;
-import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
-import org.elasticsearch.xpack.persistent.PersistentTasksNodeService;
-import org.elasticsearch.xpack.security.Security;
-import org.junit.After;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.elasticsearch.test.XContentTestUtils.convertToMap;
-import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
-import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
-import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
-import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.deleteAllDatafeeds;
-import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.deleteAllJobs;
-import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts;
-import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
-import static org.hamcrest.Matchers.equalTo;
-
-public class DatafeedJobsIT extends SecurityIntegTestCase {
-
-    @Override
-    protected Settings externalClusterClientSettings() {
-        Settings.Builder settings = Settings.builder()
-                .put(Security.USER_SETTING.getKey(), "elastic:changeme");
-        settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
-        settings.put(NetworkModule.TRANSPORT_TYPE_KEY, Security.NAME4);
-        return settings.build();
-    }
-
-    @After
-    public void cleanupWorkaround() throws Exception {
-        deleteAllDatafeeds(logger, client());
-        deleteAllJobs(logger, client());
-    }
-
-    public void ensureClusterStateConsistency() throws IOException {
-        if (cluster() != null && cluster().size() > 0) {
-            List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
-            entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
-            entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
-            entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE,
-                    PersistentTasksCustomMetaData::new));
-            entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
-                    StartDatafeedAction.DatafeedParams::new));
-            entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME, OpenJobAction.JobParams::new));
-            entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME,
-                    PersistentTasksNodeService.Status::new));
-            entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new));
-            entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream));
-            final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
-            ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
-            byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
-            // remove local node reference
-            masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
-            Map<String, Object> masterStateMap = convertToMap(masterClusterState);
-            int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
-            String masterId = masterClusterState.nodes().getMasterNodeId();
-            for (Client client : cluster().getClients()) {
-                ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
-                byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
-                // remove local node reference
-                localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
-                final Map<String, Object> localStateMap = convertToMap(localClusterState);
-                final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
-                // Check that the non-master node has the same version of the cluster state as the master and
-                // that the master node matches the master (otherwise there is no requirement for the cluster state to match)
-                if (masterClusterState.version() == localClusterState.version() &&
-                        masterId.equals(localClusterState.nodes().getMasterNodeId())) {
-                    try {
-                        assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
-                        // We cannot compare serialization bytes since serialization order of maps is not guaranteed
-                        // but we can compare serialization sizes - they should be the same
-                        assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
-                        // Compare JSON serialization
-                        assertNull("clusterstate JSON serialization does not match",
-                                differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
-                    } catch (AssertionError error) {
-                        logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
-                                masterClusterState.toString(), localClusterState.toString());
-                        throw error;
-                    }
-                }
-            }
-        }
-    }
-
-    public void testLookbackOnly() throws Exception {
-        client().admin().indices().prepareCreate("data-1")
-                .addMapping("type", "time", "type=date")
-                .get();
-        long numDocs = randomIntBetween(32, 2048);
-        long now = System.currentTimeMillis();
-        long oneWeekAgo = now - 604800000;
-        long twoWeeksAgo = oneWeekAgo - 604800000;
-        indexDocs(logger, "data-1", numDocs, twoWeeksAgo, oneWeekAgo);
-
-        client().admin().indices().prepareCreate("data-2")
-                .addMapping("type", "time", "type=date")
-                .get();
-        ClusterHealthResponse r = client().admin().cluster().prepareHealth("data-1", "data-2").setWaitForYellowStatus().get();
-        long numDocs2 = randomIntBetween(32, 2048);
-        indexDocs(logger, "data-2", numDocs2, oneWeekAgo, now);
-
-        Job.Builder job = createScheduledJob("lookback-job");
-        PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
-        PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
-        assertTrue(putJobResponse.isAcknowledged());
-        client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId()));
-        assertBusy(() -> {
-            GetJobsStatsAction.Response statsResponse =
-                    client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
-            assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
-        });
-
-        List<String> t = new ArrayList<>(2);
-        t.add("data-1");
-        t.add("data-2");
-        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), t);
-        PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(datafeedConfig);
-        PutDatafeedAction.Response putDatafeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).get();
-        assertTrue(putDatafeedResponse.isAcknowledged());
-
-        StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedConfig.getId(), 0L);
-        startDatafeedRequest.getParams().setEndTime(now);
-        client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
-        assertBusy(() -> {
-            DataCounts dataCounts = getDataCounts(job.getId());
-            assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2));
-            assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
-
-            GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
-            GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
-            assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
-        }, 60, TimeUnit.SECONDS);
-
-        waitUntilJobIsClosed(job.getId());
-    }
-
-    public void testRealtime() throws Exception {
-        client().admin().indices().prepareCreate("data")
-                .addMapping("type", "time", "type=date")
-                .get();
-        long numDocs1 = randomIntBetween(32, 2048);
-        long now = System.currentTimeMillis();
-        long lastWeek = now - 604800000;
-        indexDocs(logger, "data", numDocs1, lastWeek, now);
-
-        Job.Builder job = createScheduledJob("realtime-job");
-        PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
-        PutJobAction.Response putJobResponse = client().execute(PutJobAction.INSTANCE, putJobRequest).get();
-        assertTrue(putJobResponse.isAcknowledged());
-        client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId()));
-        assertBusy(() -> {
-            GetJobsStatsAction.Response statsResponse =
-                    client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
-            assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
-        });
-
-        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data"));
-        PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(datafeedConfig);
-        PutDatafeedAction.Response putDatafeedResponse = client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).get();
-        assertTrue(putDatafeedResponse.isAcknowledged());
-
-        StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(datafeedConfig.getId(), 0L);
-        StartDatafeedAction.Response startDatafeedResponse = client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
-        assertBusy(() -> {
-            DataCounts dataCounts = getDataCounts(job.getId());
-            assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1));
-            assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
-        });
-
-        long numDocs2 = randomIntBetween(2, 64);
-        now = System.currentTimeMillis();
-        indexDocs(logger, "data", numDocs2, now + 5000, now + 6000);
-        assertBusy(() -> {
-            DataCounts dataCounts = getDataCounts(job.getId());
-            assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2));
-            assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
-        }, 30, TimeUnit.SECONDS);
-
-        StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedConfig.getId());
-        try {
-            StopDatafeedAction.Response stopJobResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).get();
-            assertTrue(stopJobResponse.isStopped());
-        } catch (Exception e) {
-            NodesHotThreadsResponse nodesHotThreadsResponse = client().admin().cluster().prepareNodesHotThreads().get();
-            int i = 0;
-            for (NodeHotThreads nodeHotThreads : nodesHotThreadsResponse.getNodes()) {
-                logger.info(i++ + ":\n" +nodeHotThreads.getHotThreads());
-            }
-            throw e;
-        }
-        assertBusy(() -> {
-            GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
-            GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
-            assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
-        });
-    }
-
-    private void waitUntilJobIsClosed(String jobId) throws Exception {
-        assertBusy(() -> {
-            try {
-                GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
-                GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).get();
-                assertThat(response.getResponse().results().get(0).getState(), equalTo(JobState.CLOSED));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }, 30, TimeUnit.SECONDS);
-    }
-}
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java
new file mode 100644
index 00000000000..bd7931ff21d
--- /dev/null
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsIT.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.integration;
+
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
+import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
+import org.elasticsearch.xpack.ml.action.GetDatafeedsStatsAction;
+import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
+import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
+import org.elasticsearch.xpack.ml.job.config.Job;
+import org.elasticsearch.xpack.ml.job.config.JobState;
+import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
+import org.junit.After;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
+import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;
+import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.getDataCounts;
+import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.indexDocs;
+import static org.hamcrest.Matchers.equalTo;
+
+public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
+
+    @After
+    public void cleanup() throws Exception {
+        cleanUp();
+    }
+
+    public void testLookbackOnly() throws Exception {
+        client().admin().indices().prepareCreate("data-1")
+                .addMapping("type", "time", "type=date")
+                .get();
+        long numDocs = randomIntBetween(32, 2048);
+        long now = System.currentTimeMillis();
+        long oneWeekAgo = now - 604800000;
+        long twoWeeksAgo = oneWeekAgo - 604800000;
+        indexDocs(logger, "data-1", numDocs, twoWeeksAgo, oneWeekAgo);
+
+        client().admin().indices().prepareCreate("data-2")
+                .addMapping("type", "time", "type=date")
+                .get();
+        ClusterHealthResponse r = client().admin().cluster().prepareHealth("data-1", "data-2").setWaitForYellowStatus().get();
+        long numDocs2 = randomIntBetween(32, 2048);
+        indexDocs(logger, "data-2", numDocs2, oneWeekAgo, now);
+
+        Job.Builder job = createScheduledJob("lookback-job");
+        registerJob(job);
+        assertTrue(putJob(job).isAcknowledged());
+        openJob(job.getId());
+        assertBusy(() -> {
+            GetJobsStatsAction.Response statsResponse =
+                    client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
+            assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED);
+        });
+
+        List<String> t = new ArrayList<>(2);
+        t.add("data-1");
+        t.add("data-2");
+        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), t);
+        registerDatafeed(datafeedConfig);
+        assertTrue(putDatafeed(datafeedConfig).isAcknowledged());
+
+        startDatafeed(datafeedConfig.getId(), 0L, now);
+        assertBusy(() -> {
+            DataCounts dataCounts = getDataCounts(job.getId());
+            assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs + numDocs2));
+            assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
+
+            GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
+            GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
+            assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
+        }, 60, TimeUnit.SECONDS);
+
+        waitUntilJobIsClosed(job.getId());
+    }
+
+    public void testRealtime() throws Exception {
+        client().admin().indices().prepareCreate("data")
+                .addMapping("type", "time", "type=date")
+                .get();
+        long numDocs1 = randomIntBetween(32, 2048);
+        long now = System.currentTimeMillis();
+        long lastWeek = now - 604800000;
+        indexDocs(logger, "data", numDocs1, lastWeek, now);
+
+        Job.Builder job = createScheduledJob("realtime-job");
+        assertTrue(putJob(job).isAcknowledged());
+        openJob(job.getId());
+        assertBusy(() -> {
+            GetJobsStatsAction.Response statsResponse =
+                    client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId())).actionGet();
+            assertEquals(statsResponse.getResponse().results().get(0).getState(), JobState.OPENED);
+        });
+
+        DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data"));
+        registerDatafeed(datafeedConfig);
+        assertTrue(putDatafeed(datafeedConfig).isAcknowledged());
+
+        startDatafeed(datafeedConfig.getId(), 0L, null);
+        assertBusy(() -> {
+            DataCounts dataCounts = getDataCounts(job.getId());
+            assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1));
+            assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
+        });
+
+        long numDocs2 = randomIntBetween(2, 64);
+        now = System.currentTimeMillis();
+        indexDocs(logger, "data", numDocs2, now + 5000, now + 6000);
+        assertBusy(() -> {
+            DataCounts dataCounts = getDataCounts(job.getId());
+            assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2));
+            assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
+        }, 30, TimeUnit.SECONDS);
+
+        try {
+            StopDatafeedAction.Response stopJobResponse = stopDatafeed(datafeedConfig.getId());
+            assertTrue(stopJobResponse.isStopped());
+        } catch (Exception e) {
+            NodesHotThreadsResponse nodesHotThreadsResponse = client().admin().cluster().prepareNodesHotThreads().get();
+            int i = 0;
+            for (NodeHotThreads nodeHotThreads : nodesHotThreadsResponse.getNodes()) {
+                logger.info(i++ + ":\n" +nodeHotThreads.getHotThreads());
+            }
+            throw e;
+        }
+        assertBusy(() -> {
+            GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedConfig.getId());
+            GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
+            assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
+        });
+    }
+}
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java
similarity index 99%
rename from plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java
rename to plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java
index 0ee8788b8ff..dcd0994fdac 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobsRestIT.java
@@ -30,7 +30,7 @@ import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordTok
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
-public class DatafeedJobIT extends ESRestTestCase {
+public class DatafeedJobsRestIT extends ESRestTestCase {
 
     private static final String BASIC_AUTH_VALUE_ELASTIC = basicAuthHeaderValue("elastic", new SecureString("changeme".toCharArray()));
 
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
index b4a94673486..5bd0af67540 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java
@@ -6,14 +6,22 @@
 package org.elasticsearch.xpack.ml.integration;
 
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterModule;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.network.NetworkModule;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.search.SearchModule;
+import org.elasticsearch.tasks.Task;
 import org.elasticsearch.test.SecurityIntegTestCase;
 import org.elasticsearch.xpack.XPackSettings;
+import org.elasticsearch.xpack.ml.MlMetadata;
 import org.elasticsearch.xpack.ml.action.CloseJobAction;
 import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
 import org.elasticsearch.xpack.ml.action.DeleteJobAction;
@@ -32,21 +40,30 @@ import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
 import org.elasticsearch.xpack.ml.action.UpdateJobAction;
 import org.elasticsearch.xpack.ml.action.util.PageParams;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
 import org.elasticsearch.xpack.ml.job.config.Job;
 import org.elasticsearch.xpack.ml.job.config.JobState;
+import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
 import org.elasticsearch.xpack.ml.job.config.JobUpdate;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
 import org.elasticsearch.xpack.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.ml.job.results.AnomalyRecord;
 import org.elasticsearch.xpack.ml.job.results.Bucket;
 import org.elasticsearch.xpack.ml.job.results.CategoryDefinition;
+import org.elasticsearch.xpack.persistent.PersistentTaskParams;
+import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
+import org.elasticsearch.xpack.persistent.PersistentTasksNodeService;
 import org.elasticsearch.xpack.security.Security;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import static org.elasticsearch.test.XContentTestUtils.convertToMap;
+import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
 import static org.hamcrest.Matchers.equalTo;
 
 /**
@@ -130,106 +147,99 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
         return jobs;
     }
 
-    protected void putJob(Job.Builder job) throws Exception {
+    protected PutJobAction.Response putJob(Job.Builder job) {
         PutJobAction.Request request = new PutJobAction.Request(job);
-        client().execute(PutJobAction.INSTANCE, request).get();
+        return client().execute(PutJobAction.INSTANCE, request).actionGet();
     }
 
-    protected void openJob(String jobId) throws Exception {
+    protected OpenJobAction.Response openJob(String jobId) {
         OpenJobAction.Request request = new OpenJobAction.Request(jobId);
-        client().execute(OpenJobAction.INSTANCE, request).get();
+        return client().execute(OpenJobAction.INSTANCE, request).actionGet();
     }
 
-    protected void closeJob(String jobId) throws Exception {
+    protected CloseJobAction.Response closeJob(String jobId) {
         CloseJobAction.Request request = new CloseJobAction.Request(jobId);
-        client().execute(CloseJobAction.INSTANCE, request).get();
+        return client().execute(CloseJobAction.INSTANCE, request).actionGet();
     }
 
-    protected void flushJob(String jobId, boolean calcInterim) throws Exception {
+    protected FlushJobAction.Response flushJob(String jobId, boolean calcInterim) {
         FlushJobAction.Request request = new FlushJobAction.Request(jobId);
         request.setCalcInterim(calcInterim);
-        client().execute(FlushJobAction.INSTANCE, request).get();
+        return client().execute(FlushJobAction.INSTANCE, request).actionGet();
     }
 
-    protected void updateJob(String jobId, JobUpdate update) throws Exception {
+    protected PutJobAction.Response updateJob(String jobId, JobUpdate update) {
         UpdateJobAction.Request request = new UpdateJobAction.Request(jobId, update);
-        client().execute(UpdateJobAction.INSTANCE, request).get();
+        return client().execute(UpdateJobAction.INSTANCE, request).actionGet();
    }
 
-    protected void deleteJob(String jobId) throws Exception {
+    protected DeleteJobAction.Response deleteJob(String jobId) {
         DeleteJobAction.Request request = new DeleteJobAction.Request(jobId);
-        client().execute(DeleteJobAction.INSTANCE, request).get();
+        return client().execute(DeleteJobAction.INSTANCE, request).actionGet();
    }
 
-    protected void putDatafeed(DatafeedConfig datafeed) throws Exception {
+    protected PutDatafeedAction.Response putDatafeed(DatafeedConfig datafeed) {
         PutDatafeedAction.Request request = new PutDatafeedAction.Request(datafeed);
-        client().execute(PutDatafeedAction.INSTANCE, request).get();
+        return client().execute(PutDatafeedAction.INSTANCE, request).actionGet();
    }
 
-    protected void stopDatafeed(String datafeedId) throws Exception {
+    protected StopDatafeedAction.Response stopDatafeed(String datafeedId) {
         StopDatafeedAction.Request request = new StopDatafeedAction.Request(datafeedId);
-        client().execute(StopDatafeedAction.INSTANCE, request).get();
+        return client().execute(StopDatafeedAction.INSTANCE, request).actionGet();
    }
 
-    protected void deleteDatafeed(String datafeedId) throws Exception {
+    protected DeleteDatafeedAction.Response deleteDatafeed(String datafeedId) {
         DeleteDatafeedAction.Request request = new DeleteDatafeedAction.Request(datafeedId);
-        client().execute(DeleteDatafeedAction.INSTANCE, request).get();
+        return client().execute(DeleteDatafeedAction.INSTANCE, request).actionGet();
     }
 
-    protected void startDatafeed(String datafeedId, long start, long end) throws Exception {
+    protected StartDatafeedAction.Response startDatafeed(String datafeedId, long start, Long end) {
         StartDatafeedAction.Request request = new StartDatafeedAction.Request(datafeedId, start);
         request.getParams().setEndTime(end);
-        client().execute(StartDatafeedAction.INSTANCE, request).get();
+        return client().execute(StartDatafeedAction.INSTANCE, request).actionGet();
     }
 
     protected void waitUntilJobIsClosed(String jobId) throws Exception {
-        assertBusy(() -> {
-            try {
-                assertThat(getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        });
+        assertBusy(() -> assertThat(getJobStats(jobId).get(0).getState(), equalTo(JobState.CLOSED)), 30, TimeUnit.SECONDS);
     }
 
-    protected List<GetJobsStatsAction.Response.JobStats> getJobStats(String jobId) throws Exception {
+    protected List<GetJobsStatsAction.Response.JobStats> getJobStats(String jobId) {
         GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
-        GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).get();
+        GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet();
         return response.getResponse().results();
     }
 
-    protected List<Bucket> getBuckets(String jobId) throws Exception {
+    protected List<Bucket> getBuckets(String jobId) {
         GetBucketsAction.Request request = new GetBucketsAction.Request(jobId);
         return getBuckets(request);
     }
 
-    protected List<Bucket> getBuckets(GetBucketsAction.Request request) throws Exception {
-        GetBucketsAction.Response response = client().execute(GetBucketsAction.INSTANCE, request).get();
+    protected List<Bucket> getBuckets(GetBucketsAction.Request request) {
+        GetBucketsAction.Response response = client().execute(GetBucketsAction.INSTANCE, request).actionGet();
         return response.getBuckets().results();
     }
 
-    protected List<AnomalyRecord> getRecords(String jobId) throws Exception {
+    protected List<AnomalyRecord> getRecords(String jobId) {
         GetRecordsAction.Request request = new GetRecordsAction.Request(jobId);
         return getRecords(request);
     }
 
-    protected List<AnomalyRecord> getRecords(GetRecordsAction.Request request) throws Exception {
-        GetRecordsAction.Response response = client().execute(GetRecordsAction.INSTANCE, request).get();
+    protected List<AnomalyRecord> getRecords(GetRecordsAction.Request request) {
+        GetRecordsAction.Response response = client().execute(GetRecordsAction.INSTANCE, request).actionGet();
         return response.getRecords().results();
     }
 
-    protected List<ModelSnapshot> getModelSnapshots(String jobId) throws Exception {
+    protected List<ModelSnapshot> getModelSnapshots(String jobId) {
         GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null);
-        GetModelSnapshotsAction.Response response = client().execute(GetModelSnapshotsAction.INSTANCE, request).get();
+        GetModelSnapshotsAction.Response response = client().execute(GetModelSnapshotsAction.INSTANCE, request).actionGet();
         return response.getPage().results();
     }
 
-    protected List<CategoryDefinition> getCategories(String jobId) throws Exception {
+    protected List<CategoryDefinition> getCategories(String jobId) {
         GetCategoriesAction.Request getCategoriesRequest = new GetCategoriesAction.Request(jobId);
         getCategoriesRequest.setPageParams(new PageParams());
-        GetCategoriesAction.Response categoriesResponse = client().execute(
-                GetCategoriesAction.INSTANCE, getCategoriesRequest).get();
+        GetCategoriesAction.Response categoriesResponse = client().execute(GetCategoriesAction.INSTANCE, getCategoriesRequest).actionGet();
         return categoriesResponse.getResult().results();
     }
 
@@ -242,7 +252,54 @@ abstract class MlNativeAutodetectIntegTestCase extends SecurityIntegTestCase {
 
     @Override
     protected void ensureClusterStateConsistency() throws IOException {
-        // this method in ESIntegTestCase is not plugin-friendly - it does not account for plugin NamedWritableRegistries
+        if (cluster() != null && cluster().size() > 0) {
+            List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
+            entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
+            entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
+            entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE,
+                    PersistentTasksCustomMetaData::new));
+            entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
+                    StartDatafeedAction.DatafeedParams::new));
+            entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME, OpenJobAction.JobParams::new));
+            entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME,
+                    PersistentTasksNodeService.Status::new));
+            entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new));
+            entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream));
+            final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
+            ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
+            byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
+            // remove local node reference
+            masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
+            Map<String, Object> masterStateMap = convertToMap(masterClusterState);
+            int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
+            String masterId = masterClusterState.nodes().getMasterNodeId();
+            for (Client client : cluster().getClients()) {
+                ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
+                byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
+                // remove local node reference
+                localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
+                final Map<String, Object> localStateMap = convertToMap(localClusterState);
+                final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
+                // Check that the non-master node has the same version of the cluster state as the master and
+                // that the master node matches the master (otherwise there is no requirement for the cluster state to match)
+                if (masterClusterState.version() == localClusterState.version() &&
+                        masterId.equals(localClusterState.nodes().getMasterNodeId())) {
+                    try {
+                        assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
+                        // We cannot compare serialization bytes since serialization order of maps is not guaranteed
+                        // but we can compare serialization sizes - they should be the same
+                        assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
+                        // Compare JSON serialization
+                        assertNull("clusterstate JSON serialization does not match",
+                                differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
+                    } catch (AssertionError error) {
+                        logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
+                                masterClusterState.toString(), localClusterState.toString());
+                        throw error;
+                    }
+                }
+            }
+        }
     }
 
     protected static String createJsonRecord(Map<String, Object> keyValueMap) throws IOException {
diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/UpdateInterimResultsIT.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/UpdateInterimResultsIT.java
index 3106f91492c..adfc2292953 100644
--- a/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/UpdateInterimResultsIT.java
+++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/integration/UpdateInterimResultsIT.java
@@ -80,17 +80,13 @@ public class UpdateInterimResultsIT extends MlNativeAutodetectIntegTestCase {
 
         // We might need to retry this while waiting for a refresh
         assertBusy(() -> {
-            try {
-                List<Bucket> firstInterimBuckets = getInterimResults(job.getId());
-                assertThat(firstInterimBuckets.size(), equalTo(2));
-                assertThat(firstInterimBuckets.get(0).getTimestamp().getTime(), equalTo(1400039000000L));
-                assertThat(firstInterimBuckets.get(0).getRecordCount(), equalTo(0));
-                assertThat(firstInterimBuckets.get(1).getTimestamp().getTime(), equalTo(1400040000000L));
-                assertThat(firstInterimBuckets.get(1).getRecordCount(), equalTo(1));
-                assertThat(firstInterimBuckets.get(1).getRecords().get(0).getActual().get(0), equalTo(16.0));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+            List<Bucket> firstInterimBuckets = getInterimResults(job.getId());
+            assertThat(firstInterimBuckets.size(), equalTo(2));
+            assertThat(firstInterimBuckets.get(0).getTimestamp().getTime(), equalTo(1400039000000L));
+            assertThat(firstInterimBuckets.get(0).getRecordCount(), equalTo(0));
+            assertThat(firstInterimBuckets.get(1).getTimestamp().getTime(), equalTo(1400040000000L));
+            assertThat(firstInterimBuckets.get(1).getRecordCount(), equalTo(1));
+            assertThat(firstInterimBuckets.get(1).getRecords().get(0).getActual().get(0), equalTo(16.0));
         });
 
         // push 1 more record, flush (with interim), check same interim result
@@ -99,16 +95,12 @@
         flushJob(job.getId(), true);
 
         assertBusy(() -> {
-            try {
-                List<Bucket> secondInterimBuckets = getInterimResults(job.getId());
-                assertThat(secondInterimBuckets.get(0).getTimestamp().getTime(), equalTo(1400039000000L));
-                assertThat(secondInterimBuckets.get(0).getRecordCount(), equalTo(0));
-                assertThat(secondInterimBuckets.get(1).getTimestamp().getTime(), equalTo(1400040000000L));
-                assertThat(secondInterimBuckets.get(1).getRecordCount(), equalTo(1));
-                assertThat(secondInterimBuckets.get(1).getRecords().get(0).getActual().get(0), equalTo(16.0));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+            List<Bucket> secondInterimBuckets = getInterimResults(job.getId());
+            assertThat(secondInterimBuckets.get(0).getTimestamp().getTime(), equalTo(1400039000000L));
+            assertThat(secondInterimBuckets.get(0).getRecordCount(), equalTo(0));
+            assertThat(secondInterimBuckets.get(1).getTimestamp().getTime(), equalTo(1400040000000L));
+            assertThat(secondInterimBuckets.get(1).getRecordCount(), equalTo(1));
+            assertThat(secondInterimBuckets.get(1).getRecords().get(0).getActual().get(0), equalTo(16.0));
         });
 
         // push rest of data, close, verify no interim results
@@ -136,11 +128,11 @@ public class UpdateInterimResultsIT extends MlNativeAutodetectIntegTestCase {
         return data.toString();
     }
 
-    private List<Bucket> getInterimResults(String jobId) throws Exception {
+    private List<Bucket> getInterimResults(String jobId) {
         GetBucketsAction.Request request = new GetBucketsAction.Request(jobId);
         request.setExpand(true);
        request.setPageParams(new PageParams(0, 1500));
-        GetBucketsAction.Response response = client().execute(GetBucketsAction.INSTANCE, request).get();
+        GetBucketsAction.Response response = client().execute(GetBucketsAction.INSTANCE, request).actionGet();
         assertThat(response.getBuckets().count(), lessThan(1500L));
         List<Bucket> buckets = response.getBuckets().results();
         assertThat(buckets.size(), greaterThan(0));
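
For readers skimming the patch, this is the usage pattern the refactor enables: the action helpers on MlNativeAutodetectIntegTestCase now return the action's response (via actionGet()) instead of forcing callers to deal with checked exceptions, so tests read as straight-line assertions. The following is a minimal sketch, not an excerpt from the commit; the class name and index name are hypothetical, while the helpers (registerJob, putJob, openJob, registerDatafeed, putDatafeed, startDatafeed, waitUntilJobIsClosed) and the static utilities from BaseMlIntegTestCase are the ones shown in the diff above.

// Illustrative sketch only; assumes the refactored base class from this patch.
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.job.config.Job;

import java.util.Collections;

import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createDatafeed;
import static org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase.createScheduledJob;

public class ExampleLookbackIT extends MlNativeAutodetectIntegTestCase {

    public void testLookbackSketch() throws Exception {
        // Register the job so cleanUp() can delete it, then create and open it.
        Job.Builder job = createScheduledJob("example-job");
        registerJob(job);
        // No try/catch needed: the helper returns the PutJobAction.Response directly.
        assertTrue(putJob(job).isAcknowledged());
        openJob(job.getId());

        // Same pattern for the datafeed ("example-index" is a placeholder).
        DatafeedConfig datafeed = createDatafeed(job.getId() + "-datafeed", job.getId(),
                Collections.singletonList("example-index"));
        registerDatafeed(datafeed);
        assertTrue(putDatafeed(datafeed).isAcknowledged());

        // A non-null end time runs a lookback; passing null would start real-time mode.
        startDatafeed(datafeed.getId(), 0L, System.currentTimeMillis());
        waitUntilJobIsClosed(job.getId());
    }
}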