Merge pull request #1609 from himanshug/kafka_firehose2

Experimental Kafka simple consumer based firehose
This commit is contained in:
Gian Merlino 2015-08-27 21:04:02 -07:00
commit 19c63a1103
45 changed files with 2209 additions and 471 deletions

View File

@ -1,25 +1,28 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.common.utils;
import com.google.common.io.ByteStreams;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
import com.metamx.common.StringUtils;
import io.druid.collections.IntList;
import java.io.IOException;
@ -61,15 +64,20 @@ public class SerializerUtils
final int length = readInt(in);
byte[] stringBytes = new byte[length];
ByteStreams.readFully(in, stringBytes);
return new String(stringBytes, UTF8);
return StringUtils.fromUtf8(stringBytes);
}
public String readString(ByteBuffer in) throws IOException
{
final int length = in.getInt();
byte[] stringBytes = new byte[length];
in.get(stringBytes);
return new String(stringBytes, UTF8);
return StringUtils.fromUtf8(readBytes(in, length));
}
public byte[] readBytes(ByteBuffer in, int length) throws IOException
{
byte[] bytes = new byte[length];
in.get(bytes);
return bytes;
}
public void writeStrings(OutputStream out, String[] names) throws IOException

View File

@ -0,0 +1,30 @@
---
layout: doc_page
---
# KafkaSimpleConsumerFirehose
This is an experimental firehose that ingests data from Kafka using the Kafka simple consumer API. Currently, this firehose only works inside standalone realtime nodes.
The configuration for KafkaSimpleConsumerFirehose is similar to the Kafka firehose shown in the [Kafka firehose example](realtime-ingestion.html#realtime-specfile), except that `firehose` should be replaced with `firehoseV2`, like this:
```json
"firehoseV2": {
"type" : "kafka-0.8-v2",
"brokerList" : ["localhost:4443"],
"queueBufferLength":10001,
"resetOffsetToEarliest":"true",
"partitionIdList" : ["0"],
"clientId" : "localclient",
"feed": "wikipedia"
}
```
|property|description|required?|
|--------|-----------|---------|
|type|kafka-0.8-v2|yes|
|brokerList|list of the kafka brokers|yes|
|queueBufferLength|the buffer length for the kafka message queue|no default(20000)|
|resetOffsetToEarliest|whether the consumer should start from the earliest or the latest available message when a kafkaOffsetOutOfRange error occurs|no default(true)|
|partitionIdList|list of kafka partition ids|yes|
|clientId|the clientId for kafka SimpleConsumer|yes|
|feed|kafka topic|yes|
To use this firehose at scale and possibly in production, it is recommended to set the replication factor to at least three, which means at least three Kafka brokers in the `brokerList`. For a Kafka topic of roughly 1*10^4 events per second, a single partition works well, but more partitions can be added if higher throughput is required.
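To make the resume semantics concrete, here is a minimal, hypothetical sketch (plain Java, not part of this patch) of the commit metadata this firehose persists, a map of Kafka partition id to next offset, and of how `resetOffsetToEarliest` applies when no offset is known for a partition:
```java
import java.util.HashMap;
import java.util.Map;

public class OffsetMetadataSketch
{
  public static void main(String[] args)
  {
    // Shape of the committed metadata: partition id -> next offset to read.
    Map<Integer, Long> lastCommit = new HashMap<>();
    lastCommit.put(0, 120_345L); // partition 0 resumes at offset 120345; partition 1 has no saved offset

    boolean resetOffsetToEarliest = true;
    for (int partition : new int[]{0, 1}) {
      Long offset = lastCommit.get(partition);
      if (offset != null) {
        System.out.println("partition " + partition + ": resume at offset " + offset);
      } else {
        System.out.println(
            "partition " + partition + ": no saved offset, start from "
            + (resetOffsetToEarliest ? "earliest" : "latest") + " available message"
        );
      }
    }
  }
}
```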

View File

@ -260,7 +260,6 @@ When using this firehose, events can be sent by submitting a POST request to the
|serviceName|name used to announce the event receiver service endpoint|yes|
|bufferSize| size of buffer used by firehose to store events|no default(100000)|
#### TimedShutoffFirehose
This can be used to start a firehose that will shut down at a specified time.
@ -283,3 +282,5 @@ An example is shown below:
|type|This should be "timed"|yes|
|shutoffTime|time at which the firehose should shut down, in ISO8601 format|yes|
|delegate|firehose to use|yes|

View File

@ -90,6 +90,7 @@ h2. Development
** "Select Query":../development/select-query.html
** "Approximate Histograms and Quantiles":../development/approximate-histograms.html
** "Router node":../development/router.html
** "New Kafka Firehose":../development/kafka-simple-consumer-firehose.html
h2. Misc
* "Papers & Talks":../misc/papers-and-talks.html

View File

@ -0,0 +1,81 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright 2012 - 2015 Metamarkets Group Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-kafka-eight-simple-consumer</artifactId>
<name>druid-kafka-eight-simple-consumer</name>
<description>druid-kafka-eight-simple-consumer</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.8.2-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,48 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.firehose.kafka;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.List;
public class KafkaEightSimpleConsumerDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule("KafkaEightSimpleConsumerFirehoseModule").registerSubtypes(
new NamedType(KafkaEightSimpleConsumerFirehoseFactory.class, "kafka-0.8-v2")
)
);
}
@Override
public void configure(Binder binder)
{
}
}
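For reference, a self-contained sketch of the Jackson subtype registration this module performs. The firehose interface and factory below are simplified stand-ins rather than Druid's real classes, but the `NamedType` mechanism is the same one that maps `"type": "kafka-0.8-v2"` in the spec to the factory class:
```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;

import java.util.List;

public class NamedTypeSketch
{
  // Stand-in for FirehoseFactoryV2: the "type" property selects the concrete subtype.
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  interface FirehoseFactory {}

  static class KafkaV2Factory implements FirehoseFactory
  {
    @JsonProperty List<String> brokerList;
    @JsonProperty List<Integer> partitionIdList;
    @JsonProperty String clientId;
    @JsonProperty String feed;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Equivalent to what KafkaEightSimpleConsumerDruidModule registers.
    mapper.registerModule(
        new SimpleModule("sketch").registerSubtypes(new NamedType(KafkaV2Factory.class, "kafka-0.8-v2"))
    );

    String json = "{\"type\":\"kafka-0.8-v2\",\"brokerList\":[\"localhost:9092\"],"
                  + "\"partitionIdList\":[0],\"clientId\":\"localclient\",\"feed\":\"wikipedia\"}";
    FirehoseFactory factory = mapper.readValue(json, FirehoseFactory.class);
    System.out.println(factory.getClass().getSimpleName()); // KafkaV2Factory
  }
}
```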

View File

@ -0,0 +1,332 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.firehose.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Committer;
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.firehose.kafka.KafkaSimpleConsumer.BytesMessageWithOffset;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
public class KafkaEightSimpleConsumerFirehoseFactory implements
FirehoseFactoryV2<ByteBufferInputRowParser>
{
private static final EmittingLogger log = new EmittingLogger(
KafkaEightSimpleConsumerFirehoseFactory.class
);
@JsonProperty
private final List<String> brokerList;
@JsonProperty
private final List<Integer> partitionIdList;
@JsonProperty
private final String clientId;
@JsonProperty
private final String feed;
@JsonProperty
private final int queueBufferLength;
@JsonProperty
private final boolean earliest;
private final List<PartitionConsumerWorker> consumerWorkers = new CopyOnWriteArrayList<>();
private static final int DEFAULT_QUEUE_BUFFER_LENGTH = 20000;
private static final int CONSUMER_FETCH_TIMEOUT = 10000;
@JsonCreator
public KafkaEightSimpleConsumerFirehoseFactory(
@JsonProperty("brokerList") List<String> brokerList,
@JsonProperty("partitionIdList") List<Integer> partitionIdList,
@JsonProperty("clientId") String clientId,
@JsonProperty("feed") String feed,
@JsonProperty("queueBufferLength") Integer queueBufferLength,
@JsonProperty("resetOffsetToEarliest") Boolean resetOffsetToEarliest
)
{
this.brokerList = brokerList;
Preconditions.checkArgument(
brokerList != null && brokerList.size() > 0,
"brokerList is null/empty"
);
this.partitionIdList = partitionIdList;
Preconditions.checkArgument(
partitionIdList != null && partitionIdList.size() > 0,
"partitionIdList is null/empty"
);
this.clientId = clientId;
Preconditions.checkArgument(
clientId != null && !clientId.isEmpty(),
"clientId is null/empty"
);
this.feed = feed;
Preconditions.checkArgument(
feed != null && !feed.isEmpty(),
"feed is null/empty"
);
this.queueBufferLength = queueBufferLength == null ? DEFAULT_QUEUE_BUFFER_LENGTH : queueBufferLength;
Preconditions.checkArgument(this.queueBufferLength > 0, "queueBufferLength must be a positive number");
log.info("queueBufferLength loaded as [%s]", this.queueBufferLength);
this.earliest = resetOffsetToEarliest == null ? true : resetOffsetToEarliest.booleanValue();
log.info(
"if old offsets are not known, data from partition will be read from [%s] available offset.",
this.earliest ? "earliest" : "latest"
);
}
private Map<Integer, Long> loadOffsetFromPreviousMetaData(Object lastCommit)
{
Map<Integer, Long> offsetMap = Maps.newHashMap();
if (lastCommit == null) {
return offsetMap;
}
if (lastCommit instanceof Map) {
Map<Object, Object> lastCommitMap = (Map) lastCommit;
for (Map.Entry<Object, Object> entry : lastCommitMap.entrySet()) {
try {
int partitionId = Integer.parseInt(entry.getKey().toString());
long offset = Long.parseLong(entry.getValue().toString());
log.debug("Recover last commit information partitionId [%s], offset [%s]", partitionId, offset);
offsetMap.put(partitionId, offset);
}
catch (NumberFormatException e) {
log.error(e, "Fail to load offset from previous meta data [%s]", entry);
}
}
log.info("Loaded offset map[%s]", offsetMap);
} else {
log.makeAlert("Unable to cast lastCommit to Map for feed [%s]", feed);
}
return offsetMap;
}
@Override
public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object lastCommit) throws IOException
{
final Map<Integer, Long> lastOffsets = loadOffsetFromPreviousMetaData(lastCommit);
for (Integer partition : partitionIdList) {
final KafkaSimpleConsumer kafkaSimpleConsumer = new KafkaSimpleConsumer(
feed, partition, clientId, brokerList, earliest
);
Long startOffset = lastOffsets.get(partition);
PartitionConsumerWorker worker = new PartitionConsumerWorker(
feed, kafkaSimpleConsumer, partition, startOffset == null ? 0 : startOffset
);
consumerWorkers.add(worker);
}
final LinkedBlockingQueue<BytesMessageWithOffset> messageQueue = new LinkedBlockingQueue<BytesMessageWithOffset>(
queueBufferLength
);
log.info("Kicking off all consumers");
for (PartitionConsumerWorker worker : consumerWorkers) {
worker.go(messageQueue);
}
log.info("All consumer started");
return new FirehoseV2()
{
private Map<Integer, Long> lastOffsetPartitions;
private volatile boolean stopped;
private volatile BytesMessageWithOffset msg = null;
private volatile InputRow row = null;
{
lastOffsetPartitions = Maps.newHashMap();
lastOffsetPartitions.putAll(lastOffsets);
}
@Override
public void start() throws Exception
{
nextMessage();
}
@Override
public boolean advance()
{
if (stopped) {
return false;
}
nextMessage();
return true;
}
private void nextMessage()
{
try {
row = null;
while (row == null) {
if (msg != null) {
lastOffsetPartitions.put(msg.getPartition(), msg.offset());
}
msg = messageQueue.take();
final byte[] message = msg.message();
row = message == null ? null : firehoseParser.parse(ByteBuffer.wrap(message));
}
}
catch (InterruptedException e) {
//Let the caller decide whether to stop or continue when thread is interrupted.
log.warn(e, "Thread Interrupted while taking from queue, propagating the interrupt");
Thread.currentThread().interrupt();
}
}
@Override
public InputRow currRow()
{
if (stopped) {
return null;
}
return row;
}
@Override
public Committer makeCommitter()
{
final Map<Integer, Long> offsets = Maps.newHashMap(lastOffsetPartitions);
return new Committer()
{
@Override
public Object getMetadata()
{
return offsets;
}
@Override
public void run()
{
}
};
}
@Override
public void close() throws IOException
{
log.info("Stopping kafka 0.8 simple firehose");
stopped = true;
for (PartitionConsumerWorker t : consumerWorkers) {
Closeables.close(t, true);
}
}
};
}
private static class PartitionConsumerWorker implements Closeable
{
private final String topic;
private final KafkaSimpleConsumer consumer;
private final int partitionId;
private final long startOffset;
private final AtomicBoolean stopped = new AtomicBoolean(false);
private volatile Thread thread = null;
PartitionConsumerWorker(String topic, KafkaSimpleConsumer consumer, int partitionId, long startOffset)
{
this.topic = topic;
this.consumer = consumer;
this.partitionId = partitionId;
this.startOffset = startOffset;
}
public void go(final LinkedBlockingQueue<BytesMessageWithOffset> messageQueue)
{
thread = new Thread()
{
@Override
public void run()
{
long offset = startOffset;
log.info("Start running parition[%s], offset[%s]", partitionId, offset);
try {
while (!stopped.get()) {
try {
Iterable<BytesMessageWithOffset> msgs = consumer.fetch(offset, CONSUMER_FETCH_TIMEOUT);
int count = 0;
for (BytesMessageWithOffset msgWithOffset : msgs) {
offset = msgWithOffset.offset();
messageQueue.put(msgWithOffset);
count++;
}
log.debug("fetch [%s] msgs for partition [%s] in one time ", count, partitionId);
}
catch (InterruptedException e) {
log.info("Interrupted when fetching data, shutting down.");
return;
}
catch (Exception e) {
log.error(e, "Exception happened in fetching data, but will continue consuming");
}
}
}
finally {
consumer.stop();
}
}
};
thread.setDaemon(true);
thread.setName(String.format("kafka-%s-%s", topic, partitionId));
thread.start();
}
@Override
public synchronized void close() throws IOException
{
if (stopped.compareAndSet(false, true)) {
thread.interrupt();
thread = null;
}
}
}
}
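For orientation, a hedged sketch of the loop a caller (in Druid, the realtime plumbing) might run against the FirehoseV2 contract returned by this factory. The interfaces below are simplified stand-ins rather than the real `io.druid.data.input` types; the point is the start/currRow/advance/makeCommitter shape:
```java
public class FirehoseV2LoopSketch
{
  // Simplified stand-ins for io.druid.data.input.Committer and FirehoseV2.
  interface Committer extends Runnable { Object getMetadata(); }

  interface FirehoseV2 extends AutoCloseable
  {
    void start() throws Exception;
    boolean advance();
    Object currRow();          // InputRow in Druid proper
    Committer makeCommitter();
  }

  static void consume(FirehoseV2 firehose, int rowsPerCommit) throws Exception
  {
    firehose.start();          // primes the first row
    int sinceCommit = 0;
    do {
      Object row = firehose.currRow();
      if (row != null) {
        // index the row here
        sinceCommit++;
      }
      if (sinceCommit >= rowsPerCommit) {
        Committer committer = firehose.makeCommitter();
        Object metadata = committer.getMetadata(); // for this firehose: partition -> offset map
        // persist metadata alongside the segment, then acknowledge:
        committer.run();
        sinceCommit = 0;
      }
    } while (firehose.advance()); // returns false once the firehose is stopped
    firehose.close();
  }
}
```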

View File

@ -0,0 +1,387 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.firehose.kafka;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.net.HostAndPort;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Refer to {@link https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example}
* <p>
* This class is not thread safe; the caller must ensure that all methods are
* called from a single thread.
*/
public class KafkaSimpleConsumer
{
public static final List<BytesMessageWithOffset> EMPTY_MSGS = new ArrayList<>();
private static final Logger log = new Logger(KafkaSimpleConsumer.class);
private final List<HostAndPort> allBrokers;
private final String topic;
private final int partitionId;
private final String clientId;
private final String leaderLookupClientId;
private final boolean earliest;
private volatile Broker leaderBroker;
private List<HostAndPort> replicaBrokers;
private SimpleConsumer consumer = null;
private static final int SO_TIMEOUT = 30000;
private static final int BUFFER_SIZE = 65536;
private static final long RETRY_INTERVAL = 1000L;
private static final int FETCH_SIZE = 100000000;
public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List<String> brokers, boolean earliest)
{
List<HostAndPort> brokerList = new ArrayList<>();
for (String broker : brokers) {
HostAndPort brokerHostAndPort = HostAndPort.fromString(broker);
Preconditions.checkArgument(
brokerHostAndPort.getHostText() != null &&
!brokerHostAndPort.getHostText().isEmpty() &&
brokerHostAndPort.hasPort(),
"kafka broker [%s] is not valid, must be <host>:<port>",
broker
);
brokerList.add(brokerHostAndPort);
}
this.allBrokers = Collections.unmodifiableList(brokerList);
this.topic = topic;
this.partitionId = partitionId;
this.clientId = String.format("%s_%d_%s", topic, partitionId, clientId);
this.leaderLookupClientId = clientId + "leaderLookup";
this.replicaBrokers = new ArrayList<>();
this.replicaBrokers.addAll(this.allBrokers);
this.earliest = earliest;
log.info(
"KafkaSimpleConsumer initialized with clientId [%s] for message consumption and clientId [%s] for leader lookup",
this.clientId,
this.leaderLookupClientId
);
}
private void ensureConsumer(Broker leader) throws InterruptedException
{
if (consumer == null) {
while (leaderBroker == null) {
leaderBroker = findNewLeader(leader);
}
log.info(
"making SimpleConsumer[%s][%s], leader broker[%s:%s]",
topic, partitionId, leaderBroker.host(), leaderBroker.port()
);
consumer = new SimpleConsumer(
leaderBroker.host(), leaderBroker.port(), SO_TIMEOUT, BUFFER_SIZE, clientId
);
}
}
public static class BytesMessageWithOffset
{
final byte[] msg;
final long offset;
final int partition;
public BytesMessageWithOffset(byte[] msg, long offset, int partition)
{
this.msg = msg;
this.offset = offset;
this.partition = partition;
}
public int getPartition()
{
return partition;
}
public byte[] message()
{
return msg;
}
public long offset()
{
return offset;
}
}
private Iterable<BytesMessageWithOffset> filterAndDecode(Iterable<MessageAndOffset> kafkaMessages, final long offset)
{
return FunctionalIterable
.create(kafkaMessages)
.filter(
new Predicate<MessageAndOffset>()
{
@Override
public boolean apply(MessageAndOffset msgAndOffset)
{
return msgAndOffset.offset() >= offset;
}
}
)
.transform(
new Function<MessageAndOffset, BytesMessageWithOffset>()
{
@Override
public BytesMessageWithOffset apply(MessageAndOffset msgAndOffset)
{
ByteBuffer bb = msgAndOffset.message().payload();
byte[] payload = new byte[bb.remaining()];
bb.get(payload);
// add nextOffset here, thus next fetch will use nextOffset instead of current offset
return new BytesMessageWithOffset(payload, msgAndOffset.nextOffset(), partitionId);
}
}
);
}
private long getOffset(boolean earliest) throws InterruptedException
{
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(
topicAndPartition,
new PartitionOffsetRequestInfo(
earliest ? kafka.api.OffsetRequest.EarliestTime() : kafka.api.OffsetRequest.LatestTime(), 1
)
);
OffsetRequest request = new OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId
);
OffsetResponse response = null;
try {
response = consumer.getOffsetsBefore(request);
}
catch (Exception e) {
ensureNotInterrupted(e);
log.error(e, "caught exception in getOffsetsBefore [%s] - [%s]", topic, partitionId);
return -1;
}
if (response.hasError()) {
log.error(
"error fetching data Offset from the Broker [%s]. reason: [%s]", leaderBroker.host(),
response.errorCode(topic, partitionId)
);
return -1;
}
long[] offsets = response.offsets(topic, partitionId);
return earliest ? offsets[0] : offsets[offsets.length - 1];
}
public Iterable<BytesMessageWithOffset> fetch(long offset, int timeoutMs) throws InterruptedException
{
FetchResponse response = null;
Broker previousLeader = leaderBroker;
while (true) {
ensureConsumer(previousLeader);
FetchRequest request = new FetchRequestBuilder()
.clientId(clientId)
.addFetch(topic, partitionId, offset, FETCH_SIZE)
.maxWait(timeoutMs)
.minBytes(1)
.build();
log.debug("fetch offset %s", offset);
try {
response = consumer.fetch(request);
}
catch (Exception e) {
ensureNotInterrupted(e);
log.warn(e, "caughte exception in fetch {} - {}", topic, partitionId);
response = null;
}
if (response == null || response.hasError()) {
short errorCode = response != null ? response.errorCode(topic, partitionId) : ErrorMapping.UnknownCode();
log.warn("fetch %s - %s with offset %s encounters error: [%s]", topic, partitionId, offset, errorCode);
boolean needNewLeader = false;
if (errorCode == ErrorMapping.RequestTimedOutCode()) {
log.info("kafka request timed out, response[%s]", response);
} else if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
long newOffset = getOffset(earliest);
log.info("got [%s] offset[%s] for [%s][%s]", earliest ? "earliest" : "latest", newOffset, topic, partitionId);
if (newOffset < 0) {
needNewLeader = true;
} else {
offset = newOffset;
continue;
}
} else {
needNewLeader = true;
}
if (needNewLeader) {
stopConsumer();
previousLeader = leaderBroker;
leaderBroker = null;
continue;
}
} else {
break;
}
}
return response != null ? filterAndDecode(response.messageSet(topic, partitionId), offset) : EMPTY_MSGS;
}
private void stopConsumer()
{
if (consumer != null) {
try {
consumer.close();
log.info("stop consumer[%s][%s], leaderBroker[%s]", topic, partitionId, leaderBroker);
}
catch (Exception e) {
log.warn(e, "stop consumer[%s][%s] failed", topic, partitionId);
}
finally {
consumer = null;
}
}
}
public void stop()
{
stopConsumer();
log.info("KafkaSimpleConsumer[%s][%s] stopped", topic, partitionId);
}
private PartitionMetadata findLeader() throws InterruptedException
{
for (HostAndPort broker : replicaBrokers) {
SimpleConsumer consumer = null;
try {
log.info("Finding new leader from Kafka brokers, try broker [%s]", broker.toString());
consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT, BUFFER_SIZE, leaderLookupClientId);
TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(Collections.singletonList(topic)));
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
if (topic.equals(item.topic())) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == partitionId) {
return part;
}
}
}
}
}
catch (Exception e) {
ensureNotInterrupted(e);
log.warn(
e, "error communicating with Kafka Broker [%s] to find leader for [%s] - [%s]", broker, topic, partitionId
);
}
finally {
if (consumer != null) {
consumer.close();
}
}
}
return null;
}
private Broker findNewLeader(Broker oldLeader) throws InterruptedException
{
long retryCnt = 0;
while (true) {
PartitionMetadata metadata = findLeader();
if (metadata != null) {
replicaBrokers.clear();
for (Broker replica : metadata.replicas()) {
replicaBrokers.add(
HostAndPort.fromParts(replica.host(), replica.port())
);
}
log.debug("Got new Kafka leader metadata : [%s], previous leader : [%s]", metadata, oldLeader);
Broker newLeader = metadata.leader();
if (newLeader != null) {
// We check the retryCnt here as well to make sure that we have slept a little bit
// if we don't notice a change in leadership
// just in case if Zookeeper doesn't get updated fast enough
if (oldLeader == null || isValidNewLeader(newLeader) || retryCnt != 0) {
return newLeader;
}
}
}
Thread.sleep(RETRY_INTERVAL);
retryCnt++;
// if could not find the leader for current replicaBrokers, let's try to
// find one via allBrokers
if (retryCnt >= 3 && (retryCnt - 3) % 5 == 0) {
log.warn("cannot find leader for [%s] - [%s] after [%s] retries", topic, partitionId, retryCnt);
replicaBrokers.clear();
replicaBrokers.addAll(allBrokers);
}
}
}
private boolean isValidNewLeader(Broker broker)
{
// broker is considered valid new leader if it is not the same as old leaderBroker
return !(leaderBroker.host().equalsIgnoreCase(broker.host()) && leaderBroker.port() == broker.port());
}
private void ensureNotInterrupted(Exception e) throws InterruptedException
{
if (Thread.interrupted()) {
log.error(e, "Interrupted during fetching for %s - %s", topic, partitionId);
throw new InterruptedException();
}
}
}
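A hedged usage sketch for the consumer defined above, outside of the firehose. Broker address, topic, and iteration count are placeholders; the offset is advanced with each message's `offset()`, which this class fills with Kafka's `nextOffset()`:
```java
import io.druid.firehose.kafka.KafkaSimpleConsumer;
import io.druid.firehose.kafka.KafkaSimpleConsumer.BytesMessageWithOffset;

import java.util.Collections;

public class SimpleConsumerSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    // topic, partitionId, clientId, brokers, resetOffsetToEarliest
    KafkaSimpleConsumer consumer = new KafkaSimpleConsumer(
        "wikipedia", 0, "localclient", Collections.singletonList("localhost:9092"), true
    );
    long offset = 0;
    try {
      for (int i = 0; i < 10; i++) {
        for (BytesMessageWithOffset msg : consumer.fetch(offset, 10000)) {
          offset = msg.offset();          // already the next offset to fetch
          byte[] payload = msg.message(); // hand off to a parser or queue
        }
      }
    }
    finally {
      consumer.stop();
    }
  }
}
```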

View File

@ -0,0 +1,20 @@
#
# Licensed to Metamarkets Group Inc. (Metamarkets) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. Metamarkets licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
io.druid.firehose.kafka.KafkaEightSimpleConsumerDruidModule

View File

@ -486,7 +486,7 @@ public class IndexGeneratorJob implements Jobby
) throws IOException
{
return IndexMaker.persist(
index, interval, file, config.getIndexSpec(), progressIndicator
index, interval, file, null, config.getIndexSpec(), progressIndicator
);
}

View File

@ -68,7 +68,7 @@ public class LegacyIndexGeneratorJob extends IndexGeneratorJob
IncrementalIndex index, Interval interval, File file, ProgressIndicator progressIndicator
) throws IOException
{
return IndexMerger.persist(index, interval, file, config.getIndexSpec(), progressIndicator);
return IndexMerger.persist(index, interval, file, null, config.getIndexSpec(), progressIndicator);
}
@Override

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -99,9 +100,9 @@ public class YeOldePlumberSchool implements PlumberSchool
return new Plumber()
{
@Override
public void startJob()
public Object startJob()
{
return null;
}
@Override
@ -206,6 +207,7 @@ public class YeOldePlumberSchool implements PlumberSchool
IndexMerger.persist(
indexToPersist.getIndex(),
dirToPersist,
null,
config.getIndexSpec()
);
@ -227,6 +229,12 @@ public class YeOldePlumberSchool implements PlumberSchool
{
return new File(persistDir, String.format("spill%d", n));
}
@Override
public void persist(Committer commitRunnable)
{
// delegate to the existing Runnable-based overload; calling persist(commitRunnable) directly would recurse
persist((Runnable) commitRunnable);
}
};
}
}

View File

@ -63,7 +63,7 @@ public class TestRealtimeTask extends RealtimeIndexTask
{
return null;
}
}
}, null
), null
)
);

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.task;
@ -281,8 +283,10 @@ public class TaskSerdeTest
{
return null;
}
}
},
null
),
new RealtimeTuningConfig(
1,
new Period("PT10M"),
@ -332,8 +336,10 @@ public class TaskSerdeTest
task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity(),
task2.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity()
);
Assert.assertEquals(task.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(),
task2.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(), 0.0f);
Assert.assertEquals(
task.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(),
task2.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(), 0.0f
);
}
@Test

View File

@ -136,7 +136,7 @@ public class IngestSegmentFirehoseFactoryTest
if (!persistDir.mkdirs() && !persistDir.exists()) {
throw new IOException(String.format("Could not create directory at [%s]", persistDir.getAbsolutePath()));
}
IndexMerger.persist(index, persistDir, indexSpec);
IndexMerger.persist(index, persistDir, null, indexSpec);
final TaskLockbox tl = new TaskLockbox(ts);
final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null)

View File

@ -801,8 +801,8 @@ public class TaskLifecycleTest
);
RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
new MockFirehoseFactory(true),
null, // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
null
// PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
1000,

View File

@ -56,7 +56,7 @@ public class TaskAnnouncementTest
{
return null;
}
}), null
}, null), null
)
);
final TaskStatus status = TaskStatus.running(task.getId());

View File

@ -96,6 +96,7 @@
Kafka 0.7 is not available in Maven Central -->
<!-- <module>extensions/kafka&#45;seven</module> -->
<module>extensions/kafka-eight</module>
<module>extensions/kafka-eight-simpleConsumer</module>
<module>extensions/rabbitmq</module>
<module>extensions/histogram</module>
<module>extensions/mysql-metadata-storage</module>
@ -103,10 +104,10 @@
<module>extensions/azure-extensions</module>
<module>extensions/namespace-lookup</module>
<module>extensions/kafka-extraction-namespace</module>
<!-- distribution packaging -->
<module>extensions-distribution</module>
<module>distribution</module>
</modules>
<dependencyManagement>

View File

@ -1,22 +1,25 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
@ -201,7 +204,13 @@ public class IndexIO
return convertSegment(toConvert, converted, indexSpec, false, true);
}
public static boolean convertSegment(File toConvert, File converted, IndexSpec indexSpec, boolean forceIfCurrent, boolean validate)
public static boolean convertSegment(
File toConvert,
File converted,
IndexSpec indexSpec,
boolean forceIfCurrent,
boolean validate
)
throws IOException
{
final int version = SegmentUtils.getVersionFromDir(toConvert);
@ -230,7 +239,7 @@ public class IndexIO
default:
if (forceIfCurrent) {
IndexMerger.convert(toConvert, converted, indexSpec);
if(validate){
if (validate) {
DefaultIndexIOHandler.validateTwoSegments(toConvert, converted);
}
return true;
@ -253,7 +262,7 @@ public class IndexIO
IndexableAdapter adapter2
)
{
if(rb1.getTimestamp() != rb2.getTimestamp()){
if (rb1.getTimestamp() != rb2.getTimestamp()) {
throw new SegmentValidationException(
"Timestamp mismatch. Expected %d found %d",
rb1.getTimestamp(), rb2.getTimestamp()
@ -466,8 +475,8 @@ public class IndexIO
public static void validateTwoSegments(File dir1, File dir2) throws IOException
{
try(QueryableIndex queryableIndex1 = loadIndex(dir1)) {
try(QueryableIndex queryableIndex2 = loadIndex(dir2)) {
try (QueryableIndex queryableIndex1 = loadIndex(dir1)) {
try (QueryableIndex queryableIndex2 = loadIndex(dir2)) {
validateTwoSegments(
new QueryableIndexIndexableAdapter(queryableIndex1),
new QueryableIndexIndexableAdapter(queryableIndex2)
@ -852,14 +861,12 @@ public class IndexIO
Set<String> columns = Sets.newTreeSet();
columns.addAll(Lists.newArrayList(dims9));
columns.addAll(Lists.newArrayList(availableMetrics));
GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.STRING_STRATEGY);
final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory);
final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16
+ serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString);
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer);
dims9.writeToChannel(writer);
@ -868,6 +875,11 @@ public class IndexIO
serializerUtils.writeString(writer, segmentBitmapSerdeFactoryString);
writer.close();
final ByteBuffer metadataBuffer = v8SmooshedFiles.mapFile("metadata.drd");
if (metadataBuffer != null) {
v9Smoosher.add("metadata.drd", metadataBuffer);
}
log.info("Skipped files[%s]", skippedFiles);
v9Smoosher.close();
@ -970,7 +982,8 @@ public class IndexIO
index.getAvailableDimensions(),
new ConciseBitmapFactory(),
columns,
index.getFileMapper()
index.getFileMapper(),
null
);
}
}
@ -999,6 +1012,7 @@ public class IndexIO
final GenericIndexed<String> dims = GenericIndexed.read(indexBuffer, GenericIndexed.STRING_STRATEGY);
final Interval dataInterval = new Interval(indexBuffer.getLong(), indexBuffer.getLong());
final BitmapSerdeFactory segmentBitmapSerdeFactory;
/**
* This is a workaround for the fact that in v8 segments, we have no information about the type of bitmap
* index to use. Since we cannot very cleanly build v9 segments directly, we are using a workaround where
@ -1010,6 +1024,22 @@ public class IndexIO
segmentBitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory();
}
Map<String, Object> metadata = null;
ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd");
if (metadataBB != null) {
try {
metadata = mapper.readValue(
serializerUtils.readBytes(metadataBB, metadataBB.remaining()),
new TypeReference<Map<String, Object>>()
{
}
);
}
catch (IOException ex) {
throw new IOException("Failed to read metadata", ex);
}
}
Map<String, Column> columns = Maps.newHashMap();
for (String columnName : cols) {
@ -1019,7 +1049,7 @@ public class IndexIO
columns.put(Column.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time")));
final QueryableIndex index = new SimpleQueryableIndex(
dataInterval, cols, dims, segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles
dataInterval, cols, dims, segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, metadata
);
log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
@ -115,9 +117,14 @@ public class IndexMaker
mapper = injector.getInstance(ObjectMapper.class);
}
public static File persist(final IncrementalIndex index, File outDir, final IndexSpec indexSpec) throws IOException
public static File persist(
final IncrementalIndex index,
File outDir,
final Map<String, Object> segmentMetadata,
final IndexSpec indexSpec
) throws IOException
{
return persist(index, index.getInterval(), outDir, indexSpec);
return persist(index, index.getInterval(), outDir, segmentMetadata, indexSpec);
}
/**
@ -134,16 +141,20 @@ public class IndexMaker
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
final Map<String, Object> segmentMetadata,
final IndexSpec indexSpec
) throws IOException
{
return persist(index, dataInterval, outDir, indexSpec, new LoggingProgressIndicator(outDir.toString()));
return persist(
index, dataInterval, outDir, segmentMetadata, indexSpec, new LoggingProgressIndicator(outDir.toString())
);
}
public static File persist(
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
final Map<String, Object> segmentMetadata,
final IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
@ -181,6 +192,7 @@ public class IndexMaker
),
index.getMetricAggs(),
outDir,
segmentMetadata,
indexSpec,
progress
);
@ -215,6 +227,7 @@ public class IndexMaker
),
metricAggs,
outDir,
null,
indexSpec,
progress
);
@ -224,13 +237,16 @@ public class IndexMaker
List<IndexableAdapter> adapters, final AggregatorFactory[] metricAggs, File outDir, final IndexSpec indexSpec
) throws IOException
{
return merge(adapters, metricAggs, outDir, indexSpec, new LoggingProgressIndicator(outDir.toString()));
return merge(
adapters, metricAggs, outDir, null, indexSpec, new LoggingProgressIndicator(outDir.toString())
);
}
public static File merge(
List<IndexableAdapter> adapters,
final AggregatorFactory[] metricAggs,
File outDir,
final Map<String, Object> segmentMetaData,
final IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
@ -320,7 +336,9 @@ public class IndexMaker
}
};
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
return makeIndexFiles(
adapters, outDir, progress, mergedDimensions, mergedMetrics, segmentMetaData, rowMergerFn, indexSpec
);
}
@ -341,6 +359,7 @@ public class IndexMaker
progress,
Lists.newArrayList(adapter.getDimensionNames()),
Lists.newArrayList(adapter.getMetricNames()),
null,
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Nullable
@ -355,7 +374,6 @@ public class IndexMaker
}
}
public static File append(
final List<IndexableAdapter> adapters,
final File outDir,
@ -438,7 +456,7 @@ public class IndexMaker
}
};
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, null, rowMergerFn, indexSpec);
}
private static File makeIndexFiles(
@ -447,6 +465,7 @@ public class IndexMaker
final ProgressIndicator progress,
final List<String> mergedDimensions,
final List<String> mergedMetrics,
final Map<String, Object> segmentMetadata,
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn,
final IndexSpec indexSpec
) throws IOException
@ -540,15 +559,9 @@ public class IndexMaker
progress.progress();
makeIndexBinary(
v9Smoosher,
adapters,
outDir,
mergedDimensions,
mergedMetrics,
skippedDimensions,
progress,
indexSpec
v9Smoosher, adapters, outDir, mergedDimensions, mergedMetrics, skippedDimensions, progress, indexSpec
);
makeMetadataBinary(v9Smoosher, progress, segmentMetadata);
v9Smoosher.close();
@ -1397,7 +1410,6 @@ public class IndexMaker
+ serializerUtils.getSerializedStringByteSize(bitmapSerdeFactoryType);
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer);
dims.writeToChannel(writer);
@ -1412,6 +1424,7 @@ public class IndexMaker
serializerUtils.writeLong(writer, dataInterval.getStartMillis());
serializerUtils.writeLong(writer, dataInterval.getEndMillis());
serializerUtils.writeString(
writer, bitmapSerdeFactoryType
);
@ -1422,6 +1435,19 @@ public class IndexMaker
progress.stopSection(section);
}
private static void makeMetadataBinary(
final FileSmoosher v9Smoosher,
final ProgressIndicator progress,
final Map<String, Object> segmentMetadata
) throws IOException
{
if (segmentMetadata != null && !segmentMetadata.isEmpty()) {
progress.startSection("metadata.drd");
v9Smoosher.add("metadata.drd", ByteBuffer.wrap(mapper.writeValueAsBytes(segmentMetadata)));
progress.stopSection("metadata.drd");
}
}
private static void writeColumn(
FileSmoosher v9Smoosher,
ColumnPartSerde serde,

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
@ -127,9 +129,14 @@ public class IndexMerger
}
public static File persist(final IncrementalIndex index, File outDir, IndexSpec indexSpec) throws IOException
public static File persist(
final IncrementalIndex index,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec
) throws IOException
{
return persist(index, index.getInterval(), outDir, indexSpec);
return persist(index, index.getInterval(), outDir, segmentMetadata, indexSpec);
}
/**
@ -148,16 +155,18 @@ public class IndexMerger
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec
) throws IOException
{
return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator());
return persist(index, dataInterval, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator());
}
public static File persist(
final IncrementalIndex index,
final Interval dataInterval,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
@ -195,6 +204,7 @@ public class IndexMerger
),
index.getMetricAggs(),
outDir,
segmentMetadata,
indexSpec,
progress
);
@ -229,22 +239,28 @@ public class IndexMerger
),
metricAggs,
outDir,
null,
indexSpec,
progress
);
}
public static File merge(
List<IndexableAdapter> indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec
List<IndexableAdapter> indexes,
final AggregatorFactory[] metricAggs,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec
) throws IOException
{
return merge(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator());
return merge(indexes, metricAggs, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator());
}
public static File merge(
List<IndexableAdapter> indexes,
final AggregatorFactory[] metricAggs,
File outDir,
Map<String, Object> segmentMetadata,
IndexSpec indexSpec,
ProgressIndicator progress
) throws IOException
@ -333,7 +349,16 @@ public class IndexMerger
}
};
return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
return makeIndexFiles(
indexes,
outDir,
progress,
mergedDimensions,
mergedMetrics,
segmentMetadata,
rowMergerFn,
indexSpec
);
}
// Faster than IndexMaker
@ -354,6 +379,7 @@ public class IndexMerger
progress,
Lists.newArrayList(adapter.getDimensionNames()),
Lists.newArrayList(adapter.getMetricNames()),
null,
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Nullable
@ -445,7 +471,7 @@ public class IndexMerger
}
};
return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, null, rowMergerFn, indexSpec);
}
private static File makeIndexFiles(
@ -454,6 +480,7 @@ public class IndexMerger
final ProgressIndicator progress,
final List<String> mergedDimensions,
final List<String> mergedMetrics,
final Map<String, Object> segmentMetadata,
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn,
final IndexSpec indexSpec
) throws IOException
@ -900,6 +927,13 @@ public class IndexMerger
)
);
if (segmentMetadata != null && !segmentMetadata.isEmpty()) {
writeMetadataToFile(new File(v8OutDir, "metadata.drd"), segmentMetadata);
log.info("wrote metadata.drd in outDir[%s].", v8OutDir);
expectedFiles.add("metadata.drd");
}
Map<String, File> files = Maps.newLinkedHashMap();
for (String fileName : expectedFiles) {
files.put(fileName, new File(v8OutDir, fileName));
@ -1273,4 +1307,31 @@ public class IndexMerger
}
return true;
}
private static void writeMetadataToFile(File metadataFile, Map<String, Object> metadata) throws IOException
{
FileOutputStream metadataFileOutputStream = null;
FileChannel metadataFilechannel = null;
try {
metadataFileOutputStream = new FileOutputStream(metadataFile);
metadataFilechannel = metadataFileOutputStream.getChannel();
byte[] metadataBytes = mapper.writeValueAsBytes(metadata);
if (metadataBytes.length != metadataFilechannel.write(ByteBuffer.wrap(metadataBytes))) {
throw new IOException("Failed to write metadata for file");
}
}
finally {
if (metadataFilechannel != null) {
metadataFilechannel.close();
metadataFilechannel = null;
}
if (metadataFileOutputStream != null) {
metadataFileOutputStream.close();
metadataFileOutputStream = null;
}
}
IndexIO.checkFileSize(metadataFile);
}
}

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
@ -23,6 +25,7 @@ import org.joda.time.Interval;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
/**
*/
@ -33,6 +36,7 @@ public interface QueryableIndex extends ColumnSelector, Closeable
public Indexed<String> getColumnNames();
public Indexed<String> getAvailableDimensions();
public BitmapFactory getBitmapFactoryForDimensions();
public Map<String, Object> getMetaData();
/**
* The close method shouldn't actually be here as this is nasty. We will adjust it in the future.

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
@ -42,7 +44,6 @@ import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.ListIndexed;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.HashSet;
import java.util.Iterator;

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
@ -37,6 +39,7 @@ public class SimpleQueryableIndex implements QueryableIndex
private final BitmapFactory bitmapFactory;
private final Map<String, Column> columns;
private final SmooshedFileMapper fileMapper;
private final Map<String, Object> metadata;
public SimpleQueryableIndex(
Interval dataInterval,
@ -44,7 +47,8 @@ public class SimpleQueryableIndex implements QueryableIndex
Indexed<String> dimNames,
BitmapFactory bitmapFactory,
Map<String, Column> columns,
SmooshedFileMapper fileMapper
SmooshedFileMapper fileMapper,
Map<String, Object> metadata
)
{
Preconditions.checkNotNull(columns.get(Column.TIME_COLUMN_NAME));
@ -54,6 +58,7 @@ public class SimpleQueryableIndex implements QueryableIndex
this.bitmapFactory = bitmapFactory;
this.columns = columns;
this.fileMapper = fileMapper;
this.metadata = metadata;
}
@Override
@ -97,4 +102,10 @@ public class SimpleQueryableIndex implements QueryableIndex
{
fileMapper.close();
}
@Override
public Map<String, Object> getMetaData()
{
return metadata;
}
}

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.incremental;

View File

@ -249,7 +249,7 @@ public class AggregationTestHelper
index.add(parser.parse(row));
}
}
IndexMerger.persist(index, outDir, new IndexSpec());
IndexMerger.persist(index, outDir, null, new IndexSpec());
}
}

View File

@ -61,6 +61,7 @@ public class EmptyIndexTest
Lists.<IndexableAdapter>newArrayList(emptyIndexAdapter),
new AggregatorFactory[0],
tmpDir,
null,
new IndexSpec()
);

View File

@ -128,7 +128,16 @@ public class IndexMakerParameterizedTest
IncrementalIndexTest.populateIndex(timestamp, toPersist);
final File tempDir = temporaryFolder.newFolder();
QueryableIndex index = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist, tempDir, indexSpec)));
QueryableIndex index = closer.closeLater(
IndexIO.loadIndex(
IndexMaker.persist(
toPersist,
tempDir,
null,
indexSpec
)
)
);
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
@ -171,13 +180,31 @@ public class IndexMakerParameterizedTest
final File tempDir2 = temporaryFolder.newFolder();
final File mergedDir = temporaryFolder.newFolder();
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(
IndexIO.loadIndex(
IndexMaker.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
QueryableIndex index2 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist2, tempDir2, indexSpec)));
QueryableIndex index2 = closer.closeLater(
IndexIO.loadIndex(
IndexMaker.persist(
toPersist2,
tempDir2,
null,
indexSpec
)
)
);
Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
@ -242,6 +269,7 @@ public class IndexMakerParameterizedTest
IndexMaker.persist(
toPersist1,
tmpDir1,
null,
indexSpec
)
)
@ -251,6 +279,7 @@ public class IndexMakerParameterizedTest
IndexMaker.persist(
toPersist1,
tmpDir2,
null,
indexSpec
)
)
@ -291,7 +320,16 @@ public class IndexMakerParameterizedTest
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(
IndexIO.loadIndex(
IndexMaker.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
@ -394,7 +432,16 @@ public class IndexMakerParameterizedTest
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(
IndexIO.loadIndex(
IndexMaker.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
@ -451,7 +498,16 @@ public class IndexMakerParameterizedTest
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(
IndexIO.loadIndex(
IndexMaker.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
@ -500,7 +556,16 @@ public class IndexMakerParameterizedTest
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(
IndexIO.loadIndex(
IndexMaker.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);

View File

@ -170,7 +170,7 @@ public class IndexMakerTest
}
tmpDir = Files.createTempDir();
persistTmpDir = new File(tmpDir, "persistDir");
IndexMerger.persist(toPersist, persistTmpDir, INDEX_SPEC);
IndexMerger.persist(toPersist, persistTmpDir, null, INDEX_SPEC);
}
@After
@ -179,10 +179,40 @@ public class IndexMakerTest
FileUtils.deleteDirectory(tmpDir);
}
@Test
public void testPersistWithSegmentMetadata() throws IOException
{
File outDir = Files.createTempDir();
QueryableIndex index = null;
try {
Map<String, Object> segmentMetadata = ImmutableMap.<String, Object>of("key", "value");
index = IndexIO.loadIndex(IndexMaker.persist(toPersist, outDir, segmentMetadata, INDEX_SPEC));
Assert.assertEquals(segmentMetadata, index.getMetaData());
}
finally {
if (index != null) {
index.close();
}
if (outDir != null) {
FileUtils.deleteDirectory(outDir);
}
}
}
@Test
public void testSimpleReprocess() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir)));
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(
closer.closeLater(
IndexIO.loadIndex(
persistTmpDir
)
)
);
Assert.assertEquals(events.size(), adapter.getNumRows());
reprocessAndValidate(persistTmpDir, new File(tmpDir, "reprocessed"));
}
@ -212,7 +242,13 @@ public class IndexMakerTest
@Test
public void testIdempotentReprocess() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir)));
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(
closer.closeLater(
IndexIO.loadIndex(
persistTmpDir
)
)
);
Assert.assertEquals(events.size(), adapter.getNumRows());
final File tmpDir1 = new File(tmpDir, "reprocessed1");
reprocessAndValidate(persistTmpDir, tmpDir1);
@ -231,7 +267,13 @@ public class IndexMakerTest
@Test
public void testSimpleAppend() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir)));
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(
closer.closeLater(
IndexIO.loadIndex(
persistTmpDir
)
)
);
Assert.assertEquals(events.size(), adapter.getNumRows());
appendAndValidate(persistTmpDir, new File(tmpDir, "reprocessed"));
}
@ -239,7 +281,13 @@ public class IndexMakerTest
@Test
public void testIdempotentAppend() throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir)));
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(
closer.closeLater(
IndexIO.loadIndex(
persistTmpDir
)
)
);
Assert.assertEquals(events.size(), adapter.getNumRows());
final File tmpDir1 = new File(tmpDir, "reprocessed1");
appendAndValidate(persistTmpDir, tmpDir1);

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
@ -52,6 +54,7 @@ import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class IndexMergerTest
@ -126,7 +129,16 @@ public class IndexMergerTest
IncrementalIndexTest.populateIndex(timestamp, toPersist);
final File tempDir = temporaryFolder.newFolder();
QueryableIndex index = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir, indexSpec)));
QueryableIndex index = closer.closeLater(
IndexIO.loadIndex(
IndexMerger.persist(
toPersist,
tempDir,
null,
indexSpec
)
)
);
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
@ -135,6 +147,37 @@ public class IndexMergerTest
assertDimCompression(index, indexSpec.getDimensionCompressionStrategy());
}
@Test
public void testPersistWithSegmentMetadata() throws Exception
{
final long timestamp = System.currentTimeMillis();
IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null);
IncrementalIndexTest.populateIndex(timestamp, toPersist);
Map<String, Object> segmentMetadata = ImmutableMap.<String, Object>of("key", "value");
final File tempDir = temporaryFolder.newFolder();
QueryableIndex index = closer.closeLater(
IndexIO.loadIndex(
IndexMerger.persist(
toPersist,
tempDir,
segmentMetadata,
indexSpec
)
)
);
Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions()));
Assert.assertEquals(3, index.getColumnNames().size());
assertDimCompression(index, indexSpec.getDimensionCompressionStrategy());
Assert.assertEquals(segmentMetadata, index.getMetaData());
}
@Test
public void testPersistMerge() throws Exception
{
@ -169,13 +212,31 @@ public class IndexMergerTest
final File tempDir2 = temporaryFolder.newFolder();
final File mergedDir = temporaryFolder.newFolder();
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(
IndexIO.loadIndex(
IndexMerger.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions()));
Assert.assertEquals(3, index1.getColumnNames().size());
QueryableIndex index2 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2, indexSpec)));
QueryableIndex index2 = closer.closeLater(
IndexIO.loadIndex(
IndexMerger.persist(
toPersist2,
tempDir2,
null,
indexSpec
)
)
);
Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength());
Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions()));
@ -240,6 +301,7 @@ public class IndexMergerTest
IndexMerger.persist(
toPersist1,
tmpDir1,
null,
indexSpec
)
)
@ -249,6 +311,7 @@ public class IndexMergerTest
IndexMerger.persist(
toPersist1,
tmpDir2,
null,
indexSpec
)
)
@ -294,7 +357,16 @@ public class IndexMergerTest
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(
IndexIO.loadIndex(
IndexMerger.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
@ -397,7 +469,16 @@ public class IndexMergerTest
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(
IndexIO.loadIndex(
IndexMerger.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
@ -464,7 +545,7 @@ public class IndexMergerTest
);
QueryableIndex index1 = closer.closeLater(
IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec))
IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, null, indexSpec))
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);
@ -521,7 +602,16 @@ public class IndexMergerTest
.getBitmapFactory()
);
QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)));
QueryableIndex index1 = closer.closeLater(
IndexIO.loadIndex(
IndexMerger.persist(
toPersist1,
tempDir1,
null,
indexSpec
)
)
);
final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1);

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
@ -27,7 +29,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.hash.Hashing;
import com.metamx.common.Pair;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
@ -149,7 +150,8 @@ public class SchemalessIndex
try {
theIndex.add(new MapBasedInputRow(timestamp, dims, event));
} catch(IndexSizeExceededException e) {
}
catch (IndexSizeExceededException e) {
Throwables.propagate(e);
}
@ -186,12 +188,15 @@ public class SchemalessIndex
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(top, topFile, indexSpec);
IndexMerger.persist(bottom, bottomFile, indexSpec);
IndexMerger.persist(top, topFile, null, indexSpec);
IndexMerger.persist(bottom, bottomFile, null, indexSpec);
mergedIndex = io.druid.segment.IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)), METRIC_AGGS, mergedFile, indexSpec
Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)),
METRIC_AGGS,
mergedFile,
indexSpec
)
);
@ -233,7 +238,10 @@ public class SchemalessIndex
QueryableIndex index = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(rowPersistedIndexes.get(index1), rowPersistedIndexes.get(index2)), METRIC_AGGS, mergedFile, indexSpec
Arrays.asList(rowPersistedIndexes.get(index1), rowPersistedIndexes.get(index2)),
METRIC_AGGS,
mergedFile,
indexSpec
)
);
@ -350,7 +358,7 @@ public class SchemalessIndex
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMerger.persist(rowIndex, tmpFile, indexSpec);
IndexMerger.persist(rowIndex, tmpFile, null, indexSpec);
rowPersistedIndexes.add(IndexIO.loadIndex(tmpFile));
}
}
@ -410,7 +418,7 @@ public class SchemalessIndex
theFile.mkdirs();
theFile.deleteOnExit();
filesToMap.add(theFile);
IndexMerger.persist(index, theFile, indexSpec);
IndexMerger.persist(index, theFile, null, indexSpec);
}
return filesToMap;

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
@ -141,8 +143,8 @@ public class TestIndex
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(top, DATA_INTERVAL, topFile, indexSpec);
IndexMerger.persist(bottom, DATA_INTERVAL, bottomFile, indexSpec);
IndexMerger.persist(top, DATA_INTERVAL, topFile, null, indexSpec);
IndexMerger.persist(bottom, DATA_INTERVAL, bottomFile, null, indexSpec);
mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
@ -253,7 +255,7 @@ public class TestIndex
someTmpFile.mkdirs();
someTmpFile.deleteOnExit();
IndexMerger.persist(index, someTmpFile, indexSpec);
IndexMerger.persist(index, someTmpFile, null, indexSpec);
return IndexIO.loadIndex(someTmpFile);
}
catch (IOException e) {

View File

@ -232,7 +232,7 @@ public class SpatialFilterBonusTest
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMerger.persist(theIndex, tmpFile, indexSpec);
IndexMerger.persist(theIndex, tmpFile, null, indexSpec);
return IndexIO.loadIndex(tmpFile);
}
@ -412,9 +412,9 @@ public class SpatialFilterBonusTest
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(first, DATA_INTERVAL, firstFile, indexSpec);
IndexMerger.persist(second, DATA_INTERVAL, secondFile, indexSpec);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile, indexSpec);
IndexMerger.persist(first, DATA_INTERVAL, firstFile, null, indexSpec);
IndexMerger.persist(second, DATA_INTERVAL, secondFile, null, indexSpec);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile, null, indexSpec);
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(

View File

@ -261,7 +261,7 @@ public class SpatialFilterTest
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMerger.persist(theIndex, tmpFile, indexSpec);
IndexMerger.persist(theIndex, tmpFile, null, indexSpec);
return IndexIO.loadIndex(tmpFile);
}
@ -481,9 +481,9 @@ public class SpatialFilterTest
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(first, DATA_INTERVAL, firstFile, indexSpec);
IndexMerger.persist(second, DATA_INTERVAL, secondFile, indexSpec);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile, indexSpec);
IndexMerger.persist(first, DATA_INTERVAL, firstFile, null, indexSpec);
IndexMerger.persist(second, DATA_INTERVAL, secondFile, null, indexSpec);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile, null, indexSpec);
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(

View File

@ -1,25 +1,28 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.segment.realtime.plumber.PlumberSchool;
/**
@ -28,15 +31,22 @@ public class RealtimeIOConfig implements IOConfig
{
private final FirehoseFactory firehoseFactory;
private final PlumberSchool plumberSchool;
private final FirehoseFactoryV2 firehoseFactoryV2;
@JsonCreator
public RealtimeIOConfig(
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("plumber") PlumberSchool plumberSchool
@JsonProperty("plumber") PlumberSchool plumberSchool,
@JsonProperty("firehoseV2") FirehoseFactoryV2 firehoseFactoryV2
)
{
if (firehoseFactory != null && firehoseFactoryV2 != null) {
throw new IllegalArgumentException("Only provide one of firehose or firehoseV2");
}
this.firehoseFactory = firehoseFactory;
this.plumberSchool = plumberSchool;
this.firehoseFactoryV2 = firehoseFactoryV2;
}
@JsonProperty("firehose")
@ -45,6 +55,12 @@ public class RealtimeIOConfig implements IOConfig
return firehoseFactory;
}
@JsonProperty("firehoseV2")
public FirehoseFactoryV2 getFirehoseFactoryV2()
{
return firehoseFactoryV2;
}
public PlumberSchool getPlumberSchool()
{
return plumberSchool;
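
A hedged usage sketch of the new constructor contract (the factory and plumber variables below are hypothetical): a spec may carry either the old `firehose` or the new `firehoseV2`, and supplying both now fails fast.

```java
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.realtime.plumber.PlumberSchool;

class RealtimeIOConfigSketch
{
  static RealtimeIOConfig v2Config(PlumberSchool plumberSchool, FirehoseFactoryV2 firehoseV2)
  {
    // V2 spec: no "firehose", only "firehoseV2".
    return new RealtimeIOConfig(null, plumberSchool, firehoseV2);
  }

  static void invalid(FirehoseFactory firehose, PlumberSchool plumberSchool, FirehoseFactoryV2 firehoseV2)
  {
    // Supplying both throws IllegalArgumentException("Only provide one of firehose or firehoseV2").
    new RealtimeIOConfig(firehose, plumberSchool, firehoseV2);
  }
}
```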

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime;
@ -21,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseV2;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IngestionSpec;
import io.druid.segment.indexing.RealtimeIOConfig;
@ -31,7 +34,7 @@ import java.io.IOException;
/**
* A Fire Department has a Firehose and a Plumber.
*
* <p>
* This is a metaphor for a realtime stream (Firehose) and a coordinator of sinks (Plumber). The Firehose provides the
* realtime stream of data. The Plumber directs each drop of water from the firehose into the correct sink and makes
* sure that the sinks don't overflow.
@ -41,7 +44,6 @@ public class FireDepartment extends IngestionSpec<RealtimeIOConfig, RealtimeTuni
private final DataSchema dataSchema;
private final RealtimeIOConfig ioConfig;
private final RealtimeTuningConfig tuningConfig;
private final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
@JsonCreator
@ -92,11 +94,21 @@ public class FireDepartment extends IngestionSpec<RealtimeIOConfig, RealtimeTuni
return ioConfig.getPlumberSchool().findPlumber(dataSchema, tuningConfig, metrics);
}
public boolean checkFirehoseV2()
{
return ioConfig.getFirehoseFactoryV2() != null;
}
public Firehose connect() throws IOException
{
return ioConfig.getFirehoseFactory().connect(dataSchema.getParser());
}
public FirehoseV2 connect(Object metaData) throws IOException
{
return ioConfig.getFirehoseFactoryV2().connect(dataSchema.getParser(), metaData);
}
public FireDepartmentMetrics getMetrics()
{
return metrics;

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime;
@ -29,7 +31,9 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.parsers.ParseException;
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.NoopQueryRunner;
@ -156,7 +160,8 @@ public class RealtimeManager implements QuerySegmentWalker
Iterable<FireChief> chiefsOfDataSource = chiefs.get(input);
return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults(
factory.mergeRunners(
MoreExecutors.sameThreadExecutor(), // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock
MoreExecutors.sameThreadExecutor(),
// Chaining query runners which wait on submitted chain query runners can make executor pools deadlock
Iterables.transform(
chiefsOfDataSource, new Function<FireChief, QueryRunner<T>>()
{
@ -182,6 +187,7 @@ public class RealtimeManager implements QuerySegmentWalker
private final RealtimeTuningConfig config;
private volatile Firehose firehose = null;
private volatile FirehoseV2 firehoseV2 = null;
private volatile Plumber plumber = null;
private volatile boolean normalExit = true;
@ -214,6 +220,26 @@ public class RealtimeManager implements QuerySegmentWalker
}
}
public FirehoseV2 initFirehoseV2(Object metaData)
{
synchronized (this) {
if (firehoseV2 == null) {
try {
log.info("Calling the FireDepartment and getting a FirehoseV2.");
firehoseV2 = fireDepartment.connect(metaData);
log.info("FirehoseV2 acquired!");
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
log.warn("FirehoseV2 already connected, skipping initFirehoseV2().");
}
return firehoseV2;
}
}
public Plumber initPlumber()
{
synchronized (this) {
@ -238,15 +264,124 @@ public class RealtimeManager implements QuerySegmentWalker
public void run()
{
plumber = initPlumber();
final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod();
try {
plumber.startJob();
Object metadata = plumber.startJob();
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
if (fireDepartment.checkFirehoseV2()) {
firehoseV2 = initFirehoseV2(metadata);
runFirehoseV2(firehoseV2);
} else {
firehose = initFirehose();
runFirehose(firehose);
}
}
catch (RuntimeException e) {
log.makeAlert(
e,
"RuntimeException aborted realtime processing[%s]",
fireDepartment.getDataSchema().getDataSource()
).emit();
normalExit = false;
throw e;
}
catch (Error e) {
log.makeAlert(e, "Exception aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource())
.emit();
normalExit = false;
throw e;
}
finally {
CloseQuietly.close(firehose);
if (normalExit) {
plumber.finishJob();
plumber = null;
firehose = null;
}
}
}
private void runFirehoseV2(FirehoseV2 firehose)
{
final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod();
try {
firehose.start();
}
catch (Exception e) {
log.error(e, "Failed to start firehoseV2");
return;
}
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
log.info("FirehoseV2 started with nextFlush [%s]", nextFlush);
boolean haveRow = true;
while (haveRow) {
InputRow inputRow = null;
int numRows = 0;
try {
inputRow = firehose.currRow();
if (inputRow != null) {
try {
numRows = plumber.add(inputRow);
}
catch (IndexSizeExceededException e) {
log.debug(e, "Index limit exceeded: %s", e.getMessage());
nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod);
continue;
}
if (numRows < 0) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
} else {
metrics.incrementProcessed();
}
} else {
log.debug("thrown away null input row, considering unparseable");
metrics.incrementUnparseable();
}
}
catch (Exception e) {
log.makeAlert(e, "Unknown exception, Ignoring and continuing.")
.addData("inputRow", inputRow);
}
try {
haveRow = firehose.advance();
}
catch (Exception e) {
log.debug(e, "exception in firehose.advance(), considering unparseable row");
metrics.incrementUnparseable();
continue;
}
try {
final Sink sink = inputRow != null ? plumber.getSink(inputRow.getTimestampFromEpoch()) : null;
if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) {
nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod);
}
}
catch (Exception e) {
log.makeAlert(
e,
"An exception happened while queueing the persist!? We hope it is transient. Ignoring and continuing."
).emit();
}
}
}
private long doIncrementalPersist(Committer committer, Period intermediatePersistPeriod)
{
plumber.persist(committer);
return new DateTime().plus(intermediatePersistPeriod).getMillis();
}
private void runFirehose(Firehose firehose)
{
final Period intermediatePersistPeriod = config.getIntermediatePersistPeriod();
long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
while (firehose.hasMore()) {
InputRow inputRow = null;
try {
@ -259,7 +394,7 @@ public class RealtimeManager implements QuerySegmentWalker
continue;
}
}
catch (ParseException e) {
catch (Exception e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
metrics.incrementUnparseable();
continue;
@ -300,30 +435,6 @@ public class RealtimeManager implements QuerySegmentWalker
}
}
}
catch (RuntimeException e) {
log.makeAlert(
e,
"RuntimeException aborted realtime processing[%s]",
fireDepartment.getDataSchema().getDataSource()
).emit();
normalExit = false;
throw e;
}
catch (Error e) {
log.makeAlert(e, "Exception aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource())
.emit();
normalExit = false;
throw e;
}
finally {
CloseQuietly.close(firehose);
if (normalExit) {
plumber.finishJob();
plumber = null;
firehose = null;
}
}
}
public <T> QueryRunner<T> getQueryRunner(Query<T> query)
{

View File

@ -81,7 +81,7 @@ public class FlushingPlumber extends RealtimePlumber
}
@Override
public void startJob()
public Object startJob()
{
log.info("Starting job for %s", getSchema().getDataSource());
@ -92,8 +92,9 @@ public class FlushingPlumber extends RealtimePlumber
flushScheduledExec = Execs.scheduledSingleThreaded("flushing_scheduled_%d");
}
bootstrapSinksFromDisk();
Object retVal = bootstrapSinksFromDisk();
startFlushThread();
return retVal;
}
protected void flushAfterDuration(final long truncatedTime, final Sink sink)

View File

@ -17,6 +17,7 @@
package io.druid.segment.realtime.plumber;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
@ -27,8 +28,10 @@ public interface Plumber
/**
* Perform any initial setup. Should be called before using any other methods, and should be paired
* with a corresponding call to {@link #finishJob}.
*
* @return the metadata of the "newest" segment that might have previously been persisted
*/
public void startJob();
public Object startJob();
/**
* @param row - the row to insert
@ -44,6 +47,7 @@ public interface Plumber
*
* @param commitRunnable code to run after persisting data
*/
void persist(Committer commitRunnable);
void persist(Runnable commitRunnable);
/**
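
A hedged sketch of how a caller threads the two new contracts together (the class and method names below are hypothetical; the real wiring is in the RealtimeManager changes earlier in this commit): the metadata returned by `startJob()` seeds `FirehoseFactoryV2.connect()`, and each persist receives a `Committer` whose metadata is stored alongside the hydrants.

```java
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.segment.realtime.plumber.Plumber;

class FirehoseV2LoopSketch
{
  // Hypothetical driver loop, simplified from RealtimeManager.runFirehoseV2().
  static void run(Plumber plumber, FirehoseFactoryV2 factory, InputRowParser parser) throws Exception
  {
    Object lastCommit = plumber.startJob();                  // metadata of the newest previously persisted segment
    FirehoseV2 firehose = factory.connect(parser, lastCommit);
    firehose.start();                                        // primes the first row
    boolean haveRow = true;
    while (haveRow) {
      InputRow row = firehose.currRow();
      if (row != null) {
        plumber.add(row);                                    // returns < 0 if the row falls outside any sink's window
      }
      haveRow = firehose.advance();                          // false once the stream is exhausted
    }
    plumber.persist(firehose.makeCommitter());               // committer carries the latest offsets
    plumber.finishJob();
    firehose.close();
  }
}
```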

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.plumber;
@ -22,6 +24,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -40,6 +43,7 @@ import io.druid.client.ServerView;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.query.MetricsEmittingQueryRunner;
import io.druid.query.Query;
@ -121,6 +125,8 @@ public class RealtimePlumber implements Plumber
private volatile ExecutorService mergeExecutor = null;
private volatile ScheduledExecutorService scheduledExecutor = null;
private static final String COMMIT_METADATA_KEY = "%commitMetadata%";
private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%";
public RealtimePlumber(
DataSchema schema,
@ -171,15 +177,17 @@ public class RealtimePlumber implements Plumber
}
@Override
public void startJob()
public Object startJob()
{
computeBaseDir(schema).mkdirs();
initializeExecutors();
bootstrapSinksFromDisk();
Object retVal = bootstrapSinksFromDisk();
registerServerViewCallback();
startPersistThread();
// Push pending sinks bootstrapped from previous run
mergeAndPush();
return retVal;
}
@Override
@ -332,6 +340,27 @@ public class RealtimePlumber implements Plumber
@Override
public void persist(final Runnable commitRunnable)
{
persist(
new Committer()
{
@Override
public Object getMetadata()
{
return null;
}
@Override
public void run()
{
commitRunnable.run();
}
}
);
}
@Override
public void persist(final Committer committer)
{
final List<Pair<FireHydrant, Interval>> indexesToPersist = Lists.newArrayList();
for (Sink sink : sinks.values()) {
@ -344,17 +373,55 @@ public class RealtimePlumber implements Plumber
final Stopwatch runExecStopwatch = Stopwatch.createStarted();
final Stopwatch persistStopwatch = Stopwatch.createStarted();
final Map<String, Object> metadata = committer.getMetadata() == null ? null :
ImmutableMap.of(
COMMIT_METADATA_KEY,
committer.getMetadata(),
COMMIT_METADATA_TIMESTAMP_KEY,
System.currentTimeMillis()
);
persistExecutor.execute(
new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource()))
{
@Override
public void doRun()
{
/* Note:
If the plumber crashes after storing only a subset of the hydrants, we will lose data, and the next
run will start from the commitMetadata stored in those hydrants.
option #1:
It may make sense to store the metadata outside the segments, in a separate file. The commit
metadata isn't really associated with an individual segment -- it's associated with the set of segments
that are persisted at the same time, or perhaps with the whole datasource -- so storing it inside the segments is asking for problems.
Sort of like this:
{
"metadata" : {"foo": "bar"},
"segments": [
{"id": "datasource_2000_2001_2000_1", "hydrant": 10},
{"id": "datasource_2001_2002_2001_1", "hydrant": 12}
]
}
When a realtime node crashes and starts back up, it would delete any hydrants numbered higher than the
ones in the commit file.
option #2:
We could also include the set of segments that share a chunk of metadata in additional metadata on each
of those segments. We might then have to think about hand-off in terms of the full set of segments being
handed off instead of individual segments (that is, if one of the set succeeds in handing
off and the others fail, the realtime node would believe that it needs to re-ingest the data).
*/
try {
for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs));
metrics.incrementRowOutputCount(
persistHydrant(
pair.lhs, schema, pair.rhs, metadata
)
);
}
commitRunnable.run();
committer.run();
}
catch (Exception e) {
metrics.incrementFailedPersists();
@ -413,11 +480,16 @@ public class RealtimePlumber implements Plumber
return;
}
/*
Note: if the plumber crashes after persisting only a subset of hydrants, we might duplicate data, as those
hydrants will be read back but the older commitMetadata will be used. Fixing this will likely require
structural changes to the plumber.
*/
for (FireHydrant hydrant : sink) {
synchronized (hydrant) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
final int rowCount = persistHydrant(hydrant, schema, interval);
final int rowCount = persistHydrant(hydrant, schema, interval, null);
metrics.incrementRowOutputCount(rowCount);
}
}
@ -450,12 +522,13 @@ public class RealtimePlumber implements Plumber
}
QueryableIndex index = IndexIO.loadIndex(mergedFile);
log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());
DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);
log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier());
segmentPublisher.publishSegment(segment);
if (!isPushedMarker.createNewFile()) {
@ -565,20 +638,22 @@ public class RealtimePlumber implements Plumber
}
}
protected void bootstrapSinksFromDisk()
protected Object bootstrapSinksFromDisk()
{
final VersioningPolicy versioningPolicy = config.getVersioningPolicy();
File baseDir = computeBaseDir(schema);
if (baseDir == null || !baseDir.exists()) {
return;
return null;
}
File[] files = baseDir.listFiles();
if (files == null) {
return;
return null;
}
Object metadata = null;
long latestCommitTime = 0;
for (File sinkDir : files) {
Interval sinkInterval = new Interval(sinkDir.getName().replace("_", "/"));
@ -611,7 +686,7 @@ public class RealtimePlumber implements Plumber
}
}
);
boolean isCorrupted = false;
try {
List<FireHydrant> hydrants = Lists.newArrayList();
for (File segmentDir : sinkFiles) {
@ -623,7 +698,43 @@ public class RealtimePlumber implements Plumber
if (Ints.tryParse(segmentDir.getName()) == null) {
continue;
}
QueryableIndex queryableIndex = null;
try {
queryableIndex = IndexIO.loadIndex(segmentDir);
}
catch (IOException e) {
log.error(e, "Problem loading segmentDir from disk.");
isCorrupted = true;
}
if (isCorrupted) {
try {
File corruptSegmentDir = computeCorruptedFileDumpDir(segmentDir, schema);
log.info("Renaming %s to %s", segmentDir.getAbsolutePath(), corruptSegmentDir.getAbsolutePath());
FileUtils.copyDirectory(segmentDir, corruptSegmentDir);
FileUtils.deleteDirectory(segmentDir);
}
catch (Exception e1) {
log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath());
}
//Note: skipping a corrupted segment might drop some data. This strategy should be revisited
//at some point.
continue;
}
Map<String, Object> segmentMetadata = queryableIndex.getMetaData();
if (segmentMetadata != null) {
Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY);
if (timestampObj != null) {
long timestamp = ((Long) timestampObj).longValue();
if (timestamp > latestCommitTime) {
log.info(
"Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]",
queryableIndex.getMetaData(), timestamp, latestCommitTime
);
latestCommitTime = timestamp;
metadata = queryableIndex.getMetaData().get(COMMIT_METADATA_KEY);
}
}
}
hydrants.add(
new FireHydrant(
new QueryableIndexSegment(
@ -634,13 +745,12 @@ public class RealtimePlumber implements Plumber
versioningPolicy.getVersion(sinkInterval),
config.getShardSpec()
),
IndexIO.loadIndex(segmentDir)
queryableIndex
),
Integer.parseInt(segmentDir.getName())
)
);
}
Sink currSink = new Sink(sinkInterval, schema, config, versioningPolicy.getVersion(sinkInterval), hydrants);
sinks.put(sinkInterval.getStartMillis(), currSink);
sinkTimeline.add(
@ -657,6 +767,7 @@ public class RealtimePlumber implements Plumber
.emit();
}
}
return metadata;
}
protected void startPersistThread()
@ -798,6 +909,14 @@ public class RealtimePlumber implements Plumber
return new File(config.getBasePersistDirectory(), schema.getDataSource());
}
protected File computeCorruptedFileDumpDir(File persistDir, DataSchema schema)
{
return new File(
persistDir.getAbsolutePath()
.replace(schema.getDataSource(), "corrupted" + File.separator + schema.getDataSource())
);
}
protected File computePersistDir(DataSchema schema, Interval interval)
{
return new File(computeBaseDir(schema), interval.toString().replace("/", "_"));
@ -812,7 +931,12 @@ public class RealtimePlumber implements Plumber
*
* @return the number of rows persisted
*/
protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Interval interval)
protected int persistHydrant(
FireHydrant indexToPersist,
DataSchema schema,
Interval interval,
Map<String, Object> metaData
)
{
synchronized (indexToPersist) {
if (indexToPersist.hasSwapped()) {
@ -824,9 +948,10 @@ public class RealtimePlumber implements Plumber
}
log.info(
"DataSource[%s], Interval[%s], persisting Hydrant[%s]",
"DataSource[%s], Interval[%s], Metadata [%s] persisting Hydrant[%s]",
schema.getDataSource(),
interval,
metaData,
indexToPersist
);
try {
@ -834,16 +959,19 @@ public class RealtimePlumber implements Plumber
final File persistedFile;
final IndexSpec indexSpec = config.getIndexSpec();
if (config.isPersistInHeap()) {
persistedFile = IndexMaker.persist(
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
metaData,
indexSpec
);
} else {
persistedFile = IndexMerger.persist(
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())),
metaData,
indexSpec
);
}
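
For clarity, here is a hedged sketch (the offset values are hypothetical) of the metadata envelope that persistHydrant() now writes into each segment and that bootstrapSinksFromDisk() reads back, picking the entry with the newest %commitMetadataTimestamp%:

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

class CommitMetadataEnvelopeSketch
{
  // Hedged sketch, not part of the patch: the per-segment metadata envelope.
  // The committer-supplied metadata (hypothetical Kafka offsets here) is wrapped
  // with the wall-clock time of the persist so a restarting node can pick the
  // most recent commit across all bootstrapped hydrants.
  static Map<String, Object> envelope()
  {
    return ImmutableMap.<String, Object>of(
        "%commitMetadata%", ImmutableMap.of("partition-0", 1234L),
        "%commitMetadataTimestamp%", System.currentTimeMillis()
    );
  }
}
```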

View File

@ -72,7 +72,8 @@ public class FireDepartmentTest
null,
new RealtimePlumberSchool(
null, null, null, null, null, null, null
)
),
null
),
new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, null, false, false, null, null

View File

@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime;
@ -22,8 +24,11 @@ import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.impl.InputRowParser;
@ -59,8 +64,11 @@ import java.util.concurrent.TimeUnit;
public class RealtimeManagerTest
{
private RealtimeManager realtimeManager;
private RealtimeManager realtimeManager2;
private DataSchema schema;
private DataSchema schema2;
private TestPlumber plumber;
private TestPlumber plumber2;
@Before
public void setUp() throws Exception
@ -78,6 +86,12 @@ public class RealtimeManagerTest
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
);
schema2 = new DataSchema(
"testV2",
null,
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null)
);
RealtimeIOConfig ioConfig = new RealtimeIOConfig(
new FirehoseFactory()
{
@ -96,6 +110,28 @@ public class RealtimeManagerTest
{
return plumber;
}
},
null
);
RealtimeIOConfig ioConfig2 = new RealtimeIOConfig(
null,
new PlumberSchool()
{
@Override
public Plumber findPlumber(
DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
)
{
return plumber2;
}
},
new FirehoseFactoryV2()
{
@Override
public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException, ParseException
{
return new TestFirehoseV2(rows.iterator());
}
}
);
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
@ -125,6 +161,18 @@ public class RealtimeManagerTest
),
null
);
plumber2 = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema2, tuningConfig, new DateTime().toString()));
realtimeManager2 = new RealtimeManager(
Arrays.<FireDepartment>asList(
new FireDepartment(
schema2,
ioConfig2,
tuningConfig
)
),
null
);
}
@Test
@ -148,6 +196,27 @@ public class RealtimeManagerTest
Assert.assertEquals(1, plumber.getPersistCount());
}
@Test
public void testRunV2() throws Exception
{
realtimeManager2.start();
Stopwatch stopwatch = Stopwatch.createStarted();
while (realtimeManager2.getMetrics("testV2").processed() != 1) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Realtime manager should have completed processing 2 events!");
}
}
Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").processed());
Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").thrownAway());
Assert.assertEquals(2, realtimeManager2.getMetrics("testV2").unparseable());
Assert.assertTrue(plumber2.isStartedJob());
Assert.assertTrue(plumber2.isFinishedJob());
Assert.assertEquals(1, plumber2.getPersistCount());
}
private TestInputRowHolder makeRow(final long timestamp)
{
return new TestInputRowHolder(timestamp, null);
@ -266,6 +335,74 @@ public class RealtimeManagerTest
}
}
private static class TestFirehoseV2 implements FirehoseV2
{
private final Iterator<TestInputRowHolder> rows;
private InputRow currRow;
private boolean stop;
private TestFirehoseV2(Iterator<TestInputRowHolder> rows)
{
this.rows = rows;
}
private void nextMessage()
{
currRow = null;
while (currRow == null) {
final TestInputRowHolder holder = rows.next();
currRow = holder == null ? null : holder.getRow();
}
}
@Override
public void close() throws IOException
{
}
@Override
public boolean advance()
{
stop = !rows.hasNext();
if (stop) {
return false;
}
nextMessage();
return true;
}
@Override
public InputRow currRow()
{
return currRow;
}
@Override
public Committer makeCommitter()
{
return new Committer()
{
@Override
public Object getMetadata()
{
return null;
}
@Override
public void run()
{
}
};
}
@Override
public void start() throws Exception
{
nextMessage();
}
}
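
TestFirehoseV2 above exercises the lifecycle the new interface appears to expect: start() positions the firehose on its first row, currRow() exposes it (null when the current message is unparseable), advance() moves forward and reports exhaustion, and makeCommitter() snapshots offset-like metadata for checkpointing. A minimal consumer loop under those assumptions might look like the sketch below; IngestionSink is a hypothetical stand-in for whatever receives the rows and is not part of this patch.

```java
// Minimal sketch of driving a FirehoseV2, assuming the io.druid.data.input
// interfaces from this patch are on the classpath. IngestionSink is hypothetical.
import io.druid.data.input.Committer;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;

public class FirehoseV2Driver
{
  public void drive(FirehoseV2 firehose, IngestionSink sink) throws Exception
  {
    firehose.start();                        // positions the firehose on its first row
    boolean hasMore = true;
    while (hasMore) {
      InputRow row = firehose.currRow();     // null if the current message could not be parsed
      if (row != null) {
        sink.add(row);
      }
      hasMore = firehose.advance();          // false once the underlying stream is exhausted
    }
    Committer committer = firehose.makeCommitter();
    sink.persist(committer.getMetadata());   // store the offset-like metadata with the data
    committer.run();                         // acknowledge the consumed messages
    firehose.close();
  }

  /** Hypothetical consumer, present only to keep the sketch self-contained. */
  public interface IngestionSink
  {
    void add(InputRow row);
    void persist(Object commitMetadata);
  }
}
```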
private static class TestPlumber implements Plumber
{
private final Sink sink;
@ -296,9 +433,10 @@ public class RealtimeManagerTest
}
@Override
public void startJob()
public Object startJob()
{
startedJob = true;
return null;
}
@Override
@ -342,5 +480,11 @@ public class RealtimeManagerTest
{
finishedJob = true;
}
@Override
public void persist(Committer commitRunnable)
{
persistCount++;
}
}
}
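
Putting the V2 additions in this test together: RealtimeIOConfig now carries either a FirehoseFactory or a FirehoseFactoryV2 (the other slot left null), and the commit metadata returned by Plumber.startJob() looks like the natural value to hand back to FirehoseFactoryV2.connect() so ingestion can resume where it left off. The real wiring lives in RealtimeManager, which is not part of this excerpt, so the following is only an illustration under those assumptions.

```java
// Illustrative sketch of the assumed V2 flow; not the actual RealtimeManager code.
import io.druid.data.input.FirehoseFactoryV2;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.segment.realtime.plumber.Plumber;

public class FirehoseV2Wiring
{
  public void runOnce(FirehoseFactoryV2 factory, InputRowParser parser, Plumber plumber) throws Exception
  {
    // startJob() now returns the commit metadata recovered from the last successful
    // persist, or null on a cold start (see TestPlumber above).
    Object lastCommit = plumber.startJob();

    // Assumption: the second connect() argument is that recovered metadata, letting a
    // Kafka-style firehose resume from the last committed offsets.
    FirehoseV2 firehose = factory.connect(parser, lastCommit);
    firehose.start();

    boolean more = true;
    while (more) {
      InputRow row = firehose.currRow();
      if (row != null) {
        plumber.add(row);
      }
      more = firehose.advance();
    }

    // persist(Committer) is added alongside the Runnable-based overload: the Committer
    // both acknowledges the source and carries the metadata stored with the segment.
    plumber.persist(firehose.makeCommitter());
    plumber.finishJob();
    firehose.close();
  }
}
```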


@ -1,19 +1,21 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.plumber;
@ -25,6 +27,7 @@ import com.metamx.common.Granularity;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.FilteredServerView;
import io.druid.client.ServerView;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
@ -45,12 +48,13 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -71,6 +75,7 @@ public class RealtimePlumberSchoolTest
{
private final RejectionPolicyFactory rejectionPolicy;
private RealtimePlumber plumber;
private RealtimePlumberSchool realtimePlumberSchool;
private DataSegmentAnnouncer announcer;
private SegmentPublisher segmentPublisher;
private DataSegmentPusher dataSegmentPusher;
@ -169,7 +174,7 @@ public class RealtimePlumberSchoolTest
null
);
RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
realtimePlumberSchool = new RealtimePlumberSchool(
emitter,
new DefaultQueryRunnerFactoryConglomerate(Maps.<Class<? extends Query>, QueryRunnerFactory>newHashMap()),
dataSegmentPusher,
@ -187,10 +192,31 @@ public class RealtimePlumberSchoolTest
public void tearDown() throws Exception
{
EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
FileUtils.deleteDirectory(
new File(
tuningConfig.getBasePersistDirectory(),
schema.getDataSource()
)
);
}
@Test(timeout = 60000)
public void testPersist() throws Exception
{
testPersist(null);
}
@Test(timeout = 60000)
public void testPersistWithCommitMetadata() throws Exception
{
final Object commitMetadata = "dummyCommitMetadata";
testPersist(commitMetadata);
plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, metrics);
Assert.assertEquals(commitMetadata, plumber.startJob());
}
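
testPersistWithCommitMetadata verifies the round trip: metadata handed to persist() through a Committer comes back from startJob() on a plumber freshly obtained from the same school. The anonymous Committer boilerplate recurs in these tests, so a small factory along the lines below could trim it; Committers.of() is a hypothetical convenience, not something introduced by this patch.

```java
// Hypothetical helper for building a Committer from a metadata value and a
// commit callback, mirroring the anonymous classes used in the tests.
import io.druid.data.input.Committer;

public final class Committers
{
  private Committers() {}

  public static Committer of(final Object metadata, final Runnable onCommit)
  {
    return new Committer()
    {
      @Override
      public Object getMetadata()
      {
        return metadata;
      }

      @Override
      public void run()
      {
        onCommit.run();
      }
    };
  }
}
```

With such a helper, the persist call in the helper method below would collapse to a single call passing Committers.of(commitMetadata, ...) in place of the inline anonymous class.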
private void testPersist(final Object commitMetadata) throws Exception
{
final MutableBoolean committed = new MutableBoolean(false);
plumber.getSinks()
@ -203,12 +229,32 @@ public class RealtimePlumberSchoolTest
new DateTime("2014-12-01T12:34:56.789").toString()
)
);
plumber.startJob();
Assert.assertNull(plumber.startJob());
final InputRow row = EasyMock.createNiceMock(InputRow.class);
EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L);
EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>());
EasyMock.replay(row);
plumber.add(row);
if (commitMetadata != null) {
plumber.persist(
new Committer()
{
@Override
public Object getMetadata()
{
return commitMetadata;
}
@Override
public void run()
{
committed.setValue(true);
}
}
);
} else {
plumber.persist(
new Runnable()
{
@ -219,6 +265,7 @@ public class RealtimePlumberSchoolTest
}
}
);
}
while (!committed.booleanValue()) {
Thread.sleep(100);