From 1eb5ae1dceae1ca6b94b2bbb75a7abe41bbe4872 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 6 Jan 2016 17:28:35 +0100 Subject: [PATCH] fix compile errors due to changes in master --- .../ingest/transport/IngestActionFilter.java | 21 ++++++------- .../transport/IngestDisabledActionFilter.java | 5 ++-- .../transport/IngestActionFilterTests.java | 30 ++++++++++++------- 3 files changed, 33 insertions(+), 23 deletions(-) diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java index 99117697358..bce3b5cdaa2 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilter.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugin.ingest.IngestBootstrapper; import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.plugin.ingest.PipelineExecutionService; +import org.elasticsearch.tasks.Task; import java.util.ArrayList; import java.util.HashSet; @@ -52,25 +53,25 @@ public final class IngestActionFilter extends AbstractComponent implements Actio } @Override - public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { + public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { String pipelineId = request.getFromContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY); if (pipelineId == null) { pipelineId = request.getHeader(IngestPlugin.PIPELINE_ID_PARAM); if (pipelineId == null) { - chain.proceed(action, request, listener); + chain.proceed(task, action, request, listener); return; } } if (request instanceof IndexRequest) { - processIndexRequest(action, listener, chain, (IndexRequest) request, pipelineId); + processIndexRequest(task, action, listener, chain, (IndexRequest) request, pipelineId); } else if (request instanceof BulkRequest) { BulkRequest bulkRequest = (BulkRequest) request; @SuppressWarnings("unchecked") ActionListener actionListener = (ActionListener) listener; - processBulkIndexRequest(bulkRequest, pipelineId, action, chain, actionListener); + processBulkIndexRequest(task, bulkRequest, pipelineId, action, chain, actionListener); } else { - chain.proceed(action, request, listener); + chain.proceed(task, action, request, listener); } } @@ -79,12 +80,12 @@ public final class IngestActionFilter extends AbstractComponent implements Actio chain.proceed(action, response, listener); } - void processIndexRequest(String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest, String pipelineId) { + void processIndexRequest(Task task, String action, ActionListener listener, ActionFilterChain chain, IndexRequest indexRequest, String pipelineId) { // The IndexRequest has the same type on the node that receives the request and the node that // processes the primary action. This could lead to a pipeline being executed twice for the same // index request, hence this check if (indexRequest.hasHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED)) { - chain.proceed(action, indexRequest, listener); + chain.proceed(task, action, indexRequest, listener); return; } executionService.execute(indexRequest, pipelineId, t -> { @@ -92,11 +93,11 @@ public final class IngestActionFilter extends AbstractComponent implements Actio listener.onFailure(t); }, success -> { indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true); - chain.proceed(action, indexRequest, listener); + chain.proceed(task, action, indexRequest, listener); }); } - void processBulkIndexRequest(BulkRequest original, String pipelineId, String action, ActionFilterChain chain, ActionListener listener) { + void processBulkIndexRequest(Task task, BulkRequest original, String pipelineId, String action, ActionFilterChain chain, ActionListener listener) { BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); executionService.execute(() -> bulkRequestModifier, pipelineId, e -> { logger.debug("failed to execute pipeline [{}]", e, pipelineId); @@ -110,7 +111,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio // (this will happen if all preprocessing all items in the bulk failed) actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0)); } else { - chain.proceed(action, bulkRequest, actionListener); + chain.proceed(task, action, bulkRequest, actionListener); } }); } diff --git a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestDisabledActionFilter.java b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestDisabledActionFilter.java index 06db1ab099c..63ff584988d 100644 --- a/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestDisabledActionFilter.java +++ b/plugins/ingest/src/main/java/org/elasticsearch/plugin/ingest/transport/IngestDisabledActionFilter.java @@ -24,11 +24,12 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilterChain; import org.elasticsearch.plugin.ingest.IngestPlugin; +import org.elasticsearch.tasks.Task; public final class IngestDisabledActionFilter implements ActionFilter { @Override - public void apply(String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { + public void apply(Task task, String action, ActionRequest request, ActionListener listener, ActionFilterChain chain) { String pipelineId = request.getFromContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY); if (pipelineId != null) { failRequest(pipelineId); @@ -38,7 +39,7 @@ public final class IngestDisabledActionFilter implements ActionFilter { failRequest(pipelineId); } - chain.proceed(action, request, listener); + chain.proceed(task, action, request, listener); } @Override diff --git a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java index 2bb44b80868..336c02f7b0d 100644 --- a/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java +++ b/plugins/ingest/src/test/java/org/elasticsearch/plugin/ingest/transport/IngestActionFilterTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.plugin.ingest.IngestBootstrapper; import org.elasticsearch.plugin.ingest.IngestPlugin; import org.elasticsearch.plugin.ingest.PipelineExecutionService; import org.elasticsearch.plugin.ingest.PipelineStore; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; @@ -77,42 +78,46 @@ public class IngestActionFilterTests extends ESTestCase { public void testApplyNoIngestId() throws Exception { IndexRequest indexRequest = new IndexRequest(); + Task task = mock(Task.class); ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); - filter.apply("_action", indexRequest, actionListener, actionFilterChain); + filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain); - verify(actionFilterChain).proceed("_action", indexRequest, actionListener); + verify(actionFilterChain).proceed(task, "_action", indexRequest, actionListener); verifyZeroInteractions(executionService, actionFilterChain); } public void testApplyIngestIdViaRequestParam() throws Exception { + Task task = mock(Task.class); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); indexRequest.source("field", "value"); indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id"); ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); - filter.apply("_action", indexRequest, actionListener, actionFilterChain); + filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain); verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class)); verifyZeroInteractions(actionFilterChain); } public void testApplyIngestIdViaContext() throws Exception { + Task task = mock(Task.class); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); indexRequest.source("field", "value"); indexRequest.putInContext(IngestPlugin.PIPELINE_ID_PARAM_CONTEXT_KEY, "_id"); ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); - filter.apply("_action", indexRequest, actionListener, actionFilterChain); + filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain); verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class)); verifyZeroInteractions(actionFilterChain); } public void testApplyAlreadyProcessed() throws Exception { + Task task = mock(Task.class); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); indexRequest.source("field", "value"); indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id"); @@ -120,13 +125,14 @@ public class IngestActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); - filter.apply("_action", indexRequest, actionListener, actionFilterChain); + filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain); - verify(actionFilterChain).proceed("_action", indexRequest, actionListener); + verify(actionFilterChain).proceed(task, "_action", indexRequest, actionListener); verifyZeroInteractions(executionService, actionListener); } public void testApplyExecuted() throws Exception { + Task task = mock(Task.class); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); indexRequest.source("field", "value"); indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id"); @@ -140,14 +146,15 @@ public class IngestActionFilterTests extends ESTestCase { return null; }; doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class)); - filter.apply("_action", indexRequest, actionListener, actionFilterChain); + filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain); verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class)); - verify(actionFilterChain).proceed("_action", indexRequest, actionListener); + verify(actionFilterChain).proceed(task, "_action", indexRequest, actionListener); verifyZeroInteractions(actionListener); } public void testApplyFailed() throws Exception { + Task task = mock(Task.class); IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); indexRequest.source("field", "value"); indexRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id"); @@ -164,7 +171,7 @@ public class IngestActionFilterTests extends ESTestCase { } }; doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class)); - filter.apply("_action", indexRequest, actionListener, actionFilterChain); + filter.apply(task, "_action", indexRequest, actionListener, actionFilterChain); verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class)); verify(actionListener).onFailure(exception); @@ -172,6 +179,7 @@ public class IngestActionFilterTests extends ESTestCase { } public void testApplyWithBulkRequest() throws Exception { + Task task = mock(Task.class); ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.executor(any())).thenReturn(Runnable::run); PipelineStore store = mock(PipelineStore.class); @@ -215,12 +223,12 @@ public class IngestActionFilterTests extends ESTestCase { ActionListener actionListener = mock(ActionListener.class); ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); - filter.apply("_action", bulkRequest, actionListener, actionFilterChain); + filter.apply(task, "_action", bulkRequest, actionListener, actionFilterChain); assertBusy(new Runnable() { @Override public void run() { - verify(actionFilterChain).proceed("_action", bulkRequest, actionListener); + verify(actionFilterChain).proceed(task, "_action", bulkRequest, actionListener); verifyZeroInteractions(actionListener); int assertedRequests = 0;