From 0cde3a8b522fbd5c09222069ac51689a614b8ec2 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 26 May 2023 11:04:54 +0530 Subject: [PATCH] Fix regression in batch segment allocation (#14337) * Improve batch segment allocation logs * Fix batch seg alloc regression * Fix logs * Fix logs * Fix tests and logs --- .../actions/SegmentAllocateRequest.java | 13 +++ .../actions/SegmentAllocationQueue.java | 101 +++++++++++------- .../actions/SegmentAllocateActionTest.java | 4 - .../actions/SegmentAllocationQueueTest.java | 5 +- 4 files changed, 78 insertions(+), 45 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java index adac7523f44..74a32c31c41 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java @@ -74,4 +74,17 @@ public class SegmentAllocateRequest { return rowInterval; } + + @Override + public String toString() + { + return "SegmentAllocateRequest{" + + "taskId=" + task.getId() + + ", queryGranularity=" + action.getQueryGranularity() + + ", segmentGranularity=" + action.getPreferredSegmentGranularity() + + ", maxAttempts=" + maxAttempts + + ", rowInterval=" + rowInterval + + ", attempts=" + attempts + + '}'; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index d7ab0fd21fd..4d51350c595 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -225,7 +225,7 @@ public class SegmentAllocationQueue { 
batch.key.resetQueueTime(); if (!isLeader.get()) { - batch.failPendingRequests("Cannot allocate segment if not leader"); + batch.failPendingRequests("Not leader anymore"); return false; } else if (processingQueue.offer(batch.key)) { log.debug("Added a new batch [%s] to queue.", batch.key); @@ -312,7 +312,7 @@ public class SegmentAllocationQueue while (nextKey != null && !isLeader.get()) { processingQueue.pollFirst(); AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey); - nextBatch.failPendingRequests("Cannot allocate segment if not leader"); + nextBatch.failPendingRequests("Not leader anymore"); ++failedBatches; nextKey = processingQueue.peekFirst(); @@ -332,15 +332,13 @@ public class SegmentAllocationQueue if (requestBatch.isEmpty()) { return true; } else if (!isLeader.get()) { - requestBatch.failPendingRequests("Cannot allocate segment if not leader"); + requestBatch.failPendingRequests("Not leader anymore"); return true; } log.debug( "Processing [%d] requests for batch [%s], queue time [%s].", - requestBatch.size(), - requestKey, - requestKey.getQueueTime() + requestBatch.size(), requestKey, requestKey.getQueueTime() ); final long startTimeMillis = System.currentTimeMillis(); @@ -364,7 +362,14 @@ public class SegmentAllocationQueue final Set updatedUsedSegments = retrieveUsedSegments(requestKey); if (updatedUsedSegments.equals(usedSegments)) { - requestBatch.failPendingRequests("Allocation failed probably due to conflicting segments."); + log.warn( + "Completing [%d] failed requests in batch [%s] with null value as there" + + " are conflicting segments. Cannot retry allocation until the set of" + + " used segments overlapping the allocation interval [%s] changes.", + requestBatch.size(), requestKey, requestKey.preferredAllocationInterval + ); + + requestBatch.completePendingRequestsWithNull(); return true; } else { log.debug("Used segments have changed. 
Requeuing failed requests."); @@ -390,6 +395,7 @@ public class SegmentAllocationQueue // Find requests whose row interval overlaps with an existing used segment final Set allRequests = requestBatch.getRequests(); final Set requestsWithNoOverlappingSegment = new HashSet<>(); + final List requestsWithPartialOverlap = new ArrayList<>(); if (usedSegments.isEmpty()) { requestsWithNoOverlappingSegment.addAll(allRequests); @@ -415,38 +421,42 @@ public class SegmentAllocationQueue // There is no valid allocation interval for this request due to a // partially overlapping used segment. Need not do anything right now. // The request will be retried upon requeueing the batch. + requestsWithPartialOverlap.add(request); } } // Try to allocate segments for the identified used segment intervals. // Do not retry the failed requests with other intervals unless the batch is requeued. for (Map.Entry> entry : overlapIntervalToRequests.entrySet()) { - successCount += allocateSegmentsForInterval( - entry.getKey(), - entry.getValue(), - requestBatch - ); + successCount += + allocateSegmentsForInterval(entry.getKey(), entry.getValue(), requestBatch); } } // For requests that do not overlap with a used segment, first try to allocate - // using the preferred granularity, then smaller granularities + // using the preferred granularity, then successively smaller granularities final Set pendingRequests = new HashSet<>(requestsWithNoOverlappingSegment); - for (Granularity granularity : - Granularity.granularitiesFinerThan(requestBatch.key.preferredSegmentGranularity)) { + final List candidateGranularities + = Granularity.granularitiesFinerThan(requestBatch.key.preferredSegmentGranularity); + for (Granularity granularity : candidateGranularities) { Map> requestsByInterval = getRequestsByInterval(pendingRequests, granularity); for (Map.Entry> entry : requestsByInterval.entrySet()) { - successCount += allocateSegmentsForInterval( - entry.getKey(), - entry.getValue(), - requestBatch - ); + 
successCount += + allocateSegmentsForInterval(entry.getKey(), entry.getValue(), requestBatch); pendingRequests.retainAll(requestBatch.getRequests()); } } + if (!requestsWithPartialOverlap.isEmpty()) { + log.info( + "Found [%d] requests in batch [%s] with row intervals that partially overlap existing segments." + + " These cannot be processed until the set of used segments changes. Example request: [%s]", + requestsWithPartialOverlap.size(), requestBatch.key, requestsWithPartialOverlap.get(0) + ); + } + return successCount; } @@ -474,9 +484,7 @@ public class SegmentAllocationQueue final AllocateRequestKey requestKey = requestBatch.key; log.debug( "Trying allocation for [%d] requests, interval [%s] in batch [%s]", - requests.size(), - tryInterval, - requestKey + requests.size(), tryInterval, requestKey ); final List results = taskLockbox.allocateSegments( @@ -581,7 +589,7 @@ public class SegmentAllocationQueue synchronized void failPendingRequests(Throwable cause) { if (!requestToFuture.isEmpty()) { - log.warn("Failing [%d] requests in batch due to [%s]. 
Batch key: %s", size(), cause.getMessage(), key); + log.warn("Failing [%d] requests in batch [%s], reason [%s].", size(), key, cause.getMessage()); requestToFuture.values().forEach(future -> future.completeExceptionally(cause)); requestToFuture.keySet().forEach( request -> emitTaskMetric("task/action/failed/count", 1L, request) @@ -590,6 +598,19 @@ public class SegmentAllocationQueue } } + synchronized void completePendingRequestsWithNull() + { + if (requestToFuture.isEmpty()) { + return; + } + + requestToFuture.values().forEach(future -> future.complete(null)); + requestToFuture.keySet().forEach( + request -> emitTaskMetric("task/action/failed/count", 1L, request) + ); + requestToFuture.clear(); + } + synchronized void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request) { request.incrementAttempts(); @@ -599,20 +620,17 @@ public class SegmentAllocationQueue requestToFuture.remove(request).complete(result.getSegmentId()); } else if (request.canRetry()) { log.info( - "Allocation failed in attempt [%d] due to error [%s]. Can still retry. Action: %s", - request.getAttempts(), - result.getErrorMessage(), - request.getAction() + "Allocation failed on attempt [%d] due to error [%s]. Can still retry action [%s].", + request.getAttempts(), result.getErrorMessage(), request.getAction() ); } else { emitTaskMetric("task/action/failed/count", 1L, request); log.error( - "Failing allocate action after [%d] attempts. Latest error [%s]. Action: %s", - request.getAttempts(), - result.getErrorMessage(), - request.getAction() + "Exhausted max attempts [%d] for allocation with latest error [%s]." 
+ + " Completing action [%s] with a null value.", + request.getAttempts(), result.getErrorMessage(), request.getAction() ); - requestToFuture.remove(request).completeExceptionally(new ISE(result.getErrorMessage())); + requestToFuture.remove(request).complete(null); } } @@ -651,6 +669,7 @@ public class SegmentAllocationQueue private final boolean useNonRootGenPartitionSpace; private final int hash; + private final String serialized; /** * Creates a new key for the given request. The batch for a unique key will @@ -681,6 +700,7 @@ public class SegmentAllocationQueue preferredAllocationInterval, lockGranularity ); + this.serialized = serialize(); this.maxWaitTimeMillis = maxWaitTimeMillis; } @@ -727,14 +747,19 @@ public class SegmentAllocationQueue @Override public String toString() + { + return serialized; + } + + private String serialize() { return "{" + - "ds='" + dataSource + '\'' + - ", gr='" + groupId + '\'' + - ", incId=" + batchIncrementalId + + "datasource='" + dataSource + '\'' + + ", groupId='" + groupId + '\'' + + ", batchId=" + batchIncrementalId + ", lock=" + lockGranularity + - ", invl=" + preferredAllocationInterval + - ", slc=" + skipSegmentLineageCheck + + ", allocInterval=" + preferredAllocationInterval + + ", skipLineageCheck=" + skipSegmentLineageCheck + '}'; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index c8861a92cd3..b498834fbab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -64,7 +64,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import 
java.util.stream.Collectors; @@ -984,9 +983,6 @@ public class SegmentAllocateActionTest return action.perform(task, taskActionTestKit.getTaskActionToolbox()); } } - catch (ExecutionException e) { - return null; - } catch (Exception e) { throw new RuntimeException(e); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java index 974b3096f92..8adb18ddb76 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java @@ -223,8 +223,7 @@ public class SegmentAllocationQueueTest executor.finishNextPendingTask(); Assert.assertNotNull(getSegmentId(hourSegmentFuture)); - Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(halfHourSegmentFuture)); - Assert.assertEquals("Storage coordinator could not allocate segment.", t.getMessage()); + Assert.assertNull(getSegmentId(halfHourSegmentFuture)); } @Test @@ -309,7 +308,7 @@ public class SegmentAllocationQueueTest for (Future future : segmentFutures) { Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(future)); - Assert.assertEquals("Cannot allocate segment if not leader", t.getMessage()); + Assert.assertEquals("Not leader anymore", t.getMessage()); } }