diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java index 1d0059335ed..7601566f9df 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java @@ -27,6 +27,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import javax.annotation.Nullable; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -69,6 +70,7 @@ public class LocalTaskActionClient implements TaskActionClient return result; } catch (Throwable t) { + log.error("Failed to perform action [%s]", Arrays.toString(t.getStackTrace())); throw new RuntimeException(t); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index e8dd472cf31..1e227da7eed 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.task.IndexTaskUtils; @@ -32,7 +33,9 @@ import org.apache.druid.indexing.common.task.TaskLockHelper; import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; @@ -71,6 +74,8 @@ public class SegmentTransactionalInsertAction implements TaskAction segmentsToBeOverwritten, @@ -212,7 +217,22 @@ public class SegmentTransactionalInsertAction implements TaskAction supervisor = supervisors.get(id); + if (supervisor == null || supervisor.lhs == null) { + log.info("Could not find supervisor [%s]", id); + return true; + } + log.info("Found supervisor [%s]", id); + final StreamSupervisor streamSupervisor = requireStreamSupervisor(id, "publishSegments"); + return streamSupervisor.canPublishSegments(taskGroupId, taskId); + } + public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) { Preconditions.checkState(started, "SupervisorManager not started"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index ab437eb7a60..b2eb2554836 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -975,7 +975,12 @@ public abstract class SeekableStreamIndexTaskRunner sequenceMetadata) { - log.debug("Publishing segments for sequence [%s].", sequenceMetadata); + log.info("Publishing segments for sequence [%s].", sequenceMetadata); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } final ListenableFuture publishFuture = Futures.transform( driver.publish( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 8b4845b08d0..f37303872df 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -36,6 +36,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import io.vavr.collection.Seq; import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import org.apache.commons.codec.digest.DigestUtils; @@ -146,7 +147,7 @@ import java.util.stream.Stream; * @param the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type * @param the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers */ -public abstract class SeekableStreamSupervisor +public abstract class SeekableStreamSupervisor, RecordType extends ByteEntity> implements StreamSupervisor { public static final String CHECKPOINTS_CTX_KEY = "checkpoints"; @@ -1995,6 +1996,36 @@ public abstract class SeekableStreamSupervisor pendingCompletionTasksForGroup = pendingCompletionTaskGroups.get(taskGroupId); + TaskGroup taskGroupToCheck = pendingCompletionTasksForGroup.stream().filter(taskGroup -> taskGroup.taskIds().contains(taskId)).findFirst().orElse(null); + if (taskGroupToCheck == null) { + // This function is called by the SegmentTransactionAppendAction. + // This is only triggered after a task has already started publishing so this shouldn't really happen. + // It's okay to just let the task try publishing in this case. + log.info("Did not find a task group to check for publishing"); + return true; + } + log.info("Found a task group to check for publishing [%s]", taskGroupToCheck); + for (TaskGroup taskGroup : pendingCompletionTasksForGroup) { + if (!taskGroup.startingSequences.equals(taskGroupToCheck.startingSequences)) { + log.info("Found a task group with different starting sequences [%s]", taskGroup.startingSequences); + for (PartitionIdType sequence : taskGroup.startingSequences.keySet()) { + SequenceOffsetType publishingGroupOffset = taskGroupToCheck.startingSequences.getOrDefault(sequence, null); + SequenceOffsetType taskGroupOffset = taskGroup.startingSequences.getOrDefault(sequence, null); + if (publishingGroupOffset != null && taskGroupOffset != null) { + if (publishingGroupOffset.compareTo(taskGroupOffset) > 0) { + return false; + } + } + } + } + } + return true; + } + private void discoverTasks() throws ExecutionException, InterruptedException { int taskCount = 0; diff --git a/processing/src/main/java/org/apache/druid/error/DruidException.java b/processing/src/main/java/org/apache/druid/error/DruidException.java index 66190d13a91..7c36869fd67 100644 --- a/processing/src/main/java/org/apache/druid/error/DruidException.java +++ b/processing/src/main/java/org/apache/druid/error/DruidException.java @@ -398,7 +398,12 @@ public class DruidException extends RuntimeException * A catch-all for any time when we cannot come up with a meaningful categorization. This is hopefully only * used when converting generic exceptions from frameworks and libraries that we do not control into DruidExceptions */ - UNCATEGORIZED(500); + UNCATEGORIZED(500), + + /** + * Indicates the druid service is not available. This error code is retriable. + */ + SERVICE_UNAVAILABLE(503); private final int expectedStatus; diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java index cbae3c4eaa4..0d1052f2416 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisor.java @@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord.supervisor; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.java.util.common.Pair; import java.util.List; @@ -69,4 +70,9 @@ public interface StreamSupervisor extends Supervisor { throw new UnsupportedOperationException("Supervisor does not have the feature to handoff task groups early implemented"); } + + default boolean canPublishSegments(Integer taskGroupId, String taskId) + { + return true; + } }