diff --git a/server/src/main/java/org/apache/druid/server/scheduling/ThresholdBasedQueryPrioritizationStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/ThresholdBasedQueryPrioritizationStrategy.java index 9d3909b9358..f735e7b499d 100644 --- a/server/src/main/java/org/apache/druid/server/scheduling/ThresholdBasedQueryPrioritizationStrategy.java +++ b/server/src/main/java/org/apache/druid/server/scheduling/ThresholdBasedQueryPrioritizationStrategy.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.query.DataSource; import org.apache.druid.query.Query; import org.apache.druid.query.QueryPlus; import org.apache.druid.server.QueryPrioritizationStrategy; @@ -34,6 +35,7 @@ import org.joda.time.base.AbstractInterval; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Optional; import java.util.Set; @@ -51,6 +53,7 @@ public class ThresholdBasedQueryPrioritizationStrategy implements QueryPrioritiz private final Optional periodThreshold; private final Optional durationThreshold; private final Optional segmentRangeThreshold; + private final Set exemptDatasources; @JsonCreator public ThresholdBasedQueryPrioritizationStrategy( @@ -58,6 +61,7 @@ public class ThresholdBasedQueryPrioritizationStrategy implements QueryPrioritiz @JsonProperty("durationThreshold") @Nullable String durationThresholdString, @JsonProperty("segmentCountThreshold") @Nullable Integer segmentCountThreshold, @JsonProperty("segmentRangeThreshold") @Nullable String segmentRangeThresholdString, + @JsonProperty("exemptDatasources") @Nullable Set exemptDatasources, @JsonProperty("adjustment") @Nullable Integer adjustment ) { @@ -72,6 +76,7 @@ public class ThresholdBasedQueryPrioritizationStrategy implements QueryPrioritiz this.segmentRangeThreshold = segmentRangeThresholdString == null ? Optional.empty() : Optional.of(new Period(segmentRangeThresholdString).toStandardDuration()); + this.exemptDatasources = (exemptDatasources == null) ? Collections.emptySet():exemptDatasources; Preconditions.checkArgument( segmentCountThreshold != null || periodThreshold.isPresent() || durationThreshold.isPresent() || segmentRangeThreshold.isPresent(), "periodThreshold, durationThreshold, segmentCountThreshold or segmentRangeThreshold must be set" @@ -82,6 +87,15 @@ public class ThresholdBasedQueryPrioritizationStrategy implements QueryPrioritiz public Optional computePriority(QueryPlus query, Set segments) { Query theQuery = query.getQuery(); + DataSource datasource = theQuery.getDataSource(); + + if (!exemptDatasources.isEmpty()) { + boolean isExempt = exemptDatasources.containsAll(datasource.getTableNames()); + if(isExempt) { + return Optional.empty(); + } + } + final boolean violatesPeriodThreshold = periodThreshold.map(duration -> { final DateTime periodThresholdStartDate = DateTimes.nowUtc().minus(duration); return theQuery.getIntervals() diff --git a/server/src/test/java/org/apache/druid/server/scheduling/ThresholdBasedQueryPrioritizationStrategyTest.java b/server/src/test/java/org/apache/druid/server/scheduling/ThresholdBasedQueryPrioritizationStrategyTest.java index 1cc46783d03..1feaa3643c8 100644 --- a/server/src/test/java/org/apache/druid/server/scheduling/ThresholdBasedQueryPrioritizationStrategyTest.java +++ b/server/src/test/java/org/apache/druid/server/scheduling/ThresholdBasedQueryPrioritizationStrategyTest.java @@ -60,7 +60,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest public void testPrioritizationPeriodThresholdInsidePeriod() { QueryPrioritizationStrategy strategy = new ThresholdBasedQueryPrioritizationStrategy( - "P90D", null, null, null, adjustment); + "P90D", null, null, null,null, adjustment); DateTime startDate = DateTimes.nowUtc().minusDays(1); DateTime endDate = DateTimes.nowUtc(); TimeseriesQuery query = queryBuilder.intervals(ImmutableList.of(new Interval(startDate, endDate))) @@ -81,6 +81,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest null, null, null, + null, adjustment ); DateTime startDate = DateTimes.nowUtc().minusDays(100); @@ -104,6 +105,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest "P7D", null, null, + null, adjustment ); DateTime startDate = DateTimes.nowUtc().minusDays(1); @@ -126,6 +128,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest "P7D", null, null, + null, adjustment ); DateTime startDate = DateTimes.nowUtc().minusDays(20); @@ -149,6 +152,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest null, 2, null, + null, adjustment ); DateTime startDate = DateTimes.nowUtc().minusDays(1); @@ -174,6 +178,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest null, 2, null, + null, adjustment ); DateTime startDate = DateTimes.nowUtc().minusDays(20); @@ -204,6 +209,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest null, null, "P7D", + null, adjustment ); DateTime startDate = DateTimes.nowUtc().minusDays(1); @@ -228,6 +234,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest null, null, "P7D", + null, adjustment ); DateTime startDate = DateTimes.nowUtc().minusDays(20); @@ -244,4 +251,68 @@ public class ThresholdBasedQueryPrioritizationStrategyTest (int) strategy.computePriority(QueryPlus.wrap(query), ImmutableSet.of(segmentServerSelector)).get() ); } + + @Test + public void testPrioritizationWithExemptDatasource() + { + QueryPrioritizationStrategy strategy = new ThresholdBasedQueryPrioritizationStrategy( + null, + null, + 2, + null, + ImmutableSet.of("exemptDatasource"), + adjustment + ); + DateTime startDate = DateTimes.nowUtc().minusDays(20); + DateTime endDate = DateTimes.nowUtc(); + TimeseriesQuery query = queryBuilder.intervals(ImmutableList.of(new Interval(startDate, endDate))) + .granularity(Granularities.HOUR) + .context(ImmutableMap.of()) + .build(); + + Assert.assertFalse( + strategy.computePriority( + QueryPlus.wrap(query), + ImmutableSet.of( + EasyMock.createMock(SegmentServerSelector.class), + EasyMock.createMock(SegmentServerSelector.class), + EasyMock.createMock(SegmentServerSelector.class) + ) + ).isPresent() + ); + } + + @Test + public void testPrioritizationWithNonExemptDatasource() + { + QueryPrioritizationStrategy strategy = new ThresholdBasedQueryPrioritizationStrategy( + null, + null, + 2, + null, + ImmutableSet.of("exemptDatasource"), + adjustment + ); + DateTime startDate = DateTimes.nowUtc().minusDays(20); + DateTime endDate = DateTimes.nowUtc(); + TimeseriesQuery query = queryBuilder.dataSource("nonExemptDatasource") + .intervals(ImmutableList.of(new Interval(startDate, endDate))) + .granularity(Granularities.HOUR) + .context(ImmutableMap.of()) + .build(); + + // Since "test" is not in the exempt list, priority should be adjusted + Assert.assertEquals( + -adjustment, + (int) strategy.computePriority( + QueryPlus.wrap(query), + ImmutableSet.of( + EasyMock.createMock(SegmentServerSelector.class), + EasyMock.createMock(SegmentServerSelector.class), + EasyMock.createMock(SegmentServerSelector.class) + ) + ).get() + ); + } + }