From 07a014026030d5fb0f6062448cdea516cdb60781 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 11 Sep 2019 16:27:26 +0100 Subject: [PATCH] [ML-DataFrame] Ensure latest index template exists before indexing docs (#46595) When upgrading data nodes to a newer version before master nodes there was a risk that a transform running on an upgraded data node would index a document into the new transforms internal index before its index template was created. This would cause the index to be created with entirely dynamic mappings. This change introduces a check before indexing any internal transforms document to ensure that the required index template exists and create it if it doesn't. Backport of #46553 --- .../persistence/DataFrameInternalIndex.java | 56 +++++++++ ...FrameTransformPersistentTasksExecutor.java | 27 +++-- .../DataFrameInternalIndexTests.java | 114 ++++++++++++++++++ ...TransformPersistentTasksExecutorTests.java | 2 + .../80_data_frame_jobs_crud.yml | 23 ++++ 5 files changed, 215 insertions(+), 7 deletions(-) create mode 100644 x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/DataFrameInternalIndexTests.java diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/DataFrameInternalIndex.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/DataFrameInternalIndex.java index c410f2f0c70..0b064a56103 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/DataFrameInternalIndex.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/persistence/DataFrameInternalIndex.java @@ -7,11 +7,21 @@ package org.elasticsearch.xpack.transform.persistence; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; import org.elasticsearch.xpack.core.transform.DataFrameField; @@ -27,6 +37,8 @@ import java.util.Collections; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.core.transform.DataFrameField.TRANSFORM_ID; public final class DataFrameInternalIndex { @@ -131,6 +143,10 @@ public final class DataFrameInternalIndex { public static XContentBuilder mappings() throws IOException { XContentBuilder builder = jsonBuilder(); + return mappings(builder); + } + + public static XContentBuilder mappings(XContentBuilder builder) throws IOException { builder.startObject(); builder.startObject(MapperService.SINGLE_MAPPING_NAME); @@ -302,6 +318,46 @@ public final class DataFrameInternalIndex { .endObject(); } + public static boolean haveLatestVersionedIndexTemplate(ClusterState state) { + return state.getMetaData().getTemplates().containsKey(LATEST_INDEX_VERSIONED_NAME); + } + + /** + * This method should be called before any document is indexed that relies on the + * existence of the latest index template to create the internal index. The + * reason is that the standard template upgrader only runs when the master node + * is upgraded to the newer version. If data nodes are upgraded before master + * nodes and transforms get assigned to those data nodes then without this check + * the data nodes will index documents into the internal index before the necessary + * index template is present and this will result in an index with completely + * dynamic mappings being created (which is very bad). + */ + public static void installLatestVersionedIndexTemplateIfRequired(ClusterService clusterService, Client client, + ActionListener listener) { + + // The check for existence of the template is against local cluster state, so very cheap + if (haveLatestVersionedIndexTemplate(clusterService.state())) { + listener.onResponse(null); + return; + } + + // Installing the template involves communication with the master node, so it's more expensive but much rarer + try { + IndexTemplateMetaData indexTemplateMetaData = getIndexTemplateMetaData(); + BytesReference jsonMappings = new BytesArray(indexTemplateMetaData.mappings().get(SINGLE_MAPPING_NAME).uncompressed()); + PutIndexTemplateRequest request = new PutIndexTemplateRequest(LATEST_INDEX_VERSIONED_NAME) + .patterns(indexTemplateMetaData.patterns()) + .version(indexTemplateMetaData.version()) + .settings(indexTemplateMetaData.settings()) + .mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2()); + ActionListener innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, request, + innerListener, client.admin().indices()::putTemplate); + } catch (IOException e) { + listener.onFailure(e); + } + } + private DataFrameInternalIndex() { } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutor.java index 4b43fea7bdc..a4c6fcc451e 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -63,6 +63,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx private final DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService; private final SchedulerEngine schedulerEngine; private final ThreadPool threadPool; + private final ClusterService clusterService; private final DataFrameAuditor auditor; private volatile int numFailureRetries; @@ -81,6 +82,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx this.schedulerEngine = schedulerEngine; this.auditor = auditor; this.threadPool = threadPool; + this.clusterService = clusterService; this.numFailureRetries = DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING.get(settings); clusterService.getClusterSettings() .addSettingsUpdateConsumer(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING, this::setNumFailureRetries); @@ -144,7 +146,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure) ); - // <5> load next checkpoint + // <7> load next checkpoint ActionListener getTransformNextCheckpointListener = ActionListener.wrap( nextCheckpoint -> { @@ -171,7 +173,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx } ); - // <4> load last checkpoint + // <6> load last checkpoint ActionListener getTransformLastCheckpointListener = ActionListener.wrap( lastCheckpoint -> { indexerBuilder.setLastCheckpoint(lastCheckpoint); @@ -188,7 +190,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx } ); - // <3> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED) + // <5> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED) // Since we don't create the task until `_start` is called, if we see that the task state is stopped, attempt to start // Schedule execution regardless ActionListener> transformStatsActionListener = ActionListener.wrap( @@ -230,7 +232,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx } ); - // <2> set fieldmappings for the indexer, get the previous stats (if they exist) + // <4> set fieldmappings for the indexer, get the previous stats (if they exist) ActionListener> getFieldMappingsListener = ActionListener.wrap( fieldMappings -> { indexerBuilder.setFieldMappings(fieldMappings); @@ -244,7 +246,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx } ); - // <1> Validate the transform, assigning it to the indexer, and get the field mappings + // <3> Validate the transform, assigning it to the indexer, and get the field mappings ActionListener getTransformConfigListener = ActionListener.wrap( config -> { if (config.isValid()) { @@ -261,8 +263,19 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx markAsFailed(buildTask, msg); } ); - // <0> Get the transform config - transformsConfigManager.getTransformConfiguration(transformId, getTransformConfigListener); + + // <2> Get the transform config + ActionListener templateCheckListener = ActionListener.wrap( + aVoid -> transformsConfigManager.getTransformConfiguration(transformId, getTransformConfigListener), + error -> { + String msg = "Failed to create internal index mappings"; + logger.error(msg, error); + markAsFailed(buildTask, msg); + } + ); + + // <1> Check the internal index template is installed + DataFrameInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, templateCheckListener); } private static IndexerState currentIndexerState(DataFrameTransformState previousState) { diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/DataFrameInternalIndexTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/DataFrameInternalIndexTests.java new file mode 100644 index 00000000000..e61dbffa08a --- /dev/null +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/persistence/DataFrameInternalIndexTests.java @@ -0,0 +1,114 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.transform.persistence; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class DataFrameInternalIndexTests extends ESTestCase { + + public static ClusterState STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE; + + static { + ImmutableOpenMap.Builder mapBuilder = ImmutableOpenMap.builder(); + try { + mapBuilder.put(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME, DataFrameInternalIndex.getIndexTemplateMetaData()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + MetaData.Builder metaBuilder = MetaData.builder(); + metaBuilder.templates(mapBuilder.build()); + ClusterState.Builder csBuilder = ClusterState.builder(ClusterName.DEFAULT); + csBuilder.metaData(metaBuilder.build()); + STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE = csBuilder.build(); + } + + public void testHaveLatestVersionedIndexTemplate() { + + assertTrue(DataFrameInternalIndex.haveLatestVersionedIndexTemplate(STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE)); + assertFalse(DataFrameInternalIndex.haveLatestVersionedIndexTemplate(ClusterState.EMPTY_STATE)); + } + + public void testInstallLatestVersionedIndexTemplateIfRequired_GivenNotRequired() { + + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(DataFrameInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE); + + Client client = mock(Client.class); + + AtomicBoolean gotResponse = new AtomicBoolean(false); + ActionListener testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage())); + + DataFrameInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, testListener); + + assertTrue(gotResponse.get()); + verifyNoMoreInteractions(client); + } + + public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() { + + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); + + IndicesAdminClient indicesClient = mock(IndicesAdminClient.class); + doAnswer( + invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(new AcknowledgedResponse(true)); + return null; + }).when(indicesClient).putTemplate(any(), any()); + + AdminClient adminClient = mock(AdminClient.class); + when(adminClient.indices()).thenReturn(indicesClient); + Client client = mock(Client.class); + when(client.admin()).thenReturn(adminClient); + + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.threadPool()).thenReturn(threadPool); + + AtomicBoolean gotResponse = new AtomicBoolean(false); + ActionListener testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage())); + + DataFrameInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, testListener); + + assertTrue(gotResponse.get()); + verify(client, times(1)).threadPool(); + verify(client, times(1)).admin(); + verifyNoMoreInteractions(client); + verify(adminClient, times(1)).indices(); + verifyNoMoreInteractions(adminClient); + verify(indicesClient, times(1)).putTemplate(any(), any()); + verifyNoMoreInteractions(indicesClient); + } +} diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutorTests.java index c2d87c90ef0..f4855125889 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/DataFrameTransformPersistentTasksExecutorTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.transform.transforms.DataFrameTransform; import org.elasticsearch.xpack.transform.checkpoint.DataFrameTransformsCheckpointService; import org.elasticsearch.xpack.transform.notifications.DataFrameAuditor; import org.elasticsearch.xpack.transform.persistence.DataFrameInternalIndex; +import org.elasticsearch.xpack.transform.persistence.DataFrameInternalIndexTests; import org.elasticsearch.xpack.transform.persistence.DataFrameTransformsConfigManager; import java.util.ArrayList; @@ -107,6 +108,7 @@ public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase { Collections.singleton(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING)); ClusterService clusterService = mock(ClusterService.class); when(clusterService.getClusterSettings()).thenReturn(cSettings); + when(clusterService.state()).thenReturn(DataFrameInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE); DataFrameTransformPersistentTasksExecutor executor = new DataFrameTransformPersistentTasksExecutor(client, transformsConfigManager, dataFrameTransformsCheckpointService, mock(SchedulerEngine.class), diff --git a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml index 36df712fc35..00dc1a3bb20 100644 --- a/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml +++ b/x-pack/qa/rolling-upgrade/src/test/resources/rest-api-spec/test/upgraded_cluster/80_data_frame_jobs_crud.yml @@ -157,3 +157,26 @@ setup: data_frame.get_data_frame_transform_stats: transform_id: "old-simple-transform,mixed-simple-transform" - match: { count: 0 } + +--- +"Test index mappings for latest internal index": + - do: + data_frame.put_data_frame_transform: + transform_id: "upgraded-simple-transform" + defer_validation: true + body: > + { + "source": { "index": "dataframe-transform-airline-data" }, + "dest": { "index": "upgraded-simple-transform-idx" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } + + - do: + indices.get_mapping: + index: .data-frame-internal-2 + - match: { \.data-frame-internal-2.mappings.dynamic: "false" } + - match: { \.data-frame-internal-2.mappings.properties.id.type: "keyword" }