Add ingest tests for single item bulk action

This commit is contained in:
Areek Zillur 2016-12-22 01:47:59 -05:00
parent 1c91719a88
commit d51f414ea3
2 changed files with 115 additions and 1 deletions

View File

@ -101,7 +101,8 @@ public abstract class TransportSingleItemBulkWriteAction<
itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) replicaRequest));
BulkShardRequest bulkShardRequest = new BulkShardRequest(replicaRequest.shardId(), refreshPolicy, itemRequests);
WriteReplicaResult<BulkShardRequest> result = shardBulkAction.shardOperationOnReplica(bulkShardRequest, replica);
// nocommit - is the null failure ok?
// a replica operation can never throw a document-level failure,
// as the same document has been already indexed successfully in the primary
return new WriteReplicaResult<>(replicaRequest, result.location, null, replica, logger);
}

View File

@ -21,21 +21,27 @@ package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.PipelineExecutionService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.junit.Before;
@ -48,6 +54,7 @@ import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.sameInstance;
@ -84,6 +91,9 @@ public class TransportBulkActionIngestTests extends ESTestCase {
/** The actual action we want to test, with real indexing mocked */
TestTransportBulkAction action;
/** Single item bulk write action that wraps index requests */
TestSingleItemBulkWriteAction singleItemBulkWriteAction;
/** True if the next call to the index action should act as an ingest node */
boolean localIngest;
@ -110,6 +120,20 @@ public class TransportBulkActionIngestTests extends ESTestCase {
}
}
class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction<IndexRequest, IndexResponse> {
TestSingleItemBulkWriteAction(TestTransportBulkAction bulkAction) {
super(Settings.EMPTY, IndexAction.NAME, transportService, TransportBulkActionIngestTests.this.clusterService,
null, null, null, new ActionFilters(Collections.emptySet()), null,
IndexRequest::new, IndexRequest::new, ThreadPool.Names.INDEX, bulkAction, null);
}
@Override
protected IndexResponse newResponseInstance() {
return new IndexResponse();
}
}
@Before
public void setupAction() {
// initialize captors, which must be members to use @Capture because of generics
@ -142,6 +166,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
executionService = mock(PipelineExecutionService.class);
when(ingestService.getPipelineExecutionService()).thenReturn(executionService);
action = new TestTransportBulkAction();
singleItemBulkWriteAction = new TestSingleItemBulkWriteAction(action);
reset(transportService); // call on construction of action
}
@ -157,6 +182,16 @@ public class TransportBulkActionIngestTests extends ESTestCase {
verifyZeroInteractions(ingestService);
}
public void testSingleItemBulkActionIngestSkipped() throws Exception {
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(response -> {}, exception -> {
throw new AssertionError(exception);
}));
assertTrue(action.isExecuted);
verifyZeroInteractions(ingestService);
}
public void testIngestLocal() throws Exception {
Exception exception = new Exception("fake exception");
BulkRequest bulkRequest = new BulkRequest();
@ -200,6 +235,38 @@ public class TransportBulkActionIngestTests extends ESTestCase {
verifyZeroInteractions(transportService);
}
public void testSingleItemBulkActionIngestLocal() throws Exception {
Exception exception = new Exception("fake exception");
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
indexRequest.setPipeline("testpipeline");
AtomicBoolean responseCalled = new AtomicBoolean(false);
AtomicBoolean failureCalled = new AtomicBoolean(false);
singleItemBulkWriteAction.execute(null, indexRequest, ActionListener.wrap(
response -> {
responseCalled.set(true);
},
e -> {
assertThat(e, sameInstance(exception));
failureCalled.set(true);
}));
// check failure works, and passes through to the listener
assertFalse(action.isExecuted); // haven't executed yet
assertFalse(responseCalled.get());
assertFalse(failureCalled.get());
verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture());
completionHandler.getValue().accept(exception);
assertTrue(failureCalled.get());
// now check success
indexRequest.setPipeline(null); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyZeroInteractions(transportService);
}
public void testIngestForward() throws Exception {
localIngest = false;
BulkRequest bulkRequest = new BulkRequest();
@ -246,5 +313,51 @@ public class TransportBulkActionIngestTests extends ESTestCase {
}
}
public void testSingleItemBulkActionIngestForward() throws Exception {
localIngest = false;
IndexRequest indexRequest = new IndexRequest("index", "type", "id");
indexRequest.source(Collections.emptyMap());
indexRequest.setPipeline("testpipeline");
IndexResponse indexResponse = mock(IndexResponse.class);
AtomicBoolean responseCalled = new AtomicBoolean(false);
ActionListener<IndexResponse> listener = ActionListener.wrap(
response -> {
responseCalled.set(true);
assertSame(indexResponse, response);
},
e -> {
throw new AssertionError(e);
});
singleItemBulkWriteAction.execute(null, indexRequest, listener);
// should not have executed ingest locally
verify(executionService, never()).executeBulkRequest(any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
boolean usedNode1 = node.getValue() == remoteNode1; // make sure we used one of the nodes
if (usedNode1 == false) {
assertSame(remoteNode2, node.getValue());
}
assertFalse(action.isExecuted); // no local index execution
assertFalse(responseCalled.get()); // listener not called yet
BulkItemResponse itemResponse = new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, indexResponse);
BulkItemResponse[] bulkItemResponses = new BulkItemResponse[1];
bulkItemResponses[0] = itemResponse;
remoteResponseHandler.getValue().handleResponse(new BulkResponse(bulkItemResponses, 0)); // call the listener for the remote node
assertTrue(responseCalled.get()); // now the listener we passed should have been delegated to by the remote listener
assertFalse(action.isExecuted); // still no local index execution
// now make sure ingest nodes are rotated through with a subsequent request
reset(transportService);
singleItemBulkWriteAction.execute(null, indexRequest, listener);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
if (usedNode1) {
assertSame(remoteNode2, node.getValue());
} else {
assertSame(remoteNode1, node.getValue());
}
}
}