diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index 6986ec683a5..98ab50cff78 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -268,7 +268,7 @@ public class SegmentAllocationQueue catch (Throwable t) { currentBatch.failPendingRequests(t); processed = true; - log.error(t, "Error while processing batch [%s]", currentBatch.key); + log.error(t, "Error while processing batch[%s].", currentBatch.key); } // Requeue if not fully processed yet @@ -619,7 +619,7 @@ public class SegmentAllocationQueue void failPendingRequests(Throwable cause) { if (!requestToFuture.isEmpty()) { - log.warn("Failing [%d] requests in batch [%s], reason [%s].", size(), cause.getMessage(), key); + log.warn("Failing [%d] requests in batch[%s], reason[%s].", size(), key, cause.getMessage()); requestToFuture.values().forEach(future -> future.completeExceptionally(cause)); requestToFuture.keySet().forEach( request -> emitTaskMetric("task/action/failed/count", 1L, request) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 7776663330b..2155ac2c265 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -657,7 +657,7 @@ public class TaskLockbox } } else { - log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId()); + log.debug("Task[%s] already present in TaskLock[%s].", task.getId(), posseToUse.getTaskLock().getGroupId()); } return posseToUse; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index d413a9bec3e..ec4de45cac7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -83,6 +83,7 @@ import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -96,13 +97,12 @@ import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; import org.joda.time.DateTime; +import org.joda.time.Duration; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import java.io.IOException; -import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -1203,19 +1203,17 @@ public abstract class SeekableStreamSupervisor>> futures = new ArrayList<>(); final List futureGroupIds = new ArrayList<>(); - boolean stopTasksEarly; - if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || earlyStopTime.isEqualNow())) { - log.info("Early stop requested - signalling tasks to complete"); - + final boolean stopTasksEarly; + if (earlyStopTime != null && !earlyStopTime.isAfterNow()) { + log.info("Early stop requested, signalling tasks to complete."); earlyStopTime = null; stopTasksEarly = true; } else { stopTasksEarly = false; } - AtomicInteger stoppedTasks = new AtomicInteger(); + final AtomicInteger numStoppedTasks = new AtomicInteger(); // Sort task groups by start time to prioritize early termination of earlier groups, then iterate for processing - activelyReadingTaskGroups - .entrySet().stream().sorted( + activelyReadingTaskGroups.entrySet().stream().sorted( Comparator.comparingLong( - (Entry entry) -> - computeEarliestTaskStartTime(entry.getValue()) - .getMillis())) + taskGroupEntry -> computeEarliestTaskStartTime(taskGroupEntry.getValue()).getMillis() + ) + ) .forEach(entry -> { Integer groupId = entry.getKey(); TaskGroup group = entry.getValue(); - if (stopTasksEarly) { + final DateTime earliestTaskStart = computeEarliestTaskStartTime(group); + final Duration runDuration = Duration.millis(DateTimes.nowUtc().getMillis() - earliestTaskStart.getMillis()); + if (stopTasksEarly || group.getHandoffEarly()) { + // If handoffEarly has been set, stop tasks irrespective of stopTaskCount log.info( - "Stopping task group [%d] early. It has run for [%s]", - groupId, - ioConfig.getTaskDuration() + "Stopping taskGroup[%d] early after running for duration[%s].", + groupId, runDuration ); futureGroupIds.add(groupId); futures.add(checkpointTaskGroup(group, true)); - } else { - DateTime earliestTaskStart = computeEarliestTaskStartTime(group); - - if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || group.getHandoffEarly()) { - // if this task has run longer than the configured duration - // as long as the pending task groups are less than the configured stop task count. - // If shutdownEarly has been set, ignore stopTaskCount since this is a manual operator action. - if (pendingCompletionTaskGroups.values() - .stream() - .mapToInt(CopyOnWriteArrayList::size) - .sum() + stoppedTasks.get() - < ioConfig.getMaxAllowedStops() || group.getHandoffEarly()) { - log.info( - "Task group [%d] has run for [%s]. Stopping.", - groupId, - ioConfig.getTaskDuration() - ); - futureGroupIds.add(groupId); - futures.add(checkpointTaskGroup(group, true)); - stoppedTasks.getAndIncrement(); - } + if (group.getHandoffEarly()) { + numStoppedTasks.getAndIncrement(); + } + } else if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) { + // Stop this task group if it has run longer than the configured duration + // and the pending task groups are less than the configured stop task count. + int numPendingCompletionTaskGroups = pendingCompletionTaskGroups.values().stream() + .mapToInt(List::size).sum(); + if (numPendingCompletionTaskGroups + numStoppedTasks.get() < ioConfig.getMaxAllowedStops()) { + log.info( + "Stopping taskGroup[%d] as it has already run for duration[%s], configured task duration[%s].", + groupId, runDuration, ioConfig.getTaskDuration() + ); + futureGroupIds.add(groupId); + futures.add(checkpointTaskGroup(group, true)); + numStoppedTasks.getAndIncrement(); } } }); @@ -3384,7 +3374,7 @@ public abstract class SeekableStreamSupervisor> iTasks = taskGroup.tasks.entrySet().iterator(); while (iTasks.hasNext()) { @@ -3589,7 +3579,7 @@ public abstract class SeekableStreamSupervisor latestSequencesFromStream = getLatestSequencesFromStream(); - long nowTime = Instant.now().toEpochMilli(); - boolean idle; - long idleTime; - + final long nowTime = DateTimes.nowUtc().getMillis(); + final boolean idle; + final long idleTime; if (lastActiveTimeMillis > 0 && previousSequencesFromStream.equals(latestSequencesFromStream) && computeTotalLag() == 0) { @@ -3684,7 +3673,7 @@ public abstract class SeekableStreamSupervisor minimumMessageTime; if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) { minimumMessageTime = Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get()); @@ -3771,13 +3760,13 @@ public abstract class SeekableStreamSupervisor x == null || isEndOfShard(x))) { - log.debug("Nothing to read in any partition for taskGroup [%d], skipping task creation", groupId); + log.debug("Nothing to read in any partition for taskGroup[%d], skipping task creation.", groupId); continue; } if (ioConfig.getReplicas() > taskGroup.tasks.size()) { log.info( - "Number of tasks [%d] does not match configured numReplicas [%d] in task group [%d], creating more tasks", + "Number of tasks[%d] does not match configured numReplicas[%d] in taskGroup[%d], creating more tasks.", taskGroup.tasks.size(), ioConfig.getReplicas(), groupId ); createTasksForGroup(groupId, ioConfig.getReplicas() - taskGroup.tasks.size());