diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index febebbfcab7..94b560e8f4d 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -104,11 +104,8 @@ import org.apache.druid.query.topn.TopNQueryRunnerFactory;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
-import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
-import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
@@ -343,12 +340,7 @@ public class CachingClusteredClientBenchmark
new DruidHttpClientConfig(),
processingConfig,
forkJoinPool,
- new QueryScheduler(
- 0,
- ManualQueryPrioritizationStrategy.INSTANCE,
- NoQueryLaningStrategy.INSTANCE,
- new ServerConfig()
- )
+ QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
}
diff --git a/extensions-contrib/moving-average-query/pom.xml b/extensions-contrib/moving-average-query/pom.xml
index c5ad9eebf6c..6c77b605118 100644
--- a/extensions-contrib/moving-average-query/pom.xml
+++ b/extensions-contrib/moving-average-query/pom.xml
@@ -105,6 +105,13 @@
test-jar
test
+
+ org.apache.druid
+ druid-server
+ ${project.parent.version}
+ test-jar
+ test
+
junit
junit
diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
index 8d76692e9c8..971f8ed8b26 100644
--- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
+++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java
@@ -65,10 +65,8 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.server.ClientQuerySegmentWalker;
-import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
-import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.TimelineLookup;
import org.hamcrest.core.IsInstanceOf;
@@ -365,12 +363,7 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
}
},
ForkJoinPool.commonPool(),
- new QueryScheduler(
- 0,
- ManualQueryPrioritizationStrategy.INSTANCE,
- NoQueryLaningStrategy.INSTANCE,
- new ServerConfig()
- )
+ QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(
diff --git a/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java b/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java
index 007b7a25459..5f5de0e7813 100644
--- a/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java
+++ b/server/src/main/java/org/apache/druid/client/SegmentServerSelector.java
@@ -19,22 +19,46 @@
package org.apache.druid.client;
+import com.google.common.base.Preconditions;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.SegmentDescriptor;
+import javax.annotation.Nullable;
+
/**
* Given a {@link SegmentDescriptor}, get a {@link ServerSelector} to use to pick a {@link DruidServer} to query.
*
- * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data
+ * Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data. Used
+ * by {@link org.apache.druid.server.LocalQuerySegmentWalker} on the broker for on broker queries
*/
public class SegmentServerSelector extends Pair
{
+ /**
+ * This is for a segment hosted on a remote server, where {@link ServerSelector} may be used to pick
+ * a {@link DruidServer} to query.
+ */
public SegmentServerSelector(ServerSelector server, SegmentDescriptor segment)
{
super(server, segment);
+ Preconditions.checkNotNull(server, "ServerSelector must not be null");
+ Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null");
}
+ /**
+ * This is for a segment hosted locally
+ */
+ public SegmentServerSelector(SegmentDescriptor segment)
+ {
+ super(null, segment);
+ Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null");
+ }
+
+ /**
+ * This may be null if {@link SegmentDescriptor} is locally available, but will definitely not be null for segments
+ * which must be queried remotely (e.g. {@link CachingClusteredClient})
+ */
+ @Nullable
public ServerSelector getServer()
{
return lhs;
diff --git a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
index 0a01b8c4bb0..d7f39adaa4b 100644
--- a/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/server/LocalQuerySegmentWalker.java
@@ -20,6 +20,7 @@
package org.apache.druid.server;
import com.google.inject.Inject;
+import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -27,6 +28,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@@ -39,6 +41,8 @@ import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.joda.time.Interval;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.StreamSupport;
@@ -57,6 +61,7 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
private final QueryRunnerFactoryConglomerate conglomerate;
private final SegmentWrangler segmentWrangler;
private final JoinableFactory joinableFactory;
+ private final QueryScheduler scheduler;
private final ServiceEmitter emitter;
@Inject
@@ -64,12 +69,14 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
QueryRunnerFactoryConglomerate conglomerate,
SegmentWrangler segmentWrangler,
JoinableFactory joinableFactory,
+ QueryScheduler scheduler,
ServiceEmitter emitter
)
{
this.conglomerate = conglomerate;
this.segmentWrangler = segmentWrangler;
this.joinableFactory = joinableFactory;
+ this.scheduler = scheduler;
this.emitter = emitter;
}
@@ -82,21 +89,23 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource());
}
- final AtomicLong cpuAccumulator = new AtomicLong(0L);
- final QueryRunnerFactory> queryRunnerFactory = conglomerate.findFactory(query);
final Iterable segments = segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals);
+ final Query prioritizedAndLaned = prioritizeAndLaneQuery(query, segments);
+
+ final AtomicLong cpuAccumulator = new AtomicLong(0L);
final Function segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
cpuAccumulator,
- QueryContexts.getEnableJoinFilterPushDown(query),
- QueryContexts.getEnableJoinFilterRewrite(query),
- QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
- QueryContexts.getJoinFilterRewriteMaxSize(query),
- query.getFilter() == null ? null : query.getFilter().toFilter(),
- query.getVirtualColumns()
+ QueryContexts.getEnableJoinFilterPushDown(prioritizedAndLaned),
+ QueryContexts.getEnableJoinFilterRewrite(prioritizedAndLaned),
+ QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(prioritizedAndLaned),
+ QueryContexts.getJoinFilterRewriteMaxSize(prioritizedAndLaned),
+ prioritizedAndLaned.getFilter() == null ? null : prioritizedAndLaned.getFilter().toFilter(),
+ prioritizedAndLaned.getVirtualColumns()
);
+ final QueryRunnerFactory> queryRunnerFactory = conglomerate.findFactory(prioritizedAndLaned);
final QueryRunner baseRunner = queryRunnerFactory.mergeRunners(
Execs.directExecutor(),
() -> StreamSupport.stream(segments.spliterator(), false)
@@ -107,17 +116,25 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
// Note: Not calling 'postProcess'; it isn't official/documented functionality so we'll only support it where
// it is already supported.
return new FluentQueryRunnerBuilder<>(queryRunnerFactory.getToolchest())
- .create(baseRunner)
+ .create(scheduler.wrapQueryRunner(baseRunner))
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration()
.emitCPUTimeMetric(emitter, cpuAccumulator);
}
-
@Override
public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable specs)
{
// SegmentWranglers only work based on intervals and cannot run with specific segments.
throw new ISE("Cannot run with specific segments");
}
+
+ private Query prioritizeAndLaneQuery(Query query, Iterable segments)
+ {
+ Set segmentServerSelectors = new HashSet<>();
+ for (Segment s : segments) {
+ segmentServerSelectors.add(new SegmentServerSelector(s.getId().toDescriptor()));
+ }
+ return scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(query), segmentServerSelectors);
+ }
}
diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
index 86c9ec96b87..f50b50c8039 100644
--- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java
+++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
@@ -30,10 +30,12 @@ import io.github.resilience4j.bulkhead.BulkheadRegistry;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
+import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.server.initialization.ServerConfig;
@@ -140,6 +142,17 @@ public class QueryScheduler implements QueryWatcher
return resultSequence.withBaggage(() -> finishLanes(bulkheads));
}
+ /**
+ * Returns a {@link QueryRunner} that will call {@link QueryScheduler#run} when {@link QueryRunner#run} is called.
+ */
+ public QueryRunner wrapQueryRunner(QueryRunner baseRunner)
+ {
+ return (queryPlus, responseContext) ->
+ QueryScheduler.this.run(
+ queryPlus.getQuery(), new LazySequence<>(() -> baseRunner.run(queryPlus, responseContext))
+ );
+ }
+
/**
* Forcibly cancel all futures that have been registered to a specific query id
*/
diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
index 779fd5478d0..5303989b80c 100644
--- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
+++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java
@@ -47,11 +47,8 @@ import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.planning.DataSourceAnalysis;
-import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
-import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
-import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -335,7 +332,7 @@ public class CachingClusteredClientFunctionalityTest
}
},
ForkJoinPool.commonPool(),
- new QueryScheduler(0, ManualQueryPrioritizationStrategy.INSTANCE, NoQueryLaningStrategy.INSTANCE, new ServerConfig())
+ QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
}
diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
index 2215dcb162a..c98c7768dec 100644
--- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
+++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java
@@ -70,6 +70,8 @@ import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
+import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -151,12 +153,20 @@ public class ClientQuerySegmentWalkerTest
// version VERSION, and shard spec SHARD_SPEC.
private ClientQuerySegmentWalker walker;
+ private ObservableQueryScheduler scheduler;
+
@Before
public void setUp()
{
closer = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
- initWalker(ImmutableMap.of());
+ scheduler = new ObservableQueryScheduler(
+ 8,
+ ManualQueryPrioritizationStrategy.INSTANCE,
+ NoQueryLaningStrategy.INSTANCE,
+ new ServerConfig()
+ );
+ initWalker(ImmutableMap.of(), scheduler);
}
@After
@@ -182,6 +192,11 @@ public class ClientQuerySegmentWalkerTest
ImmutableList.of(ExpectedQuery.cluster(query)),
ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
);
+
+ Assert.assertEquals(1, scheduler.getTotalRun().get());
+ Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(1, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(1, scheduler.getTotalReleased().get());
}
@Test
@@ -200,6 +215,11 @@ public class ClientQuerySegmentWalkerTest
ImmutableList.of(ExpectedQuery.local(query)),
ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
);
+
+ Assert.assertEquals(1, scheduler.getTotalRun().get());
+ Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(1, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(1, scheduler.getTotalReleased().get());
}
@Test
@@ -236,6 +256,13 @@ public class ClientQuerySegmentWalkerTest
),
ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
);
+
+ // note: this should really be 1, but in the interim queries that are composed of multiple queries count each
+ // invocation of either the cluster or local walker in ClientQuerySegmentWalker
+ Assert.assertEquals(2, scheduler.getTotalRun().get());
+ Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
@Test
@@ -263,6 +290,11 @@ public class ClientQuerySegmentWalkerTest
ImmutableList.of(ExpectedQuery.cluster(subquery)),
ImmutableList.of(new Object[]{3L})
);
+
+ Assert.assertEquals(1, scheduler.getTotalRun().get());
+ Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(1, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(1, scheduler.getTotalReleased().get());
}
@Test
@@ -299,6 +331,13 @@ public class ClientQuerySegmentWalkerTest
new Object[]{"z", 1L}
)
);
+
+ // note: this should really be 1, but in the interim queries that are composed of multiple queries count each
+ // invocation of either the cluster or local walker in ClientQuerySegmentWalker
+ Assert.assertEquals(2, scheduler.getTotalRun().get());
+ Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
@Test
@@ -351,6 +390,13 @@ public class ClientQuerySegmentWalkerTest
),
ImmutableList.of(new Object[]{"y", "y", 1L})
);
+
+ // note: this should really be 1, but in the interim queries that are composed of multiple queries count each
+ // invocation of either the cluster or local walker in ClientQuerySegmentWalker
+ Assert.assertEquals(2, scheduler.getTotalRun().get());
+ Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
+ Assert.assertEquals(2, scheduler.getTotalAcquired().get());
+ Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
@Test
@@ -408,9 +454,17 @@ public class ClientQuerySegmentWalkerTest
}
/**
- * Initialize (or reinitialize) our {@link #walker} and {@link #closer}.
+ * Initialize (or reinitialize) our {@link #walker} and {@link #closer} with default scheduler.
*/
private void initWalker(final Map serverProperties)
+ {
+ initWalker(serverProperties, QueryStackTests.DEFAULT_NOOP_SCHEDULER);
+ }
+
+ /**
+ * Initialize (or reinitialize) our {@link #walker} and {@link #closer}.
+ */
+ private void initWalker(final Map serverProperties, QueryScheduler schedulerForTest)
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final ServerConfig serverConfig = jsonMapper.convertValue(serverProperties, ServerConfig.class);
@@ -472,7 +526,7 @@ public class ClientQuerySegmentWalkerTest
),
joinableFactory,
conglomerate,
- null /* QueryScheduler */
+ schedulerForTest
),
ClusterOrLocal.CLUSTER
),
@@ -480,8 +534,9 @@ public class ClientQuerySegmentWalkerTest
QueryStackTests.createLocalQuerySegmentWalker(
conglomerate,
segmentWrangler,
- joinableFactory
- ),
+ joinableFactory,
+ schedulerForTest
+ ),
ClusterOrLocal.LOCAL
),
conglomerate,
diff --git a/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java
new file mode 100644
index 00000000000..638f5f28dc9
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/ObservableQueryScheduler.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import org.apache.druid.client.SegmentServerSelector;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryPlus;
+import org.apache.druid.server.initialization.ServerConfig;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link QueryScheduler} for testing, with counters on its internal functions so its operation can be observed
+ * and verified by tests
+ */
+public class ObservableQueryScheduler extends QueryScheduler
+{
+ private final AtomicLong totalAcquired;
+ private final AtomicLong totalReleased;
+ private final AtomicLong laneAcquired;
+ private final AtomicLong laneNotAcquired;
+ private final AtomicLong laneReleased;
+ private final AtomicLong totalPrioritizedAndLaned;
+ private final AtomicLong totalRun;
+
+ public ObservableQueryScheduler(
+ int totalNumThreads,
+ QueryPrioritizationStrategy prioritizationStrategy,
+ QueryLaningStrategy laningStrategy,
+ ServerConfig serverConfig
+ )
+ {
+ super(totalNumThreads, prioritizationStrategy, laningStrategy, serverConfig);
+
+ totalAcquired = new AtomicLong();
+ totalReleased = new AtomicLong();
+ laneAcquired = new AtomicLong();
+ laneNotAcquired = new AtomicLong();
+ laneReleased = new AtomicLong();
+ totalPrioritizedAndLaned = new AtomicLong();
+ totalRun = new AtomicLong();
+ }
+
+ @Override
+ public Sequence run(
+ Query> query,
+ Sequence resultSequence
+ )
+ {
+ return super.run(query, resultSequence).withBaggage(totalRun::incrementAndGet);
+ }
+
+ @Override
+ public Query prioritizeAndLaneQuery(
+ QueryPlus queryPlus,
+ Set segments
+ )
+ {
+ totalPrioritizedAndLaned.incrementAndGet();
+ return super.prioritizeAndLaneQuery(queryPlus, segments);
+ }
+
+ @Override
+ List acquireLanes(Query> query)
+ {
+ List bulkheads = super.acquireLanes(query);
+ if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
+ totalAcquired.incrementAndGet();
+ }
+ if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
+ laneAcquired.incrementAndGet();
+ }
+
+ return bulkheads;
+ }
+
+ @Override
+ void releaseLanes(List bulkheads)
+ {
+ super.releaseLanes(bulkheads);
+ if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
+ totalReleased.incrementAndGet();
+ }
+ if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
+ laneReleased.incrementAndGet();
+ if (bulkheads.size() == 1) {
+ laneNotAcquired.incrementAndGet();
+ }
+ }
+ }
+
+ @Override
+ void finishLanes(List bulkheads)
+ {
+ super.finishLanes(bulkheads);
+ if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
+ totalReleased.incrementAndGet();
+ }
+ if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
+ laneReleased.incrementAndGet();
+ }
+ }
+
+ /**
+ * Number of times that 'total' query count semaphore was acquired
+ */
+ public AtomicLong getTotalAcquired()
+ {
+ return totalAcquired;
+ }
+
+ /**
+ * Number of times that 'total' query count semaphore was released
+ */
+ public AtomicLong getTotalReleased()
+ {
+ return totalReleased;
+ }
+
+ /**
+ * Number of times that the query count semaphore of any lane was acquired
+ */
+ public AtomicLong getLaneAcquired()
+ {
+ return laneAcquired;
+ }
+
+ /**
+ * Number of times that the query count semaphore of any lane was acquired but the 'total' semaphore was NOT acquired
+ */
+ public AtomicLong getLaneNotAcquired()
+ {
+ return laneNotAcquired;
+ }
+
+ /**
+ * Number of times that the query count semaphore of any lane was released
+ */
+ public AtomicLong getLaneReleased()
+ {
+ return laneReleased;
+ }
+
+ /**
+ * Number of times that {@link QueryScheduler#prioritizeAndLaneQuery} was called
+ */
+ public AtomicLong getTotalPrioritizedAndLaned()
+ {
+ return totalPrioritizedAndLaned;
+ }
+
+ /**
+ * Number of times that {@link QueryScheduler#run} was called
+ */
+ public AtomicLong getTotalRun()
+ {
+ return totalRun;
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index d17a3726be0..5bda4338738 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -174,12 +174,7 @@ public class QueryResourceTest
EasyMock.expect(testServletRequest.getHeader("Accept")).andReturn(MediaType.APPLICATION_JSON).anyTimes();
EasyMock.expect(testServletRequest.getHeader(QueryResource.HEADER_IF_NONE_MATCH)).andReturn(null).anyTimes();
EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes();
- queryScheduler = new QueryScheduler(
- 8,
- ManualQueryPrioritizationStrategy.INSTANCE,
- NoQueryLaningStrategy.INSTANCE,
- new ServerConfig()
- );
+ queryScheduler = QueryStackTests.DEFAULT_NOOP_SCHEDULER;
testRequestLogger = new TestRequestLogger();
queryResource = new QueryResource(
new QueryLifecycleFactory(
diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
index 177be8214b1..485275b07bf 100644
--- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
+++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
@@ -30,7 +30,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.ProvisionException;
-import io.github.resilience4j.bulkhead.Bulkhead;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
@@ -72,7 +71,6 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicLong;
public class QuerySchedulerTest
{
@@ -708,95 +706,4 @@ public class QuerySchedulerTest
);
return injector;
}
-
- private static class ObservableQueryScheduler extends QueryScheduler
- {
- private final AtomicLong totalAcquired;
- private final AtomicLong totalReleased;
- private final AtomicLong laneAcquired;
- private final AtomicLong laneNotAcquired;
- private final AtomicLong laneReleased;
-
- public ObservableQueryScheduler(
- int totalNumThreads,
- QueryPrioritizationStrategy prioritizationStrategy,
- QueryLaningStrategy laningStrategy,
- ServerConfig serverConfig
- )
- {
- super(totalNumThreads, prioritizationStrategy, laningStrategy, serverConfig);
-
- totalAcquired = new AtomicLong();
- totalReleased = new AtomicLong();
- laneAcquired = new AtomicLong();
- laneNotAcquired = new AtomicLong();
- laneReleased = new AtomicLong();
- }
-
- @Override
- List acquireLanes(Query> query)
- {
- List bulkheads = super.acquireLanes(query);
- if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
- totalAcquired.incrementAndGet();
- }
- if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
- laneAcquired.incrementAndGet();
- }
-
- return bulkheads;
- }
-
- @Override
- void releaseLanes(List bulkheads)
- {
- super.releaseLanes(bulkheads);
- if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
- totalReleased.incrementAndGet();
- }
- if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
- laneReleased.incrementAndGet();
- if (bulkheads.size() == 1) {
- laneNotAcquired.incrementAndGet();
- }
- }
- }
-
- @Override
- void finishLanes(List bulkheads)
- {
- super.finishLanes(bulkheads);
- if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
- totalReleased.incrementAndGet();
- }
- if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
- laneReleased.incrementAndGet();
- }
- }
-
- public AtomicLong getTotalAcquired()
- {
- return totalAcquired;
- }
-
- public AtomicLong getTotalReleased()
- {
- return totalReleased;
- }
-
- public AtomicLong getLaneAcquired()
- {
- return laneAcquired;
- }
-
- public AtomicLong getLaneNotAcquired()
- {
- return laneNotAcquired;
- }
-
- public AtomicLong getLaneReleased()
- {
- return laneReleased;
- }
- }
}
diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
index a07e14ed5fa..1370ed73eba 100644
--- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java
+++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java
@@ -64,6 +64,8 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
+import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import javax.annotation.Nullable;
@@ -75,6 +77,12 @@ import java.util.Map;
*/
public class QueryStackTests
{
+ public static final QueryScheduler DEFAULT_NOOP_SCHEDULER = new QueryScheduler(
+ 0,
+ ManualQueryPrioritizationStrategy.INSTANCE,
+ NoQueryLaningStrategy.INSTANCE,
+ new ServerConfig()
+ );
private static final ServiceEmitter EMITTER = new NoopServiceEmitter();
private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024;
@@ -148,10 +156,17 @@ public class QueryStackTests
public static LocalQuerySegmentWalker createLocalQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final SegmentWrangler segmentWrangler,
- final JoinableFactory joinableFactory
+ final JoinableFactory joinableFactory,
+ final QueryScheduler scheduler
)
{
- return new LocalQuerySegmentWalker(conglomerate, segmentWrangler, joinableFactory, EMITTER);
+ return new LocalQuerySegmentWalker(
+ conglomerate,
+ segmentWrangler,
+ joinableFactory,
+ scheduler,
+ EMITTER
+ );
}
/**
@@ -255,4 +270,5 @@ public class QueryStackTests
return conglomerate;
}
+
}
diff --git a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
index 7fef5a9d71c..cc3a406cfe7 100644
--- a/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
+++ b/server/src/test/java/org/apache/druid/server/TestClusterQuerySegmentWalker.java
@@ -165,11 +165,13 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
// Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments.
// This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan)
- // to function properly.
+ // to function properly. SegmentServerSelector does not currently mimic CachingClusteredClient, it is using
+ // the LocalQuerySegmentWalker constructor instead since this walker is not mimic remote DruidServer objects
+ // to actually serve the queries
return (theQuery, responseContext) -> {
if (scheduler != null) {
Set segments = new HashSet<>();
- specs.forEach(spec -> segments.add(new SegmentServerSelector(null, spec)));
+ specs.forEach(spec -> segments.add(new SegmentServerSelector(spec)));
return scheduler.run(
scheduler.prioritizeAndLaneQuery(theQuery, segments),
new LazySequence<>(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 5d446c54a83..5ae13902c7e 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -77,6 +77,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFacto
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.log.NoopRequestLogger;
import org.apache.druid.server.security.Access;
@@ -572,13 +573,13 @@ public class CalciteTests
final File tmpDir
)
{
- return createMockWalker(conglomerate, tmpDir, null);
+ return createMockWalker(conglomerate, tmpDir, QueryStackTests.DEFAULT_NOOP_SCHEDULER);
}
public static SpecificSegmentsQuerySegmentWalker createMockWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final File tmpDir,
- @Nullable final QueryScheduler scheduler
+ final QueryScheduler scheduler
)
{
final QueryableIndex index1 = IndexBuilder
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
index 50134ffcb35..1da0fbabf9c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java
@@ -85,7 +85,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
final QueryRunnerFactoryConglomerate conglomerate,
final LookupExtractorFactoryContainerProvider lookupProvider,
@Nullable final JoinableFactory joinableFactory,
- @Nullable final QueryScheduler scheduler
+ final QueryScheduler scheduler
)
{
final JoinableFactory joinableFactoryToUse;
@@ -116,7 +116,8 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
.put(LookupDataSource.class, new LookupSegmentWrangler(lookupProvider))
.build()
),
- joinableFactoryToUse
+ joinableFactoryToUse,
+ scheduler
),
conglomerate,
new ServerConfig()
@@ -146,7 +147,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
}
},
null,
- null
+ QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
}