Fix KafkaRecordSupplier assign (#7260)

* Fix KafkaRecordSupplier assign

* TeamCity fix
Jonathan Wei 2019-03-13 23:36:14 -07:00 committed by Benedict Jin
parent 48bc523bdf
commit c020272add
2 changed files with 63 additions and 1 deletion


@@ -69,7 +69,6 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
                         .stream()
                         .map(x -> new TopicPartition(x.getStream(), x.getPartitionId()))
                         .collect(Collectors.toSet()));
-    seekToEarliest(streamPartitions);
   }
 
   @Override
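Net effect of the hunk above: assign() now only hands the translated partition set to the underlying Kafka consumer and no longer seeks, so a start offset chosen by the caller is not silently reset to the earliest available offset. A minimal sketch of the resulting method, reconstructed from the hunk context (the consumer field, the method signature, and the exact types are inferred rather than shown in the diff):

    // Sketch reconstructed from the hunk context; the signature and the
    // consumer field are inferred and may differ from the actual source.
    @Override
    public void assign(Set<StreamPartition<Integer>> streamPartitions)
    {
      consumer.assign(streamPartitions
                          .stream()
                          .map(x -> new TopicPartition(x.getStream(), x.getPartitionId()))
                          .collect(Collectors.toSet()));
      // The removed seekToEarliest(streamPartitions) call rewound every
      // assigned partition to its earliest offset, clobbering any start
      // offset the caller had already seeked to.
    }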


@@ -2330,6 +2330,69 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc4));
   }
 
+  @Test(timeout = 60_000L)
+  public void testCanStartFromLaterThanEarliestOffset() throws Exception
+  {
+    if (!isIncrementalHandoffSupported) {
+      return;
+    }
+    final String baseSequenceName = "sequence0";
+    maxRowsPerSegment = Integer.MAX_VALUE;
+    maxTotalRows = null;
+
+    // Insert data
+    int numToAdd = records.size();
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final SeekableStreamPartitions<Integer, Long> startPartitions = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            0L,
+            1,
+            1L
+        )
+    );
+
+    final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            10L,
+            1,
+            2L
+        )
+    );
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+  }
+
   private List<ScanResultValue> scanData(final Task task, QuerySegmentSpec spec)
   {
     ScanQuery query = new Druids.ScanQueryBuilder().dataSource(
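The new test inserts all sample records, then configures the task to start partition 0 at offset 0 but partition 1 at offset 1, i.e. later than that partition's earliest offset. With the old assign() behavior, the supplier would have rewound partition 1 to its beginning, so the task would not have honored the requested start offset. A standalone sketch, not part of the commit, of the plain KafkaConsumer semantics involved (topic name, partition, and broker address are hypothetical):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class AssignThenSeekSketch
    {
      public static void main(String[] args)
      {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          TopicPartition tp = new TopicPartition("topic", 1); // hypothetical topic/partition
          consumer.assign(Collections.singletonList(tp));
          consumer.seek(tp, 1L); // start later than earliest, as in the test

          // The pre-fix KafkaRecordSupplier.assign() effectively followed the
          // assignment with the equivalent of this, undoing the caller's seek:
          // consumer.seekToBeginning(Collections.singletonList(tp));

          // Because nothing re-seeked after the explicit seek, polling
          // starts at offset 1 rather than at the partition's beginning.
          ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
        }
      }
    }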