Clean up allocation and supervisor logs for easier debugging (#16535)

Changes:
- Use the string taskGroup consistently so that log lines for a task group are easy to search for (see the sketch below)
- Clean up other logs
- No change in any logic
Kashif Faraz 2024-06-03 16:41:04 +05:30 committed by GitHub
parent d0916865d0
commit 1974a38bc9
3 changed files with 51 additions and 62 deletions
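
A minimal sketch of the logging convention this change standardizes on; the class, logger, and argument names below are illustrative and not taken from the patch:

```java
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.joda.time.Duration;

public class TaskGroupLogSketch
{
  private static final EmittingLogger log = new EmittingLogger(TaskGroupLogSketch.class);

  void logEarlyStop(int groupId, Duration runDuration)
  {
    // Old style: "task group [%d]" splits the key across words, so grepping logs for a
    // specific group means matching several variants ("task group", "taskGroup", "group").
    // New style: the single token taskGroup[<id>] is easy to search for.
    log.info("Stopping taskGroup[%d] early after running for duration[%s].", groupId, runDuration);
  }
}
```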


@@ -268,7 +268,7 @@ public class SegmentAllocationQueue
     catch (Throwable t) {
       currentBatch.failPendingRequests(t);
       processed = true;
-      log.error(t, "Error while processing batch [%s]", currentBatch.key);
+      log.error(t, "Error while processing batch[%s].", currentBatch.key);
     }
 
     // Requeue if not fully processed yet
@@ -619,7 +619,7 @@ public class SegmentAllocationQueue
   void failPendingRequests(Throwable cause)
   {
     if (!requestToFuture.isEmpty()) {
-      log.warn("Failing [%d] requests in batch [%s], reason [%s].", size(), cause.getMessage(), key);
+      log.warn("Failing [%d] requests in batch[%s], reason[%s].", size(), key, cause.getMessage());
       requestToFuture.values().forEach(future -> future.completeExceptionally(cause));
      requestToFuture.keySet().forEach(
          request -> emitTaskMetric("task/action/failed/count", 1L, request)
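
Besides the bracket style, the old warning above also passed its arguments out of order: cause.getMessage() filled the batch[%s] placeholder and key filled reason[%s]. A minimal sketch of the corrected call, with illustrative names not taken from the patch:

```java
import org.apache.druid.java.util.emitter.EmittingLogger;

class BatchFailureLogSketch
{
  private static final EmittingLogger log = new EmittingLogger(BatchFailureLogSketch.class);

  void logBatchFailure(int numRequests, String batchKey, Throwable cause)
  {
    // Arguments now line up with the placeholders: request count, batch key, then failure reason.
    log.warn("Failing [%d] requests in batch[%s], reason[%s].", numRequests, batchKey, cause.getMessage());
  }
}
```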


@@ -657,7 +657,7 @@ public class TaskLockbox
         }
       } else {
-        log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
+        log.debug("Task[%s] already present in TaskLock[%s].", task.getId(), posseToUse.getTaskLock().getGroupId());
       }
 
       return posseToUse;
     }


@@ -83,6 +83,7 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -96,13 +97,12 @@ import org.apache.druid.segment.incremental.ParseExceptionReport;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.joda.time.DateTime;
+import org.joda.time.Duration;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1203,19 +1203,17 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       }
 
       try {
-        Instant handleNoticeStartTime = Instant.now();
+        final Stopwatch noticeHandleTime = Stopwatch.createStarted();
         notice.handle();
-        Instant handleNoticeEndTime = Instant.now();
-        Duration timeElapsed = Duration.between(handleNoticeStartTime, handleNoticeEndTime);
         String noticeType = notice.getType();
+        emitNoticeProcessTime(noticeType, noticeHandleTime.millisElapsed());
         if (log.isDebugEnabled()) {
           log.debug(
-              "Handled notice [%s] from notices queue in [%d] ms, "
-              + "current notices queue size [%d] for datasource [%s]",
-              noticeType, timeElapsed.toMillis(), getNoticesQueueSize(), dataSource
+              "Handled notice[%s] from notices queue in [%d] ms, "
+              + "current notices queue size [%d] for datasource[%s].",
+              noticeType, noticeHandleTime.millisElapsed(), getNoticesQueueSize(), dataSource
           );
         }
-        emitNoticeProcessTime(noticeType, timeElapsed.toMillis());
       }
       catch (Throwable e) {
         stateManager.recordThrowableEvent(e);
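
A minimal sketch of the timing pattern adopted above, using the Stopwatch utility imported earlier; the class and method names below are illustrative, not taken from the patch:

```java
import org.apache.druid.java.util.common.Stopwatch;

class NoticeTimingSketch
{
  // Start a Stopwatch before the work and read the elapsed milliseconds afterwards,
  // instead of capturing two java.time.Instant values and computing a Duration between them.
  long timeHandling(Runnable notice)
  {
    final Stopwatch noticeHandleTime = Stopwatch.createStarted();
    notice.run();
    return noticeHandleTime.millisElapsed();
  }
}
```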
@@ -2837,10 +2835,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         earlyStopTime = DateTimes.nowUtc().plus(tuningConfig.getRepartitionTransitionDuration());
         log.info(
             "Previous partition set [%s] has changed to [%s] - requesting that tasks stop after [%s] at [%s]",
-            previousPartitionIds,
-            partitionIds,
-            tuningConfig.getRepartitionTransitionDuration(),
-            earlyStopTime
+            previousPartitionIds, partitionIds, tuningConfig.getRepartitionTransitionDuration(), earlyStopTime
         );
         break;
       }
@@ -3161,57 +3156,52 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> futures = new ArrayList<>();
     final List<Integer> futureGroupIds = new ArrayList<>();
 
-    boolean stopTasksEarly;
-    if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || earlyStopTime.isEqualNow())) {
-      log.info("Early stop requested - signalling tasks to complete");
+    final boolean stopTasksEarly;
+    if (earlyStopTime != null && !earlyStopTime.isAfterNow()) {
+      log.info("Early stop requested, signalling tasks to complete.");
       earlyStopTime = null;
       stopTasksEarly = true;
     } else {
       stopTasksEarly = false;
     }
 
-    AtomicInteger stoppedTasks = new AtomicInteger();
+    final AtomicInteger numStoppedTasks = new AtomicInteger();
     // Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing
-    activelyReadingTaskGroups
-        .entrySet().stream().sorted(
+    activelyReadingTaskGroups.entrySet().stream().sorted(
         Comparator.comparingLong(
-            (Entry<Integer, TaskGroup> entry) ->
-                computeEarliestTaskStartTime(entry.getValue())
-                    .getMillis()))
+            taskGroupEntry -> computeEarliestTaskStartTime(taskGroupEntry.getValue()).getMillis()
+        )
+    )
         .forEach(entry -> {
           Integer groupId = entry.getKey();
           TaskGroup group = entry.getValue();
 
-          if (stopTasksEarly) {
+          final DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
+          final Duration runDuration = Duration.millis(DateTimes.nowUtc().getMillis() - earliestTaskStart.getMillis());
+          if (stopTasksEarly || group.getHandoffEarly()) {
+            // If handoffEarly has been set, stop tasks irrespective of stopTaskCount
             log.info(
-                "Stopping task group [%d] early. It has run for [%s]",
-                groupId,
-                ioConfig.getTaskDuration()
+                "Stopping taskGroup[%d] early after running for duration[%s].",
+                groupId, runDuration
             );
             futureGroupIds.add(groupId);
             futures.add(checkpointTaskGroup(group, true));
-          } else {
-            DateTime earliestTaskStart = computeEarliestTaskStartTime(group);
-
-            if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getHandoffEarly()) {
-              // if this task has run longer than the configured duration
-              // as long as the pending task groups are less than the configured stop task count.
-              // If shutdownEarly has been set, ignore stopTaskCount since this is a manual operator action.
-              if (pendingCompletionTaskGroups.values()
-                                             .stream()
-                                             .mapToInt(CopyOnWriteArrayList::size)
-                                             .sum() + stoppedTasks.get()
-                  < ioConfig.getMaxAllowedStops() || group.getHandoffEarly()) {
-                log.info(
-                    "Task group [%d] has run for [%s]. Stopping.",
-                    groupId,
-                    ioConfig.getTaskDuration()
-                );
-                futureGroupIds.add(groupId);
-                futures.add(checkpointTaskGroup(group, true));
-                stoppedTasks.getAndIncrement();
-              }
+            if (group.getHandoffEarly()) {
+              numStoppedTasks.getAndIncrement();
+            }
+          } else if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
+            // Stop this task group if it has run longer than the configured duration
+            // and the pending task groups are less than the configured stop task count.
+            int numPendingCompletionTaskGroups = pendingCompletionTaskGroups.values().stream()
+                                                                            .mapToInt(List::size).sum();
+            if (numPendingCompletionTaskGroups + numStoppedTasks.get() < ioConfig.getMaxAllowedStops()) {
+              log.info(
+                  "Stopping taskGroup[%d] as it has already run for duration[%s], configured task duration[%s].",
+                  groupId, runDuration, ioConfig.getTaskDuration()
+              );
+              futureGroupIds.add(groupId);
+              futures.add(checkpointTaskGroup(group, true));
+              numStoppedTasks.getAndIncrement();
             }
           }
         });
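
The refactored block above gates ordinary duration-based stops on a budget of allowed stops, while handoffEarly bypasses that budget. A standalone sketch of that accounting, with hypothetical names:

```java
class StopBudgetSketch
{
  // A task group that has exceeded its task duration may only be stopped while the number of
  // groups already handing off (pending completion) plus the groups stopped in this pass
  // stays below the configured limit.
  boolean canStopAnotherGroup(int numPendingCompletionTaskGroups, int numStoppedTasks, int maxAllowedStops)
  {
    return numPendingCompletionTaskGroups + numStoppedTasks < maxAllowedStops;
  }
}
```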
@@ -3384,7 +3374,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       if (endOffsets.equals(taskGroup.checkpointSequences.lastEntry().getValue())) {
         log.warn(
-            "Checkpoint [%s] is same as the start sequences [%s] of latest sequence for the task group [%d]",
+            "Checkpoint[%s] is same as the start sequences[%s] of latest sequence for the taskGroup[%d].",
             endOffsets,
             taskGroup.checkpointSequences.lastEntry().getValue(),
             taskGroup.groupId
@@ -3579,7 +3569,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       // 2) Remove any tasks that have failed from the list
       // 3) If any task completed successfully, stop all the tasks in this group and move to the next group
 
-      log.debug("Task group [%d] pre-pruning: %s", groupId, taskGroup.taskIds());
+      log.debug("taskGroup[%d] pre-pruning: %s.", groupId, taskGroup.taskIds());
 
       Iterator<Entry<String, TaskData>> iTasks = taskGroup.tasks.entrySet().iterator();
       while (iTasks.hasNext()) {
@@ -3589,7 +3579,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         // stop and remove bad tasks from the task group
         if (!isTaskCurrent(groupId, taskId, activeTaskMap)) {
-          log.info("Stopping task [%s] which does not match the expected sequence range and ingestion spec", taskId);
+          log.info("Stopping task[%s] as it does not match the expected sequence range and ingestion spec.", taskId);
           futures.add(stopTask(taskId, false));
           iTasks.remove();
           continue;
@@ -3613,7 +3603,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           break;
         }
       }
-      log.debug("Task group [%d] post-pruning: %s", groupId, taskGroup.taskIds());
+      log.debug("After pruning, taskGroup[%d] has tasks[%s].", groupId, taskGroup.taskIds());
     }
 
     // Ignore return value; just await.
@@ -3627,10 +3617,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       }
 
       Map<PartitionIdType, SequenceOffsetType> latestSequencesFromStream = getLatestSequencesFromStream();
-
-      long nowTime = Instant.now().toEpochMilli();
-      boolean idle;
-      long idleTime;
+      final long nowTime = DateTimes.nowUtc().getMillis();
+      final boolean idle;
+      final long idleTime;
       if (lastActiveTimeMillis > 0
           && previousSequencesFromStream.equals(latestSequencesFromStream)
           && computeTotalLag() == 0) {
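
A minimal sketch of the utility swap in this hunk: Druid's DateTimes helper yields a joda-time DateTime whose getMillis() carries the same epoch-millisecond value previously obtained via java.time.Instant. The class name below is illustrative:

```java
import org.apache.druid.java.util.common.DateTimes;

class NowMillisSketch
{
  // Milliseconds since the epoch, using the Druid/joda-time utility instead of java.time.Instant.
  long nowMillis()
  {
    return DateTimes.nowUtc().getMillis();
  }
}
```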
@@ -3684,7 +3673,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     // check that there is a current task group for each group of partitions in [partitionGroups]
     for (Integer groupId : partitionGroups.keySet()) {
       if (!activelyReadingTaskGroups.containsKey(groupId)) {
-        log.info("Creating new task group [%d] for partitions %s", groupId, partitionGroups.get(groupId));
+        log.info("Creating new taskGroup[%d] for partitions[%s].", groupId, partitionGroups.get(groupId));
         Optional<DateTime> minimumMessageTime;
         if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) {
           minimumMessageTime = Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get());
@@ -3771,13 +3760,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       if (taskGroup.startingSequences == null ||
           taskGroup.startingSequences.size() == 0 ||
           taskGroup.startingSequences.values().stream().allMatch(x -> x == null || isEndOfShard(x))) {
-        log.debug("Nothing to read in any partition for taskGroup [%d], skipping task creation", groupId);
+        log.debug("Nothing to read in any partition for taskGroup[%d], skipping task creation.", groupId);
         continue;
       }
 
       if (ioConfig.getReplicas() > taskGroup.tasks.size()) {
         log.info(
-            "Number of tasks [%d] does not match configured numReplicas [%d] in task group [%d], creating more tasks",
+            "Number of tasks[%d] does not match configured numReplicas[%d] in taskGroup[%d], creating more tasks.",
             taskGroup.tasks.size(), ioConfig.getReplicas(), groupId
         );
         createTasksForGroup(groupId, ioConfig.getReplicas() - taskGroup.tasks.size());