Internal: make sure that multi_percolate request hands over its context and headers to its corresponding shard requests
Closes #7371
This commit is contained in:
parent
9dd3597f1f
commit
b6cdaff30c
|
@ -122,7 +122,7 @@ public class TransportMultiPercolateAction extends HandledTransportAction<MultiP
|
||||||
percolateRequests.set(slot, itemResponse.getFailure());
|
percolateRequests.set(slot, itemResponse.getFailure());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
new ASyncAction(percolateRequests, listener, clusterState).run();
|
new ASyncAction(request, percolateRequests, listener, clusterState).run();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -131,7 +131,7 @@ public class TransportMultiPercolateAction extends HandledTransportAction<MultiP
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
new ASyncAction(percolateRequests, listener, clusterState).run();
|
new ASyncAction(request, percolateRequests, listener, clusterState).run();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -140,6 +140,7 @@ public class TransportMultiPercolateAction extends HandledTransportAction<MultiP
|
||||||
|
|
||||||
final ActionListener<MultiPercolateResponse> finalListener;
|
final ActionListener<MultiPercolateResponse> finalListener;
|
||||||
final Map<ShardId, TransportShardMultiPercolateAction.Request> requestsByShard;
|
final Map<ShardId, TransportShardMultiPercolateAction.Request> requestsByShard;
|
||||||
|
final MultiPercolateRequest multiPercolateRequest;
|
||||||
final List<Object> percolateRequests;
|
final List<Object> percolateRequests;
|
||||||
|
|
||||||
final Map<ShardId, IntArrayList> shardToSlots;
|
final Map<ShardId, IntArrayList> shardToSlots;
|
||||||
|
@ -148,8 +149,9 @@ public class TransportMultiPercolateAction extends HandledTransportAction<MultiP
|
||||||
final AtomicReferenceArray<AtomicInteger> expectedOperationsPerItem;
|
final AtomicReferenceArray<AtomicInteger> expectedOperationsPerItem;
|
||||||
final AtomicReferenceArray<AtomicReferenceArray> responsesByItemAndShard;
|
final AtomicReferenceArray<AtomicReferenceArray> responsesByItemAndShard;
|
||||||
|
|
||||||
ASyncAction(List<Object> percolateRequests, ActionListener<MultiPercolateResponse> finalListener, ClusterState clusterState) {
|
ASyncAction(MultiPercolateRequest multiPercolateRequest, List<Object> percolateRequests, ActionListener<MultiPercolateResponse> finalListener, ClusterState clusterState) {
|
||||||
this.finalListener = finalListener;
|
this.finalListener = finalListener;
|
||||||
|
this.multiPercolateRequest = multiPercolateRequest;
|
||||||
this.percolateRequests = percolateRequests;
|
this.percolateRequests = percolateRequests;
|
||||||
responsesByItemAndShard = new AtomicReferenceArray<>(percolateRequests.size());
|
responsesByItemAndShard = new AtomicReferenceArray<>(percolateRequests.size());
|
||||||
expectedOperationsPerItem = new AtomicReferenceArray<>(percolateRequests.size());
|
expectedOperationsPerItem = new AtomicReferenceArray<>(percolateRequests.size());
|
||||||
|
@ -192,7 +194,7 @@ public class TransportMultiPercolateAction extends HandledTransportAction<MultiP
|
||||||
ShardId shardId = shard.shardId();
|
ShardId shardId = shard.shardId();
|
||||||
TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId);
|
TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId);
|
||||||
if (requests == null) {
|
if (requests == null) {
|
||||||
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shardId.getIndex(), shardId.getId(), percolateRequest.preference()));
|
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(multiPercolateRequest, shardId.getIndex(), shardId.getId(), percolateRequest.preference()));
|
||||||
}
|
}
|
||||||
logger.trace("Adding shard[{}] percolate request for item[{}]", shardId, slot);
|
logger.trace("Adding shard[{}] percolate request for item[{}]", shardId, slot);
|
||||||
requests.add(new TransportShardMultiPercolateAction.Request.Item(slot, new PercolateShardRequest(shardId, percolateRequest)));
|
requests.add(new TransportShardMultiPercolateAction.Request.Item(slot, new PercolateShardRequest(shardId, percolateRequest)));
|
||||||
|
|
|
@ -126,8 +126,8 @@ public class TransportShardMultiPercolateAction extends TransportShardSingleOper
|
||||||
Request() {
|
Request() {
|
||||||
}
|
}
|
||||||
|
|
||||||
Request(String concreteIndex, int shardId, String preference) {
|
Request(MultiPercolateRequest multiPercolateRequest, String concreteIndex, int shardId, String preference) {
|
||||||
this.index = concreteIndex;
|
super(multiPercolateRequest, concreteIndex);
|
||||||
this.shardId = shardId;
|
this.shardId = shardId;
|
||||||
this.preference = preference;
|
this.preference = preference;
|
||||||
this.items = new ArrayList<>();
|
this.items = new ArrayList<>();
|
||||||
|
|
Loading…
Reference in New Issue