From 09fc9485c6fa33809944ddddb09a8276f88967ed Mon Sep 17 00:00:00 2001 From: David Pilato Date: Fri, 8 Apr 2016 18:19:57 +0200 Subject: [PATCH 1/3] Ingest does not close its factories When you implement an ingest factory which implements `Closeable`: ```java public static final class Factory extends AbstractProcessorFactory implements Closeable { @Override public void close() throws IOException { logger.debug("closing my processor factory"); } } ``` The `close()` method is never called which could lead to some leak threads when we close a node. The `ProcessorsRegistry#close()` method exists though and seems to do the right job: ```java @Override public void close() throws IOException { List closeables = new ArrayList<>(); for (Processor.Factory factory : processorFactories.values()) { if (factory instanceof Closeable) { closeables.add((Closeable) factory); } } IOUtils.close(closeables); } ``` But apparently this method is never called in `Node#stop()`. Closes #17625. --- core/src/main/java/org/elasticsearch/ingest/IngestService.java | 1 + .../main/java/org/elasticsearch/node/service/NodeService.java | 1 + 2 files changed, 2 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestService.java b/core/src/main/java/org/elasticsearch/ingest/IngestService.java index b38f7470e39..a576e5403cb 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -70,6 +70,7 @@ public class IngestService implements Closeable { @Override public void close() throws IOException { + pipelineStore.getProcessorRegistry().close(); pipelineStore.close(); } diff --git a/core/src/main/java/org/elasticsearch/node/service/NodeService.java b/core/src/main/java/org/elasticsearch/node/service/NodeService.java index cb11fc02443..cf2bb0f9bb7 100644 --- a/core/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/core/src/main/java/org/elasticsearch/node/service/NodeService.java @@ -198,6 +198,7 @@ public class NodeService extends AbstractComponent implements Closeable { @Override public void close() throws IOException { + ingestService.close(); indicesService.close(); } } From 24f48b86b5817ca1406714d91bf39182d387d687 Mon Sep 17 00:00:00 2001 From: David Pilato Date: Sat, 9 Apr 2016 13:14:25 +0200 Subject: [PATCH 2/3] Update after review and add a Test --- .../elasticsearch/ingest/IngestService.java | 1 - .../elasticsearch/ingest/IngestCloseIT.java | 89 +++++++++++++++++++ .../test/ESSingleNodeTestCase.java | 4 +- 3 files changed, 91 insertions(+), 3 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/ingest/IngestCloseIT.java diff --git a/core/src/main/java/org/elasticsearch/ingest/IngestService.java b/core/src/main/java/org/elasticsearch/ingest/IngestService.java index a576e5403cb..b38f7470e39 100644 --- a/core/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/core/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -70,7 +70,6 @@ public class IngestService implements Closeable { @Override public void close() throws IOException { - pipelineStore.getProcessorRegistry().close(); pipelineStore.close(); } diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestCloseIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestCloseIT.java new file mode 100644 index 00000000000..029e9189643 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/ingest/IngestCloseIT.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.ingest.core.AbstractProcessorFactory; +import org.elasticsearch.node.NodeModule; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.junit.After; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.is; + +public class IngestCloseIT extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return pluginList(IngestPlugin.class); + } + + private static AtomicBoolean called = new AtomicBoolean(false); + + public void testCloseNode() throws Exception { + // We manually stop the node and check we called + stopNode(); + + assertThat(called.get(), is(true)); + + // We need to restart the node for the next tests (and because tearDown() expects a Node) + startNode(); + } + + public static class IngestPlugin extends Plugin { + + @Override + public String name() { + return "ingest"; + } + + @Override + public String description() { + return "ingest mock"; + } + + public void onModule(NodeModule nodeModule) { + nodeModule.registerProcessor("test", (templateService, registry) -> new Factory()); + } + } + + public static final class Factory extends AbstractProcessorFactory implements Closeable { + @Override + protected TestProcessor doCreate(String tag, Map config) throws Exception { + return new TestProcessor("id", "test", ingestDocument -> { + ingestDocument.setFieldValue("processed", true); + if (ingestDocument.getFieldValue("fail", Boolean.class)) { + throw new IllegalArgumentException("test processor failed"); + } + }); + } + + @Override + public void close() throws IOException { + called.set(true); + } + } + +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index de7a5219445..74300bef2ec 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -77,7 +77,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { startNode(); } - private void startNode() { + protected void startNode() { assert NODE == null; NODE = newNode(); // we must wait for the node to actually be up and running. otherwise the node might have started, elected itself master but might not yet have removed the @@ -92,7 +92,7 @@ public abstract class ESSingleNodeTestCase extends ESTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).get(); } - private static void stopNode() throws IOException { + protected static void stopNode() throws IOException { Node node = NODE; NODE = null; IOUtils.close(node); From cc4fb61a38926b593a0c8f5834c7154aea835b53 Mon Sep 17 00:00:00 2001 From: David Pilato Date: Mon, 11 Apr 2016 09:59:13 +0200 Subject: [PATCH 3/3] Simplify the test --- .../test/java/org/elasticsearch/ingest/IngestCloseIT.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestCloseIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestCloseIT.java index 029e9189643..502b2a9bf02 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestCloseIT.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestCloseIT.java @@ -73,10 +73,7 @@ public class IngestCloseIT extends ESSingleNodeTestCase { @Override protected TestProcessor doCreate(String tag, Map config) throws Exception { return new TestProcessor("id", "test", ingestDocument -> { - ingestDocument.setFieldValue("processed", true); - if (ingestDocument.getFieldValue("fail", Boolean.class)) { - throw new IllegalArgumentException("test processor failed"); - } + throw new UnsupportedOperationException("this code is actually never called from the test"); }); }