diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java index db41462b007..f677bc93c22 100644 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java +++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java @@ -153,7 +153,7 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements ); Long startOffset = lastOffsets.get(partition); PartitionConsumerWorker worker = new PartitionConsumerWorker( - feed, kafkaSimpleConsumer, partition, startOffset == null ? 0 : startOffset + feed, kafkaSimpleConsumer, partition, startOffset == null ? -1 : startOffset ); consumerWorkers.add(worker); } diff --git a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java index dd1f93b3f2c..ecbd07ba50a 100644 --- a/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java +++ b/extensions-contrib/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java @@ -238,7 +238,7 @@ public class KafkaSimpleConsumer } catch (Exception e) { ensureNotInterrupted(e); - log.warn(e, "caughte exception in fetch {} - {}", topic, partitionId); + log.warn(e, "caught exception in fetch {} - {}", topic, partitionId); response = null; }