Fixes intermittent failure in ITAutoCompactionTest (#9739)

* fix intermittent failure in ITAutoCompactionTest

* fix typo

* update javadoc
This commit is contained in:
Maytas Monsereenusorn 2020-04-21 17:56:17 -10:00 committed by GitHub
parent b146f8a2a7
commit cff39892ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 17 deletions

View File

@ -31,6 +31,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
import org.apache.druid.query.lookup.LookupsState;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
@ -190,6 +191,14 @@ public class CoordinatorResourceTestClient
return status;
}
/**
* Warning: This API reads segments from {@link SqlSegmentsMetadataManager} of the Coordinator which
* caches segments in memory and periodically updates them. Hence, there can be a race condition as
* this API implementation compares segments metadata from cache with segments in historicals.
* Particularly, when number of segment changes after the first initial load of the datasource.
* Workaround is to verify the number of segments matches expected from {@link #getSegments(String) getSegments}
* before calling this method (since, that would wait until the cache is updated with expected data)
*/
public boolean areSegmentsLoaded(String dataSource)
{
final Map<String, Integer> status = getLoadStatus();

View File

@ -87,17 +87,15 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifyQuery(INDEX_QUERIES_RESOURCE);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
forceTriggerAutoCompaction();
//...compacted into 1 new segment for 1 day. 1 day compacted and 1 day skipped/remains uncompacted. (5 total)
verifySegmentsCount(5);
forceTriggerAutoCompaction(5);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
forceTriggerAutoCompaction();
//...compacted into 1 new segment for the remaining one day. 2 day compacted and 0 day uncompacted. (6 total)
verifySegmentsCount(6);
forceTriggerAutoCompaction(6);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
@ -119,11 +117,10 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
submitCompactionConfig(10000, SKIP_OFFSET_FROM_LATEST);
// New compaction config should overwrites the existing compaction config
submitCompactionConfig(1, SKIP_OFFSET_FROM_LATEST);
forceTriggerAutoCompaction();
// Instead of merging segments, the updated config will split segments!
//...compacted into 10 new segments across 2 days. 5 new segments each day (14 total)
verifySegmentsCount(14);
forceTriggerAutoCompaction(14);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(10, 1);
@ -144,10 +141,9 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, SKIP_OFFSET_FROM_LATEST);
deleteCompactionConfig();
forceTriggerAutoCompaction();
// ...should remains unchanged (4 total)
verifySegmentsCount(4);
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
@ -171,35 +167,31 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
// Set compactionTaskSlotRatio to 0 to prevent any compaction
updateCompactionTaskSlot(0, 100);
forceTriggerAutoCompaction();
// ...should remains unchanged (4 total)
verifySegmentsCount(4);
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);
// Set maxCompactionTaskSlots to 0 to prevent any compaction
updateCompactionTaskSlot(0.1, 0);
forceTriggerAutoCompaction();
// ...should remains unchanged (4 total)
verifySegmentsCount(4);
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(0, null);
checkCompactionIntervals(intervalsBeforeCompaction);
// Update compaction slots to be 1
updateCompactionTaskSlot(1, 1);
forceTriggerAutoCompaction();
// One day compacted (1 new segment) and one day remains uncompacted. (5 total)
verifySegmentsCount(5);
forceTriggerAutoCompaction(5);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14312");
// Run compaction again to compact the remaining day
forceTriggerAutoCompaction();
// Remaining day compacted (1 new segment). Now both days compacted (6 total)
verifySegmentsCount(6);
forceTriggerAutoCompaction(6);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);
@ -284,10 +276,11 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Assert.assertNull(foundDataSourceCompactionConfig);
}
private void forceTriggerAutoCompaction() throws Exception
private void forceTriggerAutoCompaction(int numExpectedSegmentsAfterCompaction) throws Exception
{
compactionResource.forceTriggerAutoCompaction();
waitForAllTasksToComplete();
verifySegmentsCount(numExpectedSegmentsAfterCompaction);
ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
"Segment Compaction"