From 2237a8cf0f4315c681299b7404eab67d1e45e574 Mon Sep 17 00:00:00 2001 From: lvjq Date: Mon, 29 Sep 2014 15:22:17 -0700 Subject: [PATCH] kafka 8 simple consumer firehose --- .../druid/common/utils/SerializerUtils.java | 11 +- docs/content/development/experimental.md | 2 +- .../kafka-simple-consumer-firehose.md | 30 ++ docs/content/ingestion/firehose.md | 3 +- extensions/kafka-eight-simpleConsumer/pom.xml | 77 ++++ .../KafkaEightSimpleConsumerDruidModule.java | 47 ++ ...fkaEightSimpleConsumerFirehoseFactory.java | 319 ++++++++++++++ .../firehose/kafka/KafkaSimpleConsumer.java | 400 ++++++++++++++++++ .../io.druid.initialization.DruidModule | 1 + .../io/druid/indexer/IndexGeneratorJob.java | 2 +- .../common/index/YeOldePlumberSchool.java | 11 +- .../indexing/common/TestRealtimeTask.java | 2 +- .../indexing/common/TestRealtimeTaskV2.java | 88 ++++ .../indexing/common/task/TaskSerdeTest.java | 5 +- .../overlord/RemoteTaskRunnerTest.java | 95 +++++ .../indexing/overlord/TaskLifecycleTest.java | 2 +- .../indexing/worker/TaskAnnouncementTest.java | 2 +- pom.xml | 4 +- .../main/java/io/druid/segment/IndexIO.java | 24 +- .../java/io/druid/segment/IndexMaker.java | 57 ++- .../io/druid/segment/IndexableAdapter.java | 2 + .../java/io/druid/segment/QueryableIndex.java | 1 + .../QueryableIndexIndexableAdapter.java | 5 + .../segment/RowboatFilteringIndexAdapter.java | 6 + .../druid/segment/SimpleQueryableIndex.java | 11 +- .../incremental/IncrementalIndexAdapter.java | 6 + .../segment/IndexMakerParameterizedTest.java | 16 +- .../segment/indexing/RealtimeIOConfig.java | 12 +- .../segment/realtime/FireDepartment.java | 13 +- .../segment/realtime/RealtimeManager.java | 217 +++++++--- .../firehose/LocalFirehoseFactoryV2.java | 229 ++++++++++ .../realtime/plumber/FlushingPlumber.java | 5 +- .../segment/realtime/plumber/Plumber.java | 6 +- .../realtime/plumber/RealtimePlumber.java | 129 ++++-- .../segment/realtime/FireDepartmentTest.java | 3 +- .../segment/realtime/RealtimeManagerTest.java | 142 ++++++- 36 files changed, 1848 insertions(+), 137 deletions(-) create mode 100644 docs/content/development/kafka-simple-consumer-firehose.md create mode 100644 extensions/kafka-eight-simpleConsumer/pom.xml create mode 100644 extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java create mode 100644 extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java create mode 100644 extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java create mode 100644 extensions/kafka-eight-simpleConsumer/src/main/resources/META-INF/services/io.druid.initialization.DruidModule create mode 100644 indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTaskV2.java create mode 100644 server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactoryV2.java diff --git a/common/src/main/java/io/druid/common/utils/SerializerUtils.java b/common/src/main/java/io/druid/common/utils/SerializerUtils.java index 38e36e4084e..0fb8b59af32 100644 --- a/common/src/main/java/io/druid/common/utils/SerializerUtils.java +++ b/common/src/main/java/io/druid/common/utils/SerializerUtils.java @@ -67,9 +67,14 @@ public class SerializerUtils public String readString(ByteBuffer in) throws IOException { final int length = in.getInt(); - byte[] stringBytes = new byte[length]; - in.get(stringBytes); - return new String(stringBytes, UTF8); + return new String(readBytes(in, length), 
UTF8);
+  }
+
+  public byte[] readBytes(ByteBuffer in, int length) throws IOException
+  {
+    byte[] bytes = new byte[length];
+    in.get(bytes);
+    return bytes;
+  }
 
   public void writeStrings(OutputStream out, String[] names) throws IOException
diff --git a/docs/content/development/experimental.md b/docs/content/development/experimental.md
index e99dbc60828..16295428bdd 100644
--- a/docs/content/development/experimental.md
+++ b/docs/content/development/experimental.md
@@ -11,4 +11,4 @@ To enable experimental features, include their artifacts in the configuration ru
 druid.extensions.coordinates=["io.druid.extensions:druid-histogram:{VERSION}"]
 ```
 
-The configuration for all the indexer and query nodes need to be updated with this.
+The configuration for all the indexer and query nodes need to be updated with this.
\ No newline at end of file
diff --git a/docs/content/development/kafka-simple-consumer-firehose.md b/docs/content/development/kafka-simple-consumer-firehose.md
new file mode 100644
index 00000000000..ef21535f9c7
--- /dev/null
+++ b/docs/content/development/kafka-simple-consumer-firehose.md
@@ -0,0 +1,30 @@
+---
+layout: doc_page
+---
+# KafkaSimpleConsumerFirehose
+This firehose acts as a Kafka simple consumer and ingests data from Kafka. It is currently still experimental.
+The configuration for KafkaSimpleConsumerFirehose is similar to that of the Kafka firehose (see the [Kafka firehose example](realtime-ingestion.html#realtime-specfile)), except that `firehose` should be replaced with `firehoseV2`, like this:
+```json
+"firehoseV2": {
+  "type" : "kafka-0.8-v2",
+  "brokerList" : ["localhost:4443"],
+  "queueBufferLength": 10001,
+  "resetBehavior": "latest",
+  "partitionIdList" : ["0"],
+  "clientId" : "localclient",
+  "feed": "wikipedia"
+}
+```
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|kafka-0.8-v2|yes|
+|brokerList|list of the Kafka brokers|yes|
+|queueBufferLength|the buffer length for the Kafka message queue|no (default: 20000)|
+|resetBehavior|whether the consumer starts from the earliest or the latest available message when a kafkaOffsetOutOfRange error occurs|no (default: earliest)|
+|partitionIdList|list of Kafka partition ids|yes|
+|clientId|the clientId for the Kafka SimpleConsumer|yes|
+|feed|the Kafka topic|yes|
+
+To use this firehose at scale, and possibly in production, it is recommended to set the replication factor to at least three, which means at least three Kafka brokers in the `brokerList`. For a topic producing on the order of 10^4 events per second, a single partition works properly, but more partitions can be added if higher throughput is required.
+
diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md
index fe3408357ad..075c5da963d 100644
--- a/docs/content/ingestion/firehose.md
+++ b/docs/content/ingestion/firehose.md
@@ -260,7 +260,6 @@ When using this firehose, events can be sent by submitting a POST request to the
 |serviceName|name used to announce the event receiver service endpoint|yes|
 |bufferSize| size of buffer used by firehose to store events|no default(100000)|
 
-
 #### TimedShutoffFirehose
 
 This can be used to start a firehose that will shut down at a specified time.
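Returning to the Kafka simple consumer firehose documented above: for orientation, here is a minimal sketch of how the `firehoseV2` block sits inside a realtime spec's `ioConfig`. The broker addresses and the `"type": "realtime"` wrapper are illustrative assumptions, not values taken from this patch.

```json
"ioConfig": {
  "type": "realtime",
  "firehoseV2": {
    "type": "kafka-0.8-v2",
    "brokerList": ["broker1:9092", "broker2:9092", "broker3:9092"],
    "partitionIdList": ["0"],
    "clientId": "localclient",
    "feed": "wikipedia"
  }
}
```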
@@ -283,3 +282,5 @@ An example is shown below: |type|This should be "timed"|yes| |shutoffTime|time at which the firehose should shut down, in ISO8601 format|yes| |delegate|firehose to use|yes| +======= + diff --git a/extensions/kafka-eight-simpleConsumer/pom.xml b/extensions/kafka-eight-simpleConsumer/pom.xml new file mode 100644 index 00000000000..ee4de3f1035 --- /dev/null +++ b/extensions/kafka-eight-simpleConsumer/pom.xml @@ -0,0 +1,77 @@ + + + + + 4.0.0 + io.druid.extensions + druid-kafka-eight-simple-consumer + druid-kafka-eight-simple-consumer + druid-kafka-eight-simple-consumer + + + io.druid + druid + 0.8.0-SNAPSHOT + ../../pom.xml + + + + + io.druid + druid-api + + + org.apache.kafka + kafka_2.10 + 0.8.2.1 + + + log4j + log4j + + + org.apache.zookeeper + zookeeper + + + + + + junit + junit + test + + + + + + + maven-jar-plugin + + + + true + true + + + + + + + diff --git a/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java b/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java new file mode 100644 index 00000000000..7648b1f5b3e --- /dev/null +++ b/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java @@ -0,0 +1,47 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.firehose.kafka; + +import java.util.List; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; + +import io.druid.initialization.DruidModule; + +public class KafkaEightSimpleConsumerDruidModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule("KafkaEightSimpleConsumerFirehoseModule").registerSubtypes( + new NamedType(KafkaEightSimpleConsumerFirehoseFactory.class, "kafka-0.8-v2") + ) + ); + } + + @Override + public void configure(Binder binder) + { + + } +} diff --git a/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java new file mode 100644 index 00000000000..a06b4945aa5 --- /dev/null +++ b/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java @@ -0,0 +1,319 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.firehose.kafka; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.collect.Maps; +import com.metamx.common.logger.Logger; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Sets; + +import io.druid.data.input.ByteBufferInputRowParser; +import io.druid.data.input.Committer; +import io.druid.data.input.FirehoseFactoryV2; +import io.druid.data.input.FirehoseV2; +import io.druid.data.input.InputRow; +import io.druid.firehose.kafka.KafkaSimpleConsumer.BytesMessageWithOffset; + +public class KafkaEightSimpleConsumerFirehoseFactory implements + FirehoseFactoryV2 +{ + private static final Logger log = new Logger( + KafkaEightSimpleConsumerFirehoseFactory.class + ); + + @JsonProperty + private final List brokerList; + + @JsonProperty + private final List partitionIdList; + + @JsonProperty + private final String clientId; + + @JsonProperty + private final String feed; + + @JsonProperty + private final int queueBufferLength; + + @JsonProperty + private boolean earliest; + + private final List consumerWorkers = new CopyOnWriteArrayList<>(); + private static final int DEFAULT_QUEUE_BUFFER_LENGTH = 20000; + private static final String RESET_TO_LATEST = "latest"; + private static final int CONSUMER_FETCH_TIMEOUT = 10000; + @JsonCreator + public KafkaEightSimpleConsumerFirehoseFactory( + @JsonProperty("brokerList") List brokerList, + @JsonProperty("partitionIdList") List partitionIdList, + @JsonProperty("clientId") String clientId, + @JsonProperty("feed") String feed, + @JsonProperty("queueBufferLength") Integer queueBufferLength, + @JsonProperty("resetBehavior") String resetBehavior + ) + { + this.brokerList = brokerList; + this.partitionIdList = partitionIdList; + this.clientId = clientId; + this.feed = feed; + + this.queueBufferLength = queueBufferLength == null ? DEFAULT_QUEUE_BUFFER_LENGTH : queueBufferLength; + log.info("queueBufferLength loaded as[%s]", this.queueBufferLength); + + this.earliest = RESET_TO_LATEST.equalsIgnoreCase(resetBehavior) ? false : true; + log.info("Default behavior of cosumer set to earliest? 
[%s]", this.earliest); + } + + private Map loadOffsetFromPreviousMetaData(Object lastCommit) + { + Map offsetMap = Maps.newHashMap(); + if (lastCommit == null) { + return offsetMap; + } + if (lastCommit instanceof Map) { + Map lastCommitMap = (Map) lastCommit; + for (Map.Entry entry : lastCommitMap.entrySet()) { + try { + int partitionId = Integer.parseInt(entry.getKey().toString()); + long offset = Long.parseLong(entry.getValue().toString()); + log.debug("Recover last commit information partitionId [%s], offset [%s]", partitionId, offset); + offsetMap.put(partitionId, offset); + } + catch (NumberFormatException e) { + log.error(e, "Fail to load offset from previous meta data [%s]", entry); + } + } + log.info("Loaded offset map[%s]", offsetMap); + } else { + log.error("Unable to cast lastCommit to Map"); + } + return offsetMap; + } + + @Override + public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object lastCommit) throws IOException + { + final Map lastOffsets = loadOffsetFromPreviousMetaData(lastCommit); + + Set newDimExclus = Sets.union( + firehoseParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), + Sets.newHashSet("feed") + ); + final ByteBufferInputRowParser theParser = firehoseParser.withParseSpec( + firehoseParser.getParseSpec() + .withDimensionsSpec( + firehoseParser.getParseSpec() + .getDimensionsSpec() + .withDimensionExclusions( + newDimExclus + ) + ) + ); + for (Integer partition : partitionIdList) { + final KafkaSimpleConsumer kafkaSimpleConsumer = new KafkaSimpleConsumer( + feed, partition, clientId, brokerList, earliest + ); + Long startOffset = lastOffsets.get(partition); + PartitionConsumerWorker worker = new PartitionConsumerWorker( + feed, kafkaSimpleConsumer, partition, startOffset == null ? 0 : startOffset + ); + consumerWorkers.add(worker); + } + + final LinkedBlockingQueue messageQueue = new LinkedBlockingQueue(queueBufferLength); + log.info("Kicking off all consumers"); + for (PartitionConsumerWorker worker : consumerWorkers) { + worker.go(messageQueue); + } + log.info("All consumer started"); + + return new FirehoseV2() + { + private ConcurrentMap lastOffsetPartitions; + private volatile boolean stop; + private volatile boolean interrupted; + + private volatile BytesMessageWithOffset msg = null; + private volatile InputRow row = null; + + { + lastOffsetPartitions = Maps.newConcurrentMap(); + lastOffsetPartitions.putAll(lastOffsets); + } + + @Override + public void start() throws Exception + { + nextMessage(); + } + + @Override + public boolean advance() + { + if (stop) { + return false; + } + + nextMessage(); + return true; + } + + private void nextMessage() + { + try { + row = null; + while (row == null) { + if (msg != null) { + lastOffsetPartitions.put(msg.getPartition(), msg.offset()); + } + + msg = messageQueue.take(); + interrupted = false; + + final byte[] message = msg.message(); + row = message == null ? 
null : theParser.parse(ByteBuffer.wrap(message)); + } + } + catch (InterruptedException e) { + interrupted = true; + log.info(e, "Interrupted when taken from queue"); + } + } + + @Override + public InputRow currRow() + { + if (interrupted) { + return null; + } + return row; + } + + @Override + public Committer makeCommitter() + { + final Map offsets = Maps.newHashMap(lastOffsetPartitions); + + return new Committer() + { + @Override + public Object getMetadata() + { + return offsets; + } + + @Override + public void run() + { + + } + }; + } + + @Override + public void close() throws IOException + { + log.info("Stopping kafka 0.8 simple firehose"); + stop = true; + for (PartitionConsumerWorker t : consumerWorkers) { + t.close(); + } + } + }; + } + + private static class PartitionConsumerWorker implements Closeable + { + private final String topic; + private final KafkaSimpleConsumer consumer; + private final int partitionId; + private final long startOffset; + + private final AtomicBoolean stopped = new AtomicBoolean(false); + private volatile Thread thread = null; + + PartitionConsumerWorker(String topic, KafkaSimpleConsumer consumer, int partitionId, long startOffset) + { + this.topic = topic; + this.consumer = consumer; + this.partitionId = partitionId; + this.startOffset = startOffset; + } + + public void go(final LinkedBlockingQueue messageQueue) { + thread = new Thread() + { + @Override + public void run() + { + long offset = startOffset; + log.info("Start running parition[%s], offset[%s]", partitionId, offset); + try { + while (!stopped.get()) { + try { + Iterable msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT); + int count = 0; + for (BytesMessageWithOffset msgWithOffset : msgs) { + offset = msgWithOffset.offset(); + messageQueue.put(msgWithOffset); + count++; + } + log.debug("fetch [%s] msgs for partition [%s] in one time ", count, partitionId); + } + catch (InterruptedException e) { + log.info("Interrupted when fetching data, shutting down."); + return; + } + catch (Exception e) { + log.error(e, "Exception happened in fetching data, but will continue consuming"); + } + } + } + finally { + consumer.stop(); + } + } + }; + thread.setDaemon(true); + thread.setName(String.format("kafka-%s-%s", topic, partitionId)); + thread.start(); + } + + @Override + public synchronized void close() throws IOException + { + if (stopped.compareAndSet(false, true)) { + thread.interrupt(); + thread = null; + } + } + } +} diff --git a/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java b/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java new file mode 100644 index 00000000000..ad9760ff70d --- /dev/null +++ b/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java @@ -0,0 +1,400 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.druid.firehose.kafka; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.metamx.common.guava.FunctionalIterable; +import com.metamx.common.logger.Logger; +import kafka.api.FetchRequest; +import kafka.api.FetchRequestBuilder; +import kafka.api.PartitionOffsetRequestInfo; +import kafka.javaapi.FetchResponse; +import kafka.cluster.Broker; +import kafka.common.ErrorMapping; +import kafka.common.TopicAndPartition; +import kafka.javaapi.OffsetRequest; +import kafka.javaapi.OffsetResponse; +import kafka.javaapi.PartitionMetadata; +import kafka.javaapi.TopicMetadata; +import kafka.javaapi.TopicMetadataRequest; +import kafka.javaapi.TopicMetadataResponse; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.message.MessageAndOffset; + +/** + * refer @{link + * https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer + * +Example} + *

+ * This class is not thread safe, the caller must ensure all the methods be + * called from single thread + */ +public class KafkaSimpleConsumer +{ + + public static final List EMPTY_MSGS = new ArrayList<>(); + + private static final Logger log = new Logger(KafkaSimpleConsumer.class); + + private final List allBrokers; + private final String topic; + private final int partitionId; + private final String clientId; + private final String leaderLookupClientId; + private final boolean earliest; + + private volatile Broker leaderBroker; + private List replicaBrokers; + private SimpleConsumer consumer = null; + + private static final int SO_TIMEOUT = 30000; + private static final int BUFFER_SIZE = 65536; + private static final long RETRY_INTERVAL = 1000L; + private static final int FETCH_SIZE = 100000000; + + public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List brokers, boolean earliest) + { + List brokerList = new ArrayList(); + for (String broker : brokers) { + String[] tokens = broker.split(":"); + if (tokens.length != 2) { + log.warn("wrong broker name [%s], its format should be host:port", broker); + continue; + } + + try { + brokerList.add(new KafkaBroker(tokens[0], Integer.parseInt(tokens[1]))); + } + catch (NumberFormatException e) { + log.warn("wrong broker name [%s], its format should be host:port", broker); + continue; + } + } + + this.allBrokers = Collections.unmodifiableList(brokerList); + this.topic = topic; + this.partitionId = partitionId; + this.clientId = String.format("%s_%d_%s", topic, partitionId, clientId); + this.leaderLookupClientId = clientId + "leaderLookup"; + this.replicaBrokers = new ArrayList<>(); + this.replicaBrokers.addAll(this.allBrokers); + this.earliest = earliest; + log.info("KafkaSimpleConsumer initialized with clientId [%s] for message consumption and clientId [%s] for leader lookup", this.clientId, this.leaderLookupClientId); + } + + private void ensureConsumer(Broker leader) throws InterruptedException + { + if (consumer == null) { + while (leaderBroker == null) { + leaderBroker = findNewLeader(leader); + } + log.info( + "making SimpleConsumer[%s][%s], leader broker[%s:%s]", + topic, partitionId, leaderBroker.host(), leaderBroker.port() + ); + + consumer = new SimpleConsumer( + leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT, BUFFER_SIZE, clientId + ); + } + } + + public static class BytesMessageWithOffset + { + final byte[] msg; + final long offset; + final int partition; + + public BytesMessageWithOffset(byte[] msg, long offset, int partition) + { + this.msg = msg; + this.offset = offset; + this.partition = partition; + } + + public int getPartition() + { + return partition; + } + + public byte[] message() + { + return msg; + } + + public long offset() + { + return offset; + } + } + + static class KafkaBroker + { + final String host; + final int port; + + KafkaBroker(String host, int port) + { + this.host = host; + this.port = port; + } + + @Override + public String toString() + { + return String.format("%s:%d", host, port); + } + } + + private Iterable filterAndDecode(Iterable kafkaMessages, final long offset) + { + return FunctionalIterable + .create(kafkaMessages) + .filter( + new Predicate() + { + @Override + public boolean apply(MessageAndOffset msgAndOffset) + { + return msgAndOffset.offset() >= offset; + } + } + ) + .transform( + new Function() + { + + @Override + public BytesMessageWithOffset apply(MessageAndOffset msgAndOffset) + { + ByteBuffer bb = msgAndOffset.message().payload(); + byte[] payload = new 
byte[bb.remaining()]; + bb.get(payload); + // add nextOffset here, thus next fetch will use nextOffset instead of current offset + return new BytesMessageWithOffset(payload, msgAndOffset.nextOffset(), partitionId); + } + } + ); + } + + private long getOffset(boolean earliest) throws InterruptedException + { + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId); + Map requestInfo = new HashMap(); + requestInfo.put( + topicAndPartition, + new PartitionOffsetRequestInfo( + earliest ? kafka.api.OffsetRequest.EarliestTime() : kafka.api.OffsetRequest.LatestTime(), 1 + ) + ); + OffsetRequest request = new OffsetRequest( + requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId + ); + OffsetResponse response = null; + try { + response = consumer.getOffsetsBefore(request); + } + catch (Exception e) { + ensureNotInterrupted(e); + log.error(e, "caught exception in getOffsetsBefore [%s] - [%s]", topic, partitionId); + return -1; + } + if (response.hasError()) { + log.error( + "error fetching data Offset from the Broker [%s]. reason: [%s]", leaderBroker.host(), + response.errorCode(topic, partitionId) + ); + return -1; + } + long[] offsets = response.offsets(topic, partitionId); + return earliest ? offsets[0] : offsets[offsets.length - 1]; + } + + public Iterable fetch(long offset, int timeoutMs) throws InterruptedException + { + FetchResponse response = null; + Broker previousLeader = leaderBroker; + while (true) { + ensureConsumer(previousLeader); + + FetchRequest request = new FetchRequestBuilder() + .clientId(clientId) + .addFetch(topic, partitionId, offset, FETCH_SIZE) + .maxWait(timeoutMs) + .minBytes(1) + .build(); + + log.debug("fetch offset %s", offset); + + try { + response = consumer.fetch(request); + } + catch (Exception e) { + ensureNotInterrupted(e); + log.warn(e, "caughte exception in fetch {} - {}", topic, partitionId); + response = null; + } + + if (response == null || response.hasError()) { + short errorCode = response != null ? response.errorCode(topic, partitionId) : ErrorMapping.UnknownCode(); + log.warn("fetch %s - %s with offset %s encounters error: [%s]", topic, partitionId, offset, errorCode); + + boolean needNewLeader = false; + if (errorCode == ErrorMapping.RequestTimedOutCode()) { + log.info("kafka request timed out, response[%s]", response); + } else if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) { + long newOffset = getOffset(earliest); + log.info("got [%s] offset[%s] for [%s][%s]", earliest ? "earliest" : "latest", newOffset, topic, partitionId); + if (newOffset < 0) { + needNewLeader = true; + } else { + offset = newOffset; + continue; + } + } else { + needNewLeader = true; + } + + if (needNewLeader) { + stopConsumer(); + previousLeader = leaderBroker; + leaderBroker = null; + continue; + } + } else { + break; + } + } + + return response != null ? 
filterAndDecode(response.messageSet(topic, partitionId), offset) : EMPTY_MSGS; + } + + private void stopConsumer() + { + if (consumer != null) { + try { + consumer.close(); + log.info("stop consumer[%s][%s], leaderBroker[%s]", topic, partitionId, leaderBroker); + } + catch (Exception e) { + log.warn(e, "stop consumer[%s][%s] failed", topic, partitionId); + } + finally { + consumer = null; + } + } + } + + // stop the consumer + public void stop() + { + stopConsumer(); + log.info("KafkaSimpleConsumer[%s][%s] stopped", topic, partitionId); + } + + private PartitionMetadata findLeader() throws InterruptedException + { + for (KafkaBroker broker : replicaBrokers) { + SimpleConsumer consumer = null; + try { + log.info("Finding new leader from Kafka brokers, try broker %s:%s", broker.host, broker.port); + consumer = new SimpleConsumer(broker.host, broker.port, SO_TIMEOUT, BUFFER_SIZE, leaderLookupClientId); + TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(Collections.singletonList(topic))); + + List metaData = resp.topicsMetadata(); + for (TopicMetadata item : metaData) { + if (topic.equals(item.topic())) { + for (PartitionMetadata part : item.partitionsMetadata()) { + if (part.partitionId() == partitionId) { + return part; + } + } + } + } + } + catch (Exception e) { + ensureNotInterrupted(e); + log.warn( + e, "error communicating with Kafka Broker [%s] to find leader for [%s] - [%s]", broker, topic, partitionId + ); + } + finally { + if (consumer != null) { + consumer.close(); + } + } + } + + return null; + } + + private Broker findNewLeader(Broker oldLeader) throws InterruptedException + { + long retryCnt = 0; + while (true) { + PartitionMetadata metadata = findLeader(); + if (metadata != null) { + replicaBrokers.clear(); + for (Broker replica : metadata.replicas()) { + replicaBrokers.add(new KafkaBroker(replica.host(), replica.port())); + } + + log.debug("Got new Kafka leader metadata : [%s], previous leader : [%s]", metadata, oldLeader); + Broker newLeader = metadata.leader(); + if (newLeader != null) { + // We check the retryCnt here as well to make sure that we have slept a little bit + // if we don't notice a change in leadership + // just in case if Zookeeper doesn't get updated fast enough + if (oldLeader == null || isValidNewLeader(newLeader) || retryCnt != 0) { + return newLeader; + } + } + } + + Thread.sleep(RETRY_INTERVAL); + retryCnt++; + // if could not find the leader for current replicaBrokers, let's try to + // find one via allBrokers + if (retryCnt >= 3 && (retryCnt - 3) % 5 == 0) { + log.warn("cannot find leader for [%s] - [%s] after [%s] retries", topic, partitionId, retryCnt); + replicaBrokers.clear(); + replicaBrokers.addAll(allBrokers); + } + } + } + + private boolean isValidNewLeader(Broker broker) { + // broker is considered valid new leader if it is not the same as old leaderBroker + return !(leaderBroker.host().equalsIgnoreCase(broker.host()) && leaderBroker.port() == broker.port()); + } + + private void ensureNotInterrupted(Exception e) throws InterruptedException + { + if (Thread.interrupted()) { + log.info(e, "Interrupted during fetching for %s - %s", topic, partitionId); + throw new InterruptedException(); + } + } +} diff --git a/extensions/kafka-eight-simpleConsumer/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions/kafka-eight-simpleConsumer/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 00000000000..f38689577eb --- /dev/null +++ 
b/extensions/kafka-eight-simpleConsumer/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.firehose.kafka.KafkaEightSimpleConsumerDruidModule \ No newline at end of file diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 241679188e5..b54eec9d185 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -486,7 +486,7 @@ public class IndexGeneratorJob implements Jobby ) throws IOException { return IndexMaker.persist( - index, interval, file, config.getIndexSpec(), progressIndicator + index, interval, file, null, config.getIndexSpec(), progressIndicator ); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 066f2708ac0..d0ab4ecce64 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -28,6 +28,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.metamx.common.logger.Logger; +import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -99,9 +100,9 @@ public class YeOldePlumberSchool implements PlumberSchool return new Plumber() { @Override - public void startJob() + public Object startJob() { - + return null; } @Override @@ -227,6 +228,12 @@ public class YeOldePlumberSchool implements PlumberSchool { return new File(persistDir, String.format("spill%d", n)); } + + @Override + public void persist(Committer commitRunnable) + { + persist(commitRunnable); + } }; } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java index 50eeaefcc5d..2e1d5c4fbcc 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java @@ -63,7 +63,7 @@ public class TestRealtimeTask extends RealtimeIndexTask { return null; } - } + }, null ), null ) ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTaskV2.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTaskV2.java new file mode 100644 index 00000000000..9b65ddc4d08 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTaskV2.java @@ -0,0 +1,88 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.druid.indexing.common; + +import java.io.File; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import io.druid.indexing.common.task.RealtimeIndexTask; +import io.druid.indexing.common.task.TaskResource; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.firehose.LocalFirehoseFactoryV2; +import io.druid.segment.realtime.plumber.Plumber; +import io.druid.segment.realtime.plumber.PlumberSchool; + +/** + */ +@JsonTypeName("test_realtime") +public class TestRealtimeTaskV2 extends RealtimeIndexTask +{ + private final TaskStatus status; + + @JsonCreator + public TestRealtimeTaskV2( + @JsonProperty("id") String id, + @JsonProperty("resource") TaskResource taskResource, + @JsonProperty("dataSource") String dataSource, + @JsonProperty("taskStatus") TaskStatus status + ) + { + super( + id, + taskResource, + new FireDepartment( + new DataSchema(dataSource, null, new AggregatorFactory[]{}, null), + new RealtimeIOConfig( + null, + new PlumberSchool() { + @Override + public Plumber findPlumber( + DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + ) + { + return null; + } + }, + new LocalFirehoseFactoryV2(new File("lol"), "rofl", null) + ), null + ) + ); + this.status = status; + } + + @Override + @JsonProperty + public String getType() + { + return "test_realtime"; + } + + @Override + public TaskStatus run(TaskToolbox toolbox) throws Exception + { + return status; + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 8d0427fe8c7..4fb546406fe 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -281,8 +281,9 @@ public class TaskSerdeTest { return null; } - } - ), + }, + null), + new RealtimeTuningConfig( 1, new Period("PT10M"), diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java index 57041757799..bc7d10f73f3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -31,6 +31,7 @@ import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; + import io.druid.common.guava.DSuppliers; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.cache.SimplePathChildrenCacheFactory; @@ -38,6 +39,7 @@ import io.druid.indexing.common.IndexingServiceCondition; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TestMergeTask; import io.druid.indexing.common.TestRealtimeTask; +import io.druid.indexing.common.TestRealtimeTaskV2; import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.task.Task; import 
io.druid.indexing.common.task.TaskResource; @@ -48,6 +50,7 @@ import io.druid.indexing.worker.Worker; import io.druid.jackson.DefaultObjectMapper; import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; @@ -231,6 +234,51 @@ public class RemoteTaskRunnerTest Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rt2")); } + @Test + public void testRunSameAvailabilityGroupV2() throws Exception + { + doSetup(); + + TestRealtimeTaskV2 task1 = new TestRealtimeTaskV2("rtV1", new TaskResource("rtV1", 1), "foo", TaskStatus.running("rtV1")); + remoteTaskRunner.run(task1); + Assert.assertTrue(taskAnnounced(task1.getId())); + mockWorkerRunningTask(task1); + + TestRealtimeTaskV2 task2 = new TestRealtimeTaskV2("rtV2", new TaskResource("rtV1", 1), "foo", TaskStatus.running("rtV2")); + remoteTaskRunner.run(task2); + + TestRealtimeTaskV2 task3 = new TestRealtimeTaskV2("rtV3", new TaskResource("rtV2", 1), "foo", TaskStatus.running("rtV3")); + remoteTaskRunner.run(task3); + + Assert.assertTrue( + TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + return remoteTaskRunner.getRunningTasks().size() == 2; + } + } + ) + ); + + Assert.assertTrue( + TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + return remoteTaskRunner.getPendingTasks().size() == 1; + } + } + ) + ); + + Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rtV2")); + } + @Test public void testRunWithCapacity() throws Exception { @@ -278,6 +326,53 @@ public class RemoteTaskRunnerTest Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rt2")); } + @Test + public void testRunWithCapacityV2() throws Exception + { + doSetup(); + + TestRealtimeTaskV2 task1 = new TestRealtimeTaskV2("rtV1", new TaskResource("rtV1", 1), "foo", TaskStatus.running("rtV1")); + remoteTaskRunner.run(task1); + Assert.assertTrue(taskAnnounced(task1.getId())); + mockWorkerRunningTask(task1); + + TestRealtimeTaskV2 task2 = new TestRealtimeTaskV2("rtV2", new TaskResource("rtV2", 3), "foo", TaskStatus.running("rtV2")); + remoteTaskRunner.run(task2); + + TestRealtimeTaskV2 task3 = new TestRealtimeTaskV2("rtV3", new TaskResource("rtV3", 2), "foo", TaskStatus.running("rtV3")); + remoteTaskRunner.run(task3); + Assert.assertTrue(taskAnnounced(task3.getId())); + mockWorkerRunningTask(task3); + + Assert.assertTrue( + TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + return remoteTaskRunner.getRunningTasks().size() == 2; + } + } + ) + ); + + Assert.assertTrue( + TestUtils.conditionValid( + new IndexingServiceCondition() + { + @Override + public boolean isValid() + { + return remoteTaskRunner.getPendingTasks().size() == 1; + } + } + ) + ); + + Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rtV2")); + } + @Test public void testStatusRemoved() throws Exception { diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index c76d4e53a49..661e60600c0 100644 --- 
a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -801,8 +801,8 @@ public class TaskLifecycleTest ); RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( new MockFirehoseFactory(true), + null, // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class null - // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( 1000, diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java index c316ff1b34c..7d984843a78 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java @@ -56,7 +56,7 @@ public class TaskAnnouncementTest { return null; } - }), null + }, null), null ) ); final TaskStatus status = TaskStatus.running(task.getId()); diff --git a/pom.xml b/pom.xml index fcee95715cf..a748fdbab23 100644 --- a/pom.xml +++ b/pom.xml @@ -96,6 +96,7 @@ Kafka 0.7 is not available in Maven Central --> extensions/kafka-eight + extensions/kafka-eight-simpleConsumer extensions/rabbitmq extensions/histogram extensions/mysql-metadata-storage @@ -103,10 +104,9 @@ extensions/azure-extensions extensions/namespace-lookup extensions/kafka-extraction-namespace - - extensions-distribution distribution + diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index 6f24f10abc5..057520a3983 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -201,7 +201,13 @@ public class IndexIO return convertSegment(toConvert, converted, indexSpec, false, true); } - public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent, boolean validate) + public static boolean convertSegment( + File toConvert, + File converted, + IndexSpec indexSpec, + boolean forceIfCurrent, + boolean validate + ) throws IOException { final int version = SegmentUtils.getVersionFromDir(toConvert); @@ -253,7 +259,7 @@ public class IndexIO IndexableAdapter adapter2 ) { - if(rb1.getTimestamp() != rb2.getTimestamp()){ + if (rb1.getTimestamp() != rb2.getTimestamp()) { throw new SegmentValidationException( "Timestamp mismatch. 
Expected %d found %d", rb1.getTimestamp(), rb2.getTimestamp() @@ -852,14 +858,12 @@ public class IndexIO Set columns = Sets.newTreeSet(); columns.addAll(Lists.newArrayList(dims9)); columns.addAll(Lists.newArrayList(availableMetrics)); - GenericIndexed cols = GenericIndexed.fromIterable(columns, GenericIndexed.STRING_STRATEGY); final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory); final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16 + serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString); - final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); cols.writeToChannel(writer); dims9.writeToChannel(writer); @@ -970,7 +974,8 @@ public class IndexIO index.getAvailableDimensions(), new ConciseBitmapFactory(), columns, - index.getFileMapper() + index.getFileMapper(), + null ); } } @@ -999,6 +1004,7 @@ public class IndexIO final GenericIndexed dims = GenericIndexed.read(indexBuffer, GenericIndexed.STRING_STRATEGY); final Interval dataInterval = new Interval(indexBuffer.getLong(), indexBuffer.getLong()); final BitmapSerdeFactory segmentBitmapSerdeFactory; + /** * This is a workaround for the fact that in v8 segments, we have no information about the type of bitmap * index to use. Since we cannot very cleanly build v9 segments directly, we are using a workaround where @@ -1010,6 +1016,12 @@ public class IndexIO segmentBitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory(); } + Object commitMetaData = null; + ByteBuffer metadata = smooshedFiles.mapFile("metadata.drd"); + if (metadata != null) { + commitMetaData = mapper.readValue(serializerUtils.readBytes(metadata, metadata.remaining()), Object.class); + } + Map columns = Maps.newHashMap(); for (String columnName : cols) { @@ -1019,7 +1031,7 @@ public class IndexIO columns.put(Column.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time"))); final QueryableIndex index = new SimpleQueryableIndex( - dataInterval, cols, dims, segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles + dataInterval, cols, dims, segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, commitMetaData ); log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime); diff --git a/processing/src/main/java/io/druid/segment/IndexMaker.java b/processing/src/main/java/io/druid/segment/IndexMaker.java index 069a74e2d5f..14874fca8a7 100644 --- a/processing/src/main/java/io/druid/segment/IndexMaker.java +++ b/processing/src/main/java/io/druid/segment/IndexMaker.java @@ -115,9 +115,9 @@ public class IndexMaker mapper = injector.getInstance(ObjectMapper.class); } - public static File persist(final IncrementalIndex index, File outDir, final IndexSpec indexSpec) throws IOException + public static File persist(final IncrementalIndex index, File outDir, final Object commitMetaData, final IndexSpec indexSpec) throws IOException { - return persist(index, index.getInterval(), outDir, indexSpec); + return persist(index, index.getInterval(), outDir, commitMetaData, indexSpec); } /** @@ -134,16 +134,20 @@ public class IndexMaker final IncrementalIndex index, final Interval dataInterval, File outDir, + final Object commitMetaData, final IndexSpec indexSpec ) throws IOException { - return persist(index, dataInterval, outDir, indexSpec, new LoggingProgressIndicator(outDir.toString())); + return persist( + index, dataInterval, outDir, commitMetaData, indexSpec, new 
LoggingProgressIndicator(outDir.toString()) + ); } public static File persist( final IncrementalIndex index, final Interval dataInterval, File outDir, + final Object commitMetaData, final IndexSpec indexSpec, ProgressIndicator progress ) throws IOException @@ -181,6 +185,7 @@ public class IndexMaker ), index.getMetricAggs(), outDir, + commitMetaData, indexSpec, progress ); @@ -215,22 +220,26 @@ public class IndexMaker ), metricAggs, outDir, + null, indexSpec, progress ); } public static File merge( - List adapters, final AggregatorFactory[] metricAggs, File outDir, final IndexSpec indexSpec + List adapters, final AggregatorFactory[] metricAggs, File outDir, final String commitMetaData, final IndexSpec indexSpec ) throws IOException { - return merge(adapters, metricAggs, outDir, indexSpec, new LoggingProgressIndicator(outDir.toString())); + return merge( + adapters, metricAggs, outDir, commitMetaData, indexSpec, new LoggingProgressIndicator(outDir.toString()) + ); } public static File merge( List adapters, final AggregatorFactory[] metricAggs, File outDir, + final Object commitMetaData, final IndexSpec indexSpec, ProgressIndicator progress ) throws IOException @@ -320,7 +329,9 @@ public class IndexMaker } }; - return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec); + return makeIndexFiles( + adapters, outDir, progress, mergedDimensions, mergedMetrics, commitMetaData, rowMergerFn, indexSpec + ); } @@ -341,6 +352,7 @@ public class IndexMaker progress, Lists.newArrayList(adapter.getDimensionNames()), Lists.newArrayList(adapter.getMetricNames()), + adapter.getMetaData(), new Function>, Iterable>() { @Nullable @@ -362,12 +374,13 @@ public class IndexMaker final IndexSpec indexSpec ) throws IOException { - return append(adapters, outDir, new LoggingProgressIndicator(outDir.toString()), indexSpec); + return append(adapters, outDir, null, new LoggingProgressIndicator(outDir.toString()), indexSpec); } public static File append( final List adapters, final File outDir, + final String commitMetaData, final ProgressIndicator progress, final IndexSpec indexSpec ) throws IOException @@ -438,7 +451,7 @@ public class IndexMaker } }; - return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec); + return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, commitMetaData, rowMergerFn, indexSpec); } private static File makeIndexFiles( @@ -447,6 +460,7 @@ public class IndexMaker final ProgressIndicator progress, final List mergedDimensions, final List mergedMetrics, + final Object commitMetaData, final Function>, Iterable> rowMergerFn, final IndexSpec indexSpec ) throws IOException @@ -540,15 +554,9 @@ public class IndexMaker progress.progress(); makeIndexBinary( - v9Smoosher, - adapters, - outDir, - mergedDimensions, - mergedMetrics, - skippedDimensions, - progress, - indexSpec + v9Smoosher, adapters, outDir, mergedDimensions, mergedMetrics, skippedDimensions, progress, indexSpec ); + makeMetadataBinary(v9Smoosher, progress, commitMetaData); v9Smoosher.close(); @@ -1396,8 +1404,7 @@ public class IndexMaker + 16 + serializerUtils.getSerializedStringByteSize(bitmapSerdeFactoryType); - final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); - + final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); cols.writeToChannel(writer); dims.writeToChannel(writer); @@ -1412,6 +1419,7 @@ public class IndexMaker 
serializerUtils.writeLong(writer, dataInterval.getStartMillis()); serializerUtils.writeLong(writer, dataInterval.getEndMillis()); + serializerUtils.writeString( writer, bitmapSerdeFactoryType ); @@ -1422,6 +1430,19 @@ public class IndexMaker progress.stopSection(section); } + private static void makeMetadataBinary( + final FileSmoosher v9Smoosher, + final ProgressIndicator progress, + final Object commitMetadata + ) throws IOException + { + progress.startSection("metadata.drd"); + if (commitMetadata != null) { + v9Smoosher.add("metadata.drd", ByteBuffer.wrap(mapper.writeValueAsBytes(commitMetadata))); + } + progress.stopSection("metadata.drd"); + } + private static void writeColumn( FileSmoosher v9Smoosher, ColumnPartSerde serde, diff --git a/processing/src/main/java/io/druid/segment/IndexableAdapter.java b/processing/src/main/java/io/druid/segment/IndexableAdapter.java index 3c09d7a3155..bf13dec6e85 100644 --- a/processing/src/main/java/io/druid/segment/IndexableAdapter.java +++ b/processing/src/main/java/io/druid/segment/IndexableAdapter.java @@ -44,4 +44,6 @@ public interface IndexableAdapter String getMetricType(String metric); ColumnCapabilities getCapabilities(String column); + + Object getMetaData(); } diff --git a/processing/src/main/java/io/druid/segment/QueryableIndex.java b/processing/src/main/java/io/druid/segment/QueryableIndex.java index 1d05dddf621..d68c1565edf 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndex.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndex.java @@ -33,6 +33,7 @@ public interface QueryableIndex extends ColumnSelector, Closeable public Indexed getColumnNames(); public Indexed getAvailableDimensions(); public BitmapFactory getBitmapFactoryForDimensions(); + public Object getMetaData(); /** * The close method shouldn't actually be here as this is nasty. We will adjust it in the future. 
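The metadata plumbing added across IndexMaker, IndexIO, and QueryableIndex above is easiest to read as a round trip: whatever object is passed as commit metadata at persist time is written to a `metadata.drd` smoosh entry, and the same object comes back from `QueryableIndex.getMetaData()` once the segment is reloaded. A minimal sketch of that round trip under the patched signatures (imports and incremental-index setup omitted; the offset map and directory are hypothetical):

```java
// Sketch only, not code from this patch: assumes an already-populated IncrementalIndex "index"
// and an IndexSpec "indexSpec" are in scope.
Map<Integer, Long> kafkaOffsets = ImmutableMap.of(0, 1234L);   // hypothetical partition -> offset map
File segmentDir = new File("/tmp/example-segment");            // hypothetical output directory

// New persist() signature: the commit metadata is written to "metadata.drd" alongside the segment.
IndexMaker.persist(index, index.getInterval(), segmentDir, kafkaOffsets, indexSpec);

// When the segment is mapped again, IndexIO reads "metadata.drd" back and exposes it here.
QueryableIndex reloaded = IndexIO.loadIndex(segmentDir);
Object commitMetadata = reloaded.getMetaData();                // Jackson round-trip, e.g. {"0": 1234}
```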
diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java index 255525569d0..c6ec649913d 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java @@ -81,6 +81,11 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter } } + @Override + public Object getMetaData() { + return input.getMetaData(); + } + @Override public Interval getDataInterval() { diff --git a/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java b/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java index 42bb050bdac..e81106fdf89 100644 --- a/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java @@ -37,6 +37,12 @@ public class RowboatFilteringIndexAdapter implements IndexableAdapter this.filter = filter; } + @Override + public Object getMetaData() + { + return null; + } + @Override public Interval getDataInterval() { diff --git a/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java b/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java index 16b9ee101e7..e3c625f8d8e 100644 --- a/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java +++ b/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java @@ -37,6 +37,7 @@ public class SimpleQueryableIndex implements QueryableIndex private final BitmapFactory bitmapFactory; private final Map columns; private final SmooshedFileMapper fileMapper; + private final Object commitMetaData; public SimpleQueryableIndex( Interval dataInterval, @@ -44,7 +45,8 @@ public class SimpleQueryableIndex implements QueryableIndex Indexed dimNames, BitmapFactory bitmapFactory, Map columns, - SmooshedFileMapper fileMapper + SmooshedFileMapper fileMapper, + Object commitMetaData ) { Preconditions.checkNotNull(columns.get(Column.TIME_COLUMN_NAME)); @@ -54,6 +56,7 @@ public class SimpleQueryableIndex implements QueryableIndex this.bitmapFactory = bitmapFactory; this.columns = columns; this.fileMapper = fileMapper; + this.commitMetaData = commitMetaData; } @Override @@ -96,5 +99,11 @@ public class SimpleQueryableIndex implements QueryableIndex public void close() throws IOException { fileMapper.close(); + } + + @Override + public Object getMetaData() + { + return commitMetaData; } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index b862878c55a..5d09fdb4105 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -98,6 +98,12 @@ public class IncrementalIndexAdapter implements IndexableAdapter } } + @Override + public Object getMetaData() + { + return null; + } + @Override public Interval getDataInterval() { diff --git a/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java b/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java index 4dec2230e17..bf4b350ff36 100644 --- a/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java @@ -128,7 +128,7 @@ public class 
IndexMakerParameterizedTest IncrementalIndexTest.populateIndex(timestamp, toPersist); final File tempDir = temporaryFolder.newFolder(); - QueryableIndex index = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist, tempDir, indexSpec))); + QueryableIndex index = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist, tempDir, null, indexSpec))); Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions())); @@ -171,13 +171,13 @@ public class IndexMakerParameterizedTest final File tempDir2 = temporaryFolder.newFolder(); final File mergedDir = temporaryFolder.newFolder(); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec))); + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec))); Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); Assert.assertEquals(3, index1.getColumnNames().size()); - QueryableIndex index2 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist2, tempDir2, indexSpec))); + QueryableIndex index2 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist2, tempDir2, null, indexSpec))); Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions())); @@ -242,6 +242,7 @@ public class IndexMakerParameterizedTest IndexMaker.persist( toPersist1, tmpDir1, + null, indexSpec ) ) @@ -251,6 +252,7 @@ public class IndexMakerParameterizedTest IndexMaker.persist( toPersist1, tmpDir2, + null, indexSpec ) ) @@ -291,7 +293,7 @@ public class IndexMakerParameterizedTest .getBitmapFactory() ); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec))); + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec))); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -394,7 +396,7 @@ public class IndexMakerParameterizedTest .getBitmapFactory() ); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec))); + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec))); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -451,7 +453,7 @@ public class IndexMakerParameterizedTest .getBitmapFactory() ); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec))); + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec))); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -500,7 +502,7 @@ public class IndexMakerParameterizedTest .getBitmapFactory() ); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec))); + QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec))); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); diff --git 
a/server/src/main/java/io/druid/segment/indexing/RealtimeIOConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeIOConfig.java index 5ea41581742..70e400b535a 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeIOConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeIOConfig.java @@ -20,6 +20,7 @@ package io.druid.segment.indexing; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.FirehoseFactoryV2; import io.druid.segment.realtime.plumber.PlumberSchool; /** @@ -28,15 +29,18 @@ public class RealtimeIOConfig implements IOConfig { private final FirehoseFactory firehoseFactory; private final PlumberSchool plumberSchool; + private final FirehoseFactoryV2 firehoseFactoryV2; @JsonCreator public RealtimeIOConfig( @JsonProperty("firehose") FirehoseFactory firehoseFactory, - @JsonProperty("plumber") PlumberSchool plumberSchool + @JsonProperty("plumber") PlumberSchool plumberSchool, + @JsonProperty("firehoseV2") FirehoseFactoryV2 firehoseFactoryV2 ) { this.firehoseFactory = firehoseFactory; this.plumberSchool = plumberSchool; + this.firehoseFactoryV2 = firehoseFactoryV2; } @JsonProperty("firehose") @@ -45,6 +49,12 @@ public class RealtimeIOConfig implements IOConfig return firehoseFactory; } + @JsonProperty("firehoseV2") + public FirehoseFactoryV2 getFirehoseFactoryV2() + { + return firehoseFactoryV2; + } + public PlumberSchool getPlumberSchool() { return plumberSchool; diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java index 7e7df683b15..77814762252 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java @@ -21,12 +21,14 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseV2; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.IngestionSpec; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.plumber.Plumber; +import java.io.Closeable; import java.io.IOException; /** @@ -41,7 +43,6 @@ public class FireDepartment extends IngestionSpec nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - - continue; - } - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); - if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) { - plumber.persist(firehose.commit()); - nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); - } - metrics.incrementProcessed(); - } - catch (ParseException e) { - if (inputRow != null) { - log.error(e, "unparseable line: %s", inputRow); - } - metrics.incrementUnparseable(); - } + if (fireDepartment.checkFirehoseV2()) { + firehoseV2 = initFirehose(metadata); + runFirehoseV2(firehoseV2); + } else { + firehose = initFirehose(); + runFirehose(firehose); } + } catch (RuntimeException e) { log.makeAlert( @@ -325,6 +299,135 @@ public class RealtimeManager implements QuerySegmentWalker } } + private void runFirehoseV2(FirehoseV2 firehose) + { + final Period intermediatePersistPeriod = 
config.getIntermediatePersistPeriod(); + try { + firehose.start(); + } + catch (Exception e) { + log.error(e, "Failed to start firehoseV2"); + return; + } + long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); + log.info("FirehoseV2 started with nextFlush [%s]", nextFlush); + boolean haveRow = true; + while (haveRow) { + InputRow inputRow = null; + int numRows = 0; + try { + inputRow = firehose.currRow(); + if (inputRow != null) { + try { + numRows = plumber.add(inputRow); + } + catch (IndexSizeExceededException e) { + log.info("Index limit exceeded: %s", e.getMessage()); + nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod); + continue; + } + if (numRows < 0) { + metrics.incrementThrownAway(); + log.debug("Throwing away event[%s]", inputRow); + } else { + metrics.incrementProcessed(); + } + } + } + catch (Exception e) { + log.makeAlert(e, "Some exception got thrown while processing rows. Ignoring and continuing.") + .addData("inputRow", inputRow).emit(); + } + + try { + haveRow = firehose.advance(); + } + catch (Exception e) { + log.debug(e, "thrown away line due to exception, considering unparseable"); + metrics.incrementUnparseable(); + continue; + } + + try { + final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); + if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) { + nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod); + } + } catch (Exception e) { + log.makeAlert(e, "An exception happened while queueing a persist. We hope it is transient. Ignoring and continuing.").emit(); + } + } + } + + private long doIncrementalPersist(Committer committer, Period intermediatePersistPeriod) + { + plumber.persist(committer); + return new DateTime().plus(intermediatePersistPeriod).getMillis(); + } + + + private void runFirehose(Firehose firehose) + { + + final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod(); + + long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); + + while (firehose.hasMore()) { + InputRow inputRow = null; + try { + try { + inputRow = firehose.nextRow(); + + if (inputRow == null) { + log.debug("thrown away null input row, considering unparseable"); + log.info("thrown away null input row, considering unparseable"); + metrics.incrementUnparseable(); + continue; + } + } + catch (Exception e) { + log.debug(e, "thrown away line due to exception, considering unparseable"); + metrics.incrementUnparseable(); + continue; + } + + boolean lateEvent = false; + boolean indexLimitExceeded = false; + try { + lateEvent = plumber.add(inputRow) == -1; + } + catch (IndexSizeExceededException e) { + log.info("Index limit exceeded: %s", e.getMessage()); + indexLimitExceeded = true; + } + if (indexLimitExceeded || lateEvent) { + metrics.incrementThrownAway(); + log.debug("Throwing away event[%s]", inputRow); + + if (indexLimitExceeded || System.currentTimeMillis() > nextFlush) { + plumber.persist(firehose.commit()); + nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); + } + + continue; + } + final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); + if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) { + plumber.persist(firehose.commit()); + nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); + } + metrics.incrementProcessed(); + } + catch (ParseException e) { + if (inputRow != null) { + log.error(e, "unparseable line: %s", inputRow); + } +
metrics.incrementUnparseable(); + } + } + } + public QueryRunner getQueryRunner(Query query) { QueryRunnerFactory> factory = conglomerate.findFactory(query); diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactoryV2.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactoryV2.java new file mode 100644 index 00000000000..eb7aa59189d --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactoryV2.java @@ -0,0 +1,229 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.segment.realtime.firehose; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.common.ISE; +import com.metamx.common.parsers.ParseException; +import com.metamx.emitter.EmittingLogger; + +import io.druid.data.input.Committer; +import io.druid.data.input.FirehoseFactoryV2; +import io.druid.data.input.FirehoseV2; +import io.druid.data.input.InputRow; +import io.druid.data.input.impl.StringInputRowParser; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.LineIterator; +import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.commons.io.filefilter.WildcardFileFilter; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +/** + */ +public class LocalFirehoseFactoryV2 implements FirehoseFactoryV2 +{ + private static final EmittingLogger log = new EmittingLogger(LocalFirehoseFactory.class); + + private final File baseDir; + private final String filter; + private final StringInputRowParser parser; + + @JsonCreator + public LocalFirehoseFactoryV2( + @JsonProperty("baseDir") File baseDir, + @JsonProperty("filter") String filter, + // Backwards compatible + @JsonProperty("parser") StringInputRowParser parser + ) + { + this.baseDir = baseDir; + this.filter = filter; + this.parser = parser; + } + + @JsonProperty + public File getBaseDir() + { + return baseDir; + } + + @JsonProperty + public String getFilter() + { + return filter; + } + + @JsonProperty + public StringInputRowParser getParser() + { + return parser; + } + + @Override + public FirehoseV2 connect(StringInputRowParser firehoseParser, Object metadata) throws IOException, ParseException + { + log.info("Searching for all [%s] in and beneath [%s]", filter, baseDir.getAbsoluteFile()); + + Collection foundFiles = FileUtils.listFiles( + baseDir.getAbsoluteFile(), + new WildcardFileFilter(filter), + TrueFileFilter.INSTANCE + ); + + if (foundFiles == null || foundFiles.isEmpty()) { + throw new ISE("Found no files to ingest! 
Check your schema."); + } + log.info("Found files: %s", foundFiles); + + final LinkedList files = Lists.newLinkedList( + foundFiles + ); + return new FileIteratingFirehoseV2(new Iterator() + { + @Override + public boolean hasNext() + { + return !files.isEmpty(); + } + + @Override + public LineIterator next() + { + try { + return FileUtils.lineIterator(files.poll()); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }, firehoseParser); + + + + } + class FileIteratingFirehoseV2 implements FirehoseV2 { + private ConcurrentMap lastOffsetPartitions = Maps.newConcurrentMap(); + private volatile boolean stop; + + private volatile InputRow row = null; + + private final Iterator lineIterators; + private final StringInputRowParser parser; + + private LineIterator lineIterator = null; + + public FileIteratingFirehoseV2( + Iterator lineIterators, + StringInputRowParser parser + ) + { + this.lineIterators = lineIterators; + this.parser = parser; + } + @Override + public void close() throws IOException + { + stop = true; + } + + @Override + public boolean advance() + { + if (stop) { + return false; + } + + nextMessage(); + return true; + } + + @Override + public InputRow currRow() + { + return row; + } + + @Override + public Committer makeCommitter() + { + final Map offsets = Maps.newHashMap(lastOffsetPartitions); // snapshot the recorded offsets so the committer returns a stable view + + return new Committer() + { + @Override + public Object getMetadata() + { + return offsets; + } + + @Override + public void run() + { + + } + }; + } + + @Override + public void start() throws Exception + { + nextMessage(); + } + private void nextMessage() + { + while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) { + lineIterator = lineIterators.next(); + } + + stop = !(lineIterator != null && lineIterator.hasNext()); + try { + if (lineIterator == null || !lineIterator.hasNext()) { + // The current line iterator is exhausted; close it before moving on to the next file.
+ if (lineIterator != null) { + lineIterator.close(); + } + + lineIterator = lineIterators.next(); + } + + row = parser.parse((String) lineIterator.next()); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + }; +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java index da424a93dbb..04c00350035 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java @@ -81,7 +81,7 @@ public class FlushingPlumber extends RealtimePlumber } @Override - public void startJob() + public Object startJob() { log.info("Starting job for %s", getSchema().getDataSource()); @@ -92,8 +92,9 @@ public class FlushingPlumber extends RealtimePlumber flushScheduledExec = Execs.scheduledSingleThreaded("flushing_scheduled_%d"); } - bootstrapSinksFromDisk(); + Object retVal = bootstrapSinksFromDisk(); startFlushThread(); + return retVal; } protected void flushAfterDuration(final long truncatedTime, final Sink sink) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java index bbd574e47a6..01ef058212f 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java @@ -17,6 +17,7 @@ package io.druid.segment.realtime.plumber; +import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.query.Query; import io.druid.query.QueryRunner; @@ -27,8 +28,10 @@ public interface Plumber /** * Perform any initial setup. Should be called before using any other methods, and should be paired * with a corresponding call to {@link #finishJob}. 
+ * + * @return the metadata of the "newest" segment that might have previously been persisted */ - public void startJob(); + public Object startJob(); /** * @param row - the row to insert @@ -44,6 +47,7 @@ public interface Plumber * * @param commitRunnable code to run after persisting data */ + void persist(Committer commitRunnable); void persist(Runnable commitRunnable); /** diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index b120cbb8e5f..983a5348858 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -40,6 +40,9 @@ import io.druid.client.ServerView; import io.druid.common.guava.ThreadRenamingCallable; import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.concurrent.Execs; +import io.druid.data.input.Committer; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseV2; import io.druid.data.input.InputRow; import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.Query; @@ -82,6 +85,9 @@ import java.io.Closeable; import java.io.File; import java.io.FilenameFilter; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.attribute.BasicFileAttributes; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -121,7 +127,6 @@ public class RealtimePlumber implements Plumber private volatile ExecutorService mergeExecutor = null; private volatile ScheduledExecutorService scheduledExecutor = null; - public RealtimePlumber( DataSchema schema, RealtimeTuningConfig config, @@ -171,15 +176,17 @@ public class RealtimePlumber implements Plumber } @Override - public void startJob() + public Object startJob() { computeBaseDir(schema).mkdirs(); initializeExecutors(); - bootstrapSinksFromDisk(); + Object retVal = bootstrapSinksFromDisk(); registerServerViewCallback(); startPersistThread(); // Push pending sinks bootstrapped from previous run mergeAndPush(); + + return retVal; } @Override @@ -332,6 +339,27 @@ public class RealtimePlumber implements Plumber @Override public void persist(final Runnable commitRunnable) + { + persist( + new Committer() + { + @Override + public Object getMetadata() + { + return null; + } + + @Override + public void run() + { + commitRunnable.run(); + } + } + ); + } + + @Override + public void persist(final Committer commitRunnable) { final List> indexesToPersist = Lists.newArrayList(); for (Sink sink : sinks.values()) { @@ -352,7 +380,11 @@ public class RealtimePlumber implements Plumber { try { for (Pair pair : indexesToPersist) { - metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs)); + metrics.incrementRowOutputCount( + persistHydrant( + pair.lhs, schema, pair.rhs, commitRunnable.getMetadata() + ) + ); } commitRunnable.run(); } @@ -417,7 +449,7 @@ public class RealtimePlumber implements Plumber synchronized (hydrant) { if (!hydrant.hasSwapped()) { log.info("Hydrant[%s] hasn't swapped yet, swapping. 
Sink[%s]", hydrant, sink); - final int rowCount = persistHydrant(hydrant, schema, interval); + final int rowCount = persistHydrant(hydrant, schema, interval, null); metrics.incrementRowOutputCount(rowCount); } } @@ -450,19 +482,27 @@ public class RealtimePlumber implements Plumber } QueryableIndex index = IndexIO.loadIndex(mergedFile); + log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier()); + try { + DataSegment segment = dataSegmentPusher.push( + mergedFile, + sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions())) + ); + log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier()); + segmentPublisher.publishSegment(segment); - DataSegment segment = dataSegmentPusher.push( - mergedFile, - sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions())) - ); - - segmentPublisher.publishSegment(segment); - - if (!isPushedMarker.createNewFile()) { - log.makeAlert("Failed to create marker file for [%s]", schema.getDataSource()) - .addData("interval", sink.getInterval()) - .addData("partitionNum", segment.getShardSpec().getPartitionNum()) - .addData("marker", isPushedMarker) + if (!isPushedMarker.createNewFile()) { + log.makeAlert("Failed to create marker file for [%s]", schema.getDataSource()) + .addData("interval", sink.getInterval()) + .addData("partitionNum", segment.getShardSpec().getPartitionNum()) + .addData("marker", isPushedMarker) + .emit(); + } + } + catch (Throwable e) { + log.info("Exception happen when pushing to deep storage"); + log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource()) + .addData("interval", interval) .emit(); } } @@ -565,20 +605,22 @@ public class RealtimePlumber implements Plumber } } - protected void bootstrapSinksFromDisk() + protected Object bootstrapSinksFromDisk() { final VersioningPolicy versioningPolicy = config.getVersioningPolicy(); File baseDir = computeBaseDir(schema); if (baseDir == null || !baseDir.exists()) { - return; + return null; } File[] files = baseDir.listFiles(); if (files == null) { - return; + return null; } + Object metadata = null; + long latestCommitTime = 0; for (File sinkDir : files) { Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/")); @@ -611,7 +653,7 @@ public class RealtimePlumber implements Plumber } } ); - + boolean isCorrupted = false; try { List hydrants = Lists.newArrayList(); for (File segmentDir : sinkFiles) { @@ -623,7 +665,35 @@ public class RealtimePlumber implements Plumber if (Ints.tryParse(segmentDir.getName()) == null) { continue; } - + QueryableIndex queryableIndex = null; + try { + queryableIndex = IndexIO.loadIndex(segmentDir); + } + catch (IOException e) { + log.error(e, "Problem loading segmentDir from disk."); + isCorrupted = true; + } + if (isCorrupted) { + try { + File corruptSegmentDir = computeCorruptedFileDumpDir(segmentDir, schema); + log.info("Renaming %s to %s", segmentDir.getAbsolutePath(), corruptSegmentDir.getAbsolutePath()); + FileUtils.copyDirectory(segmentDir, corruptSegmentDir); + FileUtils.deleteDirectory(segmentDir); + } + catch (Exception e1) { + log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath()); + } + continue; + } + BasicFileAttributes attr = Files.readAttributes(segmentDir.toPath(), BasicFileAttributes.class); + if (attr.creationTime().toMillis() > latestCommitTime) { + log.info( + "Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]", + queryableIndex.getMetaData(), attr.creationTime().toMillis(), 
latestCommitTime + ); + latestCommitTime = attr.creationTime().toMillis(); + metadata = queryableIndex.getMetaData(); + } hydrants.add( new FireHydrant( new QueryableIndexSegment( @@ -634,13 +704,12 @@ public class RealtimePlumber implements Plumber versioningPolicy.getVersion(sinkInterval), config.getShardSpec() ), - IndexIO.loadIndex(segmentDir) + queryableIndex ), Integer.parseInt(segmentDir.getName()) ) ); } - Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants); sinks.put(sinkInterval.getStartMillis(), currSink); sinkTimeline.add( @@ -657,6 +726,7 @@ public class RealtimePlumber implements Plumber .emit(); } } + return metadata; } protected void startPersistThread() @@ -798,6 +868,11 @@ public class RealtimePlumber implements Plumber return new File(config.getBasePersistDirectory(), schema.getDataSource()); } + protected File computeCorruptedFileDumpDir(File persistDir, DataSchema schema) + { + return new File(persistDir.getAbsolutePath().replace(schema.getDataSource(), "corrupted/"+schema.getDataSource())); + } + protected File computePersistDir(DataSchema schema, Interval interval) { return new File(computeBaseDir(schema), interval.toString().replace("/", "_")); @@ -812,7 +887,7 @@ public class RealtimePlumber implements Plumber * * @return the number of rows persisted */ - protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Interval interval) + protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Interval interval, Object commitMetaData) { synchronized (indexToPersist) { if (indexToPersist.hasSwapped()) { @@ -824,9 +899,10 @@ public class RealtimePlumber implements Plumber } log.info( - "DataSource[%s], Interval[%s], persisting Hydrant[%s]", + "DataSource[%s], Interval[%s], Metadata [%s] persisting Hydrant[%s]", schema.getDataSource(), interval, + commitMetaData, indexToPersist ); try { @@ -838,6 +914,7 @@ public class RealtimePlumber implements Plumber persistedFile = IndexMaker.persist( indexToPersist.getIndex(), new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), + commitMetaData, indexSpec ); } else { diff --git a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java index 4d34378a280..5f1a7fca8fe 100644 --- a/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java +++ b/server/src/test/java/io/druid/segment/realtime/FireDepartmentTest.java @@ -72,7 +72,8 @@ public class FireDepartmentTest null, new RealtimePlumberSchool( null, null, null, null, null, null, null - ) + ), + null ), new RealtimeTuningConfig( null, null, null, null, null, null, null, null, null, false, false, null, null diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 5908ab71f69..cb072e1a48b 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -24,6 +24,9 @@ import com.metamx.common.ISE; import com.metamx.common.parsers.ParseException; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.Committer; +import io.druid.data.input.FirehoseV2; +import io.druid.data.input.FirehoseFactoryV2; import io.druid.data.input.InputRow; import io.druid.data.input.Row; import 
io.druid.data.input.impl.InputRowParser; @@ -41,6 +44,7 @@ import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.segment.realtime.plumber.Sink; import io.druid.utils.Runnables; + import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -59,8 +63,11 @@ import java.util.concurrent.TimeUnit; public class RealtimeManagerTest { private RealtimeManager realtimeManager; + private RealtimeManager realtimeManager2; private DataSchema schema; + private DataSchema schema2; private TestPlumber plumber; + private TestPlumber plumber2; @Before public void setUp() throws Exception @@ -78,6 +85,12 @@ public class RealtimeManagerTest new AggregatorFactory[]{new CountAggregatorFactory("rows")}, new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null) ); + schema2 = new DataSchema( + "testV2", + null, + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null) + ); RealtimeIOConfig ioConfig = new RealtimeIOConfig( new FirehoseFactory() { @@ -96,6 +109,28 @@ public class RealtimeManagerTest { return plumber; } + }, + null + ); + RealtimeIOConfig ioConfig2 = new RealtimeIOConfig( + null, + new PlumberSchool() + { + @Override + public Plumber findPlumber( + DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + ) + { + return plumber2; + } + }, + new FirehoseFactoryV2() + { + @Override + public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException, ParseException + { + return new TestFirehoseV2(rows.iterator()); + } } ); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( @@ -125,6 +160,18 @@ public class RealtimeManagerTest ), null ); + plumber2 = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema2, tuningConfig, new DateTime().toString())); + + realtimeManager2 = new RealtimeManager( + Arrays.asList( + new FireDepartment( + schema2, + ioConfig2, + tuningConfig + ) + ), + null + ); } @Test @@ -148,6 +195,27 @@ public class RealtimeManagerTest Assert.assertEquals(1, plumber.getPersistCount()); } + @Test + public void testRunV2() throws Exception + { + realtimeManager2.start(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + while (realtimeManager2.getMetrics("testV2").processed() != 1) { + Thread.sleep(100); + if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { + throw new ISE("Realtime manager should have completed processing 2 events!"); + } + } + + Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").processed()); + Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").thrownAway()); + Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").unparseable()); + Assert.assertTrue(plumber2.isStartedJob()); + Assert.assertTrue(plumber2.isFinishedJob()); + Assert.assertEquals(1, plumber2.getPersistCount()); + } + private TestInputRowHolder makeRow(final long timestamp) { return new TestInputRowHolder(timestamp, null); @@ -266,6 +334,71 @@ public class RealtimeManagerTest } } + private static class TestFirehoseV2 implements FirehoseV2 + { + private final Iterator rows; + private InputRow currRow; + private boolean stop; + private TestFirehoseV2(Iterator rows) + { + this.rows = rows; + } + private void nextMessage() { + currRow = null; + while (currRow == null) { + final TestInputRowHolder holder = rows.next(); + currRow = holder == null ? 
null : holder.getRow(); + } + } + + @Override + public void close() throws IOException + { + } + + @Override + public boolean advance() + { + stop = !rows.hasNext(); + if (stop) { + return false; + } + + nextMessage(); + return true; + } + + @Override + public InputRow currRow() + { + return currRow; + } + + @Override + public Committer makeCommitter() + { + return new Committer() + { + @Override + public Object getMetadata() + { + return null; + } + + @Override + public void run() + { + } + }; + } + + @Override + public void start() throws Exception + { + nextMessage(); + } + } + private static class TestPlumber implements Plumber { private final Sink sink; @@ -296,9 +429,10 @@ public class RealtimeManagerTest } @Override - public void startJob() + public Object startJob() { startedJob = true; + return null; } @Override @@ -342,5 +476,11 @@ public class RealtimeManagerTest { finishedJob = true; } + + @Override + public void persist(Committer commitRunnable) + { + persistCount++; + } } }
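
To summarize the contract this patch introduces: RealtimeManager drives a FirehoseV2 through start(), currRow() and advance(), asks it for a Committer whenever it persists, stores that committer's getMetadata() with the persisted segment, recovers the newest metadata in bootstrapSinksFromDisk()/startJob(), and hands it back to FirehoseFactoryV2.connect() so ingestion can resume where it left off. The sketch below is illustrative only and not part of the patch: InMemoryFirehoseV2, its package name, and the use of MapBasedInputRow are hypothetical choices, assuming the FirehoseV2 and Committer interfaces exactly as they appear here.

```java
package io.druid.examples.firehose;

import io.druid.data.input.Committer;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * Toy FirehoseV2 over an in-memory list of events. The committed metadata is
 * the index of the row most recently handed to the plumber, so a matching
 * FirehoseFactoryV2 could resume from lastCommit + 1 after a restart.
 */
public class InMemoryFirehoseV2 implements FirehoseV2
{
  private final List<Map<String, Object>> events;
  private final List<String> dimensions = Arrays.asList("dim1", "dim2");

  private int offset;          // index of the row currently exposed by currRow()
  private InputRow currRow;

  public InMemoryFirehoseV2(List<Map<String, Object>> events, int startOffset)
  {
    this.events = events;
    this.offset = startOffset;
  }

  @Override
  public void start() throws Exception
  {
    // Position currRow() on the first row; the consumer reads it before the first advance().
    currRow = offset < events.size() ? rowAt(offset) : null;
  }

  @Override
  public boolean advance()
  {
    if (offset + 1 >= events.size()) {
      return false;
    }
    offset++;
    currRow = rowAt(offset);
    return true;
  }

  @Override
  public InputRow currRow()
  {
    return currRow;
  }

  @Override
  public Committer makeCommitter()
  {
    final int committedOffset = offset; // snapshot now; the plumber may run the committer later
    return new Committer()
    {
      @Override
      public Object getMetadata()
      {
        return committedOffset;
      }

      @Override
      public void run()
      {
        // nothing to clean up for an in-memory source
      }
    };
  }

  @Override
  public void close() throws IOException
  {
    // no resources to release
  }

  private InputRow rowAt(int i)
  {
    final Map<String, Object> event = events.get(i);
    return new MapBasedInputRow(((Number) event.get("timestamp")).longValue(), dimensions, event);
  }
}
```

A factory for this sketch would, in connect(parser, lastCommit), return new InMemoryFirehoseV2(events, lastCommit == null ? 0 : ((Integer) lastCommit) + 1), mirroring how the Kafka simple-consumer firehose uses the recovered offsets.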