KafkaIndexTask.

Reads a specific offset range from specific partitions, and can use dataSource metadata
transactions to guarantee exactly-once ingestion.

Each task has a finite lifecycle, so it is expected that some process will be supervising
existing tasks and creating new ones when needed.
Gian Merlino 2016-03-01 16:51:50 -08:00
parent 187569e702
commit f22fb2c2cf
13 changed files with 2674 additions and 0 deletions
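
For orientation, here is a minimal sketch of assembling the ioConfig for such a task out of the classes added in this commit. The topic name, offsets, broker address, and the wrapper class are invented for the example; only the constructor signatures come from the code below.

import com.google.common.collect.ImmutableMap;
import io.druid.indexing.kafka.KafkaIOConfig;
import io.druid.indexing.kafka.KafkaPartitions;

public class ExampleKafkaIOConfigs
{
  public static KafkaIOConfig exampleIOConfig()
  {
    // Read partitions 0 and 1 of a hypothetical topic, from offset 0 (inclusive) up to offset 100 (exclusive).
    final KafkaPartitions start = new KafkaPartitions("example_topic", ImmutableMap.of(0, 0L, 1, 0L));
    final KafkaPartitions end = new KafkaPartitions("example_topic", ImmutableMap.of(0, 100L, 1, 100L));
    return new KafkaIOConfig(
        "example_sequence",                                     // sequenceName, used when allocating segments
        start,
        end,
        ImmutableMap.of("bootstrap.servers", "localhost:9092"), // consumerProperties; address is a placeholder
        true                                                    // useTransaction: publish segments atomically with offset metadata
    );
  }
}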


@@ -81,6 +81,8 @@
<argument>-c</argument>
<argument>io.druid.extensions:druid-kafka-extraction-namespace</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-kafka-indexing-service</argument>
<argument>-c</argument>
<argument>io.druid.extensions:mysql-metadata-storage</argument>
<argument>-c</argument>
<argument>io.druid.extensions:druid-namespace-lookup</argument>


@@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-kafka-indexing-service</artifactId>
<name>druid-kafka-indexing-service</name>
<description>druid-kafka-indexing-service</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>0.9.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>0.9.0-SNAPSHOT</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>0.9.0-SNAPSHOT</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>


@@ -0,0 +1,123 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
import com.metamx.common.IAE;
import io.druid.indexing.overlord.DataSourceMetadata;
import java.util.Map;
import java.util.Objects;
public class KafkaDataSourceMetadata implements DataSourceMetadata
{
private final KafkaPartitions kafkaPartitions;
@JsonCreator
public KafkaDataSourceMetadata(
@JsonProperty("partitions") KafkaPartitions kafkaPartitions
)
{
this.kafkaPartitions = kafkaPartitions;
}
@JsonProperty("partitions")
public KafkaPartitions getKafkaPartitions()
{
return kafkaPartitions;
}
@Override
public boolean isValidStart()
{
return true;
}
@Override
public boolean matches(DataSourceMetadata other)
{
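// Matching is defined as "merging in either order gives the same result": same topic, and any partition
// present in both offset maps has the same offset. For example, {0: 2, 1: 3} matches {0: 2, 2: 5} but not
// {0: 2, 1: 4, 2: 5}, which disagrees on partition 1 (see KafkaDataSourceMetadataTest).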
if (getClass() != other.getClass()) {
return false;
}
return plus(other).equals(other.plus(this));
}
@Override
public DataSourceMetadata plus(DataSourceMetadata other)
{
if (!(other instanceof KafkaDataSourceMetadata)) {
throw new IAE(
"Expected instance of %s, got %s",
KafkaDataSourceMetadata.class.getCanonicalName(),
other.getClass().getCanonicalName()
);
}
final KafkaDataSourceMetadata that = (KafkaDataSourceMetadata) other;
if (that.getKafkaPartitions().getTopic().equals(kafkaPartitions.getTopic())) {
// Same topic, merge offsets.
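// Offsets from "that" win whenever both sides contain the same partition.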
final Map<Integer, Long> newMap = Maps.newHashMap();
for (Map.Entry<Integer, Long> entry : kafkaPartitions.getPartitionOffsetMap().entrySet()) {
newMap.put(entry.getKey(), entry.getValue());
}
for (Map.Entry<Integer, Long> entry : that.getKafkaPartitions().getPartitionOffsetMap().entrySet()) {
newMap.put(entry.getKey(), entry.getValue());
}
return new KafkaDataSourceMetadata(new KafkaPartitions(kafkaPartitions.getTopic(), newMap));
} else {
// Different topic, prefer "other".
return other;
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KafkaDataSourceMetadata that = (KafkaDataSourceMetadata) o;
return Objects.equals(kafkaPartitions, that.kafkaPartitions);
}
@Override
public int hashCode()
{
return Objects.hash(kafkaPartitions);
}
@Override
public String toString()
{
return "KafkaDataSourceMetadata{" +
"kafkaPartitions=" + kafkaPartitions +
'}';
}
}


@@ -0,0 +1,115 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.segment.indexing.IOConfig;
import java.util.Map;
public class KafkaIOConfig implements IOConfig
{
private static final boolean DEFAULT_USE_TRANSACTION = true;
private final String sequenceName;
private final KafkaPartitions startPartitions;
private final KafkaPartitions endPartitions;
private final Map<String, String> consumerProperties;
private final boolean useTransaction;
@JsonCreator
public KafkaIOConfig(
@JsonProperty("sequenceName") String sequenceName,
@JsonProperty("startPartitions") KafkaPartitions startPartitions,
@JsonProperty("endPartitions") KafkaPartitions endPartitions,
@JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction
)
{
this.sequenceName = Preconditions.checkNotNull(sequenceName, "sequenceName");
this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions");
this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
Preconditions.checkArgument(
startPartitions.getTopic().equals(endPartitions.getTopic()),
"start topic and end topic must match"
);
Preconditions.checkArgument(
startPartitions.getPartitionOffsetMap().keySet().equals(endPartitions.getPartitionOffsetMap().keySet()),
"start partition set and end partition set must match"
);
for (int partition : endPartitions.getPartitionOffsetMap().keySet()) {
Preconditions.checkArgument(
endPartitions.getPartitionOffsetMap().get(partition) >= startPartitions.getPartitionOffsetMap()
.get(partition),
"end offset must be >= start offset for partition[%d]",
partition
);
}
}
@JsonProperty
public String getSequenceName()
{
return sequenceName;
}
@JsonProperty
public KafkaPartitions getStartPartitions()
{
return startPartitions;
}
@JsonProperty
public KafkaPartitions getEndPartitions()
{
return endPartitions;
}
@JsonProperty
public Map<String, String> getConsumerProperties()
{
return consumerProperties;
}
@JsonProperty
public boolean isUseTransaction()
{
return useTransaction;
}
@Override
public String toString()
{
return "KafkaIOConfig{" +
"sequenceName='" + sequenceName + '\'' +
", startPartitions=" + startPartitions +
", endPartitions=" + endPartitions +
", consumerProperties=" + consumerProperties +
", useTransaction=" + useTransaction +
'}';
}
}


@@ -0,0 +1,587 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.metamx.common.ISE;
import com.metamx.common.RetryUtils;
import com.metamx.common.guava.Sequence;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.query.DruidMetrics;
import io.druid.query.NoopQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.timeline.DataSegment;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
public class KafkaIndexTask extends AbstractTask
{
private static final Logger log = new Logger(KafkaIndexTask.class);
private static final String TYPE = "index_kafka";
private static final Random RANDOM = new Random();
private static final long POLL_TIMEOUT = 100;
private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
private final DataSchema dataSchema;
private final InputRowParser<ByteBuffer> parser;
private final KafkaTuningConfig tuningConfig;
private final KafkaIOConfig ioConfig;
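// The fields below are written by the run() thread and read from other threads
// (stopGracefully, getQueryRunner, hasStartedReading), hence volatile.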
private volatile Appenderator appenderator = null;
private volatile FireDepartmentMetrics fireDepartmentMetrics = null;
private volatile boolean startedReading = false;
private volatile boolean stopping = false;
private volatile boolean publishing = false;
private volatile Thread runThread = null;
@JsonCreator
public KafkaIndexTask(
@JsonProperty("id") String id,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("tuningConfig") KafkaTuningConfig tuningConfig,
@JsonProperty("ioConfig") KafkaIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context
)
{
super(
id == null ? makeTaskId(dataSchema.getDataSource(), RANDOM.nextInt()) : id,
String.format("%s_%s", TYPE, dataSchema.getDataSource()),
taskResource,
dataSchema.getDataSource(),
context
);
this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
this.parser = Preconditions.checkNotNull((InputRowParser<ByteBuffer>) dataSchema.getParser(), "parser");
this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
}
private static String makeTaskId(String dataSource, int randomBits)
{
final StringBuilder suffix = new StringBuilder(8);
for (int i = 0; i < Ints.BYTES * 2; ++i) {
suffix.append((char) ('a' + ((randomBits >>> (i * 4)) & 0x0F)));
}
return Joiner.on("_").join(TYPE, dataSource, suffix);
}
@Override
public String getType()
{
return TYPE;
}
@Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
return true;
}
@JsonProperty
public DataSchema getDataSchema()
{
return dataSchema;
}
@JsonProperty
public KafkaTuningConfig getTuningConfig()
{
return tuningConfig;
}
@JsonProperty("ioConfig")
public KafkaIOConfig getIOConfig()
{
return ioConfig;
}
/**
* Public for tests.
*/
@JsonIgnore
public boolean hasStartedReading()
{
return startedReading;
}
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
log.info("Starting up!");
runThread = Thread.currentThread();
// Set up FireDepartmentMetrics
final FireDepartment fireDepartmentForMetrics = new FireDepartment(
dataSchema,
new RealtimeIOConfig(null, null, null),
null
);
fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
toolbox.getMonitorScheduler().addMonitor(
new RealtimeMetricsMonitor(
ImmutableList.of(fireDepartmentForMetrics),
ImmutableMap.of(DruidMetrics.TASK_ID, new String[]{getId()})
)
);
try (
final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox);
final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
) {
appenderator = appenderator0;
final String topic = ioConfig.getStartPartitions().getTopic();
// Start up, set up initial offsets.
final Object restoredMetadata = driver.startJob();
final Map<Integer, Long> nextOffsets = Maps.newHashMap();
if (restoredMetadata == null) {
nextOffsets.putAll(ioConfig.getStartPartitions().getPartitionOffsetMap());
} else {
final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
final KafkaPartitions restoredNextPartitions = toolbox.getObjectMapper().convertValue(
restoredMetadataMap.get(METADATA_NEXT_PARTITIONS),
KafkaPartitions.class
);
nextOffsets.putAll(restoredNextPartitions.getPartitionOffsetMap());
// Sanity checks.
if (!restoredNextPartitions.getTopic().equals(ioConfig.getStartPartitions().getTopic())) {
throw new ISE(
"WTF?! Restored topic[%s] but expected topic[%s]",
restoredNextPartitions.getTopic(),
ioConfig.getStartPartitions().getTopic()
);
}
if (!nextOffsets.keySet().equals(ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) {
throw new ISE(
"WTF?! Restored partitions[%s] but expected partitions[%s]",
nextOffsets.keySet(),
ioConfig.getStartPartitions().getPartitionOffsetMap().keySet()
);
}
}
// Set up committer.
final Supplier<Committer> committerSupplier = new Supplier<Committer>()
{
@Override
public Committer get()
{
final Map<Integer, Long> snapshot = ImmutableMap.copyOf(nextOffsets);
return new Committer()
{
@Override
public Object getMetadata()
{
return ImmutableMap.of(
METADATA_NEXT_PARTITIONS, new KafkaPartitions(
ioConfig.getStartPartitions().getTopic(),
snapshot
)
);
}
@Override
public void run()
{
// Do nothing.
}
};
}
};
// Initialize consumer assignment.
final Set<Integer> assignment = Sets.newHashSet();
for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
final long endOffset = ioConfig.getEndPartitions().getPartitionOffsetMap().get(entry.getKey());
if (entry.getValue() < endOffset) {
assignment.add(entry.getKey());
} else if (entry.getValue() == endOffset) {
log.info("Finished reading partition[%d].", entry.getKey());
} else {
throw new ISE(
"WTF?! Cannot start from offset[%,d] > endOffset[%,d]",
entry.getValue(),
endOffset
);
}
}
assignPartitions(consumer, topic, assignment);
// Seek to starting offsets.
for (final int partition : assignment) {
final long offset = nextOffsets.get(partition);
log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
consumer.seek(new TopicPartition(topic, partition), offset);
}
// Main loop.
// Could eventually support early termination (triggered by a supervisor)
// Could eventually support leader/follower mode (for keeping replicas more in sync)
boolean stillReading = true;
while (stillReading) {
if (stopping) {
log.info("Stopping early.");
break;
}
// The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to
// offset is not present in the topic-partition. This can happen if we're asking a task to read from data
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
final ConsumerRecords<byte[], byte[]> records = RetryUtils.retry(
new Callable<ConsumerRecords<byte[], byte[]>>()
{
@Override
public ConsumerRecords<byte[], byte[]> call() throws Exception
{
try {
return consumer.poll(POLL_TIMEOUT);
}
finally {
startedReading = true;
}
}
},
new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable input)
{
return input instanceof OffsetOutOfRangeException;
}
},
Integer.MAX_VALUE
);
for (ConsumerRecord<byte[], byte[]> record : records) {
if (log.isTraceEnabled()) {
log.trace(
"Got topic[%s] partition[%d] offset[%,d].",
record.topic(),
record.partition(),
record.offset()
);
}
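// Anything at or past the end offset for this partition is ignored; a single poll() can return
// records beyond the range this task was asked to read.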
if (record.offset() < ioConfig.getEndPartitions().getPartitionOffsetMap().get(record.partition())) {
if (record.offset() != nextOffsets.get(record.partition())) {
throw new ISE(
"WTF?! Got offset[%,d] after offset[%,d] in partition[%d].",
record.offset(),
nextOffsets.get(record.partition()),
record.partition()
);
}
try {
final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row");
final SegmentIdentifier identifier = driver.add(row, committerSupplier);
if (identifier == null) {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
// May want configurable behavior here at some point.
// If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
}
fireDepartmentMetrics.incrementProcessed();
}
catch (ParseException e) {
if (tuningConfig.isReportParseExceptions()) {
throw e;
} else {
log.debug(
e,
"Dropping unparseable row from partition[%d] offset[%,d].",
record.partition(),
record.offset()
);
fireDepartmentMetrics.incrementUnparseable();
}
}
final long nextOffset = record.offset() + 1;
final long endOffset = ioConfig.getEndPartitions().getPartitionOffsetMap().get(record.partition());
nextOffsets.put(record.partition(), nextOffset);
if (nextOffset == endOffset && assignment.remove(record.partition())) {
log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
assignPartitions(consumer, topic, assignment);
stillReading = !assignment.isEmpty();
}
}
}
}
// Persist pending data.
final Committer finalCommitter = committerSupplier.get();
driver.persist(finalCommitter);
publishing = true;
if (stopping) {
// Stopped gracefully. Exit code shouldn't matter, so fail to be on the safe side.
return TaskStatus.failure(getId());
}
final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher()
{
@Override
public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata) throws IOException
{
// Sanity check, we should only be publishing things that match our desired end state.
if (!ioConfig.getEndPartitions().equals(((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS))) {
throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata);
}
final SegmentInsertAction action;
if (ioConfig.isUseTransaction()) {
action = new SegmentInsertAction(
segments,
new KafkaDataSourceMetadata(ioConfig.getStartPartitions()),
new KafkaDataSourceMetadata(ioConfig.getEndPartitions())
);
} else {
action = new SegmentInsertAction(segments, null, null);
}
log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());
return toolbox.getTaskActionClient().submit(action).isSuccess();
}
};
final SegmentsAndMetadata published = driver.finish(publisher, committerSupplier.get());
if (published == null) {
throw new ISE("Transaction failure publishing segments, aborting");
} else {
log.info(
"Published segments[%s] with metadata[%s].",
Joiner.on(", ").join(
Iterables.transform(
published.getSegments(),
new Function<DataSegment, String>()
{
@Override
public String apply(DataSegment input)
{
return input.getIdentifier();
}
}
)
),
published.getCommitMetadata()
);
}
}
return success();
}
@Override
public boolean canRestore()
{
return true;
}
@Override
public void stopGracefully()
{
log.info("Stopping gracefully.");
stopping = true;
if (publishing && runThread.isAlive()) {
log.info("stopGracefully: Run thread started publishing, interrupting it.");
runThread.interrupt();
}
}
@Override
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
if (appenderator == null) {
// Not yet initialized, no data yet, just return a noop runner.
return new NoopQueryRunner<>();
}
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
return query.run(appenderator, responseContext);
}
};
}
@VisibleForTesting
public FireDepartmentMetrics getFireDepartmentMetrics()
{
return fireDepartmentMetrics;
}
private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
{
return Appenderators.createRealtime(
dataSchema,
tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")),
metrics,
toolbox.getSegmentPusher(),
toolbox.getObjectMapper(),
toolbox.getIndexIO(),
tuningConfig.getBuildV9Directly() ? toolbox.getIndexMergerV9() : toolbox.getIndexMerger(),
toolbox.getQueryRunnerFactoryConglomerate(),
toolbox.getSegmentAnnouncer(),
toolbox.getEmitter(),
toolbox.getQueryExecutorService(),
toolbox.getCache(),
toolbox.getCacheConfig()
);
}
private FiniteAppenderatorDriver newDriver(
final Appenderator appenderator,
final TaskToolbox toolbox
)
{
return new FiniteAppenderatorDriver(
appenderator,
new ActionBasedSegmentAllocator(
toolbox.getTaskActionClient(),
dataSchema,
ioConfig.getSequenceName()
),
toolbox.getSegmentHandoffNotifierFactory(),
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
toolbox.getObjectMapper(),
tuningConfig.getMaxRowsPerSegment(),
tuningConfig.getHandoffConditionTimeout()
);
}
private KafkaConsumer<byte[], byte[]> newConsumer()
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
final Properties props = new Properties();
for (Map.Entry<String, String> entry : ioConfig.getConsumerProperties().entrySet()) {
props.setProperty(entry.getKey(), entry.getValue());
}
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "none");
props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
return new KafkaConsumer<>(props);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
}
}
private static void assignPartitions(
final KafkaConsumer consumer,
final String topic,
final Set<Integer> partitions
)
{
consumer.assign(
Lists.newArrayList(
Iterables.transform(
partitions,
new Function<Integer, TopicPartition>()
{
@Override
public TopicPartition apply(Integer n)
{
return new TopicPartition(topic, n);
}
}
)
)
);
}
}


@@ -0,0 +1,51 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.List;
public class KafkaIndexTaskModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule(getClass().getSimpleName())
.registerSubtypes(
new NamedType(KafkaIndexTask.class, "index_kafka"),
new NamedType(KafkaDataSourceMetadata.class, "kafka"),
new NamedType(KafkaIOConfig.class, "kafka"),
new NamedType(KafkaTuningConfig.class, "kafka")
)
);
}
@Override
public void configure(Binder binder)
{
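// No Guice bindings needed; this module only contributes the Jackson subtypes above.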
}
}


@@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Objects;
public class KafkaPartitions
{
private final String topic;
private final Map<Integer, Long> partitionOffsetMap;
@JsonCreator
public KafkaPartitions(
@JsonProperty("topic") final String topic,
@JsonProperty("partitionOffsetMap") final Map<Integer, Long> partitionOffsetMap
)
{
this.topic = topic;
this.partitionOffsetMap = ImmutableMap.copyOf(partitionOffsetMap);
// Validate partitionOffsetMap
for (Map.Entry<Integer, Long> entry : partitionOffsetMap.entrySet()) {
Preconditions.checkArgument(
entry.getValue() >= 0,
String.format(
"partition[%d] offset[%d] invalid",
entry.getKey(),
entry.getValue()
)
);
}
}
@JsonProperty
public String getTopic()
{
return topic;
}
@JsonProperty
public Map<Integer, Long> getPartitionOffsetMap()
{
return partitionOffsetMap;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
KafkaPartitions that = (KafkaPartitions) o;
return Objects.equals(topic, that.topic) &&
Objects.equals(partitionOffsetMap, that.partitionOffsetMap);
}
@Override
public int hashCode()
{
return Objects.hash(topic, partitionOffsetMap);
}
@Override
public String toString()
{
return "KafkaPartitions{" +
"topic='" + topic + '\'' +
", partitionOffsetMap=" + partitionOffsetMap +
'}';
}
}


@@ -0,0 +1,147 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.TuningConfig;
import io.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.joda.time.Period;
import java.io.File;
public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
{
private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
private final int maxRowsInMemory;
private final int maxRowsPerSegment;
private final Period intermediatePersistPeriod;
private final File basePersistDirectory;
private final int maxPendingPersists;
private final IndexSpec indexSpec;
private final boolean buildV9Directly;
private final boolean reportParseExceptions;
private final long handoffConditionTimeout;
@JsonCreator
public KafkaTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout
)
{
// Cannot be static because the default basePersistDirectory is unique per instance
final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? defaults.getIntermediatePersistPeriod()
: intermediatePersistPeriod;
this.basePersistDirectory = defaults.getBasePersistDirectory();
this.maxPendingPersists = maxPendingPersists == null ? defaults.getMaxPendingPersists() : maxPendingPersists;
this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
this.buildV9Directly = buildV9Directly == null ? defaults.getBuildV9Directly() : buildV9Directly;
this.reportParseExceptions = reportParseExceptions == null
? defaults.isReportParseExceptions()
: reportParseExceptions;
this.handoffConditionTimeout = handoffConditionTimeout == null
? defaults.getHandoffConditionTimeout()
: handoffConditionTimeout;
}
@JsonProperty
public int getMaxRowsInMemory()
{
return maxRowsInMemory;
}
@JsonProperty
public int getMaxRowsPerSegment()
{
return maxRowsPerSegment;
}
@JsonProperty
public Period getIntermediatePersistPeriod()
{
return intermediatePersistPeriod;
}
@JsonProperty
public File getBasePersistDirectory()
{
return basePersistDirectory;
}
@JsonProperty
public int getMaxPendingPersists()
{
return maxPendingPersists;
}
@JsonProperty
public IndexSpec getIndexSpec()
{
return indexSpec;
}
@JsonProperty
public boolean getBuildV9Directly()
{
return buildV9Directly;
}
@JsonProperty
public boolean isReportParseExceptions()
{
return reportParseExceptions;
}
@JsonProperty
public long getHandoffConditionTimeout()
{
return handoffConditionTimeout;
}
public KafkaTuningConfig withBasePersistDirectory(File dir)
{
return new KafkaTuningConfig(
maxRowsInMemory,
maxRowsPerSegment,
intermediatePersistPeriod,
dir,
maxPendingPersists,
indexSpec,
buildV9Directly,
reportParseExceptions,
handoffConditionTimeout
);
}
}


@@ -0,0 +1 @@
io.druid.indexing.kafka.KafkaIndexTaskModule


@@ -0,0 +1,101 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka;
import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
import org.junit.Test;
import java.util.Map;
public class KafkaDataSourceMetadataTest
{
private static final KafkaDataSourceMetadata KM0 = KM("foo", ImmutableMap.<Integer, Long>of());
private static final KafkaDataSourceMetadata KM1 = KM("foo", ImmutableMap.of(0, 2L, 1, 3L));
private static final KafkaDataSourceMetadata KM2 = KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L));
private static final KafkaDataSourceMetadata KM3 = KM("foo", ImmutableMap.of(0, 2L, 2, 5L));
@Test
public void testMatches()
{
Assert.assertTrue(KM0.matches(KM0));
Assert.assertTrue(KM0.matches(KM1));
Assert.assertTrue(KM0.matches(KM2));
Assert.assertTrue(KM0.matches(KM3));
Assert.assertTrue(KM1.matches(KM0));
Assert.assertTrue(KM1.matches(KM1));
Assert.assertFalse(KM1.matches(KM2));
Assert.assertTrue(KM1.matches(KM3));
Assert.assertTrue(KM2.matches(KM0));
Assert.assertFalse(KM2.matches(KM1));
Assert.assertTrue(KM2.matches(KM2));
Assert.assertTrue(KM2.matches(KM3));
Assert.assertTrue(KM3.matches(KM0));
Assert.assertTrue(KM3.matches(KM1));
Assert.assertTrue(KM3.matches(KM2));
Assert.assertTrue(KM3.matches(KM3));
}
@Test
public void testIsValidStart()
{
Assert.assertTrue(KM0.isValidStart());
Assert.assertTrue(KM1.isValidStart());
Assert.assertTrue(KM2.isValidStart());
Assert.assertTrue(KM3.isValidStart());
}
@Test
public void testPlus()
{
Assert.assertEquals(
KM("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
KM1.plus(KM3)
);
Assert.assertEquals(
KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
KM0.plus(KM2)
);
Assert.assertEquals(
KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
KM1.plus(KM2)
);
Assert.assertEquals(
KM("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
KM2.plus(KM1)
);
Assert.assertEquals(
KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
KM2.plus(KM2)
);
}
private static KafkaDataSourceMetadata KM(String topic, Map<Integer, Long> offsets)
{
return new KafkaDataSourceMetadata(new KafkaPartitions(topic, offsets));
}
}


@@ -0,0 +1,114 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.kafka.test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import scala.Some;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
public class TestBroker implements Closeable
{
private final String zookeeperConnect;
private final File directory;
private final int id;
private final Map<String, String> brokerProps;
private volatile KafkaServer server;
public TestBroker(String zookeeperConnect, File directory, int id, Map<String, String> brokerProps)
{
this.zookeeperConnect = zookeeperConnect;
this.directory = directory;
this.id = id;
this.brokerProps = brokerProps == null ? ImmutableMap.<String, String>of() : brokerProps;
}
public void start()
{
final Properties props = new Properties();
props.setProperty("zookeeper.connect", zookeeperConnect);
props.setProperty("log.dirs", directory.toString());
props.setProperty("broker.id", String.valueOf(id));
props.setProperty("port", String.valueOf(new Random().nextInt(9999) + 10000));
props.putAll(brokerProps);
final KafkaConfig config = new KafkaConfig(props);
server = new KafkaServer(config, SystemTime$.MODULE$, Some.apply(String.format("TestingBroker[%d]-", id)));
server.startup();
}
public int getPort()
{
return server.socketServer().config().port();
}
public KafkaProducer<byte[], byte[]> newProducer()
{
return new KafkaProducer(producerProperties());
}
public KafkaConsumer<byte[], byte[]> newConsumer()
{
return new KafkaConsumer(consumerProperties());
}
public Map<String, String> producerProperties()
{
final Map<String, String> props = Maps.newHashMap();
props.put("bootstrap.servers", String.format("localhost:%d", getPort()));
props.put("key.serializer", ByteArraySerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());
props.put("acks", "all");
return props;
}
public Map<String, String> consumerProperties()
{
final Map<String, String> props = Maps.newHashMap();
props.put("bootstrap.servers", String.format("localhost:%d", getPort()));
props.put("key.deserializer", ByteArrayDeserializer.class.getName());
props.put("value.deserializer", ByteArrayDeserializer.class.getName());
return props;
}
@Override
public void close() throws IOException
{
if (server != null) {
server.shutdown();
server.awaitShutdown();
}
}
}
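
As a usage sketch (not part of this commit): a test could pair TestBroker with the curator-test TestingServer already declared in the pom, roughly as follows. The topic name, payload, and example class are arbitrary placeholders.

import com.google.common.io.Files;
import io.druid.indexing.kafka.test.TestBroker;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestBrokerUsageSketch
{
  public static void main(String[] args) throws Exception
  {
    try (
        TestingServer zkServer = new TestingServer();
        TestBroker broker = new TestBroker(zkServer.getConnectString(), Files.createTempDir(), 1, null)
    ) {
      broker.start();
      try (KafkaProducer<byte[], byte[]> producer = broker.newProducer()) {
        // Publish a single record to a hypothetical topic; send() is asynchronous, get() waits for the ack.
        producer.send(new ProducerRecord<byte[], byte[]>("example_topic", "hello".getBytes())).get();
      }
    }
  }
}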


@@ -88,6 +88,7 @@
<module>extensions-core/histogram</module>
<module>extensions-core/kafka-eight</module>
<module>extensions-core/kafka-extraction-namespace</module>
<module>extensions-core/kafka-indexing-service</module>
<module>extensions-core/mysql-metadata-storage</module>
<module>extensions-core/postgresql-metadata-storage</module>
<module>extensions-core/namespace-lookup</module>