bubble down a now concept from the initiating search node to all nodes executing search
This commit is contained in:
parent
1add5ce566
commit
6560a9ec7b
|
@ -61,7 +61,7 @@ public abstract class TransportSearchHelper {
|
|||
return ret;
|
||||
}
|
||||
|
||||
public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases) {
|
||||
public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis) {
|
||||
InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting, numberOfShards, request.searchType());
|
||||
internalRequest.source(request.source(), request.sourceOffset(), request.sourceLength());
|
||||
internalRequest.extraSource(request.extraSource(), request.extraSourceOffset(), request.extraSourceLength());
|
||||
|
@ -69,6 +69,7 @@ public abstract class TransportSearchHelper {
|
|||
internalRequest.timeout(request.timeout());
|
||||
internalRequest.filteringAliases(filteringAliases);
|
||||
internalRequest.types(request.types());
|
||||
internalRequest.nowInMillis(nowInMillis);
|
||||
return internalRequest;
|
||||
}
|
||||
|
||||
|
|
|
@ -201,7 +201,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
|
|||
onFirstPhaseResult(shard, shardIt, null);
|
||||
} else {
|
||||
String[] filteringAliases = clusterState.metaData().filteringAliases(shard.index(), request.indices());
|
||||
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases), new SearchServiceListener<FirstResult>() {
|
||||
sendExecuteFirstPhase(node, internalSearchRequest(shard, shardsIts.size(), request, filteringAliases, startTime), new SearchServiceListener<FirstResult>() {
|
||||
@Override public void onResult(FirstResult result) {
|
||||
onFirstPhaseResult(shard, result, shardIt);
|
||||
}
|
||||
|
|
|
@ -399,7 +399,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
|
|||
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());
|
||||
|
||||
Engine.Searcher engineSearcher = indexShard.searcher();
|
||||
SearchContext context = new SearchContext(idGenerator.incrementAndGet(), shardTarget, request.searchType(), request.numberOfShards(), request.timeout(), request.types(), engineSearcher, indexService, scriptService);
|
||||
SearchContext context = new SearchContext(idGenerator.incrementAndGet(), shardTarget, request.searchType(), request.numberOfShards(), request.nowInMillis(), request.timeout(), request.types(), engineSearcher, indexService, scriptService);
|
||||
SearchContext.setCurrent(context);
|
||||
try {
|
||||
context.scroll(request.scroll());
|
||||
|
|
|
@ -79,6 +79,8 @@ public class InternalSearchRequest implements Streamable {
|
|||
private int extraSourceOffset;
|
||||
private int extraSourceLength;
|
||||
|
||||
private long nowInMillis;
|
||||
|
||||
public InternalSearchRequest() {
|
||||
}
|
||||
|
||||
|
@ -151,6 +153,15 @@ public class InternalSearchRequest implements Streamable {
|
|||
return this;
|
||||
}
|
||||
|
||||
public InternalSearchRequest nowInMillis(long nowInMillis) {
|
||||
this.nowInMillis = nowInMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
public long nowInMillis() {
|
||||
return this.nowInMillis;
|
||||
}
|
||||
|
||||
public Scroll scroll() {
|
||||
return scroll;
|
||||
}
|
||||
|
@ -228,6 +239,7 @@ public class InternalSearchRequest implements Streamable {
|
|||
} else {
|
||||
filteringAliases = null;
|
||||
}
|
||||
nowInMillis = in.readVLong();
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
|
@ -271,5 +283,6 @@ public class InternalSearchRequest implements Streamable {
|
|||
} else {
|
||||
out.writeVInt(0);
|
||||
}
|
||||
out.writeVLong(nowInMillis);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,6 +96,8 @@ public class SearchContext implements Releasable {
|
|||
|
||||
private final FetchSearchResult fetchResult;
|
||||
|
||||
private final long nowInMillis;
|
||||
|
||||
private final TimeValue timeout;
|
||||
|
||||
private float queryBoost = 1.0f;
|
||||
|
@ -153,9 +155,10 @@ public class SearchContext implements Releasable {
|
|||
|
||||
private Map<String, BlockJoinQuery> nestedQueries;
|
||||
|
||||
public SearchContext(long id, SearchShardTarget shardTarget, SearchType searchType, int numberOfShards, TimeValue timeout,
|
||||
public SearchContext(long id, SearchShardTarget shardTarget, SearchType searchType, int numberOfShards, long nowInMillis, TimeValue timeout,
|
||||
String[] types, Engine.Searcher engineSearcher, IndexService indexService, ScriptService scriptService) {
|
||||
this.id = id;
|
||||
this.nowInMillis = nowInMillis;
|
||||
this.searchType = searchType;
|
||||
this.shardTarget = shardTarget;
|
||||
this.numberOfShards = numberOfShards;
|
||||
|
@ -226,6 +229,10 @@ public class SearchContext implements Releasable {
|
|||
return this;
|
||||
}
|
||||
|
||||
public long nowInMillis() {
|
||||
return nowInMillis;
|
||||
}
|
||||
|
||||
public Scroll scroll() {
|
||||
return this.scroll;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue