Mirror of https://github.com/apache/druid.git
Scan query: More accurate error message when segment per time chunk limit is exceeded. (#10630)
* Scan query: More accurate error message when segment per time chunk limit is exceeded.
* Add guardrail test.
parent f9fc1892d1
commit 48e576a307
ScanQueryRunnerFactory.java

@@ -38,6 +38,7 @@ import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.ResourceLimitExceededException;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.SinkQueryRunners;
 import org.apache.druid.query.context.ResponseContext;
@@ -168,10 +169,10 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
              .max(Comparator.comparing(Integer::valueOf))
              .get();
 
-      int segmentPartitionLimit = query.getMaxSegmentPartitionsOrderedInMemory() == null
+      int maxSegmentPartitionsOrderedInMemory = query.getMaxSegmentPartitionsOrderedInMemory() == null
                                   ? scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
                                   : query.getMaxSegmentPartitionsOrderedInMemory();
-      if (maxNumPartitionsInSegment <= segmentPartitionLimit) {
+      if (maxNumPartitionsInSegment <= maxSegmentPartitionsOrderedInMemory) {
        // Use n-way merge strategy
 
        // Create a list of grouped runner lists (i.e. each sublist/"runner group" corresponds to an interval) ->
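The hunk above renames segmentPartitionLimit to maxSegmentPartitionsOrderedInMemory so the variable carries the same name as the setting it resolves: a per-query override wins when one is supplied, otherwise the ScanQueryConfig default applies, and the in-memory n-way merge is only used when the largest time chunk stays within that limit. Below is a minimal, self-contained sketch of that resolution and check; the class, method name, and the example default of 50 are illustrative assumptions, not the actual Druid code.

    // Illustrative sketch only -- mirrors the limit resolution in the hunk above, outside of Druid.
    public class ScanOrderingGuardrailSketch
    {
      static boolean canUseNWayMerge(Integer queryOverride, int configDefault, int maxNumPartitionsInSegment)
      {
        // Per-query override wins; otherwise fall back to the ScanQueryConfig default.
        final int maxSegmentPartitionsOrderedInMemory = queryOverride == null ? configDefault : queryOverride;
        return maxNumPartitionsInSegment <= maxSegmentPartitionsOrderedInMemory;
      }

      public static void main(String[] args)
      {
        System.out.println(canUseNWayMerge(null, 50, 5)); // true: 5 segments fit under the example default of 50
        System.out.println(canUseNWayMerge(4, 50, 5));    // false: the per-query override of 4 is exceeded
      }
    }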
@@ -188,14 +189,15 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
 
        return nWayMergeAndLimit(groupedRunners, queryPlus, responseContext);
      }
-      throw new UOE(
-          "Time ordering for queries of %,d partitions per segment and a row limit of %,d is not supported."
-          + " Try reducing the scope of the query to scan fewer partitions than the configurable limit of"
-          + " %,d partitions or lower the row limit below %,d.",
+      throw new ResourceLimitExceededException(
+          "Time ordering is not supported for a Scan query with %,d segments per time chunk and a row limit of %,d. "
+          + "Try reducing your query limit below maxRowsQueuedForOrdering (currently %,d), or using compaction to "
+          + "reduce the number of segments per time chunk, or raising maxSegmentPartitionsOrderedInMemory "
+          + "(currently %,d) above the number of segments you have per time chunk.",
          maxNumPartitionsInSegment,
          query.getScanRowsLimit(),
-          scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory(),
-          scanQueryConfig.getMaxRowsQueuedForOrdering()
+          maxRowsQueuedForOrdering,
+          maxSegmentPartitionsOrderedInMemory
      );
    }
  }
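Two things change in the throw above besides the wording: the exception type becomes ResourceLimitExceededException, which signals a configurable limit rather than an unsupported feature, and the message is now filled with the effective limits rather than the raw config defaults (maxSegmentPartitionsOrderedInMemory is the query-or-config value resolved in the previous hunk; maxRowsQueuedForOrdering is presumably resolved the same way earlier in the method), so the numbers shown match what the query was actually checked against. The %,d placeholders render with digit grouping; the diff does not show how Druid expands them, so the sketch below uses plain String.format with an assumed comma-grouping locale purely to illustrate the strings the guardrail test expects.

    // Illustration only: how "%,d" grouping produces strings like "10,001" in the new message.
    import java.util.Locale;

    public class GroupedFormatSketch
    {
      public static void main(String[] args)
      {
        System.out.println(String.format(
            Locale.US,
            "Time ordering is not supported for a Scan query with %,d segments per time chunk and a row limit of %,d.",
            5,
            10001
        ));
        // Prints: Time ordering is not supported for a Scan query with 5 segments per time chunk and a row limit of 10,001.
      }
    }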
ScanQueryRunnerFactoryTest.java

@@ -21,7 +21,9 @@ package org.apache.druid.query.scan;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
@@ -29,6 +31,7 @@ import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.ResourceLimitExceededException;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.spec.LegacySegmentSpec;
@@ -38,8 +41,10 @@ import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
 import org.joda.time.Interval;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -48,19 +53,35 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 
 @RunWith(Enclosed.class)
 public class ScanQueryRunnerFactoryTest
 {
+  private static final ScanQueryConfig CONFIG = new ScanQueryConfig()
+  {
+    @Override
+    public int getMaxRowsQueuedForOrdering()
+    {
+      return 10000;
+    }
+
+    @Override
+    public int getMaxSegmentPartitionsOrderedInMemory()
+    {
+      return 4;
+    }
+  };
+
   private static final ScanQueryRunnerFactory FACTORY = new ScanQueryRunnerFactory(
       new ScanQueryQueryToolChest(
-          new ScanQueryConfig(),
+          CONFIG,
           DefaultGenericQueryMetricsFactory.instance()
       ),
       new ScanQueryEngine(),
-      new ScanQueryConfig()
+      CONFIG
   );
 
   @RunWith(Parameterized.class)
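The anonymous ScanQueryConfig above pins both limits to small values (10,000 queued rows, 4 segment partitions) so the guardrail can be tripped deterministically, and the same CONFIG instance is handed to both the toolchest and the factory so the two agree on those limits. The sketch below shows the same override pattern in isolation; QueryLimits and its example defaults are stand-ins for illustration, not Druid classes or asserted default values.

    // Illustration of the anonymous-subclass override pattern used for CONFIG above.
    public class ConfigOverrideSketch
    {
      static class QueryLimits
      {
        public int getMaxRowsQueuedForOrdering() { return 100_000; }        // example default only
        public int getMaxSegmentPartitionsOrderedInMemory() { return 50; }  // example default only
      }

      public static void main(String[] args)
      {
        QueryLimits testLimits = new QueryLimits()
        {
          @Override
          public int getMaxRowsQueuedForOrdering() { return 10_000; }
          @Override
          public int getMaxSegmentPartitionsOrderedInMemory() { return 4; }
        };
        // The new test exceeds each limit by exactly one.
        System.out.println(testLimits.getMaxRowsQueuedForOrdering() + 1);            // 10001
        System.out.println(testLimits.getMaxSegmentPartitionsOrderedInMemory() + 1); // 5
      }
    }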
@@ -275,6 +296,9 @@ public class ScanQueryRunnerFactoryTest
            DateTimes.of("2019-01-01").plusHours(1)
        ), "1", 0);
 
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
     @Test
     public void testGetValidIntervalsFromSpec()
     {
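The hunk above adds JUnit 4's ExpectedException rule, which the new guardrail test in the next hunk uses to assert both the exception type and the full message. For comparison, JUnit 4.13+ can express the same expectation with Assert.assertThrows, which scopes it to a single call; the sketch below assumes the same runner and queryPlus constructed in the added test and is only an alternative phrasing, not part of this change.

    // Sketch only: an assertThrows-based equivalent of the ExpectedException expectation
    // (assumes JUnit 4.13+ and the runner/queryPlus built in the added test).
    ResourceLimitExceededException e = Assert.assertThrows(
        ResourceLimitExceededException.class,
        () -> runner.run(queryPlus, ResponseContext.createEmpty())
    );
    Assert.assertTrue(e.getMessage().contains("maxSegmentPartitionsOrderedInMemory"));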
@@ -319,5 +343,43 @@ public class ScanQueryRunnerFactoryTest
       );
       FACTORY.getIntervalsFromSpecificQuerySpec(legacySpec);
     }
+
+    @Test
+    public void testMergeRunnersGuardrailsExceeded()
+    {
+      final QueryRunner<ScanResultValue> runner = FACTORY.mergeRunners(
+          Execs.directExecutor(),
+          IntStream.range(0, CONFIG.getMaxSegmentPartitionsOrderedInMemory() + 1)
+                   .mapToObj(i -> (QueryRunner<ScanResultValue>) (queryPlus, responseContext) -> Sequences.empty())
+                   .collect(Collectors.toList())
+      );
+
+      expectedException.expect(ResourceLimitExceededException.class);
+      expectedException.expectMessage(
+          "Time ordering is not supported for a Scan query with 5 segments per time chunk and a row limit of 10,001. "
+          + "Try reducing your query limit below maxRowsQueuedForOrdering (currently 10,000), or using compaction to "
+          + "reduce the number of segments per time chunk, or raising maxSegmentPartitionsOrderedInMemory "
+          + "(currently 4) above the number of segments you have per time chunk."
+      );
+
+      runner.run(
+          QueryPlus.wrap(
+              Druids.newScanQueryBuilder()
+                    .dataSource("foo")
+                    .limit(CONFIG.getMaxRowsQueuedForOrdering() + 1)
+                    .intervals(
+                        new MultipleSpecificSegmentSpec(
+                            IntStream.range(0, CONFIG.getMaxSegmentPartitionsOrderedInMemory() + 1)
+                                     .mapToObj(i -> new SegmentDescriptor(Intervals.ETERNITY, "v0", i))
+                                     .collect(Collectors.toList())
+                        )
+                    )
+                    .order(ScanQuery.Order.ASCENDING)
+                    .build()
+          ),
+          ResponseContext.createEmpty()
+      );
+    }
   }
 }
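The new message also points at the two knobs a user can turn instead of rewriting the query: maxRowsQueuedForOrdering and maxSegmentPartitionsOrderedInMemory can be raised per query (the query getters used in the factory read per-query overrides), with server-wide defaults coming from ScanQueryConfig. The sketch below shows one plausible per-query override via the query context; the context key strings and the builder's context(...) method are assumptions drawn from the getter names, so verify them against the ScanQuery source or docs before relying on them.

    // Sketch only: raising the time-ordering limits for a single Scan query via query context.
    // The context keys are assumed to match the getters used in ScanQueryRunnerFactory;
    // Map/HashMap/Collections and MultipleIntervalSegmentSpec imports are assumed.
    final Map<String, Object> context = new HashMap<>();
    context.put("maxRowsQueuedForOrdering", 200_000);
    context.put("maxSegmentPartitionsOrderedInMemory", 64);

    final ScanQuery query = Druids.newScanQueryBuilder()
        .dataSource("foo")
        .intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2019-01-01/2019-01-02"))))
        .order(ScanQuery.Order.ASCENDING)
        .limit(100_000)
        .context(context)
        .build();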