Fix record validation in SeekableStreamIndexTaskRunner (#7246)

* Fix record validation in SeekableStreamIndexTaskRunner

* add kinesis test
Jihoon Son 2019-03-12 21:12:21 -07:00 committed by Fangjin Yang
parent 4d3987c1dd
commit 32e86ea75e
8 changed files with 220 additions and 50 deletions

View File

@@ -959,7 +959,7 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
}
@Override
public Map<Integer, Long> getCurrentOffsets()
public ConcurrentMap<Integer, Long> getCurrentOffsets()
{
return nextOffsets;
}
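The narrowed return type here (and on SeekableStreamIndexTaskRunner.getCurrentOffsets() further down) makes the thread-safety of the returned map visible to callers: the updated tests snapshot the live offsets with ImmutableMap.copyOf while the task thread keeps advancing them. A minimal sketch of that usage pattern, with hypothetical class and method names (Guava's ImmutableMap, which the tests already use, is assumed on the classpath):

// Hypothetical sketch, not part of this commit: snapshotting offsets that another thread
// is still advancing.
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class OffsetSnapshotSketch
{
  // stand-in for the runner's live offset map
  private final ConcurrentMap<Integer, Long> currOffsets = new ConcurrentHashMap<>();

  ConcurrentMap<Integer, Long> getCurrentOffsets()
  {
    // expose the live, thread-safe map; callers take their own snapshots
    return currOffsets;
  }

  Map<Integer, Long> snapshotOffsets()
  {
    // safe even while another thread mutates currOffsets: ConcurrentHashMap iteration is
    // weakly consistent, so copyOf never throws ConcurrentModificationException
    return ImmutableMap.copyOf(getCurrentOffsets());
  }
}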

View File

@@ -226,8 +226,6 @@ public class KafkaIndexTaskTest
private File reportsFile;
private RowIngestionMetersFactory rowIngestionMetersFactory;
private int handoffCount = 0;
// This should be removed in versions greater than 0.12.x
// isIncrementalHandoffSupported should always be set to true in those later versions
@Parameterized.Parameters(name = "isIncrementalHandoffSupported = {0}")
@@ -877,7 +875,14 @@ public class KafkaIndexTaskTest
new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0"))
new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2008", "A", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2009", "B", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2010", "C", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "D", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "d", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2012", "E", "x", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2009", "b", "x", "10", "20.0", "1.0"))
);
final String baseSequenceName = "sequence0";
@@ -904,9 +909,13 @@ public class KafkaIndexTaskTest
topic,
ImmutableMap.of(0, 5L)
);
final SeekableStreamPartitions<Integer, Long> checkpoint2 = new SeekableStreamPartitions<>(
topic,
ImmutableMap.of(0, 12L)
);
final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
topic,
ImmutableMap.of(0, 7L)
ImmutableMap.of(0, Long.MAX_VALUE)
);
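// Presumably left unbounded on purpose: with Long.MAX_VALUE end offsets the task never
// reaches its end on its own, so the explicit setEndOffsets(..., true) call further down
// is what finally finishes it.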
final KafkaIndexTask task = createTask(
@@ -927,17 +936,28 @@ public class KafkaIndexTaskTest
while (task.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertTrue(checkpoint1.getPartitionSequenceNumberMap().equals(currentOffsets));
Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
// actual checkpoint offset is 5, but simulating behavior of publishing set end offset call, to ensure this task
// will continue reading through the end offset of the checkpointed sequence
task.getRunner().setEndOffsets(ImmutableMap.of(0, 6L), true);
// Simulating the case when another replica has consumed up to the offset of 8
task.getRunner().setEndOffsets(ImmutableMap.of(0, 8L), false);
// The task is supposed to consume remaining rows up to the offset of 13
while (task.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
task.getRunner().setEndOffsets(
ImmutableMap.of(0, task.getRunner().getCurrentOffsets().get(0) + 1L),
true
);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
// processed count would be 5 if it stopped at its current offsets
Assert.assertEquals(6, task.getRunner().getRowIngestionMeters().getProcessed());
// processed count would be 8 if it stopped at its current offsets
Assert.assertEquals(13, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
}
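For reference, the expected counts follow from the offsets above, assuming the task starts at offset 0 and Kafka end offsets are exclusive (as the assertions suggest): the first pause happens at offset 5 (checkpoint1), the simulated replica then raises the end of that sequence to 8 without finishing, the second pause happens at offset 12 (checkpoint2), and the final end offset is set to 12 + 1 = 13, so the task reads offsets 0 through 12 and reports 13 processed rows. A much-simplified, hypothetical model of the finish flag's meaning that the test leans on (the real logic lives in SeekableStreamIndexTaskRunner and is considerably more involved):

// Hypothetical, much-simplified model; not the actual implementation.
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class EndOffsetsModel
{
  private final ConcurrentMap<Integer, Long> currOffsets = new ConcurrentHashMap<>();
  private volatile Map<Integer, Long> endOffsets = Collections.singletonMap(0, Long.MAX_VALUE);
  private volatile boolean finishing = false;

  void setEndOffsets(Map<Integer, Long> offsets, boolean finish)
  {
    // finish == false: checkpoint the current sequence at these offsets and keep reading;
    // finish == true: treat these as the final end offsets, then publish and exit once reached.
    endOffsets = offsets;
    finishing = finish;
    // the paused task then resumes from currOffsets either way
  }

  boolean shouldStopReading(int partition)
  {
    return finishing
        && currOffsets.getOrDefault(partition, 0L) >= endOffsets.getOrDefault(partition, Long.MAX_VALUE);
  }
}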

View File

@@ -176,6 +176,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -639,7 +640,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7));
}
@Test(timeout = 120_000L)
public void testIncrementalHandOffMaxTotalRows() throws Exception
{
@@ -2277,7 +2277,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
verifyAll();
Map<String, String> currentOffsets = task.getRunner().getCurrentOffsets();
ConcurrentMap<String, String> currentOffsets = task.getRunner().getCurrentOffsets();
try {
future.get(10, TimeUnit.SECONDS);
@@ -2423,6 +2423,154 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
@Test(timeout = 5000L)
public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
{
final List<OrderedPartitionableRecord<String, String>> records = ImmutableList.of(
new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")),
new OrderedPartitionableRecord<>(stream, "1", "14", jb("2013", "e", "y", "10", "20.0", "1.0"))
);
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing should happen
maxRowsPerSegment = 2;
maxRecordsPerPoll = 1;
recordSupplier.assign(anyObject());
expectLastCall().anyTimes();
expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
recordSupplier.seek(anyObject(), anyString());
expectLastCall().anyTimes();
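// The stubbed batches below overlap by one record (indices 0-4, 4-9, 9-14): after each
// pause the next poll re-delivers the record at the checkpointed sequence number,
// presumably to exercise the start-record validation reworked in this commit.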
expect(recordSupplier.poll(anyLong())).andReturn(records.subList(0, 5))
.once()
.andReturn(records.subList(4, 10))
.once()
.andReturn(records.subList(9, 15))
.once();
recordSupplier.close();
expectLastCall().once();
replayAll();
final SeekableStreamPartitions<String, String> startPartitions = new SeekableStreamPartitions<>(
stream,
ImmutableMap.of(
shardId1,
"0"
)
);
final SeekableStreamPartitions<String, String> checkpoint1 = new SeekableStreamPartitions<>(
stream,
ImmutableMap.of(
shardId1,
"4"
)
);
final SeekableStreamPartitions<String, String> checkpoint2 = new SeekableStreamPartitions<>(
stream,
ImmutableMap.of(
shardId1,
"9"
)
);
final SeekableStreamPartitions<String, String> endPartitions = new SeekableStreamPartitions<>(
stream,
ImmutableMap.of(
shardId1,
"14"
)
);
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
null,
baseSequenceName,
startPartitions,
endPartitions,
true,
null,
null,
"awsEndpoint",
null,
null,
null,
null,
null,
false
)
);
final ListenableFuture<TaskStatus> future = runTask(task);
while (task.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) {
Thread.sleep(10);
}
Map<String, String> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
task.getRunner().setEndOffsets(currentOffsets, false);
// The task is supposed to consume remaining rows up to the offset of 13
while (task.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
task.getRunner().setEndOffsets(
ImmutableMap.of(shardId1, String.valueOf(Long.valueOf(task.getRunner().getCurrentOffsets().get(shardId1)) + 1)),
true
);
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
verifyAll();
Assert.assertEquals(2, checkpointRequestsHash.size());
// Check metrics
Assert.assertEquals(12, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
// Check published metadata
final Set<SegmentDescriptor> descriptors = new HashSet<>();
descriptors.add(sd(task, "2008/P1D", 0));
descriptors.add(sd(task, "2008/P1D", 1));
descriptors.add(sd(task, "2009/P1D", 0));
descriptors.add(sd(task, "2010/P1D", 0));
descriptors.add(sd(task, "2010/P1D", 1));
descriptors.add(sd(task, "2011/P1D", 0));
descriptors.add(sd(task, "2011/P1D", 1));
descriptors.add(sd(task, "2012/P1D", 0));
descriptors.add(sd(task, "2013/P1D", 0));
Assert.assertEquals(descriptors, publishedDescriptors());
Assert.assertEquals(
new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
shardId1,
"10"
))),
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);
}
private ListenableFuture<TaskStatus> runTask(final Task task)
{
try {

View File

@@ -61,7 +61,7 @@ import java.nio.ByteBuffer;
import java.util.Map;
public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType extends Comparable>
public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
extends AbstractTask implements ChatHandler
{
public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
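The extends Comparable bound dropped here (and below from SeekableStreamIndexTaskRunner, OrderedPartitionableRecord, RecordSupplier, and SeekableStreamSupervisor) reflects that sequence offsets are no longer compared through raw Comparable; the reworked validation compares them through createSequenceNumber(...) / OrderedSequenceNumber instead. A hypothetical illustration of why raw Comparable ordering is unreliable for string-typed sequence numbers of differing lengths, such as the Kinesis values used in the test above:

// Hypothetical illustration, not part of this commit: comparing decimal-string sequence
// numbers with String.compareTo orders them lexicographically, which breaks once the
// strings differ in length.
import java.math.BigInteger;

class SequenceOrderingDemo
{
  public static void main(String[] args)
  {
    String earlier = "9";
    String later = "10";

    // Lexicographic ordering puts "10" before "9" -- the wrong answer here.
    System.out.println(later.compareTo(earlier) < 0);   // prints true

    // Numeric ordering, as an OrderedSequenceNumber implementation can provide, gets it right.
    System.out.println(new BigInteger(later).compareTo(new BigInteger(earlier)) > 0);   // prints true
  }
}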

View File

@@ -130,7 +130,7 @@ import java.util.stream.Collectors;
* @param <PartitionIdType> Partition Number Type
* @param <SequenceOffsetType> Sequence Number Type
*/
public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType extends Comparable> implements ChatHandler
public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> implements ChatHandler
{
public enum Status
{
@@ -196,7 +196,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new ArrayList<>();
private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new ArrayList<>();
private final Map<PartitionIdType, SequenceOffsetType> initialOffsetsSnapshot = new HashMap<>();
private final Set<PartitionIdType> initialOffsetsSnapshot = new HashSet<>();
private final Set<PartitionIdType> exclusiveStartingPartitions = new HashSet<>();
private volatile DateTime startTime;
@@ -454,7 +454,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
status = Status.READING;
Throwable caughtExceptionInner = null;
initialOffsetsSnapshot.putAll(currOffsets);
initialOffsetsSnapshot.addAll(currOffsets.keySet());
exclusiveStartingPartitions.addAll(ioConfig.getExclusiveStartSequenceNumberPartitions());
try {
@@ -490,7 +490,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
maybePersistAndPublishSequences(committerSupplier);
// calling getRecord() ensures that exceptions specific to kafka/kinesis like OffsetOutOfRangeException
// are handled in the subclasses.
List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>> records = getRecords(
@@ -512,9 +511,9 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
continue;
}
// for the first message we receive, check that we were given a message with a sequenceNumber that matches our
// expected starting sequenceNumber
if (!verifyInitialRecordAndSkipExclusivePartition(record, initialOffsetsSnapshot)) {
// for the first message we receive, check that we were given a message with a sequenceNumber that matches
// our expected starting sequenceNumber
if (!verifyInitialRecordAndSkipExclusivePartition(record)) {
continue;
}
@@ -1281,7 +1280,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
return getCurrentOffsets();
}
public Map<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
public ConcurrentMap<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
{
return currOffsets;
}
@@ -1384,14 +1383,15 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// do not mark the starting sequence number as exclusive
Set<PartitionIdType> exclusivePartitions = sequenceNumbers.keySet()
.stream()
.filter(x -> !initialOffsetsSnapshot.containsKey(x)
.filter(x -> !initialOffsetsSnapshot.contains(x)
|| ioConfig.getExclusiveStartSequenceNumberPartitions()
.contains(x))
.collect(Collectors.toSet());
if ((latestSequence.getStartOffsets().equals(sequenceNumbers) && latestSequence.exclusiveStartPartitions.equals(
exclusivePartitions) && !finish) ||
(latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
&& latestSequence.exclusiveStartPartitions.equals(exclusivePartitions)
&& !finish)
|| (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers);
resume();
return Response.ok(sequenceNumbers).build();
@@ -1442,7 +1442,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
exclusivePartitions
);
sequences.add(newSequence);
initialOffsetsSnapshot.putAll(sequenceNumbers);
initialOffsetsSnapshot.addAll(sequenceNumbers.keySet());
}
persistSequences();
}
@@ -1882,33 +1882,35 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
private boolean verifyInitialRecordAndSkipExclusivePartition(
final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record,
final Map<PartitionIdType, SequenceOffsetType> intialSequenceSnapshot
final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
)
{
if (intialSequenceSnapshot.containsKey(record.getPartitionId())) {
if (record.getSequenceNumber().compareTo(intialSequenceSnapshot.get(record.getPartitionId())) < 0) {
throw new ISE(
"Starting sequenceNumber [%s] does not match expected [%s] for partition [%s]",
record.getSequenceNumber(),
intialSequenceSnapshot.get(record.getPartitionId()),
record.getPartitionId()
// Check only for the first record among the record batch.
if (initialOffsetsSnapshot.contains(record.getPartitionId())) {
final SequenceOffsetType currOffset = currOffsets.get(record.getPartitionId());
if (currOffset != null) {
final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = createSequenceNumber(
record.getSequenceNumber()
);
final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber = createSequenceNumber(
currOffset
);
if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) {
throw new ISE(
"sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition [%s]",
record.getSequenceNumber(),
currOffset,
record.getPartitionId()
);
}
}
log.info(
"Verified starting sequenceNumber [%s] for partition [%s]",
record.getSequenceNumber(), record.getPartitionId()
);
intialSequenceSnapshot.remove(record.getPartitionId());
if (intialSequenceSnapshot.isEmpty()) {
log.info("Verified starting sequences for all partitions");
}
// Remove the mark to notify that this partition has been read.
initialOffsetsSnapshot.remove(record.getPartitionId());
// check exclusive starting sequence
if (isStartingSequenceOffsetsExclusive() && exclusiveStartingPartitions.contains(record.getPartitionId())) {
log.info("Skipping starting sequenceNumber for partition [%s] marked exclusive", record.getPartitionId());
log.warn("Skipping starting sequenceNumber for partition [%s] marked exclusive", record.getPartitionId());
return false;
}
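Taken together, the reworked check above differs from the old one in two ways: the expected value is the live entry in currOffsets rather than a one-shot snapshot of the starting offsets (initialOffsetsSnapshot is now just the set of partitions that still need their first record verified), and the comparison goes through createSequenceNumber(...) so ordering is defined per stream type instead of by raw Comparable. A self-contained, hypothetical distillation of the new check, with numeric ordering standing in for OrderedSequenceNumber and the exclusive-start skip omitted:

// Hypothetical distillation; names mirror the runner but this is an illustration,
// not the actual implementation.
import java.math.BigInteger;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class FirstRecordCheckSketch
{
  // partitions whose next record still needs its sequence number verified
  private final Set<String> initialOffsetsSnapshot = new HashSet<>();
  // current sequence number per partition, updated as records are consumed
  private final ConcurrentMap<String, String> currOffsets = new ConcurrentHashMap<>();

  void verifyFirstRecord(String partitionId, String recordSequenceNumber)
  {
    if (initialOffsetsSnapshot.contains(partitionId)) {
      final String currOffset = currOffsets.get(partitionId);
      // numeric comparison stands in for createSequenceNumber(...).compareTo(...)
      if (currOffset != null
          && new BigInteger(recordSequenceNumber).compareTo(new BigInteger(currOffset)) < 0) {
        throw new IllegalStateException(
            "sequenceNumber of the start record[" + recordSequenceNumber
            + "] is smaller than current sequenceNumber[" + currOffset
            + "] for partition [" + partitionId + "]"
        );
      }
      // this partition has produced its first record; stop checking it
      initialOffsetsSnapshot.remove(partitionId);
    }
  }
}

For example, with the Kinesis test's values, a record with sequence number "4" arriving while currOffsets holds "4" passes and clears the mark, whereas a record "3" at that point would still raise the exception.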

View File

@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
* @param <PartitionIdType> partition id
* @param <SequenceOffsetType> sequence number
*/
public class OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType extends Comparable>
public class OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>
{
private final String stream;
private final PartitionIdType partitionId;

View File

@@ -36,7 +36,7 @@ import java.util.Set;
* @param <SequenceOffsetType> Sequence Number Type
*/
@Beta
public interface RecordSupplier<PartitionIdType, SequenceOffsetType extends Comparable> extends Closeable
public interface RecordSupplier<PartitionIdType, SequenceOffsetType> extends Closeable
{
/**
* assigns the given partitions to this RecordSupplier

View File

@@ -119,7 +119,7 @@ import java.util.stream.Stream;
* @param <PartitionIdType> the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type
* @param <SequenceOffsetType> the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers
*/
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType extends Comparable>
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
implements Supervisor
{
public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";