Reduce metadata IO during segment allocation (#17496)

Changes
---------
- Add Overlord runtime property `druid.indexer.tasklock.batchAllocationReduceMetadataIO`
  (a sample configuration is sketched below)
- Setting this flag to true (the default) allows the Overlord to fetch only the necessary segment
  payloads during segment allocation
- Setting this flag to false restores the original segment allocation behaviour, which fetches
  full payloads for all used segments
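A minimal configuration sketch (the property name comes from this patch; placing it in the Overlord's runtime.properties is the conventional, assumed location):

    # Overlord runtime.properties (illustrative)
    # true (default): fetch only segment IDs plus a handful of payloads during allocation
    # false: fetch full payloads for all used segments, as before
    druid.indexer.tasklock.batchAllocationReduceMetadataIO=true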
Kashif Faraz authored on 2024-11-25 22:10:09 -08:00, committed by GitHub
commit 207ad16f07 (parent ede9e4077a)
11 changed files with 424 additions and 35 deletions

File: SegmentAllocationQueue.java

@ -25,7 +25,6 @@ import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.java.util.common.ISE;
@ -41,6 +40,7 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.Partitions;
import org.joda.time.Interval;
import java.util.ArrayList;
@ -87,6 +87,8 @@ public class SegmentAllocationQueue
private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>();
private final BlockingDeque<AllocateRequestBatch> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
private final boolean reduceMetadataIO;
@Inject
public SegmentAllocationQueue(
TaskLockbox taskLockbox,
@ -100,6 +102,7 @@ public class SegmentAllocationQueue
this.taskLockbox = taskLockbox;
this.metadataStorage = metadataStorage;
this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime();
this.reduceMetadataIO = taskLockConfig.isBatchAllocationReduceMetadataIO();
this.executor = taskLockConfig.isBatchSegmentAllocation()
? executorFactory.create(1, "SegmentAllocQueue-%s") : null;
@ -380,13 +383,11 @@ public class SegmentAllocationQueue
private Set<DataSegment> retrieveUsedSegments(AllocateRequestKey key)
{
return new HashSet<>(
metadataStorage.retrieveUsedSegmentsForInterval(
key.dataSource,
key.preferredAllocationInterval,
Segments.ONLY_VISIBLE
)
);
return metadataStorage.getSegmentTimelineForAllocation(
key.dataSource,
key.preferredAllocationInterval,
(key.lockGranularity == LockGranularity.TIME_CHUNK) && reduceMetadataIO
).findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
}
private int allocateSegmentsForBatch(AllocateRequestBatch requestBatch, Set<DataSegment> usedSegments)
@ -493,7 +494,8 @@ public class SegmentAllocationQueue
requestKey.dataSource,
tryInterval,
requestKey.skipSegmentLineageCheck,
requestKey.lockGranularity
requestKey.lockGranularity,
reduceMetadataIO
);
int successfulRequests = 0;
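Taken together, the hunks above mean that retrieveUsedSegments() now builds a (possibly payload-light) SegmentTimeline for the allocation interval and keeps only complete, non-overshadowed segments, and that the reduced-IO mode is honoured only for TIME_CHUNK lock granularity. A standalone sketch of that projection, using only types that appear in this diff (the helper class name and the allUsedSegments argument are illustrative):

    import java.util.Set;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.timeline.DataSegment;
    import org.apache.druid.timeline.Partitions;
    import org.apache.druid.timeline.SegmentTimeline;

    final class VisibleSegmentsSketch
    {
      // Reduce a collection of used segments to the visible, complete ones,
      // mirroring what the new retrieveUsedSegments() returns.
      static Set<DataSegment> visibleSegments(Iterable<DataSegment> allUsedSegments)
      {
        return SegmentTimeline
            .forSegments(allUsedSegments)
            .findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE);
      }
    }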

File: TaskLockbox.java

@ -466,6 +466,8 @@ public class TaskLockbox
* @param skipSegmentLineageCheck Whether lineage check is to be skipped
* (this is true for streaming ingestion)
* @param lockGranularity Granularity of task lock
* @param reduceMetadataIO Whether to skip fetching payloads for all used
* segments and rely on their IDs instead.
* @return List of allocation results in the same order as the requests.
*/
public List<SegmentAllocateResult> allocateSegments(
@ -473,7 +475,8 @@ public class TaskLockbox
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
LockGranularity lockGranularity
LockGranularity lockGranularity,
boolean reduceMetadataIO
)
{
log.info("Allocating [%d] segments for datasource [%s], interval [%s]", requests.size(), dataSource, interval);
@ -487,9 +490,15 @@ public class TaskLockbox
if (isTimeChunkLock) {
// For time-chunk locking, segment must be allocated only after acquiring the lock
holderList.getPending().forEach(holder -> acquireTaskLock(holder, true));
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending());
allocateSegmentIds(
dataSource,
interval,
skipSegmentLineageCheck,
holderList.getPending(),
reduceMetadataIO
);
} else {
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending());
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending(), false);
holderList.getPending().forEach(holder -> acquireTaskLock(holder, false));
}
holderList.getPending().forEach(SegmentAllocationHolder::markSucceeded);
@ -702,12 +711,12 @@ public class TaskLockbox
* for the given requests. Updates the holder with the allocated segment if
* the allocation succeeds, otherwise marks it as failed.
*/
@VisibleForTesting
void allocateSegmentIds(
private void allocateSegmentIds(
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
Collection<SegmentAllocationHolder> holders
Collection<SegmentAllocationHolder> holders,
boolean reduceMetadataIO
)
{
if (holders.isEmpty()) {
@ -724,7 +733,8 @@ public class TaskLockbox
dataSource,
interval,
skipSegmentLineageCheck,
createRequests
createRequests,
reduceMetadataIO
);
for (SegmentAllocationHolder holder : holders) {

File: TaskLockConfig.java

@ -36,6 +36,9 @@ public class TaskLockConfig
@JsonProperty
private long batchAllocationWaitTime = 0L;
@JsonProperty
private boolean batchAllocationReduceMetadataIO = true;
public boolean isForceTimeChunkLock()
{
return forceTimeChunkLock;
@ -50,4 +53,10 @@ public class TaskLockConfig
{
return batchAllocationWaitTime;
}
public boolean isBatchAllocationReduceMetadataIO()
{
return batchAllocationReduceMetadataIO;
}
}
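Since the getter and its default are the whole contract of this change, a one-line check of the default (the sketch class name is made up; the no-arg construction mirrors how TaskActionTestKit reads the default later in this diff):

    import org.apache.druid.indexing.overlord.config.TaskLockConfig;

    final class TaskLockConfigDefaultSketch
    {
      public static void main(String[] args)
      {
        // A freshly constructed config enables the reduced-IO allocation path.
        System.out.println(new TaskLockConfig().isBatchAllocationReduceMetadataIO()); // prints "true"
      }
    }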

File: SegmentAllocateActionTest.java

@ -93,21 +93,28 @@ public class SegmentAllocateActionTest
private SegmentAllocationQueue allocationQueue;
@Parameterized.Parameters(name = "granularity = {0}, useBatch = {1}")
@Parameterized.Parameters(name = "granularity = {0}, useBatch = {1}, skipSegmentPayloadFetchForAllocation = {2}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{LockGranularity.SEGMENT, true},
new Object[]{LockGranularity.SEGMENT, false},
new Object[]{LockGranularity.TIME_CHUNK, true},
new Object[]{LockGranularity.TIME_CHUNK, false}
new Object[]{LockGranularity.SEGMENT, true, true},
new Object[]{LockGranularity.SEGMENT, true, false},
new Object[]{LockGranularity.SEGMENT, false, false},
new Object[]{LockGranularity.TIME_CHUNK, true, true},
new Object[]{LockGranularity.TIME_CHUNK, true, false},
new Object[]{LockGranularity.TIME_CHUNK, false, false}
);
}
public SegmentAllocateActionTest(LockGranularity lockGranularity, boolean useBatch)
public SegmentAllocateActionTest(
LockGranularity lockGranularity,
boolean useBatch,
boolean skipSegmentPayloadFetchForAllocation
)
{
this.lockGranularity = lockGranularity;
this.useBatch = useBatch;
this.taskActionTestKit.setSkipSegmentPayloadFetchForAllocation(skipSegmentPayloadFetchForAllocation);
}
@Before

File: SegmentAllocationQueueTest.java

@ -36,6 +36,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.List;
@ -44,6 +46,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@RunWith(Parameterized.class)
public class SegmentAllocationQueueTest
{
@Rule
@ -54,6 +57,19 @@ public class SegmentAllocationQueueTest
private StubServiceEmitter emitter;
private BlockingExecutorService executor;
private final boolean reduceMetadataIO;
@Parameterized.Parameters(name = "reduceMetadataIO = {0}")
public static Object[][] getTestParameters()
{
return new Object[][]{{true}, {false}};
}
public SegmentAllocationQueueTest(boolean reduceMetadataIO)
{
this.reduceMetadataIO = reduceMetadataIO;
}
@Before
public void setUp()
{
@ -73,6 +89,12 @@ public class SegmentAllocationQueueTest
{
return 0;
}
@Override
public boolean isBatchAllocationReduceMetadataIO()
{
return reduceMetadataIO;
}
};
allocationQueue = new SegmentAllocationQueue(

File: TaskActionTestKit.java

@ -58,6 +58,8 @@ public class TaskActionTestKit extends ExternalResource
private SegmentSchemaManager segmentSchemaManager;
private SegmentSchemaCache segmentSchemaCache;
private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO();
public TaskLockbox getTaskLockbox()
{
return taskLockbox;
@ -78,6 +80,11 @@ public class TaskActionTestKit extends ExternalResource
return taskActionToolbox;
}
public void setSkipSegmentPayloadFetchForAllocation(boolean skipSegmentPayloadFetchForAllocation)
{
this.skipSegmentPayloadFetchForAllocation = skipSegmentPayloadFetchForAllocation;
}
@Override
public void before()
{
@ -126,6 +133,12 @@ public class TaskActionTestKit extends ExternalResource
{
return 10L;
}
@Override
public boolean isBatchAllocationReduceMetadataIO()
{
return skipSegmentPayloadFetchForAllocation;
}
};
taskActionToolbox = new TaskActionToolbox(

File: TestIndexerMetadataStorageCoordinator.java

@ -35,6 +35,7 @@ import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -168,7 +169,8 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
List<SegmentCreateRequest> requests
List<SegmentCreateRequest> requests,
boolean isTimeChunk
)
{
return Collections.emptyMap();
@ -332,6 +334,20 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
return Collections.emptyMap();
}
@Override
public SegmentTimeline getSegmentTimelineForAllocation(
String dataSource,
Interval interval,
boolean skipSegmentPayloadFetchForAllocation
)
{
return SegmentTimeline.forSegments(retrieveUsedSegmentsForIntervals(
dataSource,
Collections.singletonList(interval),
Segments.INCLUDING_OVERSHADOWED
));
}
public Set<DataSegment> getPublished()
{
return ImmutableSet.copyOf(published);

File: IndexerMetadataStorageCoordinator.java

@ -25,6 +25,7 @@ import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -179,6 +180,8 @@ public interface IndexerMetadataStorageCoordinator
* Should be set to false if replica tasks would index events in same order
* @param requests Requests for which to allocate segments. All
* the requests must share the same partition space.
* @param reduceMetadataIO If true, try to use the segment ids instead of fetching every segment
* payload from the metadata store
* @return Map from request to allocated segment id. The map does not contain
* entries for failed requests.
*/
@ -186,7 +189,20 @@ public interface IndexerMetadataStorageCoordinator
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
List<SegmentCreateRequest> requests
List<SegmentCreateRequest> requests,
boolean reduceMetadataIO
);
/**
* Returns a timeline of all used segments, including overshadowed ones, for the given datasource and interval.
* If skipSegmentPayloadFetchForAllocation is true, the full segment payloads are not fetched.
* Instead, only the segment IDs are retrieved, along with the numCorePartitions of exactly one segment per
* version per interval, and a dummy DataSegment is returned for each ID, holding only the SegmentId and a
* NumberedShardSpec with that numCorePartitions.
*/
SegmentTimeline getSegmentTimelineForAllocation(
String dataSource,
Interval interval,
boolean skipSegmentPayloadFetchForAllocation
);
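To make the "dummy DataSegment" wording above concrete, here is a sketch of such a payload-light segment; the constructor shape is taken from the implementation later in this diff, while the helper class and method names are illustrative:

    import org.apache.druid.timeline.DataSegment;
    import org.apache.druid.timeline.SegmentId;
    import org.apache.druid.timeline.partition.NumberedShardSpec;

    final class AllocationOnlySegmentSketch
    {
      // Only the id and a shard spec with the correct numCorePartitions are populated;
      // loadSpec, dimensions, metrics and compaction state are left null.
      static DataSegment allocationOnlySegment(SegmentId id, int numCorePartitions)
      {
        return new DataSegment(
            id,
            null,
            null,
            null,
            new NumberedShardSpec(id.getPartitionNum(), numCorePartitions),
            null,
            null,
            1
        );
      }
    }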
/**

File: IndexerSQLMetadataStorageCoordinator.java

@ -719,7 +719,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
String dataSource,
Interval allocateInterval,
boolean skipSegmentLineageCheck,
List<SegmentCreateRequest> requests
List<SegmentCreateRequest> requests,
boolean reduceMetadataIO
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
@ -727,7 +728,14 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final Interval interval = allocateInterval.withChronology(ISOChronology.getInstanceUTC());
return connector.retryWithHandle(
handle -> allocatePendingSegments(handle, dataSource, interval, skipSegmentLineageCheck, requests)
handle -> allocatePendingSegments(
handle,
dataSource,
interval,
skipSegmentLineageCheck,
requests,
reduceMetadataIO
)
);
}
@ -1003,18 +1011,36 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return newIdentifier;
}
@Override
public SegmentTimeline getSegmentTimelineForAllocation(
String dataSource,
Interval interval,
boolean reduceMetadataIO
)
{
return connector.retryWithHandle(
handle -> {
if (reduceMetadataIO) {
return SegmentTimeline.forSegments(retrieveUsedSegmentsForAllocation(handle, dataSource, interval));
} else {
return getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval));
}
}
);
}
private Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(
final Handle handle,
final String dataSource,
final Interval interval,
final boolean skipSegmentLineageCheck,
final List<SegmentCreateRequest> requests
final List<SegmentCreateRequest> requests,
final boolean reduceMetadataIO
) throws IOException
{
// Get the time chunk and associated data segments for the given interval, if any
final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval))
.lookup(interval);
final List<TimelineObjectHolder<String, DataSegment>> existingChunks
= getSegmentTimelineForAllocation(dataSource, interval, reduceMetadataIO).lookup(interval);
if (existingChunks.size() > 1) {
log.warn(
"Cannot allocate new segments for dataSource[%s], interval[%s] as interval already has [%,d] chunks.",
@ -2900,6 +2926,67 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
@VisibleForTesting
Set<DataSegment> retrieveUsedSegmentsForAllocation(
final Handle handle,
final String dataSource,
final Interval interval
)
{
final Set<SegmentId> overlappingSegmentIds
= SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUsedSegmentIds(dataSource, interval);
// Map from version -> interval -> segmentId with the smallest partitionNum
Map<String, Map<Interval, SegmentId>> versionIntervalToSmallestSegmentId = new HashMap<>();
for (SegmentId segmentId : overlappingSegmentIds) {
final Map<Interval, SegmentId> map
= versionIntervalToSmallestSegmentId.computeIfAbsent(segmentId.getVersion(), v -> new HashMap<>());
final SegmentId value = map.get(segmentId.getInterval());
if (value == null || value.getPartitionNum() > segmentId.getPartitionNum()) {
map.put(segmentId.getInterval(), segmentId);
}
}
// Retrieve the segments for the ids stored in the map to get the numCorePartitions
final Set<String> segmentIdsToRetrieve = new HashSet<>();
for (Map<Interval, SegmentId> itvlMap : versionIntervalToSmallestSegmentId.values()) {
segmentIdsToRetrieve.addAll(itvlMap.values().stream().map(SegmentId::toString).collect(Collectors.toList()));
}
final Set<DataSegment> dataSegments = retrieveSegmentsById(dataSource, segmentIdsToRetrieve);
final Set<String> retrievedIds = new HashSet<>();
final Map<String, Map<Interval, Integer>> versionIntervalToNumCorePartitions = new HashMap<>();
for (DataSegment segment : dataSegments) {
versionIntervalToNumCorePartitions.computeIfAbsent(segment.getVersion(), v -> new HashMap<>())
.put(segment.getInterval(), segment.getShardSpec().getNumCorePartitions());
retrievedIds.add(segment.getId().toString());
}
if (!retrievedIds.equals(segmentIdsToRetrieve)) {
throw DruidException.defensive(
"Used segment IDs for dataSource[%s] and interval[%s] have changed in the metadata store.",
dataSource, interval
);
}
// Create dummy segments for each segmentId with only the shard spec populated
Set<DataSegment> segmentsWithAllocationInfo = new HashSet<>();
for (SegmentId id : overlappingSegmentIds) {
final int corePartitions = versionIntervalToNumCorePartitions.get(id.getVersion()).get(id.getInterval());
segmentsWithAllocationInfo.add(
new DataSegment(
id,
null,
null,
null,
new NumberedShardSpec(id.getPartitionNum(), corePartitions),
null,
null,
1
)
);
}
return segmentsWithAllocationInfo;
}
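As a rough worked example of the saving (using the numbers from the test added at the end of this diff): with three overlapping intervals of 100 used segments each, the previous code fetched all 300 segment payloads on every allocation, whereas retrieveUsedSegmentsForAllocation fetches 300 lightweight IDs plus only 3 full payloads, one per distinct version-and-interval pair to recover numCorePartitions, and then represents all 300 segments as payload-light dummies.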
@Override
public DataSegment retrieveSegmentForId(final String id, boolean includeUnused)
{

File: SqlSegmentsMetadataQuery.java

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
@ -48,6 +49,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -242,6 +244,58 @@ public class SqlSegmentsMetadataQuery
);
}
public Set<SegmentId> retrieveUsedSegmentIds(
final String dataSource,
final Interval interval
)
{
final StringBuilder sb = new StringBuilder();
sb.append("SELECT id FROM %s WHERE used = :used AND dataSource = :dataSource");
// If the interval supports comparing as a string, bake it into the SQL
final boolean compareAsString = Intervals.canCompareEndpointsAsStrings(interval);
if (compareAsString) {
sb.append(
getConditionForIntervalsAndMatchMode(
Collections.singletonList(interval),
IntervalMode.OVERLAPS,
connector.getQuoteString()
)
);
}
return connector.inReadOnlyTransaction(
(handle, status) -> {
final Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable()))
.setFetchSize(connector.getStreamingFetchSize())
.bind("used", true)
.bind("dataSource", dataSource);
if (compareAsString) {
bindIntervalsToQuery(sql, Collections.singletonList(interval));
}
final Set<SegmentId> segmentIds = new HashSet<>();
try (final ResultIterator<String> iterator = sql.map((index, r, ctx) -> r.getString(1)).iterator()) {
while (iterator.hasNext()) {
final String id = iterator.next();
final SegmentId segmentId = SegmentId.tryParse(dataSource, id);
if (segmentId == null) {
throw DruidException.defensive(
"Failed to parse SegmentId for id[%s] and dataSource[%s].",
id, dataSource
);
}
if (IntervalMode.OVERLAPS.apply(interval, segmentId.getInterval())) {
segmentIds.add(segmentId);
}
}
}
return segmentIds;
});
}
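A hypothetical caller of the new query, following the forHandle(...) pattern already used in this patch; handle, connector, dbTables and jsonMapper are assumed to be the same collaborators this class is normally given:

    // Sketch: fetch only the IDs of used segments overlapping an interval.
    Set<SegmentId> usedIdsFor(Handle handle, String dataSource, Interval interval)
    {
      return SqlSegmentsMetadataQuery
          .forHandle(handle, connector, dbTables, jsonMapper)
          .retrieveUsedSegmentIds(dataSource, interval);
    }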
public List<DataSegmentPlus> retrieveSegmentsById(
String datasource,
Set<String> segmentIds

File: IndexerSQLMetadataStorageCoordinatorTest.java

@ -2669,6 +2669,76 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
}
@Test
public void testAllocatePendingSegmentsSkipSegmentPayloadFetch()
{
final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance();
final String dataSource = "ds";
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
final String sequenceName = "seq";
final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId0 = coordinator.allocatePendingSegments(
dataSource,
interval,
false,
Collections.singletonList(request),
true
).get(request);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString());
final SegmentCreateRequest request1 =
new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId1 = coordinator.allocatePendingSegments(
dataSource,
interval,
false,
Collections.singletonList(request1),
true
).get(request1);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString());
final SegmentCreateRequest request2 =
new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId2 = coordinator.allocatePendingSegments(
dataSource,
interval,
false,
Collections.singletonList(request2),
true
).get(request2);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString());
final SegmentCreateRequest request3 =
new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId3 = coordinator.allocatePendingSegments(
dataSource,
interval,
false,
Collections.singletonList(request3),
true
).get(request3);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId3.toString());
Assert.assertEquals(segmentId2, segmentId3);
final SegmentCreateRequest request4 =
new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null, null);
final SegmentIdWithShardSpec segmentId4 = coordinator.allocatePendingSegments(
dataSource,
interval,
false,
Collections.singletonList(request4),
true
).get(request4);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_3", segmentId4.toString());
}
@Test
public void testAllocatePendingSegments()
{
@ -2682,7 +2752,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
dataSource,
interval,
false,
Collections.singletonList(request)
Collections.singletonList(request),
false
).get(request);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString());
@ -2693,7 +2764,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
dataSource,
interval,
false,
Collections.singletonList(request1)
Collections.singletonList(request1),
false
).get(request1);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString());
@ -2704,7 +2776,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
dataSource,
interval,
false,
Collections.singletonList(request2)
Collections.singletonList(request2),
false
).get(request2);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString());
@ -2715,7 +2788,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
dataSource,
interval,
false,
Collections.singletonList(request3)
Collections.singletonList(request3),
false
).get(request3);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId3.toString());
@ -2727,7 +2801,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
dataSource,
interval,
false,
Collections.singletonList(request4)
Collections.singletonList(request4),
false
).get(request4);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_3", segmentId4.toString());
@ -3639,6 +3714,84 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
Assert.assertEquals(expected, actual);
}
@Test
public void testRetrieveUsedSegmentsForSegmentAllocation()
{
final String datasource = "DS";
DataSegment firstSegment;
Set<DataSegment> nextSegments;
final Map<String, Object> loadspec = ImmutableMap.of("loadSpec", "loadSpec");
final List<String> dimensions = ImmutableList.of("dim1", "dim2");
final List<String> metrics = ImmutableList.of("metric1", "metric2");
final int numSegmentsPerInterval = 100;
final Interval month = Intervals.of("2024-10-01/2024-11-01");
final Interval year = Intervals.of("2024/2025");
final Interval overlappingDay = Intervals.of("2024-10-01/2024-10-02");
final Interval nonOverlappingDay = Intervals.of("2024-01-01/2024-01-02");
final List<Interval> intervals = ImmutableList.of(month, year, overlappingDay, nonOverlappingDay);
final List<String> versions = ImmutableList.of("v0", "v1", "v2", "v2");
for (int i = 0; i < 4; i++) {
nextSegments = new HashSet<>();
firstSegment = new DataSegment(
datasource,
intervals.get(i),
versions.get(i),
loadspec,
dimensions,
metrics,
new DimensionRangeShardSpec(dimensions, null, null, 0, 1),
0,
100
);
insertUsedSegments(Collections.singleton(firstSegment), Collections.emptyMap());
for (int j = 1; j < numSegmentsPerInterval; j++) {
nextSegments.add(
new DataSegment(
datasource,
intervals.get(i),
versions.get(i),
loadspec,
dimensions,
metrics,
// The numCorePartitions is intentionally 0
new NumberedShardSpec(j, 0),
0,
100
)
);
}
insertUsedSegments(nextSegments, Collections.emptyMap());
}
final Set<SegmentIdWithShardSpec> expected = new HashSet<>();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < numSegmentsPerInterval; j++) {
expected.add(
new SegmentIdWithShardSpec(
datasource,
intervals.get(i),
versions.get(i),
new NumberedShardSpec(j, 1)
)
);
}
}
Assert.assertEquals(expected,
derbyConnector.retryWithHandle(
handle -> coordinator.retrieveUsedSegmentsForAllocation(handle, datasource, month)
.stream()
.map(SegmentIdWithShardSpec::fromDataSegment)
.collect(Collectors.toSet())
)
);
}
private void insertUsedSegments(Set<DataSegment> segments, Map<String, String> upgradedFromSegmentIdMap)
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();