diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index e4cb03ce629..a3a4e48923a 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -491,12 +491,10 @@ - - diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index bf95b7517c6..1fd41e37b6d 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -43,6 +43,7 @@ import org.elasticsearch.transport.Transport; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; @@ -67,17 +68,17 @@ abstract class AbstractSearchAsyncAction exten private final SetOnce> shardFailures = new SetOnce<>(); private final Object shardFailuresMutex = new Object(); private final AtomicInteger successfulOps = new AtomicInteger(); - private final long startTime; + private final TransportSearchAction.SearchTimeProvider timeProvider; protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService, Function nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, Executor executor, SearchRequest request, - ActionListener listener, GroupShardsIterator shardsIts, long startTime, + ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion, SearchTask task, SearchPhaseResults resultConsumer) { super(name, request, shardsIts, logger); - this.startTime = startTime; + this.timeProvider = timeProvider; this.logger = logger; this.searchTransportService = searchTransportService; this.executor = executor; @@ -94,10 +95,9 @@ abstract class AbstractSearchAsyncAction exten /** * Builds how long it took to execute the search. */ - private long buildTookInMillis() { - // protect ourselves against time going backwards - // negative values don't make sense and we want to be able to serialize that thing as a vLong - return Math.max(1, System.currentTimeMillis() - startTime); + long buildTookInMillis() { + return TimeUnit.NANOSECONDS.toMillis( + timeProvider.getRelativeCurrentNanos() - timeProvider.getRelativeStartNanos()); } /** @@ -300,7 +300,7 @@ abstract class AbstractSearchAsyncAction exten assert filter != null; float indexBoost = concreteIndexBoosts.getOrDefault(shard.index().getUUID(), DEFAULT_INDEX_BOOST); return new ShardSearchTransportRequest(request, shardIt.shardId(), getNumShards(), - filter, indexBoost, startTime); + filter, indexBoost, timeProvider.getAbsoluteStartMillis()); } /** diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index d846c42dbea..d3b2ea3a98e 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -33,28 +33,59 @@ import java.util.concurrent.Executor; import java.util.function.Function; final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { + private final SearchPhaseController searchPhaseController; - SearchDfsQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, - Function nodeIdToConnection, - Map aliasFilter, Map concreteIndexBoosts, - SearchPhaseController searchPhaseController, Executor executor, SearchRequest request, - ActionListener listener, GroupShardsIterator shardsIts, long startTime, - long clusterStateVersion, SearchTask task) { - super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, - request, listener, shardsIts, startTime, clusterStateVersion, task, new SearchPhaseResults<>(shardsIts.size())); + SearchDfsQueryThenFetchAsyncAction( + final Logger logger, + final SearchTransportService searchTransportService, + final Function nodeIdToConnection, + final Map aliasFilter, + final Map concreteIndexBoosts, + final SearchPhaseController searchPhaseController, + final Executor executor, + final SearchRequest request, + final ActionListener listener, + final GroupShardsIterator shardsIts, + final TransportSearchAction.SearchTimeProvider timeProvider, + final long clusterStateVersion, + final SearchTask task) { + super( + "dfs", + logger, + searchTransportService, + nodeIdToConnection, + aliasFilter, + concreteIndexBoosts, + executor, + request, + listener, + shardsIts, + timeProvider, + clusterStateVersion, + task, + new SearchPhaseResults<>(shardsIts.size())); this.searchPhaseController = searchPhaseController; } @Override - protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener listener) { + protected void executePhaseOnShard( + final ShardIterator shardIt, + final ShardRouting shard, + final ActionListener listener) { getSearchTransport().sendExecuteDfs(getConnection(shard.currentNodeId()), buildShardSearchRequest(shardIt, shard) , getTask(), listener); } @Override - protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { - return new DfsQueryPhase(results.results, searchPhaseController, - (queryResults) -> new FetchSearchPhase(queryResults, searchPhaseController, context), context); + protected SearchPhase getNextPhase( + final SearchPhaseResults results, final SearchPhaseContext context) { + return new DfsQueryPhase( + results.results, + searchPhaseController, + (queryResults) -> + new FetchSearchPhase(queryResults, searchPhaseController, context), + context); } + } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 210a9aefda7..fe87b8f4dba 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -32,30 +32,60 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.function.Function; -final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction { +final class SearchQueryThenFetchAsyncAction + extends AbstractSearchAsyncAction { + private final SearchPhaseController searchPhaseController; - SearchQueryThenFetchAsyncAction(Logger logger, SearchTransportService searchTransportService, - Function nodeIdToConnection, - Map aliasFilter, Map concreteIndexBoosts, - SearchPhaseController searchPhaseController, Executor executor, - SearchRequest request, ActionListener listener, - GroupShardsIterator shardsIts, long startTime, long clusterStateVersion, - SearchTask task) { - super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, - request, listener, shardsIts, startTime, clusterStateVersion, task, - searchPhaseController.newSearchPhaseResults(request, shardsIts.size())); + SearchQueryThenFetchAsyncAction( + final Logger logger, + final SearchTransportService searchTransportService, + final Function nodeIdToConnection, + final Map aliasFilter, + final Map concreteIndexBoosts, + final SearchPhaseController searchPhaseController, + final Executor executor, + final SearchRequest request, + final ActionListener listener, + final GroupShardsIterator shardsIts, + final TransportSearchAction.SearchTimeProvider timeProvider, + long clusterStateVersion, + SearchTask task) { + super( + "query", + logger, + searchTransportService, + nodeIdToConnection, + aliasFilter, + concreteIndexBoosts, + executor, + request, + listener, + shardsIts, + timeProvider, + clusterStateVersion, + task, + searchPhaseController.newSearchPhaseResults(request, shardsIts.size())); this.searchPhaseController = searchPhaseController; } - protected void executePhaseOnShard(ShardIterator shardIt, ShardRouting shard, ActionListener listener) { - getSearchTransport().sendExecuteQuery(getConnection(shard.currentNodeId()), - buildShardSearchRequest(shardIt, shard), getTask(), listener); + protected void executePhaseOnShard( + final ShardIterator shardIt, + final ShardRouting shard, + final ActionListener listener) { + getSearchTransport().sendExecuteQuery( + getConnection(shard.currentNodeId()), + buildShardSearchRequest(shardIt, shard), + getTask(), + listener); } @Override - protected SearchPhase getNextPhase(SearchPhaseResults results, SearchPhaseContext context) { + protected SearchPhase getNextPhase( + final SearchPhaseResults results, + final SearchPhaseContext context) { return new FetchSearchPhase(results, searchPhaseController, context); } + } diff --git a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index e86cfef6e14..008d022a655 100644 --- a/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -52,6 +52,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; import java.util.function.Function; +import java.util.function.LongSupplier; import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH; @@ -116,10 +117,62 @@ public class TransportSearchAction extends HandledTransportAction listener) { - // pure paranoia if time goes backwards we are at least positive - final long startTimeInMillis = Math.max(0, System.currentTimeMillis()); + final long absoluteStartMillis = System.currentTimeMillis(); + final long relativeStartNanos = System.nanoTime(); + final SearchTimeProvider timeProvider = + new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime); + final String[] localIndices; final Map> remoteClusterIndices; final ClusterState clusterState = clusterService.state(); @@ -134,7 +187,7 @@ public class TransportSearchAction extends HandledTransportAction null, clusterState, Collections.emptyMap(), listener); } else { remoteClusterService.collectSearchShards(searchRequest, remoteClusterIndices, @@ -143,13 +196,13 @@ public class TransportSearchAction extends HandledTransportAction remoteAliasFilters = new HashMap<>(); Function connectionFunction = remoteClusterService.processRemoteShards( searchShardsResponses, remoteShardIterators, remoteAliasFilters); - executeSearch((SearchTask)task, startTimeInMillis, searchRequest, localIndices, remoteShardIterators, + executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteShardIterators, connectionFunction, clusterState, remoteAliasFilters, listener); }, listener::onFailure)); } } - private void executeSearch(SearchTask task, long startTimeInMillis, SearchRequest searchRequest, String[] localIndices, + private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, String[] localIndices, List remoteShardIterators, Function remoteConnections, ClusterState clusterState, Map remoteAliasMap, ActionListener listener) { @@ -163,7 +216,7 @@ public class TransportSearchAction extends HandledTransportAction aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap); Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), @@ -211,7 +264,7 @@ public class TransportSearchAction extends HandledTransportAction connectionLookup, + SearchTimeProvider timeProvider, Function connectionLookup, long clusterStateVersion, Map aliasFilter, Map concreteIndexBoosts, ActionListener listener) { @@ -245,12 +298,12 @@ public class TransportSearchAction extends HandledTransportAction createAction( + final boolean controlled, + final AtomicLong expected) { + + final Runnable runnable; + final TransportSearchAction.SearchTimeProvider timeProvider; + if (controlled) { + runnable = () -> expected.set(randomNonNegativeLong()); + timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, expected::get); + } else { + runnable = () -> { + long elapsed = spinForAtLeastNMilliseconds(randomIntBetween(1, 10)); + expected.set(elapsed); + }; + timeProvider = new TransportSearchAction.SearchTimeProvider( + 0, + System.nanoTime(), + System::nanoTime); + } + + final ShardIterator it = new ShardIterator() { + @Override + public ShardId shardId() { + return null; + } + + @Override + public void reset() { + + } + + @Override + public int compareTo(ShardIterator o) { + return 0; + } + + @Override + public int size() { + return 0; + } + + @Override + public int sizeActive() { + return 0; + } + + @Override + public ShardRouting nextOrNull() { + return null; + } + + @Override + public int remaining() { + return 0; + } + + @Override + public Iterable asUnordered() { + return null; + } + }; + + return new AbstractSearchAsyncAction( + "test", + null, + null, + null, + null, + null, + null, + null, + null, + new GroupShardsIterator(Collections.singletonList(it)), + timeProvider, + 0, + null, + null + ) { + @Override + protected SearchPhase getNextPhase( + final SearchPhaseResults results, + final SearchPhaseContext context) { + return null; + } + + @Override + protected void executePhaseOnShard( + final ShardIterator shardIt, + final ShardRouting shard, + final ActionListener listener) { + + } + + @Override + long buildTookInMillis() { + runnable.run(); + return super.buildTookInMillis(); + } + }; + } + + public void testTookWithControlledClock() { + runTestTook(true); + } + + public void testTookWithRealClock() { + runTestTook(false); + } + + private void runTestTook(final boolean controlled) { + final AtomicLong expected = new AtomicLong(); + AbstractSearchAsyncAction action = createAction(controlled, expected); + final long actual = action.buildTookInMillis(); + if (controlled) { + // with a controlled clock, we can assert the exact took time + assertThat(actual, equalTo(TimeUnit.NANOSECONDS.toMillis(expected.get()))); + } else { + // with a real clock, the best we can say is that it took as long as we spun for + assertThat(actual, greaterThanOrEqualTo(TimeUnit.NANOSECONDS.toMillis(expected.get()))); + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 9b7fad265bf..53e4eb59ae5 100644 --- a/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -92,9 +92,22 @@ public class SearchAsyncActionTests extends ESTestCase { lookup.put(primaryNode.getId(), new MockConnection(primaryNode)); lookup.put(replicaNode.getId(), new MockConnection(replicaNode)); Map aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)); - AbstractSearchAsyncAction asyncAction = new AbstractSearchAsyncAction("test", logger, transportService, - lookup::get, aliasFilters, Collections.emptyMap(), null, request, responseListener, shardsIter, 0, 0, null, - new InitialSearchPhase.SearchPhaseResults<>(shardsIter.size())) { + AbstractSearchAsyncAction asyncAction = + new AbstractSearchAsyncAction( + "test", + logger, + transportService, + lookup::get, + aliasFilters, + Collections.emptyMap(), + null, + request, + responseListener, + shardsIter, + new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0), + 0, + null, + new InitialSearchPhase.SearchPhaseResults<>(shardsIter.size())) { TestSearchResponse response = new TestSearchResponse(); @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 1ea96e6f548..fa659e06fb2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -1093,9 +1093,15 @@ public abstract class ESTestCase extends LuceneTestCase { } protected static long spinForAtLeastOneMillisecond() { - long nanosecondsInMillisecond = TimeUnit.NANOSECONDS.convert(1, TimeUnit.MILLISECONDS); - // force at least one millisecond to elapse, but ensure the - // clock has enough resolution to observe the passage of time + return spinForAtLeastNMilliseconds(1); + } + + protected static long spinForAtLeastNMilliseconds(final long ms) { + long nanosecondsInMillisecond = TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS); + /* + * Force at least ms milliseconds to elapse, but ensure the clock has enough resolution to + * observe the passage of time. + */ long start = System.nanoTime(); long elapsed; while ((elapsed = (System.nanoTime() - start)) < nanosecondsInMillisecond) {