ingest: Use BiConsumer instead of Cunsumer to pass down the failed index request with throwable
This commit is contained in:
parent
a7730b05b2
commit
4d88da5ad5
|
@ -109,9 +109,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
|
||||||
|
|
||||||
void processBulkIndexRequest(Task task, BulkRequest original, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
|
void processBulkIndexRequest(Task task, BulkRequest original, String action, ActionFilterChain chain, ActionListener<BulkResponse> listener) {
|
||||||
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
|
BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
|
||||||
executionService.execute(() -> bulkRequestModifier, tuple -> {
|
executionService.execute(() -> bulkRequestModifier, (indexRequest, throwable) -> {
|
||||||
IndexRequest indexRequest = tuple.v1();
|
|
||||||
Throwable throwable = tuple.v2();
|
|
||||||
logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", indexRequest.pipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id(), throwable);
|
logger.debug("failed to execute pipeline [{}] for document [{}/{}/{}]", indexRequest.pipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id(), throwable);
|
||||||
bulkRequestModifier.markCurrentItemAsFailed(throwable);
|
bulkRequestModifier.markCurrentItemAsFailed(throwable);
|
||||||
}, (success) -> {
|
}, (success) -> {
|
||||||
|
|
|
@ -22,12 +22,12 @@ package org.elasticsearch.ingest;
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.ActionRequest;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
|
||||||
import org.elasticsearch.ingest.core.IngestDocument;
|
import org.elasticsearch.ingest.core.IngestDocument;
|
||||||
import org.elasticsearch.ingest.core.Pipeline;
|
import org.elasticsearch.ingest.core.Pipeline;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
public class PipelineExecutionService {
|
public class PipelineExecutionService {
|
||||||
|
@ -53,7 +53,7 @@ public class PipelineExecutionService {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void execute(Iterable<ActionRequest> actionRequests,
|
public void execute(Iterable<ActionRequest> actionRequests,
|
||||||
Consumer<Tuple<IndexRequest, Throwable>> itemFailureHandler, Consumer<Boolean> completionHandler) {
|
BiConsumer<IndexRequest, Throwable> itemFailureHandler, Consumer<Boolean> completionHandler) {
|
||||||
threadPool.executor(ThreadPool.Names.INGEST).execute(() -> {
|
threadPool.executor(ThreadPool.Names.INGEST).execute(() -> {
|
||||||
for (ActionRequest actionRequest : actionRequests) {
|
for (ActionRequest actionRequest : actionRequests) {
|
||||||
if ((actionRequest instanceof IndexRequest)) {
|
if ((actionRequest instanceof IndexRequest)) {
|
||||||
|
@ -64,7 +64,7 @@ public class PipelineExecutionService {
|
||||||
//this shouldn't be needed here but we do it for consistency with index api which requires it to prevent double execution
|
//this shouldn't be needed here but we do it for consistency with index api which requires it to prevent double execution
|
||||||
indexRequest.pipeline(null);
|
indexRequest.pipeline(null);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
itemFailureHandler.accept(new Tuple<>(indexRequest, e));
|
itemFailureHandler.accept(indexRequest, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,6 @@ import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
import org.elasticsearch.action.delete.DeleteRequest;
|
import org.elasticsearch.action.delete.DeleteRequest;
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
import org.elasticsearch.action.update.UpdateRequest;
|
import org.elasticsearch.action.update.UpdateRequest;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.ingest.core.CompoundProcessor;
|
import org.elasticsearch.ingest.core.CompoundProcessor;
|
||||||
import org.elasticsearch.ingest.core.IngestDocument;
|
import org.elasticsearch.ingest.core.IngestDocument;
|
||||||
|
@ -41,9 +40,11 @@ import org.mockito.invocation.InvocationOnMock;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyBoolean;
|
import static org.mockito.Matchers.anyBoolean;
|
||||||
import static org.mockito.Matchers.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
|
@ -95,24 +96,25 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
||||||
IndexRequest indexRequest2 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("does_not_exist");
|
IndexRequest indexRequest2 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).pipeline("does_not_exist");
|
||||||
bulkRequest.add(indexRequest2);
|
bulkRequest.add(indexRequest2);
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Consumer<Tuple<IndexRequest, Throwable>> failureHandler = mock(Consumer.class);
|
BiConsumer<IndexRequest, Throwable> failureHandler = mock(BiConsumer.class);
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||||
executionService.execute(bulkRequest.requests(), failureHandler, completionHandler);
|
executionService.execute(bulkRequest.requests(), failureHandler, completionHandler);
|
||||||
verify(failureHandler, times(1)).accept(argThat(new CustomTypeSafeMatcher<Tuple<IndexRequest,Throwable>>("failure handler was not called with the expected arguments") {
|
verify(failureHandler, times(1)).accept(
|
||||||
@Override
|
argThat(new CustomTypeSafeMatcher<IndexRequest>("failure handler was not called with the expected arguments") {
|
||||||
protected boolean matchesSafely(Tuple<IndexRequest, Throwable> item) {
|
@Override
|
||||||
if( item.v1() != indexRequest2) {
|
protected boolean matchesSafely(IndexRequest item) {
|
||||||
return false;
|
return item == indexRequest2;
|
||||||
}
|
}
|
||||||
if (item.v2() instanceof IllegalArgumentException == false) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
IllegalArgumentException iae = (IllegalArgumentException) item.v2();
|
|
||||||
return "pipeline with id [does_not_exist] does not exist".equals(iae.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
}));
|
}),
|
||||||
|
argThat(new CustomTypeSafeMatcher<IllegalArgumentException>("failure handler was not called with the expected arguments") {
|
||||||
|
@Override
|
||||||
|
protected boolean matchesSafely(IllegalArgumentException iae) {
|
||||||
|
return "pipeline with id [does_not_exist] does not exist".equals(iae.getMessage());
|
||||||
|
}
|
||||||
|
})
|
||||||
|
);
|
||||||
verify(completionHandler, times(1)).accept(anyBoolean());
|
verify(completionHandler, times(1)).accept(anyBoolean());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,11 +310,11 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
||||||
doThrow(error).when(processor).execute(any());
|
doThrow(error).when(processor).execute(any());
|
||||||
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, processor));
|
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, processor));
|
||||||
|
|
||||||
Consumer<Tuple<IndexRequest, Throwable>> requestItemErrorHandler = mock(Consumer.class);
|
BiConsumer<IndexRequest, Throwable> requestItemErrorHandler = mock(BiConsumer.class);
|
||||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||||
executionService.execute(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
|
executionService.execute(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
|
||||||
|
|
||||||
verify(requestItemErrorHandler, times(numIndexRequests)).accept(new Tuple<>(any(IndexRequest.class), error));
|
verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), eq(error));
|
||||||
verify(completionHandler, times(1)).accept(true);
|
verify(completionHandler, times(1)).accept(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -330,12 +332,12 @@ public class PipelineExecutionServiceTests extends ESTestCase {
|
||||||
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, new CompoundProcessor()));
|
when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, new CompoundProcessor()));
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Consumer<Tuple<IndexRequest, Throwable>> requestItemErrorHandler = mock(Consumer.class);
|
BiConsumer<IndexRequest, Throwable> requestItemErrorHandler = mock(BiConsumer.class);
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
Consumer<Boolean> completionHandler = mock(Consumer.class);
|
||||||
executionService.execute(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
|
executionService.execute(bulkRequest.requests(), requestItemErrorHandler, completionHandler);
|
||||||
|
|
||||||
verify(requestItemErrorHandler, never()).accept(any());
|
verify(requestItemErrorHandler, never()).accept(any(), any());
|
||||||
verify(completionHandler, times(1)).accept(true);
|
verify(completionHandler, times(1)).accept(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue