diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 20842af91c2..80004403668 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1524,7 +1524,16 @@ This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=h |Property|Description|Default| |--------|-----------|-------| -|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of`druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be in the range 1 to 100, and will be rounded up|No default, must be set if using this mode| +|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be an integer in the range 1 to 100, and will be rounded up|No default, must be set if using this mode| + + +###### 'Manual' laning strategy +This laning strategy is best suited for cases where one or more external applications which query Druid are capable of manually deciding what lane a given query should belong to. Configured with a map of lane names to percent or exact max capacities, queries with a matching `lane` parameter in the [query context](../querying/query-context.md) will be subjected to those limits. + +|Property|Description|Default| +|--------|-----------|-------| +|`druid.query.scheduler.laning.lanes.{name}`|Maximum percent or exact limit of queries that can concurrently run in the defined lanes. Any number of lanes may be defined like this.|No default, must define at least one lane with a limit above 0. If `druid.query.scheduler.laning.isLimitPercent` is set to `true`, values must be integers in the range of 1 to 100.| +|`druid.query.scheduler.laning.isLimitPercent`|If set to `true`, the values set for `druid.query.scheduler.laning.lanes` will be treated as a percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`. Note that in this mode, these lane values across lanes are _not_ required to add up to, and can exceed, 100%.|`false`| ##### Server Configuration diff --git a/integration-tests/docker/environment-configs/broker b/integration-tests/docker/environment-configs/broker index c8835419229..b8794d4c0bd 100644 --- a/integration-tests/docker/environment-configs/broker +++ b/integration-tests/docker/environment-configs/broker @@ -37,3 +37,5 @@ druid_cache_sizeInBytes=40000000 druid_auth_basic_common_cacheDirectory=/tmp/authCache/broker druid_sql_avatica_enable=true druid_server_https_crlPath=/tls/revocations.crl +druid_query_scheduler_laning_strategy=manual +druid_query_scheduler_laning_lanes_one=1 \ No newline at end of file diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java index 1cb9dbf44ae..15851c5dbdd 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java @@ -35,6 +35,7 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.net.URL; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; public abstract class AbstractQueryResourceTestClient { @@ -87,4 +88,19 @@ public abstract class AbstractQueryResourceTestClient } } + public Future queryAsync(String url, QueryType query) + { + try { + return httpClient.go( + new Request(HttpMethod.POST, new URL(url)).setContent( + "application/json", + jsonMapper.writeValueAsBytes(query) + ), + StatusResponseHandler.getInstance() + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java index b6d5e28801c..5f96ca00b49 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java @@ -41,6 +41,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.net.URL; import java.util.List; import java.util.Map; @@ -262,13 +263,29 @@ public class CoordinatorResourceTestClient return results2; } + @Nullable private Map>> getLookupLoadStatus() { String url = StringUtils.format("%slookups/nodeStatus", getCoordinatorURL()); Map>> status; try { - StatusResponseHolder response = makeRequest(HttpMethod.GET, url); + StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.GET, new URL(url)), + responseHandler + ).get(); + + if (response.getStatus().getCode() == HttpResponseStatus.NOT_FOUND.getCode()) { + return null; + } + if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()) { + throw new ISE( + "Error while making request to url[%s] status[%s] content[%s]", + url, + response.getStatus(), + response.getContent() + ); + } status = jsonMapper.readValue( response.getContent(), new TypeReference>>>() @@ -286,6 +303,10 @@ public class CoordinatorResourceTestClient { final Map>> status = getLookupLoadStatus(); + if (status == null) { + return false; + } + final Map> defaultTier = status.get("__default"); boolean isLoaded = true; diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java index d677af9d115..a84eda399dc 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java @@ -39,6 +39,7 @@ public abstract class AbstractTestQueryHelper @Override - protected String getQueryURL(String schemeAndHost) + public String getQueryURL(String schemeAndHost) { return StringUtils.format("%s/druid/v2?pretty", schemeAndHost); } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java index 700a35e1a47..9461ed9ed79 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java @@ -19,16 +19,31 @@ package org.apache.druid.tests.query; +import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.http.client.response.StatusResponseHolder; +import org.apache.druid.query.Druids; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.server.QueryCapacityExceededException; +import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.apache.druid.testing.clients.QueryResourceTestClient; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.utils.ITRetryUtil; import org.apache.druid.testing.utils.TestQueryHelper; import org.apache.druid.tests.TestNGGroup; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Guice; import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Future; + @Test(groups = TestNGGroup.QUERY) @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITWikipediaQueryTest @@ -42,6 +57,10 @@ public class ITWikipediaQueryTest private CoordinatorResourceTestClient coordinatorClient; @Inject private TestQueryHelper queryHelper; + @Inject + private QueryResourceTestClient queryClient; + @Inject + private IntegrationTestingConfig config; @BeforeMethod public void before() throws Exception @@ -51,10 +70,12 @@ public class ITWikipediaQueryTest ITRetryUtil.retryUntilTrue( () -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load" ); - coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE); - ITRetryUtil.retryUntilTrue( - () -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia lookup load" - ); + if (!coordinatorClient.areLookupsLoaded(WIKI_LOOKUP)) { + coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE); + ITRetryUtil.retryUntilTrue( + () -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia lookup load" + ); + } } @Test @@ -62,4 +83,99 @@ public class ITWikipediaQueryTest { queryHelper.testQueriesFromFile(WIKIPEDIA_QUERIES_RESOURCE, 2); } + + @Test + public void testQueryLaningLaneIsLimited() throws Exception + { + // the broker is configured with a manually defined query lane, 'one' with limit 1 + // -Ddruid.query.scheduler.laning.type=manual + // -Ddruid.query.scheduler.laning.lanes.one=1 + // by issuing 50 queries, at least 1 of them will succeed on 'one', and at least 1 of them will overlap enough to + // get limited + final int numQueries = 50; + List> futures = new ArrayList<>(numQueries); + for (int i = 0; i < numQueries; i++) { + futures.add( + queryClient.queryAsync( + queryHelper.getQueryURL(config.getBrokerUrl()), + getQueryBuilder().build() + ) + ); + } + + int success = 0; + int limited = 0; + + for (Future future : futures) { + StatusResponseHolder status = future.get(); + if (status.getStatus().getCode() == QueryCapacityExceededException.STATUS_CODE) { + limited++; + Assert.assertTrue(status.getContent().contains(StringUtils.format(QueryCapacityExceededException.ERROR_MESSAGE_TEMPLATE, "one"))); + } else if (status.getStatus().getCode() == HttpResponseStatus.OK.getCode()) { + success++; + } + } + + Assert.assertTrue(success > 0); + Assert.assertTrue(limited > 0); + + // test another to make sure we can still issue one query at a time + StatusResponseHolder followUp = queryClient.queryAsync( + queryHelper.getQueryURL(config.getBrokerUrl()), + getQueryBuilder().build() + ).get(); + + Assert.assertEquals(HttpResponseStatus.OK.getCode(), followUp.getStatus().getCode()); + + StatusResponseHolder andAnother = queryClient.queryAsync( + queryHelper.getQueryURL(config.getBrokerUrl()), + getQueryBuilder().build() + ).get(); + + Assert.assertEquals(HttpResponseStatus.OK.getCode(), andAnother.getStatus().getCode()); + } + + @Test + public void testQueryLaningWithNoLane() throws Exception + { + // the broker is configured with a manually defined query lane, 'one' with limit 1 + // -Ddruid.query.scheduler.laning.type=manual + // -Ddruid.query.scheduler.laning.lanes.one=1 + // these queries will not belong to the lane so none of them should be limited + final int numQueries = 50; + List> futures = new ArrayList<>(numQueries); + for (int i = 0; i < numQueries; i++) { + futures.add( + queryClient.queryAsync( + queryHelper.getQueryURL(config.getBrokerUrl()), + getQueryBuilder().context(ImmutableMap.of("queryId", UUID.randomUUID().toString())).build() + ) + ); + } + + int success = 0; + int limited = 0; + + for (Future future : futures) { + StatusResponseHolder status = future.get(); + if (status.getStatus().getCode() == QueryCapacityExceededException.STATUS_CODE) { + limited++; + } else if (status.getStatus().getCode() == HttpResponseStatus.OK.getCode()) { + success++; + } + } + + Assert.assertTrue(success > 0); + Assert.assertEquals(limited, 0); + + } + + private Druids.TimeseriesQueryBuilder getQueryBuilder() + { + return Druids.newTimeseriesQueryBuilder() + .dataSource("wikipedia_editstream") + .aggregators(new CountAggregatorFactory("chocula")) + .intervals("2013-01-01T00:00:00.000/2013-01-08T00:00:00.000") + .context(ImmutableMap.of("lane", "one", "queryId", UUID.randomUUID().toString())); + } } diff --git a/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java index a2e922ef48d..d601384c665 100644 --- a/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/QueryLaningStrategy.java @@ -21,10 +21,12 @@ package org.apache.druid.server; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.google.common.primitives.Ints; import it.unimi.dsi.fastutil.objects.Object2IntMap; import org.apache.druid.client.SegmentServerSelector; import org.apache.druid.query.QueryPlus; import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy; +import org.apache.druid.server.scheduling.ManualQueryLaningStrategy; import org.apache.druid.server.scheduling.NoQueryLaningStrategy; import java.util.Optional; @@ -34,7 +36,8 @@ import java.util.Set; @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = NoQueryLaningStrategy.class) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "none", value = NoQueryLaningStrategy.class), - @JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class) + @JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class), + @JsonSubTypes.Type(name = "manual", value = ManualQueryLaningStrategy.class) }) public interface QueryLaningStrategy { @@ -50,4 +53,9 @@ public interface QueryLaningStrategy * This method must be thread safe */ Optional computeLane(QueryPlus query, Set segments); + + default int computeLimitFromPercent(int totalLimit, int value) + { + return Ints.checkedCast((long) Math.ceil(totalLimit * ((double) value / 100))); + } } diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java b/server/src/main/java/org/apache/druid/server/QueryScheduler.java index 8338907cc68..93786d77004 100644 --- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java +++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java @@ -55,7 +55,7 @@ import java.util.Set; */ public class QueryScheduler implements QueryWatcher { - private static final int NO_CAPACITY = -1; + public static final int UNAVAILABLE = -1; static final String TOTAL = "default"; private final int totalCapacity; private final QueryPrioritizationStrategy prioritizationStrategy; @@ -173,7 +173,7 @@ public class QueryScheduler implements QueryWatcher { return laneRegistry.getConfiguration(TOTAL) .map(config -> laneRegistry.bulkhead(TOTAL, config).getMetrics().getAvailableConcurrentCalls()) - .orElse(NO_CAPACITY); + .orElse(UNAVAILABLE); } /** @@ -184,7 +184,7 @@ public class QueryScheduler implements QueryWatcher { return laneRegistry.getConfiguration(lane) .map(config -> laneRegistry.bulkhead(lane, config).getMetrics().getAvailableConcurrentCalls()) - .orElse(NO_CAPACITY); + .orElse(UNAVAILABLE); } /** diff --git a/server/src/main/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategy.java index af4e23bddf7..0a974228f66 100644 --- a/server/src/main/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategy.java +++ b/server/src/main/java/org/apache/druid/server/scheduling/HiLoQueryLaningStrategy.java @@ -22,7 +22,6 @@ package org.apache.druid.server.scheduling; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; import it.unimi.dsi.fastutil.objects.Object2IntArrayMap; import it.unimi.dsi.fastutil.objects.Object2IntMap; import org.apache.druid.client.SegmentServerSelector; @@ -60,7 +59,7 @@ public class HiLoQueryLaningStrategy implements QueryLaningStrategy public Object2IntMap getLaneLimits(int totalLimit) { Object2IntMap onlyLow = new Object2IntArrayMap<>(1); - onlyLow.put(LOW, Ints.checkedCast((long) Math.ceil(totalLimit * ((double) maxLowPercent / 100)))); + onlyLow.put(LOW, computeLimitFromPercent(totalLimit, maxLowPercent)); return onlyLow; } diff --git a/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java b/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java new file mode 100644 index 00000000000..a670c2fc455 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategy.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.scheduling; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.objects.Object2IntArrayMap; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.client.SegmentServerSelector; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.server.QueryLaningStrategy; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class ManualQueryLaningStrategy implements QueryLaningStrategy +{ + @JsonProperty + private Map lanes; + + @JsonProperty + private boolean isLimitPercent; + + @JsonCreator + public ManualQueryLaningStrategy( + @JsonProperty("lanes") Map lanes, + @JsonProperty("isLimitPercent") @Nullable Boolean isLimitPercent + ) + { + this.lanes = Preconditions.checkNotNull(lanes, "lanes must be set"); + this.isLimitPercent = isLimitPercent != null ? isLimitPercent : false; + Preconditions.checkArgument(lanes.size() > 0, "lanes must define at least one lane"); + Preconditions.checkArgument( + lanes.values().stream().allMatch(x -> this.isLimitPercent ? 0 < x && x <= 100 : x > 0), + this.isLimitPercent ? "All lane limits must be in the range 1 to 100" : "All lane limits must be greater than 0" + ); + } + + @Override + public Object2IntMap getLaneLimits(int totalLimit) + { + + if (isLimitPercent) { + Object2IntMap laneLimits = new Object2IntArrayMap<>(lanes.size()); + lanes.forEach((key, value) -> laneLimits.put(key, computeLimitFromPercent(totalLimit, value))); + return laneLimits; + } + return new Object2IntArrayMap<>(lanes); + } + + @Override + public Optional computeLane(QueryPlus query, Set segments) + { + return Optional.ofNullable(QueryContexts.getLane(query.getQuery())); + } +} diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java index 65d7cfde39b..0e3066bea37 100644 --- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java +++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java @@ -183,6 +183,7 @@ public class QuerySchedulerTest }); future.get(); Assert.assertEquals(5, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); } @Test @@ -217,6 +218,7 @@ public class QuerySchedulerTest future.get(); Assert.assertEquals(5, scheduler.getTotalAvailableCapacity()); Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); } @Test @@ -272,7 +274,9 @@ public class QuerySchedulerTest Assert.assertEquals(0, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); // too many reports - scheduler.run(scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()), Sequences.empty()); + scheduler.run( + scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()), Sequences.empty() + ); } @Test @@ -309,7 +313,9 @@ public class QuerySchedulerTest Assert.assertEquals(0, scheduler.getTotalAvailableCapacity()); // one too many - scheduler.run(scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()), Sequences.empty()); + scheduler.run( + scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()), Sequences.empty() + ); } @Test @@ -359,7 +365,8 @@ public class QuerySchedulerTest provider.inject(properties, injector.getInstance(JsonConfigurator.class)); final QueryScheduler scheduler = provider.get().get().get(); Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); - Assert.assertEquals(-1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); } @Test @@ -380,6 +387,7 @@ public class QuerySchedulerTest final QueryScheduler scheduler = provider.get().get().get(); Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); } @@ -421,6 +429,7 @@ public class QuerySchedulerTest final QueryScheduler scheduler = provider.get().get().get(); Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW)); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); Query query = scheduler.prioritizeAndLaneQuery( QueryPlus.wrap(makeDefaultQuery()), @@ -452,6 +461,51 @@ public class QuerySchedulerTest } + @Test + public void testConfigManual() + { + final Injector injector = createInjector(); + final String propertyPrefix = "druid.query.scheduler"; + final JsonConfigProvider provider = JsonConfigProvider.of( + propertyPrefix, + QuerySchedulerProvider.class + ); + final Properties properties = new Properties(); + properties.put(propertyPrefix + ".numThreads", "10"); + properties.put(propertyPrefix + ".laning.strategy", "manual"); + properties.put(propertyPrefix + ".laning.lanes.one", "1"); + properties.put(propertyPrefix + ".laning.lanes.two", "2"); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final QueryScheduler scheduler = provider.get().get().get(); + Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(1, scheduler.getLaneAvailableCapacity("one")); + Assert.assertEquals(2, scheduler.getLaneAvailableCapacity("two")); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); + } + + @Test + public void testConfigManualPercent() + { + final Injector injector = createInjector(); + final String propertyPrefix = "druid.query.scheduler"; + final JsonConfigProvider provider = JsonConfigProvider.of( + propertyPrefix, + QuerySchedulerProvider.class + ); + final Properties properties = new Properties(); + properties.put(propertyPrefix + ".numThreads", "10"); + properties.put(propertyPrefix + ".laning.strategy", "manual"); + properties.put(propertyPrefix + ".laning.isLimitPercent", "true"); + properties.put(propertyPrefix + ".laning.lanes.one", "1"); + properties.put(propertyPrefix + ".laning.lanes.twenty", "20"); + provider.inject(properties, injector.getInstance(JsonConfigurator.class)); + final QueryScheduler scheduler = provider.get().get().get(); + Assert.assertEquals(10, scheduler.getTotalAvailableCapacity()); + Assert.assertEquals(1, scheduler.getLaneAvailableCapacity("one")); + Assert.assertEquals(2, scheduler.getLaneAvailableCapacity("twenty")); + Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent")); + } + private void maybeDelayNextIteration(int i) throws InterruptedException { if (i > 0 && i % 10 == 0) { diff --git a/server/src/test/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategyTest.java b/server/src/test/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategyTest.java new file mode 100644 index 00000000000..7b04c4730c3 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/scheduling/ManualQueryLaningStrategyTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.scheduling; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Druids; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.server.QueryLaningStrategy; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +@SuppressWarnings("ResultOfObjectAllocationIgnored") +public class ManualQueryLaningStrategyTest +{ + private Druids.TimeseriesQueryBuilder queryBuilder; + private QueryLaningStrategy exactStrategy; + private QueryLaningStrategy percentStrategy; + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setup() + { + this.queryBuilder = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .intervals(ImmutableList.of(Intervals.ETERNITY)) + .granularity(Granularities.DAY) + .aggregators(new CountAggregatorFactory("count")); + this.exactStrategy = + new ManualQueryLaningStrategy(ImmutableMap.of("one", 1, "ten", 10), null); + this.percentStrategy = + new ManualQueryLaningStrategy(ImmutableMap.of("one", 1, "ten", 10, "one-hundred", 100), true); + } + + @Test + public void testLanesMustBeSet() + { + expectedException.expect(NullPointerException.class); + expectedException.expectMessage("lanes must be set"); + new ManualQueryLaningStrategy(null, null); + } + + @Test + public void testMustDefineAtLeastOneLane() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("lanes must define at least one lane"); + new ManualQueryLaningStrategy(ImmutableMap.of(), null); + } + + @Test + public void testExactLaneLimitsMustBeAboveZero() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("All lane limits must be greater than 0"); + new ManualQueryLaningStrategy(ImmutableMap.of("zero", 0, "one", 1), null); + } + + @Test + public void testPercentLaneLimitsMustBeAboveZero() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("All lane limits must be in the range 1 to 100"); + new ManualQueryLaningStrategy(ImmutableMap.of("zero", 0, "one", 25), true); + } + + @Test + public void testPercentLaneLimitsMustBeLessThanOneHundred() + { + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("All lane limits must be in the range 1 to 100"); + new ManualQueryLaningStrategy(ImmutableMap.of("one", 1, "one-hundred-and-one", 101), true); + } + + @Test + public void testExactLimits() + { + Object2IntMap exactLanes = exactStrategy.getLaneLimits(50); + Assert.assertEquals(1, exactLanes.getInt("one")); + Assert.assertEquals(10, exactLanes.getInt("ten")); + } + + @Test + public void testPercentLimits() + { + Object2IntMap exactLanes = percentStrategy.getLaneLimits(50); + Assert.assertEquals(1, exactLanes.getInt("one")); + Assert.assertEquals(5, exactLanes.getInt("ten")); + Assert.assertEquals(50, exactLanes.getInt("one-hundred")); + } + + @Test + public void testDoesntSetLane() + { + TimeseriesQuery query = queryBuilder.context(ImmutableMap.of()).build(); + Assert.assertFalse(exactStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent()); + Assert.assertFalse(percentStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent()); + } + + @Test + public void testPreservesManualLaneFromContextThatArentInMapAndIgnoresThem() + { + final String someLane = "some-lane"; + TimeseriesQuery query = queryBuilder.context(ImmutableMap.of(QueryContexts.LANE_KEY, someLane)).build(); + Assert.assertEquals( + someLane, + exactStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get() + ); + Assert.assertEquals( + someLane, + percentStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get() + ); + } + + @Test + public void testPreservesManualLaneFromContext() + { + final String someLane = "ten"; + TimeseriesQuery query = queryBuilder.context(ImmutableMap.of(QueryContexts.LANE_KEY, someLane)).build(); + Assert.assertEquals( + someLane, + exactStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get() + ); + Assert.assertEquals( + someLane, + percentStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get() + ); + } +}