Improve polling in segment allocation queue (#15590)

Description
When batchAllocationWaitTime is set to 0, the segment allocation queue is polled continuously even when it is empty. This would take up cpu cycles unnecessarily.

Some existing race conditions would also become more frequent when the batchAllocationWaitTime is 0. This PR tries to better address those race conditions as well.

Changes
Do not reschedule a poll if queue is empty
When a new batch is added to queue, schedule a poll
Simplify keyToBatch map
Handle race conditions better
As soon as a batch starts getting processed, do not add any more requests to it
This commit is contained in:
Kashif Faraz 2024-01-04 17:42:02 +05:30 committed by GitHub
parent b9679d0884
commit c937068625
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 100 additions and 99 deletions

View File

@ -75,13 +75,17 @@ public class SegmentAllocationQueue
private final long maxWaitTimeMillis;
private final TaskLockbox taskLockbox;
private final ScheduledExecutorService executor;
private final IndexerMetadataStorageCoordinator metadataStorage;
private final AtomicBoolean isLeader = new AtomicBoolean(false);
private final ServiceEmitter emitter;
/**
* Single-threaded executor to process allocation queue.
*/
private final ScheduledExecutorService executor;
private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>();
private final BlockingDeque<AllocateRequestKey> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
private final BlockingDeque<AllocateRequestBatch> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
@Inject
public SegmentAllocationQueue(
@ -149,6 +153,10 @@ public class SegmentAllocationQueue
return executor != null && !executor.isShutdown();
}
/**
* Schedules a poll of the allocation queue that runs on the {@link #executor}.
* It is okay to schedule multiple polls since the executor is single threaded.
*/
private void scheduleQueuePoll(long delay)
{
executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
@ -174,19 +182,20 @@ public class SegmentAllocationQueue
throw new ISE("Batched segment allocation is disabled.");
}
final AllocateRequestKey requestKey = getKeyForAvailableBatch(request);
final AllocateRequestKey requestKey = new AllocateRequestKey(request);
final AtomicReference<Future<SegmentIdWithShardSpec>> futureReference = new AtomicReference<>();
// Possible race condition:
// t1 -> new batch is added to queue or batch already exists in queue
// t2 -> executor pops batch, processes all requests in it
// t1 -> new request is added to dangling batch and is never picked up
// Solution: For existing batch, call keyToBatch.remove() on the key to
// wait on keyToBatch.compute() to finish before proceeding with processBatch().
// For new batch, keyToBatch.remove() would not wait as key is not in map yet
// but a new batch is unlikely to be due immediately, so it won't get popped right away.
// Solution: Perform the following operations only inside keyToBatch.compute():
// 1. Add or remove from map
// 2. Add batch to queue
// 3. Mark batch as started
// 4. Update requests in batch
keyToBatch.compute(requestKey, (key, existingBatch) -> {
if (existingBatch == null) {
if (existingBatch == null || existingBatch.isStarted() || existingBatch.isFull()) {
AllocateRequestBatch newBatch = new AllocateRequestBatch(key);
futureReference.set(newBatch.add(request));
return addBatchToQueue(newBatch) ? newBatch : null;
@ -199,36 +208,19 @@ public class SegmentAllocationQueue
return futureReference.get();
}
/**
* Returns the key for a batch that is not added to the queue yet and/or has
* available space. Throws an exception if the queue is already full and no
* batch has available capacity.
*/
private AllocateRequestKey getKeyForAvailableBatch(SegmentAllocateRequest request)
{
for (int batchIncrementalId = 0; batchIncrementalId < MAX_QUEUE_SIZE; ++batchIncrementalId) {
AllocateRequestKey nextKey = new AllocateRequestKey(request, maxWaitTimeMillis, batchIncrementalId);
AllocateRequestBatch nextBatch = keyToBatch.get(nextKey);
if (nextBatch == null || nextBatch.size() < MAX_BATCH_SIZE) {
return nextKey;
}
}
throw new ISE("Allocation queue is at capacity, all batches are full.");
}
/**
* Tries to add the given batch to the processing queue. Fails all the pending
* requests in the batch if we are not leader or if the queue is full.
*/
private boolean addBatchToQueue(AllocateRequestBatch batch)
{
batch.key.resetQueueTime();
batch.resetQueueTime();
if (!isLeader.get()) {
batch.failPendingRequests("Not leader anymore");
return false;
} else if (processingQueue.offer(batch.key)) {
log.debug("Added a new batch [%s] to queue.", batch.key);
} else if (processingQueue.offer(batch)) {
log.debug("Added a new batch for key[%s] to queue.", batch.key);
scheduleQueuePoll(maxWaitTimeMillis);
return true;
} else {
batch.failPendingRequests(
@ -248,7 +240,7 @@ public class SegmentAllocationQueue
{
log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), batch.key);
keyToBatch.compute(batch.key, (key, existingBatch) -> {
if (existingBatch == null) {
if (existingBatch == null || existingBatch.isFull() || existingBatch.isStarted()) {
return addBatchToQueue(batch) ? batch : null;
} else {
// Merge requests from this batch to existing one
@ -262,44 +254,43 @@ public class SegmentAllocationQueue
{
clearQueueIfNotLeader();
// Process all the batches that are already due
int numProcessedBatches = 0;
AllocateRequestKey nextKey = processingQueue.peekFirst();
while (nextKey != null && nextKey.isDue()) {
processingQueue.pollFirst();
AllocateRequestBatch nextBatch = processingQueue.peekFirst();
while (nextBatch != null && nextBatch.isDue()) {
// Process the next batch in the queue
processingQueue.pollFirst();
final AllocateRequestBatch currentBatch = nextBatch;
boolean processed;
AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey);
try {
processed = processBatch(nextBatch);
processed = processBatch(currentBatch);
}
catch (Throwable t) {
nextBatch.failPendingRequests(t);
currentBatch.failPendingRequests(t);
processed = true;
log.error(t, "Error while processing batch [%s]", nextKey);
log.error(t, "Error while processing batch [%s]", currentBatch.key);
}
// Requeue if not fully processed yet
if (processed) {
++numProcessedBatches;
} else {
requeueBatch(nextBatch);
requeueBatch(currentBatch);
}
nextKey = processingQueue.peek();
nextBatch = processingQueue.peek();
}
// Schedule the next round of processing
final long nextScheduleDelay;
// Schedule the next round of processing if the queue is not empty
if (processingQueue.isEmpty()) {
nextScheduleDelay = maxWaitTimeMillis;
log.debug("Processed [%d] batches, not scheduling again since queue is empty.", numProcessedBatches);
} else {
nextKey = processingQueue.peek();
long timeElapsed = System.currentTimeMillis() - nextKey.getQueueTime();
nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed);
nextBatch = processingQueue.peek();
long timeElapsed = System.currentTimeMillis() - nextBatch.getQueueTime();
long nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed);
scheduleQueuePoll(nextScheduleDelay);
log.debug("Processed [%d] batches, next execution in [%d ms]", numProcessedBatches, nextScheduleDelay);
}
scheduleQueuePoll(nextScheduleDelay);
log.debug("Processed [%d] batches, next execution in [%d ms]", numProcessedBatches, nextScheduleDelay);
}
/**
@ -308,14 +299,14 @@ public class SegmentAllocationQueue
private void clearQueueIfNotLeader()
{
int failedBatches = 0;
AllocateRequestKey nextKey = processingQueue.peekFirst();
while (nextKey != null && !isLeader.get()) {
AllocateRequestBatch nextBatch = processingQueue.peekFirst();
while (nextBatch != null && !isLeader.get()) {
processingQueue.pollFirst();
AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey);
keyToBatch.remove(nextBatch.key);
nextBatch.failPendingRequests("Not leader anymore");
++failedBatches;
nextKey = processingQueue.peekFirst();
nextBatch = processingQueue.peekFirst();
}
if (failedBatches > 0) {
log.info("Not leader. Failed [%d] batches, remaining in queue [%d].", failedBatches, processingQueue.size());
@ -323,11 +314,21 @@ public class SegmentAllocationQueue
}
/**
* Processes the given batch. Returns true if the batch was completely processed
* and should not be requeued.
* Processes the given batch. This method marks the batch as started and
* removes it from the map {@link #keyToBatch} so that no more requests can be
* added to it.
*
* @return true if the batch was completely processed and should not be requeued.
*/
private boolean processBatch(AllocateRequestBatch requestBatch)
{
keyToBatch.compute(requestBatch.key, (batchKey, latestBatchForKey) -> {
// Mark the batch as started so that no more requests are added to it
requestBatch.markStarted();
// Remove the corresponding key from the map if this is the latest batch for the key
return requestBatch.equals(latestBatchForKey) ? null : latestBatchForKey;
});
final AllocateRequestKey requestKey = requestBatch.key;
if (requestBatch.isEmpty()) {
return true;
@ -338,13 +339,13 @@ public class SegmentAllocationQueue
log.debug(
"Processing [%d] requests for batch [%s], queue time [%s].",
requestBatch.size(), requestKey, requestKey.getQueueTime()
requestBatch.size(), requestKey, requestBatch.getQueueTime()
);
final long startTimeMillis = System.currentTimeMillis();
final int batchSize = requestBatch.size();
emitBatchMetric("task/action/batch/size", batchSize, requestKey);
emitBatchMetric("task/action/batch/queueTime", (startTimeMillis - requestKey.getQueueTime()), requestKey);
emitBatchMetric("task/action/batch/queueTime", (startTimeMillis - requestBatch.getQueueTime()), requestKey);
final Set<DataSegment> usedSegments = retrieveUsedSegments(requestKey);
final int successCount = allocateSegmentsForBatch(requestBatch, usedSegments);
@ -546,15 +547,13 @@ public class SegmentAllocationQueue
*/
private class AllocateRequestBatch
{
private long queueTimeMillis;
private final AllocateRequestKey key;
private boolean started = false;
/**
* Map from allocate requests (represents a single SegmentAllocateAction)
* to the future of allocated segment id.
* <p>
* This must be accessed through methods synchronized on this batch.
* It is to avoid races between a new request being added just when the batch
* is being processed.
*/
private final Map<SegmentAllocateRequest, CompletableFuture<SegmentIdWithShardSpec>>
requestToFuture = new HashMap<>();
@ -564,29 +563,60 @@ public class SegmentAllocationQueue
this.key = key;
}
synchronized Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
long getQueueTime()
{
return queueTimeMillis;
}
boolean isDue()
{
return System.currentTimeMillis() - queueTimeMillis >= maxWaitTimeMillis;
}
void resetQueueTime()
{
queueTimeMillis = System.currentTimeMillis();
started = false;
}
void markStarted()
{
started = true;
}
boolean isStarted()
{
return started;
}
boolean isFull()
{
return size() >= MAX_BATCH_SIZE;
}
Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
{
log.debug("Adding request to batch [%s]: %s", key, request.getAction());
return requestToFuture.computeIfAbsent(request, req -> new CompletableFuture<>());
}
synchronized void transferRequestsFrom(AllocateRequestBatch batch)
void transferRequestsFrom(AllocateRequestBatch batch)
{
requestToFuture.putAll(batch.requestToFuture);
batch.requestToFuture.clear();
}
synchronized Set<SegmentAllocateRequest> getRequests()
Set<SegmentAllocateRequest> getRequests()
{
return new HashSet<>(requestToFuture.keySet());
}
synchronized void failPendingRequests(String reason)
void failPendingRequests(String reason)
{
failPendingRequests(new ISE(reason));
}
synchronized void failPendingRequests(Throwable cause)
void failPendingRequests(Throwable cause)
{
if (!requestToFuture.isEmpty()) {
log.warn("Failing [%d] requests in batch [%s], reason [%s].", size(), cause.getMessage(), key);
@ -598,7 +628,7 @@ public class SegmentAllocationQueue
}
}
synchronized void completePendingRequestsWithNull()
void completePendingRequestsWithNull()
{
if (requestToFuture.isEmpty()) {
return;
@ -611,7 +641,7 @@ public class SegmentAllocationQueue
requestToFuture.clear();
}
synchronized void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request)
void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request)
{
request.incrementAttempts();
@ -634,12 +664,12 @@ public class SegmentAllocationQueue
}
}
synchronized boolean isEmpty()
boolean isEmpty()
{
return requestToFuture.isEmpty();
}
synchronized int size()
int size()
{
return requestToFuture.size();
}
@ -650,14 +680,6 @@ public class SegmentAllocationQueue
*/
private static class AllocateRequestKey
{
/**
* ID to distinguish between two batches for the same datasource, groupId, etc.
*/
private final int batchIncrementalId;
private long queueTimeMillis;
private final long maxWaitTimeMillis;
private final String dataSource;
private final String groupId;
private final Interval preferredAllocationInterval;
@ -675,12 +697,11 @@ public class SegmentAllocationQueue
* Creates a new key for the given request. The batch for a unique key will
* always contain a single request.
*/
AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis, int batchIncrementalId)
AllocateRequestKey(SegmentAllocateRequest request)
{
final SegmentAllocateAction action = request.getAction();
final Task task = request.getTask();
this.batchIncrementalId = batchIncrementalId;
this.dataSource = action.getDataSource();
this.groupId = task.getGroupId();
this.skipSegmentLineageCheck = action.isSkipSegmentLineageCheck();
@ -694,30 +715,12 @@ public class SegmentAllocationQueue
this.hash = Objects.hash(
dataSource,
groupId,
batchIncrementalId,
skipSegmentLineageCheck,
useNonRootGenPartitionSpace,
preferredAllocationInterval,
lockGranularity
);
this.serialized = serialize();
this.maxWaitTimeMillis = maxWaitTimeMillis;
}
void resetQueueTime()
{
queueTimeMillis = System.currentTimeMillis();
}
long getQueueTime()
{
return queueTimeMillis;
}
boolean isDue()
{
return System.currentTimeMillis() - queueTimeMillis >= maxWaitTimeMillis;
}
@Override
@ -732,7 +735,6 @@ public class SegmentAllocationQueue
AllocateRequestKey that = (AllocateRequestKey) o;
return dataSource.equals(that.dataSource)
&& groupId.equals(that.groupId)
&& batchIncrementalId == that.batchIncrementalId
&& skipSegmentLineageCheck == that.skipSegmentLineageCheck
&& useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace
&& preferredAllocationInterval.equals(that.preferredAllocationInterval)
@ -756,7 +758,6 @@ public class SegmentAllocationQueue
return "{" +
"datasource='" + dataSource + '\'' +
", groupId='" + groupId + '\'' +
", batchId=" + batchIncrementalId +
", lock=" + lockGranularity +
", allocInterval=" + preferredAllocationInterval +
", skipLineageCheck=" + skipSegmentLineageCheck +