modify QueryScheduler to lazily acquire lanes when executing queries to avoid leaks (#14184)

This PR fixes an issue that could occur if druid.query.scheduler.numThreads is configured and any exception occurs after QueryScheduler.run has been called to create a Sequence. This would result in total and/or lane specific locks being acquired, but because the sequence was not actually being evaluated, the "baggage" which typically releases these locks was not being executed. An example of how this can happen is if a group-by having filter, which wraps and transforms this sequence happens to explode while wrapping the sequence. The end result is that the locks are acquired, but never released, eventually halting the ability to execute any queries.
This commit is contained in:
Clint Wylie 2023-05-07 23:12:05 -07:00 committed by GitHub
parent 4d8feeb279
commit a7a4bfd331
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 221 additions and 52 deletions

View File

@ -228,6 +228,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
return GroupByStrategySelector.STRATEGY_V1;
}
};
final GroupByQueryConfig v1SingleThreadedConfig = new GroupByQueryConfig()
{
@Override
@ -361,6 +362,20 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
);
}
public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final GroupByQueryConfig config
)
{
return makeQueryRunnerFactory(
DEFAULT_MAPPER,
config,
new TestGroupByBuffers(
DEFAULT_PROCESSING_CONFIG.intermediateComputeSizeBytes(),
DEFAULT_PROCESSING_CONFIG.getNumMergeBuffers()
),
DEFAULT_PROCESSING_CONFIG
);
}
public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
final GroupByQueryConfig config,
final TestGroupByBuffers bufferPools

View File

@ -32,6 +32,8 @@ import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@ -185,8 +187,23 @@ public class QueryScheduler implements QueryWatcher
*/
public <T> Sequence<T> run(Query<?> query, Sequence<T> resultSequence)
{
List<Bulkhead> bulkheads = acquireLanes(query);
return resultSequence.withBaggage(() -> finishLanes(bulkheads));
return Sequences.wrap(resultSequence, new SequenceWrapper()
{
private List<Bulkhead> bulkheads = null;
@Override
public void before()
{
bulkheads = acquireLanes(query);
}
@Override
public void after(boolean isDone, Throwable thrown)
{
if (bulkheads != null) {
finishLanes(bulkheads);
}
}
});
}
/**

View File

@ -896,7 +896,7 @@ public class QueryResourceTest
assertAsyncResponseAndCountdownOrBlockForever(
SIMPLE_TIMESERIES_QUERY,
waitAllFinished,
response -> Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus())
response -> Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus())
);
waitTwoScheduled.await();
assertSynchronousResponseAndCountdownOrBlockForever(
@ -1043,19 +1043,21 @@ public class QueryResourceTest
return (queryPlus, responseContext) -> {
beforeScheduler.forEach(CountDownLatch::countDown);
return scheduler.run(
scheduler.prioritizeAndLaneQuery(queryPlus, ImmutableSet.of()),
new LazySequence<T>(() -> {
inScheduler.forEach(CountDownLatch::countDown);
try {
// pretend to be a query that is waiting on results
Thread.sleep(500);
}
catch (InterruptedException ignored) {
}
// all that waiting for nothing :(
return Sequences.empty();
})
return Sequences.simple(
scheduler.run(
scheduler.prioritizeAndLaneQuery(queryPlus, ImmutableSet.of()),
new LazySequence<T>(() -> {
inScheduler.forEach(CountDownLatch::countDown);
try {
// pretend to be a query that is waiting on results
Thread.sleep(500);
}
catch (InterruptedException ignored) {
}
// all that waiting for nothing :(
return Sequences.empty();
})
).toList()
);
};
}

View File

@ -37,6 +37,7 @@ import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
@ -46,10 +47,20 @@ import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.having.HavingSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.server.initialization.ServerConfig;
@ -60,9 +71,7 @@ import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.ArrayList;
@ -81,9 +90,6 @@ public class QuerySchedulerTest
private static final int TEST_HI_CAPACITY = 5;
private static final int TEST_LO_CAPACITY = 2;
@Rule
public ExpectedException expected = ExpectedException.none();
private ListeningExecutorService executorService;
private ObservableQueryScheduler scheduler;
@ -176,10 +182,8 @@ public class QuerySchedulerTest
}
@Test
public void testHiLoReleaseLaneWhenSequenceExplodes() throws Exception
public void testHiLoReleaseLaneWhenSequenceExplodes()
{
expected.expectMessage("exploded");
expected.expect(ExecutionException.class);
TopNQuery interactive = makeInteractiveQuery();
ListenableFuture<?> future = executorService.submit(() -> {
try {
@ -204,71 +208,91 @@ public class QuerySchedulerTest
throw new RuntimeException(ex);
}
});
future.get();
Throwable t = Assert.assertThrows(ExecutionException.class, future::get);
Assert.assertEquals("java.lang.RuntimeException: exploded", t.getMessage());
Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
}
@Test
public void testHiLoFailsWhenOutOfLaneCapacity()
{
expected.expectMessage(
QueryCapacityExceededException.makeLaneErrorMessage(HiLoQueryLaningStrategy.LOW, TEST_LO_CAPACITY)
);
expected.expect(QueryCapacityExceededException.class);
Query<?> report1 = scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of());
scheduler.run(report1, Sequences.empty());
Sequence<?> sequence = scheduler.run(report1, Sequences.empty());
// making the sequence doesn't count, only running it does
Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
// this counts though since we are doing stuff
Yielders.each(sequence);
Assert.assertNotNull(report1);
Assert.assertEquals(4, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Query<?> report2 = scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of());
scheduler.run(report2, Sequences.empty());
Yielders.each(scheduler.run(report2, Sequences.empty()));
Assert.assertNotNull(report2);
Assert.assertEquals(3, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(0, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
// too many reports
scheduler.run(
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()), Sequences.empty()
Throwable t = Assert.assertThrows(
QueryCapacityExceededException.class,
() -> Yielders.each(
scheduler.run(
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()),
Sequences.empty()
)
)
);
Assert.assertEquals(
"Too many concurrent queries for lane 'low', query capacity of 2 exceeded. Please try your query again later.",
t.getMessage()
);
}
@Test
public void testHiLoFailsWhenOutOfTotalCapacity()
{
expected.expectMessage(QueryCapacityExceededException.makeTotalErrorMessage(TEST_HI_CAPACITY));
expected.expect(QueryCapacityExceededException.class);
Query<?> interactive1 = scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of());
scheduler.run(interactive1, Sequences.empty());
Sequence<?> sequence = scheduler.run(interactive1, Sequences.empty());
// making the sequence doesn't count, only running it does
Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
// this counts tho
Yielders.each(sequence);
Assert.assertNotNull(interactive1);
Assert.assertEquals(4, scheduler.getTotalAvailableCapacity());
Query<?> report1 = scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of());
scheduler.run(report1, Sequences.empty());
Yielders.each(scheduler.run(report1, Sequences.empty()));
Assert.assertNotNull(report1);
Assert.assertEquals(3, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Query<?> interactive2 = scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of());
scheduler.run(interactive2, Sequences.empty());
Yielders.each(scheduler.run(interactive2, Sequences.empty()));
Assert.assertNotNull(interactive2);
Assert.assertEquals(2, scheduler.getTotalAvailableCapacity());
Query<?> report2 = scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of());
scheduler.run(report2, Sequences.empty());
Yielders.each(scheduler.run(report2, Sequences.empty()));
Assert.assertNotNull(report2);
Assert.assertEquals(1, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(0, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Query<?> interactive3 = scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of());
scheduler.run(interactive3, Sequences.empty());
Yielders.each(scheduler.run(interactive3, Sequences.empty()));
Assert.assertNotNull(interactive3);
Assert.assertEquals(0, scheduler.getTotalAvailableCapacity());
// one too many
scheduler.run(
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()), Sequences.empty()
Throwable t = Assert.assertThrows(
QueryCapacityExceededException.class,
() -> Yielders.each(scheduler.run(
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()),
Sequences.empty()
))
);
Assert.assertEquals(
"Too many concurrent queries, total query capacity of 5 exceeded. Please try your query again later.",
t.getMessage()
);
}
@ -324,6 +348,74 @@ public class QuerySchedulerTest
getFuturesAndAssertAftermathIsChill(futures, scheduler, true, true);
}
@Test
public void testExplodingWrapperDoesNotLeakLocks()
{
scheduler = new ObservableQueryScheduler(
5,
ManualQueryPrioritizationStrategy.INSTANCE,
new NoQueryLaningStrategy(),
new ServerConfig()
);
QueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(
new GroupByQueryConfig()
{
@Override
public String getDefaultStrategy()
{
return GroupByStrategySelector.STRATEGY_V2;
}
@Override
public String toString()
{
return "v2";
}
}
);
Future<?> f = makeMergingQueryFuture(
executorService,
scheduler,
GroupByQuery.builder()
.setDataSource("foo")
.setInterval("2020-01-01/2020-01-02")
.setDimensions(DefaultDimensionSpec.of("bar"))
.setAggregatorSpecs(new CountAggregatorFactory("chocula"))
.setGranularity(Granularities.ALL)
.setHavingSpec(
new HavingSpec()
{
@Override
public void setQuery(GroupByQuery query)
{
throw new RuntimeException("exploded");
}
@Override
public boolean eval(ResultRow row)
{
return false;
}
@Override
public byte[] getCacheKey()
{
return new byte[0];
}
}
)
.build(),
factory.getToolchest(),
NUM_ROWS
);
Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
Throwable t = Assert.assertThrows(Throwable.class, f::get);
Assert.assertEquals("java.lang.RuntimeException: exploded", t.getMessage());
Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
}
@Test
public void testConfigNone()
{
@ -367,7 +459,6 @@ public class QuerySchedulerTest
@Test
public void testMisConfigHiLo()
{
expected.expect(ProvisionException.class);
final Injector injector = createInjector();
final String propertyPrefix = "druid.query.scheduler";
final JsonConfigProvider<QuerySchedulerProvider> provider = JsonConfigProvider.of(
@ -377,9 +468,16 @@ public class QuerySchedulerTest
final Properties properties = new Properties();
properties.setProperty(propertyPrefix + ".laning.strategy", "hilo");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final QueryScheduler scheduler = provider.get().get().get();
Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Throwable t = Assert.assertThrows(ProvisionException.class, () -> provider.get().get().get());
Assert.assertEquals(
"Unable to provision, see the following errors:\n"
+ "\n"
+ "1) Problem parsing object at prefix[druid.query.scheduler]: Cannot construct instance of `org.apache.druid.server.scheduling.HiLoQueryLaningStrategy`, problem: maxLowPercent must be set\n"
+ " at [Source: UNKNOWN; line: -1, column: -1] (through reference chain: org.apache.druid.server.QuerySchedulerProvider[\"laning\"]).\n"
+ "\n"
+ "1 error",
t.getMessage()
);
}
@Test
@ -418,7 +516,6 @@ public class QuerySchedulerTest
@Test
public void testMisConfigThreshold()
{
expected.expect(ProvisionException.class);
final Injector injector = createInjector();
final String propertyPrefix = "druid.query.scheduler";
final JsonConfigProvider<QuerySchedulerProvider> provider = JsonConfigProvider.of(
@ -428,9 +525,16 @@ public class QuerySchedulerTest
final Properties properties = new Properties();
properties.setProperty(propertyPrefix + ".prioritization.strategy", "threshold");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final QueryScheduler scheduler = provider.get().get().get();
Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Throwable t = Assert.assertThrows(ProvisionException.class, () -> provider.get().get().get());
Assert.assertEquals(
"Unable to provision, see the following errors:\n"
+ "\n"
+ "1) Problem parsing object at prefix[druid.query.scheduler]: Cannot construct instance of `org.apache.druid.server.scheduling.ThresholdBasedQueryPrioritizationStrategy`, problem: periodThreshold, durationThreshold, or segmentCountThreshold must be set\n"
+ " at [Source: UNKNOWN; line: -1, column: -1] (through reference chain: org.apache.druid.server.QuerySchedulerProvider[\"prioritization\"]).\n"
+ "\n"
+ "1 error",
t.getMessage()
);
}
@ -497,6 +601,7 @@ public class QuerySchedulerTest
.context(ImmutableMap.of("queryId", "default-" + UUID.randomUUID()))
.build();
}
private TopNQuery makeInteractiveQuery()
{
return makeBaseBuilder()
@ -638,6 +743,36 @@ public class QuerySchedulerTest
});
}
private ListenableFuture<?> makeMergingQueryFuture(
ListeningExecutorService executorService,
QueryScheduler scheduler,
Query<?> query,
QueryToolChest toolChest,
int numRows
)
{
return executorService.submit(() -> {
try {
Query<?> scheduled = scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(query), ImmutableSet.of());
Assert.assertNotNull(scheduled);
FluentQueryRunnerBuilder fluentQueryRunnerBuilder = new FluentQueryRunnerBuilder(toolChest);
FluentQueryRunnerBuilder.FluentQueryRunner runner = fluentQueryRunnerBuilder.create((queryPlus, responseContext) -> {
Sequence<Integer> underlyingSequence = makeSequence(numRows);
Sequence<Integer> results = scheduler.run(scheduled, underlyingSequence);
return results;
});
final int actualNumRows = consumeAndCloseSequence(runner.mergeResults().run(QueryPlus.wrap(query)));
Assert.assertEquals(actualNumRows, numRows);
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
});
}
private void getFuturesAndAssertAftermathIsChill(
List<Future<?>> futures,