Move pipelines resolved assertion (#46892)
This assertion was added during the development of required pipelines. In the initial version of that work, the notion of whether or not a request was forwarded from the coordinating node to an ingest node was introduced. It was realized later that instead we needed to track whether or not the pipeline for the request was resolved. When that change was made, this assertion, while not incorrect, was left behind and only applied if the coordnating node was forwarding the request. Instead, the assertion applies whether or not the request is being forwarded. This commit addresses that by moving the assertion and renaming some variables.
This commit is contained in:
parent
fcea154f2e
commit
97acf353fa
|
@ -275,17 +275,17 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
|||
// also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
|
||||
// this path is never taken.
|
||||
try {
|
||||
if (Assertions.ENABLED) {
|
||||
final boolean arePipelinesResolved = bulkRequest.requests()
|
||||
.stream()
|
||||
.map(TransportBulkAction::getIndexWriteRequest)
|
||||
.filter(Objects::nonNull)
|
||||
.allMatch(IndexRequest::isPipelineResolved);
|
||||
assert arePipelinesResolved : bulkRequest;
|
||||
}
|
||||
if (clusterService.localNode().isIngestNode()) {
|
||||
processBulkIndexIngestRequest(task, bulkRequest, listener);
|
||||
} else {
|
||||
if (Assertions.ENABLED) {
|
||||
final boolean allAreForwardedRequests = bulkRequest.requests()
|
||||
.stream()
|
||||
.map(TransportBulkAction::getIndexWriteRequest)
|
||||
.filter(Objects::nonNull)
|
||||
.allMatch(IndexRequest::isPipelineResolved);
|
||||
assert allAreForwardedRequests : bulkRequest;
|
||||
}
|
||||
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
|
Loading…
Reference in New Issue