add manual laning strategy, integration test (#9492)

* add manual laning strategy, integration test, json config test

* share percent conversion method

* wrong assert

* review stuffs

* doc adjustments

* more tests

* test adjustment

* adjust docs

* Update index.md
This commit is contained in:
Clint Wylie 2020-03-13 20:06:55 -07:00 committed by GitHub
parent bcb9a632c7
commit 69af760a19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 479 additions and 19 deletions

View File

@ -1524,7 +1524,16 @@ This strategy can be enabled by setting `druid.query.scheduler.laning.strategy=h
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of`druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be in the range 1 to 100, and will be rounded up|No default, must be set if using this mode|
|`druid.query.scheduler.laning.maxLowPercent`|Maximum percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`, defining the number of HTTP threads that can be used by queries with a priority lower than 0. Value must be an integer in the range 1 to 100, and will be rounded up|No default, must be set if using this mode|
###### 'Manual' laning strategy
This laning strategy is best suited for cases where one or more external applications which query Druid are capable of manually deciding what lane a given query should belong to. Configured with a map of lane names to percent or exact max capacities, queries with a matching `lane` parameter in the [query context](../querying/query-context.md) will be subjected to those limits.
|Property|Description|Default|
|--------|-----------|-------|
|`druid.query.scheduler.laning.lanes.{name}`|Maximum percent or exact limit of queries that can concurrently run in the defined lanes. Any number of lanes may be defined like this.|No default, must define at least one lane with a limit above 0. If `druid.query.scheduler.laning.isLimitPercent` is set to `true`, values must be integers in the range of 1 to 100.|
|`druid.query.scheduler.laning.isLimitPercent`|If set to `true`, the values set for `druid.query.scheduler.laning.lanes` will be treated as a percent of the smaller number of `druid.server.http.numThreads` or `druid.query.scheduler.numThreads`. Note that in this mode, these lane values across lanes are _not_ required to add up to, and can exceed, 100%.|`false`|
##### Server Configuration

View File

@ -37,3 +37,5 @@ druid_cache_sizeInBytes=40000000
druid_auth_basic_common_cacheDirectory=/tmp/authCache/broker
druid_sql_avatica_enable=true
druid_server_https_crlPath=/tls/revocations.crl
druid_query_scheduler_laning_strategy=manual
druid_query_scheduler_laning_lanes_one=1

View File

@ -35,6 +35,7 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
public abstract class AbstractQueryResourceTestClient<QueryType>
{
@ -87,4 +88,19 @@ public abstract class AbstractQueryResourceTestClient<QueryType>
}
}
public Future<StatusResponseHolder> queryAsync(String url, QueryType query)
{
try {
return httpClient.go(
new Request(HttpMethod.POST, new URL(url)).setContent(
"application/json",
jsonMapper.writeValueAsBytes(query)
),
StatusResponseHandler.getInstance()
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -41,6 +41,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.net.URL;
import java.util.List;
import java.util.Map;
@ -262,13 +263,29 @@ public class CoordinatorResourceTestClient
return results2;
}
@Nullable
private Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> getLookupLoadStatus()
{
String url = StringUtils.format("%slookups/nodeStatus", getCoordinatorURL());
Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> status;
try {
StatusResponseHolder response = makeRequest(HttpMethod.GET, url);
StatusResponseHolder response = httpClient.go(
new Request(HttpMethod.GET, new URL(url)),
responseHandler
).get();
if (response.getStatus().getCode() == HttpResponseStatus.NOT_FOUND.getCode()) {
return null;
}
if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
throw new ISE(
"Error while making request to url[%s] status[%s] content[%s]",
url,
response.getStatus(),
response.getContent()
);
}
status = jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>>>()
@ -286,6 +303,10 @@ public class CoordinatorResourceTestClient
{
final Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> status = getLookupLoadStatus();
if (status == null) {
return false;
}
final Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> defaultTier = status.get("__default");
boolean isLoaded = true;

View File

@ -39,6 +39,7 @@ public abstract class AbstractTestQueryHelper<QueryResultType extends AbstractQu
{
public static final Logger LOG = new Logger(TestQueryHelper.class);
private final AbstractQueryResourceTestClient queryClient;
private final ObjectMapper jsonMapper;
protected final String broker;
@ -64,7 +65,7 @@ public abstract class AbstractTestQueryHelper<QueryResultType extends AbstractQu
public abstract void testQueriesFromFile(String filePath, int timesToRun) throws Exception;
protected abstract String getQueryURL(String schemeAndHost);
public abstract String getQueryURL(String schemeAndHost);
public void testQueriesFromFile(String url, String filePath, int timesToRun) throws Exception
{
@ -145,5 +146,4 @@ public abstract class AbstractTestQueryHelper<QueryResultType extends AbstractQu
return (Integer) map.get("rows");
}
}
}

View File

@ -48,7 +48,7 @@ public class SqlTestQueryHelper extends AbstractTestQueryHelper<SqlQueryWithResu
}
@Override
protected String getQueryURL(String schemeAndHost)
public String getQueryURL(String schemeAndHost)
{
return StringUtils.format("%s/druid/v2/sql", schemeAndHost);
}

View File

@ -57,7 +57,7 @@ public class TestQueryHelper extends AbstractTestQueryHelper<QueryWithResults>
@Override
protected String getQueryURL(String schemeAndHost)
public String getQueryURL(String schemeAndHost)
{
return StringUtils.format("%s/druid/v2?pretty", schemeAndHost);
}

View File

@ -19,16 +19,31 @@
package org.apache.druid.tests.query;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.server.QueryCapacityExceededException;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.QueryResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Future;
@Test(groups = TestNGGroup.QUERY)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITWikipediaQueryTest
@ -42,6 +57,10 @@ public class ITWikipediaQueryTest
private CoordinatorResourceTestClient coordinatorClient;
@Inject
private TestQueryHelper queryHelper;
@Inject
private QueryResourceTestClient queryClient;
@Inject
private IntegrationTestingConfig config;
@BeforeMethod
public void before() throws Exception
@ -51,10 +70,12 @@ public class ITWikipediaQueryTest
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
);
coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE);
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia lookup load"
);
if (!coordinatorClient.areLookupsLoaded(WIKI_LOOKUP)) {
coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE);
ITRetryUtil.retryUntilTrue(
() -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia lookup load"
);
}
}
@Test
@ -62,4 +83,99 @@ public class ITWikipediaQueryTest
{
queryHelper.testQueriesFromFile(WIKIPEDIA_QUERIES_RESOURCE, 2);
}
@Test
public void testQueryLaningLaneIsLimited() throws Exception
{
// the broker is configured with a manually defined query lane, 'one' with limit 1
// -Ddruid.query.scheduler.laning.type=manual
// -Ddruid.query.scheduler.laning.lanes.one=1
// by issuing 50 queries, at least 1 of them will succeed on 'one', and at least 1 of them will overlap enough to
// get limited
final int numQueries = 50;
List<Future<StatusResponseHolder>> futures = new ArrayList<>(numQueries);
for (int i = 0; i < numQueries; i++) {
futures.add(
queryClient.queryAsync(
queryHelper.getQueryURL(config.getBrokerUrl()),
getQueryBuilder().build()
)
);
}
int success = 0;
int limited = 0;
for (Future<StatusResponseHolder> future : futures) {
StatusResponseHolder status = future.get();
if (status.getStatus().getCode() == QueryCapacityExceededException.STATUS_CODE) {
limited++;
Assert.assertTrue(status.getContent().contains(StringUtils.format(QueryCapacityExceededException.ERROR_MESSAGE_TEMPLATE, "one")));
} else if (status.getStatus().getCode() == HttpResponseStatus.OK.getCode()) {
success++;
}
}
Assert.assertTrue(success > 0);
Assert.assertTrue(limited > 0);
// test another to make sure we can still issue one query at a time
StatusResponseHolder followUp = queryClient.queryAsync(
queryHelper.getQueryURL(config.getBrokerUrl()),
getQueryBuilder().build()
).get();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), followUp.getStatus().getCode());
StatusResponseHolder andAnother = queryClient.queryAsync(
queryHelper.getQueryURL(config.getBrokerUrl()),
getQueryBuilder().build()
).get();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), andAnother.getStatus().getCode());
}
@Test
public void testQueryLaningWithNoLane() throws Exception
{
// the broker is configured with a manually defined query lane, 'one' with limit 1
// -Ddruid.query.scheduler.laning.type=manual
// -Ddruid.query.scheduler.laning.lanes.one=1
// these queries will not belong to the lane so none of them should be limited
final int numQueries = 50;
List<Future<StatusResponseHolder>> futures = new ArrayList<>(numQueries);
for (int i = 0; i < numQueries; i++) {
futures.add(
queryClient.queryAsync(
queryHelper.getQueryURL(config.getBrokerUrl()),
getQueryBuilder().context(ImmutableMap.of("queryId", UUID.randomUUID().toString())).build()
)
);
}
int success = 0;
int limited = 0;
for (Future<StatusResponseHolder> future : futures) {
StatusResponseHolder status = future.get();
if (status.getStatus().getCode() == QueryCapacityExceededException.STATUS_CODE) {
limited++;
} else if (status.getStatus().getCode() == HttpResponseStatus.OK.getCode()) {
success++;
}
}
Assert.assertTrue(success > 0);
Assert.assertEquals(limited, 0);
}
private Druids.TimeseriesQueryBuilder getQueryBuilder()
{
return Druids.newTimeseriesQueryBuilder()
.dataSource("wikipedia_editstream")
.aggregators(new CountAggregatorFactory("chocula"))
.intervals("2013-01-01T00:00:00.000/2013-01-08T00:00:00.000")
.context(ImmutableMap.of("lane", "one", "queryId", UUID.randomUUID().toString()));
}
}

View File

@ -21,10 +21,12 @@ package org.apache.druid.server;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
import org.apache.druid.server.scheduling.ManualQueryLaningStrategy;
import org.apache.druid.server.scheduling.NoQueryLaningStrategy;
import java.util.Optional;
@ -34,7 +36,8 @@ import java.util.Set;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = NoQueryLaningStrategy.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "none", value = NoQueryLaningStrategy.class),
@JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class)
@JsonSubTypes.Type(name = "hilo", value = HiLoQueryLaningStrategy.class),
@JsonSubTypes.Type(name = "manual", value = ManualQueryLaningStrategy.class)
})
public interface QueryLaningStrategy
{
@ -50,4 +53,9 @@ public interface QueryLaningStrategy
* This method must be thread safe
*/
<T> Optional<String> computeLane(QueryPlus<T> query, Set<SegmentServerSelector> segments);
default int computeLimitFromPercent(int totalLimit, int value)
{
return Ints.checkedCast((long) Math.ceil(totalLimit * ((double) value / 100)));
}
}

View File

@ -55,7 +55,7 @@ import java.util.Set;
*/
public class QueryScheduler implements QueryWatcher
{
private static final int NO_CAPACITY = -1;
public static final int UNAVAILABLE = -1;
static final String TOTAL = "default";
private final int totalCapacity;
private final QueryPrioritizationStrategy prioritizationStrategy;
@ -173,7 +173,7 @@ public class QueryScheduler implements QueryWatcher
{
return laneRegistry.getConfiguration(TOTAL)
.map(config -> laneRegistry.bulkhead(TOTAL, config).getMetrics().getAvailableConcurrentCalls())
.orElse(NO_CAPACITY);
.orElse(UNAVAILABLE);
}
/**
@ -184,7 +184,7 @@ public class QueryScheduler implements QueryWatcher
{
return laneRegistry.getConfiguration(lane)
.map(config -> laneRegistry.bulkhead(lane, config).getMetrics().getAvailableConcurrentCalls())
.orElse(NO_CAPACITY);
.orElse(UNAVAILABLE);
}
/**

View File

@ -22,7 +22,6 @@ package org.apache.druid.server.scheduling;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import it.unimi.dsi.fastutil.objects.Object2IntArrayMap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
@ -60,7 +59,7 @@ public class HiLoQueryLaningStrategy implements QueryLaningStrategy
public Object2IntMap<String> getLaneLimits(int totalLimit)
{
Object2IntMap<String> onlyLow = new Object2IntArrayMap<>(1);
onlyLow.put(LOW, Ints.checkedCast((long) Math.ceil(totalLimit * ((double) maxLowPercent / 100))));
onlyLow.put(LOW, computeLimitFromPercent(totalLimit, maxLowPercent));
return onlyLow;
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.scheduling;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.objects.Object2IntArrayMap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.server.QueryLaningStrategy;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
public class ManualQueryLaningStrategy implements QueryLaningStrategy
{
@JsonProperty
private Map<String, Integer> lanes;
@JsonProperty
private boolean isLimitPercent;
@JsonCreator
public ManualQueryLaningStrategy(
@JsonProperty("lanes") Map<String, Integer> lanes,
@JsonProperty("isLimitPercent") @Nullable Boolean isLimitPercent
)
{
this.lanes = Preconditions.checkNotNull(lanes, "lanes must be set");
this.isLimitPercent = isLimitPercent != null ? isLimitPercent : false;
Preconditions.checkArgument(lanes.size() > 0, "lanes must define at least one lane");
Preconditions.checkArgument(
lanes.values().stream().allMatch(x -> this.isLimitPercent ? 0 < x && x <= 100 : x > 0),
this.isLimitPercent ? "All lane limits must be in the range 1 to 100" : "All lane limits must be greater than 0"
);
}
@Override
public Object2IntMap<String> getLaneLimits(int totalLimit)
{
if (isLimitPercent) {
Object2IntMap<String> laneLimits = new Object2IntArrayMap<>(lanes.size());
lanes.forEach((key, value) -> laneLimits.put(key, computeLimitFromPercent(totalLimit, value)));
return laneLimits;
}
return new Object2IntArrayMap<>(lanes);
}
@Override
public <T> Optional<String> computeLane(QueryPlus<T> query, Set<SegmentServerSelector> segments)
{
return Optional.ofNullable(QueryContexts.getLane(query.getQuery()));
}
}

View File

@ -183,6 +183,7 @@ public class QuerySchedulerTest
});
future.get();
Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent"));
}
@Test
@ -217,6 +218,7 @@ public class QuerySchedulerTest
future.get();
Assert.assertEquals(5, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent"));
}
@Test
@ -272,7 +274,9 @@ public class QuerySchedulerTest
Assert.assertEquals(0, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
// too many reports
scheduler.run(scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()), Sequences.empty());
scheduler.run(
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeReportQuery()), ImmutableSet.of()), Sequences.empty()
);
}
@Test
@ -309,7 +313,9 @@ public class QuerySchedulerTest
Assert.assertEquals(0, scheduler.getTotalAvailableCapacity());
// one too many
scheduler.run(scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()), Sequences.empty());
scheduler.run(
scheduler.prioritizeAndLaneQuery(QueryPlus.wrap(makeInteractiveQuery()), ImmutableSet.of()), Sequences.empty()
);
}
@Test
@ -359,7 +365,8 @@ public class QuerySchedulerTest
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final QueryScheduler scheduler = provider.get().get().get();
Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(-1, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent"));
}
@Test
@ -380,6 +387,7 @@ public class QuerySchedulerTest
final QueryScheduler scheduler = provider.get().get().get();
Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent"));
}
@ -421,6 +429,7 @@ public class QuerySchedulerTest
final QueryScheduler scheduler = provider.get().get().get();
Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent"));
Query<?> query = scheduler.prioritizeAndLaneQuery(
QueryPlus.wrap(makeDefaultQuery()),
@ -452,6 +461,51 @@ public class QuerySchedulerTest
}
@Test
public void testConfigManual()
{
final Injector injector = createInjector();
final String propertyPrefix = "druid.query.scheduler";
final JsonConfigProvider<QuerySchedulerProvider> provider = JsonConfigProvider.of(
propertyPrefix,
QuerySchedulerProvider.class
);
final Properties properties = new Properties();
properties.put(propertyPrefix + ".numThreads", "10");
properties.put(propertyPrefix + ".laning.strategy", "manual");
properties.put(propertyPrefix + ".laning.lanes.one", "1");
properties.put(propertyPrefix + ".laning.lanes.two", "2");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final QueryScheduler scheduler = provider.get().get().get();
Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(1, scheduler.getLaneAvailableCapacity("one"));
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity("two"));
Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent"));
}
@Test
public void testConfigManualPercent()
{
final Injector injector = createInjector();
final String propertyPrefix = "druid.query.scheduler";
final JsonConfigProvider<QuerySchedulerProvider> provider = JsonConfigProvider.of(
propertyPrefix,
QuerySchedulerProvider.class
);
final Properties properties = new Properties();
properties.put(propertyPrefix + ".numThreads", "10");
properties.put(propertyPrefix + ".laning.strategy", "manual");
properties.put(propertyPrefix + ".laning.isLimitPercent", "true");
properties.put(propertyPrefix + ".laning.lanes.one", "1");
properties.put(propertyPrefix + ".laning.lanes.twenty", "20");
provider.inject(properties, injector.getInstance(JsonConfigurator.class));
final QueryScheduler scheduler = provider.get().get().get();
Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
Assert.assertEquals(1, scheduler.getLaneAvailableCapacity("one"));
Assert.assertEquals(2, scheduler.getLaneAvailableCapacity("twenty"));
Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent"));
}
private void maybeDelayNextIteration(int i) throws InterruptedException
{
if (i > 0 && i % 10 == 0) {

View File

@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.scheduling;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.server.QueryLaningStrategy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@SuppressWarnings("ResultOfObjectAllocationIgnored")
public class ManualQueryLaningStrategyTest
{
private Druids.TimeseriesQueryBuilder queryBuilder;
private QueryLaningStrategy exactStrategy;
private QueryLaningStrategy percentStrategy;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setup()
{
this.queryBuilder = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals(ImmutableList.of(Intervals.ETERNITY))
.granularity(Granularities.DAY)
.aggregators(new CountAggregatorFactory("count"));
this.exactStrategy =
new ManualQueryLaningStrategy(ImmutableMap.of("one", 1, "ten", 10), null);
this.percentStrategy =
new ManualQueryLaningStrategy(ImmutableMap.of("one", 1, "ten", 10, "one-hundred", 100), true);
}
@Test
public void testLanesMustBeSet()
{
expectedException.expect(NullPointerException.class);
expectedException.expectMessage("lanes must be set");
new ManualQueryLaningStrategy(null, null);
}
@Test
public void testMustDefineAtLeastOneLane()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("lanes must define at least one lane");
new ManualQueryLaningStrategy(ImmutableMap.of(), null);
}
@Test
public void testExactLaneLimitsMustBeAboveZero()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("All lane limits must be greater than 0");
new ManualQueryLaningStrategy(ImmutableMap.of("zero", 0, "one", 1), null);
}
@Test
public void testPercentLaneLimitsMustBeAboveZero()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("All lane limits must be in the range 1 to 100");
new ManualQueryLaningStrategy(ImmutableMap.of("zero", 0, "one", 25), true);
}
@Test
public void testPercentLaneLimitsMustBeLessThanOneHundred()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("All lane limits must be in the range 1 to 100");
new ManualQueryLaningStrategy(ImmutableMap.of("one", 1, "one-hundred-and-one", 101), true);
}
@Test
public void testExactLimits()
{
Object2IntMap<String> exactLanes = exactStrategy.getLaneLimits(50);
Assert.assertEquals(1, exactLanes.getInt("one"));
Assert.assertEquals(10, exactLanes.getInt("ten"));
}
@Test
public void testPercentLimits()
{
Object2IntMap<String> exactLanes = percentStrategy.getLaneLimits(50);
Assert.assertEquals(1, exactLanes.getInt("one"));
Assert.assertEquals(5, exactLanes.getInt("ten"));
Assert.assertEquals(50, exactLanes.getInt("one-hundred"));
}
@Test
public void testDoesntSetLane()
{
TimeseriesQuery query = queryBuilder.context(ImmutableMap.of()).build();
Assert.assertFalse(exactStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent());
Assert.assertFalse(percentStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).isPresent());
}
@Test
public void testPreservesManualLaneFromContextThatArentInMapAndIgnoresThem()
{
final String someLane = "some-lane";
TimeseriesQuery query = queryBuilder.context(ImmutableMap.of(QueryContexts.LANE_KEY, someLane)).build();
Assert.assertEquals(
someLane,
exactStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get()
);
Assert.assertEquals(
someLane,
percentStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get()
);
}
@Test
public void testPreservesManualLaneFromContext()
{
final String someLane = "ten";
TimeseriesQuery query = queryBuilder.context(ImmutableMap.of(QueryContexts.LANE_KEY, someLane)).build();
Assert.assertEquals(
someLane,
exactStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get()
);
Assert.assertEquals(
someLane,
percentStrategy.computeLane(QueryPlus.wrap(query), ImmutableSet.of()).get()
);
}
}