Concurrent Searching (Experimental): modify profiling implementation to support concurrent data collection (#1673)

* Concurrent Searching (Experimental): modify profiling implementation to support concurrent data collection

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Addressing code review comments

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
Andriy Redko 2022-01-10 18:11:48 -05:00 committed by GitHub
parent ec24ee56e8
commit 4dbf1d268c
13 changed files with 243 additions and 48 deletions

File: org/opensearch/search/internal/ContextIndexSearcher.java

@@ -66,9 +66,9 @@ import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.dfs.AggregatedDfs;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.Timer;
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfileBreakdown;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
import org.opensearch.search.query.QuerySearchResult;
@@ -80,6 +80,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
/**
* Context-aware extension of {@link IndexSearcher}.
@@ -102,7 +103,18 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
QueryCachingPolicy queryCachingPolicy,
boolean wrapWithExitableDirectoryReader
) throws IOException {
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout(), wrapWithExitableDirectoryReader);
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout(), wrapWithExitableDirectoryReader, null);
}
public ContextIndexSearcher(
IndexReader reader,
Similarity similarity,
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
boolean wrapWithExitableDirectoryReader,
Executor executor
) throws IOException {
this(reader, similarity, queryCache, queryCachingPolicy, new MutableQueryTimeout(), wrapWithExitableDirectoryReader, executor);
}
private ContextIndexSearcher(
@@ -111,9 +123,10 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
MutableQueryTimeout cancellable,
boolean wrapWithExitableDirectoryReader
boolean wrapWithExitableDirectoryReader,
Executor executor
) throws IOException {
super(wrapWithExitableDirectoryReader ? new ExitableDirectoryReader((DirectoryReader) reader, cancellable) : reader);
super(wrapWithExitableDirectoryReader ? new ExitableDirectoryReader((DirectoryReader) reader, cancellable) : reader, executor);
setSimilarity(similarity);
setQueryCache(queryCache);
setQueryCachingPolicy(queryCachingPolicy);
@@ -178,7 +191,7 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
// createWeight() is called for each query in the tree, so we tell the queryProfiler
// each invocation so that it can build an internal representation of the query
// tree
QueryProfileBreakdown profile = profiler.getQueryBreakdown(query);
ContextualProfileBreakdown<QueryTimingType> profile = profiler.getQueryBreakdown(query);
Timer timer = profile.getTimer(QueryTimingType.CREATE_WEIGHT);
timer.start();
final Weight weight;
@@ -411,4 +424,8 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
runnables.clear();
}
}
public boolean allowConcurrentSegmentSearch() {
return (getExecutor() != null);
}
}
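The new trailing parameter is what enables concurrent segment search: Lucene's IndexSearcher fans leaf slices out to the executor when one is supplied, and allowConcurrentSegmentSearch() simply reports whether that happened. A minimal wiring sketch, mirroring the test setup further down in this commit (the open reader is assumed to exist; passing null keeps segment search single-threaded):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

ExecutorService executor = Executors.newFixedThreadPool(4);
ContextIndexSearcher searcher = new ContextIndexSearcher(
    reader,                                        // an open DirectoryReader (assumed)
    IndexSearcher.getDefaultSimilarity(),
    IndexSearcher.getDefaultQueryCache(),
    IndexSearcher.getDefaultQueryCachingPolicy(),
    true,                                          // wrapWithExitableDirectoryReader
    executor                                       // null => sequential segment search
);
assert searcher.allowConcurrentSegmentSearch();    // true only because an executor was supplied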

File: org/opensearch/search/profile/AbstractProfileBreakdown.java

@@ -69,13 +69,20 @@ public abstract class AbstractProfileBreakdown<T extends Enum<T>> {
}
/**
* Build a timing count breakdown.
* Build a timing count breakdown for the current instance
*/
public final Map<String, Long> toBreakdownMap() {
Map<String, Long> map = new HashMap<>(timings.length * 2);
for (T timingType : timingTypes) {
map.put(timingType.toString(), timings[timingType.ordinal()].getApproximateTiming());
map.put(timingType.toString() + "_count", timings[timingType.ordinal()].getCount());
public Map<String, Long> toBreakdownMap() {
return buildBreakdownMap(this);
}
/**
* Build a timing count breakdown for an arbitrary instance
*/
protected final Map<String, Long> buildBreakdownMap(AbstractProfileBreakdown<T> breakdown) {
Map<String, Long> map = new HashMap<>(breakdown.timings.length * 2);
for (T timingType : breakdown.timingTypes) {
map.put(timingType.toString(), breakdown.timings[timingType.ordinal()].getApproximateTiming());
map.put(timingType.toString() + "_count", breakdown.timings[timingType.ordinal()].getCount());
}
return Collections.unmodifiableMap(map);
}
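toBreakdownMap() loses its final modifier and delegates to the new protected buildBreakdownMap(...), which accepts an arbitrary breakdown instance; that lets a subclass fold several per-context breakdowns into a single report. A compact sketch of such a merge (other is a hypothetical second breakdown of the same timing type; ConcurrentQueryProfileBreakdown below does exactly this over its context map):

Map<String, Long> merged = new HashMap<>(buildBreakdownMap(this));
for (Map.Entry<String, Long> entry : buildBreakdownMap(other).entrySet()) {
    merged.merge(entry.getKey(), entry.getValue(), Long::sum);   // sums both the timing and its "_count" entry
}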

File: org/opensearch/search/profile/ContextualProfileBreakdown.java

@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.search.profile;
/**
* Provides contextual profile breakdowns, each associated with a free-form context. Used when concurrent
* search over segments is enabled and each collector needs its own non-shareable profile breakdown instance.
*/
public abstract class ContextualProfileBreakdown<T extends Enum<T>> extends AbstractProfileBreakdown<T> {
public ContextualProfileBreakdown(Class<T> clazz) {
super(clazz);
}
/**
* Return (or create) the contextual profile breakdown instance for the given context
* @param context free-form context key (for example, a per-segment leaf reader context)
* @return contextual profile breakdown instance
*/
public abstract AbstractProfileBreakdown<T> context(Object context);
}
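The context key is deliberately untyped. In this commit, ProfileWeight (further down) keys the breakdown by the per-segment LeafReaderContext, so collectors running concurrently on different segments never share a Timer. The pattern looks like this, where profile is the query's ContextualProfileBreakdown and leafReaderContext stands in for the segment currently being scored (both names are illustrative):

Timer timer = profile.context(leafReaderContext).getTimer(QueryTimingType.BUILD_SCORER);
timer.start();
try {
    // build the scorer for this segment
} finally {
    timer.stop();
}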

File: org/opensearch/search/profile/Profilers.java

@@ -57,7 +57,7 @@ public final class Profilers {
/** Switch to a new profile. */
public QueryProfiler addQueryProfiler() {
QueryProfiler profiler = new QueryProfiler();
QueryProfiler profiler = new QueryProfiler(searcher.allowConcurrentSegmentSearch());
searcher.setProfiler(profiler);
queryProfilers.add(profiler);
return profiler;

File: org/opensearch/search/profile/query/ConcurrentQueryProfileBreakdown.java

@@ -0,0 +1,55 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.search.profile.query;
import org.opensearch.search.profile.AbstractProfileBreakdown;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* A record of timings for the various operations that may happen during query execution.
* A node's time may be composed of several internal attributes (rewriting, weighting,
* scoring, etc.). This class supports profiling concurrent search over segments.
*/
public final class ConcurrentQueryProfileBreakdown extends ContextualProfileBreakdown<QueryTimingType> {
private final Map<Object, AbstractProfileBreakdown<QueryTimingType>> contexts = new ConcurrentHashMap<>();
/** Sole constructor. */
public ConcurrentQueryProfileBreakdown() {
super(QueryTimingType.class);
}
@Override
public AbstractProfileBreakdown<QueryTimingType> context(Object context) {
// Check with get() first: ConcurrentHashMap.computeIfAbsent may lock the bin even when the
// mapping already exists, see https://bugs.openjdk.java.net/browse/JDK-8161372
final AbstractProfileBreakdown<QueryTimingType> profile = contexts.get(context);
if (profile != null) {
return profile;
}
return contexts.computeIfAbsent(context, ctx -> new QueryProfileBreakdown());
}
@Override
public Map<String, Long> toBreakdownMap() {
final Map<String, Long> map = new HashMap<>(buildBreakdownMap(this));
for (final AbstractProfileBreakdown<QueryTimingType> context : contexts.values()) {
for (final Map.Entry<String, Long> entry : buildBreakdownMap(context).entrySet()) {
map.merge(entry.getKey(), entry.getValue(), Long::sum);
}
}
return map;
}
}
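Each distinct context key lazily receives its own QueryProfileBreakdown, and toBreakdownMap() folds the parent's timings together with every per-context breakdown. An illustrative run, where leaf1 and leaf2 are hypothetical per-segment context keys:

ConcurrentQueryProfileBreakdown breakdown = new ConcurrentQueryProfileBreakdown();
breakdown.context(leaf1).getTimer(QueryTimingType.BUILD_SCORER).start();
// ... segment 1 work ...
breakdown.context(leaf1).getTimer(QueryTimingType.BUILD_SCORER).stop();
breakdown.context(leaf2).getTimer(QueryTimingType.BUILD_SCORER).start();
// ... segment 2 work ...
breakdown.context(leaf2).getTimer(QueryTimingType.BUILD_SCORER).stop();

// The summary sums each timing across the parent breakdown and both per-context breakdowns,
// so the BUILD_SCORER count entry ends up at 2: each per-context Timer was started once.
Map<String, Long> summary = breakdown.toBreakdownMap();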

File: org/opensearch/search/profile/query/InternalProfileCollector.java

@@ -39,6 +39,7 @@ import org.apache.lucene.search.ScoreMode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
@@ -50,7 +51,7 @@ import java.util.List;
*
* InternalProfiler facilitates the linking of the Collector graph
*/
public class InternalProfileCollector implements Collector {
public class InternalProfileCollector implements Collector, InternalProfileComponent {
/**
* A more friendly representation of the Collector's class name
@@ -68,9 +69,9 @@ public class InternalProfileCollector implements Collector {
/**
* A list of "embedded" children collectors
*/
private final List<InternalProfileCollector> children;
private final List<? extends InternalProfileComponent> children;
public InternalProfileCollector(Collector collector, String reason, List<InternalProfileCollector> children) {
public InternalProfileCollector(Collector collector, String reason, List<? extends InternalProfileComponent> children) {
this.collector = new ProfileCollector(collector);
this.reason = reason;
this.collectorName = deriveCollectorName(collector);
@@ -98,6 +99,13 @@
return this.collectorName;
}
/**
* @return the underlying collector instance being profiled
*/
public Collector getCollector() {
return collector.getDelegate();
}
/**
* Creates a human-friendly representation of the Collector name.
*
@@ -134,13 +142,19 @@
return collector.scoreMode();
}
@Override
public Collection<? extends InternalProfileComponent> children() {
return children;
}
@Override
public CollectorResult getCollectorTree() {
return InternalProfileCollector.doGetCollectorTree(this);
}
private static CollectorResult doGetCollectorTree(InternalProfileCollector collector) {
List<CollectorResult> childResults = new ArrayList<>(collector.children.size());
for (InternalProfileCollector child : collector.children) {
static CollectorResult doGetCollectorTree(InternalProfileComponent collector) {
List<CollectorResult> childResults = new ArrayList<>(collector.children().size());
for (InternalProfileComponent child : collector.children()) {
CollectorResult result = doGetCollectorTree(child);
childResults.add(result);
}

File: org/opensearch/search/profile/query/InternalProfileComponent.java

@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.search.profile.query;
import java.util.Collection;
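/**
 * A node in the profiled collector tree: it reports its name, the reason it was added, the time
 * it took, its children, and a {@link CollectorResult} tree built from them. Implemented here by
 * {@link InternalProfileCollector}, which wraps a single Lucene collector.
 */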
public interface InternalProfileComponent {
/**
* @return profile component name
*/
String getName();
/**
* @return the reason this profile component has been included
*/
String getReason();
/**
* @return the time taken by this profile component
*/
long getTime();
/**
* @return the profiling results for this profile component
*/
CollectorResult getCollectorTree();
/**
* @return the children of this profile component (if any)
*/
Collection<? extends InternalProfileComponent> children();
}
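A hedged sketch of how a concrete collector enters this tree; the TopScoreDocCollector and the reason constant are illustrative assumptions, while the constructor and getCollectorTree() come from the InternalProfileCollector hunk above (imports elided):

Collector topDocs = TopScoreDocCollector.create(10, Integer.MAX_VALUE);   // assumed example collector
InternalProfileCollector profiled = new InternalProfileCollector(
    topDocs,
    CollectorResult.REASON_SEARCH_TOP_HITS,   // reason label (constant name assumed)
    Collections.emptyList()                   // no child components
);
// ... run the search with `profiled` in place of `topDocs` ...
CollectorResult tree = profiled.getCollectorTree();   // name, reason, time and children, recursively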

File: org/opensearch/search/profile/query/InternalQueryProfileTree.java

@@ -34,6 +34,7 @@ package org.opensearch.search.profile.query;
import org.apache.lucene.search.Query;
import org.opensearch.search.profile.AbstractInternalProfileTree;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.ProfileResult;
/**
@@ -41,15 +42,20 @@ import org.opensearch.search.profile.ProfileResult;
* generates {@link QueryProfileBreakdown} for each node in the tree. It also finalizes the tree
* and returns a list of {@link ProfileResult} that can be serialized back to the client
*/
final class InternalQueryProfileTree extends AbstractInternalProfileTree<QueryProfileBreakdown, Query> {
final class InternalQueryProfileTree extends AbstractInternalProfileTree<ContextualProfileBreakdown<QueryTimingType>, Query> {
/** Rewrite time */
private long rewriteTime;
private long rewriteScratch;
private final boolean concurrent;
InternalQueryProfileTree(boolean concurrent) {
this.concurrent = concurrent;
}
@Override
protected QueryProfileBreakdown createProfileBreakdown() {
return new QueryProfileBreakdown();
protected ContextualProfileBreakdown<QueryTimingType> createProfileBreakdown() {
return (concurrent) ? new ConcurrentQueryProfileBreakdown() : new QueryProfileBreakdown();
}
@Override

File: org/opensearch/search/profile/query/ProfileScorer.java

@@ -36,6 +36,7 @@ import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.TwoPhaseIterator;
import org.apache.lucene.search.Weight;
import org.opensearch.search.profile.AbstractProfileBreakdown;
import org.opensearch.search.profile.Timer;
import java.io.IOException;
@@ -53,7 +54,7 @@ final class ProfileScorer extends Scorer {
private final Timer scoreTimer, nextDocTimer, advanceTimer, matchTimer, shallowAdvanceTimer, computeMaxScoreTimer,
setMinCompetitiveScoreTimer;
ProfileScorer(ProfileWeight w, Scorer scorer, QueryProfileBreakdown profile) throws IOException {
ProfileScorer(ProfileWeight w, Scorer scorer, AbstractProfileBreakdown<QueryTimingType> profile) throws IOException {
super(w);
this.scorer = scorer;
this.profileWeight = w;

File: org/opensearch/search/profile/query/ProfileWeight.java

@@ -40,6 +40,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.Weight;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import org.opensearch.search.profile.Timer;
import java.io.IOException;
@@ -53,9 +54,9 @@ import java.util.Set;
public final class ProfileWeight extends Weight {
private final Weight subQueryWeight;
private final QueryProfileBreakdown profile;
private final ContextualProfileBreakdown<QueryTimingType> profile;
public ProfileWeight(Query query, Weight subQueryWeight, QueryProfileBreakdown profile) throws IOException {
public ProfileWeight(Query query, Weight subQueryWeight, ContextualProfileBreakdown<QueryTimingType> profile) throws IOException {
super(query);
this.subQueryWeight = subQueryWeight;
this.profile = profile;
@@ -72,7 +73,7 @@ public final class ProfileWeight extends Weight {
@Override
public ScorerSupplier scorerSupplier(LeafReaderContext context) throws IOException {
Timer timer = profile.getTimer(QueryTimingType.BUILD_SCORER);
Timer timer = profile.context(context).getTimer(QueryTimingType.BUILD_SCORER);
timer.start();
final ScorerSupplier subQueryScorerSupplier;
try {
@@ -91,7 +92,7 @@
public Scorer get(long loadCost) throws IOException {
timer.start();
try {
return new ProfileScorer(weight, subQueryScorerSupplier.get(loadCost), profile);
return new ProfileScorer(weight, subQueryScorerSupplier.get(loadCost), profile.context(context));
} finally {
timer.stop();
}

File: org/opensearch/search/profile/query/QueryProfileBreakdown.java

@@ -33,16 +33,22 @@
package org.opensearch.search.profile.query;
import org.opensearch.search.profile.AbstractProfileBreakdown;
import org.opensearch.search.profile.ContextualProfileBreakdown;
/**
* A record of timings for the various operations that may happen during query execution.
* A node's time may be composed of several internal attributes (rewriting, weighting,
* scoring, etc).
*/
public final class QueryProfileBreakdown extends AbstractProfileBreakdown<QueryTimingType> {
public final class QueryProfileBreakdown extends ContextualProfileBreakdown<QueryTimingType> {
/** Sole constructor. */
public QueryProfileBreakdown() {
super(QueryTimingType.class);
}
@Override
public AbstractProfileBreakdown<QueryTimingType> context(Object context) {
return this;
}
}

File: org/opensearch/search/profile/query/QueryProfiler.java

@@ -34,6 +34,7 @@ package org.opensearch.search.profile.query;
import org.apache.lucene.search.Query;
import org.opensearch.search.profile.AbstractProfiler;
import org.opensearch.search.profile.ContextualProfileBreakdown;
import java.util.Objects;
@@ -48,19 +49,19 @@ import java.util.Objects;
* request may execute two searches (query + global agg). A Profiler just
* represents one of those
*/
public final class QueryProfiler extends AbstractProfiler<QueryProfileBreakdown, Query> {
public final class QueryProfiler extends AbstractProfiler<ContextualProfileBreakdown<QueryTimingType>, Query> {
/**
* The root Collector used in the search
*/
private InternalProfileCollector collector;
private InternalProfileComponent collector;
public QueryProfiler() {
super(new InternalQueryProfileTree());
public QueryProfiler(boolean concurrent) {
super(new InternalQueryProfileTree(concurrent));
}
/** Set the collector that is associated with this profiler. */
public void setCollector(InternalProfileCollector collector) {
public void setCollector(InternalProfileComponent collector) {
if (this.collector != null) {
throw new IllegalStateException("The collector can only be set once.");
}
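Taken together, the concurrency flag travels searcher -> profiler -> profile tree. A condensed view matching the Profilers and QueryProfilerTests hunks in this commit:

QueryProfiler profiler = new QueryProfiler(searcher.allowConcurrentSegmentSearch());
searcher.setProfiler(profiler);
// concurrent == true  -> InternalQueryProfileTree hands out a ConcurrentQueryProfileBreakdown per query node
// concurrent == false -> plain QueryProfileBreakdown, whose context(...) simply returns itself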

File: org/opensearch/search/profile/query/QueryProfilerTests.java

@@ -32,6 +32,8 @@
package org.opensearch.search.profile.query;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
@@ -61,26 +63,43 @@ import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.profile.ProfileResult;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
public class QueryProfilerTests extends OpenSearchTestCase {
static Directory dir;
static IndexReader reader;
static ContextIndexSearcher searcher;
private Directory dir;
private IndexReader reader;
private ContextIndexSearcher searcher;
private ExecutorService executor;
@ParametersFactory
public static Collection<Object[]> concurrency() {
return Arrays.asList(new Integer[] { 0 }, new Integer[] { 5 });
}
public QueryProfilerTests(int concurrency) {
this.executor = (concurrency > 0) ? Executors.newFixedThreadPool(concurrency) : null;
}
@Before
public void setUp() throws Exception {
super.setUp();
@BeforeClass
public static void setup() throws IOException {
dir = newDirectory();
RandomIndexWriter w = new RandomIndexWriter(random(), dir);
final int numDocs = TestUtil.nextInt(random(), 1, 20);
@@ -100,21 +119,25 @@ public class QueryProfilerTests extends OpenSearchTestCase {
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
ALWAYS_CACHE_POLICY,
true
true,
executor
);
}
@After
public void checkNoCache() {
public void tearDown() throws Exception {
super.tearDown();
LRUQueryCache cache = (LRUQueryCache) searcher.getQueryCache();
assertThat(cache.getHitCount(), equalTo(0L));
assertThat(cache.getCacheCount(), equalTo(0L));
assertThat(cache.getTotalCount(), equalTo(cache.getMissCount()));
assertThat(cache.getCacheSize(), equalTo(0L));
}
@AfterClass
public static void cleanup() throws IOException {
if (executor != null) {
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}
IOUtils.close(reader, dir);
dir = null;
reader = null;
@@ -122,7 +145,7 @@ public class QueryProfilerTests extends OpenSearchTestCase {
}
public void testBasic() throws IOException {
QueryProfiler profiler = new QueryProfiler();
QueryProfiler profiler = new QueryProfiler(searcher.allowConcurrentSegmentSearch());
searcher.setProfiler(profiler);
Query query = new TermQuery(new Term("foo", "bar"));
searcher.search(query, 1);
@@ -148,7 +171,7 @@ public class QueryProfilerTests extends OpenSearchTestCase {
}
public void testNoScoring() throws IOException {
QueryProfiler profiler = new QueryProfiler();
QueryProfiler profiler = new QueryProfiler(searcher.allowConcurrentSegmentSearch());
searcher.setProfiler(profiler);
Query query = new TermQuery(new Term("foo", "bar"));
searcher.search(query, 1, Sort.INDEXORDER); // scores are not needed
@@ -174,7 +197,7 @@ public class QueryProfilerTests extends OpenSearchTestCase {
}
public void testUseIndexStats() throws IOException {
QueryProfiler profiler = new QueryProfiler();
QueryProfiler profiler = new QueryProfiler(searcher.allowConcurrentSegmentSearch());
searcher.setProfiler(profiler);
Query query = new TermQuery(new Term("foo", "bar"));
searcher.count(query); // will use index stats
@@ -186,7 +209,7 @@ public class QueryProfilerTests extends OpenSearchTestCase {
}
public void testApproximations() throws IOException {
QueryProfiler profiler = new QueryProfiler();
QueryProfiler profiler = new QueryProfiler(searcher.allowConcurrentSegmentSearch());
searcher.setProfiler(profiler);
Query query = new RandomApproximationQuery(new TermQuery(new Term("foo", "bar")), random());
searcher.count(query);