mirror of https://github.com/apache/druid.git
Adding in the option to provide datasource(s) that are high-priority, which should not be considered for de-prioritization
This commit is contained in:
parent
fe0f4150c9
commit
a89d5cd512
|
@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import com.google.common.base.Preconditions;
|
||||
import org.apache.druid.client.SegmentServerSelector;
|
||||
import org.apache.druid.java.util.common.DateTimes;
|
||||
import org.apache.druid.query.DataSource;
|
||||
import org.apache.druid.query.Query;
|
||||
import org.apache.druid.query.QueryPlus;
|
||||
import org.apache.druid.server.QueryPrioritizationStrategy;
|
||||
|
@ -34,6 +35,7 @@ import org.joda.time.base.AbstractInterval;
|
|||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -51,6 +53,7 @@ public class ThresholdBasedQueryPrioritizationStrategy implements QueryPrioritiz
|
|||
private final Optional<Duration> periodThreshold;
|
||||
private final Optional<Duration> durationThreshold;
|
||||
private final Optional<Duration> segmentRangeThreshold;
|
||||
private final Set<String> exemptDatasources;
|
||||
|
||||
@JsonCreator
|
||||
public ThresholdBasedQueryPrioritizationStrategy(
|
||||
|
@ -58,6 +61,7 @@ public class ThresholdBasedQueryPrioritizationStrategy implements QueryPrioritiz
|
|||
@JsonProperty("durationThreshold") @Nullable String durationThresholdString,
|
||||
@JsonProperty("segmentCountThreshold") @Nullable Integer segmentCountThreshold,
|
||||
@JsonProperty("segmentRangeThreshold") @Nullable String segmentRangeThresholdString,
|
||||
@JsonProperty("exemptDatasources") @Nullable Set<String> exemptDatasources,
|
||||
@JsonProperty("adjustment") @Nullable Integer adjustment
|
||||
)
|
||||
{
|
||||
|
@ -72,6 +76,7 @@ public class ThresholdBasedQueryPrioritizationStrategy implements QueryPrioritiz
|
|||
this.segmentRangeThreshold = segmentRangeThresholdString == null
|
||||
? Optional.empty()
|
||||
: Optional.of(new Period(segmentRangeThresholdString).toStandardDuration());
|
||||
this.exemptDatasources = (exemptDatasources == null) ? Collections.emptySet():exemptDatasources;
|
||||
Preconditions.checkArgument(
|
||||
segmentCountThreshold != null || periodThreshold.isPresent() || durationThreshold.isPresent() || segmentRangeThreshold.isPresent(),
|
||||
"periodThreshold, durationThreshold, segmentCountThreshold or segmentRangeThreshold must be set"
|
||||
|
@ -82,6 +87,15 @@ public class ThresholdBasedQueryPrioritizationStrategy implements QueryPrioritiz
|
|||
public <T> Optional<Integer> computePriority(QueryPlus<T> query, Set<SegmentServerSelector> segments)
|
||||
{
|
||||
Query<T> theQuery = query.getQuery();
|
||||
DataSource datasource = theQuery.getDataSource();
|
||||
|
||||
if (!exemptDatasources.isEmpty()) {
|
||||
boolean isExempt = exemptDatasources.containsAll(datasource.getTableNames());
|
||||
if(isExempt) {
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
final boolean violatesPeriodThreshold = periodThreshold.map(duration -> {
|
||||
final DateTime periodThresholdStartDate = DateTimes.nowUtc().minus(duration);
|
||||
return theQuery.getIntervals()
|
||||
|
|
|
@ -60,7 +60,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest
|
|||
public void testPrioritizationPeriodThresholdInsidePeriod()
|
||||
{
|
||||
QueryPrioritizationStrategy strategy = new ThresholdBasedQueryPrioritizationStrategy(
|
||||
"P90D", null, null, null, adjustment);
|
||||
"P90D", null, null, null,null, adjustment);
|
||||
DateTime startDate = DateTimes.nowUtc().minusDays(1);
|
||||
DateTime endDate = DateTimes.nowUtc();
|
||||
TimeseriesQuery query = queryBuilder.intervals(ImmutableList.of(new Interval(startDate, endDate)))
|
||||
|
@ -81,6 +81,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
adjustment
|
||||
);
|
||||
DateTime startDate = DateTimes.nowUtc().minusDays(100);
|
||||
|
@ -104,6 +105,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest
|
|||
"P7D",
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
adjustment
|
||||
);
|
||||
DateTime startDate = DateTimes.nowUtc().minusDays(1);
|
||||
|
@ -126,6 +128,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest
|
|||
"P7D",
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
adjustment
|
||||
);
|
||||
DateTime startDate = DateTimes.nowUtc().minusDays(20);
|
||||
|
@ -149,6 +152,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest
|
|||
null,
|
||||
2,
|
||||
null,
|
||||
null,
|
||||
adjustment
|
||||
);
|
||||
DateTime startDate = DateTimes.nowUtc().minusDays(1);
|
||||
|
@ -174,6 +178,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest
|
|||
null,
|
||||
2,
|
||||
null,
|
||||
null,
|
||||
adjustment
|
||||
);
|
||||
DateTime startDate = DateTimes.nowUtc().minusDays(20);
|
||||
|
@ -204,6 +209,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest
|
|||
null,
|
||||
null,
|
||||
"P7D",
|
||||
null,
|
||||
adjustment
|
||||
);
|
||||
DateTime startDate = DateTimes.nowUtc().minusDays(1);
|
||||
|
@ -228,6 +234,7 @@ public class ThresholdBasedQueryPrioritizationStrategyTest
|
|||
null,
|
||||
null,
|
||||
"P7D",
|
||||
null,
|
||||
adjustment
|
||||
);
|
||||
DateTime startDate = DateTimes.nowUtc().minusDays(20);
|
||||
|
@ -244,4 +251,68 @@ public class ThresholdBasedQueryPrioritizationStrategyTest
|
|||
(int) strategy.computePriority(QueryPlus.wrap(query), ImmutableSet.of(segmentServerSelector)).get()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrioritizationWithExemptDatasource()
|
||||
{
|
||||
QueryPrioritizationStrategy strategy = new ThresholdBasedQueryPrioritizationStrategy(
|
||||
null,
|
||||
null,
|
||||
2,
|
||||
null,
|
||||
ImmutableSet.of("exemptDatasource"),
|
||||
adjustment
|
||||
);
|
||||
DateTime startDate = DateTimes.nowUtc().minusDays(20);
|
||||
DateTime endDate = DateTimes.nowUtc();
|
||||
TimeseriesQuery query = queryBuilder.intervals(ImmutableList.of(new Interval(startDate, endDate)))
|
||||
.granularity(Granularities.HOUR)
|
||||
.context(ImmutableMap.of())
|
||||
.build();
|
||||
|
||||
Assert.assertFalse(
|
||||
strategy.computePriority(
|
||||
QueryPlus.wrap(query),
|
||||
ImmutableSet.of(
|
||||
EasyMock.createMock(SegmentServerSelector.class),
|
||||
EasyMock.createMock(SegmentServerSelector.class),
|
||||
EasyMock.createMock(SegmentServerSelector.class)
|
||||
)
|
||||
).isPresent()
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrioritizationWithNonExemptDatasource()
|
||||
{
|
||||
QueryPrioritizationStrategy strategy = new ThresholdBasedQueryPrioritizationStrategy(
|
||||
null,
|
||||
null,
|
||||
2,
|
||||
null,
|
||||
ImmutableSet.of("exemptDatasource"),
|
||||
adjustment
|
||||
);
|
||||
DateTime startDate = DateTimes.nowUtc().minusDays(20);
|
||||
DateTime endDate = DateTimes.nowUtc();
|
||||
TimeseriesQuery query = queryBuilder.dataSource("nonExemptDatasource")
|
||||
.intervals(ImmutableList.of(new Interval(startDate, endDate)))
|
||||
.granularity(Granularities.HOUR)
|
||||
.context(ImmutableMap.of())
|
||||
.build();
|
||||
|
||||
// Since "test" is not in the exempt list, priority should be adjusted
|
||||
Assert.assertEquals(
|
||||
-adjustment,
|
||||
(int) strategy.computePriority(
|
||||
QueryPlus.wrap(query),
|
||||
ImmutableSet.of(
|
||||
EasyMock.createMock(SegmentServerSelector.class),
|
||||
EasyMock.createMock(SegmentServerSelector.class),
|
||||
EasyMock.createMock(SegmentServerSelector.class)
|
||||
)
|
||||
).get()
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue