use AbstractRunnable

This commit is contained in:
Martijn van Groningen 2016-01-20 15:26:37 +01:00
parent 9fe408adbd
commit 7aeb360932
3 changed files with 53 additions and 32 deletions

View File

@ -105,7 +105,11 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
executionService.execute(() -> bulkRequestModifier, (indexRequest, throwable) -> { executionService.execute(() -> bulkRequestModifier, (indexRequest, throwable) -> {
logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id(), throwable); logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id(), throwable);
bulkRequestModifier.markCurrentItemAsFailed(throwable); bulkRequestModifier.markCurrentItemAsFailed(throwable);
}, (success) -> { }, (throwable) -> {
if (throwable != null) {
logger.error("failed to execute pipeline for a bulk request", throwable);
listener.onFailure(throwable);
} else {
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest(); BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener); ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener);
if (bulkRequest.requests().isEmpty()) { if (bulkRequest.requests().isEmpty()) {
@ -116,6 +120,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
} else { } else {
chain.proceed(task, action, bulkRequest, actionListener); chain.proceed(task, action, bulkRequest, actionListener);
} }
}
}); });
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.ingest;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.ingest.core.IngestDocument; import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline; import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -42,19 +43,33 @@ public class PipelineExecutionService {
public void execute(IndexRequest request, Consumer<Throwable> failureHandler, Consumer<Boolean> completionHandler) { public void execute(IndexRequest request, Consumer<Throwable> failureHandler, Consumer<Boolean> completionHandler) {
Pipeline pipeline = getPipeline(request.getPipeline()); Pipeline pipeline = getPipeline(request.getPipeline());
threadPool.executor(ThreadPool.Names.INDEX).execute(() -> { threadPool.executor(ThreadPool.Names.INDEX).execute(new AbstractRunnable() {
try {
@Override
public void onFailure(Throwable t) {
failureHandler.accept(t);
}
@Override
protected void doRun() throws Exception {
innerExecute(request, pipeline); innerExecute(request, pipeline);
completionHandler.accept(true); completionHandler.accept(true);
} catch (Exception e) {
failureHandler.accept(e);
} }
}); });
} }
public void execute(Iterable<ActionRequest<?>> actionRequests, public void execute(Iterable<ActionRequest<?>> actionRequests,
BiConsumer<IndexRequest, Throwable> itemFailureHandler, Consumer<Boolean> completionHandler) { BiConsumer<IndexRequest, Throwable> itemFailureHandler,
threadPool.executor(ThreadPool.Names.INDEX).execute(() -> { Consumer<Throwable> completionHandler) {
threadPool.executor(ThreadPool.Names.INDEX).execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
completionHandler.accept(t);
}
@Override
protected void doRun() throws Exception {
for (ActionRequest actionRequest : actionRequests) { for (ActionRequest actionRequest : actionRequests) {
if ((actionRequest instanceof IndexRequest)) { if ((actionRequest instanceof IndexRequest)) {
IndexRequest indexRequest = (IndexRequest) actionRequest; IndexRequest indexRequest = (IndexRequest) actionRequest;
@ -69,7 +84,8 @@ public class PipelineExecutionService {
} }
} }
} }
completionHandler.accept(true); completionHandler.accept(null);
}
}); });
} }

View File

@ -98,7 +98,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
BiConsumer<IndexRequest, Throwable> failureHandler = mock(BiConsumer.class); BiConsumer<IndexRequest, Throwable> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class); Consumer<Throwable> completionHandler = mock(Consumer.class);
executionService.execute(bulkRequest.requests(), failureHandler, completionHandler); executionService.execute(bulkRequest.requests(), failureHandler, completionHandler);
verify(failureHandler, times(1)).accept( verify(failureHandler, times(1)).accept(
argThat(new CustomTypeSafeMatcher<IndexRequest>("failure handler was not called with the expected arguments") { argThat(new CustomTypeSafeMatcher<IndexRequest>("failure handler was not called with the expected arguments") {
@ -115,7 +115,7 @@ public class PipelineExecutionServiceTests extends ESTestCase {
} }
}) })
); );
verify(completionHandler, times(1)).accept(anyBoolean()); verify(completionHandler, times(1)).accept(null);
} }
public void testExecuteSuccess() throws Exception { public void testExecuteSuccess() throws Exception {
@ -311,11 +311,11 @@ public class PipelineExecutionServiceTests extends ESTestCase {
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, processor)); when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, processor));
BiConsumer<IndexRequest, Throwable> requestItemErrorHandler = mock(BiConsumer.class); BiConsumer<IndexRequest, Throwable> requestItemErrorHandler = mock(BiConsumer.class);
Consumer<Boolean> completionHandler = mock(Consumer.class); Consumer<Throwable> completionHandler = mock(Consumer.class);
executionService.execute(bulkRequest.requests(), requestItemErrorHandler, completionHandler); executionService.execute(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), eq(error)); verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), eq(error));
verify(completionHandler, times(1)).accept(true); verify(completionHandler, times(1)).accept(null);
} }
public void testBulkRequestExecution() throws Exception { public void testBulkRequestExecution() throws Exception {
@ -334,11 +334,11 @@ public class PipelineExecutionServiceTests extends ESTestCase {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
BiConsumer<IndexRequest, Throwable> requestItemErrorHandler = mock(BiConsumer.class); BiConsumer<IndexRequest, Throwable> requestItemErrorHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class); Consumer<Throwable> completionHandler = mock(Consumer.class);
executionService.execute(bulkRequest.requests(), requestItemErrorHandler, completionHandler); executionService.execute(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
verify(requestItemErrorHandler, never()).accept(any(), any()); verify(requestItemErrorHandler, never()).accept(any(), any());
verify(completionHandler, times(1)).accept(true); verify(completionHandler, times(1)).accept(null);
} }
private IngestDocument eqID(String index, String type, String id, Map<String, Object> source) { private IngestDocument eqID(String index, String type, String id, Map<String, Object> source) {