diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index cd33e1d8076..ab9b8073940 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -5,19 +5,10 @@ */ package org.elasticsearch.xpack.ml.integration; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterModule; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.network.NetworkModule; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -26,23 +17,10 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.reindex.ReindexPlugin; -import org.elasticsearch.persistent.PersistentTaskParams; -import org.elasticsearch.persistent.PersistentTaskState; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.SecuritySettingsSourceField; -import org.elasticsearch.transport.Netty4Plugin; -import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; -import org.elasticsearch.xpack.core.XPackClientPlugin; -import org.elasticsearch.xpack.core.XPackSettings; -import org.elasticsearch.xpack.core.ml.MlMetadata; -import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; @@ -70,10 +48,8 @@ import org.elasticsearch.xpack.core.ml.action.util.PageParams; import org.elasticsearch.xpack.core.ml.calendars.Calendar; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -85,24 +61,15 @@ import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.core.ml.job.results.Forecast; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Result; -import org.elasticsearch.xpack.core.security.SecurityField; -import org.elasticsearch.xpack.core.security.authc.TokenMetaData; import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Path; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import static org.elasticsearch.test.XContentTestUtils.convertToMap; -import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -110,46 +77,15 @@ import static org.hamcrest.Matchers.notNullValue; /** * Base class of ML integration tests that use a native autodetect process */ -abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase { +abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase { private List jobs = new ArrayList<>(); private List datafeeds = new ArrayList<>(); - @Override - protected Collection> nodePlugins() { - return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class); - } @Override - protected Collection> transportClientPlugins() { - return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class); - } - - @Override - protected Settings externalClusterClientSettings() { - Path key; - Path certificate; - try { - key = PathUtils.get(getClass().getResource("/testnode.pem").toURI()); - certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI()); - } catch (URISyntaxException e) { - throw new IllegalStateException("error trying to get keystore path", e); - } - Settings.Builder builder = Settings.builder(); - builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4); - builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING); - builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true); - builder.put("xpack.security.transport.ssl.enabled", true); - builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString()); - builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString()); - builder.put("xpack.security.transport.ssl.key_passphrase", "testnode"); - builder.put("xpack.security.transport.ssl.verification_mode", "certificate"); - return builder.build(); - } - - protected void cleanUp() { + protected void cleanUpResources() { cleanUpDatafeeds(); cleanUpJobs(); - waitForPendingTasks(); } private void cleanUpDatafeeds() { @@ -182,18 +118,6 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase { } } - private void waitForPendingTasks() { - ListTasksRequest listTasksRequest = new ListTasksRequest(); - listTasksRequest.setWaitForCompletion(true); - listTasksRequest.setDetailed(true); - listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10)); - try { - admin().cluster().listTasks(listTasksRequest).get(); - } catch (Exception e) { - throw new AssertionError("Failed to wait for pending tasks to complete", e); - } - } - protected void registerJob(Job.Builder job) { if (jobs.add(job) == false) { throw new IllegalArgumentException("job [" + job.getId() + "] is already registered"); @@ -441,56 +365,6 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase { return client().execute(PersistJobAction.INSTANCE, request).actionGet(); } - @Override - protected void ensureClusterStateConsistency() throws IOException { - if (cluster() != null && cluster().size() > 0) { - List entries = new ArrayList<>(ClusterModule.getNamedWriteables()); - entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables()); - entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new)); - entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME, - StartDatafeedAction.DatafeedParams::new)); - entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME, - OpenJobAction.JobParams::new)); - entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new)); - entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream)); - entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new)); - final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries); - ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState(); - byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState); - // remove local node reference - masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry); - Map masterStateMap = convertToMap(masterClusterState); - int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length; - String masterId = masterClusterState.nodes().getMasterNodeId(); - for (Client client : cluster().getClients()) { - ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState(); - byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState); - // remove local node reference - localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry); - final Map localStateMap = convertToMap(localClusterState); - final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length; - // Check that the non-master node has the same version of the cluster state as the master and - // that the master node matches the master (otherwise there is no requirement for the cluster state to match) - if (masterClusterState.version() == localClusterState.version() && - masterId.equals(localClusterState.nodes().getMasterNodeId())) { - try { - assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID()); - // We cannot compare serialization bytes since serialization order of maps is not guaranteed - // but we can compare serialization sizes - they should be the same - assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize); - // Compare JSON serialization - assertNull("clusterstate JSON serialization does not match", - differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap)); - } catch (AssertionError error) { - logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}", - masterClusterState.toString(), localClusterState.toString()); - throw error; - } - } - } - } - } - protected List generateData(long timestamp, TimeValue bucketSpan, int bucketCount, Function timeToCountFunction) throws IOException { List data = new ArrayList<>(); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java new file mode 100644 index 00000000000..f844d813cb5 --- /dev/null +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -0,0 +1,156 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.integration; + +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.SearchModule; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.SecuritySettingsSourceField; +import org.elasticsearch.transport.Netty4Plugin; +import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.XPackClientPlugin; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; +import org.elasticsearch.xpack.core.security.SecurityField; +import org.elasticsearch.xpack.core.security.authc.TokenMetaData; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.XContentTestUtils.convertToMap; +import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; + +/** + * Base class of ML integration tests that use a native autodetect process + */ +abstract class MlNativeIntegTestCase extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class); + } + + @Override + protected Settings externalClusterClientSettings() { + Path key; + Path certificate; + try { + key = PathUtils.get(getClass().getResource("/testnode.pem").toURI()); + certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI()); + } catch (URISyntaxException e) { + throw new IllegalStateException("error trying to get keystore path", e); + } + Settings.Builder builder = Settings.builder(); + builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4); + builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING); + builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true); + builder.put("xpack.security.transport.ssl.enabled", true); + builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString()); + builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString()); + builder.put("xpack.security.transport.ssl.key_passphrase", "testnode"); + builder.put("xpack.security.transport.ssl.verification_mode", "certificate"); + return builder.build(); + } + + protected void cleanUp() { + cleanUpResources(); + waitForPendingTasks(); + } + + protected abstract void cleanUpResources(); + + private void waitForPendingTasks() { + ListTasksRequest listTasksRequest = new ListTasksRequest(); + listTasksRequest.setWaitForCompletion(true); + listTasksRequest.setDetailed(true); + listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10)); + try { + admin().cluster().listTasks(listTasksRequest).get(); + } catch (Exception e) { + throw new AssertionError("Failed to wait for pending tasks to complete", e); + } + } + + @Override + protected void ensureClusterStateConsistency() throws IOException { + if (cluster() != null && cluster().size() > 0) { + List entries = new ArrayList<>(ClusterModule.getNamedWriteables()); + entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables()); + entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME, + StartDatafeedAction.DatafeedParams::new)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME, + OpenJobAction.JobParams::new)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream)); + entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new)); + final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries); + ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState(); + byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState); + // remove local node reference + masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry); + Map masterStateMap = convertToMap(masterClusterState); + int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length; + String masterId = masterClusterState.nodes().getMasterNodeId(); + for (Client client : cluster().getClients()) { + ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState(); + byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState); + // remove local node reference + localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry); + final Map localStateMap = convertToMap(localClusterState); + final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length; + // Check that the non-master node has the same version of the cluster state as the master and + // that the master node matches the master (otherwise there is no requirement for the cluster state to match) + if (masterClusterState.version() == localClusterState.version() && + masterId.equals(localClusterState.nodes().getMasterNodeId())) { + try { + assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID()); + // We cannot compare serialization bytes since serialization order of maps is not guaranteed + // but we can compare serialization sizes - they should be the same + assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize); + // Compare JSON serialization + assertNull("clusterstate JSON serialization does not match", + differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap)); + } catch (AssertionError error) { + logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}", + masterClusterState.toString(), localClusterState.toString()); + throw error; + } + } + } + } + } +}