diff --git a/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java b/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java index a799b66678e..aa30c89ef59 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/BulkRequestModifierTests.java @@ -29,14 +29,60 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; -import org.mockito.Mockito; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; public class BulkRequestModifierTests extends ESTestCase { + public void testBulkRequestModifier() { + int numRequests = scaledRandomIntBetween(8, 64); + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numRequests; i++) { + bulkRequest.add(new IndexRequest("_index", "_type", String.valueOf(i)).source("{}")); + } + CaptureActionListener actionListener = new CaptureActionListener(); + IngestActionFilter.BulkRequestModifier bulkRequestModifier = new IngestActionFilter.BulkRequestModifier(bulkRequest); + + int i = 0; + Set failedSlots = new HashSet<>(); + while (bulkRequestModifier.hasNext()) { + bulkRequestModifier.next(); + if (randomBoolean()) { + bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException()); + failedSlots.add(i); + } + i++; + } + + assertThat(bulkRequestModifier.getBulkRequest().requests().size(), equalTo(numRequests - failedSlots.size())); + // simulate that we actually executed the modified bulk request: + ActionListener result = bulkRequestModifier.wrapActionListenerIfNeeded(actionListener); + result.onResponse(new BulkResponse(new BulkItemResponse[numRequests - failedSlots.size()], 0)); + + BulkResponse bulkResponse = actionListener.getResponse(); + for (int j = 0; j < bulkResponse.getItems().length; j++) { + if (failedSlots.contains(j)) { + BulkItemResponse item = bulkResponse.getItems()[j]; + assertThat(item.isFailed(), is(true)); + assertThat(item.getFailure().getIndex(), equalTo("_index")); + assertThat(item.getFailure().getType(), equalTo("_type")); + assertThat(item.getFailure().getId(), equalTo(String.valueOf(j))); + assertThat(item.getFailure().getMessage(), equalTo("java.lang.RuntimeException")); + } else { + assertThat(bulkResponse.getItems()[j], nullValue()); + } + } + } + public void testPipelineFailures() { BulkRequest originalBulkRequest = new BulkRequest(); for (int i = 0; i < 32; i++) { @@ -73,7 +119,7 @@ public class BulkRequestModifierTests extends ESTestCase { IndexResponse indexResponse = new IndexResponse(new ShardId("index", 0), indexRequest.type(), indexRequest.id(), 1, true); originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), indexRequest.opType().lowercase(), indexResponse)); } - bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[0]), 0)); + bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[originalResponses.size()]), 0)); assertThat(responses.size(), Matchers.equalTo(32)); for (int i = 0; i < 32; i++) { @@ -88,14 +134,32 @@ public class BulkRequestModifierTests extends ESTestCase { } IngestActionFilter.BulkRequestModifier modifier = new IngestActionFilter.BulkRequestModifier(originalBulkRequest); - for (int i = 0; modifier.hasNext(); i++) { + while (modifier.hasNext()) { modifier.next(); } BulkRequest bulkRequest = modifier.getBulkRequest(); assertThat(bulkRequest, Matchers.sameInstance(originalBulkRequest)); - ActionListener actionListener = Mockito.mock(ActionListener.class); + @SuppressWarnings("unchecked") + ActionListener actionListener = mock(ActionListener.class); assertThat(modifier.wrapActionListenerIfNeeded(actionListener), Matchers.sameInstance(actionListener)); } + private static class CaptureActionListener implements ActionListener { + + private BulkResponse response; + + @Override + public void onResponse(BulkResponse bulkItemResponses) { + this.response = bulkItemResponses ; + } + + @Override + public void onFailure(Throwable e) { + } + + public BulkResponse getResponse() { + return response; + } + } } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java index d8663ade60b..344770b2bba 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestActionFilterTests.java @@ -22,9 +22,7 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.bulk.BulkAction; -import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexRequest; @@ -44,14 +42,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; import org.mockito.stubbing.Answer; -import java.util.HashSet; -import java.util.Set; import java.util.function.Consumer; -import static org.elasticsearch.action.ingest.IngestActionFilter.BulkRequestModifier; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.same; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; @@ -112,18 +105,6 @@ public class IngestActionFilterTests extends ESTestCase { verifyZeroInteractions(actionFilterChain); } - public void testApplyAlreadyProcessed() throws Exception { - Task task = mock(Task.class); - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id"); - indexRequest.source("field", "value"); - indexRequest.putHeader(IngestActionFilter.PIPELINE_ALREADY_PROCESSED, true); - ActionListener actionListener = mock(ActionListener.class); - ActionFilterChain actionFilterChain = mock(ActionFilterChain.class); - filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain); - verify(actionFilterChain).proceed(task, IndexAction.NAME, indexRequest, actionListener); - verifyZeroInteractions(executionService, actionListener); - } - @SuppressWarnings("unchecked") public void testApplyExecuted() throws Exception { Task task = mock(Task.class); @@ -231,63 +212,4 @@ public class IngestActionFilterTests extends ESTestCase { assertThat(assertedRequests, equalTo(numRequest)); }); } - - public void testBulkRequestModifier() { - int numRequests = scaledRandomIntBetween(8, 64); - BulkRequest bulkRequest = new BulkRequest(); - for (int i = 0; i < numRequests; i++) { - bulkRequest.add(new IndexRequest("_index", "_type", String.valueOf(i)).source("{}")); - } - CaptureActionListener actionListener = new CaptureActionListener(); - BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest); - - int i = 0; - Set failedSlots = new HashSet<>(); - while (bulkRequestModifier.hasNext()) { - bulkRequestModifier.next(); - if (randomBoolean()) { - bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException()); - failedSlots.add(i); - } - i++; - } - - assertThat(bulkRequestModifier.getBulkRequest().requests().size(), equalTo(numRequests - failedSlots.size())); - // simulate that we actually executed the modified bulk request: - ActionListener result = bulkRequestModifier.wrapActionListenerIfNeeded(actionListener); - result.onResponse(new BulkResponse(new BulkItemResponse[numRequests - failedSlots.size()], 0)); - - BulkResponse bulkResponse = actionListener.getResponse(); - for (int j = 0; j < bulkResponse.getItems().length; j++) { - if (failedSlots.contains(j)) { - BulkItemResponse item = bulkResponse.getItems()[j]; - assertThat(item.isFailed(), is(true)); - assertThat(item.getFailure().getIndex(), equalTo("_index")); - assertThat(item.getFailure().getType(), equalTo("_type")); - assertThat(item.getFailure().getId(), equalTo(String.valueOf(j))); - assertThat(item.getFailure().getMessage(), equalTo("java.lang.RuntimeException")); - } else { - assertThat(bulkResponse.getItems()[j], nullValue()); - } - } - } - - private static class CaptureActionListener implements ActionListener { - - private BulkResponse response; - - @Override - public void onResponse(BulkResponse bulkItemResponses) { - this.response = bulkItemResponses ; - } - - @Override - public void onFailure(Throwable e) { - } - - public BulkResponse getResponse() { - return response; - } - } - }