kafka 8 simple consumer firehose

This commit is contained in:
lvjq 2014-09-29 15:22:17 -07:00 committed by Himanshu Gupta
parent 10af233b37
commit 2237a8cf0f
36 changed files with 1848 additions and 137 deletions

View File

@ -67,9 +67,14 @@ public class SerializerUtils
public String readString(ByteBuffer in) throws IOException
{
final int length = in.getInt();
byte[] stringBytes = new byte[length];
in.get(stringBytes);
return new String(stringBytes, UTF8);
return new String(readBytes(in, length), UTF8);
}
public byte[] readBytes(ByteBuffer in, int length) throws IOException
{
byte[] bytes = new byte[length];
in.get(bytes);
return bytes;
}
public void writeStrings(OutputStream out, String[] names) throws IOException

View File

@ -11,4 +11,4 @@ To enable experimental features, include their artifacts in the configuration ru
druid.extensions.coordinates=["io.druid.extensions:druid-histogram:{VERSION}"]
```
The configuration for all the indexer and query nodes needs to be updated with this.
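Presumably the Kafka simple-consumer firehose added by this commit is enabled the same way; a hedged sketch, using the coordinate taken from the new extension's pom (`io.druid.extensions:druid-kafka-eight-simple-consumer`):
```
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-eight-simple-consumer:{VERSION}"]
```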

View File

@ -0,0 +1,30 @@
---
layout: doc_page
---
# KafkaSimpleConsumerFirehose
This firehose acts as a Kafka simple consumer and ingests data from Kafka. It is currently still experimental.
The configuration for KafkaSimpleConsumerFirehose is similar to the [Kafka firehose example](realtime-ingestion.html#realtime-specfile), except that `firehose` is replaced with `firehoseV2`, like this:
```json
"firehoseV2": {
"type" : "kafka-0.8-v2",
"brokerList" : ["localhost:4443"],
"queueBufferLength":10001,
"resetBehavior":"latest",
"partitionIdList" : ["0"],
"clientId" : "localclient",
"feed": "wikipedia"
}
```
|property|description|required?|
|--------|-----------|---------|
|type|This should be "kafka-0.8-v2"|yes|
|brokerList|list of the Kafka brokers, each as host:port|yes|
|queueBufferLength|the buffer length of the Kafka message queue|no (default: 20000)|
|resetBehavior|whether the consumer should start from the earliest or latest available message when a kafkaOffsetOutOfRange error occurs|no (default: earliest)|
|partitionIdList|list of Kafka partition ids|yes|
|clientId|the clientId for the Kafka SimpleConsumer|yes|
|feed|the Kafka topic to ingest|yes|
To use this firehose at scale and in production, it is recommended to set the replication factor to at least three, which means having at least three Kafka brokers in the `brokerList`. For a topic of roughly 10,000 events per second, a single partition works well, but more partitions can be added if higher throughput is required.
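For orientation, a hedged sketch of where this block sits inside a realtime spec's `ioConfig`; the surrounding layout is assumed from the linked realtime spec documentation, hosts and values are illustrative only, and the rest of the `ioConfig` (for example the plumber) stays as in that example:
```json
"ioConfig": {
  "type": "realtime",
  "firehoseV2": {
    "type": "kafka-0.8-v2",
    "brokerList": ["kafka1:9092", "kafka2:9092", "kafka3:9092"],
    "queueBufferLength": 20000,
    "resetBehavior": "earliest",
    "partitionIdList": ["0", "1", "2"],
    "clientId": "druid-wikipedia-client",
    "feed": "wikipedia"
  }
}
```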

View File

@ -260,7 +260,6 @@ When using this firehose, events can be sent by submitting a POST request to the
|serviceName|name used to announce the event receiver service endpoint|yes|
|bufferSize| size of buffer used by firehose to store events|no default(100000)|
#### TimedShutoffFirehose
This can be used to start a firehose that will shut down at a specified time.
@ -283,3 +282,5 @@ An example is shown below:
|type|This should be "timed"|yes|
|shutoffTime|time at which the firehose should shut down, in ISO8601 format|yes|
|delegate|firehose to use|yes|

View File

@ -0,0 +1,77 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright 2012 - 2015 Metamarkets Group Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-kafka-eight-simple-consumer</artifactId>
<name>druid-kafka-eight-simple-consumer</name>
<description>druid-kafka-eight-simple-consumer</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.8.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,47 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.firehose.kafka;
import java.util.List;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
public class KafkaEightSimpleConsumerDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule("KafkaEightSimpleConsumerFirehoseModule").registerSubtypes(
new NamedType(KafkaEightSimpleConsumerFirehoseFactory.class, "kafka-0.8-v2")
)
);
}
@Override
public void configure(Binder binder)
{
}
}
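To make the `kafka-0.8-v2` binding above concrete, here is a hedged, standalone Jackson sketch (toy types, not Druid classes) of how `registerSubtypes(new NamedType(...))` routes a `"type"` property to a concrete class:
```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;

public class NamedTypeSketch
{
  // Stand-in for FirehoseFactoryV2 (assumed to carry a similar @JsonTypeInfo contract).
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  interface FactoryLike {}

  // Stand-in for KafkaEightSimpleConsumerFirehoseFactory.
  static class KafkaV2Factory implements FactoryLike
  {
    @JsonProperty String feed;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(
        new SimpleModule("sketch").registerSubtypes(new NamedType(KafkaV2Factory.class, "kafka-0.8-v2"))
    );
    FactoryLike parsed = mapper.readValue("{\"type\":\"kafka-0.8-v2\",\"feed\":\"wikipedia\"}", FactoryLike.class);
    System.out.println(parsed.getClass().getSimpleName()); // prints KafkaV2Factory
  }
}
```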

View File

@ -0,0 +1,319 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.firehose.kafka;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Committer;
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.firehose.kafka.KafkaSimpleConsumer.BytesMessageWithOffset;
public class KafkaEightSimpleConsumerFirehoseFactory implements
FirehoseFactoryV2<ByteBufferInputRowParser>
{
private static final Logger log = new Logger(
KafkaEightSimpleConsumerFirehoseFactory.class
);
@JsonProperty
private final List<String> brokerList;
@JsonProperty
private final List<Integer> partitionIdList;
@JsonProperty
private final String clientId;
@JsonProperty
private final String feed;
@JsonProperty
private final int queueBufferLength;
@JsonProperty
private boolean earliest;
private final List<PartitionConsumerWorker> consumerWorkers = new CopyOnWriteArrayList<>();
private static final int DEFAULT_QUEUE_BUFFER_LENGTH = 20000;
private static final String RESET_TO_LATEST = "latest";
private static final int CONSUMER_FETCH_TIMEOUT = 10000;
@JsonCreator
public KafkaEightSimpleConsumerFirehoseFactory(
@JsonProperty("brokerList") List<String> brokerList,
@JsonProperty("partitionIdList") List<Integer> partitionIdList,
@JsonProperty("clientId") String clientId,
@JsonProperty("feed") String feed,
@JsonProperty("queueBufferLength") Integer queueBufferLength,
@JsonProperty("resetBehavior") String resetBehavior
)
{
this.brokerList = brokerList;
this.partitionIdList = partitionIdList;
this.clientId = clientId;
this.feed = feed;
this.queueBufferLength = queueBufferLength == null ? DEFAULT_QUEUE_BUFFER_LENGTH : queueBufferLength;
log.info("queueBufferLength loaded as[%s]", this.queueBufferLength);
this.earliest = RESET_TO_LATEST.equalsIgnoreCase(resetBehavior) ? false : true;
log.info("Default behavior of cosumer set to earliest? [%s]", this.earliest);
}
private Map<Integer, Long> loadOffsetFromPreviousMetaData(Object lastCommit)
{
Map<Integer, Long> offsetMap = Maps.newHashMap();
if (lastCommit == null) {
return offsetMap;
}
if (lastCommit instanceof Map) {
Map<Object, Object> lastCommitMap = (Map) lastCommit;
for (Map.Entry<Object, Object> entry : lastCommitMap.entrySet()) {
try {
int partitionId = Integer.parseInt(entry.getKey().toString());
long offset = Long.parseLong(entry.getValue().toString());
log.debug("Recover last commit information partitionId [%s], offset [%s]", partitionId, offset);
offsetMap.put(partitionId, offset);
}
catch (NumberFormatException e) {
log.error(e, "Fail to load offset from previous meta data [%s]", entry);
}
}
log.info("Loaded offset map[%s]", offsetMap);
} else {
log.error("Unable to cast lastCommit to Map");
}
return offsetMap;
}
@Override
public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object lastCommit) throws IOException
{
final Map<Integer, Long> lastOffsets = loadOffsetFromPreviousMetaData(lastCommit);
Set<String> newDimExclus = Sets.union(
firehoseParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(),
Sets.newHashSet("feed")
);
final ByteBufferInputRowParser theParser = firehoseParser.withParseSpec(
firehoseParser.getParseSpec()
.withDimensionsSpec(
firehoseParser.getParseSpec()
.getDimensionsSpec()
.withDimensionExclusions(
newDimExclus
)
)
);
for (Integer partition : partitionIdList) {
final KafkaSimpleConsumer kafkaSimpleConsumer = new KafkaSimpleConsumer(
feed, partition, clientId, brokerList, earliest
);
Long startOffset = lastOffsets.get(partition);
PartitionConsumerWorker worker = new PartitionConsumerWorker(
feed, kafkaSimpleConsumer, partition, startOffset == null ? 0 : startOffset
);
consumerWorkers.add(worker);
}
final LinkedBlockingQueue<BytesMessageWithOffset> messageQueue = new LinkedBlockingQueue<BytesMessageWithOffset>(queueBufferLength);
log.info("Kicking off all consumers");
for (PartitionConsumerWorker worker : consumerWorkers) {
worker.go(messageQueue);
}
log.info("All consumer started");
return new FirehoseV2()
{
private ConcurrentMap<Integer, Long> lastOffsetPartitions;
private volatile boolean stop;
private volatile boolean interrupted;
private volatile BytesMessageWithOffset msg = null;
private volatile InputRow row = null;
{
lastOffsetPartitions = Maps.newConcurrentMap();
lastOffsetPartitions.putAll(lastOffsets);
}
@Override
public void start() throws Exception
{
nextMessage();
}
@Override
public boolean advance()
{
if (stop) {
return false;
}
nextMessage();
return true;
}
private void nextMessage()
{
try {
row = null;
while (row == null) {
if (msg != null) {
lastOffsetPartitions.put(msg.getPartition(), msg.offset());
}
msg = messageQueue.take();
interrupted = false;
final byte[] message = msg.message();
row = message == null ? null : theParser.parse(ByteBuffer.wrap(message));
}
}
catch (InterruptedException e) {
interrupted = true;
log.info(e, "Interrupted when taken from queue");
}
}
@Override
public InputRow currRow()
{
if (interrupted) {
return null;
}
return row;
}
@Override
public Committer makeCommitter()
{
final Map<Integer, Long> offsets = Maps.newHashMap(lastOffsetPartitions);
return new Committer()
{
@Override
public Object getMetadata()
{
return offsets;
}
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
log.info("Stopping kafka 0.8 simple firehose");
stop = true;
for (PartitionConsumerWorker t : consumerWorkers) {
t.close();
}
}
};
}
private static class PartitionConsumerWorker implements Closeable
{
private final String topic;
private final KafkaSimpleConsumer consumer;
private final int partitionId;
private final long startOffset;
private final AtomicBoolean stopped = new AtomicBoolean(false);
private volatile Thread thread = null;
PartitionConsumerWorker(String topic, KafkaSimpleConsumer consumer, int partitionId, long startOffset)
{
this.topic = topic;
this.consumer = consumer;
this.partitionId = partitionId;
this.startOffset = startOffset;
}
public void go(final LinkedBlockingQueue<BytesMessageWithOffset> messageQueue) {
thread = new Thread()
{
@Override
public void run()
{
long offset = startOffset;
log.info("Start running parition[%s], offset[%s]", partitionId, offset);
try {
while (!stopped.get()) {
try {
Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT);
int count = 0;
for (BytesMessageWithOffset msgWithOffset : msgs) {
offset = msgWithOffset.offset();
messageQueue.put(msgWithOffset);
count++;
}
log.debug("fetch [%s] msgs for partition [%s] in one time ", count, partitionId);
}
catch (InterruptedException e) {
log.info("Interrupted when fetching data, shutting down.");
return;
}
catch (Exception e) {
log.error(e, "Exception happened in fetching data, but will continue consuming");
}
}
}
finally {
consumer.stop();
}
}
};
thread.setDaemon(true);
thread.setName(String.format("kafka-%s-%s", topic, partitionId));
thread.start();
}
@Override
public synchronized void close() throws IOException
{
if (stopped.compareAndSet(false, true)) {
thread.interrupt();
thread = null;
}
}
}
}
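A hedged sketch (plain Java, illustrative values) of the commit-metadata round trip the factory relies on: `makeCommitter().getMetadata()` hands back a partition-to-offset map, and on restart `loadOffsetFromPreviousMetaData` parses the same map, whatever its deserialized key and value types, back into starting offsets:
```java
import java.util.HashMap;
import java.util.Map;

public class OffsetMetadataSketch
{
  public static void main(String[] args)
  {
    // What the committer's getMetadata() would return after consuming some messages.
    Map<Integer, Long> committed = new HashMap<>();
    committed.put(0, 1250L);
    committed.put(1, 980L);

    // On restart the metadata arrives as an untyped Object; keys and values may have been
    // re-serialized as Strings or other Number types, hence the toString()/parse dance.
    Object lastCommit = committed;
    Map<Integer, Long> startOffsets = new HashMap<>();
    if (lastCommit instanceof Map) {
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) lastCommit).entrySet()) {
        startOffsets.put(Integer.parseInt(entry.getKey().toString()), Long.parseLong(entry.getValue().toString()));
      }
    }
    System.out.println(startOffsets); // {0=1250, 1=980}
  }
}
```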

View File

@ -0,0 +1,400 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.firehose.kafka;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.javaapi.FetchResponse;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
/**
* Refer to the Kafka 0.8.0 SimpleConsumer example:
* https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
* <p/>
* This class is not thread safe; the caller must ensure that all methods are
* called from a single thread.
*/
public class KafkaSimpleConsumer
{
public static final List<BytesMessageWithOffset> EMPTY_MSGS = new ArrayList<>();
private static final Logger log = new Logger(KafkaSimpleConsumer.class);
private final List<KafkaBroker> allBrokers;
private final String topic;
private final int partitionId;
private final String clientId;
private final String leaderLookupClientId;
private final boolean earliest;
private volatile Broker leaderBroker;
private List<KafkaBroker> replicaBrokers;
private SimpleConsumer consumer = null;
private static final int SO_TIMEOUT = 30000;
private static final int BUFFER_SIZE = 65536;
private static final long RETRY_INTERVAL = 1000L;
private static final int FETCH_SIZE = 100000000;
public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List<String> brokers, boolean earliest)
{
List<KafkaBroker> brokerList = new ArrayList<KafkaBroker>();
for (String broker : brokers) {
String[] tokens = broker.split(":");
if (tokens.length != 2) {
log.warn("wrong broker name [%s], its format should be host:port", broker);
continue;
}
try {
brokerList.add(new KafkaBroker(tokens[0], Integer.parseInt(tokens[1])));
}
catch (NumberFormatException e) {
log.warn("wrong broker name [%s], its format should be host:port", broker);
continue;
}
}
this.allBrokers = Collections.unmodifiableList(brokerList);
this.topic = topic;
this.partitionId = partitionId;
this.clientId = String.format("%s_%d_%s", topic, partitionId, clientId);
this.leaderLookupClientId = clientId + "leaderLookup";
this.replicaBrokers = new ArrayList<>();
this.replicaBrokers.addAll(this.allBrokers);
this.earliest = earliest;
log.info("KafkaSimpleConsumer initialized with clientId [%s] for message consumption and clientId [%s] for leader lookup", this.clientId, this.leaderLookupClientId);
}
private void ensureConsumer(Broker leader) throws InterruptedException
{
if (consumer == null) {
while (leaderBroker == null) {
leaderBroker = findNewLeader(leader);
}
log.info(
"making SimpleConsumer[%s][%s], leader broker[%s:%s]",
topic, partitionId, leaderBroker.host(), leaderBroker.port()
);
consumer = new SimpleConsumer(
leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT, BUFFER_SIZE, clientId
);
}
}
public static class BytesMessageWithOffset
{
final byte[] msg;
final long offset;
final int partition;
public BytesMessageWithOffset(byte[] msg, long offset, int partition)
{
this.msg = msg;
this.offset = offset;
this.partition = partition;
}
public int getPartition()
{
return partition;
}
public byte[] message()
{
return msg;
}
public long offset()
{
return offset;
}
}
static class KafkaBroker
{
final String host;
final int port;
KafkaBroker(String host, int port)
{
this.host = host;
this.port = port;
}
@Override
public String toString()
{
return String.format("%s:%d", host, port);
}
}
private Iterable<BytesMessageWithOffset> filterAndDecode(Iterable<MessageAndOffset> kafkaMessages, final long offset)
{
return FunctionalIterable
.create(kafkaMessages)
.filter(
new Predicate<MessageAndOffset>()
{
@Override
public boolean apply(MessageAndOffset msgAndOffset)
{
return msgAndOffset.offset() >= offset;
}
}
)
.transform(
new Function<MessageAndOffset, BytesMessageWithOffset>()
{
@Override
public BytesMessageWithOffset apply(MessageAndOffset msgAndOffset)
{
ByteBuffer bb = msgAndOffset.message().payload();
byte[] payload = new byte[bb.remaining()];
bb.get(payload);
// add nextOffset here, thus next fetch will use nextOffset instead of current offset
return new BytesMessageWithOffset(payload, msgAndOffset.nextOffset(), partitionId);
}
}
);
}
private long getOffset(boolean earliest) throws InterruptedException
{
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(
topicAndPartition,
new PartitionOffsetRequestInfo(
earliest ? kafka.api.OffsetRequest.EarliestTime() : kafka.api.OffsetRequest.LatestTime(), 1
)
);
OffsetRequest request = new OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId
);
OffsetResponse response = null;
try {
response = consumer.getOffsetsBefore(request);
}
catch (Exception e) {
ensureNotInterrupted(e);
log.error(e, "caught exception in getOffsetsBefore [%s] - [%s]", topic, partitionId);
return -1;
}
if (response.hasError()) {
log.error(
"error fetching data Offset from the Broker [%s]. reason: [%s]", leaderBroker.host(),
response.errorCode(topic, partitionId)
);
return -1;
}
long[] offsets = response.offsets(topic, partitionId);
return earliest ? offsets[0] : offsets[offsets.length - 1];
}
public Iterable<BytesMessageWithOffset> fetch(long offset, int timeoutMs) throws InterruptedException
{
FetchResponse response = null;
Broker previousLeader = leaderBroker;
while (true) {
ensureConsumer(previousLeader);
FetchRequest request = new FetchRequestBuilder()
.clientId(clientId)
.addFetch(topic, partitionId, offset, FETCH_SIZE)
.maxWait(timeoutMs)
.minBytes(1)
.build();
log.debug("fetch offset %s", offset);
try {
response = consumer.fetch(request);
}
catch (Exception e) {
ensureNotInterrupted(e);
log.warn(e, "caughte exception in fetch {} - {}", topic, partitionId);
response = null;
}
if (response == null || response.hasError()) {
short errorCode = response != null ? response.errorCode(topic, partitionId) : ErrorMapping.UnknownCode();
log.warn("fetch %s - %s with offset %s encounters error: [%s]", topic, partitionId, offset, errorCode);
boolean needNewLeader = false;
if (errorCode == ErrorMapping.RequestTimedOutCode()) {
log.info("kafka request timed out, response[%s]", response);
} else if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
long newOffset = getOffset(earliest);
log.info("got [%s] offset[%s] for [%s][%s]", earliest ? "earliest" : "latest", newOffset, topic, partitionId);
if (newOffset < 0) {
needNewLeader = true;
} else {
offset = newOffset;
continue;
}
} else {
needNewLeader = true;
}
if (needNewLeader) {
stopConsumer();
previousLeader = leaderBroker;
leaderBroker = null;
continue;
}
} else {
break;
}
}
return response != null ? filterAndDecode(response.messageSet(topic, partitionId), offset) : EMPTY_MSGS;
}
private void stopConsumer()
{
if (consumer != null) {
try {
consumer.close();
log.info("stop consumer[%s][%s], leaderBroker[%s]", topic, partitionId, leaderBroker);
}
catch (Exception e) {
log.warn(e, "stop consumer[%s][%s] failed", topic, partitionId);
}
finally {
consumer = null;
}
}
}
// stop the consumer
public void stop()
{
stopConsumer();
log.info("KafkaSimpleConsumer[%s][%s] stopped", topic, partitionId);
}
private PartitionMetadata findLeader() throws InterruptedException
{
for (KafkaBroker broker : replicaBrokers) {
SimpleConsumer consumer = null;
try {
log.info("Finding new leader from Kafka brokers, try broker %s:%s", broker.host, broker.port);
consumer = new SimpleConsumer(broker.host, broker.port, SO_TIMEOUT, BUFFER_SIZE, leaderLookupClientId);
TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(Collections.singletonList(topic)));
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
if (topic.equals(item.topic())) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == partitionId) {
return part;
}
}
}
}
}
catch (Exception e) {
ensureNotInterrupted(e);
log.warn(
e, "error communicating with Kafka Broker [%s] to find leader for [%s] - [%s]", broker, topic, partitionId
);
}
finally {
if (consumer != null) {
consumer.close();
}
}
}
return null;
}
private Broker findNewLeader(Broker oldLeader) throws InterruptedException
{
long retryCnt = 0;
while (true) {
PartitionMetadata metadata = findLeader();
if (metadata != null) {
replicaBrokers.clear();
for (Broker replica : metadata.replicas()) {
replicaBrokers.add(new KafkaBroker(replica.host(), replica.port()));
}
log.debug("Got new Kafka leader metadata : [%s], previous leader : [%s]", metadata, oldLeader);
Broker newLeader = metadata.leader();
if (newLeader != null) {
// We check the retryCnt here as well to make sure that we have slept a little bit
// if we don't notice a change in leadership
// just in case if Zookeeper doesn't get updated fast enough
if (oldLeader == null || isValidNewLeader(newLeader) || retryCnt != 0) {
return newLeader;
}
}
}
Thread.sleep(RETRY_INTERVAL);
retryCnt++;
// if could not find the leader for current replicaBrokers, let's try to
// find one via allBrokers
if (retryCnt >= 3 && (retryCnt - 3) % 5 == 0) {
log.warn("cannot find leader for [%s] - [%s] after [%s] retries", topic, partitionId, retryCnt);
replicaBrokers.clear();
replicaBrokers.addAll(allBrokers);
}
}
}
private boolean isValidNewLeader(Broker broker) {
// broker is considered valid new leader if it is not the same as old leaderBroker
return !(leaderBroker.host().equalsIgnoreCase(broker.host()) && leaderBroker.port() == broker.port());
}
private void ensureNotInterrupted(Exception e) throws InterruptedException
{
if (Thread.interrupted()) {
log.info(e, "Interrupted during fetching for %s - %s", topic, partitionId);
throw new InterruptedException();
}
}
}
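A hedged usage sketch of `KafkaSimpleConsumer` on its own, mirroring what `PartitionConsumerWorker` does; it assumes a reachable broker at `localhost:9092`, the Kafka 0.8 client on the classpath, and an existing `wikipedia` topic:
```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import io.druid.firehose.kafka.KafkaSimpleConsumer;
import io.druid.firehose.kafka.KafkaSimpleConsumer.BytesMessageWithOffset;

public class SimpleConsumerSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    KafkaSimpleConsumer consumer = new KafkaSimpleConsumer(
        "wikipedia", 0, "sketch-client", Arrays.asList("localhost:9092"), true /* reset to earliest */
    );
    try {
      long offset = 0;
      for (int i = 0; i < 3; i++) {
        for (BytesMessageWithOffset msg : consumer.fetch(offset, 10000)) {
          // offset() already holds nextOffset(), so the next fetch starts after this message.
          offset = msg.offset();
          System.out.println(new String(msg.message(), StandardCharsets.UTF_8));
        }
      }
    }
    finally {
      consumer.stop();
    }
  }
}
```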

View File

@ -0,0 +1 @@
io.druid.firehose.kafka.KafkaEightSimpleConsumerDruidModule

View File

@ -486,7 +486,7 @@ public class IndexGeneratorJob implements Jobby
) throws IOException
{
return IndexMaker.persist(
index, interval, file, config.getIndexSpec(), progressIndicator
index, interval, file, null, config.getIndexSpec(), progressIndicator
);
}

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -99,9 +100,9 @@ public class YeOldePlumberSchool implements PlumberSchool
return new Plumber()
{
@Override
public void startJob()
public Object startJob()
{
return null;
}
@Override
@ -227,6 +228,12 @@ public class YeOldePlumberSchool implements PlumberSchool
{
return new File(persistDir, String.format("spill%d", n));
}
@Override
public void persist(Committer commitRunnable)
{
persist(commitRunnable);
}
};
}
}

View File

@ -63,7 +63,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
{
return null;
}
}
}, null
), null
)
);

View File

@ -0,0 +1,88 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.indexing.common;
import java.io.File;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.firehose.LocalFirehoseFactoryV2;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
/**
*/
@JsonTypeName("test_realtime")
public class TestRealtimeTaskV2 extends RealtimeIndexTask
{
private final TaskStatus status;
@JsonCreator
public TestRealtimeTaskV2(
@JsonProperty("id") String id,
@JsonProperty("resource") TaskResource taskResource,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("taskStatus") TaskStatus status
)
{
super(
id,
taskResource,
new FireDepartment(
new DataSchema(dataSource, null, new AggregatorFactory[]{}, null),
new RealtimeIOConfig(
null,
new PlumberSchool() {
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return null;
}
},
new LocalFirehoseFactoryV2(new File("lol"), "rofl", null)
), null
)
);
this.status = status;
}
@Override
@JsonProperty
public String getType()
{
return "test_realtime";
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
return status;
}
}

View File

@ -281,8 +281,9 @@ public class TaskSerdeTest
{
return null;
}
}
),
},
null),
new RealtimeTuningConfig(
1,
new Period("PT10M"),

View File

@ -31,6 +31,7 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
@ -38,6 +39,7 @@ import io.druid.indexing.common.IndexingServiceCondition;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TestMergeTask;
import io.druid.indexing.common.TestRealtimeTask;
import io.druid.indexing.common.TestRealtimeTaskV2;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
@ -48,6 +50,7 @@ import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ZkPathsConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
@ -231,6 +234,51 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rt2"));
}
@Test
public void testRunSameAvailabilityGroupV2() throws Exception
{
doSetup();
TestRealtimeTaskV2 task1 = new TestRealtimeTaskV2("rtV1", new TaskResource("rtV1", 1), "foo", TaskStatus.running("rtV1"));
remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);
TestRealtimeTaskV2 task2 = new TestRealtimeTaskV2("rtV2", new TaskResource("rtV1", 1), "foo", TaskStatus.running("rtV2"));
remoteTaskRunner.run(task2);
TestRealtimeTaskV2 task3 = new TestRealtimeTaskV2("rtV3", new TaskResource("rtV2", 1), "foo", TaskStatus.running("rtV3"));
remoteTaskRunner.run(task3);
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getRunningTasks().size() == 2;
}
}
)
);
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getPendingTasks().size() == 1;
}
}
)
);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rtV2"));
}
@Test
public void testRunWithCapacity() throws Exception
{
@ -278,6 +326,53 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rt2"));
}
@Test
public void testRunWithCapacityV2() throws Exception
{
doSetup();
TestRealtimeTaskV2 task1 = new TestRealtimeTaskV2("rtV1", new TaskResource("rtV1", 1), "foo", TaskStatus.running("rtV1"));
remoteTaskRunner.run(task1);
Assert.assertTrue(taskAnnounced(task1.getId()));
mockWorkerRunningTask(task1);
TestRealtimeTaskV2 task2 = new TestRealtimeTaskV2("rtV2", new TaskResource("rtV2", 3), "foo", TaskStatus.running("rtV2"));
remoteTaskRunner.run(task2);
TestRealtimeTaskV2 task3 = new TestRealtimeTaskV2("rtV3", new TaskResource("rtV3", 2), "foo", TaskStatus.running("rtV3"));
remoteTaskRunner.run(task3);
Assert.assertTrue(taskAnnounced(task3.getId()));
mockWorkerRunningTask(task3);
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getRunningTasks().size() == 2;
}
}
)
);
Assert.assertTrue(
TestUtils.conditionValid(
new IndexingServiceCondition()
{
@Override
public boolean isValid()
{
return remoteTaskRunner.getPendingTasks().size() == 1;
}
}
)
);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rtV2"));
}
@Test
public void testStatusRemoved() throws Exception
{

View File

@ -801,8 +801,8 @@ public class TaskLifecycleTest
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
new MockFirehoseFactory(true),
null, // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
null
// PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
1000,

View File

@ -56,7 +56,7 @@ public class TaskAnnouncementTest
{
return null;
}
}), null
}, null), null
)
);
final TaskStatus status = TaskStatus.running(task.getId());

View File

@ -96,6 +96,7 @@
Kafka 0.7 is not available in Maven Central -->
<!-- <module>extensions/kafka&#45;seven</module> -->
<module>extensions/kafka-eight</module>
<module>extensions/kafka-eight-simpleConsumer</module>
<module>extensions/rabbitmq</module>
<module>extensions/histogram</module>
<module>extensions/mysql-metadata-storage</module>
@ -103,10 +104,9 @@
<module>extensions/azure-extensions</module>
<module>extensions/namespace-lookup</module>
<module>extensions/kafka-extraction-namespace</module>
<!-- distribution packaging -->
<module>extensions-distribution</module>
<module>distribution</module>
</modules>
<dependencyManagement>

View File

@ -201,7 +201,13 @@ public class IndexIO
return convertSegment(toConvert, converted, indexSpec, false, true);
}
public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent, boolean validate)
public static boolean convertSegment(
File toConvert,
File converted,
IndexSpec indexSpec,
boolean forceIfCurrent,
boolean validate
)
throws IOException
{
final int version = SegmentUtils.getVersionFromDir(toConvert);
@ -253,7 +259,7 @@ public class IndexIO
IndexableAdapter adapter2
)
{
if(rb1.getTimestamp() != rb2.getTimestamp()){
if (rb1.getTimestamp() != rb2.getTimestamp()) {
throw new SegmentValidationException(
"Timestamp mismatch. Expected %d found %d",
rb1.getTimestamp(), rb2.getTimestamp()
@ -852,14 +858,12 @@ public class IndexIO
Set<String> columns = Sets.newTreeSet();
columns.addAll(Lists.newArrayList(dims9));
columns.addAll(Lists.newArrayList(availableMetrics));
GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.STRING_STRATEGY);
final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory);
final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16
+ serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString);
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer);
dims9.writeToChannel(writer);
@ -970,7 +974,8 @@ public class IndexIO
index.getAvailableDimensions(),
new ConciseBitmapFactory(),
columns,
index.getFileMapper()
index.getFileMapper(),
null
);
}
}
@ -999,6 +1004,7 @@ public class IndexIO
final GenericIndexed<String> dims = GenericIndexed.read(indexBuffer, GenericIndexed.STRING_STRATEGY);
final Interval dataInterval = new Interval(indexBuffer.getLong(), indexBuffer.getLong());
final BitmapSerdeFactory segmentBitmapSerdeFactory;
/**
* This is a workaround for the fact that in v8 segments, we have no information about the type of bitmap
* index to use. Since we cannot very cleanly build v9 segments directly, we are using a workaround where
@ -1010,6 +1016,12 @@ public class IndexIO
segmentBitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory();
}
Object commitMetaData = null;
ByteBuffer metadata = smooshedFiles.mapFile("metadata.drd");
if (metadata != null) {
commitMetaData = mapper.readValue(serializerUtils.readBytes(metadata, metadata.remaining()), Object.class);
}
Map<String, Column> columns = Maps.newHashMap();
for (String columnName : cols) {
@ -1019,7 +1031,7 @@ public class IndexIO
columns.put(Column.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time")));
final QueryableIndex index = new SimpleQueryableIndex(
dataInterval, cols, dims, segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles
dataInterval, cols, dims, segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, commitMetaData
);
log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);

View File

@ -115,9 +115,9 @@ public class IndexMaker
mapper = injector.getInstance(ObjectMapper.class);
}
public static File persist(final IncrementalIndex index, File outDir, final IndexSpec indexSpec) throws IOException
public static File persist(final IncrementalIndex index, File outDir, final Object commitMetaData, final IndexSpec indexSpec) throws IOException
{
return persist(index, index.getInterval(), outDir, indexSpec);
return persist(index, index.getInterval(), outDir, commitMetaData, indexSpec);
}
/**
@ -134,16 +134,20 @@ public class IndexMaker
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
final Object commitMetaData,
final IndexSpec indexSpec
) throws IOException
{
return persist(index, dataInterval, outDir, indexSpec, new LoggingProgressIndicator(outDir.toString()));
return persist(
index, dataInterval, outDir, commitMetaData, indexSpec, new LoggingProgressIndicator(outDir.toString())
);
}
public static File persist(
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
final Object commitMetaData,
final IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
@ -181,6 +185,7 @@ public class IndexMaker
),
index.getMetricAggs(),
outDir,
commitMetaData,
indexSpec,
progress
);
@ -215,22 +220,26 @@ public class IndexMaker
),
metricAggs,
outDir,
null,
indexSpec,
progress
);
}
public static File merge(
List<IndexableAdapter> adapters, final AggregatorFactory[] metricAggs, File outDir, final IndexSpec indexSpec
List<IndexableAdapter> adapters, final AggregatorFactory[] metricAggs, File outDir, final String commitMetaData, final IndexSpec indexSpec
) throws IOException
{
return merge(adapters, metricAggs, outDir, indexSpec, new LoggingProgressIndicator(outDir.toString()));
return merge(
adapters, metricAggs, outDir, commitMetaData, indexSpec, new LoggingProgressIndicator(outDir.toString())
);
}
public static File merge(
List<IndexableAdapter> adapters,
final AggregatorFactory[] metricAggs,
File outDir,
final Object commitMetaData,
final IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
@ -320,7 +329,9 @@ public class IndexMaker
}
};
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
return makeIndexFiles(
adapters, outDir, progress, mergedDimensions, mergedMetrics, commitMetaData, rowMergerFn, indexSpec
);
}
@ -341,6 +352,7 @@ public class IndexMaker
progress,
Lists.newArrayList(adapter.getDimensionNames()),
Lists.newArrayList(adapter.getMetricNames()),
adapter.getMetaData(),
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Nullable
@ -362,12 +374,13 @@ public class IndexMaker
final IndexSpec indexSpec
) throws IOException
{
return append(adapters, outDir, new LoggingProgressIndicator(outDir.toString()), indexSpec);
return append(adapters, outDir, null, new LoggingProgressIndicator(outDir.toString()), indexSpec);
}
public static File append(
final List<IndexableAdapter> adapters,
final File outDir,
final String commitMetaData,
final ProgressIndicator progress,
final IndexSpec indexSpec
) throws IOException
@ -438,7 +451,7 @@ public class IndexMaker
}
};
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, commitMetaData, rowMergerFn, indexSpec);
}
private static File makeIndexFiles(
@ -447,6 +460,7 @@ public class IndexMaker
final ProgressIndicator progress,
final List<String> mergedDimensions,
final List<String> mergedMetrics,
final Object commitMetaData,
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn,
final IndexSpec indexSpec
) throws IOException
@ -540,15 +554,9 @@ public class IndexMaker
progress.progress();
makeIndexBinary(
v9Smoosher,
adapters,
outDir,
mergedDimensions,
mergedMetrics,
skippedDimensions,
progress,
indexSpec
v9Smoosher, adapters, outDir, mergedDimensions, mergedMetrics, skippedDimensions, progress, indexSpec
);
makeMetadataBinary(v9Smoosher, progress, commitMetaData);
v9Smoosher.close();
@ -1396,8 +1404,7 @@ public class IndexMaker
+ 16
+ serializerUtils.getSerializedStringByteSize(bitmapSerdeFactoryType);
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer);
dims.writeToChannel(writer);
@ -1412,6 +1419,7 @@ public class IndexMaker
serializerUtils.writeLong(writer, dataInterval.getStartMillis());
serializerUtils.writeLong(writer, dataInterval.getEndMillis());
serializerUtils.writeString(
writer, bitmapSerdeFactoryType
);
@ -1422,6 +1430,19 @@ public class IndexMaker
progress.stopSection(section);
}
private static void makeMetadataBinary(
final FileSmoosher v9Smoosher,
final ProgressIndicator progress,
final Object commitMetadata
) throws IOException
{
progress.startSection("metadata.drd");
if (commitMetadata != null) {
v9Smoosher.add("metadata.drd", ByteBuffer.wrap(mapper.writeValueAsBytes(commitMetadata)));
}
progress.stopSection("metadata.drd");
}
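A hedged sketch of the `metadata.drd` round trip: the write side mirrors `makeMetadataBinary` above, and the read side mirrors the new `metadata.drd` handling in IndexIO. A plain Jackson mapper is used here and assumed to behave like Druid's configured mapper:
```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;

import java.nio.ByteBuffer;

public class CommitMetadataRoundTrip
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    Object commitMetadata = ImmutableMap.of("0", 1250L, "1", 980L); // e.g. Kafka partition -> offset

    // Write side: the bytes that get smooshed into "metadata.drd".
    ByteBuffer metadata = ByteBuffer.wrap(mapper.writeValueAsBytes(commitMetadata));

    // Read side: map the file, then deserialize the remaining bytes back into an Object.
    byte[] bytes = new byte[metadata.remaining()];
    metadata.get(bytes);
    Object recovered = mapper.readValue(bytes, Object.class);
    System.out.println(recovered); // {0=1250, 1=980}
  }
}
```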
private static void writeColumn(
FileSmoosher v9Smoosher,
ColumnPartSerde serde,

View File

@ -44,4 +44,6 @@ public interface IndexableAdapter
String getMetricType(String metric);
ColumnCapabilities getCapabilities(String column);
Object getMetaData();
}

View File

@ -33,6 +33,7 @@ public interface QueryableIndex extends ColumnSelector, Closeable
public Indexed<String> getColumnNames();
public Indexed<String> getAvailableDimensions();
public BitmapFactory getBitmapFactoryForDimensions();
public Object getMetaData();
/**
* The close method shouldn't actually be here as this is nasty. We will adjust it in the future.

View File

@ -81,6 +81,11 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
}
}
@Override
public Object getMetaData() {
return input.getMetaData();
}
@Override
public Interval getDataInterval()
{

View File

@ -37,6 +37,12 @@ public class RowboatFilteringIndexAdapter implements IndexableAdapter
this.filter = filter;
}
@Override
public Object getMetaData()
{
return null;
}
@Override
public Interval getDataInterval()
{

View File

@ -37,6 +37,7 @@ public class SimpleQueryableIndex implements QueryableIndex
private final BitmapFactory bitmapFactory;
private final Map<String, Column> columns;
private final SmooshedFileMapper fileMapper;
private final Object commitMetaData;
public SimpleQueryableIndex(
Interval dataInterval,
@ -44,7 +45,8 @@ public class SimpleQueryableIndex implements QueryableIndex
Indexed<String> dimNames,
BitmapFactory bitmapFactory,
Map<String, Column> columns,
SmooshedFileMapper fileMapper
SmooshedFileMapper fileMapper,
Object commitMetaData
)
{
Preconditions.checkNotNull(columns.get(Column.TIME_COLUMN_NAME));
@ -54,6 +56,7 @@ public class SimpleQueryableIndex implements QueryableIndex
this.bitmapFactory = bitmapFactory;
this.columns = columns;
this.fileMapper = fileMapper;
this.commitMetaData = commitMetaData;
}
@Override
@ -96,5 +99,11 @@ public class SimpleQueryableIndex implements QueryableIndex
public void close() throws IOException
{
fileMapper.close();
}
@Override
public Object getMetaData()
{
return commitMetaData;
}
}

View File

@ -98,6 +98,12 @@ public class IncrementalIndexAdapter implements IndexableAdapter
}
}
@Override
public Object getMetaData()
{
return null;
}
@Override
public Interval getDataInterval()
{

View File

@ -128,7 +128,7 @@ public class IndexMakerParameterizedTest
IncrementalIndexTest.populateIndex(timestamp, toPersist);
final File tempDir = temporaryFolder.newFolder();
QueryableIndex index = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist, tempDir, indexSpec)));
QueryableIndex index = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist, tempDir, null, indexSpec)));
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
@ -171,13 +171,13 @@ public class IndexMakerParameterizedTest
final File tempDir2 = temporaryFolder.newFolder();
final File mergedDir = temporaryFolder.newFolder();
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec)));
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
QueryableIndex index2 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist2, tempDir2, indexSpec)));
QueryableIndex index2 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist2, tempDir2, null, indexSpec)));
Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
@ -242,6 +242,7 @@ public class IndexMakerParameterizedTest
IndexMaker.persist(
toPersist1,
tmpDir1,
null,
indexSpec
)
)
@ -251,6 +252,7 @@ public class IndexMakerParameterizedTest
IndexMaker.persist(
toPersist1,
tmpDir2,
null,
indexSpec
)
)
@ -291,7 +293,7 @@ public class IndexMakerParameterizedTest
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec)));
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
@ -394,7 +396,7 @@ public class IndexMakerParameterizedTest
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec)));
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
@ -451,7 +453,7 @@ public class IndexMakerParameterizedTest
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec)));
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
@ -500,7 +502,7 @@ public class IndexMakerParameterizedTest
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec)));
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);

View File

@ -20,6 +20,7 @@ package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.segment.realtime.plumber.PlumberSchool;
/**
@ -28,15 +29,18 @@ public class RealtimeIOConfig implements IOConfig
{
private final FirehoseFactory firehoseFactory;
private final PlumberSchool plumberSchool;
private final FirehoseFactoryV2 firehoseFactoryV2;
@JsonCreator
public RealtimeIOConfig(
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("plumber") PlumberSchool plumberSchool
@JsonProperty("plumber") PlumberSchool plumberSchool,
@JsonProperty("firehoseV2") FirehoseFactoryV2 firehoseFactoryV2
)
{
this.firehoseFactory = firehoseFactory;
this.plumberSchool = plumberSchool;
this.firehoseFactoryV2 = firehoseFactoryV2;
}
@JsonProperty("firehose")
@ -45,6 +49,12 @@ public class RealtimeIOConfig implements IOConfig
return firehoseFactory;
}
@JsonProperty("firehoseV2")
public FirehoseFactoryV2 getFirehoseFactoryV2()
{
return firehoseFactoryV2;
}
public PlumberSchool getPlumberSchool()
{
return plumberSchool;

View File

@ -21,12 +21,14 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseV2;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Plumber;
import java.io.Closeable;
import java.io.IOException;
/**
@ -41,7 +43,6 @@ public class FireDepartment extends IngestionSpec<RealtimeIOConfig, RealtimeTuni
private final DataSchema dataSchema;
private final RealtimeIOConfig ioConfig;
private final RealtimeTuningConfig tuningConfig;
private final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
@JsonCreator
@ -92,11 +93,21 @@ public class FireDepartment extends IngestionSpec<RealtimeIOConfig, RealtimeTuni
return ioConfig.getPlumberSchool().findPlumber(dataSchema, tuningConfig, metrics);
}
public boolean checkFirehoseV2()
{
return ioConfig.getFirehoseFactoryV2() != null;
}
public Firehose connect() throws IOException
{
return ioConfig.getFirehoseFactory().connect(dataSchema.getParser());
}
public FirehoseV2 connect(Object metaData) throws IOException
{
return ioConfig.getFirehoseFactoryV2().connect(dataSchema.getParser(), metaData);
}
public FireDepartmentMetrics getMetrics()
{
return metrics;

View File

@ -29,7 +29,9 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.NoopQueryRunner;
@ -182,6 +184,7 @@ public class RealtimeManager implements QuerySegmentWalker
private final RealtimeTuningConfig config;
private volatile Firehose firehose = null;
private volatile FirehoseV2 firehoseV2 = null;
private volatile Plumber plumber = null;
private volatile boolean normalExit = true;
@ -214,6 +217,26 @@ public class RealtimeManager implements QuerySegmentWalker
}
}
public FirehoseV2 initFirehose(Object metaData)
{
synchronized (this) {
if (firehose == null && firehoseV2 == null) {
try {
log.info("Calling the FireDepartment and getting a FirehoseV2.");
firehoseV2 = fireDepartment.connect(metaData);
log.info("FirehoseV2 acquired!");
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
log.warn("Firehose already connected, skipping initFirehoseV2().");
}
return firehoseV2;
}
}
public Plumber initPlumber()
{
synchronized (this) {
@ -238,67 +261,18 @@ public class RealtimeManager implements QuerySegmentWalker
public void run()
{
plumber = initPlumber();
final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod();
try {
plumber.startJob();
Object metadata = plumber.startJob();
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
firehose = initFirehose();
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
InputRow inputRow = null;
try {
try {
inputRow = firehose.nextRow();
if (inputRow == null) {
log.debug("thrown away null input row, considering unparseable");
metrics.incrementUnparseable();
continue;
}
}
catch (ParseException e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
metrics.incrementUnparseable();
continue;
}
boolean lateEvent = false;
boolean indexLimitExceeded = false;
try {
lateEvent = plumber.add(inputRow) == -1;
}
catch (IndexSizeExceededException e) {
log.info("Index limit exceeded: %s", e.getMessage());
indexLimitExceeded = true;
}
if (indexLimitExceeded || lateEvent) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
if (indexLimitExceeded || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
continue;
}
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
metrics.incrementProcessed();
}
catch (ParseException e) {
if (inputRow != null) {
log.error(e, "unparseable line: %s", inputRow);
}
metrics.incrementUnparseable();
}
if (fireDepartment.checkFirehoseV2()) {
firehoseV2 = initFirehose(metadata);
runFirehoseV2(firehoseV2);
} else {
firehose = initFirehose();
runFirehose(firehose);
}
}
catch (RuntimeException e) {
log.makeAlert(
@ -325,6 +299,135 @@ public class RealtimeManager implements QuerySegmentWalker
}
}
private void runFirehoseV2(FirehoseV2 firehose)
{
final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod();
try {
firehose.start();
}
catch (Exception e) {
log.error(e, "Failed to start firehoseV2");
return;
}
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
log.info("FirehoseV2 started with nextFlush [%s]", nextFlush);
boolean haveRow = true;
while (haveRow) {
InputRow inputRow = null;
int numRows = 0;
try {
inputRow = firehose.currRow();
if (inputRow != null) {
try {
numRows = plumber.add(inputRow);
}
catch (IndexSizeExceededException e) {
log.info("Index limit exceeded: %s", e.getMessage());
nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod);
continue;
}
if (numRows < 0) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
} else {
metrics.incrementProcessed();
}
}
}
catch (Exception e) {
log.makeAlert(e, "Some exception got thrown while processing rows. Ignoring and continuing.")
.addData("inputRow", inputRow);
}
try {
haveRow = firehose.advance();
}
catch (Exception e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
metrics.incrementUnparseable();
continue;
}
try {
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod);
}
} catch (Exception e) {
log.makeAlert(e, "An exception happened while queue to persist!? We hope it is transient. Ignore and continue.");
}
}
}
private long doIncrementalPersist(Committer committer, Period intermediatePersistPeriod)
{
plumber.persist(committer);
return new DateTime().plus(intermediatePersistPeriod).getMillis();
}
private void runFirehose(Firehose firehose)
{
final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod();
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
InputRow inputRow = null;
try {
try {
inputRow = firehose.nextRow();
if (inputRow == null) {
log.debug("thrown away null input row, considering unparseable");
log.info("thrown away null input row, considering unparseable");
metrics.incrementUnparseable();
continue;
}
}
catch (Exception e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
metrics.incrementUnparseable();
continue;
}
boolean lateEvent = false;
boolean indexLimitExceeded = false;
try {
lateEvent = plumber.add(inputRow) == -1;
}
catch (IndexSizeExceededException e) {
log.info("Index limit exceeded: %s", e.getMessage());
indexLimitExceeded = true;
}
if (indexLimitExceeded || lateEvent) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
if (indexLimitExceeded || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
continue;
}
final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch());
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
plumber.persist(firehose.commit());
nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
}
metrics.incrementProcessed();
}
catch (ParseException e) {
if (inputRow != null) {
log.error(e, "unparseable line: %s", inputRow);
}
metrics.incrementUnparseable();
}
}
}
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{
QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);

View File

@@ -0,0 +1,229 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
/**
*/
public class LocalFirehoseFactoryV2 implements FirehoseFactoryV2<StringInputRowParser>
{
private static final EmittingLogger log = new EmittingLogger(LocalFirehoseFactoryV2.class);
private final File baseDir;
private final String filter;
private final StringInputRowParser parser;
@JsonCreator
public LocalFirehoseFactoryV2(
@JsonProperty("baseDir") File baseDir,
@JsonProperty("filter") String filter,
// Backwards compatible
@JsonProperty("parser") StringInputRowParser parser
)
{
this.baseDir = baseDir;
this.filter = filter;
this.parser = parser;
}
@JsonProperty
public File getBaseDir()
{
return baseDir;
}
@JsonProperty
public String getFilter()
{
return filter;
}
@JsonProperty
public StringInputRowParser getParser()
{
return parser;
}
@Override
public FirehoseV2 connect(StringInputRowParser firehoseParser, Object metadata) throws IOException, ParseException
{
log.info("Searching for all [%s] in and beneath [%s]", filter, baseDir.getAbsoluteFile());
Collection<File> foundFiles = FileUtils.listFiles(
baseDir.getAbsoluteFile(),
new WildcardFileFilter(filter),
TrueFileFilter.INSTANCE
);
if (foundFiles == null || foundFiles.isEmpty()) {
throw new ISE("Found no files to ingest! Check your schema.");
}
log.info ("Found files: " + foundFiles);
final LinkedList<File> files = Lists.newLinkedList(
foundFiles
);
return new FileIteratingFirehoseV2(new Iterator<LineIterator>()
{
@Override
public boolean hasNext()
{
return !files.isEmpty();
}
@Override
public LineIterator next()
{
try {
return FileUtils.lineIterator(files.poll());
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
}, firehoseParser);
}
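/**
 * A {@link FirehoseV2} over a sequence of {@link LineIterator}s (one per matched local file),
 * parsing each line with the supplied StringInputRowParser. The committer's metadata is a copy of
 * lastOffsetPartitions; offsets are not meaningfully tracked for local files.
 */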
class FileIteratingFirehoseV2 implements FirehoseV2 {
private ConcurrentMap<Integer, Long> lastOffsetPartitions;
private volatile boolean stop;
private volatile InputRow row = null;
private final Iterator<LineIterator> lineIterators;
private final StringInputRowParser parser;
private LineIterator lineIterator = null;
public FileIteratingFirehoseV2(
Iterator<LineIterator> lineIterators,
StringInputRowParser parser
)
{
this.lineIterators = lineIterators;
this.parser = parser;
// Initialize the offset map up front; makeCommitter() copies it, so it must not be null.
this.lastOffsetPartitions = Maps.newConcurrentMap();
}
@Override
public void close() throws IOException
{
stop = true;
}
@Override
public boolean advance()
{
if (stop) {
return false;
}
nextMessage();
return true;
}
@Override
public InputRow currRow()
{
return row;
}
@Override
public Committer makeCommitter()
{
final Map<Integer, Long> offsets = Maps.newHashMap(lastOffsetPartitions); // TODO: no test on offset
return new Committer()
{
@Override
public Object getMetadata()
{
return offsets;
}
@Override
public void run()
{
}
};
}
@Override
public void start() throws Exception
{
nextMessage();
}
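/**
 * Advance to the next file with remaining lines if the current iterator is exhausted, update the
 * stop flag, and parse the next available line into the current row.
 */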
private void nextMessage()
{
while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) {
lineIterator = lineIterators.next();
}
stop = !(lineIterator != null && lineIterator.hasNext());
try {
if (lineIterator == null || !lineIterator.hasNext()) {
// Close old streams, maybe.
if (lineIterator != null) {
lineIterator.close();
}
lineIterator = lineIterators.next();
}
row = parser.parse((String) lineIterator.next()); // TODO: parser.parse(lineIterator.next())
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
};
}

View File

@@ -81,7 +81,7 @@ public class FlushingPlumber extends RealtimePlumber
}
@Override
public void startJob()
public Object startJob()
{
log.info("Starting job for %s", getSchema().getDataSource());
@@ -92,8 +92,9 @@ public class FlushingPlumber extends RealtimePlumber
flushScheduledExec = Execs.scheduledSingleThreaded("flushing_scheduled_%d");
}
bootstrapSinksFromDisk();
Object retVal = bootstrapSinksFromDisk();
startFlushThread();
return retVal;
}
protected void flushAfterDuration(final long truncatedTime, final Sink sink)

View File

@@ -17,6 +17,7 @@
package io.druid.segment.realtime.plumber;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@@ -27,8 +28,10 @@ public interface Plumber
/**
* Perform any initial setup. Should be called before using any other methods, and should be paired
* with a corresponding call to {@link #finishJob}.
*
* @return the metadata of the "newest" segment that might have previously been persisted
*/
public void startJob();
public Object startJob();
/**
* @param row - the row to insert
@@ -44,6 +47,7 @@ public interface Plumber
*
* @param commitRunnable code to run after persisting data
*/
void persist(Committer commitRunnable);
void persist(Runnable commitRunnable);
/**

View File

@@ -40,6 +40,9 @@ import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
@@ -82,6 +85,9 @@ import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -121,7 +127,6 @@ public class RealtimePlumber implements Plumber
private volatile ExecutorService mergeExecutor = null;
private volatile ScheduledExecutorService scheduledExecutor = null;
public RealtimePlumber(
DataSchema schema,
RealtimeTuningConfig config,
@@ -171,15 +176,17 @@ public class RealtimePlumber implements Plumber
}
@Override
public void startJob()
public Object startJob()
{
computeBaseDir(schema).mkdirs();
initializeExecutors();
bootstrapSinksFromDisk();
Object retVal = bootstrapSinksFromDisk();
registerServerViewCallback();
startPersistThread();
// Push pending sinks bootstrapped from previous run
mergeAndPush();
return retVal;
}
@Override
@@ -332,6 +339,27 @@ public class RealtimePlumber implements Plumber
@Override
public void persist(final Runnable commitRunnable)
{
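// Wrap the Runnable in a Committer that carries no metadata and delegate to persist(Committer).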
persist(
new Committer()
{
@Override
public Object getMetadata()
{
return null;
}
@Override
public void run()
{
commitRunnable.run();
}
}
);
}
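/**
 * Persist every sink that currently holds rows, writing the committer's metadata alongside each
 * persisted hydrant, then run the committer itself.
 */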
@Override
public void persist(final Committer commitRunnable)
{
final List<Pair<FireHydrant, Interval>> indexesToPersist = Lists.newArrayList();
for (Sink sink : sinks.values()) {
@@ -352,7 +380,11 @@ public class RealtimePlumber implements Plumber
{
try {
for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs));
metrics.incrementRowOutputCount(
persistHydrant(
pair.lhs, schema, pair.rhs, commitRunnable.getMetadata()
)
);
}
commitRunnable.run();
}
@@ -417,7 +449,7 @@ public class RealtimePlumber implements Plumber
synchronized (hydrant) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
final int rowCount = persistHydrant(hydrant, schema, interval);
final int rowCount = persistHydrant(hydrant, schema, interval, null);
metrics.incrementRowOutputCount(rowCount);
}
}
@@ -450,19 +482,27 @@ public class RealtimePlumber implements Plumber
}
QueryableIndex index = IndexIO.loadIndex(mergedFile);
log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());
try {
DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);
log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier());
segmentPublisher.publishSegment(segment);
DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);
segmentPublisher.publishSegment(segment);
if (!isPushedMarker.createNewFile()) {
log.makeAlert("Failed to create marker file for [%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.addData("partitionNum", segment.getShardSpec().getPartitionNum())
.addData("marker", isPushedMarker)
if (!isPushedMarker.createNewFile()) {
log.makeAlert("Failed to create marker file for [%s]", schema.getDataSource())
.addData("interval", sink.getInterval())
.addData("partitionNum", segment.getShardSpec().getPartitionNum())
.addData("marker", isPushedMarker)
.emit();
}
}
catch (Throwable e) {
log.info("Exception happen when pushing to deep storage");
log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
.addData("interval", interval)
.emit();
}
}
@@ -565,20 +605,22 @@ public class RealtimePlumber implements Plumber
}
}
protected void bootstrapSinksFromDisk()
protected Object bootstrapSinksFromDisk()
{
final VersioningPolicy versioningPolicy = config.getVersioningPolicy();
File baseDir = computeBaseDir(schema);
if (baseDir == null || !baseDir.exists()) {
return;
return null;
}
File[] files = baseDir.listFiles();
if (files == null) {
return;
return null;
}
Object metadata = null;
long latestCommitTime = 0;
for (File sinkDir : files) {
Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/"));
@@ -611,7 +653,7 @@ public class RealtimePlumber implements Plumber
}
}
);
boolean isCorrupted = false;
try {
List<FireHydrant> hydrants = Lists.newArrayList();
for (File segmentDir : sinkFiles) {
@@ -623,7 +665,35 @@ public class RealtimePlumber implements Plumber
if (Ints.tryParse(segmentDir.getName()) == null) {
continue;
}
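// Try to load the persisted index; if it is corrupted, copy it to the corrupted-segment dump
// directory, delete the original, and skip this segment.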
QueryableIndex queryableIndex = null;
try {
queryableIndex = IndexIO.loadIndex(segmentDir);
}
catch (IOException e) {
log.error(e, "Problem loading segmentDir from disk.");
isCorrupted = true;
}
if (isCorrupted) {
try {
File corruptSegmentDir = computeCorruptedFileDumpDir(segmentDir, schema);
log.info("Renaming %s to %s", segmentDir.getAbsolutePath(), corruptSegmentDir.getAbsolutePath());
FileUtils.copyDirectory(segmentDir, corruptSegmentDir);
FileUtils.deleteDirectory(segmentDir);
}
catch (Exception e1) {
log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath());
}
continue;
}
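// Track the commit metadata of the most recently persisted segment (by file creation time); it is
// returned from startJob() as the metadata of the newest segment previously persisted.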
BasicFileAttributes attr = Files.readAttributes(segmentDir.toPath(), BasicFileAttributes.class);
if (attr.creationTime().toMillis() > latestCommitTime) {
log.info(
"Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]",
queryableIndex.getMetaData(), attr.creationTime().toMillis(), latestCommitTime
);
latestCommitTime = attr.creationTime().toMillis();
metadata = queryableIndex.getMetaData();
}
hydrants.add(
new FireHydrant(
new QueryableIndexSegment(
@@ -634,13 +704,12 @@ public class RealtimePlumber implements Plumber
versioningPolicy.getVersion(sinkInterval),
config.getShardSpec()
),
IndexIO.loadIndex(segmentDir)
queryableIndex
),
Integer.parseInt(segmentDir.getName())
)
);
}
Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants);
sinks.put(sinkInterval.getStartMillis(), currSink);
sinkTimeline.add(
@@ -657,6 +726,7 @@ public class RealtimePlumber implements Plumber
.emit();
}
}
return metadata;
}
protected void startPersistThread()
@@ -798,6 +868,11 @@ public class RealtimePlumber implements Plumber
return new File(config.getBasePersistDirectory(), schema.getDataSource());
}
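/**
 * Directory to which a corrupted segment directory is copied before it is removed from the
 * persist directory.
 */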
protected File computeCorruptedFileDumpDir(File persistDir, DataSchema schema)
{
return new File(persistDir.getAbsolutePath().replace(schema.getDataSource(), "corrupted/"+schema.getDataSource()));
}
protected File computePersistDir(DataSchema schema, Interval interval)
{
return new File(computeBaseDir(schema), interval.toString().replace("/", "_"));
@@ -812,7 +887,7 @@ public class RealtimePlumber implements Plumber
*
* @return the number of rows persisted
*/
protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Interval interval)
protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Interval interval, Object commitMetaData)
{
synchronized (indexToPersist) {
if (indexToPersist.hasSwapped()) {
@@ -824,9 +899,10 @@ public class RealtimePlumber implements Plumber
}
log.info(
"DataSource[%s], Interval[%s], persisting Hydrant[%s]",
"DataSource[%s], Interval[%s], Metadata [%s] persisting Hydrant[%s]",
schema.getDataSource(),
interval,
commitMetaData,
indexToPersist
);
try {
@@ -838,6 +914,7 @@ public class RealtimePlumber implements Plumber
persistedFile = IndexMaker.persist(
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
commitMetaData,
indexSpec
);
} else {

View File

@@ -72,7 +72,8 @@ public class FireDepartmentTest
null,
new RealtimePlumberSchool(
null, null, null, null, null, null, null
)
),
null
),
new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, null, false, false, null, null

View File

@@ -24,6 +24,9 @@ import com.metamx.common.ISE;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.Committer;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.InputRowParser;
@@ -41,6 +44,7 @@ import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.utils.Runnables;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -59,8 +63,11 @@ import java.util.concurrent.TimeUnit;
public class RealtimeManagerTest
{
private RealtimeManager realtimeManager;
private RealtimeManager realtimeManager2;
private DataSchema schema;
private DataSchema schema2;
private TestPlumber plumber;
private TestPlumber plumber2;
@Before
public void setUp() throws Exception
@@ -78,6 +85,12 @@ public class RealtimeManagerTest
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
);
schema2 = new DataSchema(
"testV2",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
);
RealtimeIOConfig ioConfig = new RealtimeIOConfig(
new FirehoseFactory()
{
@@ -96,6 +109,28 @@ public class RealtimeManagerTest
{
return plumber;
}
},
null
);
RealtimeIOConfig ioConfig2 = new RealtimeIOConfig(
null,
new PlumberSchool()
{
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return plumber2;
}
},
new FirehoseFactoryV2()
{
@Override
public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException, ParseException
{
return new TestFirehoseV2(rows.iterator());
}
}
);
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
@@ -125,6 +160,18 @@ public class RealtimeManagerTest
),
null
);
plumber2 = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema2, tuningConfig, new DateTime().toString()));
realtimeManager2 = new RealtimeManager(
Arrays.<FireDepartment>asList(
new FireDepartment(
schema2,
ioConfig2,
tuningConfig
)
),
null
);
}
@Test
@@ -148,6 +195,27 @@ public class RealtimeManagerTest
Assert.assertEquals(1, plumber.getPersistCount());
}
@Test
public void testRunV2() throws Exception
{
realtimeManager2.start();
Stopwatch stopwatch = Stopwatch.createStarted();
while (realtimeManager2.getMetrics("testV2").processed() != 1) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Realtime manager should have completed processing 2 events!");
}
}
Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").processed());
Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").thrownAway());
Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").unparseable());
Assert.assertTrue(plumber2.isStartedJob());
Assert.assertTrue(plumber2.isFinishedJob());
Assert.assertEquals(1, plumber2.getPersistCount());
}
private TestInputRowHolder makeRow(final long timestamp)
{
return new TestInputRowHolder(timestamp, null);
@@ -266,6 +334,71 @@ public class RealtimeManagerTest
}
}
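// Minimal FirehoseV2 test double: walks the shared row holders via currRow()/advance() and
// returns a Committer whose metadata is null.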
private static class TestFirehoseV2 implements FirehoseV2
{
private final Iterator<TestInputRowHolder> rows;
private InputRow currRow;
private boolean stop;
private TestFirehoseV2(Iterator<TestInputRowHolder> rows)
{
this.rows = rows;
}
private void nextMessage() {
currRow = null;
while (currRow == null) {
final TestInputRowHolder holder = rows.next();
currRow = holder == null ? null : holder.getRow();
}
}
@Override
public void close() throws IOException
{
}
@Override
public boolean advance()
{
stop = !rows.hasNext();
if (stop) {
return false;
}
nextMessage();
return true;
}
@Override
public InputRow currRow()
{
return currRow;
}
@Override
public Committer makeCommitter()
{
return new Committer()
{
@Override
public Object getMetadata()
{
return null;
}
@Override
public void run()
{
}
};
}
@Override
public void start() throws Exception
{
nextMessage();
}
}
private static class TestPlumber implements Plumber
{
private final Sink sink;
@@ -296,9 +429,10 @@ public class RealtimeManagerTest
}
@Override
public void startJob()
public Object startJob()
{
startedJob = true;
return null;
}
@Override
@@ -342,5 +476,11 @@ public class RealtimeManagerTest
{
finishedJob = true;
}
@Override
public void persist(Committer commitRunnable)
{
persistCount++;
}
}
}