adapt qa tests for when node.ingest is set to false
CRUD and simulate APIs now work fine, since every node holds the pipelines in memory; however, setting node.ingest to false disables ingestion, meaning that any index or bulk request carrying a pipeline id is going to fail on that node.
This commit is contained in:
parent
52c2a273f9
commit
18aabd67c8
|
@ -21,6 +21,7 @@ package org.elasticsearch.action.ingest.reload;
|
|||
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.PipelineStore;
|
||||
|
@ -33,8 +34,6 @@ import org.elasticsearch.transport.TransportResponse;
|
|||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
@ -59,21 +58,10 @@ public class ReloadPipelinesAction extends AbstractComponent implements Transpor
|
|||
}
|
||||
|
||||
public void reloadPipelinesOnAllNodes(Consumer<Boolean> listener) {
|
||||
List<DiscoveryNode> ingestNodes = new ArrayList<>();
|
||||
for (DiscoveryNode node : clusterService.state().getNodes()) {
|
||||
String nodeEnabled = node.getAttributes().get("ingest");
|
||||
if ("true".equals(nodeEnabled)) {
|
||||
ingestNodes.add(node);
|
||||
}
|
||||
}
|
||||
|
||||
if (ingestNodes.isEmpty()) {
|
||||
throw new IllegalStateException("There are no ingest nodes in this cluster");
|
||||
}
|
||||
|
||||
AtomicBoolean failed = new AtomicBoolean();
|
||||
AtomicInteger expectedResponses = new AtomicInteger(ingestNodes.size());
|
||||
for (DiscoveryNode node : ingestNodes) {
|
||||
DiscoveryNodes nodes = clusterService.state().getNodes();
|
||||
AtomicInteger expectedResponses = new AtomicInteger(nodes.size());
|
||||
for (DiscoveryNode node : nodes) {
|
||||
ReloadPipelinesRequest nodeRequest = new ReloadPipelinesRequest();
|
||||
transportService.sendRequest(node, ACTION_NAME, nodeRequest, new TransportResponseHandler<ReloadPipelinesResponse>() {
|
||||
@Override
|
||||
|
|
|
@ -36,9 +36,7 @@ import org.junit.Before;
|
|||
import org.mockito.Matchers;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -61,24 +59,12 @@ public class ReloadPipelinesActionTests extends ESTestCase {
|
|||
|
||||
public void testSuccess() {
|
||||
int numNodes = randomIntBetween(1, 10);
|
||||
int numIngestNodes = 0;
|
||||
|
||||
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
|
||||
for (int i = 0; i < numNodes; i++) {
|
||||
boolean ingestNode = i == 0 || randomBoolean();
|
||||
DiscoveryNode discoNode = generateDiscoNode(i, ingestNode);
|
||||
discoNodes.put(discoNode);
|
||||
if (ingestNode) {
|
||||
numIngestNodes++;
|
||||
}
|
||||
}
|
||||
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
|
||||
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(generateDiscoNodes(numNodes)).build();
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
|
||||
final int finalNumIngestNodes = numIngestNodes;
|
||||
doAnswer(mock -> {
|
||||
TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3];
|
||||
for (int i = 0; i < finalNumIngestNodes; i++) {
|
||||
for (int i = 0; i < numNodes; i++) {
|
||||
handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse());
|
||||
}
|
||||
return mock;
|
||||
|
@ -88,25 +74,14 @@ public class ReloadPipelinesActionTests extends ESTestCase {
|
|||
|
||||
public void testWithAtLeastOneFailure() {
|
||||
int numNodes = randomIntBetween(1, 10);
|
||||
int numIngestNodes = 0;
|
||||
|
||||
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
|
||||
for (int i = 0; i < numNodes; i++) {
|
||||
boolean ingestNode = i == 0 || randomBoolean();
|
||||
DiscoveryNode discoNode = generateDiscoNode(i, ingestNode);
|
||||
discoNodes.put(discoNode);
|
||||
if (ingestNode) {
|
||||
numIngestNodes++;
|
||||
}
|
||||
}
|
||||
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
|
||||
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(generateDiscoNodes(numNodes)).build();
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
|
||||
final int finalNumIngestNodes = numIngestNodes;
|
||||
doAnswer(mock -> {
|
||||
TransportResponseHandler handler = (TransportResponseHandler) mock.getArguments()[3];
|
||||
handler.handleException(new TransportException("test failure"));
|
||||
for (int i = 1; i < finalNumIngestNodes; i++) {
|
||||
for (int i = 1; i < numNodes; i++) {
|
||||
if (randomBoolean()) {
|
||||
handler.handleResponse(new ReloadPipelinesAction.ReloadPipelinesResponse());
|
||||
} else {
|
||||
|
@ -118,44 +93,13 @@ public class ReloadPipelinesActionTests extends ESTestCase {
|
|||
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> assertThat(result, is(false)));
|
||||
}
|
||||
|
||||
public void testNoIngestNodes() {
|
||||
// expected exception if there are no nodes:
|
||||
DiscoveryNodes discoNodes = DiscoveryNodes.builder()
|
||||
.build();
|
||||
ClusterState state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
|
||||
try {
|
||||
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> fail("shouldn't be invoked"));
|
||||
fail("exception expected");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster"));
|
||||
private static DiscoveryNodes.Builder generateDiscoNodes(int numNodes) {
|
||||
DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder();
|
||||
for (int i = 0; i < numNodes; i++) {
|
||||
String id = Integer.toString(i);
|
||||
DiscoveryNode discoNode = new DiscoveryNode(id, id, new LocalTransportAddress(id), Collections.emptyMap(), Version.CURRENT);
|
||||
discoNodes.put(discoNode);
|
||||
}
|
||||
|
||||
// expected exception if there are no ingest nodes:
|
||||
discoNodes = DiscoveryNodes.builder()
|
||||
.put(new DiscoveryNode("_name", "_id", new LocalTransportAddress("_id"), Collections.singletonMap("ingest", "false"), Version.CURRENT))
|
||||
.build();
|
||||
state = ClusterState.builder(new ClusterName("_name")).nodes(discoNodes).build();
|
||||
when(clusterService.state()).thenReturn(state);
|
||||
|
||||
try {
|
||||
reloadPipelinesAction.reloadPipelinesOnAllNodes(result -> fail("shouldn't be invoked"));
|
||||
fail("exception expected");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster"));
|
||||
return discoNodes;
|
||||
}
|
||||
}
|
||||
|
||||
private DiscoveryNode generateDiscoNode(int index, boolean ingestNode) {
|
||||
Map<String, String> attributes;
|
||||
if (ingestNode) {
|
||||
attributes = Collections.singletonMap("ingest", "true");
|
||||
} else {
|
||||
attributes = randomBoolean() ? Collections.emptyMap() : Collections.singletonMap("ingest", "false");
|
||||
}
|
||||
String id = String.valueOf(index);
|
||||
return new DiscoveryNode(id, id, new LocalTransportAddress(id), attributes, Version.CURRENT);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -59,7 +59,6 @@ public class IngestPlugin extends Plugin {
|
|||
}
|
||||
|
||||
public void onModule(IngestModule ingestModule) {
|
||||
if (ingestEnabled) {
|
||||
ingestModule.addProcessor(GeoIpProcessor.TYPE, (environment, templateService) -> new GeoIpProcessor.Factory(environment.configFile()));
|
||||
ingestModule.addProcessor(GrokProcessor.TYPE, (environment, templateService) -> new GrokProcessor.Factory(environment.configFile()));
|
||||
ingestModule.addProcessor(DateProcessor.TYPE, (environment, templateService) -> new DateProcessor.Factory());
|
||||
|
@ -77,4 +76,3 @@ public class IngestPlugin extends Plugin {
|
|||
ingestModule.addProcessor(FailProcessor.TYPE, (environment, templateService) -> new FailProcessor.Factory(templateService));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
---
|
||||
"Test ingest APIS fail when is disabled":
|
||||
"Test ingest CRUD APIS work fine when node.ingest is set to false":
|
||||
- do:
|
||||
catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
|
@ -10,26 +9,36 @@
|
|||
"processors": [
|
||||
{
|
||||
"set" : {
|
||||
"field" : "field",
|
||||
"value": "valie"
|
||||
"field" : "field2",
|
||||
"value": "_value"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { _index: ".ingest" }
|
||||
- match: { _type: "pipeline" }
|
||||
- match: { _version: 1 }
|
||||
- match: { _id: "my_pipeline" }
|
||||
|
||||
- do:
|
||||
catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/
|
||||
ingest.delete_pipeline:
|
||||
id: "my_pipeline"
|
||||
|
||||
- do:
|
||||
catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/
|
||||
ingest.get_pipeline:
|
||||
id: "my_pipeline"
|
||||
- match: { my_pipeline._source.description: "_description" }
|
||||
- match: { my_pipeline._version: 1 }
|
||||
|
||||
- do:
|
||||
catch: /ingest plugin is disabled, pipeline CRUD or simulate APIs cannot be used/
|
||||
ingest.simulate:
|
||||
ingest.delete_pipeline:
|
||||
id: "my_pipeline"
|
||||
- match: { _index: ".ingest" }
|
||||
- match: { _type: "pipeline" }
|
||||
- match: { _version: 2 }
|
||||
- match: { _id: "my_pipeline" }
|
||||
- match: { found: true }
|
||||
|
||||
---
|
||||
"Test ingest simulate API works fine when node.ingest is set to false":
|
||||
- do:
|
||||
ingest.put_pipeline:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
|
@ -37,13 +46,38 @@
|
|||
"processors": [
|
||||
{
|
||||
"set" : {
|
||||
"field" : "field",
|
||||
"value": "valie"
|
||||
"field" : "field2",
|
||||
"value" : "_value"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { _id: "my_pipeline" }
|
||||
|
||||
- do:
|
||||
ingest.simulate:
|
||||
id: "my_pipeline"
|
||||
body: >
|
||||
{
|
||||
"docs": [
|
||||
{
|
||||
"_index": "index",
|
||||
"_type": "type",
|
||||
"_id": "id",
|
||||
"_source": {
|
||||
"foo": "bar"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- length: { docs: 1 }
|
||||
- match: { docs.0.doc._source.foo: "bar" }
|
||||
- match: { docs.0.doc._source.field2: "_value" }
|
||||
- length: { docs.0.doc._ingest: 1 }
|
||||
- is_true: docs.0.doc._ingest.timestamp
|
||||
|
||||
---
|
||||
"Test index api with pipeline id fails when node.ingest is set to false":
|
||||
- do:
|
||||
catch: /ingest plugin is disabled, cannot execute pipeline with id \[my_pipeline_1\]/
|
||||
ingest.index:
|
||||
|
@ -56,3 +90,22 @@
|
|||
field2: "2",
|
||||
field3: "3"
|
||||
}
|
||||
|
||||
---
|
||||
"Test bulk api with pipeline id fails when node.ingest is set to false":
|
||||
- do:
|
||||
catch: /ingest plugin is disabled, cannot execute pipeline with id \[my_pipeline_1\]/
|
||||
ingest.bulk:
|
||||
pipeline: "my_pipeline_1"
|
||||
body:
|
||||
- index:
|
||||
_index: test_index
|
||||
_type: test_type
|
||||
_id: test_id
|
||||
- f1: v1
|
||||
- index:
|
||||
_index: test_index
|
||||
_type: test_type
|
||||
_id: test_id2
|
||||
- f1: v2
|
||||
|
||||
|
|
Loading…
Reference in New Issue