add lane enforcement for joinish queries (#9563)

* add lane enforcement for joinish queries

* oops

* style

* review stuffs
Clint Wylie 2020-03-30 11:58:16 -07:00 committed by GitHub
parent c0195a19e4
commit fa5da6693c
15 changed files with 348 additions and 148 deletions
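For context: queries that the broker executes directly via LocalQuerySegmentWalker (the "joinish" queries in the title) previously bypassed the QueryScheduler, so priority and lane limits were not enforced for them. This change threads a QueryScheduler into LocalQuerySegmentWalker and wraps its base runner with the scheduler. Below is a minimal sketch of that path, assembled only from APIs that appear in this diff; the class, method, and variable names are illustrative, not the committed code.

import java.util.HashSet;
import java.util.Set;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.Segment;
import org.apache.druid.server.QueryScheduler;

/**
 * Illustrative sketch only: how a locally executed ("joinish") query flows through the
 * QueryScheduler, mirroring the LocalQuerySegmentWalker changes in this commit.
 */
class LocalLaneEnforcementSketch
{
  static <T> Sequence<T> runWithLaneEnforcement(
      QueryScheduler scheduler,        // the injected scheduler, shared with the remote query path
      Query<T> query,
      Iterable<Segment> segments,      // segments resolved by the SegmentWrangler for the query intervals
      QueryRunner<T> baseRunner,       // merged per-segment runner from the QueryRunnerFactory
      ResponseContext responseContext
  )
  {
    // 1) Describe the locally hosted segments; the single-argument constructor marks them
    //    as having no ServerSelector, since there is no remote server to pick.
    final Set<SegmentServerSelector> selectors = new HashSet<>();
    for (Segment segment : segments) {
      selectors.add(new SegmentServerSelector(segment.getId().toDescriptor()));
    }
    // 2) Let the scheduler assign a priority and a lane according to its configured strategies.
    final Query<T> prioritizedAndLaned =
        scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(query), selectors);
    // 3) Run through the scheduler-wrapped runner so total and per-lane capacity is enforced.
    return scheduler.wrapQueryRunner(baseRunner)
                    .run(QueryPlus.wrap(prioritizedAndLaned), responseContext);
  }
}

In the tests below, the same scheduler instance is handed to both the cluster and the local walker, so its counters observe queries taking either path.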


@ -104,11 +104,8 @@ import org.apache.druid.query.topn.TopNQueryRunnerFactory;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
import org.apache.druid.timeline.SegmentId;
@ -343,12 +340,7 @@ public class CachingClusteredClientBenchmark
new DruidHttpClientConfig(),
processingConfig,
forkJoinPool,
new QueryScheduler(
0,
ManualQueryPrioritizationStrategy.INSTANCE,
NoQueryLaningStrategy.INSTANCE,
new ServerConfig()
)
QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
}


@ -105,6 +105,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>


@ -65,10 +65,8 @@ import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.TimelineLookup;
import org.hamcrest.core.IsInstanceOf;
@ -365,12 +363,7 @@ public class MovingAverageQueryTest extends InitializedNullHandlingTest
}
},
ForkJoinPool.commonPool(),
new QueryScheduler(
0,
ManualQueryPrioritizationStrategy.INSTANCE,
NoQueryLaningStrategy.INSTANCE,
new ServerConfig()
)
QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(


@ -19,22 +19,46 @@
package org.apache.druid.client;
import com.google.common.base.Preconditions;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.SegmentDescriptor;
import javax.annotation.Nullable;
/**
* Given a {@link SegmentDescriptor}, get a {@link ServerSelector} to use to pick a {@link DruidServer} to query.
*
* Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data
* Used by {@link CachingClusteredClient} on the broker to fan out queries to historical and realtime data. Used
* by {@link org.apache.druid.server.LocalQuerySegmentWalker} on the broker for queries that are executed directly on the broker
*/
public class SegmentServerSelector extends Pair<ServerSelector, SegmentDescriptor>
{
/**
* This is for a segment hosted on a remote server, where {@link ServerSelector} may be used to pick
* a {@link DruidServer} to query.
*/
public SegmentServerSelector(ServerSelector server, SegmentDescriptor segment)
{
super(server, segment);
Preconditions.checkNotNull(server, "ServerSelector must not be null");
Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null");
}
/**
* This is for a segment hosted locally
*/
public SegmentServerSelector(SegmentDescriptor segment)
{
super(null, segment);
Preconditions.checkNotNull(segment, "SegmentDescriptor must not be null");
}
/**
* This may be null if the segment described by the {@link SegmentDescriptor} is available locally, but will
* definitely not be null for segments which must be queried remotely (e.g. via {@link CachingClusteredClient})
*/
@Nullable
public ServerSelector getServer()
{
return lhs;
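
A brief illustration of the two constructors above (a sketch with hypothetical helper names, not part of the diff): the remote path supplies a ServerSelector so a DruidServer can be picked, while the local path supplies only the descriptor, which is why getServer() is @Nullable.

import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.timeline.SegmentId;

class SegmentServerSelectorUsageSketch
{
  // Remote path (CachingClusteredClient): a ServerSelector is available for picking a server.
  static SegmentServerSelector forRemoteSegment(ServerSelector server, SegmentDescriptor descriptor)
  {
    return new SegmentServerSelector(server, descriptor);
  }

  // Local path (LocalQuerySegmentWalker): only the descriptor is known, so getServer() returns null.
  static SegmentServerSelector forLocalSegment(SegmentId segmentId)
  {
    return new SegmentServerSelector(segmentId.toDescriptor());
  }
}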


@ -20,6 +20,7 @@
package org.apache.druid.server;
import com.google.inject.Inject;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
@ -27,6 +28,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
@ -39,6 +41,8 @@ import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.joda.time.Interval;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.StreamSupport;
@ -57,6 +61,7 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
private final QueryRunnerFactoryConglomerate conglomerate;
private final SegmentWrangler segmentWrangler;
private final JoinableFactory joinableFactory;
private final QueryScheduler scheduler;
private final ServiceEmitter emitter;
@Inject
@ -64,12 +69,14 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
QueryRunnerFactoryConglomerate conglomerate,
SegmentWrangler segmentWrangler,
JoinableFactory joinableFactory,
QueryScheduler scheduler,
ServiceEmitter emitter
)
{
this.conglomerate = conglomerate;
this.segmentWrangler = segmentWrangler;
this.joinableFactory = joinableFactory;
this.scheduler = scheduler;
this.emitter = emitter;
}
@ -82,21 +89,23 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
throw new IAE("Cannot query dataSource locally: %s", analysis.getDataSource());
}
final AtomicLong cpuAccumulator = new AtomicLong(0L);
final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(query);
final Iterable<Segment> segments = segmentWrangler.getSegmentsForIntervals(analysis.getBaseDataSource(), intervals);
final Query<T> prioritizedAndLaned = prioritizeAndLaneQuery(query, segments);
final AtomicLong cpuAccumulator = new AtomicLong(0L);
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
cpuAccumulator,
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query),
query.getFilter() == null ? null : query.getFilter().toFilter(),
query.getVirtualColumns()
QueryContexts.getEnableJoinFilterPushDown(prioritizedAndLaned),
QueryContexts.getEnableJoinFilterRewrite(prioritizedAndLaned),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(prioritizedAndLaned),
QueryContexts.getJoinFilterRewriteMaxSize(prioritizedAndLaned),
prioritizedAndLaned.getFilter() == null ? null : prioritizedAndLaned.getFilter().toFilter(),
prioritizedAndLaned.getVirtualColumns()
);
final QueryRunnerFactory<T, Query<T>> queryRunnerFactory = conglomerate.findFactory(prioritizedAndLaned);
final QueryRunner<T> baseRunner = queryRunnerFactory.mergeRunners(
Execs.directExecutor(),
() -> StreamSupport.stream(segments.spliterator(), false)
@ -107,17 +116,25 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
// Note: Not calling 'postProcess'; it isn't official/documented functionality so we'll only support it where
// it is already supported.
return new FluentQueryRunnerBuilder<>(queryRunnerFactory.getToolchest())
.create(baseRunner)
.create(scheduler.wrapQueryRunner(baseRunner))
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration()
.emitCPUTimeMetric(emitter, cpuAccumulator);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
// SegmentWranglers only work based on intervals and cannot run with specific segments.
throw new ISE("Cannot run with specific segments");
}
private <T> Query<T> prioritizeAndLaneQuery(Query<T> query, Iterable<Segment> segments)
{
Set<SegmentServerSelector> segmentServerSelectors = new HashSet<>();
for (Segment s : segments) {
segmentServerSelectors.add(new SegmentServerSelector(s.getId().toDescriptor()));
}
return scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(query), segmentServerSelectors);
}
}


@ -30,10 +30,12 @@ import io.github.resilience4j.bulkhead.BulkheadRegistry;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.server.initialization.ServerConfig;
@ -140,6 +142,17 @@ public class QueryScheduler implements QueryWatcher
return resultSequence.withBaggage(() -> finishLanes(bulkheads));
}
/**
* Returns a {@link QueryRunner} that will call {@link QueryScheduler#run} when {@link QueryRunner#run} is called.
*/
public <T> QueryRunner<T> wrapQueryRunner(QueryRunner<T> baseRunner)
{
return (queryPlus, responseContext) ->
QueryScheduler.this.run(
queryPlus.getQuery(), new LazySequence<>(() -> baseRunner.run(queryPlus, responseContext))
);
}
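
The wrapper simply routes the base runner's results through QueryScheduler#run, with the base runner deferred behind a LazySequence so no work starts until the returned sequence is consumed. A small equivalence sketch with illustrative names, not committed code (assumes org.apache.druid.query.context.ResponseContext is imported):

  static <T> void wrapQueryRunnerEquivalenceSketch(
      QueryScheduler scheduler,
      QueryRunner<T> baseRunner,
      QueryPlus<T> queryPlus,
      ResponseContext responseContext
  )
  {
    // These two sequences are equivalent; baseRunner.run is not invoked until either is consumed.
    Sequence<T> viaWrapper =
        scheduler.wrapQueryRunner(baseRunner).run(queryPlus, responseContext);
    Sequence<T> direct = scheduler.run(
        queryPlus.getQuery(),
        new LazySequence<>(() -> baseRunner.run(queryPlus, responseContext))
    );
  }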
/**
* Forcibly cancel all futures that have been registered to a specific query id
*/


@ -47,11 +47,8 @@ import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@ -335,7 +332,7 @@ public class CachingClusteredClientFunctionalityTest
}
},
ForkJoinPool.commonPool(),
new QueryScheduler(0, ManualQueryPrioritizationStrategy.INSTANCE, NoQueryLaningStrategy.INSTANCE, new ServerConfig())
QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
}


@ -70,6 +70,8 @@ import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
@ -151,12 +153,20 @@ public class ClientQuerySegmentWalkerTest
// version VERSION, and shard spec SHARD_SPEC.
private ClientQuerySegmentWalker walker;
private ObservableQueryScheduler scheduler;
@Before
public void setUp()
{
closer = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
initWalker(ImmutableMap.of());
scheduler = new ObservableQueryScheduler(
8,
ManualQueryPrioritizationStrategy.INSTANCE,
NoQueryLaningStrategy.INSTANCE,
new ServerConfig()
);
initWalker(ImmutableMap.of(), scheduler);
}
@After
@ -182,6 +192,11 @@ public class ClientQuerySegmentWalkerTest
ImmutableList.of(ExpectedQuery.cluster(query)),
ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
);
Assert.assertEquals(1, scheduler.getTotalRun().get());
Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
Assert.assertEquals(1, scheduler.getTotalAcquired().get());
Assert.assertEquals(1, scheduler.getTotalReleased().get());
}
@Test
@ -200,6 +215,11 @@ public class ClientQuerySegmentWalkerTest
ImmutableList.of(ExpectedQuery.local(query)),
ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
);
Assert.assertEquals(1, scheduler.getTotalRun().get());
Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
Assert.assertEquals(1, scheduler.getTotalAcquired().get());
Assert.assertEquals(1, scheduler.getTotalReleased().get());
}
@Test
@ -236,6 +256,13 @@ public class ClientQuerySegmentWalkerTest
),
ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
);
// note: this should really be 1, but in the interim, queries that are composed of multiple queries count each
// invocation of either the cluster or local walker in ClientQuerySegmentWalker
Assert.assertEquals(2, scheduler.getTotalRun().get());
Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
Assert.assertEquals(2, scheduler.getTotalAcquired().get());
Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
@Test
@ -263,6 +290,11 @@ public class ClientQuerySegmentWalkerTest
ImmutableList.of(ExpectedQuery.cluster(subquery)),
ImmutableList.of(new Object[]{3L})
);
Assert.assertEquals(1, scheduler.getTotalRun().get());
Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get());
Assert.assertEquals(1, scheduler.getTotalAcquired().get());
Assert.assertEquals(1, scheduler.getTotalReleased().get());
}
@Test
@ -299,6 +331,13 @@ public class ClientQuerySegmentWalkerTest
new Object[]{"z", 1L}
)
);
// note: this should really be 1, but in the interim, queries that are composed of multiple queries count each
// invocation of either the cluster or local walker in ClientQuerySegmentWalker
Assert.assertEquals(2, scheduler.getTotalRun().get());
Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
Assert.assertEquals(2, scheduler.getTotalAcquired().get());
Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
@Test
@ -351,6 +390,13 @@ public class ClientQuerySegmentWalkerTest
),
ImmutableList.of(new Object[]{"y", "y", 1L})
);
// note: this should really be 1, but in the interim, queries that are composed of multiple queries count each
// invocation of either the cluster or local walker in ClientQuerySegmentWalker
Assert.assertEquals(2, scheduler.getTotalRun().get());
Assert.assertEquals(2, scheduler.getTotalPrioritizedAndLaned().get());
Assert.assertEquals(2, scheduler.getTotalAcquired().get());
Assert.assertEquals(2, scheduler.getTotalReleased().get());
}
@Test
@ -408,9 +454,17 @@ public class ClientQuerySegmentWalkerTest
}
/**
* Initialize (or reinitialize) our {@link #walker} and {@link #closer}.
* Initialize (or reinitialize) our {@link #walker} and {@link #closer} with default scheduler.
*/
private void initWalker(final Map<String, String> serverProperties)
{
initWalker(serverProperties, QueryStackTests.DEFAULT_NOOP_SCHEDULER);
}
/**
* Initialize (or reinitialize) our {@link #walker} and {@link #closer}.
*/
private void initWalker(final Map<String, String> serverProperties, QueryScheduler schedulerForTest)
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final ServerConfig serverConfig = jsonMapper.convertValue(serverProperties, ServerConfig.class);
@ -472,7 +526,7 @@ public class ClientQuerySegmentWalkerTest
),
joinableFactory,
conglomerate,
null /* QueryScheduler */
schedulerForTest
),
ClusterOrLocal.CLUSTER
),
@ -480,8 +534,9 @@ public class ClientQuerySegmentWalkerTest
QueryStackTests.createLocalQuerySegmentWalker(
conglomerate,
segmentWrangler,
joinableFactory
),
joinableFactory,
schedulerForTest
),
ClusterOrLocal.LOCAL
),
conglomerate,


@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import io.github.resilience4j.bulkhead.Bulkhead;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.server.initialization.ServerConfig;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
/**
* {@link QueryScheduler} for testing, with counters on its internal functions so its operation can be observed
* and verified by tests
*/
public class ObservableQueryScheduler extends QueryScheduler
{
private final AtomicLong totalAcquired;
private final AtomicLong totalReleased;
private final AtomicLong laneAcquired;
private final AtomicLong laneNotAcquired;
private final AtomicLong laneReleased;
private final AtomicLong totalPrioritizedAndLaned;
private final AtomicLong totalRun;
public ObservableQueryScheduler(
int totalNumThreads,
QueryPrioritizationStrategy prioritizationStrategy,
QueryLaningStrategy laningStrategy,
ServerConfig serverConfig
)
{
super(totalNumThreads, prioritizationStrategy, laningStrategy, serverConfig);
totalAcquired = new AtomicLong();
totalReleased = new AtomicLong();
laneAcquired = new AtomicLong();
laneNotAcquired = new AtomicLong();
laneReleased = new AtomicLong();
totalPrioritizedAndLaned = new AtomicLong();
totalRun = new AtomicLong();
}
@Override
public <T> Sequence<T> run(
Query<?> query,
Sequence<T> resultSequence
)
{
return super.run(query, resultSequence).withBaggage(totalRun::incrementAndGet);
}
@Override
public <T> Query<T> prioritizeAndLaneQuery(
QueryPlus<T> queryPlus,
Set<SegmentServerSelector> segments
)
{
totalPrioritizedAndLaned.incrementAndGet();
return super.prioritizeAndLaneQuery(queryPlus, segments);
}
@Override
List<Bulkhead> acquireLanes(Query<?> query)
{
List<Bulkhead> bulkheads = super.acquireLanes(query);
if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
totalAcquired.incrementAndGet();
}
if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
laneAcquired.incrementAndGet();
}
return bulkheads;
}
@Override
void releaseLanes(List<Bulkhead> bulkheads)
{
super.releaseLanes(bulkheads);
if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
totalReleased.incrementAndGet();
}
if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
laneReleased.incrementAndGet();
if (bulkheads.size() == 1) {
laneNotAcquired.incrementAndGet();
}
}
}
@Override
void finishLanes(List<Bulkhead> bulkheads)
{
super.finishLanes(bulkheads);
if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
totalReleased.incrementAndGet();
}
if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
laneReleased.incrementAndGet();
}
}
/**
* Number of times that 'total' query count semaphore was acquired
*/
public AtomicLong getTotalAcquired()
{
return totalAcquired;
}
/**
* Number of times that 'total' query count semaphore was released
*/
public AtomicLong getTotalReleased()
{
return totalReleased;
}
/**
* Number of times that the query count semaphore of any lane was acquired
*/
public AtomicLong getLaneAcquired()
{
return laneAcquired;
}
/**
* Number of times that the query count semaphore of any lane was acquired but the 'total' semaphore was NOT acquired
*/
public AtomicLong getLaneNotAcquired()
{
return laneNotAcquired;
}
/**
* Number of times that the query count semaphore of any lane was released
*/
public AtomicLong getLaneReleased()
{
return laneReleased;
}
/**
* Number of times that {@link QueryScheduler#prioritizeAndLaneQuery} was called
*/
public AtomicLong getTotalPrioritizedAndLaned()
{
return totalPrioritizedAndLaned;
}
/**
* Number of times that {@link QueryScheduler#run} was called
*/
public AtomicLong getTotalRun()
{
return totalRun;
}
}


@ -174,12 +174,7 @@ public class QueryResourceTest
EasyMock.expect(testServletRequest.getHeader("Accept")).andReturn(MediaType.APPLICATION_JSON).anyTimes();
EasyMock.expect(testServletRequest.getHeader(QueryResource.HEADER_IF_NONE_MATCH)).andReturn(null).anyTimes();
EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes();
queryScheduler = new QueryScheduler(
8,
ManualQueryPrioritizationStrategy.INSTANCE,
NoQueryLaningStrategy.INSTANCE,
new ServerConfig()
);
queryScheduler = QueryStackTests.DEFAULT_NOOP_SCHEDULER;
testRequestLogger = new TestRequestLogger();
queryResource = new QueryResource(
new QueryLifecycleFactory(


@ -30,7 +30,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.ProvisionException;
import io.github.resilience4j.bulkhead.Bulkhead;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
@ -72,7 +71,6 @@ import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
public class QuerySchedulerTest
{
@ -708,95 +706,4 @@ public class QuerySchedulerTest
);
return injector;
}
private static class ObservableQueryScheduler extends QueryScheduler
{
private final AtomicLong totalAcquired;
private final AtomicLong totalReleased;
private final AtomicLong laneAcquired;
private final AtomicLong laneNotAcquired;
private final AtomicLong laneReleased;
public ObservableQueryScheduler(
int totalNumThreads,
QueryPrioritizationStrategy prioritizationStrategy,
QueryLaningStrategy laningStrategy,
ServerConfig serverConfig
)
{
super(totalNumThreads, prioritizationStrategy, laningStrategy, serverConfig);
totalAcquired = new AtomicLong();
totalReleased = new AtomicLong();
laneAcquired = new AtomicLong();
laneNotAcquired = new AtomicLong();
laneReleased = new AtomicLong();
}
@Override
List<Bulkhead> acquireLanes(Query<?> query)
{
List<Bulkhead> bulkheads = super.acquireLanes(query);
if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
totalAcquired.incrementAndGet();
}
if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
laneAcquired.incrementAndGet();
}
return bulkheads;
}
@Override
void releaseLanes(List<Bulkhead> bulkheads)
{
super.releaseLanes(bulkheads);
if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
totalReleased.incrementAndGet();
}
if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
laneReleased.incrementAndGet();
if (bulkheads.size() == 1) {
laneNotAcquired.incrementAndGet();
}
}
}
@Override
void finishLanes(List<Bulkhead> bulkheads)
{
super.finishLanes(bulkheads);
if (bulkheads.stream().anyMatch(b -> b.getName().equals(QueryScheduler.TOTAL))) {
totalReleased.incrementAndGet();
}
if (bulkheads.stream().anyMatch(b -> !b.getName().equals(QueryScheduler.TOTAL))) {
laneReleased.incrementAndGet();
}
}
public AtomicLong getTotalAcquired()
{
return totalAcquired;
}
public AtomicLong getTotalReleased()
{
return totalReleased;
}
public AtomicLong getLaneAcquired()
{
return laneAcquired;
}
public AtomicLong getLaneNotAcquired()
{
return laneNotAcquired;
}
public AtomicLong getLaneReleased()
{
return laneReleased;
}
}
}


@ -64,6 +64,8 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import javax.annotation.Nullable;
@ -75,6 +77,12 @@ import java.util.Map;
*/
public class QueryStackTests
{
/**
 * Scheduler with manual prioritization and no laning; for tests that do not exercise query scheduling.
 */
public static final QueryScheduler DEFAULT_NOOP_SCHEDULER = new QueryScheduler(
0,
ManualQueryPrioritizationStrategy.INSTANCE,
NoQueryLaningStrategy.INSTANCE,
new ServerConfig()
);
private static final ServiceEmitter EMITTER = new NoopServiceEmitter();
private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024;
@ -148,10 +156,17 @@ public class QueryStackTests
public static LocalQuerySegmentWalker createLocalQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final SegmentWrangler segmentWrangler,
final JoinableFactory joinableFactory
final JoinableFactory joinableFactory,
final QueryScheduler scheduler
)
{
return new LocalQuerySegmentWalker(conglomerate, segmentWrangler, joinableFactory, EMITTER);
return new LocalQuerySegmentWalker(
conglomerate,
segmentWrangler,
joinableFactory,
scheduler,
EMITTER
);
}
/**
@ -255,4 +270,5 @@ public class QueryStackTests
return conglomerate;
}
}


@ -165,11 +165,13 @@ public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
// Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments.
// This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan)
// to function properly.
// to function properly. SegmentServerSelector does not currently mimic CachingClusteredClient; it uses the
// LocalQuerySegmentWalker constructor instead, since this walker does not mimic remote DruidServer objects
// actually serving the queries.
return (theQuery, responseContext) -> {
if (scheduler != null) {
Set<SegmentServerSelector> segments = new HashSet<>();
specs.forEach(spec -> segments.add(new SegmentServerSelector(null, spec)));
specs.forEach(spec -> segments.add(new SegmentServerSelector(spec)));
return scheduler.run(
scheduler.prioritizeAndLaneQuery(theQuery, segments),
new LazySequence<>(


@ -77,6 +77,7 @@ import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFacto
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.log.NoopRequestLogger;
import org.apache.druid.server.security.Access;
@ -572,13 +573,13 @@ public class CalciteTests
final File tmpDir
)
{
return createMockWalker(conglomerate, tmpDir, null);
return createMockWalker(conglomerate, tmpDir, QueryStackTests.DEFAULT_NOOP_SCHEDULER);
}
public static SpecificSegmentsQuerySegmentWalker createMockWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final File tmpDir,
@Nullable final QueryScheduler scheduler
final QueryScheduler scheduler
)
{
final QueryableIndex index1 = IndexBuilder


@ -85,7 +85,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
final QueryRunnerFactoryConglomerate conglomerate,
final LookupExtractorFactoryContainerProvider lookupProvider,
@Nullable final JoinableFactory joinableFactory,
@Nullable final QueryScheduler scheduler
final QueryScheduler scheduler
)
{
final JoinableFactory joinableFactoryToUse;
@ -116,7 +116,8 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
.put(LookupDataSource.class, new LookupSegmentWrangler(lookupProvider))
.build()
),
joinableFactoryToUse
joinableFactoryToUse,
scheduler
),
conglomerate,
new ServerConfig()
@ -146,7 +147,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
}
},
null,
null
QueryStackTests.DEFAULT_NOOP_SCHEDULER
);
}