mirror of https://github.com/apache/druid.git
Fix lz4 library incompatibility in kafka-indexing-service extension (#4115)
* Fix lz4 library incompatibility in kafka-indexing-service extension #3266 * Bumped Kafka version to 0.10.2.0 for : Fix lz4 library incompatibility in kafka-indexing-service extension #3266 * Replaced Lists.newArrayList() with Collections.singletonList() For Fix lz4 library incompatibility in kafka-indexing-service extension #4115
This commit is contained in:
parent
723a855ab9
commit
d51097c809
|
@ -17,9 +17,9 @@ currently designated as an *experimental feature* and is subject to the usual
|
|||
[experimental caveats](../experimental.html).
|
||||
|
||||
<div class="note info">
|
||||
The Kafka indexing service uses the Java consumer that was introduced in Kafka 0.9. As there were protocol changes
|
||||
made in this version, Kafka 0.9 consumers are not compatible with older brokers. Ensure that your Kafka brokers are
|
||||
version 0.9 or better before using this service.
|
||||
The Kafka indexing service uses the Java consumer that was introduced in Kafka 0.10.x. As there were protocol changes
|
||||
made in this version, Kafka 0.10.x consumers might not be compatible with older brokers. Ensure that your Kafka brokers are
|
||||
version 0.10.x or better before using this service. Refer <a href="https://kafka.apache.org/documentation/#upgrade">Kafka upgrade guide</a> if you are using older version of kafka brokers.
|
||||
</div>
|
||||
|
||||
## Submitting a Supervisor Spec
|
||||
|
|
|
@ -55,7 +55,7 @@
|
|||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>0.9.0.1</version>
|
||||
<version>0.10.2.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Tests -->
|
||||
|
@ -67,7 +67,7 @@
|
|||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<version>0.9.0.0</version>
|
||||
<version>0.10.2.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
|
|
@ -96,6 +96,7 @@ import java.util.Map;
|
|||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -1008,7 +1009,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
|
|||
final TopicPartition topicPartition = outOfRangePartition.getKey();
|
||||
final long nextOffset = outOfRangePartition.getValue();
|
||||
// seek to the beginning to get the least available offset
|
||||
consumer.seekToBeginning(topicPartition);
|
||||
consumer.seekToBeginning(Collections.singletonList(topicPartition));
|
||||
final long leastAvailableOffset = consumer.position(topicPartition);
|
||||
// reset the seek
|
||||
consumer.seek(topicPartition, nextOffset);
|
||||
|
|
|
@ -77,6 +77,7 @@ import org.joda.time.DateTime;
|
|||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -1480,13 +1481,13 @@ public class KafkaSupervisor implements Supervisor
|
|||
{
|
||||
TopicPartition topicPartition = new TopicPartition(ioConfig.getTopic(), partition);
|
||||
if (!consumer.assignment().contains(topicPartition)) {
|
||||
consumer.assign(Lists.newArrayList(topicPartition));
|
||||
consumer.assign(Collections.singletonList(topicPartition));
|
||||
}
|
||||
|
||||
if (useEarliestOffset) {
|
||||
consumer.seekToBeginning(topicPartition);
|
||||
consumer.seekToBeginning(Collections.singletonList(topicPartition));
|
||||
} else {
|
||||
consumer.seekToEnd(topicPartition);
|
||||
consumer.seekToEnd(Collections.singletonList(topicPartition));
|
||||
}
|
||||
|
||||
return consumer.position(topicPartition);
|
||||
|
|
|
@ -23,12 +23,13 @@ import com.google.common.collect.ImmutableMap;
|
|||
import com.google.common.collect.Maps;
|
||||
import kafka.server.KafkaConfig;
|
||||
import kafka.server.KafkaServer;
|
||||
import kafka.utils.SystemTime$;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.apache.kafka.common.utils.SystemTime;
|
||||
import scala.Some;
|
||||
import scala.collection.immutable.List$;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
|
@ -69,7 +70,7 @@ public class TestBroker implements Closeable
|
|||
|
||||
final KafkaConfig config = new KafkaConfig(props);
|
||||
|
||||
server = new KafkaServer(config, SystemTime$.MODULE$, Some.apply(String.format("TestingBroker[%d]-", id)));
|
||||
server = new KafkaServer(config, SystemTime.SYSTEM, Some.apply(String.format("TestingBroker[%d]-", id)), List$.MODULE$.empty());
|
||||
server.startup();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue