Merge pull request #15593 from martijnvg/ingest_execution_service_refactoring

[Ingest] Change ExecutionService to process multiple request at a time
This commit is contained in:
Martijn van Groningen 2015-12-22 13:24:44 +01:00
commit e54985b11c
6 changed files with 324 additions and 239 deletions

View File

@ -19,7 +19,7 @@
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
@ -28,6 +28,7 @@ import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
import java.util.function.Consumer;
public class PipelineExecutionService {
@ -41,43 +42,73 @@ public class PipelineExecutionService {
this.threadPool = threadPool;
}
public void execute(IndexRequest indexRequest, String pipelineId, ActionListener<Void> listener) {
Pipeline pipeline = store.get(pipelineId);
if (pipeline == null) {
listener.onFailure(new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"));
return;
}
public void execute(IndexRequest request, String pipelineId, Consumer<Throwable> failureHandler, Consumer<Boolean> completionHandler) {
Pipeline pipeline = getPipeline(pipelineId);
threadPool.executor(THREAD_POOL_NAME).execute(() -> {
String index = indexRequest.index();
String type = indexRequest.type();
String id = indexRequest.id();
String routing = indexRequest.routing();
String parent = indexRequest.parent();
String timestamp = indexRequest.timestamp();
String ttl = indexRequest.ttl() == null ? null : indexRequest.ttl().toString();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, timestamp, ttl, sourceAsMap);
try {
pipeline.execute(ingestDocument);
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
//it's fine to set all metadata fields all the time, as ingest document holds their starting values
//before ingestion, which might also get modified during ingestion.
indexRequest.index(metadataMap.get(IngestDocument.MetaData.INDEX));
indexRequest.type(metadataMap.get(IngestDocument.MetaData.TYPE));
indexRequest.id(metadataMap.get(IngestDocument.MetaData.ID));
indexRequest.routing(metadataMap.get(IngestDocument.MetaData.ROUTING));
indexRequest.parent(metadataMap.get(IngestDocument.MetaData.PARENT));
indexRequest.timestamp(metadataMap.get(IngestDocument.MetaData.TIMESTAMP));
indexRequest.ttl(metadataMap.get(IngestDocument.MetaData.TTL));
indexRequest.source(ingestDocument.getSourceAndMetadata());
listener.onResponse(null);
} catch (Throwable e) {
listener.onFailure(e);
innerExecute(request, pipeline);
completionHandler.accept(true);
} catch (Exception e) {
failureHandler.accept(e);
}
});
}
public void execute(Iterable<ActionRequest> actionRequests, String pipelineId,
Consumer<Throwable> itemFailureHandler, Consumer<Boolean> completionHandler) {
Pipeline pipeline = getPipeline(pipelineId);
threadPool.executor(THREAD_POOL_NAME).execute(() -> {
for (ActionRequest actionRequest : actionRequests) {
if ((actionRequest instanceof IndexRequest) == false) {
continue;
}
IndexRequest indexRequest = (IndexRequest) actionRequest;
try {
innerExecute(indexRequest, pipeline);
} catch (Throwable e) {
if (itemFailureHandler != null) {
itemFailureHandler.accept(e);
}
}
}
completionHandler.accept(true);
});
}
private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception {
String index = indexRequest.index();
String type = indexRequest.type();
String id = indexRequest.id();
String routing = indexRequest.routing();
String parent = indexRequest.parent();
String timestamp = indexRequest.timestamp();
String ttl = indexRequest.ttl() == null ? null : indexRequest.ttl().toString();
Map<String, Object> sourceAsMap = indexRequest.sourceAsMap();
IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, timestamp, ttl, sourceAsMap);
pipeline.execute(ingestDocument);
Map<IngestDocument.MetaData, String> metadataMap = ingestDocument.extractMetadata();
//it's fine to set all metadata fields all the time, as ingest document holds their starting values
//before ingestion, which might also get modified during ingestion.
indexRequest.index(metadataMap.get(IngestDocument.MetaData.INDEX));
indexRequest.type(metadataMap.get(IngestDocument.MetaData.TYPE));
indexRequest.id(metadataMap.get(IngestDocument.MetaData.ID));
indexRequest.routing(metadataMap.get(IngestDocument.MetaData.ROUTING));
indexRequest.parent(metadataMap.get(IngestDocument.MetaData.PARENT));
indexRequest.timestamp(metadataMap.get(IngestDocument.MetaData.TIMESTAMP));
indexRequest.ttl(metadataMap.get(IngestDocument.MetaData.TTL));
indexRequest.source(ingestDocument.getSourceAndMetadata());
}
private Pipeline getPipeline(String pipelineId) {
Pipeline pipeline = store.get(pipelineId);
if (pipeline == null) {
throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist");
}
return pipeline;
}
public static Settings additionalSettings(Settings nodeSettings) {
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
if (!settings.names().isEmpty()) {

View File

@ -68,8 +68,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
BulkRequest bulkRequest = (BulkRequest) request;
@SuppressWarnings("unchecked")
ActionListener<BulkResponse> actionListener = (ActionListener<BulkResponse>) listener;
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);
processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, actionListener);
processBulkIndexRequest(bulkRequest, pipelineId, action, chain, actionListener);
} else {
chain.proceed(action, request, listener);
}
@ -88,23 +87,21 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
chain.proceed(action, indexRequest, listener);
return;
}
executionService.execute(indexRequest, pipelineId, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
chain.proceed(action, indexRequest, listener);
}
@Override
public void onFailure(Throwable e) {
logger.error("failed to execute pipeline [{}]", e, pipelineId);
listener.onFailure(e);
}
executionService.execute(indexRequest, pipelineId, t -> {
logger.error("failed to execute pipeline [{}]", t, pipelineId);
listener.onFailure(t);
}, success -> {
indexRequest.putHeader(IngestPlugin.PIPELINE_ALREADY_PROCESSED, true);
chain.proceed(action, indexRequest, listener);
});
}
void processBulkIndexRequest(BulkRequestModifier bulkRequestModifier, String pipelineId, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
if (!bulkRequestModifier.hasNext()) {
void processBulkIndexRequest(BulkRequest original, String pipelineId, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
executionService.execute(() -> bulkRequestModifier, pipelineId, e -> {
logger.debug("failed to execute pipeline [{}]", e, pipelineId);
bulkRequestModifier.markCurrentItemAsFailed(e);
}, (success) -> {
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);
if (bulkRequest.requests().isEmpty()) {
@ -115,28 +112,6 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
} else {
chain.proceed(action, bulkRequest, actionListener);
}
return;
}
ActionRequest actionRequest = bulkRequestModifier.next();
if (!(actionRequest instanceof IndexRequest)) {
processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener);
return;
}
IndexRequest indexRequest = (IndexRequest) actionRequest;
executionService.execute(indexRequest, pipelineId, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener);
}
@Override
public void onFailure(Throwable e) {
logger.debug("failed to execute pipeline [{}]", e, pipelineId);
bulkRequestModifier.markCurrentItemAsFailed(e);
processBulkIndexRequest(bulkRequestModifier, pipelineId, action, chain, listener);
}
});
}

View File

@ -19,15 +19,13 @@
package org.elasticsearch.ingest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
@ -44,10 +42,11 @@ import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRespon
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@ -146,39 +145,48 @@ public class IngestClientIT extends ESIntegTestCase {
assertThat(simulateDocumentSimpleResult.getFailure(), nullValue());
}
public void testBulkWithIngestFailures() {
public void testBulkWithIngestFailures() throws Exception {
createIndex("index");
new PutPipelineRequestBuilder(client(), PutPipelineAction.INSTANCE)
.setId("_id")
.setSource(jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("join")
.field("field", "field1")
.field("separator", "|")
.endObject()
.endObject()
.endArray()
.endObject().bytes())
.get();
int numRequests = scaledRandomIntBetween(32, 128);
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_none_existing_id");
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
for (int i = 0; i < numRequests; i++) {
IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i));
if (i % 2 == 0) {
UpdateRequest updateRequest = new UpdateRequest("index", "type", Integer.toString(i));
updateRequest.upsert("field", "value");
updateRequest.doc(new HashMap());
bulkRequest.add(updateRequest);
indexRequest.source("field1", Arrays.asList("value1", "value2"));
} else {
IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i));
indexRequest.source("field1", "value1");
bulkRequest.add(indexRequest);
indexRequest.source("field2", Arrays.asList("value1", "value2"));
}
bulkRequest.add(indexRequest);
}
BulkResponse response = client().bulk(bulkRequest).actionGet();
assertThat(response.getItems().length, equalTo(bulkRequest.requests().size()));
for (int i = 0; i < bulkRequest.requests().size(); i++) {
ActionRequest request = bulkRequest.requests().get(i);
BulkItemResponse itemResponse = response.getItems()[i];
if (request instanceof IndexRequest) {
BulkItemResponse.Failure failure = itemResponse.getFailure();
assertThat(failure.getMessage(), equalTo("java.lang.IllegalArgumentException: pipeline with id [_none_existing_id] does not exist"));
} else if (request instanceof UpdateRequest) {
UpdateResponse updateResponse = itemResponse.getResponse();
assertThat(updateResponse.getId(), equalTo(Integer.toString(i)));
assertThat(updateResponse.isCreated(), is(true));
if (i % 2 == 0) {
IndexResponse indexResponse = itemResponse.getResponse();
assertThat(indexResponse.getId(), equalTo(Integer.toString(i)));
assertThat(indexResponse.isCreated(), is(true));
} else {
fail("unexpected request item [" + request + "]");
BulkItemResponse.Failure failure = itemResponse.getFailure();
assertThat(failure.getMessage(), equalTo("java.lang.IllegalArgumentException: field [field1] not present as part of path [field1]"));
}
}
}

View File

@ -20,8 +20,11 @@
package org.elasticsearch.plugin.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
@ -39,9 +42,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
@ -67,11 +72,16 @@ public class PipelineExecutionServiceTests extends ESTestCase {
public void testExecutePipelineDoesNotExist() {
when(store.get("_id")).thenReturn(null);
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
ActionListener<Void> listener = (ActionListener<Void>)mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
verify(listener).onFailure(any(IllegalArgumentException.class));
verify(listener, times(0)).onResponse(any());
Consumer<Throwable> failureHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class);
try {
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
fail("IllegalArgumentException expected");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist"));
}
verify(failureHandler, never()).accept(any(Throwable.class));
verify(completionHandler, never()).accept(anyBoolean());
}
public void testExecuteSuccess() throws Exception {
@ -79,13 +89,13 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
ActionListener<Void> listener = (ActionListener<Void>)mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
Consumer<Throwable> failureHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
//TODO we remove metadata, this check is not valid anymore, what do we replace it with?
//verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(listener).onResponse(null);
verify(listener, times(0)).onFailure(any(Exception.class));
verify(failureHandler, never()).accept(any());
verify(completionHandler, times(1)).accept(true);
}
public void testExecutePropagateAllMetaDataUpdates() throws Exception {
@ -105,12 +115,12 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
@SuppressWarnings("unchecked")
ActionListener<Void> listener = (ActionListener<Void>)mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
Consumer<Throwable> failureHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(processor).execute(any());
verify(listener).onResponse(any());
verify(listener, times(0)).onFailure(any(Exception.class));
verify(failureHandler, never()).accept(any());
verify(completionHandler, times(1)).accept(true);
assertThat(indexRequest.index(), equalTo("update_index"));
assertThat(indexRequest.type(), equalTo("update_type"));
@ -126,12 +136,12 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
doThrow(new RuntimeException()).when(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
@SuppressWarnings("unchecked")
ActionListener<Void> listener = (ActionListener<Void>)mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
Consumer<Throwable> failureHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(processor).execute(eqID("_index", "_type", "_id", Collections.emptyMap()));
verify(listener, times(0)).onResponse(null);
verify(listener).onFailure(any(RuntimeException.class));
verify(failureHandler, times(1)).accept(any(RuntimeException.class));
verify(completionHandler, never()).accept(anyBoolean());
}
@SuppressWarnings("unchecked")
@ -145,12 +155,13 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor)));
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
ActionListener<Void> listener = (ActionListener<Void>)mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
Consumer<Throwable> failureHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
assertThat(indexRequest.ttl(), equalTo(TimeValue.parseTimeValue("5d", null, "ttl")));
verify(listener, times(1)).onResponse(any());
verify(listener, never()).onFailure(any());
verify(failureHandler, never()).accept(any());
verify(completionHandler, times(1)).accept(true);
// test with invalid ttl
metaProcessorFactory = new SetProcessor.Factory(TestTemplateService.instance());
@ -161,11 +172,11 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.singletonList(processor)));
indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap());
listener = mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
verify(listener, never()).onResponse(any());
verify(listener, times(1)).onFailure(any(ElasticsearchParseException.class));
failureHandler = mock(Consumer.class);
completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
verify(failureHandler, times(1)).accept(any(ElasticsearchParseException.class));
verify(completionHandler, never()).accept(anyBoolean());
// test with provided ttl
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", Collections.emptyList()));
@ -173,12 +184,71 @@ public class PipelineExecutionServiceTests extends ESTestCase {
indexRequest = new IndexRequest("_index", "_type", "_id")
.source(Collections.emptyMap())
.ttl(1000L);
listener = mock(ActionListener.class);
executionService.execute(indexRequest, "_id", listener);
failureHandler = mock(Consumer.class);
completionHandler = mock(Consumer.class);
executionService.execute(indexRequest, "_id", failureHandler, completionHandler);
assertThat(indexRequest.ttl(), equalTo(new TimeValue(1000L)));
verify(listener, times(1)).onResponse(any());
verify(listener, never()).onFailure(any(Throwable.class));
verify(failureHandler, never()).accept(any());
verify(completionHandler, times(1)).accept(true);
}
public void testBulkRequestExecutionWithFailures() throws Exception {
BulkRequest bulkRequest = new BulkRequest();
int numRequest = scaledRandomIntBetween(8, 64);
int numIndexRequests = 0;
for (int i = 0; i < numRequest; i++) {
ActionRequest request;
if (randomBoolean()) {
if (randomBoolean()) {
request = new DeleteRequest("_index", "_type", "_id");
} else {
request = new UpdateRequest("_index", "_type", "_id");
}
} else {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field1", "value1");
request = indexRequest;
numIndexRequests++;
}
bulkRequest.add(request);
}
String pipelineId = "_id";
Processor pipeline = mock(Processor.class);
Exception error = new RuntimeException();
doThrow(error).when(pipeline).execute(any());
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, Collections.singletonList(pipeline)));
Consumer<Throwable> requestItemErrorHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(bulkRequest.requests(), pipelineId, requestItemErrorHandler, completionHandler);
verify(requestItemErrorHandler, times(numIndexRequests)).accept(error);
verify(completionHandler, times(1)).accept(true);
}
public void testBulkRequestExecution() throws Exception {
BulkRequest bulkRequest = new BulkRequest();
int numRequest = scaledRandomIntBetween(8, 64);
for (int i = 0; i < numRequest; i++) {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field1", "value1");
bulkRequest.add(indexRequest);
}
String pipelineId = "_id";
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, Collections.emptyList()));
Consumer<Throwable> requestItemErrorHandler = mock(Consumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.execute(bulkRequest.requests(), pipelineId, requestItemErrorHandler, completionHandler);
verify(requestItemErrorHandler, never()).accept(any());
verify(completionHandler, times(1)).accept(true);
}
private IngestDocument eqID(String index, String type, String id, Map<String, Object> source) {

View File

@ -0,0 +1,101 @@
package org.elasticsearch.plugin.ingest.transport;
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class BulkRequestModifierTests extends ESTestCase {
public void testPipelineFailures() {
BulkRequest originalBulkRequest = new BulkRequest();
for (int i = 0; i < 32; i++) {
originalBulkRequest.add(new IndexRequest("index", "type", String.valueOf(i)));
}
IngestActionFilter.BulkRequestModifier modifier = new IngestActionFilter.BulkRequestModifier(originalBulkRequest);
for (int i = 0; modifier.hasNext(); i++) {
modifier.next();
if (i % 2 == 0) {
modifier.markCurrentItemAsFailed(new RuntimeException());
}
}
// So half of the requests have "failed", so only the successful requests are left:
BulkRequest bulkRequest = modifier.getBulkRequest();
assertThat(bulkRequest.requests().size(), Matchers.equalTo(16));
List<BulkItemResponse> responses = new ArrayList<>();
ActionListener<BulkResponse> bulkResponseListener = modifier.wrapActionListenerIfNeeded(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
responses.addAll(Arrays.asList(bulkItemResponses.getItems()));
}
@Override
public void onFailure(Throwable e) {
}
});
List<BulkItemResponse> originalResponses = new ArrayList<>();
for (ActionRequest actionRequest : bulkRequest.requests()) {
IndexRequest indexRequest = (IndexRequest) actionRequest;
IndexResponse indexResponse = new IndexResponse(new ShardId("index", 0), indexRequest.type(), indexRequest.id(), 1, true);
originalResponses.add(new BulkItemResponse(Integer.parseInt(indexRequest.id()), indexRequest.opType().lowercase(), indexResponse));
}
bulkResponseListener.onResponse(new BulkResponse(originalResponses.toArray(new BulkItemResponse[0]), 0));
assertThat(responses.size(), Matchers.equalTo(32));
for (int i = 0; i < 32; i++) {
assertThat(responses.get(i).getId(), Matchers.equalTo(String.valueOf(i)));
}
}
public void testNoFailures() {
BulkRequest originalBulkRequest = new BulkRequest();
for (int i = 0; i < 32; i++) {
originalBulkRequest.add(new IndexRequest("index", "type", String.valueOf(i)));
}
IngestActionFilter.BulkRequestModifier modifier = new IngestActionFilter.BulkRequestModifier(originalBulkRequest);
for (int i = 0; modifier.hasNext(); i++) {
modifier.next();
}
BulkRequest bulkRequest = modifier.getBulkRequest();
assertThat(bulkRequest, Matchers.sameInstance(originalBulkRequest));
ActionListener<BulkResponse> actionListener = Mockito.mock(ActionListener.class);
assertThat(modifier.wrapActionListenerIfNeeded(actionListener), Matchers.sameInstance(actionListener));
}
}

View File

@ -46,6 +46,7 @@ import org.mockito.stubbing.Answer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import static org.elasticsearch.plugin.ingest.transport.IngestActionFilter.BulkRequestModifier;
import static org.hamcrest.Matchers.equalTo;
@ -93,7 +94,7 @@ public class IngestActionFilterTests extends ESTestCase {
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
verifyZeroInteractions(actionFilterChain);
}
@ -106,7 +107,7 @@ public class IngestActionFilterTests extends ESTestCase {
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
verifyZeroInteractions(actionFilterChain);
}
@ -133,14 +134,14 @@ public class IngestActionFilterTests extends ESTestCase {
Answer answer = invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<Void> listener = (ActionListener<Void>) invocationOnMock.getArguments()[2];
listener.onResponse(null);
Consumer<Boolean> listener = (Consumer) invocationOnMock.getArguments()[3];
listener.accept(true);
return null;
};
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
verify(actionFilterChain).proceed("_action", indexRequest, actionListener);
verifyZeroInteractions(actionListener);
}
@ -156,26 +157,22 @@ public class IngestActionFilterTests extends ESTestCase {
Answer answer = new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onFailure(exception);
Consumer<Throwable> handler = (Consumer) invocationOnMock.getArguments()[2];
handler.accept(exception);
return null;
}
};
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
filter.apply("_action", indexRequest, actionListener, actionFilterChain);
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
verify(executionService).execute(any(IndexRequest.class), eq("_id"), any(Consumer.class), any(Consumer.class));
verify(actionListener).onFailure(exception);
verifyZeroInteractions(actionFilterChain);
}
public void testApplyWithBulkRequest() throws Exception {
ThreadPool threadPool = new ThreadPool(
Settings.builder()
.put("name", "_name")
.put(PipelineExecutionService.additionalSettings(Settings.EMPTY))
.build()
);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor(any())).thenReturn(Runnable::run);
PipelineStore store = mock(PipelineStore.class);
Processor processor = new Processor() {
@ -238,83 +235,6 @@ public class IngestActionFilterTests extends ESTestCase {
assertThat(assertedRequests, equalTo(numRequest));
}
});
threadPool.shutdown();
}
public void testApplyWithBulkRequestWithFailureAllFailed() throws Exception {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
int numRequest = scaledRandomIntBetween(0, 8);
for (int i = 0; i < numRequest; i++) {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field1", "value1");
bulkRequest.add(indexRequest);
}
RuntimeException exception = new RuntimeException();
Answer answer = (invocationOnMock) -> {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onFailure(exception);
return null;
};
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
CaptureActionListener actionListener = new CaptureActionListener();
RecordRequestAFC actionFilterChain = new RecordRequestAFC();
filter.apply("_action", bulkRequest, actionListener, actionFilterChain);
assertThat(actionFilterChain.request, nullValue());
ActionResponse response = actionListener.response;
assertThat(response, instanceOf(BulkResponse.class));
BulkResponse bulkResponse = (BulkResponse) response;
assertThat(bulkResponse.getItems().length, equalTo(numRequest));
for (BulkItemResponse bulkItemResponse : bulkResponse) {
assertThat(bulkItemResponse.isFailed(), equalTo(true));
}
}
public void testApplyWithBulkRequestWithFailure() throws Exception {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.putHeader(IngestPlugin.PIPELINE_ID_PARAM, "_id");
int numRequest = scaledRandomIntBetween(8, 64);
int numNonIndexRequests = 0;
for (int i = 0; i < numRequest; i++) {
ActionRequest request;
if (randomBoolean()) {
numNonIndexRequests++;
if (randomBoolean()) {
request = new DeleteRequest("_index", "_type", "_id");
} else {
request = new UpdateRequest("_index", "_type", "_id");
}
} else {
IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id");
indexRequest.source("field1", "value1");
request = indexRequest;
}
bulkRequest.add(request);
}
RuntimeException exception = new RuntimeException();
Answer answer = (invocationOnMock) -> {
ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2];
listener.onFailure(exception);
return null;
};
doAnswer(answer).when(executionService).execute(any(IndexRequest.class), eq("_id"), any(ActionListener.class));
ActionListener actionListener = mock(ActionListener.class);
RecordRequestAFC actionFilterChain = new RecordRequestAFC();
filter.apply("_action", bulkRequest, actionListener, actionFilterChain);
BulkRequest interceptedRequests = actionFilterChain.getRequest();
assertThat(interceptedRequests.requests().size(), equalTo(numNonIndexRequests));
verifyZeroInteractions(actionListener);
}
public void testBulkRequestModifier() {
@ -357,26 +277,6 @@ public class IngestActionFilterTests extends ESTestCase {
}
}
private final static class RecordRequestAFC implements ActionFilterChain {
private ActionRequest request;
@Override
public void proceed(String action, ActionRequest request, ActionListener listener) {
this.request = request;
}
@Override
public void proceed(String action, ActionResponse response, ActionListener listener) {
}
@SuppressWarnings("unchecked")
public <T extends ActionRequest<T>> T getRequest() {
return (T) request;
}
}
private final static class CaptureActionListener implements ActionListener<BulkResponse> {
private BulkResponse response;