Fix regression in batch segment allocation (#14337)

* Improve batch segment allocation logs

* Fix batch segment allocation regression

* Fix logs

* Fix logs

* Fix tests and logs
Kashif Faraz 2023-05-26 11:04:54 +05:30 committed by GitHub
parent 1873fca6c7
commit 0cde3a8b52
4 changed files with 78 additions and 45 deletions
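
The core of the fix, visible in the SegmentAllocationQueue and test hunks below, is that a batched allocation which fails due to conflicting used segments or exhausted retries now completes its pending future with a null segment id instead of completing it exceptionally (loss of leadership still fails the future exceptionally, now with the message "Not leader anymore"). The following minimal Java sketch only illustrates that caller-visible contract; it is not code from the patch, and SegmentId is a stand-in for SegmentIdWithShardSpec.

import java.util.concurrent.CompletableFuture;

// Minimal sketch of the caller-visible contract after this fix; not code from the patch.
// SegmentId is a stand-in type for SegmentIdWithShardSpec.
public class NullOnFailureSketch
{
  record SegmentId(String name) {}

  public static void main(String[] args) throws Exception
  {
    CompletableFuture<SegmentId> allocationFuture = new CompletableFuture<>();

    // Before the fix, an unrecoverable allocation failure completed the future exceptionally:
    //   allocationFuture.completeExceptionally(new IllegalStateException(errorMessage));
    // After the fix, the future is completed with null:
    allocationFuture.complete(null);

    // Callers therefore check for a null segment id rather than catching an
    // ExecutionException, which is how the updated tests assert the failure case.
    SegmentId segmentId = allocationFuture.get();
    if (segmentId == null) {
      System.out.println("Allocation failed after all attempts; the task decides how to proceed.");
    }
  }
}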


@@ -74,4 +74,17 @@ public class SegmentAllocateRequest
{
return rowInterval;
}
@Override
public String toString()
{
return "SegmentAllocateRequest{" +
"taskId=" + task.getId() +
", queryGranularity=" + action.getQueryGranularity() +
", segmentGranularity=" + action.getPreferredSegmentGranularity() +
", maxAttempts=" + maxAttempts +
", rowInterval=" + rowInterval +
", attempts=" + attempts +
'}';
}
}


@@ -225,7 +225,7 @@ public class SegmentAllocationQueue
{
batch.key.resetQueueTime();
if (!isLeader.get()) {
batch.failPendingRequests("Cannot allocate segment if not leader");
batch.failPendingRequests("Not leader anymore");
return false;
} else if (processingQueue.offer(batch.key)) {
log.debug("Added a new batch [%s] to queue.", batch.key);
@@ -312,7 +312,7 @@ public class SegmentAllocationQueue
while (nextKey != null && !isLeader.get()) {
processingQueue.pollFirst();
AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey);
nextBatch.failPendingRequests("Cannot allocate segment if not leader");
nextBatch.failPendingRequests("Not leader anymore");
++failedBatches;
nextKey = processingQueue.peekFirst();
@@ -332,15 +332,13 @@ public class SegmentAllocationQueue
if (requestBatch.isEmpty()) {
return true;
} else if (!isLeader.get()) {
requestBatch.failPendingRequests("Cannot allocate segment if not leader");
requestBatch.failPendingRequests("Not leader anymore");
return true;
}
log.debug(
"Processing [%d] requests for batch [%s], queue time [%s].",
requestBatch.size(),
requestKey,
requestKey.getQueueTime()
requestBatch.size(), requestKey, requestKey.getQueueTime()
);
final long startTimeMillis = System.currentTimeMillis();
@@ -364,7 +362,14 @@ public class SegmentAllocationQueue
final Set<DataSegment> updatedUsedSegments = retrieveUsedSegments(requestKey);
if (updatedUsedSegments.equals(usedSegments)) {
requestBatch.failPendingRequests("Allocation failed probably due to conflicting segments.");
log.warn(
"Completing [%d] failed requests in batch [%s] with null value as there"
+ " are conflicting segments. Cannot retry allocation until the set of"
+ " used segments overlapping the allocation interval [%s] changes.",
size(), requestKey, requestKey.preferredAllocationInterval
);
requestBatch.completePendingRequestsWithNull();
return true;
} else {
log.debug("Used segments have changed. Requeuing failed requests.");
@@ -390,6 +395,7 @@ public class SegmentAllocationQueue
// Find requests whose row interval overlaps with an existing used segment
final Set<SegmentAllocateRequest> allRequests = requestBatch.getRequests();
final Set<SegmentAllocateRequest> requestsWithNoOverlappingSegment = new HashSet<>();
final List<SegmentAllocateRequest> requestsWithPartialOverlap = new ArrayList<>();
if (usedSegments.isEmpty()) {
requestsWithNoOverlappingSegment.addAll(allRequests);
@@ -415,38 +421,42 @@ public class SegmentAllocationQueue
// There is no valid allocation interval for this request due to a
// partially overlapping used segment. Need not do anything right now.
// The request will be retried upon requeueing the batch.
requestsWithPartialOverlap.add(request);
}
}
// Try to allocate segments for the identified used segment intervals.
// Do not retry the failed requests with other intervals unless the batch is requeued.
for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : overlapIntervalToRequests.entrySet()) {
successCount += allocateSegmentsForInterval(
entry.getKey(),
entry.getValue(),
requestBatch
);
successCount +=
allocateSegmentsForInterval(entry.getKey(), entry.getValue(), requestBatch);
}
}
// For requests that do not overlap with a used segment, first try to allocate
// using the preferred granularity, then smaller granularities
// using the preferred granularity, then successively smaller granularities
final Set<SegmentAllocateRequest> pendingRequests = new HashSet<>(requestsWithNoOverlappingSegment);
for (Granularity granularity :
Granularity.granularitiesFinerThan(requestBatch.key.preferredSegmentGranularity)) {
final List<Granularity> candidateGranularities
= Granularity.granularitiesFinerThan(requestBatch.key.preferredSegmentGranularity);
for (Granularity granularity : candidateGranularities) {
Map<Interval, List<SegmentAllocateRequest>> requestsByInterval =
getRequestsByInterval(pendingRequests, granularity);
for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : requestsByInterval.entrySet()) {
successCount += allocateSegmentsForInterval(
entry.getKey(),
entry.getValue(),
requestBatch
);
successCount +=
allocateSegmentsForInterval(entry.getKey(), entry.getValue(), requestBatch);
pendingRequests.retainAll(requestBatch.getRequests());
}
}
if (!requestsWithPartialOverlap.isEmpty()) {
log.info(
"Found [%d] requests in batch [%s] with row intervals that partially overlap existing segments."
+ " These cannot be processed until the set of used segments changes. Example request: [%s]",
requestsWithPartialOverlap.size(), requestBatch.key, requestsWithPartialOverlap.get(0)
);
}
return successCount;
}
@@ -474,9 +484,7 @@ public class SegmentAllocationQueue
final AllocateRequestKey requestKey = requestBatch.key;
log.debug(
"Trying allocation for [%d] requests, interval [%s] in batch [%s]",
requests.size(),
tryInterval,
requestKey
requests.size(), tryInterval, requestKey
);
final List<SegmentAllocateResult> results = taskLockbox.allocateSegments(
@@ -581,7 +589,7 @@ public class SegmentAllocationQueue
synchronized void failPendingRequests(Throwable cause)
{
if (!requestToFuture.isEmpty()) {
log.warn("Failing [%d] requests in batch due to [%s]. Batch key: %s", size(), cause.getMessage(), key);
log.warn("Failing [%d] requests in batch [%s], reason [%s].", size(), cause.getMessage(), key);
requestToFuture.values().forEach(future -> future.completeExceptionally(cause));
requestToFuture.keySet().forEach(
request -> emitTaskMetric("task/action/failed/count", 1L, request)
@@ -590,6 +598,19 @@ public class SegmentAllocationQueue
}
}
synchronized void completePendingRequestsWithNull()
{
if (requestToFuture.isEmpty()) {
return;
}
requestToFuture.values().forEach(future -> future.complete(null));
requestToFuture.keySet().forEach(
request -> emitTaskMetric("task/action/failed/count", 1L, request)
);
requestToFuture.clear();
}
synchronized void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request)
{
request.incrementAttempts();
@@ -599,20 +620,17 @@ public class SegmentAllocationQueue
requestToFuture.remove(request).complete(result.getSegmentId());
} else if (request.canRetry()) {
log.info(
"Allocation failed in attempt [%d] due to error [%s]. Can still retry. Action: %s",
request.getAttempts(),
result.getErrorMessage(),
request.getAction()
"Allocation failed on attempt [%d] due to error [%s]. Can still retry action [%s].",
request.getAttempts(), result.getErrorMessage(), request.getAction()
);
} else {
emitTaskMetric("task/action/failed/count", 1L, request);
log.error(
"Failing allocate action after [%d] attempts. Latest error [%s]. Action: %s",
request.getAttempts(),
result.getErrorMessage(),
request.getAction()
"Exhausted max attempts [%d] for allocation with latest error [%s]."
+ " Completing action [%s] with a null value.",
request.getAttempts(), result.getErrorMessage(), request.getAction()
);
requestToFuture.remove(request).completeExceptionally(new ISE(result.getErrorMessage()));
requestToFuture.remove(request).complete(null);
}
}
@@ -651,6 +669,7 @@ public class SegmentAllocationQueue
private final boolean useNonRootGenPartitionSpace;
private final int hash;
private final String serialized;
/**
* Creates a new key for the given request. The batch for a unique key will
@@ -681,6 +700,7 @@ public class SegmentAllocationQueue
preferredAllocationInterval,
lockGranularity
);
this.serialized = serialize();
this.maxWaitTimeMillis = maxWaitTimeMillis;
}
@@ -727,14 +747,19 @@ public class SegmentAllocationQueue
@Override
public String toString()
{
return serialized;
}
private String serialize()
{
return "{" +
"ds='" + dataSource + '\'' +
", gr='" + groupId + '\'' +
", incId=" + batchIncrementalId +
"datasource='" + dataSource + '\'' +
", groupId='" + groupId + '\'' +
", batchId=" + batchIncrementalId +
", lock=" + lockGranularity +
", invl=" + preferredAllocationInterval +
", slc=" + skipSegmentLineageCheck +
", allocInterval=" + preferredAllocationInterval +
", skipLineageCheck=" + skipSegmentLineageCheck +
'}';
}
}
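
For context on the allocation strategy in the hunks above: requests whose row interval does not overlap any used segment are grouped by interval and tried first at the preferred segment granularity, then at successively finer granularities, until each request is allocated or the candidate granularities are exhausted. The sketch below models that fallback loop in plain Java; it is not Druid code, the bucket sizes are arbitrary, and tryAllocate is only a stand-in for the call into taskLockbox.allocateSegments().

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Self-contained sketch (not Druid code) of the granularity fallback strategy.
public class GranularityFallbackSketch
{
  public static void main(String[] args)
  {
    // Candidate bucket sizes ordered from the preferred granularity to finer ones,
    // mirroring Granularity.granularitiesFinerThan(preferredSegmentGranularity).
    List<Duration> candidateGranularities =
        List.of(Duration.ofHours(1), Duration.ofMinutes(30), Duration.ofMinutes(15));

    // Row timestamps standing in for pending allocation requests.
    Set<Instant> pendingRows = new HashSet<>(List.of(
        Instant.parse("2023-05-26T10:05:00Z"),
        Instant.parse("2023-05-26T10:40:00Z")
    ));

    for (Duration granularity : candidateGranularities) {
      if (pendingRows.isEmpty()) {
        break;
      }
      // Group pending rows by the interval they fall into at this granularity,
      // mirroring getRequestsByInterval() in the patch.
      Map<Instant, List<Instant>> rowsByBucket = new HashMap<>();
      for (Instant row : pendingRows) {
        rowsByBucket.computeIfAbsent(bucket(row, granularity), b -> new ArrayList<>()).add(row);
      }
      for (Map.Entry<Instant, List<Instant>> entry : rowsByBucket.entrySet()) {
        if (tryAllocate(entry.getKey(), granularity)) {
          // Successful rows leave the pending set; failures are retried at the
          // next (finer) granularity.
          entry.getValue().forEach(pendingRows::remove);
        }
      }
    }
  }

  private static Instant bucket(Instant row, Duration granularity)
  {
    long millis = granularity.toMillis();
    return Instant.ofEpochMilli((row.toEpochMilli() / millis) * millis);
  }

  private static boolean tryAllocate(Instant bucketStart, Duration granularity)
  {
    // Placeholder for the real allocation call against the metadata store.
    System.out.printf("Trying allocation for interval [%s/%s]%n", bucketStart, bucketStart.plus(granularity));
    return true;
  }
}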


@@ -64,7 +64,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -984,9 +983,6 @@ public class SegmentAllocateActionTest
return action.perform(task, taskActionTestKit.getTaskActionToolbox());
}
}
catch (ExecutionException e) {
return null;
}
catch (Exception e) {
throw new RuntimeException(e);
}


@@ -223,8 +223,7 @@ public class SegmentAllocationQueueTest
executor.finishNextPendingTask();
Assert.assertNotNull(getSegmentId(hourSegmentFuture));
Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(halfHourSegmentFuture));
Assert.assertEquals("Storage coordinator could not allocate segment.", t.getMessage());
Assert.assertNull(getSegmentId(halfHourSegmentFuture));
}
@Test
@@ -309,7 +308,7 @@ public class SegmentAllocationQueueTest
for (Future<SegmentIdWithShardSpec> future : segmentFutures) {
Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(future));
Assert.assertEquals("Cannot allocate segment if not leader", t.getMessage());
Assert.assertEquals("Not leader anymore", t.getMessage());
}
}