[ML] Wait for shards to initialize after creating ML internal indices (#59087)

There have been a few test failures that are likely caused by tests
performing actions that use ML indices immediately after the actions
that create those ML indices.  Currently this can result in attempts
to search the newly created index before its shards have initialized.

This change makes the method that creates the internal ML indices
that have been affected by this problem (state and stats) wait for
the shards to be initialized before returning.

Backport of #59027
This commit is contained in:
David Roberts 2020-07-07 10:52:10 +01:00 committed by GitHub
parent 7c8d644bbc
commit e217f9a1e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 9 deletions

View File

@ -10,6 +10,8 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
@ -19,6 +21,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -71,13 +74,27 @@ public final class MlIndexAndAlias {
* Adds an {@code alias} to that index if it was created, * Adds an {@code alias} to that index if it was created,
* or to the index with the highest suffix if the index did not have to be created. * or to the index with the highest suffix if the index did not have to be created.
* The listener is notified with a {@code boolean} that informs whether the index or the alias were created. * The listener is notified with a {@code boolean} that informs whether the index or the alias were created.
* If the index is created, the listener is not called until the index is ready to use via the supplied alias,
* so that a method that receives a success response from this method can safely use the index immediately.
*/ */
public static void createIndexAndAliasIfNecessary(Client client, public static void createIndexAndAliasIfNecessary(Client client,
ClusterState clusterState, ClusterState clusterState,
IndexNameExpressionResolver resolver, IndexNameExpressionResolver resolver,
String indexPatternPrefix, String indexPatternPrefix,
String alias, String alias,
ActionListener<Boolean> listener) { ActionListener<Boolean> finalListener) {
// If both the index and alias were successfully created then wait for the shards of the index that the alias points to be ready
ActionListener<Boolean> indexCreatedListener = ActionListener.wrap(
created -> {
if (created) {
waitForShardsReady(client, alias, finalListener);
} else {
finalListener.onResponse(false);
}
},
finalListener::onFailure
);
boolean isHiddenAttributeAvailable = clusterState.nodes().getMinNodeVersion().onOrAfter(HIDDEN_INTRODUCED_VERSION); boolean isHiddenAttributeAvailable = clusterState.nodes().getMinNodeVersion().onOrAfter(HIDDEN_INTRODUCED_VERSION);
@ -93,7 +110,7 @@ public final class MlIndexAndAlias {
if (concreteIndexNames.length == 0) { if (concreteIndexNames.length == 0) {
if (indexPointedByCurrentWriteAlias.isPresent() == false) { if (indexPointedByCurrentWriteAlias.isPresent() == false) {
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, isHiddenAttributeAvailable, listener); createFirstConcreteIndex(client, firstConcreteIndex, alias, true, isHiddenAttributeAvailable, indexCreatedListener);
return; return;
} }
logger.error( logger.error(
@ -101,7 +118,7 @@ public final class MlIndexAndAlias {
indexPattern, alias, indexPointedByCurrentWriteAlias.get()); indexPattern, alias, indexPointedByCurrentWriteAlias.get());
} else if (concreteIndexNames.length == 1 && concreteIndexNames[0].equals(legacyIndexWithoutSuffix)) { } else if (concreteIndexNames.length == 1 && concreteIndexNames[0].equals(legacyIndexWithoutSuffix)) {
if (indexPointedByCurrentWriteAlias.isPresent() == false) { if (indexPointedByCurrentWriteAlias.isPresent() == false) {
createFirstConcreteIndex(client, firstConcreteIndex, alias, true, isHiddenAttributeAvailable, listener); createFirstConcreteIndex(client, firstConcreteIndex, alias, true, isHiddenAttributeAvailable, indexCreatedListener);
return; return;
} }
if (indexPointedByCurrentWriteAlias.get().getIndex().getName().equals(legacyIndexWithoutSuffix)) { if (indexPointedByCurrentWriteAlias.get().getIndex().getName().equals(legacyIndexWithoutSuffix)) {
@ -113,8 +130,8 @@ public final class MlIndexAndAlias {
isHiddenAttributeAvailable, isHiddenAttributeAvailable,
ActionListener.wrap( ActionListener.wrap(
unused -> updateWriteAlias( unused -> updateWriteAlias(
client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, isHiddenAttributeAvailable, listener), client, alias, legacyIndexWithoutSuffix, firstConcreteIndex, isHiddenAttributeAvailable, indexCreatedListener),
listener::onFailure) finalListener::onFailure)
); );
return; return;
} }
@ -125,12 +142,28 @@ public final class MlIndexAndAlias {
if (indexPointedByCurrentWriteAlias.isPresent() == false) { if (indexPointedByCurrentWriteAlias.isPresent() == false) {
assert concreteIndexNames.length > 0; assert concreteIndexNames.length > 0;
String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get(); String latestConcreteIndexName = Arrays.stream(concreteIndexNames).max(INDEX_NAME_COMPARATOR).get();
updateWriteAlias(client, alias, null, latestConcreteIndexName, isHiddenAttributeAvailable, listener); updateWriteAlias(client, alias, null, latestConcreteIndexName, isHiddenAttributeAvailable, finalListener);
return; return;
} }
} }
// If the alias is set, there is nothing more to do. // If the alias is set, there is nothing more to do.
listener.onResponse(false); finalListener.onResponse(false);
}
private static void waitForShardsReady(Client client, String index, ActionListener<Boolean> listener) {
ClusterHealthRequest healthRequest = Requests.clusterHealthRequest(index)
.waitForYellowStatus()
.waitForNoRelocatingShards(true)
.waitForNoInitializingShards(true);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ML_ORIGIN,
healthRequest,
ActionListener.<ClusterHealthResponse>wrap(
response -> listener.onResponse(response.isTimedOut() == false),
listener::onFailure),
client.admin().cluster()::health
);
} }
private static void createFirstConcreteIndex(Client client, private static void createFirstConcreteIndex(Client client,

View File

@ -7,6 +7,8 @@ package org.elasticsearch.xpack.core.ml.utils;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
@ -19,6 +21,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
@ -71,6 +74,7 @@ public class MlIndexAndAliasTests extends ESTestCase {
private ThreadPool threadPool; private ThreadPool threadPool;
private IndicesAdminClient indicesAdminClient; private IndicesAdminClient indicesAdminClient;
private ClusterAdminClient clusterAdminClient;
private AdminClient adminClient; private AdminClient adminClient;
private Client client; private Client client;
private ActionListener<Boolean> listener; private ActionListener<Boolean> listener;
@ -90,8 +94,17 @@ public class MlIndexAndAliasTests extends ESTestCase {
when(indicesAdminClient.prepareAliases()).thenReturn(new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE)); when(indicesAdminClient.prepareAliases()).thenReturn(new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE));
doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).aliases(any(), any()); doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).aliases(any(), any());
clusterAdminClient = mock(ClusterAdminClient.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<ClusterHealthResponse> listener = (ActionListener<ClusterHealthResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new ClusterHealthResponse());
return null;
}).when(clusterAdminClient).health(any(ClusterHealthRequest.class), any(ActionListener.class));
adminClient = mock(AdminClient.class); adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesAdminClient); when(adminClient.indices()).thenReturn(indicesAdminClient);
when(adminClient.cluster()).thenReturn(clusterAdminClient);
client = mock(Client.class); client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool); when(client.threadPool()).thenReturn(threadPool);

View File

@ -100,7 +100,6 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
testExpiredDeletion(null, 10010); testExpiredDeletion(null, 10010);
} }
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/57102")
public void testDeleteExpiredDataWithStandardThrottle() throws Exception { public void testDeleteExpiredDataWithStandardThrottle() throws Exception {
testExpiredDeletion(-1.0f, 100); testExpiredDeletion(-1.0f, 100);
} }

View File

@ -299,7 +299,6 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
assertMlResultsFieldMappings(destIndex, predictedClassField, "double"); assertMlResultsFieldMappings(destIndex, predictedClassField, "double");
} }
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55807")
public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception { public void testTwoJobsWithSameRandomizeSeedUseSameTrainingSet() throws Exception {
String sourceIndex = "regression_two_jobs_with_same_randomize_seed_source"; String sourceIndex = "regression_two_jobs_with_same_randomize_seed_source";
indexData(sourceIndex, 100, 0); indexData(sourceIndex, 100, 0);