diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 98494d7388a..b2924ea2fdf 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
@@ -80,6 +79,11 @@ import java.util.stream.Collectors;
  */
 public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
 {
+  public static final TypeReference<TreeMap<Integer, Map<Integer, Long>>> CHECKPOINTS_TYPE_REF =
+      new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
+      {
+      };
+
   private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
   private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
   private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
@@ -229,20 +233,9 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
       RowIngestionMetersFactory rowIngestionMetersFactory
   ) throws JsonProcessingException
   {
-    final String checkpoints = sortingMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
-    {
-    }).writeValueAsString(sequenceOffsets);
-    final Map<String, Object> context = spec.getContext() == null
-                                        ? ImmutableMap.of(
-                                            "checkpoints",
-                                            checkpoints,
-                                            IS_INCREMENTAL_HANDOFF_SUPPORTED,
-                                            true
-                                        ) : ImmutableMap.<String, Object>builder()
-                                                        .put("checkpoints", checkpoints)
-                                                        .put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
-                                                        .putAll(spec.getContext())
-                                                        .build();
+    final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
+    final Map<String, Object> context = createBaseTaskContexts();
+    context.put(CHECKPOINTS_CTX_KEY, checkpoints);
 
     List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
@@ -358,7 +351,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
   }
 
   @Override
-  protected boolean useExclusiveStartSequenceNumberForStartSequence()
+  protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
   {
     return false;
   }
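Note (outside the diff): hoisting the anonymous TypeReference into the shared CHECKPOINTS_TYPE_REF constant gives the supervisor and the tasks a single type token for the checkpoints payload, and writerFor supersedes Jackson's deprecated writerWithType. A minimal round-trip sketch of the pattern, assuming nothing beyond Jackson itself; the class name and sample offsets are made up:

// Sketch only; not part of the patch.
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointsRoundTrip
{
  // One shared type token: sequence id -> (Kafka partition -> offset).
  static final TypeReference<TreeMap<Integer, Map<Integer, Long>>> CHECKPOINTS_TYPE_REF =
      new TypeReference<TreeMap<Integer, Map<Integer, Long>>>() {};

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // Supervisor side: serialize the checkpoints into the task context.
    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
    checkpoints.put(0, Collections.singletonMap(0, 0L));
    checkpoints.put(1, Collections.singletonMap(0, 2L));
    final String json = mapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(checkpoints);

    // Task side: the same token deserializes it, keeping both ends in sync.
    final TreeMap<Integer, Map<Integer, Long>> restored =
        mapper.readerFor(CHECKPOINTS_TYPE_REF).readValue(json);
    System.out.println(restored); // {0={0=0}, 1={0=2}}
  }
}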
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index ebef3fc4596..24573da46a2 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.kafka;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -67,6 +68,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.test.TestBroker;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -2143,9 +2145,10 @@ public class KafkaIndexTaskTest
     // and this task should start reading from offset 2 for partition 0
     sequences.put(1, ImmutableMap.of(0, 2L));
     final Map<String, Object> context = new HashMap<>();
-    context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
-    {
-    }).writeValueAsString(sequences));
+    context.put(
+        SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY,
+        objectMapper.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
+    );
 
     final KafkaIndexTask task = createTask(
         null,
@@ -2454,7 +2457,7 @@ public class KafkaIndexTaskTest
   private KafkaIndexTask createTask(
       final String taskId,
       final KafkaIndexTaskIOConfig ioConfig
-  )
+  ) throws JsonProcessingException
   {
     return createTask(taskId, DATA_SCHEMA, ioConfig);
   }
@@ -2463,7 +2466,7 @@ public class KafkaIndexTaskTest
       final String taskId,
       final KafkaIndexTaskIOConfig ioConfig,
       final Map<String, Object> context
-  )
+  ) throws JsonProcessingException
   {
     return createTask(taskId, DATA_SCHEMA, ioConfig, context);
   }
@@ -2472,7 +2475,18 @@ public class KafkaIndexTaskTest
       final String taskId,
       final DataSchema dataSchema,
       final KafkaIndexTaskIOConfig ioConfig
-  )
+  ) throws JsonProcessingException
+  {
+    final Map<String, Object> context = new HashMap<>();
+    return createTask(taskId, dataSchema, ioConfig, context);
+  }
+
+  private KafkaIndexTask createTask(
+      final String taskId,
+      final DataSchema dataSchema,
+      final KafkaIndexTaskIOConfig ioConfig,
+      final Map<String, Object> context
+  ) throws JsonProcessingException
   {
     final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig(
         1000,
@@ -2493,57 +2507,17 @@ public class KafkaIndexTaskTest
         maxParseExceptions,
         maxSavedParseExceptions
     );
-    final Map<String, Object> context = isIncrementalHandoffSupported
-                                        ? ImmutableMap.of(
-                                            SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED,
-                                            true
-                                        )
-                                        : null;
-    final KafkaIndexTask task = new KafkaIndexTask(
-        taskId,
-        null,
-        cloneDataSchema(dataSchema),
-        tuningConfig,
-        ioConfig,
-        context,
-        null,
-        null,
-        rowIngestionMetersFactory,
-        objectMapper
-    );
-    task.setPollRetryMs(POLL_RETRY_MS);
-    return task;
-  }
-
-
-  private KafkaIndexTask createTask(
-      final String taskId,
-      final DataSchema dataSchema,
-      final KafkaIndexTaskIOConfig ioConfig,
-      final Map<String, Object> context
-  )
-  {
-    final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig(
-        1000,
-        null,
-        maxRowsPerSegment,
-        null,
-        new Period("P1Y"),
-        null,
-        null,
-        null,
-        true,
-        reportParseExceptions,
-        handoffConditionTimeout,
-        resetOffsetAutomatically,
-        null,
-        null,
-        logParseExceptions,
-        maxParseExceptions,
-        maxSavedParseExceptions
-    );
     if (isIncrementalHandoffSupported) {
       context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
+
+      if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
+        final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+        checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
+        final String checkpointsJson = objectMapper
+            .writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
+            .writeValueAsString(checkpoints);
+        context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
+      }
     }
 
     final KafkaIndexTask task = new KafkaIndexTask(
@@ -2629,8 +2603,8 @@ public class KafkaIndexTaskTest
       objectMapper.registerModule(module);
     }
     final TaskConfig taskConfig = new TaskConfig(
-        new File(directory, "taskBaseDir").getPath(),
-        null,
+        new File(directory, "baseDir").getPath(),
+        new File(directory, "baseTaskDir").getPath(),
         null,
         50000,
         null,
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 332e69379af..870d7ecb7d3 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -75,6 +75,11 @@ import java.util.concurrent.ScheduledExecutorService;
  */
 public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
 {
+  public static final TypeReference<TreeMap<Integer, Map<String, String>>> CHECKPOINTS_TYPE_REF =
+      new TypeReference<TreeMap<Integer, Map<String, String>>>()
+      {
+      };
+
   private static final String NOT_SET = "-1";
   private final KinesisSupervisorSpec spec;
   private final AWSCredentialsConfig awsCredentialsConfig;
@@ -151,20 +156,10 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
       RowIngestionMetersFactory rowIngestionMetersFactory
   ) throws JsonProcessingException
   {
-    final String checkpoints = sortingMapper.writerFor(new TypeReference<TreeMap<Integer, Map<String, String>>>()
-    {
-    }).writeValueAsString(sequenceOffsets);
-    final Map<String, Object> context = spec.getContext() == null
-                                        ? ImmutableMap.of(
-                                            "checkpoints",
-                                            checkpoints,
-                                            IS_INCREMENTAL_HANDOFF_SUPPORTED,
-                                            true
-                                        ) : ImmutableMap.<String, Object>builder()
-                                                        .put("checkpoints", checkpoints)
-                                                        .put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
-                                                        .putAll(spec.getContext())
-                                                        .build();
+    final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
+    final Map<String, Object> context = createBaseTaskContexts();
+    context.put(CHECKPOINTS_CTX_KEY, checkpoints);
+
     List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
     for (int i = 0; i < replicas; i++) {
       String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
@@ -313,7 +308,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
   }
 
   @Override
-  protected boolean useExclusiveStartSequenceNumberForStartSequence()
+  protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
   {
     return true;
   }
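Note (outside the diff): the rename to useExclusiveStartSequenceNumberForNonFirstSequence captures why Kafka answers false while Kinesis answers true. A Kafka checkpoint records the next offset to read (an exclusive end), so a follow-on sequence can reuse it as an inclusive start; a Kinesis checkpoint records the last sequence number actually read (an inclusive end), so every sequence after the first must treat its start as exclusive. A toy illustration, all values made up:

// Sketch only; not part of the patch.
public class StartExclusivityDemo
{
  public static void main(String[] args)
  {
    // Kafka: an end of 5 means offsets 0..4 were read; the next sequence
    // starts at 5 inclusively, so nothing is skipped or read twice.
    long kafkaEnd = 5L;
    long kafkaNextStart = kafkaEnd;

    // Kinesis: an end of "5" means record "5" itself was read; the next
    // sequence must mark its start partition exclusive and resume after "5".
    String kinesisEnd = "5";
    boolean startIsExclusive = true;

    System.out.println("Kafka resumes at " + kafkaNextStart
                       + "; Kinesis resumes after " + kinesisEnd
                       + " (exclusive=" + startIsExclusive + ")");
  }
}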
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 435bb2abf8e..bbdd2ddc713 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.Module;
@@ -74,6 +75,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.MetadataTaskStorage;
@@ -83,6 +85,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
@@ -178,6 +181,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -509,7 +513,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc2));
   }
 
-
   @Test(timeout = 120_000L)
   public void testIncrementalHandOff() throws Exception
   {
@@ -736,7 +739,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
             DATA_SCHEMA.getDataSource(),
             0,
             new KinesisDataSourceMetadata(
-                new SeekableStreamStartSequenceNumbers<>(stream, currentOffsets, ImmutableSet.of())
+                new SeekableStreamStartSequenceNumbers<>(stream, currentOffsets, currentOffsets.keySet())
             ),
             new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(stream, nextOffsets))
         )
@@ -1949,7 +1952,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         new KinesisIndexTaskIOConfig(
             null,
             "sequence0",
-            new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of(shardId1)),
+            new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
             true,
             null,
@@ -2047,10 +2050,9 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         )
     );
 
-    final SeekableStreamStartSequenceNumbers<String, String> checkpoint1 = new SeekableStreamStartSequenceNumbers<>(
+    final SeekableStreamEndSequenceNumbers<String, String> checkpoint1 = new SeekableStreamEndSequenceNumbers<>(
         stream,
-        ImmutableMap.of(shardId1, "4"),
-        ImmutableSet.of()
+        ImmutableMap.of(shardId1, "4")
     );
 
     final ListenableFuture<TaskStatus> future1 = runTask(task1);
@@ -2095,7 +2097,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         new KinesisIndexTaskIOConfig(
             null,
             "sequence0",
-            new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of(shardId1)),
+            new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")),
             true,
             null,
@@ -2271,9 +2273,10 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     // and this task should start reading from offset 2 for partition 0 (not offset 1, because end is inclusive)
     sequences.put(1, ImmutableMap.of(shardId1, "1"));
     final Map<String, Object> context = new HashMap<>();
-    context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<String, String>>>()
-    {
-    }).writeValueAsString(sequences));
+    context.put(
+        SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY,
+        objectMapper.writerFor(KinesisSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
+    );
 
     final KinesisIndexTask task = createTask(
@@ -2473,6 +2476,75 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     );
   }
 
+  @Test
+  public void testSequencesFromContext() throws IOException
+  {
+    final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
+    // Here the sequence number is 1 meaning that one incremental handoff was done by the failed task
+    // and this task should start reading from offset 2 for partition 0 (not offset 1, because end is inclusive)
+    checkpoints.put(0, ImmutableMap.of(shardId0, "0", shardId1, "0"));
+    checkpoints.put(1, ImmutableMap.of(shardId0, "0", shardId1, "1"));
+    checkpoints.put(2, ImmutableMap.of(shardId0, "1", shardId1, "3"));
+    final Map<String, Object> context = new HashMap<>();
+    context.put("checkpoints", objectMapper.writerFor(KinesisSupervisor.CHECKPOINTS_TYPE_REF)
+                                           .writeValueAsString(checkpoints));
+
+    final KinesisIndexTask task = createTask(
+        "task1",
+        DATA_SCHEMA,
+        new KinesisIndexTaskIOConfig(
+            null,
+            "sequence0",
+            new SeekableStreamStartSequenceNumbers<>(
+                stream,
+                ImmutableMap.of(shardId0, "0", shardId1, "0"),
+                ImmutableSet.of(shardId0)
+            ),
+            new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1", shardId1, "5")),
+            true,
+            null,
+            null,
+            "awsEndpoint",
+            null,
+            null,
+            null,
+            null,
+            false
+        ),
+        context
+    );
+
+    task.getRunner().setToolbox(toolboxFactory.build(task));
+    task.getRunner().initializeSequences();
+    final CopyOnWriteArrayList<SequenceMetadata<String, String>> sequences = task.getRunner().getSequences();
+
+    Assert.assertEquals(3, sequences.size());
+
+    SequenceMetadata<String, String> sequenceMetadata = sequences.get(0);
+    Assert.assertEquals(checkpoints.get(0), sequenceMetadata.getStartOffsets());
+    Assert.assertEquals(checkpoints.get(1), sequenceMetadata.getEndOffsets());
+    Assert.assertEquals(
+        task.getIOConfig().getStartSequenceNumbers().getExclusivePartitions(),
+        sequenceMetadata.getExclusiveStartPartitions()
+    );
+    Assert.assertTrue(sequenceMetadata.isCheckpointed());
+
+    sequenceMetadata = sequences.get(1);
+    Assert.assertEquals(checkpoints.get(1), sequenceMetadata.getStartOffsets());
+    Assert.assertEquals(checkpoints.get(2), sequenceMetadata.getEndOffsets());
+    Assert.assertEquals(checkpoints.get(1).keySet(), sequenceMetadata.getExclusiveStartPartitions());
+    Assert.assertTrue(sequenceMetadata.isCheckpointed());
+
+    sequenceMetadata = sequences.get(2);
+    Assert.assertEquals(checkpoints.get(2), sequenceMetadata.getStartOffsets());
+    Assert.assertEquals(
+        task.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap(),
+        sequenceMetadata.getEndOffsets()
+    );
+    Assert.assertEquals(checkpoints.get(2).keySet(), sequenceMetadata.getExclusiveStartPartitions());
+    Assert.assertFalse(sequenceMetadata.isCheckpointed());
+  }
+
   private ListenableFuture<TaskStatus> runTask(final Task task)
   {
     try {
@@ -2522,7 +2594,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   private KinesisIndexTask createTask(
       final String taskId,
       final KinesisIndexTaskIOConfig ioConfig
-  )
+  ) throws JsonProcessingException
   {
     return createTask(taskId, DATA_SCHEMA, ioConfig, null);
   }
@@ -2531,7 +2603,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
       final String taskId,
       final DataSchema dataSchema,
       final KinesisIndexTaskIOConfig ioConfig
-  )
+  ) throws JsonProcessingException
   {
     return createTask(taskId, dataSchema, ioConfig, null);
   }
@@ -2541,7 +2613,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
       final DataSchema dataSchema,
       final KinesisIndexTaskIOConfig ioConfig,
       @Nullable final Map<String, Object> context
-  )
+  ) throws JsonProcessingException
   {
     final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
         maxRowsInMemory,
@@ -2578,13 +2650,22 @@ public class KinesisIndexTaskTest extends EasyMockSupport
       final KinesisIndexTaskIOConfig ioConfig,
       final KinesisIndexTaskTuningConfig tuningConfig,
       @Nullable final Map<String, Object> context
-  )
+  ) throws JsonProcessingException
   {
     if (context != null) {
       context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
+
+      if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
+        final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
+        checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
+        final String checkpointsJson = objectMapper
+            .writerFor(KinesisSupervisor.CHECKPOINTS_TYPE_REF)
+            .writeValueAsString(checkpoints);
+        context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
+      }
     }
-    final KinesisIndexTask task = new TestableKinesisIndexTask(
+
+    return new TestableKinesisIndexTask(
         taskId,
         null,
         cloneDataSchema(dataSchema),
@@ -2596,8 +2677,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         rowIngestionMetersFactory,
         null
     );
-
-    return task;
   }
 
   private static DataSchema cloneDataSchema(final DataSchema dataSchema)
@@ -2657,8 +2736,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
       objectMapper.registerModule(module);
     }
     final TaskConfig taskConfig = new TaskConfig(
-        new File(directory, "taskBaseDir").getPath(),
-        null,
+        new File(directory, "baseDir").getPath(),
+        new File(directory, "baseTaskDir").getPath(),
         null,
         50000,
         null,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index 0af850e0fcd..ca595a0505a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.CriticalAction;
@@ -159,8 +158,8 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPublishResult>
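Note (outside the diff): the runner changes below drop the old alternating "exclusive" flag in favor of computeExclusiveStartPartitionsForSequence, whose body is not shown in this diff. A plausible reading of its contract, sketched with concrete String shard ids and hypothetical names (Java 9 Map.of/Set.of used for brevity):

// Sketch only; not part of the patch.
import java.util.Map;
import java.util.Set;

public class ExclusiveStartPartitionsSketch
{
  // The first sequence keeps the ioConfig's exclusive set; any later sequence
  // starts from a checkpoint whose offsets were already read, so all of its
  // partitions become exclusive starts.
  static Set<String> computeExclusiveStartPartitionsForSequence(
      Map<String, String> sequenceStartOffsets,
      Map<String, String> configStartOffsets,
      Set<String> configExclusivePartitions
  )
  {
    return sequenceStartOffsets.equals(configStartOffsets)
           ? configExclusivePartitions
           : sequenceStartOffsets.keySet();
  }

  public static void main(String[] args)
  {
    final Map<String, String> configStart = Map.of("shard0", "0", "shard1", "0");
    final Set<String> configExclusive = Set.of("shard0");

    // First sequence: honors the ioConfig's exclusive set -> [shard0]
    System.out.println(computeExclusiveStartPartitionsForSequence(configStart, configStart, configExclusive));

    // Later sequence: every partition is an exclusive start -> [shard0, shard1]
    System.out.println(computeExclusiveStartPartitionsForSequence(
        Map.of("shard0", "0", "shard1", "1"), configStart, configExclusive
    ));
  }
}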
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
     final TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> checkpoints = getCheckPointsFromContext(
         toolbox,
-        task.getContextValue("checkpoints")
+        task.getContextValue(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)
     );
     if (checkpoints != null) {
-      boolean exclusive = false;
       Iterator<Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>>> sequenceOffsets = checkpoints.entrySet()
                                                                                                           .iterator();
       Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> previous = sequenceOffsets.next();
       while (sequenceOffsets.hasNext()) {
         Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> current = sequenceOffsets.next();
-        addSequence(new SequenceMetadata<>(
-            previous.getKey(),
-            StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
-            previous.getValue(),
-            current.getValue(),
-            true,
-            exclusive ? previous.getValue().keySet() : null
-        ));
+        final Set<PartitionIdType> exclusiveStartPartitions = computeExclusiveStartPartitionsForSequence(
+            previous.getValue()
+        );
+        addSequence(
+            new SequenceMetadata<>(
+                previous.getKey(),
+                StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
+                previous.getValue(),
+                current.getValue(),
+                true,
+                exclusiveStartPartitions
+            )
+        );
         previous = current;
-        exclusive = true;
       }
-      addSequence(new SequenceMetadata<>(
-          previous.getKey(),
-          StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
-          previous.getValue(),
-          endOffsets,
-          false,
-          exclusive ? previous.getValue().keySet() : null
-      ));
+      final Set<PartitionIdType> exclusiveStartPartitions = computeExclusiveStartPartitionsForSequence(
+          previous.getValue()
+      );
+      addSequence(
+          new SequenceMetadata<>(
+              previous.getKey(),
+              StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
+              previous.getValue(),
+              endOffsets,
+              false,
+              exclusiveStartPartitions
+          )
+      );
     } else {
-      addSequence(new SequenceMetadata<>(
-          0,
-          StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
-          ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
-          endOffsets,
-          false,
-          null
-      ));
+      addSequence(
+          new SequenceMetadata<>(
+              0,
+              StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
+              ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+              endOffsets,
+              false,
+              ioConfig.getStartSequenceNumbers().getExclusivePartitions()
+          )
+      );
     }
 
     log.info("Starting with sequences: %s", sequences);
+  }
+
+  private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
+  {
+    log.info("SeekableStream indexing task starting up!");
+    startTime = DateTimes.nowUtc();
+    status = Status.STARTING;
+
+    setToolbox(toolbox);
+    initializeSequences();
 
     if (chatHandlerProvider.isPresent()) {
       log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
@@ -392,10 +425,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
         if (System.currentTimeMillis() > nextCheckpointTime) {
-          sequenceToCheckpoint = sequences.get(sequences.size() - 1);
+          sequenceToCheckpoint = getLastSequenceMetadata();
         }
 
         if (sequenceToCheckpoint != null && stillReading) {
           Preconditions.checkArgument(
-              sequences.get(sequences.size() - 1)
+              getLastSequenceMetadata()
                   .getSequenceName()
                   .equals(sequenceToCheckpoint.getSequenceName()),
               "Cannot checkpoint a sequence [%s] which is not the latest one, sequences %s",
@@ -649,7 +682,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
                 new SeekableStreamStartSequenceNumbers<>(
                     stream,
                     sequenceToCheckpoint.getStartOffsets(),
-                    ioConfig.getStartSequenceNumbers().getExclusivePartitions()
+                    sequenceToCheckpoint.getExclusiveStartPartitions()
                 )
             ),
             createDataSourceMetadata(
@@ -1060,7 +1093,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
+      final SequenceMetadata<PartitionIdType, SequenceOffsetType> lastMetadata = getLastSequenceMetadata();
+      if (!lastMetadata.endOffsets.keySet().equals(sequenceMetadata.getExclusiveStartPartitions())) {
+        throw new ISE(
+            "Exclusive start partitions[%s] for new sequence don't match to the prior offset[%s]",
+            sequenceMetadata.getExclusiveStartPartitions(),
+            lastMetadata
+        );
+      }
+    }
+
+    // Actually do the add.
     sequences.add(sequenceMetadata);
   }
+
+  private SequenceMetadata<PartitionIdType, SequenceOffsetType> getLastSequenceMetadata()
+  {
+    Preconditions.checkState(!sequences.isEmpty(), "Empty sequences");
+    return sequences.get(sequences.size() - 1);
+  }
 
   /**
    * Returns true if the given record has already been read, based on lastReadOffsets.
   */
@@ -1453,7 +1503,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
     Preconditions.checkState(sequences.size() > 0, "WTH?! No Sequences found to set end sequences");
-    final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = sequences.get(sequences.size() - 1);
+    final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = getLastSequenceMetadata();
     final Set<PartitionIdType> exclusiveStartPartitions;
 
     if (isEndOffsetExclusive()) {
@@ -1545,6 +1595,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>
+  public CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> getSequences()
+  {
+    return sequences;
+  }
+
   @GET
   @Path("/checkpoints")
   @Produces(MediaType.APPLICATION_JSON)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index 9cbafd0c392..20eec3591ba 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -216,11 +216,12 @@ public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
     lock.lock();
     try {
       return "SequenceMetadata{" +
-             "sequenceName='" + sequenceName + '\'' +
-             ", sequenceId=" + sequenceId +
-             ", startOffsets=" + startOffsets +
-             ", endOffsets=" + endOffsets +
+             "sequenceId=" + sequenceId +
+             ", sequenceName='" + sequenceName + '\'' +
              ", assignments=" + assignments +
+             ", startOffsets=" + startOffsets +
+             ", exclusiveStartPartitions=" + exclusiveStartPartitions +
+             ", endOffsets=" + endOffsets +
              ", sentinel=" + sentinel +
              ", checkpointed=" + checkpointed +
              '}';
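Note (outside the diff): the new addSequence() guard enforces a chain invariant over checkpointed sequences: each appended sequence has to line up with the end of the one before it, otherwise an ISE is thrown. The same invariant reduced to plain data, with made-up shard ids (Java 9 Map.of for brevity):

// Sketch only; not part of the patch.
import java.util.Map;
import java.util.TreeMap;

public class SequenceChainInvariant
{
  public static void main(String[] args)
  {
    // sequence id -> (partition -> start offset), as carried in the
    // "checkpoints" task context.
    final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
    checkpoints.put(0, Map.of("shard0", "0", "shard1", "0"));
    checkpoints.put(1, Map.of("shard0", "0", "shard1", "1"));
    checkpoints.put(2, Map.of("shard0", "1", "shard1", "3"));

    // Mirror of the runner's sanity check: a new sequence must start on
    // exactly the partitions the prior sequence ended on.
    Map<String, String> prior = null;
    for (Map<String, String> current : checkpoints.values()) {
      if (prior != null && !prior.keySet().equals(current.keySet())) {
        throw new IllegalStateException("Exclusive start partitions don't match the prior offsets");
      }
      prior = current;
    }
    System.out.println("sequence chain OK: " + checkpoints);
  }
}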
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index ab7f3bcbd7c..265b3a32936 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -121,6 +121,7 @@ import java.util.stream.Stream;
 public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType> implements Supervisor
 {
   public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
+  public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
 
   private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
   private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
@@ -163,7 +164,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
         ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
         Optional<DateTime> minimumMessageTime,
         Optional<DateTime> maximumMessageTime,
-        Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
+        @Nullable Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
     )
     {
       this.groupId = groupId;
@@ -173,7 +174,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
       this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions != null
                                                     ? exclusiveStartSequenceNumberPartitions
-                                                    : new HashSet<>();
+                                                    : Collections.emptySet();
       this.baseSequenceName = generateSequenceName(startingSequences, minimumMessageTime, maximumMessageTime);
     }
@@ -2347,7 +2348,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
-    Set<PartitionIdType> exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups.get(groupId).exclusiveStartSequenceNumberPartitions;
+    Set<PartitionIdType> exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups
+        .get(groupId)
+        .exclusiveStartSequenceNumberPartitions;
 
     DateTime minimumMessageTime = activelyReadingTaskGroups.get(groupId).minimumMessageTime.orNull();
     DateTime maximumMessageTime = activelyReadingTaskGroups.get(groupId).maximumMessageTime.orNull();
@@ -2660,6 +2663,16 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
+  protected Map<String, Object> createBaseTaskContexts()
+  {
+    final Map<String, Object> contexts = new HashMap<>();
+    contexts.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
+    if (spec.getContext() != null) {
+      contexts.putAll(spec.getContext());
+    }
+    return contexts;
+  }
+
   /**
    * creates a specific task IOConfig instance for Kafka/Kinesis
    *
@@ -2827,5 +2840,5 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+  @Nullable
   protected final Map<String, Object> context;
   protected final ServiceEmitter emitter;
   protected final DruidMonitorSchedulerConfig monitorSchedulerConfig;
@@ -61,7 +63,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
       @JsonProperty("dataSchema") DataSchema dataSchema,
       @JsonProperty("tuningConfig") SeekableStreamSupervisorTuningConfig tuningConfig,
       @JsonProperty("ioConfig") SeekableStreamSupervisorIOConfig ioConfig,
-      @JsonProperty("context") Map<String, Object> context,
+      @JsonProperty("context") @Nullable Map<String, Object> context,
       @JsonProperty("suspended") Boolean suspended,
       @JacksonInject TaskStorage taskStorage,
       @JacksonInject TaskMaster taskMaster,
@@ -107,6 +109,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
     return ioConfig;
   }
 
+  @Nullable
   @JsonProperty
   public Map<String, Object> getContext()
   {
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 51e33db1261..c234148f0b7 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -891,8 +891,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
 
     if (!startMetadataMatchesExisting) {
       // Not in the desired start state.
-      log.info("Not updating metadata, existing state is not the expected start state.");
-      log.debug("Existing database state [%s], request's start metadata [%s]", oldCommitMetadataFromDb, startMetadata);
+      log.error(
+          "Not updating metadata, existing state[%s] in metadata store doesn't match to the new start state[%s].",
+          oldCommitMetadataBytesFromDb,
+          startMetadata
+      );
       return DataSourceMetadataUpdateResult.FAILURE;
     }
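Note (outside the diff): the switch from ImmutableMap.Builder to the HashMap-based createBaseTaskContexts() is behavioral, not just cosmetic. Guava's ImmutableMap.Builder#build rejects duplicate keys, so a supervisor spec context that already contained a "checkpoints" or IS_INCREMENTAL_HANDOFF_SUPPORTED entry used to fail the build; the HashMap merge lets spec.getContext() override the defaults, and the supervisor then re-puts the checkpoints key so it always wins. A self-contained demo of the difference (class name hypothetical):

// Sketch only; not part of the patch.
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;

public class ContextMergeDemo
{
  public static void main(String[] args)
  {
    final Map<String, Object> specContext = ImmutableMap.of("checkpoints", "user-value");

    // New path: HashMap merge; the spec can override defaults, and the
    // supervisor re-puts the checkpoints entry last so it always wins.
    final Map<String, Object> merged = new HashMap<>();
    merged.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true);
    merged.putAll(specContext);
    merged.put("checkpoints", "supervisor-value");
    System.out.println(merged);

    // Old path: ImmutableMap.Builder throws on the duplicate "checkpoints" key.
    try {
      ImmutableMap.<String, Object>builder()
                  .put("checkpoints", "supervisor-value")
                  .putAll(specContext)
                  .build();
    }
    catch (IllegalArgumentException e) {
      System.out.println("old builder path rejected duplicate key: " + e.getMessage());
    }
  }
}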