mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
Merge pull request #15926 from javanna/enhancement/redirect_to_ingest_nodes
redirect ingest requests to an ingest node
This commit is contained in:
commit
7a698b4a3f
@ -150,7 +150,7 @@ import org.elasticsearch.action.indexedscripts.get.TransportGetIndexedScriptActi
|
||||
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptAction;
|
||||
import org.elasticsearch.action.indexedscripts.put.TransportPutIndexedScriptAction;
|
||||
import org.elasticsearch.action.ingest.IngestActionFilter;
|
||||
import org.elasticsearch.action.ingest.IngestDisabledActionFilter;
|
||||
import org.elasticsearch.action.ingest.IngestProxyActionFilter;
|
||||
import org.elasticsearch.action.ingest.DeletePipelineAction;
|
||||
import org.elasticsearch.action.ingest.DeletePipelineTransportAction;
|
||||
import org.elasticsearch.action.ingest.GetPipelineAction;
|
||||
@ -256,7 +256,7 @@ public class ActionModule extends AbstractModule {
|
||||
if (ingestEnabled) {
|
||||
registerFilter(IngestActionFilter.class);
|
||||
} else {
|
||||
registerFilter(IngestDisabledActionFilter.class);
|
||||
registerFilter(IngestProxyActionFilter.class);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,70 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.bulk.BulkAction;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.index.IndexAction;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.action.support.ActionFilterChain;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
||||
public final class IngestDisabledActionFilter implements ActionFilter {
|
||||
|
||||
@Override
|
||||
public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
|
||||
boolean isIngestRequest = false;
|
||||
if (IndexAction.NAME.equals(action)) {
|
||||
assert request instanceof IndexRequest;
|
||||
IndexRequest indexRequest = (IndexRequest) request;
|
||||
isIngestRequest = Strings.hasText(indexRequest.pipeline());
|
||||
} else if (BulkAction.NAME.equals(action)) {
|
||||
assert request instanceof BulkRequest;
|
||||
BulkRequest bulkRequest = (BulkRequest) request;
|
||||
for (ActionRequest actionRequest : bulkRequest.requests()) {
|
||||
if (actionRequest instanceof IndexRequest) {
|
||||
IndexRequest indexRequest = (IndexRequest) actionRequest;
|
||||
if (Strings.hasText(indexRequest.pipeline())) {
|
||||
isIngestRequest = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (isIngestRequest) {
|
||||
throw new IllegalArgumentException("node.ingest is set to false, cannot execute pipeline");
|
||||
}
|
||||
chain.proceed(task, action, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void apply(String action, ActionResponse response, ActionListener listener, ActionFilterChain chain) {
|
||||
chain.proceed(action, response, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int order() {
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
}
|
@ -0,0 +1,152 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.bulk.BulkAction;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.index.IndexAction;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.action.support.ActionFilterChain;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Randomness;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.ingest.IngestModule;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public final class IngestProxyActionFilter implements ActionFilter {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private final TransportService transportService;
|
||||
private final AtomicInteger randomNodeGenerator = new AtomicInteger(Randomness.get().nextInt());
|
||||
|
||||
@Inject
|
||||
public IngestProxyActionFilter(ClusterService clusterService, TransportService transportService) {
|
||||
this.clusterService = clusterService;
|
||||
this.transportService = transportService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
|
||||
Action ingestAction = null;
|
||||
boolean isIngestRequest = false;
|
||||
if (IndexAction.NAME.equals(action)) {
|
||||
ingestAction = IndexAction.INSTANCE;
|
||||
assert request instanceof IndexRequest;
|
||||
IndexRequest indexRequest = (IndexRequest) request;
|
||||
isIngestRequest = Strings.hasText(indexRequest.pipeline());
|
||||
} else if (BulkAction.NAME.equals(action)) {
|
||||
ingestAction = BulkAction.INSTANCE;
|
||||
assert request instanceof BulkRequest;
|
||||
BulkRequest bulkRequest = (BulkRequest) request;
|
||||
for (ActionRequest actionRequest : bulkRequest.requests()) {
|
||||
if (actionRequest instanceof IndexRequest) {
|
||||
IndexRequest indexRequest = (IndexRequest) actionRequest;
|
||||
if (Strings.hasText(indexRequest.pipeline())) {
|
||||
isIngestRequest = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (isIngestRequest) {
|
||||
assert ingestAction != null;
|
||||
forwardIngestRequest(ingestAction, request, listener);
|
||||
return;
|
||||
}
|
||||
chain.proceed(task, action, request, listener);
|
||||
}
|
||||
|
||||
private void forwardIngestRequest(Action action, ActionRequest request, ActionListener listener) {
|
||||
transportService.sendRequest(randomIngestNode(), action.name(), request, new TransportResponseHandler<TransportResponse>() {
|
||||
@Override
|
||||
public TransportResponse newInstance() {
|
||||
return action.newResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void handleResponse(TransportResponse response) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
listener.onFailure(exp);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void apply(String action, ActionResponse response, ActionListener listener, ActionFilterChain chain) {
|
||||
chain.proceed(action, response, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int order() {
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
private DiscoveryNode randomIngestNode() {
|
||||
assert IngestModule.isIngestEnabled(clusterService.localNode().attributes()) == false;
|
||||
List<DiscoveryNode> ingestNodes = new ArrayList<>();
|
||||
for (DiscoveryNode node : clusterService.state().nodes()) {
|
||||
if (IngestModule.isIngestEnabled(node.getAttributes())) {
|
||||
ingestNodes.add(node);
|
||||
}
|
||||
}
|
||||
|
||||
if (ingestNodes.isEmpty()) {
|
||||
throw new IllegalStateException("There are no ingest nodes in this cluster, unable to forward request to an ingest node.");
|
||||
}
|
||||
|
||||
int index = getNodeNumber();
|
||||
return ingestNodes.get((index) % ingestNodes.size());
|
||||
}
|
||||
|
||||
private int getNodeNumber() {
|
||||
int index = randomNodeGenerator.incrementAndGet();
|
||||
if (index < 0) {
|
||||
index = 0;
|
||||
randomNodeGenerator.set(0);
|
||||
}
|
||||
return index;
|
||||
}
|
||||
}
|
@ -19,6 +19,8 @@
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.env.Environment;
|
||||
@ -81,4 +83,10 @@ public class IngestModule extends AbstractModule {
|
||||
public static boolean isIngestEnabled(Settings settings) {
|
||||
return settings.getAsBoolean("node.ingest", true);
|
||||
}
|
||||
|
||||
public static boolean isIngestEnabled(ImmutableOpenMap<String, String> nodeAttributes) {
|
||||
String ingestEnabled = nodeAttributes.get("ingest");
|
||||
//reproduces same logic used in settings.getAsBoolean used above
|
||||
return Booleans.parseBoolean(ingestEnabled, true);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,251 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.ingest;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.bulk.BulkAction;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexAction;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.support.ActionFilterChain;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.common.transport.DummyTransportAddress;
|
||||
import org.elasticsearch.ingest.IngestModule;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.hamcrest.CustomTypeSafeMatcher;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.argThat;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.same;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class IngestProxyActionFilterTests extends ESTestCase {
|
||||
|
||||
private TransportService transportService;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private IngestProxyActionFilter buildFilter(int ingestNodes, int totalNodes) {
|
||||
ClusterState clusterState = mock(ClusterState.class);
|
||||
DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder();
|
||||
DiscoveryNode localNode = null;
|
||||
for (int i = 0; i < totalNodes; i++) {
|
||||
String nodeId = "node" + i;
|
||||
Map<String, String> attributes = new HashMap<>();
|
||||
if (i >= ingestNodes) {
|
||||
attributes.put("ingest", "false");
|
||||
} else if (randomBoolean()) {
|
||||
attributes.put("ingest", "true");
|
||||
}
|
||||
DiscoveryNode node = new DiscoveryNode(nodeId, nodeId, DummyTransportAddress.INSTANCE, attributes, VersionUtils.randomVersion(random()));
|
||||
builder.put(node);
|
||||
if (i == totalNodes - 1) {
|
||||
localNode = node;
|
||||
}
|
||||
}
|
||||
when(clusterState.nodes()).thenReturn(builder.build());
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
when(clusterService.localNode()).thenReturn(localNode);
|
||||
when(clusterService.state()).thenReturn(clusterState);
|
||||
transportService = mock(TransportService.class);
|
||||
return new IngestProxyActionFilter(clusterService, transportService);
|
||||
}
|
||||
|
||||
public void testApplyNoIngestNodes() {
|
||||
Task task = mock(Task.class);
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
int totalNodes = randomIntBetween(1, 5);
|
||||
IngestProxyActionFilter filter = buildFilter(0, totalNodes);
|
||||
|
||||
String action;
|
||||
ActionRequest request;
|
||||
if (randomBoolean()) {
|
||||
action = IndexAction.NAME;
|
||||
request = new IndexRequest().pipeline("_id");
|
||||
} else {
|
||||
action = BulkAction.NAME;
|
||||
request = new BulkRequest().add(new IndexRequest().pipeline("_id"));
|
||||
}
|
||||
try {
|
||||
filter.apply(task, action, request, actionListener, actionFilterChain);
|
||||
fail("should have failed because there are no ingest nodes");
|
||||
} catch(IllegalStateException e) {
|
||||
assertThat(e.getMessage(), equalTo("There are no ingest nodes in this cluster, unable to forward request to an ingest node."));
|
||||
}
|
||||
verifyZeroInteractions(transportService);
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
verifyZeroInteractions(actionListener);
|
||||
}
|
||||
|
||||
public void testApplyNoPipelineId() {
|
||||
Task task = mock(Task.class);
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
int totalNodes = randomIntBetween(1, 5);
|
||||
IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes);
|
||||
|
||||
String action;
|
||||
ActionRequest request;
|
||||
if (randomBoolean()) {
|
||||
action = IndexAction.NAME;
|
||||
request = new IndexRequest();
|
||||
} else {
|
||||
action = BulkAction.NAME;
|
||||
request = new BulkRequest().add(new IndexRequest());
|
||||
}
|
||||
filter.apply(task, action, request, actionListener, actionFilterChain);
|
||||
verifyZeroInteractions(transportService);
|
||||
verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener));
|
||||
verifyZeroInteractions(actionListener);
|
||||
}
|
||||
|
||||
public void testApplyAnyAction() {
|
||||
Task task = mock(Task.class);
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
ActionRequest request = mock(ActionRequest.class);
|
||||
int totalNodes = randomIntBetween(1, 5);
|
||||
IngestProxyActionFilter filter = buildFilter(randomIntBetween(0, totalNodes - 1), totalNodes);
|
||||
|
||||
String action = randomAsciiOfLengthBetween(1, 20);
|
||||
filter.apply(task, action, request, actionListener, actionFilterChain);
|
||||
verifyZeroInteractions(transportService);
|
||||
verify(actionFilterChain).proceed(any(Task.class), eq(action), same(request), same(actionListener));
|
||||
verifyZeroInteractions(actionListener);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testApplyIndexRedirect() {
|
||||
Task task = mock(Task.class);
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
int totalNodes = randomIntBetween(2, 5);
|
||||
IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes);
|
||||
Answer<Void> answer = invocationOnMock -> {
|
||||
TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3];
|
||||
transportResponseHandler.handleResponse(new IndexResponse());
|
||||
return null;
|
||||
};
|
||||
doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class));
|
||||
|
||||
IndexRequest indexRequest = new IndexRequest().pipeline("_id");
|
||||
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(IndexAction.NAME), same(indexRequest), any(TransportResponseHandler.class));
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
verify(actionListener).onResponse(any(IndexResponse.class));
|
||||
verify(actionListener, never()).onFailure(any(TransportException.class));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testApplyBulkRedirect() {
|
||||
Task task = mock(Task.class);
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
int totalNodes = randomIntBetween(2, 5);
|
||||
IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes);
|
||||
Answer<Void> answer = invocationOnMock -> {
|
||||
TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3];
|
||||
transportResponseHandler.handleResponse(new BulkResponse(null, -1));
|
||||
return null;
|
||||
};
|
||||
doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class));
|
||||
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
bulkRequest.add(new IndexRequest().pipeline("_id"));
|
||||
int numNoPipelineRequests = randomIntBetween(0, 10);
|
||||
for (int i = 0; i < numNoPipelineRequests; i++) {
|
||||
bulkRequest.add(new IndexRequest());
|
||||
}
|
||||
filter.apply(task, BulkAction.NAME, bulkRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(BulkAction.NAME), same(bulkRequest), any(TransportResponseHandler.class));
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
verify(actionListener).onResponse(any(BulkResponse.class));
|
||||
verify(actionListener, never()).onFailure(any(TransportException.class));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testApplyFailures() {
|
||||
Task task = mock(Task.class);
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
int totalNodes = randomIntBetween(2, 5);
|
||||
IngestProxyActionFilter filter = buildFilter(randomIntBetween(1, totalNodes - 1), totalNodes);
|
||||
Answer<Void> answer = invocationOnMock -> {
|
||||
TransportResponseHandler transportResponseHandler = (TransportResponseHandler) invocationOnMock.getArguments()[3];
|
||||
transportResponseHandler.handleException(new TransportException(new IllegalArgumentException()));
|
||||
return null;
|
||||
};
|
||||
doAnswer(answer).when(transportService).sendRequest(any(DiscoveryNode.class), any(String.class), any(TransportRequest.class), any(TransportResponseHandler.class));
|
||||
|
||||
String action;
|
||||
ActionRequest request;
|
||||
if (randomBoolean()) {
|
||||
action = IndexAction.NAME;
|
||||
request = new IndexRequest().pipeline("_id");
|
||||
} else {
|
||||
action = BulkAction.NAME;
|
||||
request = new BulkRequest().add(new IndexRequest().pipeline("_id"));
|
||||
}
|
||||
|
||||
filter.apply(task, action, request, actionListener, actionFilterChain);
|
||||
|
||||
verify(transportService).sendRequest(argThat(new IngestNodeMatcher()), eq(action), same(request), any(TransportResponseHandler.class));
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
verify(actionListener).onFailure(any(TransportException.class));
|
||||
verify(actionListener, never()).onResponse(any(TransportResponse.class));
|
||||
}
|
||||
|
||||
private static class IngestNodeMatcher extends CustomTypeSafeMatcher<DiscoveryNode> {
|
||||
private IngestNodeMatcher() {
|
||||
super("discovery node should be an ingest node");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean matchesSafely(DiscoveryNode node) {
|
||||
return IngestModule.isIngestEnabled(node.getAttributes());
|
||||
}
|
||||
}
|
||||
}
|
@ -36,6 +36,7 @@ import org.elasticsearch.action.ingest.SimulateDocumentSimpleResult;
|
||||
import org.elasticsearch.action.ingest.SimulatePipelineAction;
|
||||
import org.elasticsearch.action.ingest.SimulatePipelineRequestBuilder;
|
||||
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.core.IngestDocument;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
@ -51,8 +52,17 @@ import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
|
||||
public class IngestClientIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
if (nodeOrdinal % 2 == 0) {
|
||||
return Settings.builder().put("node.ingest", false).put(super.nodeSettings(nodeOrdinal)).build();
|
||||
}
|
||||
return super.nodeSettings(nodeOrdinal);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(IngestPlugin.class);
|
||||
|
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.ingest;
|
||||
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class IngestModuleTests extends ESTestCase {
|
||||
|
||||
public void testIsIngestEnabledSettings() {
|
||||
assertThat(IngestModule.isIngestEnabled(Settings.EMPTY), equalTo(true));
|
||||
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", true).build()), equalTo(true));
|
||||
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "true").build()), equalTo(true));
|
||||
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", false).build()), equalTo(false));
|
||||
|
||||
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "false").build()), equalTo(false));
|
||||
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "off").build()), equalTo(false));
|
||||
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "no").build()), equalTo(false));
|
||||
assertThat(IngestModule.isIngestEnabled(Settings.builder().put("node.ingest", "0").build()), equalTo(false));
|
||||
}
|
||||
|
||||
public void testIsIngestEnabledAttributes() {
|
||||
assertThat(IngestModule.isIngestEnabled(ImmutableOpenMap.<String, String>builder().build()), equalTo(true));
|
||||
|
||||
ImmutableOpenMap.Builder<String, String> builder = ImmutableOpenMap.<String, String>builder();
|
||||
builder.put("ingest", "true");
|
||||
assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(true));
|
||||
|
||||
builder = ImmutableOpenMap.<String, String>builder();
|
||||
builder.put("ingest", "false");
|
||||
assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(false));
|
||||
|
||||
builder = ImmutableOpenMap.<String, String>builder();
|
||||
builder.put("ingest", "off");
|
||||
assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(false));
|
||||
|
||||
builder = ImmutableOpenMap.<String, String>builder();
|
||||
builder.put("ingest", "no");
|
||||
assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(false));
|
||||
|
||||
builder = ImmutableOpenMap.<String, String>builder();
|
||||
builder.put("ingest", "0");
|
||||
assertThat(IngestModule.isIngestEnabled(builder.build()), equalTo(false));
|
||||
}
|
||||
|
||||
public void testIsIngestEnabledMethodsReturnTheSameValue() {
|
||||
String randomString;
|
||||
if (randomBoolean()) {
|
||||
randomString = randomFrom("true", "false", "on", "off", "yes", "no", "0", "1");
|
||||
} else {
|
||||
randomString = randomAsciiOfLengthBetween(1, 5);
|
||||
}
|
||||
Settings settings = Settings.builder().put("node.ingest", randomString).build();
|
||||
ImmutableOpenMap.Builder<String, String> builder = ImmutableOpenMap.<String, String>builder();
|
||||
builder.put("ingest", randomString);
|
||||
ImmutableOpenMap<String, String> attributes = builder.build();
|
||||
|
||||
assertThat(IngestModule.isIngestEnabled(settings), equalTo(IngestModule.isIngestEnabled(attributes)));
|
||||
}
|
||||
}
|
@ -79,7 +79,7 @@
|
||||
---
|
||||
"Test index api with pipeline id fails when node.ingest is set to false":
|
||||
- do:
|
||||
catch: /node.ingest is set to false, cannot execute pipeline/
|
||||
catch: /There are no ingest nodes in this cluster, unable to forward request to an ingest node./
|
||||
index:
|
||||
index: test
|
||||
type: test
|
||||
@ -94,7 +94,7 @@
|
||||
---
|
||||
"Test bulk api with pipeline id fails when node.ingest is set to false":
|
||||
- do:
|
||||
catch: /node.ingest is set to false, cannot execute pipeline/
|
||||
catch: /There are no ingest nodes in this cluster, unable to forward request to an ingest node./
|
||||
bulk:
|
||||
pipeline: "my_pipeline_1"
|
||||
body:
|
||||
@ -112,7 +112,7 @@
|
||||
---
|
||||
"Test bulk api that contains a single index call with pipeline id fails when node.ingest is set to false":
|
||||
- do:
|
||||
catch: /node.ingest is set to false, cannot execute pipeline/
|
||||
catch: /There are no ingest nodes in this cluster, unable to forward request to an ingest node./
|
||||
bulk:
|
||||
body:
|
||||
- index:
|
||||
|
Loading…
x
Reference in New Issue
Block a user