[ML-DataFrame] Ensure latest index template exists before indexing docs (#46595)

When upgrading data nodes to a newer version before
master nodes there was a risk that a transform running
on an upgraded data node would index a document into
the new transforms internal index before its index
template was created.  This would cause the index to
be created with entirely dynamic mappings.

This change introduces a check before indexing any
internal transforms document to ensure that the required
index template exists, creating it if it doesn't.

Backport of #46553
This commit is contained in:
David Roberts 2019-09-11 16:27:26 +01:00 committed by GitHub
parent ccf656a9d0
commit 07a0140260
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 215 additions and 7 deletions

View File

@ -7,11 +7,21 @@
package org.elasticsearch.xpack.transform.persistence;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
import org.elasticsearch.xpack.core.transform.DataFrameField;
@ -27,6 +37,8 @@ import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.ClientHelper.DATA_FRAME_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.transform.DataFrameField.TRANSFORM_ID;
public final class DataFrameInternalIndex {
@ -131,6 +143,10 @@ public final class DataFrameInternalIndex {
/**
 * Builds the internal index mappings into a freshly created JSON builder.
 *
 * @return the builder returned by {@link #mappings(XContentBuilder)}
 * @throws IOException if creating the JSON builder or writing the mappings fails
 */
public static XContentBuilder mappings() throws IOException {
    return mappings(jsonBuilder());
}
public static XContentBuilder mappings(XContentBuilder builder) throws IOException {
builder.startObject();
builder.startObject(MapperService.SINGLE_MAPPING_NAME);
@ -302,6 +318,46 @@ public final class DataFrameInternalIndex {
.endObject();
}
/**
 * Reports whether the given cluster state already holds a template registered
 * under {@code LATEST_INDEX_VERSIONED_NAME}.
 *
 * @param state the cluster state whose template metadata is inspected
 * @return {@code true} if a template with that name is present
 */
public static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
    final boolean templatePresent = state.getMetaData()
        .getTemplates()
        .containsKey(LATEST_INDEX_VERSIONED_NAME);
    return templatePresent;
}
/**
 * Makes sure the latest versioned index template exists, installing it when it is
 * missing from the local cluster state.
 *
 * Call this before indexing any document whose target internal index would be
 * created from that template. The standard template upgrader only runs once the
 * master node is on the newer version, so when data nodes are upgraded first a
 * transform assigned to one of them could otherwise index into the internal index
 * before the template is present, leaving the index with completely dynamic
 * mappings (which is very bad).
 *
 * @param clusterService supplies the local cluster state used for the existence check
 * @param client         used to send the put-template request when the template is missing
 * @param listener       receives {@code null} when the template is already present or once
 *                       the put-template request gets a response; receives the failure otherwise
 */
public static void installLatestVersionedIndexTemplateIfRequired(ClusterService clusterService, Client client,
                                                                 ActionListener<Void> listener) {
    // Only the local cluster state is consulted here, so this check is very cheap
    if (haveLatestVersionedIndexTemplate(clusterService.state()) == false) {
        // Talking to the master node is the expensive path, but it is also the rare one
        try {
            IndexTemplateMetaData template = getIndexTemplateMetaData();
            BytesReference mappingBytes = new BytesArray(template.mappings().get(SINGLE_MAPPING_NAME).uncompressed());

            PutIndexTemplateRequest putRequest = new PutIndexTemplateRequest(LATEST_INDEX_VERSIONED_NAME);
            putRequest.patterns(template.patterns());
            putRequest.version(template.version());
            putRequest.settings(template.settings());
            putRequest.mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(mappingBytes, true, XContentType.JSON).v2());

            // The caller only cares that the request completed, not about the response body
            ActionListener<AcknowledgedResponse> putListener =
                ActionListener.wrap(ack -> listener.onResponse(null), listener::onFailure);
            executeAsyncWithOrigin(client.threadPool().getThreadContext(), DATA_FRAME_ORIGIN, putRequest,
                putListener, client.admin().indices()::putTemplate);
        } catch (IOException e) {
            listener.onFailure(e);
        }
        return;
    }
    listener.onResponse(null);
}
// Private no-op constructor: this class cannot be instantiated from outside
private DataFrameInternalIndex() {
}
}

View File

@ -63,6 +63,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
private final DataFrameTransformsCheckpointService dataFrameTransformsCheckpointService;
private final SchedulerEngine schedulerEngine;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final DataFrameAuditor auditor;
private volatile int numFailureRetries;
@ -81,6 +82,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
this.schedulerEngine = schedulerEngine;
this.auditor = auditor;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.numFailureRetries = DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING, this::setNumFailureRetries);
@ -144,7 +146,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
failure -> logger.error("Failed to start task ["+ transformId +"] in node operation", failure)
);
// <5> load next checkpoint
// <7> load next checkpoint
ActionListener<DataFrameTransformCheckpoint> getTransformNextCheckpointListener = ActionListener.wrap(
nextCheckpoint -> {
@ -171,7 +173,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
}
);
// <4> load last checkpoint
// <6> load last checkpoint
ActionListener<DataFrameTransformCheckpoint> getTransformLastCheckpointListener = ActionListener.wrap(
lastCheckpoint -> {
indexerBuilder.setLastCheckpoint(lastCheckpoint);
@ -188,7 +190,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
}
);
// <3> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED)
// <5> Set the previous stats (if they exist), initialize the indexer, start the task (If it is STOPPED)
// Since we don't create the task until `_start` is called, if we see that the task state is stopped, attempt to start
// Schedule execution regardless
ActionListener<Tuple<DataFrameTransformStoredDoc, SeqNoPrimaryTermAndIndex>> transformStatsActionListener = ActionListener.wrap(
@ -230,7 +232,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
}
);
// <2> set fieldmappings for the indexer, get the previous stats (if they exist)
// <4> set fieldmappings for the indexer, get the previous stats (if they exist)
ActionListener<Map<String, String>> getFieldMappingsListener = ActionListener.wrap(
fieldMappings -> {
indexerBuilder.setFieldMappings(fieldMappings);
@ -244,7 +246,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
}
);
// <1> Validate the transform, assigning it to the indexer, and get the field mappings
// <3> Validate the transform, assigning it to the indexer, and get the field mappings
ActionListener<DataFrameTransformConfig> getTransformConfigListener = ActionListener.wrap(
config -> {
if (config.isValid()) {
@ -261,8 +263,19 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
markAsFailed(buildTask, msg);
}
);
// <0> Get the transform config
transformsConfigManager.getTransformConfiguration(transformId, getTransformConfigListener);
// <2> Get the transform config
ActionListener<Void> templateCheckListener = ActionListener.wrap(
aVoid -> transformsConfigManager.getTransformConfiguration(transformId, getTransformConfigListener),
error -> {
String msg = "Failed to create internal index mappings";
logger.error(msg, error);
markAsFailed(buildTask, msg);
}
);
// <1> Check the internal index template is installed
DataFrameInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, templateCheckListener);
}
private static IndexerState currentIndexerState(DataFrameTransformState previousState) {

View File

@ -0,0 +1,114 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.transform.persistence;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class DataFrameInternalIndexTests extends ESTestCase {

    /**
     * Cluster state whose metadata holds the latest versioned internal index template.
     * Other test classes read this fixture, so it is {@code final} to rule out reassignment.
     */
    public static final ClusterState STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE;

    static {
        ImmutableOpenMap.Builder<String, IndexTemplateMetaData> mapBuilder = ImmutableOpenMap.builder();
        try {
            mapBuilder.put(DataFrameInternalIndex.LATEST_INDEX_VERSIONED_NAME, DataFrameInternalIndex.getIndexTemplateMetaData());
        } catch (IOException e) {
            // A static initializer cannot propagate a checked exception
            throw new UncheckedIOException(e);
        }
        MetaData.Builder metaBuilder = MetaData.builder();
        metaBuilder.templates(mapBuilder.build());
        ClusterState.Builder csBuilder = ClusterState.builder(ClusterName.DEFAULT);
        csBuilder.metaData(metaBuilder.build());
        STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE = csBuilder.build();
    }

    /** The template is detected in the prepared state and reported absent for an empty state. */
    public void testHaveLatestVersionedIndexTemplate() {

        assertTrue(DataFrameInternalIndex.haveLatestVersionedIndexTemplate(STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE));
        assertFalse(DataFrameInternalIndex.haveLatestVersionedIndexTemplate(ClusterState.EMPTY_STATE));
    }

    /** When the template is already present the listener is answered without touching the client. */
    public void testInstallLatestVersionedIndexTemplateIfRequired_GivenNotRequired() {

        ClusterService clusterService = mock(ClusterService.class);
        when(clusterService.state()).thenReturn(DataFrameInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE);

        Client client = mock(Client.class);

        AtomicBoolean gotResponse = new AtomicBoolean(false);
        ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

        DataFrameInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, testListener);

        assertTrue(gotResponse.get());
        verifyNoMoreInteractions(client);
    }

    /** When the template is missing exactly one put-template request is issued and the listener is answered. */
    public void testInstallLatestVersionedIndexTemplateIfRequired_GivenRequired() {

        ClusterService clusterService = mock(ClusterService.class);
        when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);

        // Answer the put-template call synchronously with an acknowledged response
        IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
        doAnswer(
            invocationOnMock -> {
                @SuppressWarnings("unchecked")
                ActionListener<AcknowledgedResponse> listener =
                    (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
                listener.onResponse(new AcknowledgedResponse(true));
                return null;
            }).when(indicesClient).putTemplate(any(), any());

        AdminClient adminClient = mock(AdminClient.class);
        when(adminClient.indices()).thenReturn(indicesClient);
        Client client = mock(Client.class);
        when(client.admin()).thenReturn(adminClient);

        ThreadPool threadPool = mock(ThreadPool.class);
        when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
        when(client.threadPool()).thenReturn(threadPool);

        AtomicBoolean gotResponse = new AtomicBoolean(false);
        ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));

        DataFrameInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, testListener);

        assertTrue(gotResponse.get());
        verify(client, times(1)).threadPool();
        verify(client, times(1)).admin();
        verifyNoMoreInteractions(client);
        verify(adminClient, times(1)).indices();
        verifyNoMoreInteractions(adminClient);
        verify(indicesClient, times(1)).putTemplate(any(), any());
        verifyNoMoreInteractions(indicesClient);
    }
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.xpack.core.transform.transforms.DataFrameTransform;
import org.elasticsearch.xpack.transform.checkpoint.DataFrameTransformsCheckpointService;
import org.elasticsearch.xpack.transform.notifications.DataFrameAuditor;
import org.elasticsearch.xpack.transform.persistence.DataFrameInternalIndex;
import org.elasticsearch.xpack.transform.persistence.DataFrameInternalIndexTests;
import org.elasticsearch.xpack.transform.persistence.DataFrameTransformsConfigManager;
import java.util.ArrayList;
@ -107,6 +108,7 @@ public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase {
Collections.singleton(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING));
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(cSettings);
when(clusterService.state()).thenReturn(DataFrameInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE);
DataFrameTransformPersistentTasksExecutor executor = new DataFrameTransformPersistentTasksExecutor(client,
transformsConfigManager,
dataFrameTransformsCheckpointService, mock(SchedulerEngine.class),

View File

@ -157,3 +157,26 @@ setup:
data_frame.get_data_frame_transform_stats:
transform_id: "old-simple-transform,mixed-simple-transform"
- match: { count: 0 }
---
# Puts a transform (validation deferred), then fetches the mappings of
# .data-frame-internal-2 and checks that dynamic mapping is "false" and that
# the id field is a keyword.
# NOTE(review): presumably guards against the internal index being created
# with dynamic mappings when its template is missing - confirm.
"Test index mappings for latest internal index":
- do:
data_frame.put_data_frame_transform:
transform_id: "upgraded-simple-transform"
defer_validation: true
body: >
{
"source": { "index": "dataframe-transform-airline-data" },
"dest": { "index": "upgraded-simple-transform-idx" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- match: { acknowledged: true }
# Read back the mappings of the internal index
- do:
indices.get_mapping:
index: .data-frame-internal-2
- match: { \.data-frame-internal-2.mappings.dynamic: "false" }
- match: { \.data-frame-internal-2.mappings.properties.id.type: "keyword" }