applied feedback

This commit is contained in:
Martijn van Groningen 2015-12-22 12:20:16 +01:00
parent 7e99ee4edf
commit 72fc34731e
3 changed files with 9 additions and 10 deletions

View File

@ -54,12 +54,11 @@ public class PipelineExecutionService {
});
}
public void execute(Iterable<ActionRequest> indexRequests, String pipelineId,
public void execute(Iterable<ActionRequest> actionRequests, String pipelineId,
Consumer<Throwable> itemFailureHandler, Consumer<Boolean> completionHandler) {
Pipeline pipeline = getPipeline(pipelineId);
threadPool.executor(THREAD_POOL_NAME).execute(() -> {
Throwable lastThrowable = null;
for (ActionRequest actionRequest : indexRequests) {
for (ActionRequest actionRequest : actionRequests) {
if ((actionRequest instanceof IndexRequest) == false) {
continue;
}
@ -68,13 +67,12 @@ public class PipelineExecutionService {
try {
innerExecute(indexRequest, pipeline);
} catch (Throwable e) {
lastThrowable = e;
if (itemFailureHandler != null) {
itemFailureHandler.accept(e);
}
}
}
completionHandler.accept(lastThrowable == null);
completionHandler.accept(true);
});
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.ingest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
@ -27,8 +26,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugin.ingest.IngestPlugin;
import org.elasticsearch.plugin.ingest.transport.delete.DeletePipelineAction;
@ -45,7 +42,11 @@ import org.elasticsearch.plugin.ingest.transport.simulate.SimulatePipelineRespon
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

View File

@ -227,7 +227,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
executionService.execute(bulkRequest.requests(), pipelineId, requestItemErrorHandler, completionHandler);
verify(requestItemErrorHandler, times(numIndexRequests)).accept(error);
verify(completionHandler, times(1)).accept(false);
verify(completionHandler, times(1)).accept(true);
}
public void testBulkRequestExecution() throws Exception {