Working queuing of publishing

George Wu 2024-11-22 13:03:42 -05:00
parent e4cdbca23c
commit 52d807c488
7 changed files with 86 additions and 4 deletions

LocalTaskActionClient.java

@@ -27,6 +27,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -69,6 +70,7 @@ public class LocalTaskActionClient implements TaskActionClient
return result;
}
catch (Throwable t) {
  log.error(t, "Failed to perform action [%s]", taskAction);
throw new RuntimeException(t);
}
}

SegmentTransactionalInsertAction.java

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
@@ -32,7 +33,9 @@ import org.apache.druid.indexing.common.task.TaskLockHelper;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
@@ -71,6 +74,8 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPublishResult>
private final String dataSource;
@Nullable
private final SegmentSchemaMapping segmentSchemaMapping;
private static final EmittingLogger log = new EmittingLogger(SegmentTransactionalInsertAction.class);
public static SegmentTransactionalInsertAction overwriteAction(
@Nullable Set<DataSegment> segmentsToBeOverwritten,
@@ -212,7 +217,22 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPublishResult>
checkWithSegmentLock();
}
}
final String dataSourceToInsert = segments.stream().findFirst().get().getDataSource();
log.info("dataSource [%s]", dataSourceToInsert);
if (task instanceof SeekableStreamIndexTask) {
  SeekableStreamIndexTask seekableStreamIndexTask = (SeekableStreamIndexTask) task;
  log.info("Running transactional insert for streaming task [%s].", seekableStreamIndexTask.getId());
  // Ask the supervisor whether this task may publish now; if an earlier task group still has to
  // publish, fail with a retriable error instead of committing segments out of order.
  if (!toolbox.getSupervisorManager().canPublishSegments(
      dataSourceToInsert,
      seekableStreamIndexTask.getIOConfig().getTaskGroupId(),
      task.getId()
  )) {
    log.warn("Streaming task [%s] is not currently publishable.", task.getId());
    throw DruidException
        .forPersona(DruidException.Persona.DEVELOPER)
        .ofCategory(DruidException.Category.SERVICE_UNAVAILABLE)
        .build(
            "Cannot append segments to [%s] right now. "
            + "There might be another task waiting to publish its segments. Check the overlord logs for details.",
            dataSourceToInsert
        );
  }
}
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,

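Because the rejection above is surfaced as a retriable SERVICE_UNAVAILABLE error (see the DruidException change later in this commit), a caller that is told to wait can simply retry the publish. A minimal standalone sketch of that caller-side loop, with a hypothetical tryPublish() standing in for submitting SegmentTransactionalInsertAction (not Druid API):

import java.util.concurrent.TimeUnit;

public class PublishRetrySketch
{
  public static void main(String[] args) throws InterruptedException
  {
    int retries = 0;
    // Keep retrying while the supervisor's gate says another task group must publish first.
    while (!tryPublish()) {
      retries++;
      TimeUnit.SECONDS.sleep(1);
    }
    System.out.println("Published after " + retries + " retries.");
  }

  // Hypothetical stand-in: a real caller would submit the task action and treat a
  // SERVICE_UNAVAILABLE DruidException as "not my turn yet" rather than a hard failure.
  private static boolean tryPublish()
  {
    return Math.random() < 0.3;
  }
}
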
SupervisorManager.java

@@ -148,6 +148,19 @@ public class SupervisorManager
return true;
}
public boolean canPublishSegments(String id, Integer taskGroupId, String taskId)
{
  log.info("Supervisor ids [%s]", supervisors.keySet());
  Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
  if (supervisor == null || supervisor.lhs == null) {
    // No supervisor is running for this datasource, so there is nothing to coordinate with.
    log.info("Could not find supervisor [%s]", id);
    return true;
  }
  log.info("Found supervisor [%s]", id);
  final StreamSupervisor streamSupervisor = requireStreamSupervisor(id, "publishSegments");
  return streamSupervisor.canPublishSegments(taskGroupId, taskId);
}
public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
{
Preconditions.checkState(started, "SupervisorManager not started");

SeekableStreamIndexTaskRunner.java

@@ -975,7 +975,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
private void publishAndRegisterHandoff(SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata)
{
log.debug("Publishing segments for sequence [%s].", sequenceMetadata);
log.info("Publishing segments for sequence [%s].", sequenceMetadata);
try {
  // Temporary 5-second pause before publishing; it widens the pending-publish window, which
  // appears intended to make the new publish queueing easier to exercise.
  Thread.sleep(5000);
}
catch (InterruptedException e) {
  Thread.currentThread().interrupt();
  throw new RuntimeException(e);
}
final ListenableFuture<SegmentsAndCommitMetadata> publishFuture = Futures.transform(
driver.publish(

SeekableStreamSupervisor.java

@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.commons.codec.digest.DigestUtils;
@@ -146,7 +147,7 @@ import java.util.stream.Stream;
* @param <PartitionIdType> the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type
* @param <SequenceOffsetType> the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers
*/
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType extends Comparable<SequenceOffsetType>, RecordType extends ByteEntity>
implements StreamSupervisor
{
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
@@ -1995,6 +1996,36 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
addNotice(new HandoffTaskGroupsNotice(taskGroupIds));
}
@Override
public boolean canPublishSegments(Integer taskGroupId, String taskId)
{
  log.info("Checking if publishing is allowed for taskGroupId [%s], taskId [%s]", taskGroupId, taskId);
  CopyOnWriteArrayList<TaskGroup> pendingCompletionTasksForGroup = pendingCompletionTaskGroups.get(taskGroupId);
  if (pendingCompletionTasksForGroup == null || pendingCompletionTasksForGroup.isEmpty()) {
    // Nothing is pending completion for this group, so there is nothing to queue behind.
    return true;
  }
  TaskGroup taskGroupToCheck = pendingCompletionTasksForGroup.stream()
      .filter(taskGroup -> taskGroup.taskIds().contains(taskId))
      .findFirst()
      .orElse(null);
  if (taskGroupToCheck == null) {
    // This method is called from SegmentTransactionalInsertAction, which only runs after a task
    // has already started publishing, so this shouldn't really happen. It's okay to just let the
    // task try publishing in this case.
    log.info("Did not find a task group to check for publishing");
    return true;
  }
  log.info("Found a task group to check for publishing [%s]", taskGroupToCheck);
  for (TaskGroup taskGroup : pendingCompletionTasksForGroup) {
    if (!taskGroup.startingSequences.equals(taskGroupToCheck.startingSequences)) {
      log.info("Found a task group with different starting sequences [%s]", taskGroup.startingSequences);
      for (PartitionIdType partition : taskGroup.startingSequences.keySet()) {
        SequenceOffsetType publishingGroupOffset = taskGroupToCheck.startingSequences.getOrDefault(partition, null);
        SequenceOffsetType taskGroupOffset = taskGroup.startingSequences.getOrDefault(partition, null);
        // If another pending group started at an earlier offset on any partition, it must
        // publish first, so this group has to wait.
        if (publishingGroupOffset != null && taskGroupOffset != null
            && publishingGroupOffset.compareTo(taskGroupOffset) > 0) {
          return false;
        }
      }
    }
  }
  return true;
}
private void discoverTasks() throws ExecutionException, InterruptedException
{
int taskCount = 0;

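In isolation, the ordering rule implemented by canPublishSegments above is: a pending task group may publish only if no other pending group for the same task group id started at an earlier offset on a partition they share. A self-contained sketch of just that comparison, using plain maps and made-up offsets instead of Druid's TaskGroup:

import java.util.Map;

public class PublishOrderSketch
{
  // Returns false when "other" started earlier than "candidate" on some shared partition,
  // meaning "other" should publish first; mirrors the loop in canPublishSegments.
  static boolean canPublish(Map<Integer, Long> candidate, Map<Integer, Long> other)
  {
    if (candidate.equals(other)) {
      return true;
    }
    for (Map.Entry<Integer, Long> entry : other.entrySet()) {
      Long candidateOffset = candidate.get(entry.getKey());
      if (candidateOffset != null && candidateOffset.compareTo(entry.getValue()) > 0) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> later = Map.of(0, 200L, 1, 250L);   // starting offsets per partition
    Map<Integer, Long> earlier = Map.of(0, 100L, 1, 150L); // started first, so it publishes first

    System.out.println(canPublish(later, earlier));  // false: must wait for the earlier group
    System.out.println(canPublish(earlier, later));  // true: no group started before it
  }
}
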
DruidException.java

@@ -398,7 +398,12 @@ public class DruidException extends RuntimeException
* A catch-all for any time when we cannot come up with a meaningful categorization. This is hopefully only
* used when converting generic exceptions from frameworks and libraries that we do not control into DruidExceptions
*/
UNCATEGORIZED(500);
UNCATEGORIZED(500),
/**
 * Indicates that the Druid service is not available. Errors in this category are retriable.
 */
SERVICE_UNAVAILABLE(503);
private final int expectedStatus;

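For reference, the new category slots into the existing builder chain like any other; this mirrors the build() call from the SegmentTransactionalInsertAction hunk earlier in this commit (the datasource name is a placeholder):

// Assumes the org.apache.druid.error.DruidException API shown above.
DruidException unavailable = DruidException
    .forPersona(DruidException.Persona.DEVELOPER)
    .ofCategory(DruidException.Category.SERVICE_UNAVAILABLE)
    .build("Cannot append segments to [%s] right now.", "my_datasource");
// The category's expected status (503) tells callers the failure is transient and can be retried.
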
StreamSupervisor.java

@@ -22,6 +22,7 @@ package org.apache.druid.indexing.overlord.supervisor;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import java.util.List;
@@ -69,4 +70,9 @@ public interface StreamSupervisor extends Supervisor
{
throw new UnsupportedOperationException("Supervisor does not have the feature to handoff task groups early implemented");
}
/**
 * Returns whether the given task in the given task group may publish its segments right now.
 * The default is to always allow publishing.
 */
default boolean canPublishSegments(Integer taskGroupId, String taskId)
{
  return true;
}
}