mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
Added pipeline execution service that deals with updating data as it comes in using a dedicated thread pool.
Also changed how bulk requests are handled, because before it just didn't work, but added a todo there because it can potentially be handled differently.
This commit is contained in:
parent
fa187a2e69
commit
82a9ba355d
@ -33,6 +33,8 @@ public final class Data {
|
||||
private final String id;
|
||||
private final Map<String, Object> document;
|
||||
|
||||
private boolean modified = false;
|
||||
|
||||
public Data(String index, String type, String id, Map<String, Object> document) {
|
||||
this.index = index;
|
||||
this.type = type;
|
||||
@ -46,6 +48,7 @@ public final class Data {
|
||||
}
|
||||
|
||||
public void addField(String field, String value) {
|
||||
modified = true;
|
||||
document.put(field, value);
|
||||
}
|
||||
|
||||
@ -64,4 +67,8 @@ public final class Data {
|
||||
public Map<String, Object> getDocument() {
|
||||
return document;
|
||||
}
|
||||
|
||||
public boolean isModified() {
|
||||
return modified;
|
||||
}
|
||||
}
|
||||
|
@ -51,7 +51,7 @@ public final class SimpleProcessor implements Processor {
|
||||
public static class Builder implements Processor.Builder {
|
||||
|
||||
private String path;
|
||||
private String value;
|
||||
private String expectedValue;
|
||||
private String addField;
|
||||
private String addFieldValue;
|
||||
|
||||
@ -59,8 +59,8 @@ public final class SimpleProcessor implements Processor {
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
public void setValue(String value) {
|
||||
this.value = value;
|
||||
public void setExpectedValue(String value) {
|
||||
this.expectedValue = value;
|
||||
}
|
||||
|
||||
public void setAddField(String addField) {
|
||||
@ -73,14 +73,14 @@ public final class SimpleProcessor implements Processor {
|
||||
|
||||
public void fromMap(Map<String, Object> config) {
|
||||
this.path = (String) config.get("path");
|
||||
this.value = (String) config.get("value");
|
||||
this.expectedValue = (String) config.get("expected_value");
|
||||
this.addField = (String) config.get("add_field");
|
||||
this.addFieldValue = (String) config.get("add_field_value");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Processor build() {
|
||||
return new SimpleProcessor(path, value, addField, addFieldValue);
|
||||
return new SimpleProcessor(path, expectedValue, addField, addFieldValue);
|
||||
}
|
||||
|
||||
public static class Factory implements Processor.Builder.Factory {
|
||||
|
@ -37,6 +37,7 @@ public class IngestModule extends AbstractModule {
|
||||
@Override
|
||||
protected void configure() {
|
||||
binder().bind(IngestRestFilter.class).asEagerSingleton();
|
||||
binder().bind(PipelineExecutionService.class).asEagerSingleton();
|
||||
binder().bind(PipelineStore.class).asEagerSingleton();
|
||||
binder().bind(PipelineConfigDocReader.class).asEagerSingleton();
|
||||
|
||||
|
@ -23,6 +23,7 @@ package org.elasticsearch.plugin.ingest;
|
||||
import org.elasticsearch.action.ActionModule;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugin.ingest.transport.IngestActionFilter;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.rest.action.RestActionModule;
|
||||
@ -31,14 +32,23 @@ import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
||||
|
||||
public class IngestPlugin extends Plugin {
|
||||
|
||||
public static final String INGEST_CONTEXT_KEY = "__ingest__";
|
||||
public static final String INGEST_PARAM = "ingest";
|
||||
public static final String NAME = "ingest";
|
||||
|
||||
private final Settings nodeSettings;
|
||||
|
||||
public IngestPlugin(Settings nodeSettings) {
|
||||
this.nodeSettings = nodeSettings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return "ingest";
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -56,6 +66,13 @@ public class IngestPlugin extends Plugin {
|
||||
return Arrays.asList(PipelineStore.class, PipelineConfigDocReader.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings additionalSettings() {
|
||||
return settingsBuilder()
|
||||
.put(PipelineExecutionService.additionalSettings(nodeSettings))
|
||||
.build();
|
||||
}
|
||||
|
||||
public void onModule(ActionModule module) {
|
||||
module.registerFilter(IngestActionFilter.class);
|
||||
}
|
||||
|
@ -0,0 +1,86 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
public class PipelineExecutionService {
|
||||
|
||||
static final String THREAD_POOL_NAME = IngestPlugin.NAME;
|
||||
|
||||
private final PipelineStore store;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
@Inject
|
||||
public PipelineExecutionService(PipelineStore store, ThreadPool threadPool) {
|
||||
this.store = store;
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
public void execute(Data data, String pipelineId, Listener listener) {
|
||||
Pipeline pipeline = store.get(pipelineId);
|
||||
if (pipeline == null) {
|
||||
listener.failed(new IllegalArgumentException(LoggerMessageFormat.format("pipeline with id [{}] does not exist", pipelineId)));
|
||||
return;
|
||||
}
|
||||
|
||||
threadPool.executor(THREAD_POOL_NAME).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
pipeline.execute(data);
|
||||
listener.executed(data);
|
||||
} catch (Exception e) {
|
||||
listener.failed(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public interface Listener {
|
||||
|
||||
void executed(Data data);
|
||||
|
||||
void failed(Exception e);
|
||||
|
||||
}
|
||||
|
||||
public static Settings additionalSettings(Settings nodeSettings) {
|
||||
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
|
||||
if (!settings.names().isEmpty()) {
|
||||
// the TP is already configured in the node settings
|
||||
// no need for additional settings
|
||||
return Settings.EMPTY;
|
||||
}
|
||||
int availableProcessors = EsExecutors.boundedNumberOfProcessors(nodeSettings);
|
||||
return Settings.builder()
|
||||
.put("threadpool." + THREAD_POOL_NAME + ".type", "fixed")
|
||||
.put("threadpool." + THREAD_POOL_NAME + ".size", availableProcessors)
|
||||
.put("threadpool." + THREAD_POOL_NAME + ".queue_size", 200)
|
||||
.build();
|
||||
}
|
||||
|
||||
}
|
@ -25,65 +25,108 @@ import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.action.support.ActionFilterChain;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
import org.elasticsearch.plugin.ingest.PipelineStore;
|
||||
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
public class IngestActionFilter extends ActionFilter.Simple {
|
||||
public class IngestActionFilter extends AbstractComponent implements ActionFilter {
|
||||
|
||||
private final PipelineStore pipelineStore;
|
||||
private final PipelineExecutionService executionService;
|
||||
|
||||
@Inject
|
||||
public IngestActionFilter(Settings settings, PipelineStore pipelineStore) {
|
||||
public IngestActionFilter(Settings settings, PipelineExecutionService executionService) {
|
||||
super(settings);
|
||||
this.pipelineStore = pipelineStore;
|
||||
this.executionService = executionService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean apply(String action, ActionRequest request, ActionListener listener) {
|
||||
public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
|
||||
String pipelineId = request.getFromContext(IngestPlugin.INGEST_CONTEXT_KEY);
|
||||
if (pipelineId == null) {
|
||||
pipelineId = request.getHeader(IngestPlugin.INGEST_PARAM);
|
||||
if (pipelineId == null) {
|
||||
return true;
|
||||
chain.proceed(action, request, listener);
|
||||
return;
|
||||
}
|
||||
}
|
||||
Pipeline pipeline = pipelineStore.get(pipelineId);
|
||||
if (pipeline == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (request instanceof IndexRequest) {
|
||||
processIndexRequest((IndexRequest) request, pipeline);
|
||||
processIndexRequest(action, listener, chain, (IndexRequest) request, pipelineId);
|
||||
} else if (request instanceof BulkRequest) {
|
||||
BulkRequest bulkRequest = (BulkRequest) request;
|
||||
List<ActionRequest> actionRequests = bulkRequest.requests();
|
||||
for (ActionRequest actionRequest : actionRequests) {
|
||||
if (actionRequest instanceof IndexRequest) {
|
||||
processIndexRequest((IndexRequest) actionRequest, pipeline);
|
||||
}
|
||||
}
|
||||
processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, bulkRequest.requests().iterator());
|
||||
} else {
|
||||
chain.proceed(action, request, listener);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// TODO: this should be delegated to a PipelineExecutor service that executes on a different thread (pipeline TP)
|
||||
void processIndexRequest(IndexRequest indexRequest, Pipeline pipeline) {
|
||||
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
|
||||
Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap);
|
||||
pipeline.execute(data);
|
||||
indexRequest.source(data.getDocument());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean apply(String action, ActionResponse response, ActionListener listener) {
|
||||
return true;
|
||||
public void apply(String action, ActionResponse response, ActionListener listener, ActionFilterChain chain) {
|
||||
chain.proceed(action, response, listener);
|
||||
}
|
||||
|
||||
void processIndexRequest(String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest, String pipelineId) {
|
||||
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
|
||||
Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap);
|
||||
executionService.execute(data, pipelineId, new PipelineExecutionService.Listener() {
|
||||
@Override
|
||||
public void executed(Data data) {
|
||||
if (data.isModified()) {
|
||||
indexRequest.source(data.getDocument());
|
||||
}
|
||||
chain.proceed(action, indexRequest, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Exception e) {
|
||||
logger.error("failed to execute pipeline [{}]", e, pipelineId);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: rethink how to deal with bulk requests:
|
||||
// This doesn't scale very well for a single bulk requests, so it would be great if a bulk requests could be broken up into several chunks so that the ingesting can be paralized
|
||||
// on the other hand if there are many index/bulk requests then breaking up bulk requests isn't going to help much.
|
||||
// I think the execution service should be smart enough about when it should break things up in chunks based on the ingest threadpool usage,
|
||||
// this means that the contract of the execution service should change in order to accept multiple data instances.
|
||||
void processBulkIndexRequest(String action, ActionListener listener, ActionFilterChain chain, BulkRequest bulkRequest, String pipelineId, Iterator<ActionRequest> requests) {
|
||||
if (!requests.hasNext()) {
|
||||
chain.proceed(action, bulkRequest, listener);
|
||||
return;
|
||||
}
|
||||
|
||||
ActionRequest actionRequest = requests.next();
|
||||
if (!(actionRequest instanceof IndexRequest)) {
|
||||
processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, requests);
|
||||
return;
|
||||
}
|
||||
|
||||
IndexRequest indexRequest = (IndexRequest) actionRequest;
|
||||
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
|
||||
Data data = new Data(indexRequest.index(), indexRequest.type(), indexRequest.id(), sourceAsMap);
|
||||
executionService.execute(data, pipelineId, new PipelineExecutionService.Listener() {
|
||||
@Override
|
||||
public void executed(Data data) {
|
||||
if (data.isModified()) {
|
||||
indexRequest.source(data.getDocument());
|
||||
}
|
||||
processBulkIndexRequest(action, listener, chain, bulkRequest, pipelineId, requests);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Exception e) {
|
||||
logger.error("failed to execute pipeline [{}]", e, pipelineId);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -48,7 +48,7 @@ public class BasicTests extends ESIntegTestCase {
|
||||
.startObject()
|
||||
.startObject("simple")
|
||||
.field("path", "field2")
|
||||
.field("value", "abc")
|
||||
.field("expected_value", "abc")
|
||||
.field("add_field", "field3")
|
||||
.field("add_field_value", "xyz")
|
||||
.endObject()
|
||||
@ -64,16 +64,25 @@ public class BasicTests extends ESIntegTestCase {
|
||||
.putHeader("ingest", "_id")
|
||||
.get();
|
||||
|
||||
Map<String, Object> doc = client().prepareGet("test", "type", "1")
|
||||
.get().getSourceAsMap();
|
||||
assertThat(doc.get("field3"), equalTo("xyz"));
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
Map<String, Object> doc = client().prepareGet("test", "type", "1")
|
||||
.get().getSourceAsMap();
|
||||
assertThat(doc.get("field3"), equalTo("xyz"));
|
||||
}
|
||||
});
|
||||
|
||||
client().prepareBulk().add(
|
||||
client().prepareIndex("test", "type", "2").setSource("field2", "abc")
|
||||
).putHeader("ingest", "_id").get();
|
||||
|
||||
doc = client().prepareGet("test", "type", "2").get().getSourceAsMap();
|
||||
assertThat(doc.get("field3"), equalTo("xyz"));
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
Map<String, Object> doc = client().prepareGet("test", "type", "2").get().getSourceAsMap();
|
||||
assertThat(doc.get("field3"), equalTo("xyz"));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,127 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.Processor;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class PipelineExecutionServiceTests extends ESTestCase {
|
||||
|
||||
private PipelineStore store;
|
||||
private ThreadPool threadPool;
|
||||
private PipelineExecutionService executionService;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
store = mock(PipelineStore.class);
|
||||
threadPool = new ThreadPool(
|
||||
Settings.builder()
|
||||
.put("name", "_name")
|
||||
.put(PipelineExecutionService.additionalSettings(Settings.EMPTY))
|
||||
.build()
|
||||
);
|
||||
executionService = new PipelineExecutionService(store, threadPool);
|
||||
}
|
||||
|
||||
@After
|
||||
public void destroy() {
|
||||
threadPool.shutdown();
|
||||
}
|
||||
|
||||
public void testExecute_pipelineDoesNotExist() {
|
||||
when(store.get("_id")).thenReturn(null);
|
||||
Data data = new Data("_index", "_type", "_id", Collections.emptyMap());
|
||||
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
|
||||
executionService.execute(data, "_id", listener);
|
||||
verify(listener).failed(any(IllegalArgumentException.class));
|
||||
verify(listener, times(0)).executed(data);
|
||||
}
|
||||
|
||||
public void testExecute_success() throws Exception {
|
||||
Pipeline.Builder builder = new Pipeline.Builder("_id");
|
||||
Processor processor = mock(Processor.class);
|
||||
builder.addProcessors(new Processor.Builder() {
|
||||
@Override
|
||||
public void fromMap(Map<String, Object> config) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Processor build() {
|
||||
return processor;
|
||||
}
|
||||
});
|
||||
|
||||
when(store.get("_id")).thenReturn(builder.build());
|
||||
|
||||
Data data = new Data("_index", "_type", "_id", Collections.emptyMap());
|
||||
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
|
||||
executionService.execute(data, "_id", listener);
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
verify(processor).execute(data);
|
||||
verify(listener).executed(data);
|
||||
verify(listener, times(0)).failed(any(Exception.class));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void testExecute_failure() throws Exception {
|
||||
Pipeline.Builder builder = new Pipeline.Builder("_id");
|
||||
Processor processor = mock(Processor.class);
|
||||
builder.addProcessors(new Processor.Builder() {
|
||||
@Override
|
||||
public void fromMap(Map<String, Object> config) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Processor build() {
|
||||
return processor;
|
||||
}
|
||||
});
|
||||
|
||||
when(store.get("_id")).thenReturn(builder.build());
|
||||
Data data = new Data("_index", "_type", "_id", Collections.emptyMap());
|
||||
doThrow(new RuntimeException()).when(processor).execute(data);
|
||||
PipelineExecutionService.Listener listener = mock(PipelineExecutionService.Listener.class);
|
||||
executionService.execute(data, "_id", listener);
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
verify(processor).execute(data);
|
||||
verify(listener, times(0)).executed(data);
|
||||
verify(listener).failed(any(RuntimeException.class));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,209 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.ingest.transport;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActionFilterChain;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.ingest.Data;
|
||||
import org.elasticsearch.ingest.Pipeline;
|
||||
import org.elasticsearch.ingest.SimpleProcessor;
|
||||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
|
||||
import org.elasticsearch.plugin.ingest.PipelineStore;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.Before;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
public class IngestActionFilterTests extends ESTestCase {
|
||||
|
||||
private IngestActionFilter filter;
|
||||
private PipelineExecutionService executionService;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
executionService = mock(PipelineExecutionService.class);
|
||||
filter = new IngestActionFilter(Settings.EMPTY, executionService);
|
||||
}
|
||||
|
||||
public void testApplyNoIngestId() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest();
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(actionFilterChain).proceed("_action", indexRequest, actionListener);
|
||||
verifyZeroInteractions(executionService, actionFilterChain);
|
||||
}
|
||||
|
||||
public void testApplyIngestIdViaRequestParam() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
}
|
||||
|
||||
public void testApplyIngestIdViaContext() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putInContext(IngestPlugin.INGEST_CONTEXT_KEY, "_id");
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
}
|
||||
|
||||
public void testApply_executed() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
Answer answer = new Answer() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
Data data = (Data) invocationOnMock.getArguments()[0];
|
||||
PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2];
|
||||
listener.executed(data);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
doAnswer(answer).when(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
verify(actionFilterChain).proceed("_action", indexRequest, actionListener);
|
||||
verifyZeroInteractions(actionListener);
|
||||
}
|
||||
|
||||
public void testApply_failed() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
RuntimeException exception = new RuntimeException();
|
||||
Answer answer = new Answer() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
PipelineExecutionService.Listener listener = (PipelineExecutionService.Listener) invocationOnMock.getArguments()[2];
|
||||
listener.failed(exception);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
doAnswer(answer).when(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(any(Data.class), eq("_id"), any(PipelineExecutionService.Listener.class));
|
||||
verify(actionListener).onFailure(exception);
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
}
|
||||
|
||||
public void testApply_withBulkRequest() throws Exception {
|
||||
ThreadPool threadPool = new ThreadPool(
|
||||
Settings.builder()
|
||||
.put("name", "_name")
|
||||
.put(PipelineExecutionService.additionalSettings(Settings.EMPTY))
|
||||
.build()
|
||||
);
|
||||
PipelineStore store = mock(PipelineStore.class);
|
||||
Pipeline.Builder pipelineBuilder = new Pipeline.Builder("_id");
|
||||
SimpleProcessor.Builder processorBuilder = new SimpleProcessor.Builder();
|
||||
processorBuilder.setPath("field1");
|
||||
processorBuilder.setExpectedValue("value1");
|
||||
processorBuilder.setAddField("field2");
|
||||
processorBuilder.setAddFieldValue("value2");
|
||||
pipelineBuilder.addProcessors(processorBuilder);
|
||||
when(store.get("_id")).thenReturn(pipelineBuilder.build());
|
||||
executionService = new PipelineExecutionService(store, threadPool);
|
||||
filter = new IngestActionFilter(Settings.EMPTY, executionService);
|
||||
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
bulkRequest.putHeader(IngestPlugin.INGEST_PARAM, "_id");
|
||||
int numRequest = scaledRandomIntBetween(8, 64);
|
||||
for (int i = 0; i < numRequest; i++) {
|
||||
if (rarely()) {
|
||||
ActionRequest request;
|
||||
if (randomBoolean()) {
|
||||
request = new DeleteRequest("_index", "_type", "_id");
|
||||
} else {
|
||||
request = new UpdateRequest("_index", "_type", "_id");
|
||||
}
|
||||
bulkRequest.add(request);
|
||||
} else {
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field1", "value1");
|
||||
bulkRequest.add(indexRequest);
|
||||
}
|
||||
}
|
||||
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
filter.apply("_action", bulkRequest, actionListener, actionFilterChain);
|
||||
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
verify(actionFilterChain).proceed("_action", bulkRequest, actionListener);
|
||||
verifyZeroInteractions(actionListener);
|
||||
|
||||
int assertedRequests = 0;
|
||||
for (ActionRequest actionRequest : bulkRequest.requests()) {
|
||||
if (actionRequest instanceof IndexRequest) {
|
||||
IndexRequest indexRequest = (IndexRequest) actionRequest;
|
||||
assertThat(indexRequest.sourceAsMap().size(), equalTo(2));
|
||||
assertThat(indexRequest.sourceAsMap().get("field1"), equalTo("value1"));
|
||||
assertThat(indexRequest.sourceAsMap().get("field2"), equalTo("value2"));
|
||||
}
|
||||
assertedRequests++;
|
||||
}
|
||||
assertThat(assertedRequests, equalTo(numRequest));
|
||||
}
|
||||
});
|
||||
|
||||
threadPool.shutdown();
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user