another attempt at index task

This commit is contained in:
fjy 2014-01-10 17:56:22 -08:00
parent f0b4d0c1e4
commit 1ecc94cfb6
4 changed files with 95 additions and 119 deletions

View File

@ -25,6 +25,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Sets;
import com.metamx.common.Granularity;
import com.metamx.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -45,22 +46,25 @@ public class ArbitraryGranularitySpec implements GranularitySpec
intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
// Insert all intervals
for(final Interval inputInterval : inputIntervals) {
for (final Interval inputInterval : inputIntervals) {
intervals.add(inputInterval);
}
// Ensure intervals are non-overlapping (but they may abut each other)
final PeekingIterator<Interval> intervalIterator = Iterators.peekingIterator(intervals.iterator());
while(intervalIterator.hasNext()) {
while (intervalIterator.hasNext()) {
final Interval currentInterval = intervalIterator.next();
if(intervalIterator.hasNext()) {
if (intervalIterator.hasNext()) {
final Interval nextInterval = intervalIterator.peek();
if(currentInterval.overlaps(nextInterval)) {
throw new IllegalArgumentException(String.format(
"Overlapping intervals: %s, %s",
currentInterval,
nextInterval));
if (currentInterval.overlaps(nextInterval)) {
throw new IllegalArgumentException(
String.format(
"Overlapping intervals: %s, %s",
currentInterval,
nextInterval
)
);
}
}
}
@ -79,10 +83,16 @@ public class ArbitraryGranularitySpec implements GranularitySpec
// First interval with start time dt
final Interval interval = intervals.floor(new Interval(dt, new DateTime(Long.MAX_VALUE)));
if(interval != null && interval.contains(dt)) {
if (interval != null && interval.contains(dt)) {
return Optional.of(interval);
} else {
return Optional.absent();
}
}
/**
 * Not supported by this spec; every call fails.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Granularity getGranularity()
{
  // Carry a message so callers that hit this path can tell which spec rejected the call.
  throw new UnsupportedOperationException(
      "getGranularity() is not supported by ArbitraryGranularitySpec"
  );
}
}

View File

@ -22,6 +22,7 @@ package io.druid.indexer.granularity;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.base.Optional;
import com.metamx.common.Granularity;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -43,4 +44,6 @@ public interface GranularitySpec
/** Time-grouping interval corresponding to some instant, if any. */
public Optional<Interval> bucketInterval(DateTime dt);
/** Granularity of this spec; implementations that cannot provide one may throw UnsupportedOperationException. */
public Granularity getGranularity();
}

View File

@ -67,6 +67,7 @@ public class UniformGranularitySpec implements GranularitySpec
return wrappedSpec.bucketInterval(dt);
}
@Override
@JsonProperty("gran")
public Granularity getGranularity()
{

View File

@ -1,28 +1,27 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@ -32,6 +31,7 @@ import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultiset;
import com.google.common.primitives.Ints;
import com.metamx.common.guava.Comparators;
import com.metamx.common.logger.Logger;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
@ -42,6 +42,7 @@ import io.druid.indexer.granularity.GranularitySpec;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.index.YeOldePlumberSchool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.loading.DataSegmentPusher;
@ -61,12 +62,9 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Simple single-threaded indexing task. Meant to be easy to use for small and medium sized datasets. For "big data",
* try launching multiple IndexTasks or using the {@link HadoopIndexTask}.
*/
public class IndexTask extends AbstractFixedIntervalTask
{
private static final Logger log = new Logger(IndexTask.class);
@ -136,67 +134,69 @@ public class IndexTask extends AbstractFixedIntervalTask
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
final Set<DataSegment> segments = Sets.newHashSet();
final Map<Interval, List<ShardSpec>> shardSpecMap = determinePartitions(targetPartitionSize);
final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals(), getDataIntervals());
final Map<Interval, List<Schema>> schemass = Maps.transformEntries(
shardSpecMap,
new Maps.EntryTransformer<Interval, List<ShardSpec>, List<Schema>>()
{
@Override
public List<Schema> transformEntry(
Interval key, List<ShardSpec> shardSpecs
)
{
return Lists.transform(
shardSpecs,
new Function<ShardSpec, Schema>()
{
@Override
public Schema apply(final ShardSpec shardSpec)
{
return new Schema(getDataSource(), spatialDimensions, aggregators, indexGranularity, shardSpec);
}
}
);
}
}
);
final Set<DataSegment> segments = generateSegments(toolbox, schemass, myLock.getVersion());
for (final Interval bucket : validIntervals) {
final List<ShardSpec> shardSpecs;
if (targetPartitionSize > 0) {
shardSpecs = determinePartitions(bucket, targetPartitionSize);
} else {
shardSpecs = ImmutableList.<ShardSpec>of(new NoneShardSpec());
}
for (final ShardSpec shardSpec : shardSpecs) {
final DataSegment segment = generateSegment(
toolbox,
new Schema(
getDataSource(),
spatialDimensions,
aggregators,
indexGranularity,
shardSpec
),
bucket,
myLock.getVersion()
);
segments.add(segment);
}
}
toolbox.pushSegments(segments);
return TaskStatus.success(getId());
}
private Map<Interval, List<ShardSpec>> determinePartitions(
/**
 * Reads every row from a freshly connected firehose and collects the granularity bucket
 * that each row's timestamp falls into.
 *
 * @return the distinct bucket intervals seen, held in a set ordered by
 *         {@code Comparators.intervalsByStartThenEnd()}; empty if the firehose has no rows
 * @throws IOException declared for failures while connecting to, reading from or closing the firehose
 */
private SortedSet<Interval> getDataIntervals() throws IOException
{
  final SortedSet<Interval> dataIntervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());

  // try-with-resources closes the firehose once all rows have been consumed (or on failure).
  try (Firehose rows = firehoseFactory.connect()) {
    while (rows.hasMore()) {
      final InputRow row = rows.nextRow();
      // Duplicate buckets collapse naturally because the result is a set.
      dataIntervals.add(
          granularitySpec.getGranularity().bucket(new DateTime(row.getTimestampFromEpoch()))
      );
    }
  }

  return dataIntervals;
}
private List<ShardSpec> determinePartitions(
final Interval interval,
final int targetPartitionSize
) throws IOException
{
Map<Interval, List<ShardSpec>> retVal = Maps.newLinkedHashMap();
log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize);
log.info("Determining partitions with targetPartitionSize[%d]", targetPartitionSize);
// The implementation of this determine partitions stuff is less than optimal. Should be done better.
// Blacklist dimensions that have multiple values per row
final Set<String> unusableDimensions = com.google.common.collect.Sets.newHashSet();
// Track values of all non-blacklisted dimensions
final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
// Load data
try (Firehose firehose = firehoseFactory.connect()) {
if (!firehose.hasMore()) {
log.error("Unable to find any events to ingest! Check your firehose config!");
return retVal;
}
InputRow inputRow = firehose.nextRow();
for (Interval interval : granularitySpec.bucketIntervals()) {
// Blacklist dimensions that have multiple values per row
final Set<String> unusableDimensions = Sets.newHashSet();
// Track values of all non-blacklisted dimensions
final Map<String, TreeMultiset<String>> dimensionValueMultisets = Maps.newHashMap();
boolean hasEventsInInterval = false;
boolean done = false;
while (!done && interval.contains(inputRow.getTimestampFromEpoch())) {
hasEventsInInterval = true;
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
if (interval.contains(inputRow.getTimestampFromEpoch())) {
// Extract dimensions from event
for (final String dim : inputRow.getDimensions()) {
final List<String> dimValues = inputRow.getDimension(dim);
@ -216,31 +216,10 @@ public class IndexTask extends AbstractFixedIntervalTask
}
}
}
if (firehose.hasMore()) {
inputRow = firehose.nextRow();
} else {
done = true;
}
}
if (hasEventsInInterval) {
if (targetPartitionSize == 0) {
retVal.put(interval, ImmutableList.<ShardSpec>of(new NoneShardSpec()));
} else {
retVal.put(interval, determineShardSpecs(dimensionValueMultisets));
}
}
}
}
return retVal;
}
private List<ShardSpec> determineShardSpecs(
final Map<String, TreeMultiset<String>> dimensionValueMultisets
)
{
// ShardSpecs we will return
final List<ShardSpec> shardSpecs = Lists.newArrayList();
@ -320,23 +299,6 @@ public class IndexTask extends AbstractFixedIntervalTask
return shardSpecs;
}
/**
 * Builds one segment per (interval, schema) pair by delegating to {@code generateSegment}.
 *
 * @param toolbox  toolbox handed through to {@code generateSegment}
 * @param schemass schemas to index, grouped by the interval they apply to
 * @param version  version string handed through to {@code generateSegment}
 * @return the set of all segments produced; empty if {@code schemass} has no schemas
 * @throws IOException if generating any individual segment fails with an I/O error
 */
private Set<DataSegment> generateSegments(
    final TaskToolbox toolbox,
    final Map<Interval, List<Schema>> schemass,
    final String version
) throws IOException
{
  final Set<DataSegment> generated = Sets.newHashSet();

  for (final Map.Entry<Interval, List<Schema>> schemasForInterval : schemass.entrySet()) {
    final List<Schema> schemas = schemasForInterval.getValue();
    for (final Schema schema : schemas) {
      final DataSegment segment = generateSegment(toolbox, schema, schemasForInterval.getKey(), version);
      generated.add(segment);
    }
  }

  return generated;
}
private DataSegment generateSegment(
final TaskToolbox toolbox,
final Schema schema,
@ -358,7 +320,7 @@ public class IndexTask extends AbstractFixedIntervalTask
);
// We need to track published segments.
final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<DataSegment>();
final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher()
{
@Override
@ -500,4 +462,4 @@ public class IndexTask extends AbstractFixedIntervalTask
{
return spatialDimensions;
}
}
}