diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java index 41aeccc5395..2024c5d2938 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAlias.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; @@ -26,6 +27,9 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.xpack.core.template.IndexTemplateConfig; import java.util.Arrays; import java.util.Comparator; @@ -228,4 +232,52 @@ public final class MlIndexAndAlias { listener::onFailure), client.admin().indices()::aliases); } + + /** + * Installs the index template specified by {@code templateConfig} if it is not in already + * installed in {@code clusterState}. + * + * The check for presence is simple and will return the listener on + * the calling thread if successful. If the template has to be installed + * an async call will be made. + * + * @param clusterState The cluster state + * @param client For putting the template + * @param templateConfig The config + * @param listener Async listener + */ + public static void installIndexTemplateIfRequired( + ClusterState clusterState, + Client client, + IndexTemplateConfig templateConfig, + ActionListener listener + ) { + String templateName = templateConfig.getTemplateName(); + + // The check for existence of the template is against the cluster state, so very cheap + if (hasIndexTemplate(clusterState, templateName)) { + listener.onResponse(true); + return; + } + + PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName) + .source(templateConfig.loadBytes(), XContentType.JSON); + request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); + + ActionListener innerListener = ActionListener.wrap( + response -> { + if (response.isAcknowledged() == false) { + logger.warn("error adding legacy template [{}], request was not acknowledged", templateName); + } + listener.onResponse(response.isAcknowledged()); + }, + listener::onFailure); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request, innerListener, + client.admin().indices()::putTemplate); + } + + private static boolean hasIndexTemplate(ClusterState state, String templateName) { + return state.getMetadata().getTemplates().containsKey(templateName); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java index c85d2964da2..acdfc34be8e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/utils/MlIndexAndAliasTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -37,6 +38,8 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.inference.persistence.InferenceIndexConstants; +import org.elasticsearch.xpack.core.template.IndexTemplateConfig; import org.junit.After; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -93,6 +96,7 @@ public class MlIndexAndAliasTests extends ESTestCase { doAnswer(withResponse(new CreateIndexResponse(true, true, FIRST_CONCRETE_INDEX))).when(indicesAdminClient).create(any(), any()); when(indicesAdminClient.prepareAliases()).thenReturn(new IndicesAliasesRequestBuilder(client, IndicesAliasesAction.INSTANCE)); doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).aliases(any(), any()); + doAnswer(withResponse(new AcknowledgedResponse(true))).when(indicesAdminClient).putTemplate(any(), any()); clusterAdminClient = mock(ClusterAdminClient.class); doAnswer(invocationOnMock -> { @@ -121,6 +125,34 @@ public class MlIndexAndAliasTests extends ESTestCase { verifyNoMoreInteractions(indicesAdminClient, listener); } + public void testInstallIndexTemplateIfRequired_GivenTemplateExists() { + ClusterState clusterState = createClusterState(Collections.emptyMap(), + Collections.singletonMap(InferenceIndexConstants.LATEST_INDEX_NAME, + createIndexTemplateMetaData(InferenceIndexConstants.LATEST_INDEX_NAME, + Collections.singletonList(InferenceIndexConstants.LATEST_INDEX_NAME)))); + + IndexTemplateConfig inferenceTemplate = new IndexTemplateConfig(InferenceIndexConstants.LATEST_INDEX_NAME, + "not_a_real_file.json", Version.CURRENT.id, "xpack.ml.version", + Collections.singletonMap("xpack.ml.version.id", String.valueOf(Version.CURRENT.id))); + + MlIndexAndAlias.installIndexTemplateIfRequired(clusterState, client, inferenceTemplate, listener); + verify(listener).onResponse(true); + verifyNoMoreInteractions(client); + } + + public void testInstallIndexTemplateIfRequired() { + ClusterState clusterState = createClusterState(Collections.emptyMap()); + + IndexTemplateConfig inferenceTemplate = new IndexTemplateConfig(InferenceIndexConstants.LATEST_INDEX_NAME, + "/org/elasticsearch/xpack/core/ml/inference_index_template.json", Version.CURRENT.id, "xpack.ml.version", + Collections.singletonMap("xpack.ml.version.id", String.valueOf(Version.CURRENT.id))); + + MlIndexAndAlias.installIndexTemplateIfRequired(clusterState, client, inferenceTemplate, listener); + InOrder inOrder = inOrder(indicesAdminClient, listener); + inOrder.verify(indicesAdminClient).putTemplate(any(), any()); + inOrder.verify(listener).onResponse(true); + } + public void testCreateStateIndexAndAliasIfNecessary_CleanState() { ClusterState clusterState = createClusterState(Collections.emptyMap()); createIndexAndAliasIfNecessary(clusterState); @@ -266,9 +298,15 @@ public class MlIndexAndAliasTests extends ESTestCase { } private static ClusterState createClusterState(Map indices) { + return createClusterState(indices, Collections.emptyMap()); + } + + private static ClusterState createClusterState(Map indices, Map templates) { return ClusterState.builder(ClusterName.DEFAULT) .metadata(Metadata.builder() - .indices(ImmutableOpenMap.builder().putAll(indices).build()).build()) + .indices(ImmutableOpenMap.builder().putAll(indices).build()) + .templates(ImmutableOpenMap.builder().putAll(templates).build()) + .build()) .nodes(DiscoveryNodes.builder() .add(new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), HIDDEN_INTRODUCED_VERSION))) .build(); @@ -282,6 +320,10 @@ public class MlIndexAndAliasTests extends ESTestCase { return createIndexMetadata(indexName, true); } + private static IndexTemplateMetadata createIndexTemplateMetaData(String templateName, List patterns) { + return IndexTemplateMetadata.builder(templateName).patterns(patterns).build(); + } + private static IndexMetadata createIndexMetadata(String indexName, boolean withAlias) { Settings settings = Settings.builder() diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 2a9ca0e1552..430457a125e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -773,7 +773,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, memoryTracker.get(), client, expressionResolver), new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get(), expressionResolver), new TransportStartDataFrameAnalyticsAction.TaskExecutor(settings, client, clusterService, dataFrameAnalyticsManager.get(), - dataFrameAnalyticsAuditor.get(), memoryTracker.get(), expressionResolver) + dataFrameAnalyticsAuditor.get(), memoryTracker.get(), expressionResolver, + MlIndexTemplateRegistry.INFERENCE_TEMPLATE) ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java index 714816e0eb6..357db4e13f3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlIndexTemplateRegistry.java @@ -52,7 +52,7 @@ public class MlIndexTemplateRegistry extends IndexTemplateRegistry { private static final IndexTemplateConfig CONFIG_TEMPLATE = configTemplate(); - private static final IndexTemplateConfig INFERENCE_TEMPLATE = new IndexTemplateConfig(InferenceIndexConstants.LATEST_INDEX_NAME, + public static final IndexTemplateConfig INFERENCE_TEMPLATE = new IndexTemplateConfig(InferenceIndexConstants.LATEST_INDEX_NAME, ROOT_RESOURCE_PATH + "inference_index_template.json", Version.CURRENT.id, VERSION_PATTERN, Collections.singletonMap(VERSION_ID_PATTERN, String.valueOf(Version.CURRENT.id))); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java index bf376c062f8..9b308f35e0c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDataFrameAnalyticsAction.java @@ -64,7 +64,9 @@ import org.elasticsearch.xpack.core.ml.dataframe.analyses.RequiredField; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; +import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias; import org.elasticsearch.xpack.core.ml.utils.PhaseProgress; +import org.elasticsearch.xpack.core.template.IndexTemplateConfig; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsManager; import org.elasticsearch.xpack.ml.dataframe.DataFrameAnalyticsTask; @@ -594,6 +596,7 @@ public class TransportStartDataFrameAnalyticsAction private final DataFrameAnalyticsAuditor auditor; private final MlMemoryTracker memoryTracker; private final IndexNameExpressionResolver resolver; + private final IndexTemplateConfig inferenceIndexTemplate; private volatile int maxMachineMemoryPercent; private volatile int maxLazyMLNodes; @@ -601,7 +604,8 @@ public class TransportStartDataFrameAnalyticsAction private volatile ClusterState clusterState; public TaskExecutor(Settings settings, Client client, ClusterService clusterService, DataFrameAnalyticsManager manager, - DataFrameAnalyticsAuditor auditor, MlMemoryTracker memoryTracker, IndexNameExpressionResolver resolver) { + DataFrameAnalyticsAuditor auditor, MlMemoryTracker memoryTracker, IndexNameExpressionResolver resolver, + IndexTemplateConfig inferenceIndexTemplate) { super(MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, MachineLearning.UTILITY_THREAD_POOL_NAME); this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); @@ -609,6 +613,7 @@ public class TransportStartDataFrameAnalyticsAction this.auditor = Objects.requireNonNull(auditor); this.memoryTracker = Objects.requireNonNull(memoryTracker); this.resolver = Objects.requireNonNull(resolver); + this.inferenceIndexTemplate = Objects.requireNonNull(inferenceIndexTemplate); this.maxMachineMemoryPercent = MachineLearning.MAX_MACHINE_MEMORY_PERCENT.get(settings); this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings); this.maxOpenJobs = MAX_OPEN_JOBS_PER_NODE.get(settings); @@ -693,6 +698,20 @@ public class TransportStartDataFrameAnalyticsAction return; } + ActionListener templateCheckListener = ActionListener.wrap( + ok -> executeTask(analyticsTaskState, task), + error -> { + Throwable cause = ExceptionsHelper.unwrapCause(error); + String msg = "Failed to create internal index template [" + inferenceIndexTemplate.getTemplateName() + "]"; + logger.error(msg, cause); + task.markAsFailed(error); + } + ); + + MlIndexAndAlias.installIndexTemplateIfRequired(clusterState, client, inferenceIndexTemplate, templateCheckListener); + } + + private void executeTask(DataFrameAnalyticsTaskState analyticsTaskState, AllocatedPersistentTask task) { if (analyticsTaskState == null) { DataFrameAnalyticsTaskState startedState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.STARTED, task.getAllocationId(), null);