Fix exclusive start partitions for sequenceMetadata (#7339)

* Fix exclusive start partitions for sequenceMetadata

* Add empty check
Jihoon Son 2019-03-26 14:39:07 -07:00 committed by GitHub
parent 105b4fa237
commit 5294277cb4
10 changed files with 280 additions and 164 deletions

View File: KafkaSupervisor.java

@ -24,7 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
@ -80,6 +79,11 @@ import java.util.stream.Collectors;
*/
public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
{
public static final TypeReference<TreeMap<Integer, Map<Integer, Long>>> CHECKPOINTS_TYPE_REF =
new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
{
};
private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
@ -229,20 +233,9 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
RowIngestionMetersFactory rowIngestionMetersFactory
) throws JsonProcessingException
{
final String checkpoints = sortingMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
{
}).writeValueAsString(sequenceOffsets);
final Map<String, Object> context = spec.getContext() == null
? ImmutableMap.of(
"checkpoints",
checkpoints,
IS_INCREMENTAL_HANDOFF_SUPPORTED,
true
) : ImmutableMap.<String, Object>builder()
.put("checkpoints", checkpoints)
.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
.putAll(spec.getContext())
.build();
final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
final Map<String, Object> context = createBaseTaskContexts();
context.put(CHECKPOINTS_CTX_KEY, checkpoints);
List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
@ -358,7 +351,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
}
@Override
protected boolean useExclusiveStartSequenceNumberForStartSequence()
protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
{
return false;
}

View File: KafkaIndexTaskTest.java

@ -19,6 +19,7 @@
package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -67,6 +68,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@ -2143,9 +2145,10 @@ public class KafkaIndexTaskTest
// and this task should start reading from offset 2 for partition 0
sequences.put(1, ImmutableMap.of(0, 2L));
final Map<String, Object> context = new HashMap<>();
context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
{
}).writeValueAsString(sequences));
context.put(
SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY,
objectMapper.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
);
final KafkaIndexTask task = createTask(
null,
@ -2454,7 +2457,7 @@ public class KafkaIndexTaskTest
private KafkaIndexTask createTask(
final String taskId,
final KafkaIndexTaskIOConfig ioConfig
)
) throws JsonProcessingException
{
return createTask(taskId, DATA_SCHEMA, ioConfig);
}
@ -2463,7 +2466,7 @@ public class KafkaIndexTaskTest
final String taskId,
final KafkaIndexTaskIOConfig ioConfig,
final Map<String, Object> context
)
) throws JsonProcessingException
{
return createTask(taskId, DATA_SCHEMA, ioConfig, context);
}
@ -2472,7 +2475,18 @@ public class KafkaIndexTaskTest
final String taskId,
final DataSchema dataSchema,
final KafkaIndexTaskIOConfig ioConfig
)
) throws JsonProcessingException
{
final Map<String, Object> context = new HashMap<>();
return createTask(taskId, dataSchema, ioConfig, context);
}
private KafkaIndexTask createTask(
final String taskId,
final DataSchema dataSchema,
final KafkaIndexTaskIOConfig ioConfig,
final Map<String, Object> context
) throws JsonProcessingException
{
final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig(
1000,
@ -2493,57 +2507,17 @@ public class KafkaIndexTaskTest
maxParseExceptions,
maxSavedParseExceptions
);
final Map<String, Object> context = isIncrementalHandoffSupported
? ImmutableMap.of(
SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED,
true
)
: null;
final KafkaIndexTask task = new KafkaIndexTask(
taskId,
null,
cloneDataSchema(dataSchema),
tuningConfig,
ioConfig,
context,
null,
null,
rowIngestionMetersFactory,
objectMapper
);
task.setPollRetryMs(POLL_RETRY_MS);
return task;
}
private KafkaIndexTask createTask(
final String taskId,
final DataSchema dataSchema,
final KafkaIndexTaskIOConfig ioConfig,
final Map<String, Object> context
)
{
final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig(
1000,
null,
maxRowsPerSegment,
null,
new Period("P1Y"),
null,
null,
null,
true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
null,
null,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
);
if (isIncrementalHandoffSupported) {
context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
final String checkpointsJson = objectMapper
.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
.writeValueAsString(checkpoints);
context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
}
}
final KafkaIndexTask task = new KafkaIndexTask(
@ -2629,8 +2603,8 @@ public class KafkaIndexTaskTest
objectMapper.registerModule(module);
}
final TaskConfig taskConfig = new TaskConfig(
new File(directory, "taskBaseDir").getPath(),
null,
new File(directory, "baseDir").getPath(),
new File(directory, "baseTaskDir").getPath(),
null,
50000,
null,

View File: KinesisSupervisor.java

@ -75,6 +75,11 @@ import java.util.concurrent.ScheduledExecutorService;
*/
public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
{
public static final TypeReference<TreeMap<Integer, Map<String, String>>> CHECKPOINTS_TYPE_REF =
new TypeReference<TreeMap<Integer, Map<String, String>>>()
{
};
private static final String NOT_SET = "-1";
private final KinesisSupervisorSpec spec;
private final AWSCredentialsConfig awsCredentialsConfig;
@ -151,20 +156,10 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
RowIngestionMetersFactory rowIngestionMetersFactory
) throws JsonProcessingException
{
final String checkpoints = sortingMapper.writerFor(new TypeReference<TreeMap<Integer, Map<String, String>>>()
{
}).writeValueAsString(sequenceOffsets);
final Map<String, Object> context = spec.getContext() == null
? ImmutableMap.of(
"checkpoints",
checkpoints,
IS_INCREMENTAL_HANDOFF_SUPPORTED,
true
) : ImmutableMap.<String, Object>builder()
.put("checkpoints", checkpoints)
.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
.putAll(spec.getContext())
.build();
final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
final Map<String, Object> context = createBaseTaskContexts();
context.put(CHECKPOINTS_CTX_KEY, checkpoints);
List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
@ -313,7 +308,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
}
@Override
protected boolean useExclusiveStartSequenceNumberForStartSequence()
protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
{
return true;
}

View File: KinesisIndexTaskTest.java

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
@ -74,6 +75,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.MetadataTaskStorage;
@ -83,6 +85,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
@ -178,6 +181,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -509,7 +513,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 120_000L)
public void testIncrementalHandOff() throws Exception
{
@ -736,7 +739,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
DATA_SCHEMA.getDataSource(),
0,
new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(stream, currentOffsets, ImmutableSet.of())
new SeekableStreamStartSequenceNumbers<>(stream, currentOffsets, currentOffsets.keySet())
),
new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(stream, nextOffsets))
)
@ -1949,7 +1952,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
new KinesisIndexTaskIOConfig(
null,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of(shardId1)),
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
true,
null,
@ -2047,10 +2050,9 @@ public class KinesisIndexTaskTest extends EasyMockSupport
)
);
final SeekableStreamStartSequenceNumbers<String, String> checkpoint1 = new SeekableStreamStartSequenceNumbers<>(
final SeekableStreamEndSequenceNumbers<String, String> checkpoint1 = new SeekableStreamEndSequenceNumbers<>(
stream,
ImmutableMap.of(shardId1, "4"),
ImmutableSet.of()
ImmutableMap.of(shardId1, "4")
);
final ListenableFuture<TaskStatus> future1 = runTask(task1);
@ -2095,7 +2097,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
new KinesisIndexTaskIOConfig(
null,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of(shardId1)),
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")),
true,
null,
@ -2271,9 +2273,10 @@ public class KinesisIndexTaskTest extends EasyMockSupport
// and this task should start reading from offset 2 for partition 0 (not offset 1, because end is inclusive)
sequences.put(1, ImmutableMap.of(shardId1, "1"));
final Map<String, Object> context = new HashMap<>();
context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<String, String>>>()
{
}).writeValueAsString(sequences));
context.put(
SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY,
objectMapper.writerFor(KinesisSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
);
final KinesisIndexTask task = createTask(
@ -2473,6 +2476,75 @@ public class KinesisIndexTaskTest extends EasyMockSupport
);
}
@Test
public void testSequencesFromContext() throws IOException
{
final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
// Here the sequence number is 1 meaning that one incremental handoff was done by the failed task
// and this task should start reading from offset 2 for partition 0 (not offset 1, because end is inclusive)
checkpoints.put(0, ImmutableMap.of(shardId0, "0", shardId1, "0"));
checkpoints.put(1, ImmutableMap.of(shardId0, "0", shardId1, "1"));
checkpoints.put(2, ImmutableMap.of(shardId0, "1", shardId1, "3"));
final Map<String, Object> context = new HashMap<>();
context.put("checkpoints", objectMapper.writerFor(KinesisSupervisor.CHECKPOINTS_TYPE_REF)
.writeValueAsString(checkpoints));
final KinesisIndexTask task = createTask(
"task1",
DATA_SCHEMA,
new KinesisIndexTaskIOConfig(
null,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(
stream,
ImmutableMap.of(shardId0, "0", shardId1, "0"),
ImmutableSet.of(shardId0)
),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1", shardId1, "5")),
true,
null,
null,
"awsEndpoint",
null,
null,
null,
null,
false
),
context
);
task.getRunner().setToolbox(toolboxFactory.build(task));
task.getRunner().initializeSequences();
final CopyOnWriteArrayList<SequenceMetadata<String, String>> sequences = task.getRunner().getSequences();
Assert.assertEquals(3, sequences.size());
SequenceMetadata<String, String> sequenceMetadata = sequences.get(0);
Assert.assertEquals(checkpoints.get(0), sequenceMetadata.getStartOffsets());
Assert.assertEquals(checkpoints.get(1), sequenceMetadata.getEndOffsets());
Assert.assertEquals(
task.getIOConfig().getStartSequenceNumbers().getExclusivePartitions(),
sequenceMetadata.getExclusiveStartPartitions()
);
Assert.assertTrue(sequenceMetadata.isCheckpointed());
sequenceMetadata = sequences.get(1);
Assert.assertEquals(checkpoints.get(1), sequenceMetadata.getStartOffsets());
Assert.assertEquals(checkpoints.get(2), sequenceMetadata.getEndOffsets());
Assert.assertEquals(checkpoints.get(1).keySet(), sequenceMetadata.getExclusiveStartPartitions());
Assert.assertTrue(sequenceMetadata.isCheckpointed());
sequenceMetadata = sequences.get(2);
Assert.assertEquals(checkpoints.get(2), sequenceMetadata.getStartOffsets());
Assert.assertEquals(
task.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap(),
sequenceMetadata.getEndOffsets()
);
Assert.assertEquals(checkpoints.get(2).keySet(), sequenceMetadata.getExclusiveStartPartitions());
Assert.assertFalse(sequenceMetadata.isCheckpointed());
}
private ListenableFuture<TaskStatus> runTask(final Task task)
{
try {
@ -2522,7 +2594,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
private KinesisIndexTask createTask(
final String taskId,
final KinesisIndexTaskIOConfig ioConfig
)
) throws JsonProcessingException
{
return createTask(taskId, DATA_SCHEMA, ioConfig, null);
}
@ -2531,7 +2603,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final String taskId,
final DataSchema dataSchema,
final KinesisIndexTaskIOConfig ioConfig
)
) throws JsonProcessingException
{
return createTask(taskId, dataSchema, ioConfig, null);
}
@ -2541,7 +2613,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final DataSchema dataSchema,
final KinesisIndexTaskIOConfig ioConfig,
@Nullable final Map<String, Object> context
)
) throws JsonProcessingException
{
final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
maxRowsInMemory,
@ -2578,13 +2650,22 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTaskIOConfig ioConfig,
final KinesisIndexTaskTuningConfig tuningConfig,
@Nullable final Map<String, Object> context
)
) throws JsonProcessingException
{
if (context != null) {
context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
final String checkpointsJson = objectMapper
.writerFor(KinesisSupervisor.CHECKPOINTS_TYPE_REF)
.writeValueAsString(checkpoints);
context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
}
}
final KinesisIndexTask task = new TestableKinesisIndexTask(
return new TestableKinesisIndexTask(
taskId,
null,
cloneDataSchema(dataSchema),
@ -2596,8 +2677,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
rowIngestionMetersFactory,
null
);
return task;
}
private static DataSchema cloneDataSchema(final DataSchema dataSchema)
@ -2657,8 +2736,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
objectMapper.registerModule(module);
}
final TaskConfig taskConfig = new TaskConfig(
new File(directory, "taskBaseDir").getPath(),
null,
new File(directory, "baseDir").getPath(),
new File(directory, "baseTaskDir").getPath(),
null,
50000,
null,

View File: SegmentTransactionalInsertAction.java

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
@ -159,8 +158,8 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
@Override
public String toString()
{
return "SegmentInsertAction{" +
"segments=" + Iterables.transform(segments, DataSegment::getId) +
return "SegmentTransactionalInsertAction{" +
"segments=" + segments +
", startMetadata=" + startMetadata +
", endMetadata=" + endMetadata +
'}';

View File: SeekableStreamIndexTaskRunner.java

@ -59,6 +59,7 @@ import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecor
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@ -239,7 +240,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
resetNextCheckpointTime();
}
public TaskStatus run(TaskToolbox toolbox)
{
try {
@ -256,57 +256,90 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
}
private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
private Set<PartitionIdType> computeExclusiveStartPartitionsForSequence(
Map<PartitionIdType, SequenceOffsetType> sequenceStartOffsets
)
{
log.info("SeekableStream indexing task starting up!");
startTime = DateTimes.nowUtc();
status = Status.STARTING;
this.toolbox = toolbox;
if (sequenceStartOffsets.equals(ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap())) {
return ioConfig.getStartSequenceNumbers().getExclusivePartitions();
} else {
return isEndOffsetExclusive() ? Collections.emptySet() : sequenceStartOffsets.keySet();
}
}
@VisibleForTesting
public void setToolbox(TaskToolbox toolbox)
{
this.toolbox = toolbox;
}
@VisibleForTesting
public void initializeSequences() throws IOException
{
if (!restoreSequences()) {
final TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> checkpoints = getCheckPointsFromContext(
toolbox,
task.getContextValue("checkpoints")
task.getContextValue(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)
);
if (checkpoints != null) {
boolean exclusive = false;
Iterator<Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>>> sequenceOffsets = checkpoints.entrySet()
.iterator();
Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> previous = sequenceOffsets.next();
while (sequenceOffsets.hasNext()) {
Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> current = sequenceOffsets.next();
addSequence(new SequenceMetadata<>(
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
previous.getValue(),
current.getValue(),
true,
exclusive ? previous.getValue().keySet() : null
));
final Set<PartitionIdType> exclusiveStartPartitions = computeExclusiveStartPartitionsForSequence(
previous.getValue()
);
addSequence(
new SequenceMetadata<>(
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
previous.getValue(),
current.getValue(),
true,
exclusiveStartPartitions
)
);
previous = current;
exclusive = true;
}
addSequence(new SequenceMetadata<>(
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
previous.getValue(),
endOffsets,
false,
exclusive ? previous.getValue().keySet() : null
));
final Set<PartitionIdType> exclusiveStartPartitions = computeExclusiveStartPartitionsForSequence(
previous.getValue()
);
addSequence(
new SequenceMetadata<>(
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
previous.getValue(),
endOffsets,
false,
exclusiveStartPartitions
)
);
} else {
addSequence(new SequenceMetadata<>(
0,
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
endOffsets,
false,
null
));
addSequence(
new SequenceMetadata<>(
0,
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
endOffsets,
false,
ioConfig.getStartSequenceNumbers().getExclusivePartitions()
)
);
}
}
log.info("Starting with sequences: %s", sequences);
}
private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
{
log.info("SeekableStream indexing task starting up!");
startTime = DateTimes.nowUtc();
status = Status.STARTING;
setToolbox(toolbox);
initializeSequences();
if (chatHandlerProvider.isPresent()) {
log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
@ -392,10 +425,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
// sequences size can be 0 only when all sequences got published and task stopped before it could finish
// which is super rare
if (sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) {
if (sequences.size() == 0 || getLastSequenceMetadata().isCheckpointed()) {
this.endOffsets.putAll(sequences.size() == 0
? currOffsets
: sequences.get(sequences.size() - 1).getEndOffsets());
: getLastSequenceMetadata().getEndOffsets());
log.info("End sequences changed to [%s]", endOffsets);
}
}
@ -479,7 +512,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
// if stop is requested or task's end sequence is set by call to setEndOffsets method with finish set to true
if (stopRequested.get() || sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) {
if (stopRequested.get() || sequences.size() == 0 || getLastSequenceMetadata().isCheckpointed()) {
status = Status.PUBLISHING;
}
@ -628,12 +661,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
if (System.currentTimeMillis() > nextCheckpointTime) {
sequenceToCheckpoint = sequences.get(sequences.size() - 1);
sequenceToCheckpoint = getLastSequenceMetadata();
}
if (sequenceToCheckpoint != null && stillReading) {
Preconditions.checkArgument(
sequences.get(sequences.size() - 1)
getLastSequenceMetadata()
.getSequenceName()
.equals(sequenceToCheckpoint.getSequenceName()),
"Cannot checkpoint a sequence [%s] which is not the latest one, sequences %s",
@ -649,7 +682,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
new SeekableStreamStartSequenceNumbers<>(
stream,
sequenceToCheckpoint.getStartOffsets(),
ioConfig.getStartSequenceNumbers().getExclusivePartitions()
sequenceToCheckpoint.getExclusiveStartPartitions()
)
),
createDataSourceMetadata(
@ -1060,7 +1093,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
final SequenceOffsetType startOffset = entry.getValue();
if (!sequences.isEmpty()) {
final SequenceOffsetType priorOffset = sequences.get(sequences.size() - 1).endOffsets.get(partition);
final SequenceOffsetType priorOffset = getLastSequenceMetadata().endOffsets.get(partition);
if (!startOffset.equals(priorOffset)) {
throw new ISE(
@ -1072,9 +1105,26 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
}
if (!isEndOffsetExclusive() && !sequences.isEmpty()) {
final SequenceMetadata<PartitionIdType, SequenceOffsetType> lastMetadata = getLastSequenceMetadata();
if (!lastMetadata.endOffsets.keySet().equals(sequenceMetadata.getExclusiveStartPartitions())) {
throw new ISE(
"Exclusive start partitions[%s] for new sequence don't match to the prior offset[%s]",
sequenceMetadata.getExclusiveStartPartitions(),
lastMetadata
);
}
}
// Actually do the add.
sequences.add(sequenceMetadata);
}
private SequenceMetadata<PartitionIdType, SequenceOffsetType> getLastSequenceMetadata()
{
Preconditions.checkState(!sequences.isEmpty(), "Empty sequences");
return sequences.get(sequences.size() - 1);
}
/**
* Returns true if the given record has already been read, based on lastReadOffsets.
@ -1453,7 +1503,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// and after acquiring pauseLock to correctly guard against duplicate requests
Preconditions.checkState(sequenceNumbers.size() > 0, "WTH?! No Sequences found to set end sequences");
final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = sequences.get(sequences.size() - 1);
final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = getLastSequenceMetadata();
final Set<PartitionIdType> exclusiveStartPartitions;
if (isEndOffsetExclusive()) {
@ -1545,6 +1595,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
nextCheckpointTime = DateTimes.nowUtc().plus(tuningConfig.getIntermediateHandoffPeriod()).getMillis();
}
@VisibleForTesting
public CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> getSequences()
{
return sequences;
}
@GET
@Path("/checkpoints")
@Produces(MediaType.APPLICATION_JSON)

View File: SequenceMetadata.java

@ -216,11 +216,12 @@ public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
lock.lock();
try {
return "SequenceMetadata{" +
"sequenceName='" + sequenceName + '\'' +
", sequenceId=" + sequenceId +
", startOffsets=" + startOffsets +
", endOffsets=" + endOffsets +
"sequenceId=" + sequenceId +
", sequenceName='" + sequenceName + '\'' +
", assignments=" + assignments +
", startOffsets=" + startOffsets +
", exclusiveStartPartitions=" + exclusiveStartPartitions +
", endOffsets=" + endOffsets +
", sentinel=" + sentinel +
", checkpointed=" + checkpointed +
'}';

View File: SeekableStreamSupervisor.java

@ -121,6 +121,7 @@ import java.util.stream.Stream;
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType> implements Supervisor
{
public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
@ -163,7 +164,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime,
Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
@Nullable Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
)
{
this.groupId = groupId;
@ -173,7 +174,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.checkpointSequences.put(0, startingSequences);
this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions != null
? exclusiveStartSequenceNumberPartitions
: new HashSet<>();
: Collections.emptySet();
this.baseSequenceName = generateSequenceName(startingSequences, minimumMessageTime, maximumMessageTime);
}
@ -2347,7 +2348,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (!getNotSetMarker().equals(sequence)) {
// if we are given a startingOffset (set by a previous task group which is pending completion) then use it
if (!isEndOfShard(sequence)) {
builder.put(partition, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForStartSequence()));
builder.put(partition, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence()));
}
} else {
// if we don't have a startingOffset (first run or we had some previous failures and reset the sequences) then
@ -2393,7 +2394,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
}
return makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForStartSequence());
return makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence());
} else {
boolean useEarliestSequenceNumber = ioConfig.isUseEarliestSequenceNumber();
if (subsequentlyDiscoveredPartitions.contains(partition)) {
@ -2469,7 +2470,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
for (PartitionIdType partition : startPartitions.keySet()) {
endPartitions.put(partition, getEndOfPartitionMarker());
}
Set<PartitionIdType> exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups.get(groupId).exclusiveStartSequenceNumberPartitions;
Set<PartitionIdType> exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups
.get(groupId)
.exclusiveStartSequenceNumberPartitions;
DateTime minimumMessageTime = activelyReadingTaskGroups.get(groupId).minimumMessageTime.orNull();
DateTime maximumMessageTime = activelyReadingTaskGroups.get(groupId).maximumMessageTime.orNull();
@ -2660,6 +2663,16 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
}
protected Map<String, Object> createBaseTaskContexts()
{
final Map<String, Object> contexts = new HashMap<>();
contexts.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
if (spec.getContext() != null) {
contexts.putAll(spec.getContext());
}
return contexts;
}
/**
* creates a specific task IOConfig instance for Kafka/Kinesis
*
@ -2827,5 +2840,5 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* For example, in Kinesis, the start offsets are inclusive for the first sequence, but exclusive for following
* sequences. In Kafka, start offsets are always inclusive.
*/
protected abstract boolean useExclusiveStartSequenceNumberForStartSequence();
protected abstract boolean useExclusiveStartSequenceNumberForNonFirstSequence();
}

View File: SeekableStreamSupervisorSpec.java

@ -37,6 +37,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@ -51,6 +52,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
private final DataSchema dataSchema;
private final SeekableStreamSupervisorTuningConfig tuningConfig;
private final SeekableStreamSupervisorIOConfig ioConfig;
@Nullable
private final Map<String, Object> context;
protected final ServiceEmitter emitter;
protected final DruidMonitorSchedulerConfig monitorSchedulerConfig;
@ -61,7 +63,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("tuningConfig") SeekableStreamSupervisorTuningConfig tuningConfig,
@JsonProperty("ioConfig") SeekableStreamSupervisorIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("context") @Nullable Map<String, Object> context,
@JsonProperty("suspended") Boolean suspended,
@JacksonInject TaskStorage taskStorage,
@JacksonInject TaskMaster taskMaster,
@ -107,6 +109,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
return ioConfig;
}
@Nullable
@JsonProperty
public Map<String, Object> getContext()
{

View File: IndexerSQLMetadataStorageCoordinator.java

@ -891,8 +891,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
if (!startMetadataMatchesExisting) {
// Not in the desired start state.
log.info("Not updating metadata, existing state is not the expected start state.");
log.debug("Existing database state [%s], request's start metadata [%s]", oldCommitMetadataFromDb, startMetadata);
log.error(
"Not updating metadata, existing state[%s] in metadata store doesn't match to the new start state[%s].",
oldCommitMetadataBytesFromDb,
startMetadata
);
return DataSourceMetadataUpdateResult.FAILURE;
}