diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java index 157dffbc58d..efa50ce6cd1 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/action/ScheduledJobsIT.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.xpack.prelert.action; -import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -42,6 +41,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.xpack.prelert.integration.TooManyJobsIT.ensureClusterStateConsistencyWorkAround; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; @@ -261,4 +261,9 @@ public class ScheduledJobsIT extends ESIntegTestCase { assertTrue(response.isAcknowledged()); } } + + @Override + protected void ensureClusterStateConsistency() throws IOException { + ensureClusterStateConsistencyWorkAround(); + } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java index dc701ef1757..9e1cc07391c 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/integration/TooManyJobsIT.java @@ -7,6 +7,11 @@ package org.elasticsearch.xpack.prelert.integration; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; @@ -21,10 +26,18 @@ import org.elasticsearch.xpack.prelert.job.DataDescription; import org.elasticsearch.xpack.prelert.job.Detector; import org.elasticsearch.xpack.prelert.job.Job; import org.elasticsearch.xpack.prelert.job.manager.AutodetectProcessManager; +import org.elasticsearch.xpack.prelert.job.metadata.PrelertMetadata; import org.junit.After; +import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.XContentTestUtils.convertToMap; +import static org.elasticsearch.test.XContentTestUtils.differenceBetweenMapsIgnoringArrayOrder; @ESIntegTestCase.ClusterScope(numDataNodes = 1) public class TooManyJobsIT extends ESIntegTestCase { @@ -101,4 +114,49 @@ public class TooManyJobsIT extends ESIntegTestCase { return builder; } + @Override + protected void ensureClusterStateConsistency() throws IOException { + ensureClusterStateConsistencyWorkAround(); + } + + // TODO: Fix in ES. In ESIntegTestCase we should get all NamedWriteableRegistry.Entry entries from ESIntegTestCase#nodePlugins() + public static void ensureClusterStateConsistencyWorkAround() throws IOException { + if (cluster() != null && cluster().size() > 0) { + List namedWritables = new ArrayList<>(ClusterModule.getNamedWriteables()); + namedWritables.add(new NamedWriteableRegistry.Entry(MetaData.Custom.class, "prelert", PrelertMetadata::new)); + final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWritables); + ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState(); + byte[] masterClusterStateBytes = ClusterState.Builder.toBytes(masterClusterState); + // remove local node reference + masterClusterState = ClusterState.Builder.fromBytes(masterClusterStateBytes, null, namedWriteableRegistry); + Map masterStateMap = convertToMap(masterClusterState); + int masterClusterStateSize = ClusterState.Builder.toBytes(masterClusterState).length; + String masterId = masterClusterState.nodes().getMasterNodeId(); + for (Client client : cluster().getClients()) { + ClusterState localClusterState = client.admin().cluster().prepareState().all().setLocal(true).get().getState(); + byte[] localClusterStateBytes = ClusterState.Builder.toBytes(localClusterState); + // remove local node reference + localClusterState = ClusterState.Builder.fromBytes(localClusterStateBytes, null, namedWriteableRegistry); + final Map localStateMap = convertToMap(localClusterState); + final int localClusterStateSize = ClusterState.Builder.toBytes(localClusterState).length; + // Check that the non-master node has the same version of the cluster state as the master and + // that the master node matches the master (otherwise there is no requirement for the cluster state to match) + if (masterClusterState.version() == localClusterState.version() && masterId.equals(localClusterState.nodes().getMasterNodeId())) { + try { + assertEquals("clusterstate UUID does not match", masterClusterState.stateUUID(), localClusterState.stateUUID()); + // We cannot compare serialization bytes since serialization order of maps is not guaranteed + // but we can compare serialization sizes - they should be the same + assertEquals("clusterstate size does not match", masterClusterStateSize, localClusterStateSize); + // Compare JSON serialization + assertNull("clusterstate JSON serialization does not match", differenceBetweenMapsIgnoringArrayOrder(masterStateMap, localStateMap)); + } catch (AssertionError error) { + fail("Cluster state from master:\n" + masterClusterState.toString() + "\nLocal cluster state:\n" + localClusterState.toString()); + throw error; + } + } + } + } + + } + }