fix SequenceMetadata deserialization (#7256)

* wip

* fix tests, stop reading if we are at end offset

* fix build

* remove restore at end offsets fix in favor of a separate PR

* use typereference from method for serialization too
Clint Wylie 2019-03-13 14:29:39 -07:00 committed by Gian Merlino
parent f9d99b245b
commit fb1489d313
8 changed files with 754 additions and 406 deletions
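The crux of the fix is Jackson type erasure: the task runner previously persisted and restored its sequences through a raw TypeReference<List<SequenceMetadata>>, so the runner-specific partition id and offset types inside SequenceMetadata were not recovered when the list was read back. Each concrete runner now supplies a fully parameterized type reference via getSequenceMetadataTypeReference(), and that same reference is used for serialization too. A minimal standalone sketch of the effect (Holder is a hypothetical stand-in for SequenceMetadata, not Druid code):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;

public class TypeReferenceSketch
{
  public static class Holder<K, V>
  {
    public Map<K, V> offsets; // stands in for SequenceMetadata's startOffsets/endOffsets
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    final String json = "[{\"offsets\":{\"0\":5}}]";

    // Raw element type: the generic map falls back to default bindings.
    List<Holder> raw = mapper.readValue(json, new TypeReference<List<Holder>>() {});

    // Fully parameterized type reference, as a Kafka runner's
    // getSequenceMetadataTypeReference() would provide for <Integer, Long>.
    List<Holder<Integer, Long>> typed =
        mapper.readValue(json, new TypeReference<List<Holder<Integer, Long>>>() {});

    System.out.println(raw.get(0).offsets.keySet().iterator().next().getClass());   // class java.lang.String
    System.out.println(typed.get(0).offsets.keySet().iterator().next().getClass()); // class java.lang.Integer
  }
}

With the raw reference the restored offset maps come back keyed by String with Integer values, which is the kind of mismatch a Kafka runner expecting Map<Integer, Long> hits after restoring persisted sequences.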

IncrementalPublishingKafkaIndexTaskRunner.java

@@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -111,7 +112,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
}
@Override
protected SeekableStreamPartitions<Integer, Long> deserializeSeekableStreamPartitionsFromMetadata(
protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
ObjectMapper mapper,
Object object
)
@@ -225,6 +226,14 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
return false;
}
@Override
public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference()
{
return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
{
};
}
@Nullable
@Override
protected TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(

LegacyKafkaIndexTaskRunner.java

@@ -20,6 +20,7 @@
package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -50,6 +51,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -610,6 +612,14 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
return false;
}
@Override
public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference()
{
return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
{
};
}
@Nonnull
@Override
protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
@@ -709,7 +719,7 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
@Override
protected SeekableStreamPartitions<Integer, Long> deserializeSeekableStreamPartitionsFromMetadata(
protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
ObjectMapper mapper,
Object object
)

KafkaIndexTaskTest.java

@@ -293,6 +293,26 @@ public class KafkaIndexTaskTest
);
}
private static List<ProducerRecord<byte[], byte[]>> generateSinglePartitionRecords(String topic)
{
return ImmutableList.of(
new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2008", "A", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2009", "B", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2010", "C", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "D", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "d", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2012", "E", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2009", "b", "x", "10", "20.0", "1.0"))
);
}
private static String getTopicName()
{
return "topic" + topicPostfix++;
@@ -694,7 +714,7 @@ public class KafkaIndexTaskTest
}
final Map<Integer, Long> nextOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertTrue(checkpoint2.getPartitionSequenceNumberMap().equals(nextOffsets));
task.getRunner().setEndOffsets(nextOffsets, false);
@@ -726,7 +746,7 @@ public class KafkaIndexTaskTest
Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(1, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
SegmentDescriptor desc1 = sd(task, "2008/P1D", 0);
SegmentDescriptor desc2 = sd(task, "2009/P1D", 0);
@@ -739,7 +759,7 @@ public class KafkaIndexTaskTest
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L, 1, 2L))),
@@ -867,23 +887,7 @@ public class KafkaIndexTaskTest
if (!isIncrementalHandoffSupported) {
return;
}
records = generateSinglePartitionRecords(topic);
List<ProducerRecord<byte[], byte[]>> records = ImmutableList.of(
new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2008", "A", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2009", "B", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2010", "C", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "D", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "d", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2012", "E", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2009", "b", "x", "10", "20.0", "1.0"))
);
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing should happen
@@ -901,22 +905,14 @@ public class KafkaIndexTaskTest
Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final SeekableStreamPartitions<Integer, Long> startPartitions = new SeekableStreamPartitions<>(
topic,
ImmutableMap.of(0, 0L)
);
final SeekableStreamPartitions<Integer, Long> checkpoint1 = new SeekableStreamPartitions<>(
topic,
ImmutableMap.of(0, 5L)
);
final SeekableStreamPartitions<Integer, Long> checkpoint2 = new SeekableStreamPartitions<>(
topic,
ImmutableMap.of(0, 12L)
);
final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
topic,
ImmutableMap.of(0, Long.MAX_VALUE)
);
final SeekableStreamPartitions<Integer, Long> startPartitions =
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L));
final SeekableStreamPartitions<Integer, Long> checkpoint1 =
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 5L));
final SeekableStreamPartitions<Integer, Long> checkpoint2 =
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 12L));
final SeekableStreamPartitions<Integer, Long> endPartitions =
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, Long.MAX_VALUE));
final KafkaIndexTask task = createTask(
null,
@@ -1859,6 +1855,119 @@ public class KafkaIndexTaskTest
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 60_000L)
public void testRestoreAfterPersistingSequences() throws Exception
{
if (!isIncrementalHandoffSupported) {
return;
}
records = generateSinglePartitionRecords(topic);
maxRowsPerSegment = 2;
Map<String, Object> consumerProps = kafkaServer.consumerProperties();
consumerProps.put("max.poll.records", "1");
final KafkaIndexTask task1 = createTask(
null,
new KafkaIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
consumerProps,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null
)
);
final SeekableStreamPartitions<Integer, Long> checkpoint = new SeekableStreamPartitions<>(
topic,
ImmutableMap.of(0, 5L)
);
final ListenableFuture<TaskStatus> future1 = runTask(task1);
// Insert some data, but not enough for the task to finish
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (ProducerRecord<byte[], byte[]> record : Iterables.limit(records, 5)) {
kafkaProducer.send(record).get();
}
kafkaProducer.commitTransaction();
}
while (task1.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint.getPartitionSequenceNumberMap(), currentOffsets);
// Set endOffsets to persist sequences
task1.getRunner().setEndOffsets(ImmutableMap.of(0, 5L), false);
// Stop without publishing segment
task1.stopGracefully(toolboxFactory.build(task1).getConfig());
unlockAppenderatorBasePersistDirForTask(task1);
Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
// Start a new task
final KafkaIndexTask task2 = createTask(
task1.getId(),
new KafkaIndexTaskIOConfig(
0,
"sequence0",
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 0L)),
new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L)),
consumerProps,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
true,
null,
null
)
);
final ListenableFuture<TaskStatus> future2 = runTask(task2);
// Wait for the task to start reading
// Insert remaining data
try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (ProducerRecord<byte[], byte[]> record : Iterables.skip(records, 5)) {
kafkaProducer.send(record).get();
}
kafkaProducer.commitTransaction();
}
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
// Check metrics
Assert.assertEquals(5, task1.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertEquals(4, task2.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
SegmentDescriptor desc1 = sd(task1, "2008/P1D", 0);
SegmentDescriptor desc2 = sd(task1, "2008/P1D", 1);
SegmentDescriptor desc3 = sd(task1, "2009/P1D", 0);
SegmentDescriptor desc4 = sd(task1, "2009/P1D", 1);
SegmentDescriptor desc5 = sd(task1, "2010/P1D", 0);
SegmentDescriptor desc6 = sd(task1, "2011/P1D", 0);
SegmentDescriptor desc7 = sd(task1, "2012/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
Assert.assertEquals(
new KafkaDataSourceMetadata(new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 10L))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
}
@Test(timeout = 60_000L)
public void testRunWithPauseAndResume() throws Exception
{

KinesisIndexTaskRunner.java

@@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamPartitions;
import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
@@ -92,7 +93,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
}
@Override
protected SeekableStreamPartitions<String, String> deserializeSeekableStreamPartitionsFromMetadata(
protected SeekableStreamPartitions<String, String> deserializePartitionsFromMetadata(
ObjectMapper mapper,
Object object
)
@@ -176,6 +177,14 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
return KinesisSequenceNumber.END_OF_SHARD_MARKER.equals(seqNum);
}
@Override
public TypeReference<List<SequenceMetadata<String, String>>> getSequenceMetadataTypeReference()
{
return new TypeReference<List<SequenceMetadata<String, String>>>()
{
};
}
@Nullable
@Override
protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(

KinesisIndexTaskTest.java

@@ -198,28 +198,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
private static String shardId1 = "1";
private static String shardId0 = "0";
private static KinesisRecordSupplier recordSupplier;
private static List<OrderedPartitionableRecord<String, String>> records = ImmutableList.of(
private static List<OrderedPartitionableRecord<String, String>> records;
new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(
stream,
"1",
"5",
jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
),
new OrderedPartitionableRecord<>(stream, "1", "6", Collections.singletonList(StringUtils.toUtf8("unparseable"))),
new OrderedPartitionableRecord<>(stream, "1", "7", Collections.singletonList(StringUtils.toUtf8("unparseable2"))),
new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}"))),
new OrderedPartitionableRecord<>(stream, "1", "9", jb("2013", "f", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "10", jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "11", jb("2049", "f", "y", "10", "notanumber", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "12", jb("2049", "f", "y", "10", "20.0", "notanumber")),
new OrderedPartitionableRecord<>(stream, "0", "0", jb("2012", "g", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "0", "1", jb("2011", "h", "y", "10", "20.0", "1.0"))
);
private static ServiceEmitter emitter;
private static ListeningExecutorService taskExec;
@@ -315,6 +294,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
maxSavedParseExceptions = null;
skipAvailabilityCheck = false;
doHandoff = true;
records = generateRecords(stream);
reportsFile = File.createTempFile("KinesisIndexTaskTestReports-" + System.currentTimeMillis(), "json");
maxRecordsPerPoll = 1;
@@ -347,6 +327,52 @@ public class KinesisIndexTaskTest extends EasyMockSupport
emitter.close();
}
private static List<OrderedPartitionableRecord<String, String>> generateRecords(String stream)
{
return ImmutableList.of(
new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(
stream,
"1",
"5",
jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
),
new OrderedPartitionableRecord<>(stream, "1", "6", Collections.singletonList(StringUtils.toUtf8("unparseable"))),
new OrderedPartitionableRecord<>(stream, "1", "7", Collections.singletonList(StringUtils.toUtf8("unparseable2"))),
new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(StringUtils.toUtf8("{}"))),
new OrderedPartitionableRecord<>(stream, "1", "9", jb("2013", "f", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "10", jb("2049", "f", "y", "notanumber", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "11", jb("2049", "f", "y", "10", "notanumber", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "12", jb("2049", "f", "y", "10", "20.0", "notanumber")),
new OrderedPartitionableRecord<>(stream, "0", "0", jb("2012", "g", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "0", "1", jb("2011", "h", "y", "10", "20.0", "1.0"))
);
}
private static List<OrderedPartitionableRecord<String, String>> generateSinglePartitionRecords(String stream)
{
return ImmutableList.of(
new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "14", jb("2013", "e", "y", "10", "20.0", "1.0"))
);
}
@Test(timeout = 120_000L)
public void testRunAfterDataInserted() throws Exception
{
@@ -2213,6 +2239,165 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 120_000L)
public void testRestoreAfterPersistingSequences() throws Exception
{
maxRowsPerSegment = 2;
maxRecordsPerPoll = 1;
records = generateSinglePartitionRecords(stream);
recordSupplier.assign(anyObject());
expectLastCall().anyTimes();
expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
recordSupplier.seek(anyObject(), anyString());
expectLastCall().anyTimes();
// simulate 1 record at a time
expect(recordSupplier.poll(anyLong())).andReturn(Collections.singletonList(records.get(0)))
.once()
.andReturn(Collections.singletonList(records.get(1)))
.once()
.andReturn(Collections.singletonList(records.get(2)))
.once()
.andReturn(Collections.singletonList(records.get(3)))
.once()
.andReturn(Collections.singletonList(records.get(4)))
.once()
.andReturn(Collections.emptyList())
.anyTimes();
replayAll();
final KinesisIndexTask task1 = createTask(
"task1",
new KinesisIndexTaskIOConfig(
null,
"sequence0",
new SeekableStreamPartitions<>(stream, ImmutableMap.of(
shardId1,
"0"
)),
new SeekableStreamPartitions<>(stream, ImmutableMap.of(
shardId1,
"6"
)),
true,
null,
null,
"awsEndpoint",
null,
null,
null,
null,
null,
false
)
);
final SeekableStreamPartitions<String, String> checkpoint1 = new SeekableStreamPartitions<>(
stream,
ImmutableMap.of(shardId1, "4")
);
final ListenableFuture<TaskStatus> future1 = runTask(task1);
while (task1.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) {
Thread.sleep(10);
}
final Map<String, String> currentOffsets = ImmutableMap.copyOf(task1.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
task1.getRunner().setEndOffsets(currentOffsets, false);
// Stop without publishing segment
task1.stopGracefully(toolboxFactory.build(task1).getConfig());
unlockAppenderatorBasePersistDirForTask(task1);
Assert.assertEquals(TaskState.SUCCESS, future1.get().getStatusCode());
verifyAll();
reset(recordSupplier);
recordSupplier.assign(anyObject());
expectLastCall().anyTimes();
expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
recordSupplier.seek(anyObject(), anyString());
expectLastCall().anyTimes();
expect(recordSupplier.poll(anyLong())).andReturn(Collections.singletonList(records.get(5)))
.once()
.andReturn(Collections.singletonList(records.get(6)))
.once()
.andReturn(Collections.emptyList())
.anyTimes();
recordSupplier.close();
expectLastCall();
replayAll();
// Start a new task
final KinesisIndexTask task2 = createTask(
task1.getId(),
new KinesisIndexTaskIOConfig(
null,
"sequence0",
new SeekableStreamPartitions<>(stream, ImmutableMap.of(
shardId1,
"0"
)),
new SeekableStreamPartitions<>(stream, ImmutableMap.of(
shardId1,
"6"
)),
true,
null,
null,
"awsEndpoint",
null,
null,
ImmutableSet.of(shardId1),
null,
null,
false
)
);
final ListenableFuture<TaskStatus> future2 = runTask(task2);
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future2.get().getStatusCode());
verifyAll();
// Check metrics
Assert.assertEquals(5, task1.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task1.getRunner().getRowIngestionMeters().getThrownAway());
Assert.assertEquals(1, task2.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task2.getRunner().getRowIngestionMeters().getThrownAway());
// Check published segments & metadata
SegmentDescriptor desc1 = sd(task1, "2008/P1D", 0);
SegmentDescriptor desc2 = sd(task1, "2009/P1D", 0);
SegmentDescriptor desc3 = sd(task1, "2010/P1D", 0);
SegmentDescriptor desc4 = sd(task1, "2011/P1D", 0);
SegmentDescriptor desc5 = sd(task1, "2013/P1D", 0);
Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5), publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(
new SeekableStreamPartitions<>(stream, ImmutableMap.of(
shardId1,
"6"
))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
}
@Test(timeout = 120_000L)
public void testRunWithPauseAndResume() throws Exception
@@ -2426,23 +2611,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
@Test(timeout = 5000L)
public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
{
final List<OrderedPartitionableRecord<String, String>> records = ImmutableList.of(
records = generateSinglePartitionRecords(stream);
new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "14", jb("2013", "e", "y", "10", "20.0", "1.0"))
);
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing should happen

SeekableStreamIndexTaskRunner.java

@@ -20,8 +20,6 @@
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -53,7 +51,6 @@ import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
@@ -75,7 +72,6 @@ import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.security.Access;
@@ -142,8 +138,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskRunner.class);
private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
private static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";
private final Map<PartitionIdType, SequenceOffsetType> endOffsets;
private final ConcurrentMap<PartitionIdType, SequenceOffsetType> currOffsets = new ConcurrentHashMap<>();
@@ -210,7 +206,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
protected volatile boolean pauseRequested = false;
private volatile long nextCheckpointTime;
private volatile CopyOnWriteArrayList<SequenceMetadata> sequences;
private volatile CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequences;
private volatile Throwable backgroundThreadException;
public SeekableStreamIndexTaskRunner(
@@ -276,7 +272,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> previous = sequenceOffsets.next();
while (sequenceOffsets.hasNext()) {
Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> current = sequenceOffsets.next();
sequences.add(new SequenceMetadata(
sequences.add(new SequenceMetadata<>(
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
previous.getValue(),
@@ -287,7 +283,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
previous = current;
exclusive = true;
}
sequences.add(new SequenceMetadata(
sequences.add(new SequenceMetadata<>(
previous.getKey(),
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
previous.getValue(),
@@ -296,7 +292,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
exclusive ? previous.getValue().keySet() : null
));
} else {
sequences.add(new SequenceMetadata(
sequences.add(new SequenceMetadata<>(
0,
StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
ioConfig.getStartPartitions().getPartitionSequenceNumberMap(),
@@ -369,7 +365,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
} else {
@SuppressWarnings("unchecked")
final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> restoredNextPartitions = deserializeSeekableStreamPartitionsFromMetadata(
final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> restoredNextPartitions = deserializePartitionsFromMetadata(
toolbox.getObjectMapper(),
restoredMetadataMap.get(METADATA_NEXT_PARTITIONS)
);
@@ -543,9 +539,9 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
boolean isPersistRequired = false;
final SequenceMetadata sequenceToUse = sequences
final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceToUse = sequences
.stream()
.filter(sequenceMetadata -> sequenceMetadata.canHandle(record))
.filter(sequenceMetadata -> sequenceMetadata.canHandle(this, record))
.findFirst()
.orElse(null);
@@ -692,11 +688,11 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
status = Status.PUBLISHING;
}
for (SequenceMetadata sequenceMetadata : sequences) {
for (SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata : sequences) {
if (!publishingSequences.contains(sequenceMetadata.getSequenceName())) {
// this is done to prevent checks in sequence specific commit supplier from failing
sequenceMetadata.setEndOffsets(currOffsets);
sequenceMetadata.updateAssignments(currOffsets);
sequenceMetadata.updateAssignments(this, currOffsets);
publishingSequences.add(sequenceMetadata.getSequenceName());
// persist already done in finally, so directly add to publishQueue
publishAndRegisterHandoff(sequenceMetadata);
@@ -812,11 +808,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
return TaskStatus.success(task.getId());
}
/**
* checks if the input seqNum marks end of shard. Used by Kinesis only
*/
protected abstract boolean isEndOfShard(SequenceOffsetType seqNum);
private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException
{
// Check if any publishFuture failed.
@@ -846,14 +837,14 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
handOffWaitList.removeAll(handoffFinished);
}
private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata)
private void publishAndRegisterHandoff(SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata)
{
log.info("Publishing segments for sequence [%s]", sequenceMetadata);
final ListenableFuture<SegmentsAndMetadata> publishFuture = Futures.transform(
driver.publish(
sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()),
sequenceMetadata.createPublisher(this, toolbox, ioConfig.isUseTransaction()),
sequenceMetadata.getCommitterSupplier(stream, lastPersistedOffsets).get(),
sequenceMetadata.getCommitterSupplier(this, stream, lastPersistedOffsets).get(),
Collections.singletonList(sequenceMetadata.getSequenceName())
),
(Function<SegmentsAndMetadata, SegmentsAndMetadata>) publishedSegmentsAndMetadata -> {
@@ -938,11 +929,9 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
final File sequencesPersistFile = getSequencesPersistFile(toolbox);
if (sequencesPersistFile.exists()) {
sequences = new CopyOnWriteArrayList<>(
toolbox.getObjectMapper().<List<SequenceMetadata>>readValue(
toolbox.getObjectMapper().<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>>readValue(
sequencesPersistFile,
new TypeReference<List<SequenceMetadata>>()
getSequenceMetadataTypeReference()
{
}
)
);
return true;
@@ -955,9 +944,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
{
log.info("Persisting Sequences Metadata [%s]", sequences);
toolbox.getObjectMapper().writerWithType(
new TypeReference<List<SequenceMetadata>>()
getSequenceMetadataTypeReference()
{
}
).writeValue(getSequencesPersistFile(toolbox), sequences);
}
@@ -1002,8 +989,8 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private void maybePersistAndPublishSequences(Supplier<Committer> committerSupplier)
throws InterruptedException
{
for (SequenceMetadata sequenceMetadata : sequences) {
for (SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata : sequences) {
sequenceMetadata.updateAssignments(currOffsets);
sequenceMetadata.updateAssignments(this, currOffsets);
if (!sequenceMetadata.isOpen() && !publishingSequences.contains(sequenceMetadata.getSequenceName())) {
publishingSequences.add(sequenceMetadata.getSequenceName());
try {
@@ -1378,7 +1365,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// and after acquiring pauseLock to correctly guard against duplicate requests
Preconditions.checkState(sequenceNumbers.size() > 0, "WTH?! No Sequences found to set end sequences");
final SequenceMetadata latestSequence = sequences.get(sequences.size() - 1);
final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = sequences.get(sequences.size() - 1);
// if a partition has not been read yet (contained in initialOffsetsSnapshot), then
// do not mark the starting sequence number as exclusive
Set<PartitionIdType> exclusivePartitions = sequenceNumbers.keySet()
@@ -1389,7 +1376,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
.collect(Collectors.toSet());
if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
&& latestSequence.exclusiveStartPartitions.equals(exclusivePartitions)
&& latestSequence.getExclusiveStartPartitions().equals(exclusivePartitions)
&& !finish)
|| (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers);
@@ -1409,8 +1396,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : sequenceNumbers.entrySet()) {
if (createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey())))
< 0) {
if (createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(currOffsets.get(entry.getKey()))) < 0) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(
StringUtils.format(
@@ -1433,7 +1419,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
exclusiveStartingPartitions.addAll(exclusivePartitions);
// create new sequence
final SequenceMetadata newSequence = new SequenceMetadata(
final SequenceMetadata<PartitionIdType, SequenceOffsetType> newSequence = new SequenceMetadata<>(
latestSequence.getSequenceId() + 1,
StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1),
sequenceNumbers,
@@ -1596,291 +1582,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
return startTime;
}
private class SequenceMetadata
{
private final int sequenceId;
private final String sequenceName;
private final Set<PartitionIdType> exclusiveStartPartitions;
private final Set<PartitionIdType> assignments;
private final boolean sentinel;
private boolean checkpointed;
/**
* Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
* {@link #setEndOffsets)} can be called by both the main thread and the HTTP thread.
*/
private final ReentrantLock lock = new ReentrantLock();
final Map<PartitionIdType, SequenceOffsetType> startOffsets;
final Map<PartitionIdType, SequenceOffsetType> endOffsets;
@JsonCreator
public SequenceMetadata(
@JsonProperty("sequenceId") int sequenceId,
@JsonProperty("sequenceName") String sequenceName,
@JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType> startOffsets,
@JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType> endOffsets,
@JsonProperty("checkpointed") boolean checkpointed,
@JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> exclusiveStartPartitions
)
{
Preconditions.checkNotNull(sequenceName);
Preconditions.checkNotNull(startOffsets);
Preconditions.checkNotNull(endOffsets);
this.sequenceId = sequenceId;
this.sequenceName = sequenceName;
this.startOffsets = ImmutableMap.copyOf(startOffsets);
this.endOffsets = new HashMap<>(endOffsets);
this.assignments = new HashSet<>(startOffsets.keySet());
this.checkpointed = checkpointed;
this.sentinel = false;
this.exclusiveStartPartitions = exclusiveStartPartitions == null
? Collections.emptySet()
: exclusiveStartPartitions;
}
@JsonProperty
public Set<PartitionIdType> getExclusiveStartPartitions()
{
return exclusiveStartPartitions;
}
@JsonProperty
public int getSequenceId()
{
return sequenceId;
}
@JsonProperty
public boolean isCheckpointed()
{
lock.lock();
try {
return checkpointed;
}
finally {
lock.unlock();
}
}
@JsonProperty
public String getSequenceName()
{
return sequenceName;
}
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getStartOffsets()
{
return startOffsets;
}
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
{
lock.lock();
try {
return endOffsets;
}
finally {
lock.unlock();
}
}
@JsonProperty
public boolean isSentinel()
{
return sentinel;
}
void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets)
{
lock.lock();
try {
endOffsets.putAll(newEndOffsets);
checkpointed = true;
}
finally {
lock.unlock();
}
}
void updateAssignments(Map<PartitionIdType, SequenceOffsetType> nextPartitionOffset)
{
lock.lock();
try {
assignments.clear();
nextPartitionOffset.forEach((key, value) -> {
if (endOffsets.get(key).equals(SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER)
|| createSequenceNumber(endOffsets.get(key)).compareTo(createSequenceNumber(nextPartitionOffset.get(key)))
> 0) {
assignments.add(key);
}
});
}
finally {
lock.unlock();
}
}
boolean isOpen()
{
return !assignments.isEmpty();
}
boolean canHandle(OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record)
{
lock.lock();
try {
final OrderedSequenceNumber<SequenceOffsetType> partitionEndOffset = createSequenceNumber(endOffsets.get(record.getPartitionId()));
final OrderedSequenceNumber<SequenceOffsetType> partitionStartOffset = createSequenceNumber(startOffsets.get(
record.getPartitionId()));
final OrderedSequenceNumber<SequenceOffsetType> recordOffset = createSequenceNumber(record.getSequenceNumber());
if (!isOpen() || recordOffset == null || partitionEndOffset == null || partitionStartOffset == null) {
return false;
}
boolean ret;
if (isStartingSequenceOffsetsExclusive()) {
ret = recordOffset.compareTo(partitionStartOffset)
>= (getExclusiveStartPartitions().contains(record.getPartitionId()) ? 1 : 0);
} else {
ret = recordOffset.compareTo(partitionStartOffset) >= 0;
}
if (isEndSequenceOffsetsExclusive()) {
ret &= recordOffset.compareTo(partitionEndOffset) < 0;
} else {
ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
}
return ret;
}
finally {
lock.unlock();
}
}
@Override
public String toString()
{
lock.lock();
try {
return "SequenceMetadata{" +
"sequenceName='" + sequenceName + '\'' +
", sequenceId=" + sequenceId +
", startOffsets=" + startOffsets +
", endOffsets=" + endOffsets +
", assignments=" + assignments +
", sentinel=" + sentinel +
", checkpointed=" + checkpointed +
'}';
}
finally {
lock.unlock();
}
}
Supplier<Committer> getCommitterSupplier(
String stream,
Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets
)
{
// Set up committer.
return () ->
new Committer()
{
@Override
public Object getMetadata()
{
lock.lock();
try {
Preconditions.checkState(
assignments.isEmpty(),
"This committer can be used only once all the records till sequences [%s] have been consumed, also make"
+ " sure to call updateAssignments before using this committer",
endOffsets
);
// merge endOffsets for this sequence with globally lastPersistedOffsets
// This is done because this committer would be persisting only sub set of segments
// corresponding to the current sequence. Generally, lastPersistedOffsets should already
// cover endOffsets but just to be sure take max of sequences and persist that
for (Map.Entry<PartitionIdType, SequenceOffsetType> partitionOffset : endOffsets.entrySet()) {
SequenceOffsetType newOffsets = partitionOffset.getValue();
if (lastPersistedOffsets.containsKey(partitionOffset.getKey()) &&
createSequenceNumber(lastPersistedOffsets.get(partitionOffset.getKey())).compareTo(
createSequenceNumber(newOffsets)) > 0) {
newOffsets = lastPersistedOffsets.get(partitionOffset.getKey());
}
lastPersistedOffsets.put(
partitionOffset.getKey(),
newOffsets
);
}
// Publish metadata can be different from persist metadata as we are going to publish only
// subset of segments
return ImmutableMap.of(
METADATA_NEXT_PARTITIONS, new SeekableStreamPartitions<>(stream, lastPersistedOffsets),
METADATA_PUBLISH_PARTITIONS, new SeekableStreamPartitions<>(stream, endOffsets)
);
}
finally {
lock.unlock();
}
}
@Override
public void run()
{
// Do nothing.
}
};
}
TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTransaction)
{
return (segments, commitMetadata) -> {
final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> finalPartitions = deserializeSeekableStreamPartitionsFromMetadata(
toolbox.getObjectMapper(),
((Map) Preconditions
.checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_PUBLISH_PARTITIONS)
);
// Sanity check, we should only be publishing things that match our desired end state.
if (!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
throw new ISE(
"WTF?! Driver for sequence [%s], attempted to publish invalid metadata[%s].",
toString(),
commitMetadata
);
}
final SegmentTransactionalInsertAction action;
if (useTransaction) {
action = new SegmentTransactionalInsertAction(
segments,
createDataSourceMetadata(new SeekableStreamPartitions<>(
finalPartitions.getStream(),
getStartOffsets()
)),
createDataSourceMetadata(finalPartitions)
);
} else {
action = new SegmentTransactionalInsertAction(segments, null, null);
}
log.info("Publishing with isTransaction[%s].", useTransaction);
return toolbox.getTaskActionClient().submit(action);
};
}
}
private boolean verifyInitialRecordAndSkipExclusivePartition( private boolean verifyInitialRecordAndSkipExclusivePartition(
final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
) )
@ -1923,7 +1624,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
} }
/** /**
* deserailizes the checkpoints into of Map<sequenceId, Map<PartitionIdType, SequenceOffsetType>> * checks if the input seqNum marks end of shard. Used by Kinesis only
*/
protected abstract boolean isEndOfShard(SequenceOffsetType seqNum);
/**
* deserializes the checkpoints into of Map<sequenceId, Map<PartitionIdType, SequenceOffsetType>>
* *
* @param toolbox task toolbox * @param toolbox task toolbox
* @param checkpointsString the json-serialized checkpoint string * @param checkpointsString the json-serialized checkpoint string
@ -1939,7 +1645,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
) throws IOException; ) throws IOException;
/** /**
* Calculates the sequence number used to update `currentOffsets` after finishing reading a record. * Calculates the sequence number used to update `currentOffsets` after finished reading a record.
* In Kafka this returns sequenceNumeber + 1 since that's the next expected offset * In Kafka this returns sequenceNumeber + 1 since that's the next expected offset
* In Kinesis this simply returns sequenceNumber, since the sequence numbers in Kinesis are not * In Kinesis this simply returns sequenceNumber, since the sequence numbers in Kinesis are not
* contiguous and finding the next sequence number requires an expensive API call * contiguous and finding the next sequence number requires an expensive API call
@ -1951,14 +1657,14 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
protected abstract SequenceOffsetType getSequenceNumberToStoreAfterRead(SequenceOffsetType sequenceNumber); protected abstract SequenceOffsetType getSequenceNumberToStoreAfterRead(SequenceOffsetType sequenceNumber);
/**
* Deserializes stored metadata into a SeekableStreamPartitions instance.
*
* @param mapper json objectMapper
* @param object metadata
*
* @return SeekableStreamPartitions
*/
protected abstract SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> deserializePartitionsFromMetadata(
ObjectMapper mapper,
Object object
);
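A rough sketch of how a concrete runner might satisfy this contract, assuming Kafka-style Integer/Long type parameters and Jackson's convertValue; the actual implementations may build the target type differently:

@Override
protected SeekableStreamPartitions<Integer, Long> deserializePartitionsFromMetadata(
    ObjectMapper mapper,
    Object object
)
{
  // convertValue re-binds the loosely typed metadata map into the concrete generic type; an explicit
  // TypeReference is needed because the type parameters are erased at runtime.
  return mapper.convertValue(object, new TypeReference<SeekableStreamPartitions<Integer, Long>>()
  {
  });
}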
@@ -2028,4 +1734,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
* partition we read from stream
*/
protected abstract boolean isStartingSequenceOffsetsExclusive();
protected abstract TypeReference<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>> getSequenceMetadataTypeReference();
}
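The new getSequenceMetadataTypeReference hook exists because the runner persists its list of sequences as JSON: without a concrete TypeReference supplied by the subclass, Jackson would erase the PartitionIdType/SequenceOffsetType parameters and hand back plain maps instead of SequenceMetadata instances. A rough illustration of how the runner might use it for both directions; the helper names below are assumptions for illustration, not the committed code:

// Hypothetical persistence helpers inside the runner.
private String serializeSequences(ObjectMapper mapper, List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequences)
    throws IOException
{
  // Writing through the same TypeReference keeps serialization and deserialization symmetric.
  return mapper.writerFor(getSequenceMetadataTypeReference()).writeValueAsString(sequences);
}

private List<SequenceMetadata<PartitionIdType, SequenceOffsetType>> deserializeSequences(ObjectMapper mapper, String json)
    throws IOException
{
  return mapper.readValue(json, getSequenceMetadataTypeReference());
}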

View File

@@ -0,0 +1,335 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.Committer;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
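/**
* Bookkeeping for one named sequence of a seekable-stream indexing task: the per-partition start and
* end offsets, the partitions still being read ({@link #assignments}), whether the sequence has been
* checkpointed, and which partitions have exclusive start offsets. It also builds the {@link Committer}
* and {@link TransactionalSegmentPublisher} used when the segments for this sequence are published.
*/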
public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
{
private final int sequenceId;
private final String sequenceName;
private final Set<PartitionIdType> exclusiveStartPartitions;
private final Set<PartitionIdType> assignments;
private final boolean sentinel;
private boolean checkpointed;
/**
* Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
* {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
*/
private final ReentrantLock lock = new ReentrantLock();
final Map<PartitionIdType, SequenceOffsetType> startOffsets;
final Map<PartitionIdType, SequenceOffsetType> endOffsets;
@JsonCreator
public SequenceMetadata(
@JsonProperty("sequenceId") int sequenceId,
@JsonProperty("sequenceName") String sequenceName,
@JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType> startOffsets,
@JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType> endOffsets,
@JsonProperty("checkpointed") boolean checkpointed,
@JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> exclusiveStartPartitions
)
{
Preconditions.checkNotNull(sequenceName);
Preconditions.checkNotNull(startOffsets);
Preconditions.checkNotNull(endOffsets);
this.sequenceId = sequenceId;
this.sequenceName = sequenceName;
this.startOffsets = ImmutableMap.copyOf(startOffsets);
this.endOffsets = new HashMap<>(endOffsets);
this.assignments = new HashSet<>(startOffsets.keySet());
this.checkpointed = checkpointed;
this.sentinel = false;
this.exclusiveStartPartitions = exclusiveStartPartitions == null
? Collections.emptySet()
: exclusiveStartPartitions;
}
@JsonProperty
public Set<PartitionIdType> getExclusiveStartPartitions()
{
return exclusiveStartPartitions;
}
@JsonProperty
public int getSequenceId()
{
return sequenceId;
}
@JsonProperty
public boolean isCheckpointed()
{
lock.lock();
try {
return checkpointed;
}
finally {
lock.unlock();
}
}
@JsonProperty
public String getSequenceName()
{
return sequenceName;
}
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getStartOffsets()
{
return startOffsets;
}
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getEndOffsets()
{
lock.lock();
try {
return endOffsets;
}
finally {
lock.unlock();
}
}
@JsonProperty
public boolean isSentinel()
{
return sentinel;
}
void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets)
{
lock.lock();
try {
endOffsets.putAll(newEndOffsets);
checkpointed = true;
}
finally {
lock.unlock();
}
}
void updateAssignments(
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
Map<PartitionIdType, SequenceOffsetType> nextPartitionOffset
)
{
lock.lock();
try {
assignments.clear();
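// Re-add only the partitions that still have records left to read for this sequence: a partition stays
// assigned while its end offset is the NO_END_SEQUENCE_NUMBER sentinel or is still ahead of the next
// offset to be read.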
nextPartitionOffset.forEach((key, value) -> {
SequenceOffsetType endOffset = endOffsets.get(key);
if (SeekableStreamPartitions.NO_END_SEQUENCE_NUMBER.equals(endOffset)
|| runner.createSequenceNumber(endOffset).compareTo(runner.createSequenceNumber(nextPartitionOffset.get(key))) > 0) {
assignments.add(key);
}
});
}
finally {
lock.unlock();
}
}
boolean isOpen()
{
return !assignments.isEmpty();
}
boolean canHandle(
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
)
{
lock.lock();
try {
final OrderedSequenceNumber<SequenceOffsetType> partitionEndOffset = runner.createSequenceNumber(endOffsets.get(record.getPartitionId()));
final OrderedSequenceNumber<SequenceOffsetType> partitionStartOffset = runner.createSequenceNumber(startOffsets.get(
record.getPartitionId()));
final OrderedSequenceNumber<SequenceOffsetType> recordOffset = runner.createSequenceNumber(record.getSequenceNumber());
if (!isOpen() || recordOffset == null || partitionEndOffset == null || partitionStartOffset == null) {
return false;
}
boolean ret;
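// Start-offset check: when the runner treats start offsets as exclusive and this partition's start is
// marked exclusive, the record must be strictly after the start offset; otherwise a record at the start
// offset itself can be handled.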
if (runner.isStartingSequenceOffsetsExclusive()) {
ret = recordOffset.compareTo(partitionStartOffset)
>= (getExclusiveStartPartitions().contains(record.getPartitionId()) ? 1 : 0);
} else {
ret = recordOffset.compareTo(partitionStartOffset) >= 0;
}
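// End-offset check: with exclusive end offsets the record must be strictly before the end offset; with
// inclusive end offsets a record at the end offset is still handled.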
if (runner.isEndSequenceOffsetsExclusive()) {
ret &= recordOffset.compareTo(partitionEndOffset) < 0;
} else {
ret &= recordOffset.compareTo(partitionEndOffset) <= 0;
}
return ret;
}
finally {
lock.unlock();
}
}
@Override
public String toString()
{
lock.lock();
try {
return "SequenceMetadata{" +
"sequenceName='" + sequenceName + '\'' +
", sequenceId=" + sequenceId +
", startOffsets=" + startOffsets +
", endOffsets=" + endOffsets +
", assignments=" + assignments +
", sentinel=" + sentinel +
", checkpointed=" + checkpointed +
'}';
}
finally {
lock.unlock();
}
}
Supplier<Committer> getCommitterSupplier(
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
String stream,
Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets
)
{
// Set up committer.
return () ->
new Committer()
{
@Override
public Object getMetadata()
{
lock.lock();
try {
Preconditions.checkState(
assignments.isEmpty(),
"This committer can be used only once all the records till sequences [%s] have been consumed, also make"
+ " sure to call updateAssignments before using this committer",
endOffsets
);
// Merge the endOffsets for this sequence with the global lastPersistedOffsets. This is done because
// this committer persists only the subset of segments corresponding to the current sequence.
// Generally, lastPersistedOffsets should already cover endOffsets, but just to be sure, take the
// per-partition max of the two and persist that.
for (Map.Entry<PartitionIdType, SequenceOffsetType> partitionOffset : endOffsets.entrySet()) {
SequenceOffsetType newOffsets = partitionOffset.getValue();
if (lastPersistedOffsets.containsKey(partitionOffset.getKey())
&& runner.createSequenceNumber(lastPersistedOffsets.get(partitionOffset.getKey()))
.compareTo(runner.createSequenceNumber(newOffsets)) > 0) {
newOffsets = lastPersistedOffsets.get(partitionOffset.getKey());
}
lastPersistedOffsets.put(
partitionOffset.getKey(),
newOffsets
);
}
// Publish metadata can differ from persist metadata since we are going to publish only a subset of
// the segments.
return ImmutableMap.of(
SeekableStreamIndexTaskRunner.METADATA_NEXT_PARTITIONS,
new SeekableStreamPartitions<>(stream, lastPersistedOffsets),
SeekableStreamIndexTaskRunner.METADATA_PUBLISH_PARTITIONS,
new SeekableStreamPartitions<>(stream, endOffsets)
);
}
finally {
lock.unlock();
}
}
@Override
public void run()
{
// Do nothing.
}
};
}
TransactionalSegmentPublisher createPublisher(
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> runner,
TaskToolbox toolbox,
boolean useTransaction
)
{
return (segments, commitMetadata) -> {
final Map commitMetaMap = (Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata");
final SeekableStreamPartitions<PartitionIdType, SequenceOffsetType> finalPartitions =
runner.deserializePartitionsFromMetadata(
toolbox.getObjectMapper(),
commitMetaMap.get(SeekableStreamIndexTaskRunner.METADATA_PUBLISH_PARTITIONS)
);
// Sanity check, we should only be publishing things that match our desired end state.
if (!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
throw new ISE(
"WTF?! Driver for sequence [%s], attempted to publish invalid metadata[%s].",
toString(),
commitMetadata
);
}
final SegmentTransactionalInsertAction action;
if (useTransaction) {
action = new SegmentTransactionalInsertAction(
segments,
runner.createDataSourceMetadata(
new SeekableStreamPartitions<>(finalPartitions.getStream(), getStartOffsets())
),
runner.createDataSourceMetadata(finalPartitions)
);
} else {
action = new SegmentTransactionalInsertAction(segments, null, null);
}
return toolbox.getTaskActionClient().submit(action);
};
}
}
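Since the point of this change is to make SequenceMetadata survive a JSON round trip, a small standalone check along the following lines shows the shape of the data involved. The class name, partition ids, and offsets are made up for illustration, and unknown properties must be tolerated because isSentinel is serialized but is not a constructor argument (Druid's own ObjectMapper tolerates unknown properties):

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import java.util.Collections;
import java.util.List;

public class SequenceMetadataRoundTripCheck
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    final SequenceMetadata<Integer, Long> sequence = new SequenceMetadata<>(
        0,
        "index_kafka_demo_sequence_0",
        ImmutableMap.of(0, 0L),
        ImmutableMap.of(0, 100L),
        false,
        Collections.emptySet()
    );
    final String json = mapper.writeValueAsString(ImmutableList.of(sequence));
    // Binding back through a concrete TypeReference preserves the Integer/Long type parameters;
    // without it, Jackson would return a List of LinkedHashMaps instead of SequenceMetadata instances.
    final List<SequenceMetadata<Integer, Long>> restored = mapper.readValue(
        json,
        new TypeReference<List<SequenceMetadata<Integer, Long>>>()
        {
        }
    );
    System.out.println(restored);
  }
}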

View File

@@ -119,8 +119,7 @@ import java.util.stream.Stream;
* @param <PartitionIdType>    the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type
* @param <SequenceOffsetType> the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers
*/
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType> implements Supervisor
{
public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";