This commit is contained in:
parent
7456117019
commit
21f76aba28
|
@ -5,19 +5,10 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ml.integration;
|
package org.elasticsearch.xpack.ml.integration;
|
||||||
|
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||||
import org.elasticsearch.client.Client;
|
|
||||||
import org.elasticsearch.cluster.ClusterModule;
|
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.io.PathUtils;
|
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
|
||||||
import org.elasticsearch.common.network.NetworkModule;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
|
@ -26,23 +17,10 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.elasticsearch.index.reindex.ReindexPlugin;
|
|
||||||
import org.elasticsearch.persistent.PersistentTaskParams;
|
|
||||||
import org.elasticsearch.persistent.PersistentTaskState;
|
|
||||||
import org.elasticsearch.plugins.Plugin;
|
|
||||||
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.search.SearchHit;
|
||||||
import org.elasticsearch.search.SearchHits;
|
import org.elasticsearch.search.SearchHits;
|
||||||
import org.elasticsearch.search.SearchModule;
|
|
||||||
import org.elasticsearch.search.sort.SortBuilders;
|
import org.elasticsearch.search.sort.SortBuilders;
|
||||||
import org.elasticsearch.search.sort.SortOrder;
|
import org.elasticsearch.search.sort.SortOrder;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
|
||||||
import org.elasticsearch.test.SecuritySettingsSourceField;
|
|
||||||
import org.elasticsearch.transport.Netty4Plugin;
|
|
||||||
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
|
|
||||||
import org.elasticsearch.xpack.core.XPackClientPlugin;
|
|
||||||
import org.elasticsearch.xpack.core.XPackSettings;
|
|
||||||
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
|
||||||
import org.elasticsearch.xpack.core.ml.MlTasks;
|
|
||||||
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
|
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
|
||||||
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
|
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
|
||||||
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
|
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
|
||||||
|
@ -70,10 +48,8 @@ import org.elasticsearch.xpack.core.ml.action.util.PageParams;
|
||||||
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
|
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
|
||||||
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
|
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
|
||||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
|
||||||
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
import org.elasticsearch.xpack.core.ml.job.config.Job;
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
|
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
|
||||||
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
|
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
|
||||||
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex;
|
||||||
|
@ -85,24 +61,15 @@ import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition;
|
||||||
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
|
import org.elasticsearch.xpack.core.ml.job.results.Forecast;
|
||||||
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
|
import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats;
|
||||||
import org.elasticsearch.xpack.core.ml.job.results.Result;
|
import org.elasticsearch.xpack.core.ml.job.results.Result;
|
||||||
import org.elasticsearch.xpack.core.security.SecurityField;
|
|
||||||
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
|
|
||||||
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
|
@ -110,46 +77,15 @@ import static org.hamcrest.Matchers.notNullValue;
|
||||||
/**
|
/**
|
||||||
* Base class of ML integration tests that use a native autodetect process
|
* Base class of ML integration tests that use a native autodetect process
|
||||||
*/
|
*/
|
||||||
abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
|
abstract class MlNativeAutodetectIntegTestCase extends MlNativeIntegTestCase {
|
||||||
|
|
||||||
private List<Job.Builder> jobs = new ArrayList<>();
|
private List<Job.Builder> jobs = new ArrayList<>();
|
||||||
private List<DatafeedConfig> datafeeds = new ArrayList<>();
|
private List<DatafeedConfig> datafeeds = new ArrayList<>();
|
||||||
@Override
|
|
||||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
|
||||||
return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
|
protected void cleanUpResources() {
|
||||||
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Settings externalClusterClientSettings() {
|
|
||||||
Path key;
|
|
||||||
Path certificate;
|
|
||||||
try {
|
|
||||||
key = PathUtils.get(getClass().getResource("/testnode.pem").toURI());
|
|
||||||
certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI());
|
|
||||||
} catch (URISyntaxException e) {
|
|
||||||
throw new IllegalStateException("error trying to get keystore path", e);
|
|
||||||
}
|
|
||||||
Settings.Builder builder = Settings.builder();
|
|
||||||
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
|
|
||||||
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
|
|
||||||
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
|
|
||||||
builder.put("xpack.security.transport.ssl.enabled", true);
|
|
||||||
builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString());
|
|
||||||
builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString());
|
|
||||||
builder.put("xpack.security.transport.ssl.key_passphrase", "testnode");
|
|
||||||
builder.put("xpack.security.transport.ssl.verification_mode", "certificate");
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void cleanUp() {
|
|
||||||
cleanUpDatafeeds();
|
cleanUpDatafeeds();
|
||||||
cleanUpJobs();
|
cleanUpJobs();
|
||||||
waitForPendingTasks();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void cleanUpDatafeeds() {
|
private void cleanUpDatafeeds() {
|
||||||
|
@ -182,18 +118,6 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForPendingTasks() {
|
|
||||||
ListTasksRequest listTasksRequest = new ListTasksRequest();
|
|
||||||
listTasksRequest.setWaitForCompletion(true);
|
|
||||||
listTasksRequest.setDetailed(true);
|
|
||||||
listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10));
|
|
||||||
try {
|
|
||||||
admin().cluster().listTasks(listTasksRequest).get();
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new AssertionError("Failed to wait for pending tasks to complete", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void registerJob(Job.Builder job) {
|
protected void registerJob(Job.Builder job) {
|
||||||
if (jobs.add(job) == false) {
|
if (jobs.add(job) == false) {
|
||||||
throw new IllegalArgumentException("job [" + job.getId() + "] is already registered");
|
throw new IllegalArgumentException("job [" + job.getId() + "] is already registered");
|
||||||
|
@ -441,56 +365,6 @@ abstract class MlNativeAutodetectIntegTestCase extends ESIntegTestCase {
|
||||||
return client().execute(PersistJobAction.INSTANCE, request).actionGet();
|
return client().execute(PersistJobAction.INSTANCE, request).actionGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void ensureClusterStateConsistency() throws IOException {
|
|
||||||
if (cluster() != null && cluster().size() > 0) {
|
|
||||||
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
|
|
||||||
entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
|
|
||||||
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
|
|
||||||
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
|
|
||||||
StartDatafeedAction.DatafeedParams::new));
|
|
||||||
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
|
|
||||||
OpenJobAction.JobParams::new));
|
|
||||||
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
|
|
||||||
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
|
|
||||||
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new));
|
|
||||||
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
|
|
||||||
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
|
|
||||||
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
|
|
||||||
// remove local node reference
|
|
||||||
masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
|
|
||||||
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
|
|
||||||
int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
|
|
||||||
String masterId = masterClusterState.nodes().getMasterNodeId();
|
|
||||||
for (Client client : cluster().getClients()) {
|
|
||||||
ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
|
|
||||||
byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
|
|
||||||
// remove local node reference
|
|
||||||
localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
|
|
||||||
final Map<String, Object> localStateMap = convertToMap(localClusterState);
|
|
||||||
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
|
|
||||||
// Check that the non-master node has the same version of the cluster state as the master and
|
|
||||||
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
|
|
||||||
if (masterClusterState.version() == localClusterState.version() &&
|
|
||||||
masterId.equals(localClusterState.nodes().getMasterNodeId())) {
|
|
||||||
try {
|
|
||||||
assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
|
|
||||||
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
|
|
||||||
// but we can compare serialization sizes - they should be the same
|
|
||||||
assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
|
|
||||||
// Compare JSON serialization
|
|
||||||
assertNull("clusterstate JSON serialization does not match",
|
|
||||||
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
|
|
||||||
} catch (AssertionError error) {
|
|
||||||
logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
|
|
||||||
masterClusterState.toString(), localClusterState.toString());
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected List<String> generateData(long timestamp, TimeValue bucketSpan, int bucketCount,
|
protected List<String> generateData(long timestamp, TimeValue bucketSpan, int bucketCount,
|
||||||
Function<Integer, Integer> timeToCountFunction) throws IOException {
|
Function<Integer, Integer> timeToCountFunction) throws IOException {
|
||||||
List<String> data = new ArrayList<>();
|
List<String> data = new ArrayList<>();
|
||||||
|
|
|
@ -0,0 +1,156 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License;
|
||||||
|
* you may not use this file except in compliance with the Elastic License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.xpack.ml.integration;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
|
||||||
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.cluster.ClusterModule;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.common.io.PathUtils;
|
||||||
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
|
import org.elasticsearch.common.network.NetworkModule;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.index.reindex.ReindexPlugin;
|
||||||
|
import org.elasticsearch.persistent.PersistentTaskParams;
|
||||||
|
import org.elasticsearch.persistent.PersistentTaskState;
|
||||||
|
import org.elasticsearch.plugins.Plugin;
|
||||||
|
import org.elasticsearch.search.SearchModule;
|
||||||
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
|
import org.elasticsearch.test.SecuritySettingsSourceField;
|
||||||
|
import org.elasticsearch.transport.Netty4Plugin;
|
||||||
|
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
|
||||||
|
import org.elasticsearch.xpack.core.XPackClientPlugin;
|
||||||
|
import org.elasticsearch.xpack.core.XPackSettings;
|
||||||
|
import org.elasticsearch.xpack.core.ml.MlMetadata;
|
||||||
|
import org.elasticsearch.xpack.core.ml.MlTasks;
|
||||||
|
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
|
||||||
|
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
|
||||||
|
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
|
||||||
|
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
||||||
|
import org.elasticsearch.xpack.core.security.SecurityField;
|
||||||
|
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.elasticsearch.test.XContentTestUtils.convertToMap;
|
||||||
|
import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class of ML integration tests that use a native autodetect process
|
||||||
|
*/
|
||||||
|
abstract class MlNativeIntegTestCase extends ESIntegTestCase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||||
|
return Arrays.asList(LocalStateCompositeXPackPlugin.class, Netty4Plugin.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
|
||||||
|
return Arrays.asList(XPackClientPlugin.class, Netty4Plugin.class, ReindexPlugin.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Settings externalClusterClientSettings() {
|
||||||
|
Path key;
|
||||||
|
Path certificate;
|
||||||
|
try {
|
||||||
|
key = PathUtils.get(getClass().getResource("/testnode.pem").toURI());
|
||||||
|
certificate = PathUtils.get(getClass().getResource("/testnode.crt").toURI());
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw new IllegalStateException("error trying to get keystore path", e);
|
||||||
|
}
|
||||||
|
Settings.Builder builder = Settings.builder();
|
||||||
|
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NAME4);
|
||||||
|
builder.put(SecurityField.USER_SETTING.getKey(), "x_pack_rest_user:" + SecuritySettingsSourceField.TEST_PASSWORD_SECURE_STRING);
|
||||||
|
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), true);
|
||||||
|
builder.put("xpack.security.transport.ssl.enabled", true);
|
||||||
|
builder.put("xpack.security.transport.ssl.key", key.toAbsolutePath().toString());
|
||||||
|
builder.put("xpack.security.transport.ssl.certificate", certificate.toAbsolutePath().toString());
|
||||||
|
builder.put("xpack.security.transport.ssl.key_passphrase", "testnode");
|
||||||
|
builder.put("xpack.security.transport.ssl.verification_mode", "certificate");
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void cleanUp() {
|
||||||
|
cleanUpResources();
|
||||||
|
waitForPendingTasks();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void cleanUpResources();
|
||||||
|
|
||||||
|
private void waitForPendingTasks() {
|
||||||
|
ListTasksRequest listTasksRequest = new ListTasksRequest();
|
||||||
|
listTasksRequest.setWaitForCompletion(true);
|
||||||
|
listTasksRequest.setDetailed(true);
|
||||||
|
listTasksRequest.setTimeout(TimeValue.timeValueSeconds(10));
|
||||||
|
try {
|
||||||
|
admin().cluster().listTasks(listTasksRequest).get();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new AssertionError("Failed to wait for pending tasks to complete", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void ensureClusterStateConsistency() throws IOException {
|
||||||
|
if (cluster() != null && cluster().size() > 0) {
|
||||||
|
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedWriteables());
|
||||||
|
entries.addAll(new SearchModule(Settings.EMPTY, true, Collections.emptyList()).getNamedWriteables());
|
||||||
|
entries.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new));
|
||||||
|
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
|
||||||
|
StartDatafeedAction.DatafeedParams::new));
|
||||||
|
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
|
||||||
|
OpenJobAction.JobParams::new));
|
||||||
|
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new));
|
||||||
|
entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream));
|
||||||
|
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new));
|
||||||
|
final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
|
||||||
|
ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();
|
||||||
|
byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState);
|
||||||
|
// remove local node reference
|
||||||
|
masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry);
|
||||||
|
Map<String, Object> masterStateMap = convertToMap(masterClusterState);
|
||||||
|
int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length;
|
||||||
|
String masterId = masterClusterState.nodes().getMasterNodeId();
|
||||||
|
for (Client client : cluster().getClients()) {
|
||||||
|
ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState();
|
||||||
|
byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState);
|
||||||
|
// remove local node reference
|
||||||
|
localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry);
|
||||||
|
final Map<String, Object> localStateMap = convertToMap(localClusterState);
|
||||||
|
final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length;
|
||||||
|
// Check that the non-master node has the same version of the cluster state as the master and
|
||||||
|
// that the master node matches the master (otherwise there is no requirement for the cluster state to match)
|
||||||
|
if (masterClusterState.version() == localClusterState.version() &&
|
||||||
|
masterId.equals(localClusterState.nodes().getMasterNodeId())) {
|
||||||
|
try {
|
||||||
|
assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID());
|
||||||
|
// We cannot compare serialization bytes since serialization order of maps is not guaranteed
|
||||||
|
// but we can compare serialization sizes - they should be the same
|
||||||
|
assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize);
|
||||||
|
// Compare JSON serialization
|
||||||
|
assertNull("clusterstate JSON serialization does not match",
|
||||||
|
differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap));
|
||||||
|
} catch (AssertionError error) {
|
||||||
|
logger.error("Cluster state from master:\n{}\nLocal cluster state:\n{}",
|
||||||
|
masterClusterState.toString(), localClusterState.toString());
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue