mirror of https://github.com/apache/druid.git
Add retry around query loop in ITWikipediaQueryTest.testQueryLaningLaneIsLimited (#11077)
This commit is contained in:
parent
4576152e4a
commit
e7b2ecd0fd
|
@ -21,6 +21,7 @@ package org.apache.druid.tests.query;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
|
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
|
||||||
import org.apache.druid.query.Druids;
|
import org.apache.druid.query.Druids;
|
||||||
import org.apache.druid.query.QueryCapacityExceededException;
|
import org.apache.druid.query.QueryCapacityExceededException;
|
||||||
|
@ -47,6 +48,8 @@ import java.util.concurrent.Future;
|
||||||
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
@Guice(moduleFactory = DruidTestModuleFactory.class)
|
||||||
public class ITWikipediaQueryTest
|
public class ITWikipediaQueryTest
|
||||||
{
|
{
|
||||||
|
private static final Logger LOG = new Logger(ITWikipediaQueryTest.class);
|
||||||
|
|
||||||
public static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
|
public static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
|
||||||
private static final String WIKI_LOOKUP = "wiki-simple";
|
private static final String WIKI_LOOKUP = "wiki-simple";
|
||||||
private static final String WIKIPEDIA_QUERIES_RESOURCE = "/queries/wikipedia_editstream_queries.json";
|
private static final String WIKIPEDIA_QUERIES_RESOURCE = "/queries/wikipedia_editstream_queries.json";
|
||||||
|
@ -85,37 +88,54 @@ public class ITWikipediaQueryTest
|
||||||
@Test
|
@Test
|
||||||
public void testQueryLaningLaneIsLimited() throws Exception
|
public void testQueryLaningLaneIsLimited() throws Exception
|
||||||
{
|
{
|
||||||
// the broker is configured with a manually defined query lane, 'one' with limit 1
|
ITRetryUtil.retryUntil(
|
||||||
// -Ddruid.query.scheduler.laning.type=manual
|
() -> {
|
||||||
// -Ddruid.query.scheduler.laning.lanes.one=1
|
// the broker is configured with a manually defined query lane, 'one' with limit 1
|
||||||
// by issuing 50 queries, at least 1 of them will succeed on 'one', and at least 1 of them will overlap enough to
|
// -Ddruid.query.scheduler.laning.type=manual
|
||||||
// get limited
|
// -Ddruid.query.scheduler.laning.lanes.one=1
|
||||||
final int numQueries = 50;
|
// by issuing 50 queries, at least 1 of them will succeed on 'one', and at least 1 of them will overlap enough to
|
||||||
List<Future<StatusResponseHolder>> futures = new ArrayList<>(numQueries);
|
// get limited.
|
||||||
for (int i = 0; i < numQueries; i++) {
|
// It's possible but unlikely that these queries execute in a way that none of them overlap, so we
|
||||||
futures.add(
|
// retry this test a few times to compensate for this.
|
||||||
queryClient.queryAsync(
|
final int numQueries = 50;
|
||||||
queryHelper.getQueryURL(config.getBrokerUrl()),
|
List<Future<StatusResponseHolder>> futures = new ArrayList<>(numQueries);
|
||||||
getQueryBuilder().build()
|
for (int i = 0; i < numQueries; i++) {
|
||||||
)
|
futures.add(
|
||||||
);
|
queryClient.queryAsync(
|
||||||
}
|
queryHelper.getQueryURL(config.getBrokerUrl()),
|
||||||
|
getQueryBuilder().build()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
int success = 0;
|
int success = 0;
|
||||||
int limited = 0;
|
int limited = 0;
|
||||||
|
|
||||||
for (Future<StatusResponseHolder> future : futures) {
|
for (Future<StatusResponseHolder> future : futures) {
|
||||||
StatusResponseHolder status = future.get();
|
StatusResponseHolder status = future.get();
|
||||||
if (status.getStatus().getCode() == QueryCapacityExceededException.STATUS_CODE) {
|
if (status.getStatus().getCode() == QueryCapacityExceededException.STATUS_CODE) {
|
||||||
limited++;
|
limited++;
|
||||||
Assert.assertTrue(status.getContent().contains(QueryCapacityExceededException.makeLaneErrorMessage("one", 1)));
|
Assert.assertTrue(status.getContent().contains(QueryCapacityExceededException.makeLaneErrorMessage("one", 1)));
|
||||||
} else if (status.getStatus().getCode() == HttpResponseStatus.OK.getCode()) {
|
} else if (status.getStatus().getCode() == HttpResponseStatus.OK.getCode()) {
|
||||||
success++;
|
success++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertTrue(success > 0);
|
try {
|
||||||
Assert.assertTrue(limited > 0);
|
Assert.assertTrue(success > 0);
|
||||||
|
Assert.assertTrue(limited > 0);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
catch (AssertionError ae) {
|
||||||
|
LOG.error(ae, "Got assertion error in testQueryLaningLaneIsLimited");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
true,
|
||||||
|
5000,
|
||||||
|
3,
|
||||||
|
"testQueryLaningLaneIsLimited"
|
||||||
|
);
|
||||||
|
|
||||||
// test another to make sure we can still issue one query at a time
|
// test another to make sure we can still issue one query at a time
|
||||||
StatusResponseHolder followUp = queryClient.queryAsync(
|
StatusResponseHolder followUp = queryClient.queryAsync(
|
||||||
|
|
Loading…
Reference in New Issue