[TEST] move testBulkRequestModifier to existing BulkRequestModifierTests class

This commit is contained in:
javanna 2016-01-11 16:42:52 +01:00 committed by Luca Cavanna
parent fad2571ba5
commit 362deb4579
2 changed files with 68 additions and 82 deletions

View File

@ -29,14 +29,60 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.mockito.Mockito;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
public class BulkRequestModifierTests extends ESTestCase { public class BulkRequestModifierTests extends ESTestCase {
public void testBulkRequestModifier() {
int numRequests = scaledRandomIntBetween(8, 64);
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numRequests; i++) {
bulkRequest.add(new IndexRequest("_index", "_type", String.valueOf(i)).source("{}"));
}
CaptureActionListener actionListener = new CaptureActionListener();
IngestActionFilter.BulkRequestModifier bulkRequestModifier = new IngestActionFilter.BulkRequestModifier(bulkRequest);
int i = 0;
Set<Integer> failedSlots = new HashSet<>();
while (bulkRequestModifier.hasNext()) {
bulkRequestModifier.next();
if (randomBoolean()) {
bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException());
failedSlots.add(i);
}
i++;
}
assertThat(bulkRequestModifier.getBulkRequest().requests().size(), equalTo(numRequests - failedSlots.size()));
// simulate that we actually executed the modified bulk request:
ActionListener<BulkResponse> result = bulkRequestModifier.wrapActionListenerIfNeeded(actionListener);
result.onResponse(new BulkResponse(new BulkItemResponse[numRequests - failedSlots.size()], 0));
BulkResponse bulkResponse = actionListener.getResponse();
for (int j = 0; j < bulkResponse.getItems().length; j++) {
if (failedSlots.contains(j)) {
BulkItemResponse item = bulkResponse.getItems()[j];
assertThat(item.isFailed(), is(true));
assertThat(item.getFailure().getIndex(), equalTo("_index"));
assertThat(item.getFailure().getType(), equalTo("_type"));
assertThat(item.getFailure().getId(), equalTo(String.valueOf(j)));
assertThat(item.getFailure().getMessage(), equalTo("java.lang.RuntimeException"));
} else {
assertThat(bulkResponse.getItems()[j], nullValue());
}
}
}
public void testPipelineFailures() { public void testPipelineFailures() {
BulkRequest originalBulkRequest = new BulkRequest(); BulkRequest originalBulkRequest = new BulkRequest();
for (int i = 0; i < 32; i++) { for (int i = 0; i < 32; i++) {
@ -73,7 +119,7 @@ public class BulkRequestModifierTests extends ESTestCase {
IndexResponse indexResponse = new IndexResponse(new ShardId("index", 0), indexRequest.type(), indexRequest.id(), 1, true); IndexResponse indexResponse = new IndexResponse(new ShardId("index", 0), indexRequest.type(), indexRequest.id(), 1, true);
originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), indexRequest.opType().lowercase(), indexResponse)); originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), indexRequest.opType().lowercase(), indexResponse));
} }
bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[0]), 0)); bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[originalResponses.size()]), 0));
assertThat(responses.size(), Matchers.equalTo(32)); assertThat(responses.size(), Matchers.equalTo(32));
for (int i = 0; i < 32; i++) { for (int i = 0; i < 32; i++) {
@ -88,14 +134,32 @@ public class BulkRequestModifierTests extends ESTestCase {
} }
IngestActionFilter.BulkRequestModifier modifier = new IngestActionFilter.BulkRequestModifier(originalBulkRequest); IngestActionFilter.BulkRequestModifier modifier = new IngestActionFilter.BulkRequestModifier(originalBulkRequest);
for (int i = 0; modifier.hasNext(); i++) { while (modifier.hasNext()) {
modifier.next(); modifier.next();
} }
BulkRequest bulkRequest = modifier.getBulkRequest(); BulkRequest bulkRequest = modifier.getBulkRequest();
assertThat(bulkRequest, Matchers.sameInstance(originalBulkRequest)); assertThat(bulkRequest, Matchers.sameInstance(originalBulkRequest));
ActionListener<BulkResponse> actionListener = Mockito.mock(ActionListener.class); @SuppressWarnings("unchecked")
ActionListener<BulkResponse> actionListener = mock(ActionListener.class);
assertThat(modifier.wrapActionListenerIfNeeded(actionListener), Matchers.sameInstance(actionListener)); assertThat(modifier.wrapActionListenerIfNeeded(actionListener), Matchers.sameInstance(actionListener));
} }
private static class CaptureActionListener implements ActionListener<BulkResponse> {
private BulkResponse response;
@Override
public void onResponse(BulkResponse bulkItemResponses) {
this.response = bulkItemResponses ;
}
@Override
public void onFailure(Throwable e) {
}
public BulkResponse getResponse() {
return response;
}
}
} }

View File

@ -22,9 +22,7 @@ package org.elasticsearch.action.ingest;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkAction; import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
@ -44,14 +42,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before; import org.junit.Before;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer; import java.util.function.Consumer;
import static org.elasticsearch.action.ingest.IngestActionFilter.BulkRequestModifier;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.any; import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
@ -112,18 +105,6 @@ public class IngestActionFilterTests extends ESTestCase {
verifyZeroInteractions(actionFilterChain); verifyZeroInteractions(actionFilterChain);
} }
public void testApplyAlreadyProcessed() throws Exception {
Task task = mock(Task.class);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field", "value");
indexRequest.putHeader(IngestActionFilter.PIPELINE_ALREADY_PROCESSED, true);
ActionListener actionListener = mock(ActionListener.class);
ActionFilterChain actionFilterChain = mock(ActionFilterChain.class);
filter.apply(task, IndexAction.NAME, indexRequest, actionListener, actionFilterChain);
verify(actionFilterChain).proceed(task, IndexAction.NAME, indexRequest, actionListener);
verifyZeroInteractions(executionService, actionListener);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testApplyExecuted() throws Exception { public void testApplyExecuted() throws Exception {
Task task = mock(Task.class); Task task = mock(Task.class);
@ -231,63 +212,4 @@ public class IngestActionFilterTests extends ESTestCase {
assertThat(assertedRequests, equalTo(numRequest)); assertThat(assertedRequests, equalTo(numRequest));
}); });
} }
public void testBulkRequestModifier() {
int numRequests = scaledRandomIntBetween(8, 64);
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numRequests; i++) {
bulkRequest.add(new IndexRequest("_index", "_type", String.valueOf(i)).source("{}"));
}
CaptureActionListener actionListener = new CaptureActionListener();
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);
int i = 0;
Set<Integer> failedSlots = new HashSet<>();
while (bulkRequestModifier.hasNext()) {
bulkRequestModifier.next();
if (randomBoolean()) {
bulkRequestModifier.markCurrentItemAsFailed(new RuntimeException());
failedSlots.add(i);
}
i++;
}
assertThat(bulkRequestModifier.getBulkRequest().requests().size(), equalTo(numRequests - failedSlots.size()));
// simulate that we actually executed the modified bulk request:
ActionListener<BulkResponse> result = bulkRequestModifier.wrapActionListenerIfNeeded(actionListener);
result.onResponse(new BulkResponse(new BulkItemResponse[numRequests - failedSlots.size()], 0));
BulkResponse bulkResponse = actionListener.getResponse();
for (int j = 0; j < bulkResponse.getItems().length; j++) {
if (failedSlots.contains(j)) {
BulkItemResponse item = bulkResponse.getItems()[j];
assertThat(item.isFailed(), is(true));
assertThat(item.getFailure().getIndex(), equalTo("_index"));
assertThat(item.getFailure().getType(), equalTo("_type"));
assertThat(item.getFailure().getId(), equalTo(String.valueOf(j)));
assertThat(item.getFailure().getMessage(), equalTo("java.lang.RuntimeException"));
} else {
assertThat(bulkResponse.getItems()[j], nullValue());
}
}
}
private static class CaptureActionListener implements ActionListener<BulkResponse> {
private BulkResponse response;
@Override
public void onResponse(BulkResponse bulkItemResponses) {
this.response = bulkItemResponses ;
}
@Override
public void onFailure(Throwable e) {
}
public BulkResponse getResponse() {
return response;
}
}
} }