mirror of https://github.com/apache/druid.git
Fix flakiness in query-retry ITs (#12818)
This commit is contained in:
parent
cceb2e849e
commit
eabce8a159
|
@ -50,15 +50,6 @@ services:
|
||||||
- druid-metadata-storage
|
- druid-metadata-storage
|
||||||
- druid-zookeeper-kafka
|
- druid-zookeeper-kafka
|
||||||
|
|
||||||
druid-historical:
|
|
||||||
extends:
|
|
||||||
file: docker-compose.base.yml
|
|
||||||
service: druid-historical
|
|
||||||
environment:
|
|
||||||
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
|
|
||||||
depends_on:
|
|
||||||
- druid-zookeeper-kafka
|
|
||||||
|
|
||||||
druid-broker:
|
druid-broker:
|
||||||
extends:
|
extends:
|
||||||
file: docker-compose.base.yml
|
file: docker-compose.base.yml
|
||||||
|
@ -67,7 +58,7 @@ services:
|
||||||
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
|
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
|
||||||
depends_on:
|
depends_on:
|
||||||
- druid-zookeeper-kafka
|
- druid-zookeeper-kafka
|
||||||
- druid-historical
|
- druid-historical-for-query-retry-test
|
||||||
|
|
||||||
druid-router:
|
druid-router:
|
||||||
extends:
|
extends:
|
||||||
|
|
|
@ -51,9 +51,7 @@ import org.apache.druid.server.SegmentManager;
|
||||||
import org.apache.druid.server.initialization.ServerConfig;
|
import org.apache.druid.server.initialization.ServerConfig;
|
||||||
import org.apache.druid.timeline.VersionedIntervalTimeline;
|
import org.apache.druid.timeline.VersionedIntervalTimeline;
|
||||||
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
@ -63,8 +61,9 @@ import java.util.function.Function;
|
||||||
*
|
*
|
||||||
* - Missing segments. A segment can be missing during a query if a historical drops the segment
|
* - Missing segments. A segment can be missing during a query if a historical drops the segment
|
||||||
* after the broker issues the query to the historical. To mimic this situation, the historical
|
* after the broker issues the query to the historical. To mimic this situation, the historical
|
||||||
* with this server manager announces all segments assigned, but reports missing segments for the
|
* with this server manager announces all segments assigned, but reports missing segment for the
|
||||||
* first 3 segments specified in the query. See ITQueryRetryTestOnMissingSegments.
|
* first segment of the datasource specified in the query. The missing report is only generated once for the first
|
||||||
|
* segment. Post that report, all segments are served for the datasource. See ITQueryRetryTestOnMissingSegments.
|
||||||
* - Other query errors. This server manager returns a sequence that always throws an exception
|
* - Other query errors. This server manager returns a sequence that always throws an exception
|
||||||
* based on a given query context value. See ITQueryErrorTest.
|
* based on a given query context value. See ITQueryErrorTest.
|
||||||
*
|
*
|
||||||
|
@ -82,9 +81,9 @@ public class ServerManagerForQueryErrorTest extends ServerManager
|
||||||
public static final String QUERY_FAILURE_TEST_CONTEXT_KEY = "query-failure-test";
|
public static final String QUERY_FAILURE_TEST_CONTEXT_KEY = "query-failure-test";
|
||||||
|
|
||||||
private static final Logger LOG = new Logger(ServerManagerForQueryErrorTest.class);
|
private static final Logger LOG = new Logger(ServerManagerForQueryErrorTest.class);
|
||||||
private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 3;
|
private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 1;
|
||||||
|
|
||||||
private final ConcurrentHashMap<String, Set<SegmentDescriptor>> queryToIgnoredSegments = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<String, Integer> queryToIgnoredSegments = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ServerManagerForQueryErrorTest(
|
public ServerManagerForQueryErrorTest(
|
||||||
|
@ -130,15 +129,15 @@ public class ServerManagerForQueryErrorTest extends ServerManager
|
||||||
final MutableBoolean isIgnoreSegment = new MutableBoolean(false);
|
final MutableBoolean isIgnoreSegment = new MutableBoolean(false);
|
||||||
queryToIgnoredSegments.compute(
|
queryToIgnoredSegments.compute(
|
||||||
query.getMostSpecificId(),
|
query.getMostSpecificId(),
|
||||||
(queryId, ignoredSegments) -> {
|
(queryId, ignoreCounter) -> {
|
||||||
if (ignoredSegments == null) {
|
if (ignoreCounter == null) {
|
||||||
ignoredSegments = new HashSet<>();
|
ignoreCounter = 0;
|
||||||
}
|
}
|
||||||
if (ignoredSegments.size() < MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) {
|
if (ignoreCounter < MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) {
|
||||||
ignoredSegments.add(descriptor);
|
ignoreCounter++;
|
||||||
isIgnoreSegment.setTrue();
|
isIgnoreSegment.setTrue();
|
||||||
}
|
}
|
||||||
return ignoredSegments;
|
return ignoreCounter;
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -51,8 +51,8 @@ import java.util.Map;
|
||||||
/**
|
/**
|
||||||
* This class tests the query retry on missing segments. A segment can be missing in a historical during a query if
|
* This class tests the query retry on missing segments. A segment can be missing in a historical during a query if
|
||||||
* the historical drops the segment after the broker issues the query to the historical. To mimic this case, this
|
* the historical drops the segment after the broker issues the query to the historical. To mimic this case, this
|
||||||
* test spawns two historicals, a normal historical and a historical modified for testing. The later historical
|
* test spawns a historical modified for testing. This historical announces all segments assigned, but doesn't serve
|
||||||
* announces all segments assigned, but doesn't serve all of them. Instead, it can report missing segments for some
|
* all of them always. Instead, it can report missing segments for some
|
||||||
* segments. See {@link ServerManagerForQueryErrorTest} for more details.
|
* segments. See {@link ServerManagerForQueryErrorTest} for more details.
|
||||||
* <p>
|
* <p>
|
||||||
* To run this test properly, the test group must be specified as {@link TestNGGroup#QUERY_RETRY}.
|
* To run this test properly, the test group must be specified as {@link TestNGGroup#QUERY_RETRY}.
|
||||||
|
@ -63,25 +63,22 @@ public class ITQueryRetryTestOnMissingSegments
|
||||||
{
|
{
|
||||||
private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
|
private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
|
||||||
private static final String QUERIES_RESOURCE = "/queries/wikipedia_editstream_queries_query_retry_test.json";
|
private static final String QUERIES_RESOURCE = "/queries/wikipedia_editstream_queries_query_retry_test.json";
|
||||||
private static final int TIMES_TO_RUN = 50;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This test runs the same query multiple times. This enumeration represents an expectation after finishing
|
* This enumeration represents an expectation after finishing running the test query.
|
||||||
* running the query.
|
|
||||||
*/
|
*/
|
||||||
private enum Expectation
|
private enum Expectation
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Expect that all runs succeed.
|
* Expect that the test query succeed and with correct results.
|
||||||
*/
|
*/
|
||||||
ALL_SUCCESS,
|
ALL_SUCCESS,
|
||||||
/**
|
/**
|
||||||
* Expect that all runs returns the 200 HTTP response, but some of them can return incorrect result.
|
* Expect that the test query returns the 200 HTTP response, but will surely return incorrect result.
|
||||||
*/
|
*/
|
||||||
INCORRECT_RESULT,
|
INCORRECT_RESULT,
|
||||||
/**
|
/**
|
||||||
* Expect that some runs can return the 500 HTTP response. For the runs returned the 200 HTTP response, the query
|
* Expect that the test query must return the 500 HTTP response.
|
||||||
* result must be correct.
|
|
||||||
*/
|
*/
|
||||||
QUERY_FAILURE
|
QUERY_FAILURE
|
||||||
}
|
}
|
||||||
|
@ -100,7 +97,7 @@ public class ITQueryRetryTestOnMissingSegments
|
||||||
@BeforeMethod
|
@BeforeMethod
|
||||||
public void before()
|
public void before()
|
||||||
{
|
{
|
||||||
// ensure that wikipedia segments are loaded completely
|
// ensure that wikipedia segment is loaded completely
|
||||||
ITRetryUtil.retryUntilTrue(
|
ITRetryUtil.retryUntilTrue(
|
||||||
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
|
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
|
||||||
);
|
);
|
||||||
|
@ -109,25 +106,26 @@ public class ITQueryRetryTestOnMissingSegments
|
||||||
@Test
|
@Test
|
||||||
public void testWithRetriesDisabledPartialResultDisallowed() throws Exception
|
public void testWithRetriesDisabledPartialResultDisallowed() throws Exception
|
||||||
{
|
{
|
||||||
// Since retry is disabled and partial result is not allowed, we can expect some queries can fail.
|
// Since retry is disabled and partial result is not allowed, the query must fail.
|
||||||
// If a query succeed, its result must be correct.
|
|
||||||
testQueries(buildQuery(0, false), Expectation.QUERY_FAILURE);
|
testQueries(buildQuery(0, false), Expectation.QUERY_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWithRetriesDisabledPartialResultAllowed() throws Exception
|
public void testWithRetriesDisabledPartialResultAllowed() throws Exception
|
||||||
{
|
{
|
||||||
// Since retry is disabled but partial result is allowed, all queries must succeed.
|
// Since retry is disabled but partial result is allowed, the query must succeed.
|
||||||
// However, some queries can return incorrect result.
|
// However, the query must return incorrect result.
|
||||||
testQueries(buildQuery(0, true), Expectation.INCORRECT_RESULT);
|
testQueries(buildQuery(0, true), Expectation.INCORRECT_RESULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWithRetriesEnabledPartialResultDisallowed() throws Exception
|
public void testWithRetriesEnabledPartialResultDisallowed() throws Exception
|
||||||
{
|
{
|
||||||
// Since retry is enabled, all queries must succeed even though partial result is disallowed.
|
// Since retry is enabled, the query must succeed even though partial result is disallowed.
|
||||||
// All queries must return correct result.
|
// The retry count is set to 1 since on the first retry of the query (i.e second overall try), the historical
|
||||||
testQueries(buildQuery(30, false), Expectation.ALL_SUCCESS);
|
// will start processing the segment and not call it missing.
|
||||||
|
// The query must return correct results.
|
||||||
|
testQueries(buildQuery(1, false), Expectation.ALL_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testQueries(String queryWithResultsStr, Expectation expectation) throws Exception
|
private void testQueries(String queryWithResultsStr, Expectation expectation) throws Exception
|
||||||
|
@ -147,74 +145,73 @@ public class ITQueryRetryTestOnMissingSegments
|
||||||
int queryFailure = 0;
|
int queryFailure = 0;
|
||||||
int resultMatches = 0;
|
int resultMatches = 0;
|
||||||
int resultMismatches = 0;
|
int resultMismatches = 0;
|
||||||
for (int i = 0; i < TIMES_TO_RUN; i++) {
|
|
||||||
for (QueryWithResults queryWithResult : queries) {
|
|
||||||
final StatusResponseHolder responseHolder = queryClient
|
|
||||||
.queryAsync(queryHelper.getQueryURL(config.getBrokerUrl()), queryWithResult.getQuery())
|
|
||||||
.get();
|
|
||||||
|
|
||||||
if (responseHolder.getStatus().getCode() == HttpResponseStatus.OK.getCode()) {
|
for (QueryWithResults queryWithResult : queries) {
|
||||||
querySuccess++;
|
final StatusResponseHolder responseHolder = queryClient
|
||||||
|
.queryAsync(queryHelper.getQueryURL(config.getBrokerUrl()), queryWithResult.getQuery())
|
||||||
|
.get();
|
||||||
|
|
||||||
List<Map<String, Object>> result = jsonMapper.readValue(
|
if (responseHolder.getStatus().getCode() == HttpResponseStatus.OK.getCode()) {
|
||||||
responseHolder.getContent(),
|
querySuccess++;
|
||||||
new TypeReference<List<Map<String, Object>>>()
|
|
||||||
{
|
List<Map<String, Object>> result = jsonMapper.readValue(
|
||||||
}
|
responseHolder.getContent(),
|
||||||
);
|
new TypeReference<List<Map<String, Object>>>()
|
||||||
if (!QueryResultVerifier.compareResults(
|
{
|
||||||
result,
|
|
||||||
queryWithResult.getExpectedResults(),
|
|
||||||
queryWithResult.getFieldsToTest()
|
|
||||||
)) {
|
|
||||||
if (expectation != Expectation.INCORRECT_RESULT) {
|
|
||||||
throw new ISE(
|
|
||||||
"Incorrect query results for query %s \n expectedResults: %s \n actualResults : %s",
|
|
||||||
queryWithResult.getQuery(),
|
|
||||||
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
|
|
||||||
jsonMapper.writeValueAsString(result)
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
resultMismatches++;
|
|
||||||
}
|
}
|
||||||
|
);
|
||||||
|
if (!QueryResultVerifier.compareResults(
|
||||||
|
result,
|
||||||
|
queryWithResult.getExpectedResults(),
|
||||||
|
queryWithResult.getFieldsToTest()
|
||||||
|
)) {
|
||||||
|
if (expectation != Expectation.INCORRECT_RESULT) {
|
||||||
|
throw new ISE(
|
||||||
|
"Incorrect query results for query %s \n expectedResults: %s \n actualResults : %s",
|
||||||
|
queryWithResult.getQuery(),
|
||||||
|
jsonMapper.writeValueAsString(queryWithResult.getExpectedResults()),
|
||||||
|
jsonMapper.writeValueAsString(result)
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
resultMatches++;
|
resultMismatches++;
|
||||||
}
|
}
|
||||||
} else if (responseHolder.getStatus().getCode() == HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode() &&
|
|
||||||
expectation == Expectation.QUERY_FAILURE) {
|
|
||||||
final Map<String, Object> response = jsonMapper.readValue(responseHolder.getContent(), Map.class);
|
|
||||||
final String errorMessage = (String) response.get("errorMessage");
|
|
||||||
Assert.assertNotNull(errorMessage, "errorMessage");
|
|
||||||
Assert.assertTrue(errorMessage.contains("No results found for segments"));
|
|
||||||
queryFailure++;
|
|
||||||
} else {
|
} else {
|
||||||
throw new ISE(
|
resultMatches++;
|
||||||
"Unexpected failure, code: [%s], content: [%s]",
|
|
||||||
responseHolder.getStatus(),
|
|
||||||
responseHolder.getContent()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
} else if (responseHolder.getStatus().getCode() == HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode() &&
|
||||||
|
expectation == Expectation.QUERY_FAILURE) {
|
||||||
|
final Map<String, Object> response = jsonMapper.readValue(responseHolder.getContent(), Map.class);
|
||||||
|
final String errorMessage = (String) response.get("errorMessage");
|
||||||
|
Assert.assertNotNull(errorMessage, "errorMessage");
|
||||||
|
Assert.assertTrue(errorMessage.contains("No results found for segments"));
|
||||||
|
queryFailure++;
|
||||||
|
} else {
|
||||||
|
throw new ISE(
|
||||||
|
"Unexpected failure, code: [%s], content: [%s]",
|
||||||
|
responseHolder.getStatus(),
|
||||||
|
responseHolder.getContent()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (expectation) {
|
switch (expectation) {
|
||||||
case ALL_SUCCESS:
|
case ALL_SUCCESS:
|
||||||
Assert.assertEquals(querySuccess, ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN);
|
Assert.assertEquals(querySuccess, 1);
|
||||||
Assert.assertEquals(queryFailure, 0);
|
Assert.assertEquals(queryFailure, 0);
|
||||||
Assert.assertEquals(resultMatches, ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN);
|
Assert.assertEquals(resultMatches, 1);
|
||||||
Assert.assertEquals(resultMismatches, 0);
|
Assert.assertEquals(resultMismatches, 0);
|
||||||
break;
|
break;
|
||||||
case QUERY_FAILURE:
|
case QUERY_FAILURE:
|
||||||
Assert.assertTrue(querySuccess > 0, "At least one query is expected to succeed.");
|
Assert.assertEquals(querySuccess, 0);
|
||||||
Assert.assertTrue(queryFailure > 0, "At least one query is expected to fail.");
|
Assert.assertEquals(queryFailure, 1);
|
||||||
Assert.assertEquals(querySuccess, resultMatches);
|
Assert.assertEquals(resultMatches, 0);
|
||||||
Assert.assertEquals(resultMismatches, 0);
|
Assert.assertEquals(resultMismatches, 0);
|
||||||
break;
|
break;
|
||||||
case INCORRECT_RESULT:
|
case INCORRECT_RESULT:
|
||||||
Assert.assertEquals(querySuccess, ITQueryRetryTestOnMissingSegments.TIMES_TO_RUN);
|
Assert.assertEquals(querySuccess, 1);
|
||||||
Assert.assertEquals(queryFailure, 0);
|
Assert.assertEquals(queryFailure, 0);
|
||||||
Assert.assertTrue(resultMatches > 0, "At least one query is expected to return correct results.");
|
Assert.assertEquals(resultMatches, 0);
|
||||||
Assert.assertTrue(resultMismatches > 0, "At least one query is expected to return less results.");
|
Assert.assertEquals(resultMismatches, 1);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new ISE("Unknown expectation[%s]", expectation);
|
throw new ISE("Unknown expectation[%s]", expectation);
|
||||||
|
@ -235,6 +232,7 @@ public class ITQueryRetryTestOnMissingSegments
|
||||||
final Map<String, Object> context = new HashMap<>();
|
final Map<String, Object> context = new HashMap<>();
|
||||||
// Disable cache so that each run hits historical.
|
// Disable cache so that each run hits historical.
|
||||||
context.put(QueryContexts.USE_CACHE_KEY, false);
|
context.put(QueryContexts.USE_CACHE_KEY, false);
|
||||||
|
context.put(QueryContexts.USE_RESULT_LEVEL_CACHE_KEY, false);
|
||||||
context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, numRetriesOnMissingSegments);
|
context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, numRetriesOnMissingSegments);
|
||||||
context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults);
|
context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults);
|
||||||
context.put(ServerManagerForQueryErrorTest.QUERY_RETRY_TEST_CONTEXT_KEY, true);
|
context.put(ServerManagerForQueryErrorTest.QUERY_RETRY_TEST_CONTEXT_KEY, true);
|
||||||
|
|
Loading…
Reference in New Issue