From e217f9a1e875d72cb13b91de2c7d06f458be671b Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 7 Jul 2020 10:52:10 +0100 Subject: [PATCH] [ML] Wait for shards to initialize after creating ML internal indices (#59087) There have been a few test failures that are likely caused by tests performing actions that use ML indices immediately after the actions that create those ML indices. Currently this can result in attempts to search the newly created index before its shards have initialized. This change makes the method that creates the internal ML indices that have been affected by this problem (state and stats) wait for the shards to be initialized before returning. Backport of #59027 --- .../xpack/core/ml/utils/MlIndexAndAlias.java | 47 ++++++++++++++++--- .../core/ml/utils/MlIndexAndAliasTests.java | 13 +++++ .../ml/integration/DeleteExpiredDataIT.java | 1 - .../xpack/ml/integration/RegressionIT.java | 1 - 4 files changed, 53 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index ab53e5db8b6..41aeccc5395 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -10,6 +10,8 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; @@ -19,6 +21,7 @@ import 
org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -71,13 +74,27 @@ public final class MlIndexAndAlias { * Adds an {@code alias} to that index if it was created, * or to the index with the highest suffix if the index did not have to be created. * The listener is notified with a {@code boolean} that informs whether the index or the alias were created. + * If the index is created, the listener is not called until the index is ready to use via the supplied alias, + * so that a method that receives a success response from this method can safely use the index immediately. */ public static void createIndexAndAliasIfNecessary(Client client, ClusterState clusterState, IndexNameExpressionResolver resolver, String indexPatternPrefix, String alias, - ActionListener listener) { + ActionListener finalListener) { + + // If both the index and alias were successfully created then wait until the shards of the index that the alias points to are ready + ActionListener indexCreatedListener = ActionListener.wrap( + created -> { + if (created) { + waitForShardsReady(client, alias, finalListener); + } else { + finalListener.onResponse(false); + } + }, + finalListener::onFailure + ); boolean isHiddenAttributeAvailable = clusterState.nodes().getMinNodeVersion().onOrAfter(HIDDEN_INTRODUCED_VERSION); @@ -93,7 +110,7 @@ public final class MlIndexAndAlias { if (concreteIndexNames.length == 0) { if (indexPointedByCurrentWriteAlias.isPresent() == false) { - createFirstConcreteIndex(client, firstConcreteIndex, alias, true, isHiddenAttributeAvailable, listener); + createFirstConcreteIndex(client,
firstConcreteIndex, alias, true, isHiddenAttributeAvailable, indexCreatedListener); return; } logger.error( @@ -101,7 +118,7 @@ public final class MlIndexAndAlias { indexPattern, alias, indexPointedByCurrentWriteAlias.get()); } else if (concreteIndexNames.length == 1 && concreteIndexNames[0].equals(legacyIndexWithoutSuffix)) { if (indexPointedByCurrentWriteAlias.isPresent() == false) { - createFirstConcreteIndex(client, firstConcreteIndex, alias, true, isHiddenAttributeAvailable, listener); + createFirstConcreteIndex(client, firstConcreteIndex, alias, true, isHiddenAttributeAvailable, indexCreatedListener); return; } if (indexPointedByCurrentWriteAlias.get().getIndex().getName().equals(legacyIndexWithoutSuffix)) { @@ -113,8 +130,8 @@ public final class MlIndexAndAlias { isHiddenAttributeAvailable, ActionListener.wrap( unused -> updateWriteAlias( - client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, isHiddenAttributeAvailable, listener), - listener::onFailure) + client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, isHiddenAttributeAvailable, indexCreatedListener), + finalListener::onFailure) ); return; } @@ -125,12 +142,28 @@ public final class MlIndexAndAlias { if (indexPointedByCurrentWriteAlias.isPresent() == false) { assert concreteIndexNames.length > 0; String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get(); - updateWriteAlias(client, alias, null, latestConcreteIndexName, isHiddenAttributeAvailable, listener); + updateWriteAlias(client, alias, null, latestConcreteIndexName, isHiddenAttributeAvailable, finalListener); return; } } // If the alias is set, there is nothing more to do. 
- listener.onResponse(false); + finalListener.onResponse(false); + } + + private static void waitForShardsReady(Client client, String index, ActionListener listener) { + ClusterHealthRequest healthRequest = Requests.clusterHealthRequest(index) + .waitForYellowStatus() + .waitForNoRelocatingShards(true) + .waitForNoInitializingShards(true); + executeAsyncWithOrigin( + client.threadPool().getThreadContext(), + ML_ORIGIN, + healthRequest, + ActionListener.wrap( + response -> listener.onResponse(response.isTimedOut() == false), + listener::onFailure), + client.admin().cluster()::health + ); } private static void createFirstConcreteIndex(Client client, diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index 0cd0a76440e..c85d2964da2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.core.ml.utils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; @@ -19,6 +21,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; +import org.elasticsearch.client.ClusterAdminClient; import org.elasticsearch.client.IndicesAdminClient; import 
org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -71,6 +74,7 @@ public class MlIndexAndAliasTests extends ESTestCase { private ThreadPool threadPool; private IndicesAdminClient indicesAdminClient; + private ClusterAdminClient clusterAdminClient; private AdminClient adminClient; private Client client; private ActionListener listener; @@ -90,8 +94,17 @@ public class MlIndexAndAliasTests extends ESTestCase { when(indicesAdminClient.prepareAliases()).thenReturn(new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE)); doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).aliases(any(), any()); + clusterAdminClient = mock(ClusterAdminClient.class); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(new ClusterHealthResponse()); + return null; + }).when(clusterAdminClient).health(any(ClusterHealthRequest.class), any(ActionListener.class)); + adminClient = mock(AdminClient.class); when(adminClient.indices()).thenReturn(indicesAdminClient); + when(adminClient.cluster()).thenReturn(clusterAdminClient); client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 9307955afd8..ee895121e83 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -100,7 +100,6 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { testExpiredDeletion(null, 10010); } - @AwaitsFix(bugUrl = 
"https://github.com/elastic/elasticsearch/issues/57102") public void testDeleteExpiredDataWithStandardThrottle() throws Exception { testExpiredDeletion(-1.0f, 100); } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java index 852aee7f97d..d5f49d260a3 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/RegressionIT.java @@ -299,7 +299,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase { assertMlResultsFieldMappings(destIndex, predictedClassField, "double"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55807") public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception { String sourceIndex = "regression_two_jobs_with_same_randomize_seed_source"; indexData(sourceIndex, 100, 0);