diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/granularity/ArbitraryGranularitySpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/granularity/ArbitraryGranularitySpec.java index 379353e97bb..b64ffcc1bf9 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/granularity/ArbitraryGranularitySpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/granularity/ArbitraryGranularitySpec.java @@ -25,6 +25,7 @@ import com.google.common.base.Optional; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; import com.google.common.collect.Sets; +import com.metamx.common.Granularity; import com.metamx.common.guava.Comparators; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -45,22 +46,25 @@ public class ArbitraryGranularitySpec implements GranularitySpec intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd()); // Insert all intervals - for(final Interval inputInterval : inputIntervals) { + for (final Interval inputInterval : inputIntervals) { intervals.add(inputInterval); } // Ensure intervals are non-overlapping (but they may abut each other) final PeekingIterator intervalIterator = Iterators.peekingIterator(intervals.iterator()); - while(intervalIterator.hasNext()) { + while (intervalIterator.hasNext()) { final Interval currentInterval = intervalIterator.next(); - if(intervalIterator.hasNext()) { + if (intervalIterator.hasNext()) { final Interval nextInterval = intervalIterator.peek(); - if(currentInterval.overlaps(nextInterval)) { - throw new IllegalArgumentException(String.format( - "Overlapping intervals: %s, %s", - currentInterval, - nextInterval)); + if (currentInterval.overlaps(nextInterval)) { + throw new IllegalArgumentException( + String.format( + "Overlapping intervals: %s, %s", + currentInterval, + nextInterval + ) + ); } } } @@ -79,10 +83,16 @@ public class ArbitraryGranularitySpec implements GranularitySpec // First interval with start time ≤ dt final Interval interval = intervals.floor(new Interval(dt, new DateTime(Long.MAX_VALUE))); - if(interval != null && interval.contains(dt)) { + if (interval != null && interval.contains(dt)) { return Optional.of(interval); } else { return Optional.absent(); } } + + @Override + public Granularity getGranularity() + { + throw new UnsupportedOperationException(); + } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/granularity/GranularitySpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/granularity/GranularitySpec.java index d43cb1bf2b8..5e7d0d7ab06 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/granularity/GranularitySpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/granularity/GranularitySpec.java @@ -22,6 +22,7 @@ package io.druid.indexer.granularity; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.base.Optional; +import com.metamx.common.Granularity; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -43,4 +44,6 @@ public interface GranularitySpec /** Time-grouping interval corresponding to some instant, if any. */ public Optional bucketInterval(DateTime dt); + + public Granularity getGranularity(); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/granularity/UniformGranularitySpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/granularity/UniformGranularitySpec.java index bbe1f681394..726426c2a14 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/granularity/UniformGranularitySpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/granularity/UniformGranularitySpec.java @@ -67,6 +67,7 @@ public class UniformGranularitySpec implements GranularitySpec return wrappedSpec.bucketInterval(dt); } + @Override @JsonProperty("gran") public Granularity getGranularity() { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 9af70b766be..a32e0f3b9fe 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -1,28 +1,27 @@ /* -* Druid - a distributed column store. -* Copyright (C) 2012, 2013 Metamarkets Group Inc. -* -* This program is free software; you can redistribute it and/or -* modify it under the terms of the GNU General Public License -* as published by the Free Software Foundation; either version 2 -* of the License, or (at your option) any later version. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU General Public License for more details. -* -* You should have received a copy of the GNU General Public License -* along with this program; if not, write to the Free Software -* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -*/ + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ package io.druid.indexing.common.task; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -32,6 +31,7 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.collect.TreeMultiset; import com.google.common.primitives.Ints; +import com.metamx.common.guava.Comparators; import com.metamx.common.logger.Logger; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -42,6 +42,7 @@ import io.druid.indexer.granularity.GranularitySpec; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.SegmentInsertAction; import io.druid.indexing.common.index.YeOldePlumberSchool; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.loading.DataSegmentPusher; @@ -61,12 +62,9 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.SortedSet; import java.util.concurrent.CopyOnWriteArrayList; -/** - * Simple single-threaded indexing task. Meant to be easy to use for small and medium sized datasets. For "big data", - * try launching multiple IndexTasks or using the {@link HadoopIndexTask}. - */ public class IndexTask extends AbstractFixedIntervalTask { private static final Logger log = new Logger(IndexTask.class); @@ -136,67 +134,69 @@ public class IndexTask extends AbstractFixedIntervalTask public TaskStatus run(TaskToolbox toolbox) throws Exception { final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox)); + final Set segments = Sets.newHashSet(); - final Map> shardSpecMap = determinePartitions(targetPartitionSize); + final Set validIntervals = Sets.intersection(granularitySpec.bucketIntervals(), getDataIntervals()); - final Map> schemass = Maps.transformEntries( - shardSpecMap, - new Maps.EntryTransformer, List>() - { - @Override - public List transformEntry( - Interval key, List shardSpecs - ) - { - return Lists.transform( - shardSpecs, - new Function() - { - @Override - public Schema apply(final ShardSpec shardSpec) - { - return new Schema(getDataSource(), spatialDimensions, aggregators, indexGranularity, shardSpec); - } - } - ); - } - } - ); - - final Set segments = generateSegments(toolbox, schemass, myLock.getVersion()); + for (final Interval bucket : validIntervals) { + final List shardSpecs; + if (targetPartitionSize > 0) { + shardSpecs = determinePartitions(bucket, targetPartitionSize); + } else { + shardSpecs = ImmutableList.of(new NoneShardSpec()); + } + for (final ShardSpec shardSpec : shardSpecs) { + final DataSegment segment = generateSegment( + toolbox, + new Schema( + getDataSource(), + spatialDimensions, + aggregators, + indexGranularity, + shardSpec + ), + bucket, + myLock.getVersion() + ); + segments.add(segment); + } + } toolbox.pushSegments(segments); - return TaskStatus.success(getId()); } - private Map> determinePartitions( + private SortedSet getDataIntervals() throws IOException + { + SortedSet retVal = Sets.newTreeSet(Comparators.intervalsByStartThenEnd()); + try (Firehose firehose = firehoseFactory.connect()) { + while (firehose.hasMore()) { + final InputRow inputRow = firehose.nextRow(); + Interval interval = granularitySpec.getGranularity().bucket(new DateTime(inputRow.getTimestampFromEpoch())); + retVal.add(interval); + } + } + return retVal; + } + + private List determinePartitions( + final Interval interval, final int targetPartitionSize ) throws IOException { - Map> retVal = Maps.newLinkedHashMap(); + log.info("Determining partitions for interval[%s] with targetPartitionSize[%d]", interval, targetPartitionSize); - log.info("Determining partitions with targetPartitionSize[%d]", targetPartitionSize); + // The implementation of this determine partitions stuff is less than optimal. Should be done better. + + // Blacklist dimensions that have multiple values per row + final Set unusableDimensions = com.google.common.collect.Sets.newHashSet(); + // Track values of all non-blacklisted dimensions + final Map> dimensionValueMultisets = Maps.newHashMap(); // Load data try (Firehose firehose = firehoseFactory.connect()) { - if (!firehose.hasMore()) { - log.error("Unable to find any events to ingest! Check your firehose config!"); - return retVal; - } - InputRow inputRow = firehose.nextRow(); - - for (Interval interval : granularitySpec.bucketIntervals()) { - // Blacklist dimensions that have multiple values per row - final Set unusableDimensions = Sets.newHashSet(); - - // Track values of all non-blacklisted dimensions - final Map> dimensionValueMultisets = Maps.newHashMap(); - - boolean hasEventsInInterval = false; - boolean done = false; - while (!done && interval.contains(inputRow.getTimestampFromEpoch())) { - hasEventsInInterval = true; - + while (firehose.hasMore()) { + final InputRow inputRow = firehose.nextRow(); + if (interval.contains(inputRow.getTimestampFromEpoch())) { // Extract dimensions from event for (final String dim : inputRow.getDimensions()) { final List dimValues = inputRow.getDimension(dim); @@ -216,31 +216,10 @@ public class IndexTask extends AbstractFixedIntervalTask } } } - - if (firehose.hasMore()) { - inputRow = firehose.nextRow(); - } else { - done = true; - } - } - - if (hasEventsInInterval) { - if (targetPartitionSize == 0) { - retVal.put(interval, ImmutableList.of(new NoneShardSpec())); - } else { - retVal.put(interval, determineShardSpecs(dimensionValueMultisets)); - } } } } - return retVal; - } - - private List determineShardSpecs( - final Map> dimensionValueMultisets - ) - { // ShardSpecs we will return final List shardSpecs = Lists.newArrayList(); @@ -320,23 +299,6 @@ public class IndexTask extends AbstractFixedIntervalTask return shardSpecs; } - private Set generateSegments( - final TaskToolbox toolbox, - final Map> schemass, - final String version - ) throws IOException - { - final Set retVal = Sets.newHashSet(); - - for (Map.Entry> entry : schemass.entrySet()) { - for (Schema schema : entry.getValue()) { - retVal.add(generateSegment(toolbox, schema, entry.getKey(), version)); - } - } - - return retVal; - } - private DataSegment generateSegment( final TaskToolbox toolbox, final Schema schema, @@ -358,7 +320,7 @@ public class IndexTask extends AbstractFixedIntervalTask ); // We need to track published segments. - final List pushedSegments = new CopyOnWriteArrayList<>(); + final List pushedSegments = new CopyOnWriteArrayList(); final DataSegmentPusher wrappedDataSegmentPusher = new DataSegmentPusher() { @Override @@ -500,4 +462,4 @@ public class IndexTask extends AbstractFixedIntervalTask { return spatialDimensions; } -} \ No newline at end of file +}