diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestService.java b/core/src/main/java/org/elasticsearch/ingest/IngestService.java index a576e5403cb..b38f7470e39 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -70,7 +70,6 @@ public class IngestService implements Closeable { @Override public void close() throws IOException { - pipelineStore.getProcessorRegistry().close(); pipelineStore.close(); } diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestCloseIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestCloseIT.java new file mode 100644 index 00000000000..029e9189643 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/ingest/IngestCloseIT.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.node.NodeModule; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.junit.After; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.is; + +public class IngestCloseIT extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return pluginList(IngestPlugin.class); + } + + private static AtomicBoolean called = new AtomicBoolean(false); + + public void testCloseNode() throws Exception { + // We manually stop the node and check we called + stopNode(); + + assertThat(called.get(), is(true)); + + // We need to restart the node for the next tests (and because tearDown() expects a Node) + startNode(); + } + + public static class IngestPlugin extends Plugin { + + @Override + public String name() { + return "ingest"; + } + + @Override + public String description() { + return "ingest mock"; + } + + public void onModule(NodeModule nodeModule) { + nodeModule.registerProcessor("test", (templateService, registry) -> new Factory()); + } + } + + public static final class Factory extends AbstractProcessorFactory implements Closeable { + @Override + protected TestProcessor doCreate(String tag, Map config) throws Exception { + return new TestProcessor("id", "test", ingestDocument -> { + ingestDocument.setFieldValue("processed", true); + if (ingestDocument.getFieldValue("fail", Boolean.class)) { + throw new IllegalArgumentException("test processor failed"); + } + }); + } + + @Override + public void close() throws IOException { + called.set(true); + } + } + +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index de7a5219445..74300bef2ec 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -77,7 +77,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { startNode(); } - private void startNode() { + protected void startNode() { assert NODE == null; NODE = newNode(); // we must wait for the node to actually be up and running. otherwise the node might have started, elected itself master but might not yet have removed the @@ -92,7 +92,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).get(); } - private static void stopNode() throws IOException { + protected static void stopNode() throws IOException { Node node = NODE; NODE = null; IOUtils.close(node);