From dd5f4ae00eb1f1614ebff1361780559a8394daed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Fri, 12 Jul 2019 16:49:48 +0200 Subject: [PATCH] Update .ml-config mappings before indexing job, datafeed or df analytics config (#44216) (#44273) --- .../persistence/ElasticsearchMappings.java | 12 ++-- .../ElasticsearchMappingsTests.java | 65 +++++++++++++++++++ .../TransportPutDataFrameAnalyticsAction.java | 47 ++++++++++++-- .../ml/action/TransportPutDatafeedAction.java | 41 ++++++++++-- .../xpack/ml/job/JobManager.java | 19 +++++- 5 files changed, 165 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java index 77073a23491..89474179112 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappings.java @@ -18,7 +18,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.common.CheckedBiFunction; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.Index; @@ -140,9 +140,13 @@ public class ElasticsearchMappings { } public static XContentBuilder configMapping() throws IOException { + return configMapping(SINGLE_MAPPING_NAME); + } + + public static XContentBuilder configMapping(String mappingType) throws IOException { XContentBuilder builder = jsonBuilder(); builder.startObject(); - builder.startObject(SINGLE_MAPPING_NAME); + builder.startObject(mappingType); addMetaInformation(builder); addDefaultMapping(builder); builder.startObject(PROPERTIES); @@ -1146,7 +1150,7 @@ public class ElasticsearchMappings { } public static void addDocMappingIfMissing(String alias, - CheckedBiFunction, XContentBuilder, IOException> mappingSupplier, + CheckedFunction mappingSupplier, Client client, ClusterState state, ActionListener listener) { AliasOrIndex aliasOrIndex = state.metaData().getAliasAndIndexLookup().get(alias); if (aliasOrIndex == null) { @@ -1170,7 +1174,7 @@ public class ElasticsearchMappings { IndexMetaData indexMetaData = state.metaData().index(indicesThatRequireAnUpdate[0]); String mappingType = indexMetaData.mapping().type(); - try (XContentBuilder mapping = mappingSupplier.apply(mappingType, Collections.emptyList())) { + try (XContentBuilder mapping = mappingSupplier.apply(mappingType)) { PutMappingRequest putMappingRequest = new PutMappingRequest(indicesThatRequireAnUpdate); putMappingRequest.type(mappingType); putMappingRequest.source(mapping); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java index ee8d9214859..13ce6f2ab61 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/persistence/ElasticsearchMappingsTests.java @@ -10,6 +10,11 @@ import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonToken; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -17,11 +22,13 @@ import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedTimingStats; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -35,6 +42,7 @@ import org.elasticsearch.xpack.core.ml.job.results.AnomalyRecord; import org.elasticsearch.xpack.core.ml.job.results.CategoryDefinition; import org.elasticsearch.xpack.core.ml.job.results.ReservedFieldNames; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.mockito.ArgumentCaptor; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; @@ -48,7 +56,16 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; public class ElasticsearchMappingsTests extends ESTestCase { @@ -207,6 +224,54 @@ public class ElasticsearchMappingsTests extends ESTestCase { ElasticsearchMappings.mappingRequiresUpdate(cs, indices, VersionUtils.getPreviousMinorVersion())); } + public void testAddDocMappingIfMissing() throws IOException { + ThreadPool threadPool = mock(ThreadPool.class); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + Client client = mock(Client.class); + when(client.threadPool()).thenReturn(threadPool); + doAnswer( + invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(new AcknowledgedResponse(true)); + return null; + }) + .when(client).execute(eq(PutMappingAction.INSTANCE), any(), any(ActionListener.class)); + + ClusterState clusterState = getClusterStateWithMappingsWithMetaData(Collections.singletonMap("index-name", "0.0")); + ElasticsearchMappings.addDocMappingIfMissing( + "index-name", + ElasticsearchMappingsTests::fakeMapping, + client, + clusterState, + ActionListener.wrap( + ok -> assertTrue(ok), + e -> fail(e.toString()) + ) + ); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PutMappingRequest.class); + verify(client).threadPool(); + verify(client).execute(eq(PutMappingAction.INSTANCE), requestCaptor.capture(), any(ActionListener.class)); + verifyNoMoreInteractions(client); + + PutMappingRequest request = requestCaptor.getValue(); + assertThat(request.type(), equalTo("_doc")); + assertThat(request.indices(), equalTo(new String[] { "index-name" })); + assertThat(request.source(), equalTo("{\"_doc\":{\"properties\":{\"some-field\":{\"type\":\"long\"}}}}")); + } + + private static XContentBuilder fakeMapping(String mappingType) throws IOException { + return jsonBuilder() + .startObject() + .startObject(mappingType) + .startObject(ElasticsearchMappings.PROPERTIES) + .startObject("some-field") + .field(ElasticsearchMappings.TYPE, ElasticsearchMappings.LONG) + .endObject() + .endObject() + .endObject() + .endObject(); + } private ClusterState getClusterStateWithMappingsWithMetaData(Map namesAndVersions) throws IOException { MetaData.Builder metaDataBuilder = MetaData.builder(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java index d8f5dbb469f..0eda67644ca 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDataFrameAnalyticsAction.java @@ -5,11 +5,15 @@ */ package org.elasticsearch.xpack.ml.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -29,6 +33,8 @@ import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlStrings; import org.elasticsearch.xpack.core.security.SecurityContext; @@ -43,12 +49,15 @@ import org.elasticsearch.xpack.ml.dataframe.persistence.DataFrameAnalyticsConfig import java.io.IOException; import java.time.Instant; +import java.util.Map; import java.util.Objects; import java.util.function.Supplier; public class TransportPutDataFrameAnalyticsAction extends HandledTransportAction { + private static final Logger logger = LogManager.getLogger(TransportPutDataFrameAnalyticsAction.class); + private final XPackLicenseState licenseState; private final DataFrameAnalyticsConfigProvider configProvider; private final ThreadPool threadPool; @@ -97,6 +106,7 @@ public class TransportPutDataFrameAnalyticsAction .setCreateTime(Instant.now()) .setVersion(Version.CURRENT) .build(); + if (licenseState.isAuthAllowed()) { final String username = securityContext.getUser().principal(); RoleDescriptor.IndicesPrivileges sourceIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder() @@ -120,9 +130,12 @@ public class TransportPutDataFrameAnalyticsAction client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); } else { - configProvider.put(memoryCappedConfig, threadPool.getThreadContext().getHeaders(), ActionListener.wrap( - indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)), - listener::onFailure + updateDocMappingAndPutConfig( + memoryCappedConfig, + threadPool.getThreadContext().getHeaders(), + ActionListener.wrap( + indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)), + listener::onFailure )); } } @@ -131,9 +144,12 @@ public class TransportPutDataFrameAnalyticsAction HasPrivilegesResponse response, ActionListener listener) throws IOException { if (response.isCompleteMatch()) { - configProvider.put(memoryCappedConfig, threadPool.getThreadContext().getHeaders(), ActionListener.wrap( - indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)), - listener::onFailure + updateDocMappingAndPutConfig( + memoryCappedConfig, + threadPool.getThreadContext().getHeaders(), + ActionListener.wrap( + indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)), + listener::onFailure )); } else { XContentBuilder builder = JsonXContent.contentBuilder(); @@ -150,6 +166,25 @@ public class TransportPutDataFrameAnalyticsAction } } + private void updateDocMappingAndPutConfig(DataFrameAnalyticsConfig config, + Map headers, + ActionListener listener) { + ClusterState clusterState = clusterService.state(); + if (clusterState == null) { + logger.warn("Cannot update doc mapping because clusterState == null"); + configProvider.put(config, headers, listener); + return; + } + ElasticsearchMappings.addDocMappingIfMissing( + AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings::configMapping, + client, + clusterState, + ActionListener.wrap( + unused -> configProvider.put(config, headers, listener), + listener::onFailure)); + } + private void validateConfig(DataFrameAnalyticsConfig config) { if (MlStrings.isValidId(config.getId()) == false) { throw ExceptionsHelper.badRequestException(Messages.getMessage(Messages.INVALID_ID, DataFrameAnalyticsConfig.ID, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java index 004fd51398a..85fa136c2b5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportPutDatafeedAction.java @@ -5,6 +5,8 @@ */ package org.elasticsearch.xpack.ml.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; @@ -36,6 +38,8 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.rollup.action.GetRollupIndexCapsAction; import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction; @@ -58,6 +62,8 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; public class TransportPutDatafeedAction extends TransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(TransportPutDatafeedAction.class); + private final XPackLicenseState licenseState; private final Client client; private final SecurityContext securityContext; @@ -111,7 +117,7 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction privResponseListener = ActionListener.wrap( - r -> handlePrivsResponse(username, request, r, listener), + r -> handlePrivsResponse(username, request, r, state, listener), listener::onFailure); ActionListener getRollupIndexCapsActionHandler = ActionListener.wrap( @@ -145,15 +151,17 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction listener) throws IOException { if (response.isCompleteMatch()) { - putDatafeed(request, threadPool.getThreadContext().getHeaders(), listener); + putDatafeed(request, threadPool.getThreadContext().getHeaders(), clusterState, listener); } else { XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject(); @@ -169,7 +177,9 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction headers, + private void putDatafeed(PutDatafeedAction.Request request, + Map headers, + ClusterState clusterState, ActionListener listener) { String datafeedId = request.getDatafeed().getId(); @@ -181,13 +191,30 @@ public class TransportPutDatafeedAction extends TransportMasterNodeAction validationOk = ok -> { - datafeedConfigProvider.putDatafeedConfig(request.getDatafeed(), headers, ActionListener.wrap( + CheckedConsumer mappingsUpdated = ok -> { + datafeedConfigProvider.putDatafeedConfig( + request.getDatafeed(), + headers, + ActionListener.wrap( indexResponse -> listener.onResponse(new PutDatafeedAction.Response(request.getDatafeed())), listener::onFailure )); }; + CheckedConsumer validationOk = ok -> { + if (clusterState == null) { + logger.warn("Cannot update doc mapping because clusterState == null"); + mappingsUpdated.accept(false); + return; + } + ElasticsearchMappings.addDocMappingIfMissing( + AnomalyDetectorsIndex.configIndexName(), + ElasticsearchMappings::configMapping, + client, + clusterState, + ActionListener.wrap(mappingsUpdated, listener::onFailure)); + }; + CheckedConsumer jobOk = ok -> jobConfigProvider.validateDatafeedJob(request.getDatafeed(), ActionListener.wrap(validationOk, listener::onFailure)); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java index 7c2f15591b9..683fbb7c65c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java @@ -45,6 +45,8 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; +import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -256,7 +258,7 @@ public class JobManager { ActionListener putJobListener = new ActionListener() { @Override - public void onResponse(Boolean indicesCreated) { + public void onResponse(Boolean mappingsUpdated) { jobConfigProvider.putJob(job, ActionListener.wrap( response -> { @@ -283,10 +285,23 @@ public class JobManager { } }; + ActionListener addDocMappingsListener = ActionListener.wrap( + indicesCreated -> { + if (state == null) { + logger.warn("Cannot update doc mapping because clusterState == null"); + putJobListener.onResponse(false); + return; + } + ElasticsearchMappings.addDocMappingIfMissing( + AnomalyDetectorsIndex.configIndexName(), ElasticsearchMappings::configMapping, client, state, putJobListener); + }, + putJobListener::onFailure + ); + ActionListener> checkForLeftOverDocs = ActionListener.wrap( matchedIds -> { if (matchedIds.isEmpty()) { - jobResultsProvider.createJobResultIndex(job, state, putJobListener); + jobResultsProvider.createJobResultIndex(job, state, addDocMappingsListener); } else { // A job has the same Id as one of the group names // error with the first in the list