diff --git a/core/src/main/java/org/elasticsearch/action/ingest/reload/ReloadPipelinesAction.java b/core/src/main/java/org/elasticsearch/action/ingest/reload/ReloadPipelinesAction.java
index 6a764132cb9..8d38c08c4b6 100644
--- a/core/src/main/java/org/elasticsearch/action/ingest/reload/ReloadPipelinesAction.java
+++ b/core/src/main/java/org/elasticsearch/action/ingest/reload/ReloadPipelinesAction.java
@@ -21,6 +21,7 @@ package org.elasticsearch.action.ingest.reload;
 
 import org.elasticsearch.cluster.ClusterService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.ingest.PipelineStore;
@@ -33,8 +34,6 @@ import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportResponseHandler;
 import org.elasticsearch.transport.TransportService;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -59,21 +58,10 @@ public class ReloadPipelinesAction extends AbstractComponent implements Transpor
     }
 
     public void reloadPipelinesOnAllNodes(Consumer<Boolean> listener) {
-        List<DiscoveryNode> ingestNodes = new ArrayList<>();
-        for (DiscoveryNode node : clusterService.state().getNodes()) {
-            String nodeEnabled = node.getAttributes().get("ingest");
-            if ("true".equals(nodeEnabled)) {
-                ingestNodes.add(node);
-            }
-        }
-
-        if (ingestNodes.isEmpty()) {
-            throw new IllegalStateException("There are no ingest nodes in this cluster");
-        }
-
         AtomicBoolean failed = new AtomicBoolean();
-        AtomicInteger expectedResponses = new AtomicInteger(ingestNodes.size());
-        for (DiscoveryNode node : ingestNodes) {
+        DiscoveryNodes nodes = clusterService.state().getNodes();
+        AtomicInteger expectedResponses = new AtomicInteger(nodes.size());
+        for (DiscoveryNode node : nodes) {
             ReloadPipelinesRequest nodeRequest = new ReloadPipelinesRequest();
             transportService.sendRequest(node, ACTION_NAME, nodeRequest, new TransportResponseHandler<ReloadPipelinesResponse>() {
                 @Override
diff --git a/core/src/test/java/org/elasticsearch/action/ingest/reload/ReloadPipelinesActionTests.java b/core/src/test/java/org/elasticsearch/action/ingest/reload/ReloadPipelinesActionTests.java
index abd66fc08d7..b6f767323c1 100644
--- a/core/src/test/java/org/elasticsearch/action/ingest/reload/ReloadPipelinesActionTests.java
+++ b/core/src/test/java/org/elasticsearch/action/ingest/reload/ReloadPipelinesActionTests.java
@@ -36,9 +36,7 @@ import org.junit.Before;
 import org.mockito.Matchers;
 
 import java.util.Collections;
-import java.util.Map;
 
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -61,24 +59,12 @@ public class ReloadPipelinesActionTests extends ESTestCase {
 
     public void testSuccess() {
         int numNodes = randomIntBetween(1, 10);
-        int numIngestNodes = 0;
-
-        DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
-        for (int i = 0; i < numNodes; i++) {
-            boolean ingestNode = i == 0 || randomBoolean();
-            DiscoveryNode discoNode = generateDiscoNode(i, ingestNode);
-            discoNodes.put(discoNode);
-            if (ingestNode) {
-                numIngestNodes++;
-            }
-        }
-        ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
+        ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(generateDiscoNodes(numNodes)).build();
         when(clusterService.state()).thenReturn(state);
 
-        final int finalNumIngestNodes = numIngestNodes;
         doAnswer(mock -> {
             TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3];
-            for (int i = 0; i < finalNumIngestNodes; i++) {
+            for (int i = 0; i < numNodes; i++) {
                 handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse());
             }
             return mock;
@@ -88,25 +74,14 @@ public class ReloadPipelinesActionTests extends ESTestCase {
 
     public void testWithAtLeastOneFailure() {
         int numNodes = randomIntBetween(1, 10);
-        int numIngestNodes = 0;
-        DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
-        for (int i = 0; i < numNodes; i++) {
-            boolean ingestNode = i == 0 || randomBoolean();
-            DiscoveryNode discoNode = generateDiscoNode(i, ingestNode);
-            discoNodes.put(discoNode);
-            if (ingestNode) {
-                numIngestNodes++;
-            }
-        }
-        ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
+        ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(generateDiscoNodes(numNodes)).build();
         when(clusterService.state()).thenReturn(state);
 
-        final int finalNumIngestNodes = numIngestNodes;
         doAnswer(mock -> {
             TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3];
             handler.handleException(new TransportException("test failure"));
-            for (int i = 1; i < finalNumIngestNodes; i++) {
+            for (int i = 1; i < numNodes; i++) {
                 if (randomBoolean()) {
                     handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse());
                 } else {
@@ -118,44 +93,13 @@ public class ReloadPipelinesActionTests extends ESTestCase {
         reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(false)));
     }
 
-    public void testNoIngestNodes() {
-        // expected exception if there are no nodes:
-        DiscoveryNodes discoNodes = DiscoveryNodes.builder()
-            .build();
-        ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
-        when(clusterService.state()).thenReturn(state);
-
-        try {
-            reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> fail("shouldn't be invoked"));
-            fail("exception expected");
-        } catch (IllegalStateException e) {
-            assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster"));
-        }
-
-        // expected exception if there are no ingest nodes:
-        discoNodes = DiscoveryNodes.builder()
-            .put(new DiscoveryNode("_name", "_id", new LocalTransportAddress("_id"), Collections.singletonMap("ingest", "false"), Version.CURRENT))
-            .build();
-        state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
-        when(clusterService.state()).thenReturn(state);
-
-        try {
-            reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> fail("shouldn't be invoked"));
-            fail("exception expected");
-        } catch (IllegalStateException e) {
-            assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster"));
+    private static DiscoveryNodes.Builder generateDiscoNodes(int numNodes) {
+        DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
+        for (int i = 0; i < numNodes; i++) {
+            String id = Integer.toString(i);
+            DiscoveryNode discoNode = new DiscoveryNode(id, id, new LocalTransportAddress(id), Collections.emptyMap(), Version.CURRENT);
+            discoNodes.put(discoNode);
         }
+        return discoNodes;
     }
-
-    private DiscoveryNode generateDiscoNode(int index, boolean ingestNode) {
-        Map<String, String> attributes;
-        if (ingestNode) {
-            attributes = Collections.singletonMap("ingest", "true");
-        } else {
-            attributes = randomBoolean() ? Collections.emptyMap() : Collections.singletonMap("ingest", "false");
-        }
-        String id = String.valueOf(index);
-        return new DiscoveryNode(id, id, new LocalTransportAddress(id), attributes, Version.CURRENT);
-    }
-
 }
diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java
index e08fca8b134..202acd56beb 100644
--- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java
+++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/IngestPlugin.java
@@ -59,22 +59,20 @@ public class IngestPlugin extends Plugin {
     }
 
     public void onModule(IngestModule ingestModule) {
-        if (ingestEnabled) {
-            ingestModule.addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
-            ingestModule.addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
-            ingestModule.addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
-            ingestModule.addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
-            ingestModule.addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
-            ingestModule.addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
-            ingestModule.addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
-            ingestModule.addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
-            ingestModule.addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
-            ingestModule.addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
-            ingestModule.addProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory());
-            ingestModule.addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
-            ingestModule.addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
-            ingestModule.addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
-            ingestModule.addProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
-        }
+        ingestModule.addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
+        ingestModule.addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
+        ingestModule.addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
+        ingestModule.addProcessor(SetProcessor.TYPE, (environment, templateService) -> new SetProcessor.Factory(templateService));
+        ingestModule.addProcessor(AppendProcessor.TYPE, (environment, templateService) -> new AppendProcessor.Factory(templateService));
+        ingestModule.addProcessor(RenameProcessor.TYPE, (environment, templateService) -> new RenameProcessor.Factory());
+        ingestModule.addProcessor(RemoveProcessor.TYPE, (environment, templateService) -> new RemoveProcessor.Factory(templateService));
+        ingestModule.addProcessor(SplitProcessor.TYPE, (environment, templateService) -> new SplitProcessor.Factory());
+        ingestModule.addProcessor(JoinProcessor.TYPE, (environment, templateService) -> new JoinProcessor.Factory());
+        ingestModule.addProcessor(UppercaseProcessor.TYPE, (environment, templateService) -> new UppercaseProcessor.Factory());
+        ingestModule.addProcessor(LowercaseProcessor.TYPE, (environment, templateService) -> new LowercaseProcessor.Factory());
+        ingestModule.addProcessor(TrimProcessor.TYPE, (environment, templateService) -> new TrimProcessor.Factory());
+        ingestModule.addProcessor(ConvertProcessor.TYPE, (environment, templateService) -> new ConvertProcessor.Factory());
+        ingestModule.addProcessor(GsubProcessor.TYPE, (environment, templateService) -> new GsubProcessor.Factory());
+        ingestModule.addProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
     }
 }
diff --git a/qa/ingest-disabled/src/test/resources/rest-api-spec/test/ingest_mustache/10_ingest_disabled.yaml b/qa/ingest-disabled/src/test/resources/rest-api-spec/test/ingest_mustache/10_ingest_disabled.yaml
index f470b3152bd..a8eb7861efc 100644
--- a/qa/ingest-disabled/src/test/resources/rest-api-spec/test/ingest_mustache/10_ingest_disabled.yaml
+++ b/qa/ingest-disabled/src/test/resources/rest-api-spec/test/ingest_mustache/10_ingest_disabled.yaml
@@ -1,7 +1,6 @@
 ---
-"Test ingest APIS fail when is disabled":
+"Test ingest CRUD APIS work fine when node.ingest is set to false":
   - do:
-      catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/
       ingest.put_pipeline:
         id: "my_pipeline"
         body: >
@@ -10,26 +9,36 @@
             "processors": [
               {
                 "set" : {
-                  "field" : "field",
-                  "value": "valie"
+                  "field" : "field2",
+                  "value": "_value"
                 }
               }
             ]
           }
+  - match: { _index: ".ingest" }
+  - match: { _type: "pipeline" }
+  - match: { _version: 1 }
+  - match: { _id: "my_pipeline" }
 
   - do:
-      catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/
-      ingest.delete_pipeline:
-        id: "my_pipeline"
-
-  - do:
-      catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/
       ingest.get_pipeline:
         id: "my_pipeline"
+  - match: { my_pipeline._source.description: "_description" }
+  - match: { my_pipeline._version: 1 }
 
   - do:
-      catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/
-      ingest.simulate:
+      ingest.delete_pipeline:
+        id: "my_pipeline"
+  - match: { _index: ".ingest" }
+  - match: { _type: "pipeline" }
+  - match: { _version: 2 }
+  - match: { _id: "my_pipeline" }
+  - match: { found: true }
+
+---
+"Test ingest simulate API works fine when node.ingest is set to false":
+  - do:
+      ingest.put_pipeline:
         id: "my_pipeline"
         body: >
           {
@@ -37,13 +46,38 @@
             "processors": [
               {
                 "set" : {
-                  "field" : "field",
-                  "value": "valie"
+                  "field" : "field2",
+                  "value" : "_value"
                 }
               }
             ]
           }
+  - match: { _id: "my_pipeline" }
+  - do:
+      ingest.simulate:
+        id: "my_pipeline"
+        body: >
+          {
+            "docs": [
+              {
+                "_index": "index",
+                "_type": "type",
+                "_id": "id",
+                "_source": {
+                  "foo": "bar"
+                }
+              }
+            ]
+          }
+  - length: { docs: 1 }
+  - match: { docs.0.doc._source.foo: "bar" }
+  - match: { docs.0.doc._source.field2: "_value" }
+  - length: { docs.0.doc._ingest: 1 }
+  - is_true: docs.0.doc._ingest.timestamp
+
+---
+"Test index api with pipeline id fails when node.ingest is set to false":
 
   - do:
       catch: /ingest plugin is disabled, cannot execute pipeline with id \[my_pipeline_1\]/
       ingest.index:
@@ -56,3 +90,22 @@
           field2: "2",
           field3: "3"
         }
+
+---
+"Test bulk api with pipeline id fails when node.ingest is set to false":
+  - do:
+      catch: /ingest plugin is disabled, cannot execute pipeline with id \[my_pipeline_1\]/
+      ingest.bulk:
+        pipeline: "my_pipeline_1"
+        body:
+          - index:
+              _index: test_index
+              _type: test_type
+              _id: test_id
+          - f1: v1
+          - index:
+              _index: test_index
+              _type: test_type
+              _id: test_id2
+          - f1: v2
+