diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java index c62a8fd2371..a8ca20485c4 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -18,10 +18,14 @@ */ package org.elasticsearch.ingest.common; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.MockScriptEngine; @@ -33,6 +37,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; @@ -64,6 +69,66 @@ public class IngestRestartIT extends ESIntegTestCase { } } + public void testScriptDisabled() throws Exception { + String pipelineIdWithoutScript = randomAlphaOfLengthBetween(5, 10); + String pipelineIdWithScript = pipelineIdWithoutScript + "_script"; + internalCluster().startNode(); + + BytesReference pipelineWithScript = new BytesArray("{\n" + + " \"processors\" : [\n" + + " {\"script\" : {\"lang\": \"" + MockScriptEngine.NAME + "\", \"source\": \"my_script\"}}\n" + + " ]\n" + + "}"); + BytesReference pipelineWithoutScript = new BytesArray("{\n" + + " \"processors\" : [\n" + + " {\"set\" : {\"field\": \"y\", \"value\": 0}}\n" + + " ]\n" + + "}"); + + Consumer checkPipelineExists = (id) -> assertThat(client().admin().cluster().prepareGetPipeline(id) + .get().pipelines().get(0).getId(), equalTo(id)); + + client().admin().cluster().preparePutPipeline(pipelineIdWithScript, pipelineWithScript, XContentType.JSON).get(); + client().admin().cluster().preparePutPipeline(pipelineIdWithoutScript, pipelineWithoutScript, XContentType.JSON).get(); + + checkPipelineExists.accept(pipelineIdWithScript); + checkPipelineExists.accept(pipelineIdWithoutScript); + + + internalCluster().stopCurrentMasterNode(); + internalCluster().startNode(Settings.builder().put("script.allowed_types", "none")); + + checkPipelineExists.accept(pipelineIdWithoutScript); + checkPipelineExists.accept(pipelineIdWithScript); + + client().prepareIndex("index", "doc", "1") + .setSource("x", 0) + .setPipeline(pipelineIdWithoutScript) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + + ElasticsearchException exception = expectThrows(ElasticsearchException.class, + () -> client().prepareIndex("index", "doc", "2") + .setSource("x", 0) + .setPipeline(pipelineIdWithScript) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get()); + assertThat(exception.getHeaderKeys(), equalTo(Sets.newHashSet("processor_type"))); + assertThat(exception.getHeader("processor_type"), equalTo(Arrays.asList("unknown"))); + assertThat(exception.getRootCause().getMessage(), + equalTo("pipeline with id [" + pipelineIdWithScript + "] could not be loaded, caused by " + + "[ElasticsearchParseException[Error updating pipeline with id [" + pipelineIdWithScript + "]]; " + + "nested: ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " + + "nested: IllegalArgumentException[cannot execute [inline] scripts];; " + + "ElasticsearchException[java.lang.IllegalArgumentException: cannot execute [inline] scripts]; " + + "nested: IllegalArgumentException[cannot execute [inline] scripts];; java.lang.IllegalArgumentException: " + + "cannot execute [inline] scripts]")); + + Map source = client().prepareGet("index", "doc", "1").get().getSource(); + assertThat(source.get("x"), equalTo(0)); + assertThat(source.get("y"), equalTo(0)); + } + public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exception { internalCluster().startNode(); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 4a018ca0258..ad2b8643f7a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -25,8 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; @@ -34,8 +32,6 @@ import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; -import static org.elasticsearch.common.settings.Setting.Property; - /** * Holder class for several ingest related services. */ diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java index 21372e46e5f..c6dce0bd45b 100644 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java @@ -81,16 +81,41 @@ public class PipelineStore extends AbstractComponent implements ClusterStateAppl } Map pipelines = new HashMap<>(); + List exceptions = new ArrayList<>(); for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { try { pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); } catch (ElasticsearchParseException e) { - throw e; + pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e)); + exceptions.add(e); } catch (Exception e) { - throw new ElasticsearchParseException("Error updating pipeline with id [" + pipeline.getId() + "]", e); + ElasticsearchParseException parseException = new ElasticsearchParseException( + "Error updating pipeline with id [" + pipeline.getId() + "]", e); + pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException)); + exceptions.add(parseException); } } this.pipelines = Collections.unmodifiableMap(pipelines); + ExceptionsHelper.rethrowAndSuppress(exceptions); + } + + private Pipeline substitutePipeline(String id, ElasticsearchParseException e) { + String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; + String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; + String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]"; + Processor failureProcessor = new AbstractProcessor(tag) { + @Override + public void execute(IngestDocument ingestDocument) { + throw new IllegalStateException(errorMessage); + } + + @Override + public String getType() { + return type; + } + }; + String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded"; + return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor)); } /** diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java index dbbc8e443c0..809a81b687e 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestClientIT.java @@ -36,16 +36,12 @@ import org.elasticsearch.action.ingest.SimulateDocumentBaseResult; import org.elasticsearch.action.ingest.SimulatePipelineRequest; import org.elasticsearch.action.ingest.SimulatePipelineResponse; import org.elasticsearch.action.ingest.WritePipelineResponse; -import org.elasticsearch.action.support.replication.TransportReplicationActionTests; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Requests; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.script.Script; -import org.elasticsearch.script.ScriptService; import org.elasticsearch.test.ESIntegTestCase; import java.util.Arrays; @@ -130,6 +126,10 @@ public class IngestClientIT extends ESIntegTestCase { IngestDocument ingestDocument = new IngestDocument("index", "type", "id", null, null, null, null, source); assertThat(simulateDocumentBaseResult.getIngestDocument().getSourceAndMetadata(), equalTo(ingestDocument.getSourceAndMetadata())); assertThat(simulateDocumentBaseResult.getFailure(), nullValue()); + + // cleanup + WritePipelineResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get(); + assertTrue(deletePipelineResponse.isAcknowledged()); } public void testBulkWithIngestFailures() throws Exception { @@ -172,6 +172,10 @@ public class IngestClientIT extends ESIntegTestCase { assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); } } + + // cleanup + WritePipelineResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get(); + assertTrue(deletePipelineResponse.isAcknowledged()); } public void testBulkWithUpsert() throws Exception { @@ -271,5 +275,8 @@ public class IngestClientIT extends ESIntegTestCase { assertNotNull(ex); assertThat(ex.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); } + + GetPipelineResponse response = client().admin().cluster().prepareGetPipeline("_id").get(); + assertFalse(response.isFound()); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java index 64593334887..03777b98ab7 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java @@ -37,7 +37,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; @ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST) public class IngestProcessorNotInstalledOnAllNodesIT extends ESIntegTestCase { @@ -104,7 +103,11 @@ public class IngestProcessorNotInstalledOnAllNodesIT extends ESIntegTestCase { installPlugin = false; String node2 = internalCluster().startNode(); pipeline = internalCluster().getInstance(NodeService.class, node2).getIngestService().getPipelineStore().get("_id"); - assertThat(pipeline, nullValue()); + + assertNotNull(pipeline); + assertThat(pipeline.getId(), equalTo("_id")); + assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, " + + "because pipeline with id [_id] could not be loaded")); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java index 3247761a548..5a3b57a6d7e 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.ingest; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; @@ -92,6 +93,32 @@ public class PipelineExecutionServiceTests extends ESTestCase { verify(completionHandler, never()).accept(anyBoolean()); } + public void testExecuteIndexPipelineExistsButFailedParsing() { + when(store.get("_id")).thenReturn(new Pipeline("_id", "stub", null, + new CompoundProcessor(new AbstractProcessor("mock") { + @Override + public void execute(IngestDocument ingestDocument) { + throw new IllegalStateException("error"); + } + + @Override + public String getType() { + return null; + } + }))); + SetOnce failed = new SetOnce<>(); + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); + Consumer failureHandler = (e) -> { + assertThat(e.getCause().getClass(), equalTo(IllegalArgumentException.class)); + assertThat(e.getCause().getCause().getClass(), equalTo(IllegalStateException.class)); + assertThat(e.getCause().getCause().getMessage(), equalTo("error")); + failed.set(true); + }; + Consumer completionHandler = (e) -> failed.set(false); + executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler); + assertTrue(failed.get()); + } + public void testExecuteBulkPipelineDoesNotExist() { CompoundProcessor processor = mock(CompoundProcessor.class); when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java index bb0d5787120..250bb5059cf 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; @@ -165,7 +164,13 @@ public class PipelineStoreTests extends ESTestCase { assertThat(e.getMessage(), equalTo("[processors] required property is missing")); } pipeline = store.get(id); - assertThat(pipeline, nullValue()); + assertNotNull(pipeline); + assertThat(pipeline.getId(), equalTo("_id")); + assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, because pipeline with" + + " id [_id] could not be loaded")); + assertThat(pipeline.getProcessors().size(), equalTo(1)); + assertNull(pipeline.getProcessors().get(0).getTag()); + assertThat(pipeline.getProcessors().get(0).getType(), equalTo("unknown")); } public void testDelete() {