Search took time should use a relative clock

Search took time uses an absolute clock to measure elapsed time, and
then tries to deal with the complexities of using an absolute clock for
this purpose. Instead, we should use a high-precision monotonic relative
clock that is designed exactly for measuring elapsed time. This commit
modifies the search infrastructure to use a relative clock for measuring
took time, but still provides an absolute clock for the components of
search that require a real clock (e.g., index name expression
resolution, etc.).

Relates #23662
This commit is contained in:
Jason Tedor 2017-03-20 18:48:51 -04:00 committed by GitHub
parent ce08594008
commit 7b17689458
8 changed files with 345 additions and 53 deletions

View File

@ -491,12 +491,10 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]ReduceSearchPhaseException.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]RemoteClusterConnection.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]RemoteClusterService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchDfsQueryThenFetchAsyncAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchPhase.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchPhaseContext.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchPhaseController.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchPhaseExecutionException.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchQueryThenFetchAsyncAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchRequest.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchRequestBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchResponse.java" checks="LineLength" />

View File

@ -43,6 +43,7 @@ import org.elasticsearch.transport.Transport;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -67,17 +68,17 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicInteger successfulOps = new AtomicInteger();
private final long startTime;
private final TransportSearchAction.SearchTimeProvider timeProvider;
protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
Function<String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider,
long clusterStateVersion, SearchTask task, SearchPhaseResults<Result> resultConsumer) {
super(name, request, shardsIts, logger);
this.startTime = startTime;
this.timeProvider = timeProvider;
this.logger = logger;
this.searchTransportService = searchTransportService;
this.executor = executor;
@ -94,10 +95,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
/**
* Builds how long it took to execute the search.
*/
private long buildTookInMillis() {
// protect ourselves against time going backwards
// negative values don't make sense and we want to be able to serialize that thing as a vLong
return Math.max(1, System.currentTimeMillis() - startTime);
long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(
timeProvider.getRelativeCurrentNanos() - timeProvider.getRelativeStartNanos());
}
/**
@ -300,7 +300,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
assert filter != null;
float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST);
return new ShardSearchTransportRequest(request, shardIt.shardId(), getNumShards(),
filter, indexBoost, startTime);
filter, indexBoost, timeProvider.getAbsoluteStartMillis());
}
/**

View File

@ -33,28 +33,59 @@ import java.util.concurrent.Executor;
import java.util.function.Function;
final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {
private final SearchPhaseController searchPhaseController;
SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
Function<String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
SearchPhaseController searchPhaseController, Executor executor, SearchRequest request,
ActionListener<SearchResponse> listener, GroupShardsIterator shardsIts, long startTime,
long clusterStateVersion, SearchTask task) {
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor,
request, listener, shardsIts, startTime, clusterStateVersion, task, new SearchPhaseResults<>(shardsIts.size()));
SearchDfsQueryThenFetchAsyncAction(
final Logger logger,
final SearchTransportService searchTransportService,
final Function<String, Transport.Connection> nodeIdToConnection,
final Map<String, AliasFilter> aliasFilter,
final Map<String, Float> concreteIndexBoosts,
final SearchPhaseController searchPhaseController,
final Executor executor,
final SearchRequest request,
final ActionListener<SearchResponse> listener,
final GroupShardsIterator shardsIts,
final TransportSearchAction.SearchTimeProvider timeProvider,
final long clusterStateVersion,
final SearchTask task) {
super(
"dfs",
logger,
searchTransportService,
nodeIdToConnection,
aliasFilter,
concreteIndexBoosts,
executor,
request,
listener,
shardsIts,
timeProvider,
clusterStateVersion,
task,
new SearchPhaseResults<>(shardsIts.size()));
this.searchPhaseController = searchPhaseController;
}
@Override
protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener listener) {
protected void executePhaseOnShard(
final ShardIterator shardIt,
final ShardRouting shard,
final ActionListener<DfsSearchResult> listener) {
getSearchTransport().sendExecuteDfs(getConnection(shard.currentNodeId()),
buildShardSearchRequest(shardIt, shard) , getTask(), listener);
}
@Override
protected SearchPhase getNextPhase(SearchPhaseResults<DfsSearchResult> results, SearchPhaseContext context) {
return new DfsQueryPhase(results.results, searchPhaseController,
(queryResults) -> new FetchSearchPhase(queryResults, searchPhaseController, context), context);
protected SearchPhase getNextPhase(
final SearchPhaseResults<DfsSearchResult> results, final SearchPhaseContext context) {
return new DfsQueryPhase(
results.results,
searchPhaseController,
(queryResults) ->
new FetchSearchPhase(queryResults, searchPhaseController, context),
context);
}
}

View File

@ -32,30 +32,60 @@ import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;
final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<QuerySearchResultProvider> {
final class SearchQueryThenFetchAsyncAction
extends AbstractSearchAsyncAction<QuerySearchResultProvider> {
private final SearchPhaseController searchPhaseController;
SearchQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService,
Function<String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
SearchPhaseController searchPhaseController, Executor executor,
SearchRequest request, ActionListener<SearchResponse> listener,
GroupShardsIterator shardsIts, long startTime, long clusterStateVersion,
SearchTask task) {
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor,
request, listener, shardsIts, startTime, clusterStateVersion, task,
searchPhaseController.newSearchPhaseResults(request, shardsIts.size()));
SearchQueryThenFetchAsyncAction(
final Logger logger,
final SearchTransportService searchTransportService,
final Function<String, Transport.Connection> nodeIdToConnection,
final Map<String, AliasFilter> aliasFilter,
final Map<String, Float> concreteIndexBoosts,
final SearchPhaseController searchPhaseController,
final Executor executor,
final SearchRequest request,
final ActionListener<SearchResponse> listener,
final GroupShardsIterator shardsIts,
final TransportSearchAction.SearchTimeProvider timeProvider,
long clusterStateVersion,
SearchTask task) {
super(
"query",
logger,
searchTransportService,
nodeIdToConnection,
aliasFilter,
concreteIndexBoosts,
executor,
request,
listener,
shardsIts,
timeProvider,
clusterStateVersion,
task,
searchPhaseController.newSearchPhaseResults(request, shardsIts.size()));
this.searchPhaseController = searchPhaseController;
}
protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener listener) {
getSearchTransport().sendExecuteQuery(getConnection(shard.currentNodeId()),
buildShardSearchRequest(shardIt, shard), getTask(), listener);
protected void executePhaseOnShard(
final ShardIterator shardIt,
final ShardRouting shard,
final ActionListener<QuerySearchResultProvider> listener) {
getSearchTransport().sendExecuteQuery(
getConnection(shard.currentNodeId()),
buildShardSearchRequest(shardIt, shard),
getTask(),
listener);
}
@Override
protected SearchPhase getNextPhase(SearchPhaseResults<QuerySearchResultProvider> results, SearchPhaseContext context) {
protected SearchPhase getNextPhase(
final SearchPhaseResults<QuerySearchResultProvider> results,
final SearchPhaseContext context) {
return new FetchSearchPhase(results, searchPhaseController, context);
}
}

View File

@ -52,6 +52,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.LongSupplier;
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
@ -116,10 +117,62 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
return Collections.unmodifiableMap(concreteIndexBoosts);
}
/**
* Search operations need two clocks. One clock is to fulfill real clock needs (e.g., resolving
* "now" to an index name). Another clock is needed for measuring how long a search operation
* took. These two uses are at odds with each other. There are many issues with using a real
* clock for measuring how long an operation took (they often lack precision, they are subject
* to moving backwards due to NTP and other such complexities, etc.). There are also issues with
* using a relative clock for reporting real time. Thus, we simply separate these two uses.
*/
static class SearchTimeProvider {
private final long absoluteStartMillis;
private final long relativeStartNanos;
private final LongSupplier relativeCurrentNanosProvider;
/**
* Instantiates a new search time provider. The absolute start time is the real clock time
* used for resolving index expressions that include dates. The relative start time is the
* start of the search operation according to a relative clock. The total time the search
* operation took can be measured against the provided relative clock and the relative start
* time.
*
* @param absoluteStartMillis the absolute start time in milliseconds since the epoch
* @param relativeStartNanos the relative start time in nanoseconds
* @param relativeCurrentNanosProvider provides the current relative time
*/
SearchTimeProvider(
final long absoluteStartMillis,
final long relativeStartNanos,
final LongSupplier relativeCurrentNanosProvider) {
this.absoluteStartMillis = absoluteStartMillis;
this.relativeStartNanos = relativeStartNanos;
this.relativeCurrentNanosProvider = relativeCurrentNanosProvider;
}
long getAbsoluteStartMillis() {
return absoluteStartMillis;
}
long getRelativeStartNanos() {
return relativeStartNanos;
}
long getRelativeCurrentNanos() {
return relativeCurrentNanosProvider.getAsLong();
}
}
@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
// pure paranoia if time goes backwards we are at least positive
final long startTimeInMillis = Math.max(0, System.currentTimeMillis());
final long absoluteStartMillis = System.currentTimeMillis();
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider =
new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime);
final String[] localIndices;
final Map<String, List<String>> remoteClusterIndices;
final ClusterState clusterState = clusterService.state();
@ -134,7 +187,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
if (remoteClusterIndices.isEmpty()) {
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, Collections.emptyList(),
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(),
(nodeId) -> null, clusterState, Collections.emptyMap(), listener);
} else {
remoteClusterService.collectSearchShards(searchRequest, remoteClusterIndices,
@ -143,13 +196,13 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
Function<String, Transport.Connection> connectionFunction = remoteClusterService.processRemoteShards(
searchShardsResponses, remoteShardIterators, remoteAliasFilters);
executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, remoteShardIterators,
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteShardIterators,
connectionFunction, clusterState, remoteAliasFilters, listener);
}, listener::onFailure));
}
}
private void executeSearch(SearchTask task, long startTimeInMillis, SearchRequest searchRequest, String[] localIndices,
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, String[] localIndices,
List<ShardIterator> remoteShardIterators, Function<String, Transport.Connection> remoteConnections,
ClusterState clusterState, Map<String, AliasFilter> remoteAliasMap,
ActionListener<SearchResponse> listener) {
@ -163,7 +216,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
indices = Index.EMPTY_ARRAY; // don't search on _all if only remote indices were specified
} else {
indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
startTimeInMillis, localIndices);
timeProvider.getAbsoluteStartMillis(), localIndices);
}
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
@ -211,7 +264,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
return connection;
};
searchAsyncAction(task, searchRequest, shardIterators, startTimeInMillis, connectionLookup, clusterState.version(),
searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener).start();
}
@ -236,7 +289,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
}
private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest, GroupShardsIterator shardIterators,
long startTime, Function<String, Transport.Connection> connectionLookup,
SearchTimeProvider timeProvider, Function<String, Transport.Connection> connectionLookup,
long clusterStateVersion, Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
ActionListener<SearchResponse> listener) {
@ -245,12 +298,12 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
switch(searchRequest.searchType()) {
case DFS_QUERY_THEN_FETCH:
searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, timeProvider,
clusterStateVersion, task);
break;
case QUERY_THEN_FETCH:
searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, startTime,
aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators, timeProvider,
clusterStateVersion, task);
break;
default:

View File

@ -0,0 +1,161 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public class AbstractSearchAsyncActionTookTests extends ESTestCase {
private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
final boolean controlled,
final AtomicLong expected) {
final Runnable runnable;
final TransportSearchAction.SearchTimeProvider timeProvider;
if (controlled) {
runnable = () -> expected.set(randomNonNegativeLong());
timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, expected::get);
} else {
runnable = () -> {
long elapsed = spinForAtLeastNMilliseconds(randomIntBetween(1, 10));
expected.set(elapsed);
};
timeProvider = new TransportSearchAction.SearchTimeProvider(
0,
System.nanoTime(),
System::nanoTime);
}
final ShardIterator it = new ShardIterator() {
@Override
public ShardId shardId() {
return null;
}
@Override
public void reset() {
}
@Override
public int compareTo(ShardIterator o) {
return 0;
}
@Override
public int size() {
return 0;
}
@Override
public int sizeActive() {
return 0;
}
@Override
public ShardRouting nextOrNull() {
return null;
}
@Override
public int remaining() {
return 0;
}
@Override
public Iterable<ShardRouting> asUnordered() {
return null;
}
};
return new AbstractSearchAsyncAction<SearchPhaseResult>(
"test",
null,
null,
null,
null,
null,
null,
null,
null,
new GroupShardsIterator(Collections.singletonList(it)),
timeProvider,
0,
null,
null
) {
@Override
protected SearchPhase getNextPhase(
final SearchPhaseResults<SearchPhaseResult> results,
final SearchPhaseContext context) {
return null;
}
@Override
protected void executePhaseOnShard(
final ShardIterator shardIt,
final ShardRouting shard,
final ActionListener<SearchPhaseResult> listener) {
}
@Override
long buildTookInMillis() {
runnable.run();
return super.buildTookInMillis();
}
};
}
public void testTookWithControlledClock() {
runTestTook(true);
}
public void testTookWithRealClock() {
runTestTook(false);
}
private void runTestTook(final boolean controlled) {
final AtomicLong expected = new AtomicLong();
AbstractSearchAsyncAction<SearchPhaseResult> action = createAction(controlled, expected);
final long actual = action.buildTookInMillis();
if (controlled) {
// with a controlled clock, we can assert the exact took time
assertThat(actual, equalTo(TimeUnit.NANOSECONDS.toMillis(expected.get())));
} else {
// with a real clock, the best we can say is that it took as long as we spun for
assertThat(actual, greaterThanOrEqualTo(TimeUnit.NANOSECONDS.toMillis(expected.get())));
}
}
}

View File

@ -92,9 +92,22 @@ public class SearchAsyncActionTests extends ESTestCase {
lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction<TestSearchPhaseResult>("test", logger, transportService,
lookup::get, aliasFilters, Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0, null,
new InitialSearchPhase.SearchPhaseResults<>(shardsIter.size())) {
AbstractSearchAsyncAction asyncAction =
new AbstractSearchAsyncAction<TestSearchPhaseResult>(
"test",
logger,
transportService,
lookup::get,
aliasFilters,
Collections.emptyMap(),
null,
request,
responseListener,
shardsIter,
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
0,
null,
new InitialSearchPhase.SearchPhaseResults<>(shardsIter.size())) {
TestSearchResponse response = new TestSearchResponse();
@Override

View File

@ -1093,9 +1093,15 @@ public abstract class ESTestCase extends LuceneTestCase {
}
protected static long spinForAtLeastOneMillisecond() {
long nanosecondsInMillisecond = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS);
// force at least one millisecond to elapse, but ensure the
// clock has enough resolution to observe the passage of time
return spinForAtLeastNMilliseconds(1);
}
protected static long spinForAtLeastNMilliseconds(final long ms) {
long nanosecondsInMillisecond = TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
/*
* Force at least ms milliseconds to elapse, but ensure the clock has enough resolution to
* observe the passage of time.
*/
long start = System.nanoTime();
long elapsed;
while ((elapsed = (System.nanoTime() - start)) < nanosecondsInMillisecond) {