parent
cb5e9d3bee
commit
c53731a0cd
|
@ -21,7 +21,6 @@ package org.elasticsearch.action.bulk;
|
||||||
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
||||||
import org.apache.lucene.util.SparseFixedBitSet;
|
import org.apache.lucene.util.SparseFixedBitSet;
|
||||||
import org.elasticsearch.Assertions;
|
import org.elasticsearch.Assertions;
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
|
@ -79,6 +78,7 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -720,8 +720,6 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||||
|
|
||||||
static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
|
static final class BulkRequestModifier implements Iterator<DocWriteRequest<?>> {
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(BulkRequestModifier.class);
|
|
||||||
|
|
||||||
final BulkRequest bulkRequest;
|
final BulkRequest bulkRequest;
|
||||||
final SparseFixedBitSet failedSlots;
|
final SparseFixedBitSet failedSlots;
|
||||||
final List<BulkItemResponse> itemResponses;
|
final List<BulkItemResponse> itemResponses;
|
||||||
|
@ -802,7 +800,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||||
|
|
||||||
synchronized void markItemAsFailed(int slot, Exception e) {
|
synchronized void markItemAsFailed(int slot, Exception e) {
|
||||||
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
|
IndexRequest indexRequest = getIndexWriteRequest(bulkRequest.requests().get(slot));
|
||||||
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]",
|
logger.debug(String.format(Locale.ROOT, "failed to execute pipeline [%s] for document [%s/%s/%s]",
|
||||||
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), e);
|
indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), e);
|
||||||
|
|
||||||
// We hit a error during preprocessing a request, so we:
|
// We hit a error during preprocessing a request, so we:
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.ingest;
|
||||||
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.ResourceNotFoundException;
|
import org.elasticsearch.ResourceNotFoundException;
|
||||||
|
@ -528,6 +529,8 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
||||||
String originalIndex = indexRequest.indices()[0];
|
String originalIndex = indexRequest.indices()[0];
|
||||||
innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
|
innerExecute(slot, indexRequest, pipeline, onDropped, e -> {
|
||||||
if (e != null) {
|
if (e != null) {
|
||||||
|
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
|
||||||
|
pipelineId, indexRequest.index(), indexRequest.id()), e);
|
||||||
onFailure.accept(slot, e);
|
onFailure.accept(slot, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -567,6 +570,8 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}]",
|
||||||
|
pipelineId, indexRequest.index(), indexRequest.id()), e);
|
||||||
onFailure.accept(slot, e);
|
onFailure.accept(slot, e);
|
||||||
if (counter.decrementAndGet() == 0) {
|
if (counter.decrementAndGet() == 0) {
|
||||||
onCompletion.accept(originalThread, null);
|
onCompletion.accept(originalThread, null);
|
||||||
|
|
Loading…
Reference in New Issue