fix compile errors due to changes in master
This commit is contained in:
parent
e275af8a58
commit
1eb5ae1dce
|
@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.plugin.ingest.IngestBootstrapper;
|
||||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
|
@ -52,25 +53,25 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
|||
}
|
||||
|
||||
@Override
|
||||
public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
|
||||
public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
|
||||
String pipelineId = request.getFromContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY);
|
||||
if (pipelineId == null) {
|
||||
pipelineId = request.getHeader(IngestPlugin.PIPELINE_ID_PARAM);
|
||||
if (pipelineId == null) {
|
||||
chain.proceed(action, request, listener);
|
||||
chain.proceed(task, action, request, listener);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (request instanceof IndexRequest) {
|
||||
processIndexRequest(action, listener, chain, (IndexRequest) request, pipelineId);
|
||||
processIndexRequest(task, action, listener, chain, (IndexRequest) request, pipelineId);
|
||||
} else if (request instanceof BulkRequest) {
|
||||
BulkRequest bulkRequest = (BulkRequest) request;
|
||||
@SuppressWarnings("unchecked")
|
||||
ActionListener<BulkResponse> actionListener = (ActionListener<BulkResponse>) listener;
|
||||
processBulkIndexRequest(bulkRequest, pipelineId, action, chain, actionListener);
|
||||
processBulkIndexRequest(task, bulkRequest, pipelineId, action, chain, actionListener);
|
||||
} else {
|
||||
chain.proceed(action, request, listener);
|
||||
chain.proceed(task, action, request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -79,12 +80,12 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
|||
chain.proceed(action, response, listener);
|
||||
}
|
||||
|
||||
void processIndexRequest(String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest, String pipelineId) {
|
||||
void processIndexRequest(Task task, String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest, String pipelineId) {
|
||||
// The IndexRequest has the same type on the node that receives the request and the node that
|
||||
// processes the primary action. This could lead to a pipeline being executed twice for the same
|
||||
// index request, hence this check
|
||||
if (indexRequest.hasHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED)) {
|
||||
chain.proceed(action, indexRequest, listener);
|
||||
chain.proceed(task, action, indexRequest, listener);
|
||||
return;
|
||||
}
|
||||
executionService.execute(indexRequest, pipelineId, t -> {
|
||||
|
@ -92,11 +93,11 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
|||
listener.onFailure(t);
|
||||
}, success -> {
|
||||
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
|
||||
chain.proceed(action, indexRequest, listener);
|
||||
chain.proceed(task, action, indexRequest, listener);
|
||||
});
|
||||
}
|
||||
|
||||
void processBulkIndexRequest(BulkRequest original, String pipelineId, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
|
||||
void processBulkIndexRequest(Task task, BulkRequest original, String pipelineId, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
|
||||
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
|
||||
executionService.execute(() -> bulkRequestModifier, pipelineId, e -> {
|
||||
logger.debug("failed to execute pipeline [{}]", e, pipelineId);
|
||||
|
@ -110,7 +111,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
|||
// (this will happen if all preprocessing all items in the bulk failed)
|
||||
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
|
||||
} else {
|
||||
chain.proceed(action, bulkRequest, actionListener);
|
||||
chain.proceed(task, action, bulkRequest, actionListener);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -24,11 +24,12 @@ import org.elasticsearch.action.ActionResponse;
|
|||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.action.support.ActionFilterChain;
|
||||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
||||
public final class IngestDisabledActionFilter implements ActionFilter {
|
||||
|
||||
@Override
|
||||
public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
|
||||
public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) {
|
||||
String pipelineId = request.getFromContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY);
|
||||
if (pipelineId != null) {
|
||||
failRequest(pipelineId);
|
||||
|
@ -38,7 +39,7 @@ public final class IngestDisabledActionFilter implements ActionFilter {
|
|||
failRequest(pipelineId);
|
||||
}
|
||||
|
||||
chain.proceed(action, request, listener);
|
||||
chain.proceed(task, action, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.elasticsearch.plugin.ingest.IngestBootstrapper;
|
|||
import org.elasticsearch.plugin.ingest.IngestPlugin;
|
||||
import org.elasticsearch.plugin.ingest.PipelineExecutionService;
|
||||
import org.elasticsearch.plugin.ingest.PipelineStore;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.Before;
|
||||
|
@ -77,42 +78,46 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
|
||||
public void testApplyNoIngestId() throws Exception {
|
||||
IndexRequest indexRequest = new IndexRequest();
|
||||
Task task = mock(Task.class);
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(actionFilterChain).proceed("_action", indexRequest, actionListener);
|
||||
verify(actionFilterChain).proceed(task, "_action", indexRequest, actionListener);
|
||||
verifyZeroInteractions(executionService, actionFilterChain);
|
||||
}
|
||||
|
||||
public void testApplyIngestIdViaRequestParam() throws Exception {
|
||||
Task task = mock(Task.class);
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
}
|
||||
|
||||
public void testApplyIngestIdViaContext() throws Exception {
|
||||
Task task = mock(Task.class);
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putInContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY, "_id");
|
||||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
|
||||
verifyZeroInteractions(actionFilterChain);
|
||||
}
|
||||
|
||||
public void testApplyAlreadyProcessed() throws Exception {
|
||||
Task task = mock(Task.class);
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
|
||||
|
@ -120,13 +125,14 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(actionFilterChain).proceed("_action", indexRequest, actionListener);
|
||||
verify(actionFilterChain).proceed(task, "_action", indexRequest, actionListener);
|
||||
verifyZeroInteractions(executionService, actionListener);
|
||||
}
|
||||
|
||||
public void testApplyExecuted() throws Exception {
|
||||
Task task = mock(Task.class);
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
|
||||
|
@ -140,14 +146,15 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
return null;
|
||||
};
|
||||
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
|
||||
verify(actionFilterChain).proceed("_action", indexRequest, actionListener);
|
||||
verify(actionFilterChain).proceed(task, "_action", indexRequest, actionListener);
|
||||
verifyZeroInteractions(actionListener);
|
||||
}
|
||||
|
||||
public void testApplyFailed() throws Exception {
|
||||
Task task = mock(Task.class);
|
||||
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
|
||||
indexRequest.source("field", "value");
|
||||
indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
|
||||
|
@ -164,7 +171,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
}
|
||||
};
|
||||
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
|
||||
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
|
||||
filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain);
|
||||
|
||||
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
|
||||
verify(actionListener).onFailure(exception);
|
||||
|
@ -172,6 +179,7 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testApplyWithBulkRequest() throws Exception {
|
||||
Task task = mock(Task.class);
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(threadPool.executor(any())).thenReturn(Runnable::run);
|
||||
PipelineStore store = mock(PipelineStore.class);
|
||||
|
@ -215,12 +223,12 @@ public class IngestActionFilterTests extends ESTestCase {
|
|||
ActionListener actionListener = mock(ActionListener.class);
|
||||
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
|
||||
|
||||
filter.apply("_action", bulkRequest, actionListener, actionFilterChain);
|
||||
filter.apply(task, "_action", bulkRequest, actionListener, actionFilterChain);
|
||||
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
verify(actionFilterChain).proceed("_action", bulkRequest, actionListener);
|
||||
verify(actionFilterChain).proceed(task, "_action", bulkRequest, actionListener);
|
||||
verifyZeroInteractions(actionListener);
|
||||
|
||||
int assertedRequests = 0;
|
||||
|
|
Loading…
Reference in New Issue