From 766b9d600e510d2bfce1799a2248b722c76a8d27 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 9 Feb 2018 00:25:37 +0100 Subject: [PATCH] Fixed a bug that prevents pipelines to load that use stored scripts after a restart. The bug was caused because the ScriptService had no reference to a ClusterState instance, because it received the ClusterState after the PipelineStore. This only is the case after a restart. A bad side effect is that during a restart, any pipeline to be loaded after the pipeline that uses a stored script, was never loaded, which caused many pipeline to be missing in bulk / index request api calls. --- .../ingest/common/IngestRestartIT.java | 151 ++++++++++++++++++ .../elasticsearch/ingest/PipelineStore.java | 6 +- .../java/org/elasticsearch/node/Node.java | 2 +- .../elasticsearch/script/ScriptService.java | 6 +- 4 files changed, 160 insertions(+), 5 deletions(-) create mode 100644 modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java new file mode 100644 index 00000000000..c62a8fd2371 --- /dev/null +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -0,0 +1,151 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.ingest.common; + +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.MockScriptEngine; +import org.elasticsearch.script.MockScriptPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.function.Function; + +import static org.hamcrest.Matchers.equalTo; + +// Ideally I like this test to live in the server module, but otherwise a large part of the ScriptProcessor +// ends up being copied into this test. +@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST) +public class IngestRestartIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(IngestCommonPlugin.class, CustomScriptPlugin.class); + } + + @Override + protected boolean ignoreExternalCluster() { + return true; + } + + public static class CustomScriptPlugin extends MockScriptPlugin { + @Override + protected Map, Object>> pluginScripts() { + return Collections.singletonMap("my_script", script -> { + @SuppressWarnings("unchecked") + Map ctx = (Map) script.get("ctx"); + ctx.put("z", 0); + return null; + }); + } + } + + public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exception { + internalCluster().startNode(); + + client().admin().cluster().preparePutStoredScript() + .setId("1") + .setContent(new BytesArray("{\"script\": {\"lang\": \"" + MockScriptEngine.NAME + + "\", \"source\": \"my_script\"} }"), XContentType.JSON) + .get(); + BytesReference pipeline = new BytesArray("{\n" + + " \"processors\" : [\n" + + " {\"set\" : {\"field\": \"y\", \"value\": 0}},\n" + + " {\"script\" : {\"id\": \"1\"}}\n" + + " ]\n" + + "}"); + client().admin().cluster().preparePutPipeline("_id", pipeline, XContentType.JSON).get(); + + client().prepareIndex("index", "doc", "1") + .setSource("x", 0) + .setPipeline("_id") + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + + Map source = client().prepareGet("index", "doc", "1").get().getSource(); + assertThat(source.get("x"), equalTo(0)); + assertThat(source.get("y"), equalTo(0)); + assertThat(source.get("z"), equalTo(0)); + + // Prior to making this ScriptService implement ClusterStateApplier instead of ClusterStateListener, + // pipelines with a script processor failed to load causing these pipelines and pipelines that were + // supposed to load after these pipelines to not be available during ingestion, which then causes + // the next index request in this test to fail. + internalCluster().fullRestart(); + ensureYellow("index"); + + client().prepareIndex("index", "doc", "2") + .setSource("x", 0) + .setPipeline("_id") + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + + source = client().prepareGet("index", "doc", "2").get().getSource(); + assertThat(source.get("x"), equalTo(0)); + assertThat(source.get("y"), equalTo(0)); + assertThat(source.get("z"), equalTo(0)); + } + + public void testWithDedicatedIngestNode() throws Exception { + String node = internalCluster().startNode(); + String ingestNode = internalCluster().startNode(Settings.builder() + .put("node.master", false) + .put("node.data", false) + ); + + BytesReference pipeline = new BytesArray("{\n" + + " \"processors\" : [\n" + + " {\"set\" : {\"field\": \"y\", \"value\": 0}}\n" + + " ]\n" + + "}"); + client().admin().cluster().preparePutPipeline("_id", pipeline, XContentType.JSON).get(); + + client().prepareIndex("index", "doc", "1") + .setSource("x", 0) + .setPipeline("_id") + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + + Map source = client().prepareGet("index", "doc", "1").get().getSource(); + assertThat(source.get("x"), equalTo(0)); + assertThat(source.get("y"), equalTo(0)); + + logger.info("Stopping"); + internalCluster().restartNode(node, new InternalTestCluster.RestartCallback()); + + client(ingestNode).prepareIndex("index", "doc", "2") + .setSource("x", 0) + .setPipeline("_id") + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + + source = client(ingestNode).prepareGet("index", "doc", "2").get().getSource(); + assertThat(source.get("x"), equalTo(0)); + assertThat(source.get("y"), equalTo(0)); + } + +} diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java index d19c13cc3a1..21372e46e5f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -35,9 +35,9 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.gateway.GatewayService; import java.util.ArrayList; import java.util.Collections; @@ -70,6 +70,10 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl } void innerUpdatePipelines(ClusterState previousState, ClusterState state) { + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + return; + } + IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE); if (Objects.equals(ingestMetadata, previousIngestMetadata)) { diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index fd1a2159b07..f2bf2e9fb5e 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -342,7 +342,7 @@ public class Node implements Closeable { List clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class); final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool, ClusterModule.getClusterStateCustomSuppliers(clusterPlugins)); - clusterService.addListener(scriptModule.getScriptService()); + clusterService.addStateApplier(scriptModule.getScriptService()); resourcesToClose.add(clusterService); final IngestService ingestService = new IngestService(settings, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); diff --git a/server/src/main/java/org/elasticsearch/script/ScriptService.java b/server/src/main/java/org/elasticsearch/script/ScriptService.java index 652ec3dda3d..a68166779ef 100644 --- a/server/src/main/java/org/elasticsearch/script/ScriptService.java +++ b/server/src/main/java/org/elasticsearch/script/ScriptService.java @@ -30,7 +30,7 @@ import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRespo import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -57,7 +57,7 @@ import java.util.Objects; import java.util.Set; import java.util.function.Function; -public class ScriptService extends AbstractComponent implements Closeable, ClusterStateListener { +public class ScriptService extends AbstractComponent implements Closeable, ClusterStateApplier { static final String DISABLE_DYNAMIC_SCRIPTING_SETTING = "script.disable_dynamic"; @@ -508,7 +508,7 @@ public class ScriptService extends AbstractComponent implements Closeable, Clust } @Override - public void clusterChanged(ClusterChangedEvent event) { + public void applyClusterState(ClusterChangedEvent event) { clusterState = event.state(); }