diff --git a/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java b/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java index a2256936b83..96c49d9ba0d 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java @@ -19,16 +19,23 @@ package org.apache.druid.java.util.common; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; import org.apache.druid.java.util.common.guava.Comparators; import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; import java.util.SortedSet; import java.util.TreeSet; /** + * */ public class JodaUtils { @@ -36,10 +43,17 @@ public class JodaUtils public static final long MAX_INSTANT = Long.MAX_VALUE / 2; public static final long MIN_INSTANT = Long.MIN_VALUE / 2; - public static ArrayList<Interval> condenseIntervals(Iterable<Interval> intervals) + /** + * This method will not materialize the input intervals if they represent + * a SortedSet (i.e. implement that interface). Otherwise, the method internally + * creates a sorted set and populates it with them, thus materializing the + * intervals in the input. + * + * @param intervals The Iterable object containing the intervals to condense + * @return The condensed intervals + */ + public static List<Interval> condenseIntervals(Iterable<Interval> intervals) { - ArrayList<Interval> retVal = new ArrayList<>(); - final SortedSet<Interval> sortedIntervals; if (intervals instanceof SortedSet) { @@ -50,35 +64,122 @@ public class JodaUtils sortedIntervals.add(interval); } } + return ImmutableList.copyOf(condensedIntervalsIterator(sortedIntervals.iterator())); + } - if (sortedIntervals.isEmpty()) { - return new ArrayList<>(); + /** + * This method does not materialize the intervals represented by the + * sortedIntervals iterator. However, the caller needs to ensure that sortedIntervals + * is already sorted in ascending order (use Comparators.intervalsByStartThenEnd()). + * It avoids materialization by incrementally condensing the intervals, + * starting from the first and looking for "adjacent" intervals. This is + * possible since intervals in the Iterator are in ascending order (as + * guaranteed by the caller). + *
+ * @param sortedIntervals The iterator object containing the intervals to condense + * @return An iterator for the condensed intervals. By construction the condensed intervals are sorted + * in ascending order and contain no repeated elements. + * @throws IAE if an element is null or if sortedIntervals is not sorted in ascending order + */ + public static Iterator<Interval> condensedIntervalsIterator(Iterator<Interval> sortedIntervals) + { + + if (sortedIntervals == null || !sortedIntervals.hasNext()) { + return Collections.emptyIterator(); } - Iterator<Interval> intervalsIter = sortedIntervals.iterator(); - Interval currInterval = intervalsIter.next(); - while (intervalsIter.hasNext()) { - Interval next = intervalsIter.next(); + final PeekingIterator<Interval> peekingIterator = Iterators.peekingIterator(sortedIntervals); + return new Iterator<Interval>() + { + private Interval previous; - if (currInterval.abuts(next)) { - currInterval = new Interval(currInterval.getStart(), next.getEnd()); - } else if (currInterval.overlaps(next)) { - DateTime nextEnd = next.getEnd(); - DateTime currEnd = currInterval.getEnd(); - currInterval = new Interval( - currInterval.getStart(), - nextEnd.isAfter(currEnd) ? nextEnd : currEnd - ); - } else { - retVal.add(currInterval); - currInterval = next; + @Override + public boolean hasNext() + { + return peekingIterator.hasNext(); } - } - retVal.add(currInterval); + @Override + public Interval next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + Interval currInterval = peekingIterator.next(); + if (currInterval == null) { + throw new IAE("Element of intervals is null"); + } + + // check sorted ascending: + verifyAscendingSortOrder(previous, currInterval); + + previous = currInterval; + + while (hasNext()) { + Interval next = peekingIterator.peek(); + if (next == null) { + throw new IAE("Element of intervals is null"); + } + + if (currInterval.abuts(next)) { + currInterval = new Interval(currInterval.getStart(), next.getEnd()); + peekingIterator.next(); + } else if (currInterval.overlaps(next)) { + DateTime nextEnd = next.getEnd(); + DateTime currEnd = currInterval.getEnd(); + currInterval = new Interval( + currInterval.getStart(), + nextEnd.isAfter(currEnd) ? nextEnd : currEnd + ); + peekingIterator.next(); + } else { + break; + } + } + return currInterval; + } + }; + } + + /** + * Verify whether an iterable of intervals contains overlapping intervals. + * + * @param intervals An interval iterable sorted using Comparators.intervalsByStartThenEnd() + * @return true if the iterable contains at least two overlapping intervals, false otherwise.
+ * @throws IAE when at least one element is null or when the iterable is not sorted + */ + public static boolean containOverlappingIntervals(Iterable<Interval> intervals) + { + if (intervals == null) { + return false; + } + boolean retVal = false; + Interval previous = null; + for (Interval current : intervals) { + if (current == null) { + throw new IAE("Intervals should not contain nulls"); + } + verifyAscendingSortOrder(previous, current); + if (previous != null && previous.overlaps(current)) { + retVal = true; + break; + } + previous = current; + } return retVal; } + private static void verifyAscendingSortOrder(Interval previous, Interval current) + { + if (previous != null && previous.isAfter(current)) { + throw new IAE("Adjacent intervals are not sorted [%s,%s]", previous, current); + } + } + public static Interval umbrellaInterval(Iterable<Interval> intervals) { ArrayList<DateTime> startDates = new ArrayList<>(); @@ -137,4 +238,6 @@ public class JodaUtils return max; } } + + }
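The two JodaUtils entry points above are meant to be used together. A minimal usage sketch (illustrative only, not part of the patch; it uses only methods introduced or referenced above):

List<Interval> intervals = new ArrayList<>(Arrays.asList(
    Intervals.of("2011-01-01/2011-01-02"),
    Intervals.of("2011-01-02/2011-01-05"),
    Intervals.of("2011-02-01/2011-02-03")
));
intervals.sort(Comparators.intervalsByStartThenEnd()); // required precondition for both calls below

// Returns false here: the first two intervals abut but do not overlap.
// Throws IAE on null elements or unsorted input.
boolean overlapping = JodaUtils.containOverlappingIntervals(intervals);

// Lazily yields 2011-01-01/2011-01-05 (the two abutting intervals merged)
// followed by 2011-02-01/2011-02-03; nothing is materialized until consumed.
Iterator<Interval> condensed = JodaUtils.condensedIntervalsIterator(intervals.iterator());
while (condensed.hasNext()) {
  Interval interval = condensed.next(); // e.g., acquire a lock per condensed interval
}

Sorting first is the caller's responsibility; both methods throw IAE when that precondition is violated.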
diff --git a/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java new file mode 100644 index 00000000000..7065535eeb4 --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.granularity; + +import com.google.common.collect.FluentIterable; +import org.apache.druid.common.guava.SettableSupplier; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.guava.Comparators; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + * Produces a stream of intervals generated from a given set of input intervals and a given + * granularity. This class avoids materializing the granularity intervals whenever possible. + */ +public class IntervalsByGranularity +{ + private final List<Interval> sortedNonOverlappingIntervals; + private final Granularity granularity; + + /** + * @param intervals Intervals for which to apply the given granularity. They should + * not contain overlapping intervals. + * @param granularity The granularity to apply + * @throws IAE if intervals contains at least one overlapping pair + */ + public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity) + { + // eliminate duplicates, sort intervals: + Set<Interval> intervalSet = new HashSet<>(intervals); + List<Interval> inputIntervals = new ArrayList<>(intervals.size()); + inputIntervals.addAll(intervalSet); + inputIntervals.sort(Comparators.intervalsByStartThenEnd()); + + // sanity check + if (JodaUtils.containOverlappingIntervals(inputIntervals)) { + throw new IAE("Intervals contain overlapping intervals [%s]", intervals); + } + + // all good: + sortedNonOverlappingIntervals = inputIntervals; + this.granularity = granularity; + } + + /** + * @return The intervals according to the granularity. The intervals are provided in + * order according to Comparators.intervalsByStartThenEnd() + */ + public Iterator<Interval> granularityIntervalsIterator() + { + Iterator<Interval> ite; + if (sortedNonOverlappingIntervals.isEmpty()) { + ite = Collections.emptyIterator(); + } else { + // The filter after transformAndConcat removes duplicates. Duplicates can appear + // when the input contains intervals that do not overlap but become equal once a + // larger granularity is applied. Imagine the inputs 2013-01-01T00Z/2013-01-10T00Z + // and 2013-01-15T00Z/2013-01-20T00Z: when the iterator for the two intervals is + // called, say with MONTH granularity, two intervals will be returned, both with + // the same value 2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z. Thus + // duplicates can be created given the right conditions. + final SettableSupplier<Interval> previous = new SettableSupplier<>(); + ite = FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable) + .filter(interval -> { + if (previous.get() != null && previous.get().equals(interval)) { + return false; + } + previous.set(interval); + return true; + }).iterator(); + } + return ite; + } + +}
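The duplicate elimination described in the comments above is easiest to see with a small sketch (illustrative only, not part of the patch): two non-overlapping day-level inputs map to the same MONTH bucket, and the stateful filter emits that bucket exactly once.

List<Interval> input = ImmutableList.of(
    Intervals.of("2013-01-01T00Z/2013-01-10T00Z"),
    Intervals.of("2013-01-15T00Z/2013-01-20T00Z")
);
IntervalsByGranularity monthly = new IntervalsByGranularity(input, Granularities.MONTH);

Iterator<Interval> it = monthly.granularityIntervalsIterator();
while (it.hasNext()) {
  // Prints 2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z exactly once.
  System.out.println(it.next());
}

Because the stream is sorted, comparing each emitted interval against only the previous one is sufficient for deduplication; no set of seen intervals needs to be kept.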
diff --git a/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java b/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java index c5610f969c8..d7bfb4adffd 100644 --- a/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java +++ b/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java @@ -19,8 +19,11 @@ package org.apache.druid.common.utils; +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.guava.Comparators; import org.joda.time.Duration; import org.joda.time.Interval; import org.joda.time.Period; @@ -32,6 +35,7 @@ import java.util.Collections; import java.util.List; /** + * */ public class JodaUtilsTest { @@ -75,6 +79,40 @@ public class JodaUtilsTest Intervals.of("2011-03-05/2011-03-06") ); + List<Interval> expected = Arrays.asList( + Intervals.of("2011-01-01/2011-01-03"), + Intervals.of("2011-02-01/2011-02-08"), + Intervals.of("2011-03-01/2011-03-02"), + Intervals.of("2011-03-03/2011-03-04"), + Intervals.of("2011-03-05/2011-03-06") + ); + + List<Interval> actual = JodaUtils.condenseIntervals(intervals); + + Assert.assertEquals( + expected, + actual + ); + + } + + + @Test + public void testCondenseIntervalsSimpleSortedIterator() + { + List<Interval> intervals = Arrays.asList( + Intervals.of("2011-01-01/2011-01-02"), + Intervals.of("2011-01-02/2011-01-03"), + Intervals.of("2011-02-03/2011-02-08"), + Intervals.of("2011-02-01/2011-02-02"), + Intervals.of("2011-02-01/2011-02-05"), + Intervals.of("2011-03-01/2011-03-02"), + Intervals.of("2011-03-03/2011-03-04"), + Intervals.of("2011-03-05/2011-03-06") + ); + intervals.sort(Comparators.intervalsByStartThenEnd()); + + List<Interval> actual = ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())); Assert.assertEquals( Arrays.asList( Intervals.of("2011-01-01/2011-01-03"), @@ -83,7 +121,116 @@ public class JodaUtilsTest Intervals.of("2011-03-03/2011-03-04"), Intervals.of("2011-03-05/2011-03-06") ), - JodaUtils.condenseIntervals(intervals) + actual ); + + } + + @Test + public void testCondenseIntervalsSimpleSortedIteratorOverlapping() + { + List<Interval> intervals = Arrays.asList( + Intervals.of("2011-02-01/2011-03-10"), + Intervals.of("2011-01-02/2011-02-03"), + Intervals.of("2011-01-07/2015-01-19"), + Intervals.of("2011-01-15/2011-01-19"), + Intervals.of("2011-01-01/2011-01-02"), + Intervals.of("2011-02-01/2011-03-10") + ); + + intervals.sort(Comparators.intervalsByStartThenEnd()); + + Assert.assertEquals( + Collections.singletonList( + Intervals.of("2011-01-01/2015-01-19") + ), + ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())) + ); + } + + @Test(expected = IAE.class) + public void testCondenseIntervalsSimpleSortedIteratorOverlappingWithNullsShouldThrow() + { + List<Interval> intervals = Arrays.asList( + Intervals.of("2011-01-02/2011-02-03"), + Intervals.of("2011-02-01/2011-03-10"), + null, + Intervals.of("2011-03-07/2011-04-19"), + Intervals.of("2011-04-01/2015-01-19"), + null + ); + ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())); + } + + @Test(expected = IAE.class) + public void testCondenseIntervalsSimpleSortedIteratorOverlappingWithNullFirstAndLastShouldThrow() + { + List<Interval> intervals = Arrays.asList( + null, + Intervals.of("2011-01-02/2011-02-03"), + Intervals.of("2011-02-01/2011-03-10"), + Intervals.of("2011-03-07/2011-04-19"), + Intervals.of("2011-04-01/2015-01-19"), + null + ); + ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())); + } + + @Test(expected = IllegalArgumentException.class) + public void testCondenseIntervalsSimpleUnsortedIterator() + { + List<Interval> intervals = Arrays.asList( + Intervals.of("2011-01-01/2011-01-02"), + Intervals.of("2011-01-02/2011-01-03"), + Intervals.of("2011-02-03/2011-02-08"), + Intervals.of("2011-02-01/2011-02-02"), + Intervals.of("2011-02-01/2011-02-05"), + Intervals.of("2011-03-01/2011-03-02"), + Intervals.of("2011-03-03/2011-03-04"), + Intervals.of("2011-03-05/2011-03-06") + ); + ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())); + } + + @Test(expected = IllegalArgumentException.class) + public void testCondenseIntervalsSimpleUnsortedIteratorSmallestAtEnd() + { + List<Interval> intervals = Arrays.asList( + Intervals.of("2011-01-01/2011-01-02"), + Intervals.of("2011-02-01/2011-02-04"), + Intervals.of("2011-03-01/2011-03-04"), + Intervals.of("2010-01-01/2010-03-04") + ); + ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())); + } + + @Test + public void testCondenseIntervalsIteratorWithDups() + { + List<Interval> intervals = Arrays.asList( + Intervals.of("2011-01-01/2011-01-02"), + Intervals.of("2011-02-04/2011-02-05"), + Intervals.of("2011-01-01/2011-01-02"), + Intervals.of("2011-01-02/2011-01-03"), + Intervals.of("2011-02-03/2011-02-08"), + Intervals.of("2011-02-03/2011-02-08"), + Intervals.of("2011-02-01/2011-02-02"), + Intervals.of("2011-02-01/2011-02-05"), + Intervals.of("2011-03-01/2011-03-02"), +
Intervals.of("2011-03-03/2011-03-04"), + Intervals.of("2011-03-05/2011-03-06") + ); + intervals.sort(Comparators.intervalsByStartThenEnd()); + + Assert.assertEquals( + Arrays.asList( + Intervals.of("2011-01-01/2011-01-03"), + Intervals.of("2011-02-01/2011-02-08"), + Intervals.of("2011-03-01/2011-03-02"), + Intervals.of("2011-03-03/2011-03-04"), + Intervals.of("2011-03-05/2011-03-06") + ), + ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())) ); } @@ -144,4 +291,54 @@ public class JodaUtilsTest Assert.assertEquals(Long.MAX_VALUE, period.getMinutes()); } + @Test + public void testShouldContainOverlappingIntervals() + { + List intervals = Arrays.asList( + Intervals.of("2011-02-01/2011-03-10"), + Intervals.of("2011-03-25/2011-04-03"), + Intervals.of("2011-04-01/2015-01-19"), + Intervals.of("2016-01-15/2016-01-19") + ); + Assert.assertTrue(JodaUtils.containOverlappingIntervals(intervals)); + } + + + @Test + public void testShouldNotContainOverlappingIntervals() + { + List intervals = Arrays.asList( + Intervals.of("2011-02-01/2011-03-10"), + Intervals.of("2011-03-10/2011-04-03"), + Intervals.of("2011-04-04/2015-01-14"), + Intervals.of("2016-01-15/2016-01-19") + ); + Assert.assertFalse(JodaUtils.containOverlappingIntervals(intervals)); + } + + @Test(expected = IAE.class) + public void testOverlappingIntervalsContainsNull() + { + List intervals = Arrays.asList( + Intervals.of("2011-02-01/2011-03-10"), + null, + Intervals.of("2011-04-04/2015-01-14"), + Intervals.of("2016-01-15/2016-01-19") + ); + JodaUtils.containOverlappingIntervals(intervals); + } + + @Test(expected = IAE.class) + public void testOverlappingIntervalsContainsUnsorted() + { + List intervals = Arrays.asList( + Intervals.of("2011-02-01/2011-03-10"), + Intervals.of("2011-03-10/2011-04-03"), + Intervals.of("2016-01-15/2016-01-19"), + Intervals.of("2011-04-04/2015-01-14") + ); + JodaUtils.containOverlappingIntervals(intervals); + } + + } diff --git a/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java new file mode 100644 index 00000000000..a38e6d5dbe8 --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.java.util.common; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.IntervalsByGranularity; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public class IntervalsByGranularityTest +{ + private static final long SECONDS_IN_YEAR = 31536000; + + + @Test + public void testTrivialIntervalExplosion() + { + Interval first = Intervals.of("2013-01-01T00Z/2013-02-01T00Z"); + Interval second = Intervals.of("2012-01-01T00Z/2012-02-01T00Z"); + Interval third = Intervals.of("2002-01-01T00Z/2003-01-01T00Z"); + + IntervalsByGranularity intervals = new IntervalsByGranularity( + ImmutableList.of(first, second, third), + Granularity.fromString("DAY") + ); + + // get count: + Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator(); + long count = getCount(granularityIntervals); + Assert.assertTrue(count == 62 + 365); + + granularityIntervals = intervals.granularityIntervalsIterator(); + count = getCountWithNoHasNext(granularityIntervals); + Assert.assertTrue(count == 62 + 365); + } + + + @Test + public void testDups() + { + Interval first = Intervals.of("2013-01-01T00Z/2013-02-01T00Z"); + Interval second = Intervals.of("2012-04-01T00Z/2012-05-01T00Z"); + Interval third = Intervals.of("2013-01-01T00Z/2013-02-01T00Z"); // dup + + IntervalsByGranularity intervals = new IntervalsByGranularity( + ImmutableList.of(first, second, third), + Granularity.fromString("DAY") + ); + + // get count: + Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator(); + long count = getCount(granularityIntervals); + Assert.assertTrue(count == 61); + } + + + @Test + public void testCondenseForManyIntervals() + { + // This test verifies that there are no issues when condense is called + // with an iterator pointing to millions of intervals (since the version of condense + // used here takes an interval iterator and does not materialize intervals). + Interval first = Intervals.of("2012-01-01T00Z/P1Y"); + IntervalsByGranularity intervals = new IntervalsByGranularity( + ImmutableList.of(first), + Granularity.fromString("SECOND") + ); + Assert.assertEquals( + ImmutableList.of(Intervals.of("2012-01-01T00Z/2013-01-01T00Z")), + ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.granularityIntervalsIterator())) + ); + } + + @Test + public void testIntervalExplosion() + { + Interval first = Intervals.of("2012-01-01T00Z/2012-12-31T00Z"); + Interval second = Intervals.of("2002-01-01T00Z/2002-12-31T00Z"); + Interval third = Intervals.of("2021-01-01T00Z/2021-06-30T00Z"); + IntervalsByGranularity intervals = new IntervalsByGranularity( + ImmutableList.of(first, second, third), + Granularity.fromString("SECOND") + ); + + // get count: + Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator(); + long count = getCount(granularityIntervals); + Assert.assertTrue(count == 78537600); + + } + + @Test + public void testSimpleEliminateRepeated() + { + final List<Interval> inputIntervals = ImmutableList.of( + Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), + Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), + Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), + Intervals.of("2012-01-01T00Z/2012-01-03T00Z") + ); +
IntervalsByGranularity intervals = new IntervalsByGranularity( + inputIntervals, + Granularities.MONTH + ); + + Assert.assertEquals( + ImmutableList.of(Intervals.of("2012-01-01T00Z/2012-02-01T00Z")), + ImmutableList.copyOf(intervals.granularityIntervalsIterator()) + ); + + } + + @Test + public void testALittleMoreComplexEliminateRepeated() + { + final List inputIntervals = ImmutableList.of( + Intervals.of("2015-01-08T00Z/2015-01-11T00Z"), + Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), + Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), + Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), + Intervals.of("2012-01-01T00Z/2012-01-03T00Z"), + Intervals.of("2007-03-08T00Z/2007-04-11T00Z") + ); + IntervalsByGranularity intervals = new IntervalsByGranularity( + inputIntervals, + Granularities.MONTH + ); + + Assert.assertEquals( + ImmutableList.of( + Intervals.of("2007-03-01T00Z/2007-04-01T00Z"), + Intervals.of("2007-04-01T00Z/2007-05-01T00Z"), + Intervals.of("2012-01-01T00Z/2012-02-01T00Z"), + Intervals.of("2015-01-01T00Z/2015-02-01T00Z") + ), + ImmutableList.copyOf(intervals.granularityIntervalsIterator()) + ); + + } + + @Test(expected = IAE.class) + public void testOverlappingShouldThrow() + { + List inputIntervals = ImmutableList.of( + Intervals.of("2013-01-01T00Z/2013-01-11T00Z"), + Intervals.of("2013-01-05T00Z/2013-01-08T00Z"), + Intervals.of("2013-01-07T00Z/2013-01-15T00Z") + ); + + IntervalsByGranularity intervals = new IntervalsByGranularity( + inputIntervals, + Granularity.fromString("DAY") + ); + } + + + @Test + public void testWithGranularity() + { + List inputIntervals = ImmutableList.of( + Intervals.of("2013-01-01T00Z/2013-01-10T00Z"), + Intervals.of("2013-01-15T00Z/2013-01-20T00Z"), + Intervals.of("2013-02-07T00Z/2013-02-15T00Z") + ); + + IntervalsByGranularity intervals = new IntervalsByGranularity( + inputIntervals, + Granularity.fromString("MONTH") + ); + + // get count: + Iterator granularityIntervals = intervals.granularityIntervalsIterator(); + long count = getCount(granularityIntervals); + Assert.assertTrue(count == 2); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRemoveThrowsException() + { + final List inputIntervals = ImmutableList.of( + Intervals.of("2015-01-08T00Z/2015-01-11T00Z") + ); + IntervalsByGranularity intervals = new IntervalsByGranularity( + inputIntervals, + Granularities.MONTH + ); + intervals.granularityIntervalsIterator().remove(); + } + + @Test + public void testEmptyInput() + { + final List inputIntervals = Collections.emptyList(); + IntervalsByGranularity intervals = new IntervalsByGranularity( + inputIntervals, + Granularities.MONTH + ); + Assert.assertFalse(intervals.granularityIntervalsIterator().hasNext()); + } + + private long getCount(Iterator granularityIntervalIterator) + { + long count = 0; + Interval previous = null; + Interval current; + while (granularityIntervalIterator.hasNext()) { + current = granularityIntervalIterator.next(); + if (previous != null) { + Assert.assertTrue(previous + "," + current, previous.getEndMillis() <= current.getStartMillis()); + } + previous = current; + count++; + } + return count; + } + + private long getCountWithNoHasNext(Iterator granularityIntervalIterator) + { + long count = 0; + Interval previous = null; + Interval current; + + while (true) { + try { + current = granularityIntervalIterator.next(); + } + catch (NoSuchElementException e) { + // done + break; + } + if (previous != null) { + Assert.assertTrue(previous.getEndMillis() <= current.getStartMillis()); + } + previous = 
current; + count++; + } + + return count; + } + +} diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java index b1ec5460993..1d0c3c4b14f 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java @@ -22,6 +22,7 @@ package org.apache.druid.indexer; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; @@ -64,7 +65,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; /** @@ -109,10 +109,10 @@ public class DetermineHashedPartitionsJob implements Jobby groupByJob.setOutputValueClass(NullWritable.class); groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class); groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class); - if (!config.getSegmentGranularIntervals().isPresent()) { + if (config.getInputIntervals().isEmpty()) { groupByJob.setNumReduceTasks(1); } else { - groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size()); + groupByJob.setNumReduceTasks(Iterators.size(config.getSegmentGranularIntervals().iterator())); } JobHelper.setupClasspath( JobHelper.distributedClassPath(config.getWorkingPath()), @@ -151,7 +151,7 @@ public class DetermineHashedPartitionsJob implements Jobby log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals()); FileSystem fileSystem = null; - if (!config.getSegmentGranularIntervals().isPresent()) { + if (config.getInputIntervals().isEmpty()) { final Path intervalInfoPath = config.makeIntervalInfoPath(); fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration()); if (!Utils.exists(groupByJob, fileSystem, intervalInfoPath)) { @@ -159,7 +159,9 @@ public class DetermineHashedPartitionsJob implements Jobby } List intervals = HadoopDruidIndexerConfig.JSON_MAPPER.readValue( Utils.openInputStream(groupByJob, intervalInfoPath), - new TypeReference>() {} + new TypeReference>() + { + } ); config.setGranularitySpec( new UniformGranularitySpec( @@ -182,7 +184,7 @@ public class DetermineHashedPartitionsJob implements Jobby } HashPartitionFunction partitionFunction = ((HashedPartitionsSpec) partitionsSpec).getPartitionFunction(); int shardCount = 0; - for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { + for (Interval segmentGranularity : config.getSegmentGranularIntervals()) { DateTime bucket = segmentGranularity.getStart(); final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity); @@ -295,11 +297,11 @@ public class DetermineHashedPartitionsJob implements Jobby super.setup(context); rollupGranularity = getConfig().getGranularitySpec().getQueryGranularity(); config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); - Optional> intervals = config.getSegmentGranularIntervals(); - if (intervals.isPresent()) { + Iterable intervals = config.getSegmentGranularIntervals(); + if (intervals.iterator().hasNext()) { determineIntervals = false; final ImmutableMap.Builder builder = 
ImmutableMap.builder(); - for (final Interval bucketInterval : intervals.get()) { + for (final Interval bucketInterval : intervals) { builder.put(bucketInterval, HyperLogLogCollector.makeLatestCollector()); } hyperLogLogs = builder.build(); @@ -376,7 +378,7 @@ public class DetermineHashedPartitionsJob implements Jobby protected void setup(Context context) { config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); - determineIntervals = !config.getSegmentGranularIntervals().isPresent(); + determineIntervals = config.getInputIntervals().isEmpty(); } @Override @@ -477,11 +479,11 @@ public class DetermineHashedPartitionsJob implements Jobby { this.config = config; HadoopDruidIndexerConfig hadoopConfig = HadoopDruidIndexerConfig.fromConfiguration(config); - if (hadoopConfig.getSegmentGranularIntervals().isPresent()) { + if (!hadoopConfig.getInputIntervals().isEmpty()) { determineIntervals = false; int reducerNumber = 0; ImmutableMap.Builder builder = ImmutableMap.builder(); - for (Interval interval : hadoopConfig.getSegmentGranularIntervals().get()) { + for (Interval interval : hadoopConfig.getSegmentGranularIntervals()) { builder.put(new LongWritable(interval.getStartMillis()), reducerNumber++); } reducerLookup = builder.build(); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java index 1e810c69aef..ebe69cd15a7 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java @@ -212,7 +212,7 @@ public class DeterminePartitionsJob implements Jobby dimSelectionJob.setOutputKeyClass(BytesWritable.class); dimSelectionJob.setOutputValueClass(Text.class); dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class); - dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size()); + dimSelectionJob.setNumReduceTasks(Iterators.size(config.getGranularitySpec().sortedBucketIntervals().iterator())); JobHelper.setupClasspath( JobHelper.distributedClassPath(config.getWorkingPath()), JobHelper.distributedClassPath(config.makeIntermediatePath()), @@ -256,7 +256,7 @@ public class DeterminePartitionsJob implements Jobby FileSystem fileSystem = null; Map> shardSpecs = new TreeMap<>(); int shardCount = 0; - for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { + for (Interval segmentGranularity : config.getSegmentGranularIntervals()) { final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity); if (fileSystem == null) { fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration()); @@ -447,7 +447,7 @@ public class DeterminePartitionsJob implements Jobby final ImmutableMap.Builder timeIndexBuilder = ImmutableMap.builder(); int idx = 0; - for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals().get()) { + for (final Interval bucketInterval : config.getGranularitySpec().sortedBucketIntervals()) { timeIndexBuilder.put(bucketInterval.getStartMillis(), idx); idx++; } diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java index a4f719a4c21..8b5b4b6b0bb 100644 --- 
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.TreeMap; /** + * */ public class HadoopDruidDetermineConfigurationJob implements Jobby { @@ -75,7 +76,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby } Map> shardSpecs = new TreeMap<>(); int shardCount = 0; - for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { + for (Interval segmentGranularity : config.getSegmentGranularIntervals()) { DateTime bucket = segmentGranularity.getStart(); // negative shardsPerInterval means a single shard List specs = Lists.newArrayListWithCapacity(shardsPerInterval); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java index 0b69b33098d..54c8f073a9b 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java @@ -76,10 +76,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; -import java.util.SortedSet; /** + * */ public class HadoopDruidIndexerConfig { @@ -99,7 +98,7 @@ public class HadoopDruidIndexerConfig /** * Hadoop tasks running in an Indexer process need a reference to the Properties instance created * in PropertiesModule so that the task sees properties that were specified in Druid's config files. - * + *

* This is not strictly necessary for Peon-based tasks which have all properties, including config file properties, * specified on their command line by ForkingTaskRunner (so they could use System.getProperties() only), * but we always use the injected Properties for consistency. @@ -314,9 +313,9 @@ public class HadoopDruidIndexerConfig public Optional> getIntervals() { - Optional> setOptional = schema.getDataSchema().getGranularitySpec().bucketIntervals(); - if (setOptional.isPresent()) { - return Optional.of(JodaUtils.condenseIntervals(setOptional.get())); + Iterable bucketIntervals = schema.getDataSchema().getGranularitySpec().sortedBucketIntervals(); + if (bucketIntervals.iterator().hasNext()) { + return Optional.of(JodaUtils.condenseIntervals(bucketIntervals)); } else { return Optional.absent(); } @@ -426,7 +425,6 @@ public class HadoopDruidIndexerConfig * Get the proper bucket for some input row. * * @param inputRow an InputRow - * * @return the Bucket that this row belongs to */ Optional getBucket(InputRow inputRow) @@ -455,14 +453,12 @@ public class HadoopDruidIndexerConfig } - Optional> getSegmentGranularIntervals() + Iterable getSegmentGranularIntervals() { - return Optional.fromNullable( + return schema.getDataSchema() .getGranularitySpec() - .bucketIntervals() - .orNull() - ); + .sortedBucketIntervals(); } public List getInputIntervals() @@ -474,15 +470,17 @@ public class HadoopDruidIndexerConfig Optional> getAllBuckets() { - Optional> intervals = getSegmentGranularIntervals(); - if (intervals.isPresent()) { + Iterable intervals = getSegmentGranularIntervals(); + if (intervals.iterator().hasNext()) { return Optional.of( FunctionalIterable - .create(intervals.get()) + .create(intervals) .transformCat( input -> { final DateTime bucketTime = input.getStart(); - final List specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis()); + final List specs = schema.getTuningConfig() + .getShardSpecs() + .get(bucketTime.getMillis()); if (specs == null) { return ImmutableList.of(); } diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java index 02ced6cf10c..5190280d000 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java @@ -83,7 +83,7 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< throw new ParseException(errorMsg); } - if (!granularitySpec.bucketIntervals().isPresent() + if (granularitySpec.inputIntervals().isEmpty() || granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch())) .isPresent()) { innerMap(inputRow, context); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java index 3c809d26018..f16ac133933 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java @@ -20,7 +20,6 @@ package org.apache.druid.indexer; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import 
org.apache.druid.indexer.partitions.HashedPartitionsSpec; @@ -59,7 +58,7 @@ public class HadoopDruidDetermineConfigurationJobTest final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class); Mockito.when(config.isDeterminingPartitions()).thenReturn(false); Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec); - Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals)); + Mockito.when(config.getSegmentGranularIntervals()).thenReturn(intervals); final ArgumentCaptor>> resultCaptor = ArgumentCaptor.forClass(Map.class); Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture()); @@ -99,7 +98,7 @@ public class HadoopDruidDetermineConfigurationJobTest final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class); Mockito.when(config.isDeterminingPartitions()).thenReturn(false); Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec); - Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals)); + Mockito.when(config.getSegmentGranularIntervals()).thenReturn(intervals); final ArgumentCaptor>> resultCaptor = ArgumentCaptor.forClass(Map.class); Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture()); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java index aed33811156..823cdd29261 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java @@ -78,7 +78,7 @@ public class HadoopIngestionSpecTest Assert.assertEquals( "getIntervals", Collections.singletonList(Intervals.of("2012-01-01/P1D")), - granularitySpec.getIntervals().get() + granularitySpec.inputIntervals() ); Assert.assertEquals( diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java index 8d59554909b..c0254c4fbe4 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java @@ -599,7 +599,7 @@ public class IndexGeneratorJobTest Map> shardSpecs = new TreeMap<>(DateTimeComparator.getInstance()); int shardCount = 0; int segmentNum = 0; - for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { + for (Interval segmentGranularity : config.getSegmentGranularIntervals()) { List specs = constructShardSpecFromShardInfo(partitionType, shardInfoForEachShard[segmentNum++]); List actualSpecs = Lists.newArrayListWithExpectedSize(specs.size()); for (ShardSpec spec : specs) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index f9c725a294b..4a7fa0d768a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import 
com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputFormat; @@ -48,6 +47,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; +import org.apache.druid.java.util.common.granularity.IntervalsByGranularity; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -69,7 +69,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -262,41 +261,18 @@ public abstract class AbstractBatchIndexTask extends AbstractTask return Preconditions.checkNotNull(taskLockHelper, "taskLockHelper is not initialized yet"); } - /** - * Attempts to acquire a lock that covers the intervals specified in a certain granularitySpec. - * - * This method uses {@link GranularitySpec#bucketIntervals()} to get the list of intervals to lock, and passes them - * to {@link #determineLockGranularityAndTryLock(TaskActionClient, List)}. - * - * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. - * - * If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method - * will initialize {@link #taskLockHelper} as a side effect. - * - * @return whether the lock was acquired - */ - protected boolean determineLockGranularityAndTryLock( - TaskActionClient client, - GranularitySpec granularitySpec - ) throws IOException - { - final List intervals = granularitySpec.bucketIntervals().isPresent() - ? new ArrayList<>(granularitySpec.bucketIntervals().get()) - : Collections.emptyList(); - return determineLockGranularityAndTryLock(client, intervals); - } - /** * Attempts to acquire a lock that covers certain intervals. - * + *

* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. - * + *

* If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method * will initialize {@link #taskLockHelper} as a side effect. * * @return whether the lock was acquired */ - boolean determineLockGranularityAndTryLock(TaskActionClient client, List intervals) throws IOException + public boolean determineLockGranularityAndTryLock(TaskActionClient client, List intervals) + throws IOException { final boolean forceTimeChunkLock = getContextValue( Tasks.FORCE_TIME_CHUNK_LOCK_KEY, @@ -325,9 +301,9 @@ public abstract class AbstractBatchIndexTask extends AbstractTask /** * Attempts to acquire a lock that covers certain segments. - * + *

* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. - * + *

 * This method will initialize {@link #taskLockHelper} as a side effect. * * @return whether the lock was acquired @@ -396,25 +372,33 @@ public abstract class AbstractBatchIndexTask extends AbstractTask } } + + protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException { // The given intervals are first converted to align with segment granularity. This is because, // when an overwriting task finds a version for a given input row, it expects the interval // associated to each version to be equal or larger than the time bucket where the input row falls in. // See ParallelIndexSupervisorTask.findVersion(). - final Set<Interval> uniqueIntervals = new HashSet<>(); + final Iterator<Interval> intervalIterator; final Granularity segmentGranularity = getSegmentGranularity(); - for (Interval interval : intervals) { - if (segmentGranularity == null) { - uniqueIntervals.add(interval); - } else { - Iterables.addAll(uniqueIntervals, segmentGranularity.getIterable(interval)); - } + if (segmentGranularity == null) { + intervalIterator = JodaUtils.condenseIntervals(intervals).iterator(); + } else { + IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity); + // the following calls a condense variant that does not materialize the intervals: + intervalIterator = JodaUtils.condensedIntervalsIterator(intervalsByGranularity.granularityIntervalsIterator()); } - // Condense intervals to avoid creating too many locks. - for (Interval interval : JodaUtils.condenseIntervals(uniqueIntervals)) { - final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)); + // Intervals are already condensed to avoid creating too many locks. + // Intervals are also sorted, so it's safe to compare only the previous and current intervals for dedup. + Interval prev = null; + while (intervalIterator.hasNext()) { + final Interval cur = intervalIterator.next(); + if (prev != null && cur.equals(prev)) { + continue; + } + prev = cur; + final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur)); if (lock == null) { return false; } @@ -525,11 +509,11 @@ public abstract class AbstractBatchIndexTask extends AbstractTask * If the given firehoseFactory is {@link IngestSegmentFirehoseFactory}, then it finds the segments to lock * from the firehoseFactory. This is because those segments will be read by this task no matter what segments would be * filtered by intervalsToRead, so they need to be locked. - * + *

* However, firehoseFactory is not IngestSegmentFirehoseFactory, it means this task will overwrite some segments * with data read from some input source outside of Druid. As a result, only the segments falling in intervalsToRead * should be locked. - * + *

* The order of segments within the returned list is unspecified, but each segment is guaranteed to appear in the list * only once. */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 1236e359bd3..c11ce31c59a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -82,7 +82,6 @@ import java.lang.reflect.Method; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.SortedSet; public class HadoopIndexTask extends HadoopTask implements ChatHandler { @@ -189,12 +188,10 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { - Optional> intervals = spec.getDataSchema().getGranularitySpec().bucketIntervals(); - if (intervals.isPresent()) { + Iterable intervals = spec.getDataSchema().getGranularitySpec().sortedBucketIntervals(); + if (intervals.iterator().hasNext()) { Interval interval = JodaUtils.umbrellaInterval( - JodaUtils.condenseIntervals( - intervals.get() - ) + JodaUtils.condenseIntervals(intervals) ); return taskActionClient.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)) != null; } else { @@ -312,7 +309,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler registerResourceCloserOnAbnormalExit(config -> killHadoopJob()); String hadoopJobIdFile = getHadoopJobIdFileName(); final ClassLoader loader = buildClassLoader(toolbox); - boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent(); + boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty(); HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed( spec, @@ -377,7 +374,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler if (determineIntervals) { Interval interval = JodaUtils.umbrellaInterval( JodaUtils.condenseIntervals( - indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get() + indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals() ) ); final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS); @@ -621,7 +618,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler } - /** Called indirectly in {@link HadoopIndexTask#run(TaskToolbox)}. */ + /** + * Called indirectly in {@link HadoopIndexTask#run(TaskToolbox)}. + */ @SuppressWarnings("unused") public static class HadoopDetermineConfigInnerProcessingRunner { @@ -770,9 +769,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler jobId }); - return new String[] {jobId, (res == 0 ? "Success" : "Fail")}; + return new String[]{jobId, (res == 0 ? 
"Success" : "Fail")}; } - return new String[] {jobId, "Fail"}; + return new String[]{jobId, "Fail"}; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index ce9da23b76f..88779ab717d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -123,7 +123,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.SortedSet; import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -231,7 +230,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler throw new UOE("partitionsSpec[%s] is not supported", tuningConfig.getPartitionsSpec().getClass().getName()); } } - return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.dataSchema.getGranularitySpec()); + return determineLockGranularityAndTryLock( + taskActionClient, + ingestionSchema.dataSchema.getGranularitySpec().inputIntervals() + ); } @Override @@ -452,10 +454,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() ); - final boolean determineIntervals = !ingestionSchema.getDataSchema() - .getGranularitySpec() - .bucketIntervals() - .isPresent(); + final boolean determineIntervals = ingestionSchema.getDataSchema() + .getGranularitySpec() + .inputIntervals() + .isEmpty(); final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource( ingestionSchema.getDataSchema().getParser() @@ -591,7 +593,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec(); // Must determine intervals if unknown, since we acquire all locks before processing any data. - final boolean determineIntervals = !granularitySpec.bucketIntervals().isPresent(); + final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty(); // Must determine partitions if rollup is guaranteed and the user didn't provide a specific value. 
final boolean determineNumPartitions = partitionsSpec.needsDeterminePartitions(false); @@ -630,7 +632,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler @Nonnull DynamicPartitionsSpec partitionsSpec ) { - final SortedSet intervals = granularitySpec.bucketIntervals().get(); + final Iterable intervals = granularitySpec.sortedBucketIntervals(); final int numBucketsPerInterval = 1; final LinearPartitionAnalysis partitionAnalysis = new LinearPartitionAnalysis(partitionsSpec); intervals.forEach(interval -> partitionAnalysis.updateBucket(interval, numBucketsPerInterval)); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 038e7884377..bd64fbd9567 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -109,7 +109,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.SortedSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; @@ -141,7 +140,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen * the segment granularity of existing segments until the task reads all data because we don't know what segments * are going to be overwritten. As a result, we assume that segment granularity is going to be changed if intervals * are missing and force to use timeChunk lock. - * + *

* This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced * in the task logs. */ @@ -194,10 +193,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen ingestionSchema.getDataSchema().getParser() ); this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting() - && !ingestionSchema.getDataSchema() - .getGranularitySpec() - .bucketIntervals() - .isPresent(); + && ingestionSchema.getDataSchema() + .getGranularitySpec() + .inputIntervals() + .isEmpty(); if (missingIntervalsInOverwriteMode) { addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true); } @@ -320,12 +319,12 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen ) { return new PartialRangeSegmentGenerateParallelIndexTaskRunner( - toolbox, - getId(), - getGroupId(), - ingestionSchema, - getContext(), - intervalToPartitions + toolbox, + getId(), + getGroupId(), + ingestionSchema, + getContext(), + intervalToPartitions ); } @@ -350,7 +349,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { - return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.getDataSchema().getGranularitySpec()); + return determineLockGranularityAndTryLock( + taskActionClient, + ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals() + ); } @Override @@ -513,13 +515,13 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen /** * Run the multi phase parallel indexing for perfect rollup. In this mode, the parallel indexing is currently * executed in two phases. - * + *

 * - In the first phase, each task partitions input data and stores those partitions in local storage. - * - The partition is created based on the segment granularity (primary partition key) and the partition dimension - * values in {@link PartitionsSpec} (secondary partition key). - * - Partitioned data is maintained by {@link IntermediaryDataManager}. + * - The partition is created based on the segment granularity (primary partition key) and the partition dimension + * values in {@link PartitionsSpec} (secondary partition key). + * - Partitioned data is maintained by {@link IntermediaryDataManager}. * - In the second phase, each task reads partitioned data from the intermediary data server (middleManager + * or indexer) and merges them to create the final segments. */ private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception { @@ -817,7 +819,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen } private static - Map, List> groupPartitionLocationsPerPartition( + Map, List> groupPartitionLocationsPerPartition( Map> subTaskIdToReport, BiFunction createPartitionLocationFunction ) @@ -891,7 +893,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen * @param index index of partition * @param total number of items to partition * @param splits number of desired partitions - * * @return partition range: [lhs, rhs) */ private static Pair<Integer, Integer> getPartitionBoundaries(int index, int total, int splits) @@ -1038,7 +1039,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen { final String dataSource = getDataSource(); final GranularitySpec granularitySpec = getIngestionSchema().getDataSchema().getGranularitySpec(); - final Optional<SortedSet<Interval>> bucketIntervals = granularitySpec.bucketIntervals(); + // This method is called whenever subtasks need to allocate a new segment via the supervisor task. + // As a result, this code is never called in the Overlord. For now, using the materialized intervals + // here is fine for performance reasons. + final Set<Interval> materializedBucketIntervals = granularitySpec.materializedBucketIntervals(); // List locks whenever allocating a new segment because locks might be revoked and no longer valid. final List locks = toolbox @@ -1054,7 +1058,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen Interval interval; String version; - if (bucketIntervals.isPresent()) { + if (!materializedBucketIntervals.isEmpty()) { // If granularity spec has explicit intervals, we just need to find the version associated to the interval. // This is because we should have gotten all required locks up front when the task starts up. 
final Optional maybeInterval = granularitySpec.bucketInterval(timestamp); @@ -1063,7 +1067,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen } interval = maybeInterval.get(); - if (!bucketIntervals.get().contains(interval)) { + if (!materializedBucketIntervals.contains(interval)) { throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java index 6bec35d303f..a0a6179f8b3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java @@ -106,7 +106,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask ); } - @VisibleForTesting // Only for testing + @VisibleForTesting PartialDimensionDistributionTask( @Nullable String id, final String groupId, @@ -334,7 +334,8 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask this(queryGranularity, BLOOM_FILTER_EXPECTED_INSERTIONS, BLOOM_FILTER_EXPECTED_FALSE_POSITIVE_PROBABILTY); } - @VisibleForTesting // to allow controlling false positive rate of bloom filter + @VisibleForTesting + // to allow controlling false positive rate of bloom filter DedupInputRowFilter( Granularity queryGranularity, int bloomFilterExpectedInsertions, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index b252e5dfb5e..da228765ba2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -42,7 +42,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.SortedSet; import java.util.stream.Collectors; /** @@ -194,7 +193,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask intervals = granularitySpec.bucketIntervals().get(); + final Iterable intervals = granularitySpec.sortedBucketIntervals(); final int numBucketsPerInterval = partitionsSpec.getNumShards() == null ? 1 : partitionsSpec.getNumShards(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 2cc9f723e17..ec019b10135 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -98,7 +98,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask * the segment granularity of existing segments until the task reads all data because we don't know what segments * are going to be overwritten. 
As a result, we assume that segment granularity is going to be changed if intervals * are missing and force to use timeChunk lock. - * + *

* This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced * in the task logs. */ @@ -132,10 +132,10 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask this.ingestionSchema = ingestionSchema; this.supervisorTaskId = supervisorTaskId; this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting() - && !ingestionSchema.getDataSchema() - .getGranularitySpec() - .bucketIntervals() - .isPresent(); + && ingestionSchema.getDataSchema() + .getGranularitySpec() + .inputIntervals() + .isEmpty(); if (missingIntervalsInOverwriteMode) { addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true); } @@ -152,7 +152,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask { return determineLockGranularityAndTryLock( new SurrogateTaskActionClient(supervisorTaskId, taskActionClient), - ingestionSchema.getDataSchema().getGranularitySpec() + ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals() ); } @@ -263,7 +263,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link DynamicPartitionsSpec#maxTotalRows} * * - * + *

* At the end of this method, all the remaining segments are published. * * @return true if generated segments are successfully published, otherwise false @@ -291,7 +291,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig(); final DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec) tuningConfig.getGivenOrDefaultPartitionsSpec(); final long pushTimeout = tuningConfig.getPushTimeout(); - final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent(); + final boolean explicitIntervals = !granularitySpec.inputIntervals().isEmpty(); final SegmentAllocator segmentAllocator = SegmentAllocators.forLinearPartitioning( toolbox, getId(), diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java index b2e18318b62..0f9774994b1 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java @@ -21,29 +21,20 @@ package org.apache.druid.segment.indexing.granularity; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.java.util.common.guava.Comparators; -import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; -public class ArbitraryGranularitySpec implements GranularitySpec +public class ArbitraryGranularitySpec extends BaseGranularitySpec { - private final TreeSet intervals; private final Granularity queryGranularity; - private final Boolean rollup; + protected LookupIntervalBuckets lookupTableBucketByDateTime; @JsonCreator public ArbitraryGranularitySpec( @@ -52,22 +43,15 @@ public class ArbitraryGranularitySpec implements GranularitySpec @JsonProperty("intervals") @Nullable List inputIntervals ) { + super(inputIntervals, rollup); this.queryGranularity = queryGranularity == null ? Granularities.NONE : queryGranularity; - this.rollup = rollup == null ? 
Boolean.TRUE : rollup; - this.intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd()); - if (inputIntervals == null) { - inputIntervals = new ArrayList<>(); - } - - // Insert all intervals - intervals.addAll(inputIntervals); + lookupTableBucketByDateTime = new LookupIntervalBuckets(inputIntervals); // Ensure intervals are non-overlapping (but they may abut each other) - final PeekingIterator intervalIterator = Iterators.peekingIterator(intervals.iterator()); + final PeekingIterator intervalIterator = Iterators.peekingIterator(sortedBucketIntervals().iterator()); while (intervalIterator.hasNext()) { final Interval currentInterval = intervalIterator.next(); - if (intervalIterator.hasNext()) { final Interval nextInterval = intervalIterator.peek(); if (currentInterval.overlaps(nextInterval)) { @@ -86,29 +70,9 @@ public class ArbitraryGranularitySpec implements GranularitySpec } @Override - @JsonProperty("intervals") - public Optional> bucketIntervals() + public Iterable sortedBucketIntervals() { - return Optional.of(intervals); - } - - @Override - public List inputIntervals() - { - return ImmutableList.copyOf(intervals); - } - - @Override - public Optional bucketInterval(DateTime dt) - { - // First interval with start time ≤ dt - final Interval interval = intervals.floor(new Interval(dt, DateTimes.MAX)); - - if (interval != null && interval.contains(dt)) { - return Optional.of(interval); - } else { - return Optional.absent(); - } + return () -> lookupTableBucketByDateTime.iterator(); } @Override @@ -117,13 +81,6 @@ public class ArbitraryGranularitySpec implements GranularitySpec throw new UnsupportedOperationException(); } - @Override - @JsonProperty("rollup") - public boolean isRollup() - { - return rollup; - } - @Override @JsonProperty("queryGranularity") public Granularity getQueryGranularity() @@ -143,7 +100,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec ArbitraryGranularitySpec that = (ArbitraryGranularitySpec) o; - if (!intervals.equals(that.intervals)) { + if (!inputIntervals().equals(that.inputIntervals())) { return false; } if (!rollup.equals(that.rollup)) { @@ -159,7 +116,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec @Override public int hashCode() { - int result = intervals.hashCode(); + int result = inputIntervals().hashCode(); result = 31 * result + rollup.hashCode(); result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0); return result; @@ -169,7 +126,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec public String toString() { return "ArbitraryGranularitySpec{" + - "intervals=" + intervals + + "intervals=" + inputIntervals() + ", queryGranularity=" + queryGranularity + ", rollup=" + rollup + '}'; @@ -180,4 +137,11 @@ public class ArbitraryGranularitySpec implements GranularitySpec { return new ArbitraryGranularitySpec(queryGranularity, rollup, inputIntervals); } + + @Override + protected LookupIntervalBuckets getLookupTableBuckets() + { + return lookupTableBucketByDateTime; + } + } diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java new file mode 100644 index 00000000000..9ff114b5d68 --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing.granularity;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterators;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.guava.Comparators;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+abstract class BaseGranularitySpec implements GranularitySpec
+{
+  protected List inputIntervals;
+  protected final Boolean rollup;
+
+  public BaseGranularitySpec(List inputIntervals, Boolean rollup)
+  {
+    if (inputIntervals != null) {
+      this.inputIntervals = ImmutableList.copyOf(inputIntervals);
+    } else {
+      this.inputIntervals = Collections.emptyList();
+    }
+    this.rollup = rollup == null ? Boolean.TRUE : rollup;
+  }
+
+  @Override
+  @JsonProperty("intervals")
+  public List inputIntervals()
+  {
+    return inputIntervals;
+  }
+
+  @Override
+  @JsonProperty("rollup")
+  public boolean isRollup()
+  {
+    return rollup;
+  }
+
+  @Override
+  public Optional bucketInterval(DateTime dt)
+  {
+    return getLookupTableBuckets().bucketInterval(dt);
+  }
+
+  @Override
+  public TreeSet materializedBucketIntervals()
+  {
+    return getLookupTableBuckets().materializedIntervals();
+  }
+
+  protected abstract LookupIntervalBuckets getLookupTableBuckets();
+
+  /**
+   * This is a helper class that shares the bucket-lookup code for sortedBucketIntervals among
+   * the various GranularitySpec implementations. In particular, UniformGranularitySpec needs to
+   * traverse its buckets without materializing them, so this class defers materialization until
+   * a lookup actually requires it.
+   */
+  protected static class LookupIntervalBuckets
+  {
+    private final Iterable intervalIterable;
+    private final TreeSet intervals;
+
+    /**
+     * @param intervalIterable The intervals to be materialized lazily, on first lookup
+     */
+    public LookupIntervalBuckets(Iterable intervalIterable)
+    {
+      this.intervalIterable = intervalIterable;
+      // The tree set is materialized on demand (see materializedIntervals) so that merely
+      // constructing this data structure does not blow up when the
+      // number of intervals is very large...
+      this.intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
+    }
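The comment above is the crux of the class: constructing a LookupIntervalBuckets stays cheap no matter how many intervals the backing iterable can produce, because the TreeSet is only filled on first use. A minimal, self-contained sketch of the same idiom in plain Java (the names here are illustrative, not the Druid types):

    import java.util.TreeSet;

    class LazyBuckets<T extends Comparable<T>>
    {
      private final Iterable<T> source;              // possibly enormous; never copied here
      private final TreeSet<T> materialized = new TreeSet<>();

      LazyBuckets(Iterable<T> source)
      {
        this.source = source;                        // O(1), regardless of source size
      }

      TreeSet<T> get()
      {
        // Fill the set once, on first use; later calls reuse the cached copy.
        if (materialized.isEmpty() && source.iterator().hasNext()) {
          source.forEach(materialized::add);
        }
        return materialized;
      }
    }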
+
+    /**
+     * Returns the bucket interval containing the given date time, using a fast lookup into
+     * the data structure in which all the intervals have been materialized
+     *
+     * @param dt The date time to lookup
+     * @return An Optional containing the interval for the given DateTime if it exists
+     */
+    public Optional bucketInterval(DateTime dt)
+    {
+      final Interval interval = materializedIntervals().floor(new Interval(dt, DateTimes.MAX));
+      if (interval != null && interval.contains(dt)) {
+        return Optional.of(interval);
+      } else {
+        return Optional.absent();
+      }
+    }
+
+    /**
+     * @return An iterator to traverse the materialized intervals. The traversal will be done in
+     * order as dictated by Comparators.intervalsByStartThenEnd()
+     */
+    public Iterator iterator()
+    {
+      return materializedIntervals().iterator();
+    }
+
+    /**
+     * Materializes the intervals from the backing iterable on first access and caches them,
+     * so later lookups do not have to walk the iterable again
+     *
+     * @return The TreeSet of materialized intervals
+     */
+    public TreeSet materializedIntervals()
+    {
+      if (intervalIterable != null && intervalIterable.iterator().hasNext() && intervals.isEmpty()) {
+        Iterators.addAll(intervals, intervalIterable.iterator());
+      }
+      return intervals;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
index 9272f6961e4..c3bb5793c97 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java
@@ -27,7 +27,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;

 import java.util.List;
-import java.util.SortedSet;
+import java.util.TreeSet;

 /**
  * Tells the indexer how to group events based on timestamp. The events may then be further partitioned based
@@ -41,11 +43,13 @@ import java.util.SortedSet;
 public interface GranularitySpec
 {
   /**
-   * Set of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.
+   * Iterable of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.
    *
-   * @return set of all time groups
+   * @return Iterable of all time groups
    */
-  Optional> bucketIntervals();
+  Iterable sortedBucketIntervals();
+
+
   /**
    * Returns user provided intervals as-is state. used for configuring granular path spec
@@ -58,11 +60,18 @@ public interface GranularitySpec
   * Time-grouping interval corresponding to some instant, if any.
   *
   * @param dt instant to return time interval for
-  *
   * @return optional time interval
   */
  Optional bucketInterval(DateTime dt);
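To make the contract of these two accessors concrete, here is a hedged caller-side sketch; the constructor arguments follow the UniformGranularitySpec signature in this patch, and the printed output is illustrative only:

    import com.google.common.base.Optional;
    import com.google.common.collect.ImmutableList;
    import org.apache.druid.java.util.common.DateTimes;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.java.util.common.granularity.Granularities;
    import org.apache.druid.segment.indexing.granularity.GranularitySpec;
    import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
    import org.joda.time.Interval;

    class GranularitySpecDemo
    {
      public static void main(String[] args)
      {
        GranularitySpec spec = new UniformGranularitySpec(
            Granularities.DAY,
            null,
            null,
            ImmutableList.of(Intervals.of("2020-01-01/2020-02-01"))
        );

        // Traversal: one DAY bucket at a time; the full list is never built up front.
        for (Interval bucket : spec.sortedBucketIntervals()) {
          System.out.println(bucket);
        }

        // Point lookup: served from the lazily materialized TreeSet.
        Optional<Interval> bucket = spec.bucketInterval(DateTimes.of("2020-01-15T12:00:00Z"));
        System.out.println(bucket.orNull());
      }
    }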
+
+  /**
+   * This is a helper method for areas of the code, outside the Overlord, that for
+   * performance reasons might need the materialized set of bucket intervals.
+   *
+   * @return An ordered set of the materialized bucket intervals, supporting fast lookups
+   */
+  TreeSet materializedBucketIntervals();
+
   Granularity getSegmentGranularity();

   boolean isRollup();
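materializedBucketIntervals() above exists for call sites that need set membership rather than lazy traversal, as in the allocateNewSegment() change earlier in this patch, and should only be used in processes that can afford to hold the whole set (that is, not the Overlord). A hedged sketch of the intended pattern; validateBucket is a hypothetical helper, not part of the patch:

    import java.util.TreeSet;
    import org.apache.druid.java.util.common.ISE;
    import org.apache.druid.segment.indexing.granularity.GranularitySpec;
    import org.joda.time.Interval;

    final class BucketChecks
    {
      static void validateBucket(GranularitySpec granularitySpec, Interval interval)
      {
        // Membership check against the materialized buckets; empty means no
        // explicit intervals were configured, so any interval is acceptable.
        final TreeSet<Interval> buckets = granularitySpec.materializedBucketIntervals();
        if (!buckets.isEmpty() && !buckets.contains(interval)) {
          throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec);
        }
      }
    }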
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
index fae680aba9b..5fe6a9ed065 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java
@@ -21,28 +21,22 @@ package org.apache.druid.segment.indexing.granularity;

 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
-import org.joda.time.DateTime;
+import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
 import org.joda.time.Interval;

-import java.util.ArrayList;
 import java.util.List;
-import java.util.SortedSet;

-public class UniformGranularitySpec implements GranularitySpec
+public class UniformGranularitySpec extends BaseGranularitySpec
 {
   private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY;
   private static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE;

   private final Granularity segmentGranularity;
   private final Granularity queryGranularity;
-  private final Boolean rollup;
-  private final List inputIntervals;
-  private final ArbitraryGranularitySpec wrappedSpec;
+  private final IntervalsByGranularity intervalsByGranularity;
+  protected LookupIntervalBuckets lookupTableBucketByDateTime;

   @JsonCreator
   public UniformGranularitySpec(
@@ -52,21 +46,11 @@ public class UniformGranularitySpec implements GranularitySpec
       @JsonProperty("intervals") List inputIntervals
   )
   {
+    super(inputIntervals, rollup);
     this.queryGranularity = queryGranularity == null ? DEFAULT_QUERY_GRANULARITY : queryGranularity;
-    this.rollup = rollup == null ? Boolean.TRUE : rollup;
     this.segmentGranularity = segmentGranularity == null ? DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
-
-    if (inputIntervals != null) {
-      List granularIntervals = new ArrayList<>();
-      for (Interval inputInterval : inputIntervals) {
-        Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval));
-      }
-      this.inputIntervals = ImmutableList.copyOf(inputIntervals);
-      this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, rollup, granularIntervals);
-    } else {
-      this.inputIntervals = null;
-      this.wrappedSpec = null;
-    }
+    intervalsByGranularity = new IntervalsByGranularity(this.inputIntervals, this.segmentGranularity);
+    lookupTableBucketByDateTime = new LookupIntervalBuckets(sortedBucketIntervals());
   }

   public UniformGranularitySpec(
@@ -79,29 +63,9 @@ public class UniformGranularitySpec implements GranularitySpec
   }

   @Override
-  public Optional> bucketIntervals()
+  public Iterable sortedBucketIntervals()
   {
-    if (wrappedSpec == null) {
-      return Optional.absent();
-    } else {
-      return wrappedSpec.bucketIntervals();
-    }
-  }
-
-  @Override
-  public List inputIntervals()
-  {
-    return inputIntervals == null ? ImmutableList.of() : ImmutableList.copyOf(inputIntervals);
-  }
-
-  @Override
-  public Optional bucketInterval(DateTime dt)
-  {
-    if (wrappedSpec == null) {
-      return Optional.absent();
-    } else {
-      return wrappedSpec.bucketInterval(dt);
-    }
+    return () -> intervalsByGranularity.granularityIntervalsIterator();
   }

   @Override
@@ -111,13 +75,6 @@ public class UniformGranularitySpec implements GranularitySpec
     return segmentGranularity;
   }

-  @Override
-  @JsonProperty("rollup")
-  public boolean isRollup()
-  {
-    return rollup;
-  }
-
   @Override
   @JsonProperty("queryGranularity")
   public Granularity getQueryGranularity()
@@ -125,12 +82,6 @@ public class UniformGranularitySpec implements GranularitySpec
     return queryGranularity;
   }

-  @JsonProperty("intervals")
-  public Optional> getIntervals()
-  {
-    return Optional.fromNullable(inputIntervals);
-  }
-
   @Override
   public boolean equals(Object o)
   {
@@ -149,14 +100,15 @@ public class UniformGranularitySpec implements GranularitySpec
     if (!queryGranularity.equals(that.queryGranularity)) {
       return false;
     }
-    if (!rollup.equals(that.rollup)) {
+    if (isRollup() != that.isRollup()) {
       return false;
     }
     if (inputIntervals != null ? !inputIntervals.equals(that.inputIntervals) : that.inputIntervals != null) {
       return false;
     }
-    return !(wrappedSpec != null ? !wrappedSpec.equals(that.wrappedSpec) : that.wrappedSpec != null);
+
+    return true;
   }

@@ -167,7 +119,6 @@ public class UniformGranularitySpec implements GranularitySpec
     result = 31 * result + queryGranularity.hashCode();
     result = 31 * result + rollup.hashCode();
     result = 31 * result + (inputIntervals != null ? inputIntervals.hashCode() : 0);
-    result = 31 * result + (wrappedSpec != null ?
wrappedSpec.hashCode() : 0); return result; } @@ -179,7 +130,6 @@ public class UniformGranularitySpec implements GranularitySpec ", queryGranularity=" + queryGranularity + ", rollup=" + rollup + ", inputIntervals=" + inputIntervals + - ", wrappedSpec=" + wrappedSpec + '}'; } @@ -188,4 +138,11 @@ public class UniformGranularitySpec implements GranularitySpec { return new UniformGranularitySpec(segmentGranularity, queryGranularity, rollup, inputIntervals); } + + @Override + protected LookupIntervalBuckets getLookupTableBuckets() + { + return lookupTableBucketByDateTime; + } + } diff --git a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java index d7dff7a31cb..a0154228a61 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java @@ -21,6 +21,7 @@ package org.apache.druid.segment.indexing.granularity; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -47,7 +48,8 @@ public class ArbitraryGranularityTest Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), Intervals.of("2012-01-01T00Z/2012-01-03T00Z") - )); + ) + ); Assert.assertNotNull(spec.getQueryGranularity()); } @@ -57,12 +59,13 @@ public class ArbitraryGranularityTest final GranularitySpec spec = new ArbitraryGranularitySpec( Granularities.NONE, Lists.newArrayList( - Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), - Intervals.of("2012-02-01T00Z/2012-03-01T00Z"), - Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), - Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), - Intervals.of("2012-01-01T00Z/2012-01-03T00Z") - )); + Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), + Intervals.of("2012-02-01T00Z/2012-03-01T00Z"), + Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), + Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), + Intervals.of("2012-01-01T00Z/2012-01-03T00Z") + ) + ); Assert.assertTrue(spec.isRollup()); @@ -74,7 +77,17 @@ public class ArbitraryGranularityTest Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), Intervals.of("2012-02-01T00Z/2012-03-01T00Z") ), - Lists.newArrayList(spec.bucketIntervals().get()) + Lists.newArrayList(spec.sortedBucketIntervals()) + ); + + Assert.assertEquals( + Optional.of(Intervals.of("2012-01-01T00Z/2012-01-03T00Z")), + spec.bucketInterval(DateTimes.of("2012-01-01T00Z")) + ); + + Assert.assertEquals( + Optional.of(Intervals.of("2012-01-08T00Z/2012-01-11T00Z")), + spec.bucketInterval(DateTimes.of("2012-01-08T00Z")) ); Assert.assertEquals( @@ -118,6 +131,7 @@ public class ArbitraryGranularityTest Optional.absent(), spec.bucketInterval(DateTimes.of("2012-01-05T00Z")) ); + } @Test @@ -187,10 +201,26 @@ public class ArbitraryGranularityTest try { final GranularitySpec rtSpec = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(spec), GranularitySpec.class); - Assert.assertEquals("Round-trip", spec.bucketIntervals(), rtSpec.bucketIntervals()); + Assert.assertEquals( + "Round-trip", + ImmutableList.copyOf(spec.sortedBucketIntervals()), + ImmutableList.copyOf(rtSpec.sortedBucketIntervals()) + ); + Assert.assertEquals( + "Round-trip", + 
ImmutableList.copyOf(spec.inputIntervals()), + ImmutableList.copyOf(rtSpec.inputIntervals()) + ); } catch (Exception e) { throw new RuntimeException(e); } } + + @Test + public void testNullInputIntervals() + { + final GranularitySpec spec = new ArbitraryGranularitySpec(Granularities.NONE, null); + Assert.assertFalse(spec.sortedBucketIntervals().iterator().hasNext()); + } } diff --git a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java index 402f7fd6d3e..27760f63c18 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java @@ -21,6 +21,8 @@ package org.apache.druid.segment.indexing.granularity; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -34,8 +36,8 @@ import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.SortedSet; public class UniformGranularityTest { @@ -44,19 +46,26 @@ public class UniformGranularityTest @Test public void testSimple() { + + final List inputIntervals = Lists.newArrayList( + Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), + Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), + Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), + Intervals.of("2012-01-01T00Z/2012-01-03T00Z") + ); final GranularitySpec spec = new UniformGranularitySpec( Granularities.DAY, null, - Lists.newArrayList( - Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), - Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), - Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), - Intervals.of("2012-01-01T00Z/2012-01-03T00Z") - ) + inputIntervals ); Assert.assertTrue(spec.isRollup()); + Assert.assertEquals( + inputIntervals, + Lists.newArrayList(spec.inputIntervals()) + ); + Assert.assertEquals( Lists.newArrayList( Intervals.of("2012-01-01T00Z/P1D"), @@ -67,7 +76,28 @@ public class UniformGranularityTest Intervals.of("2012-01-09T00Z/P1D"), Intervals.of("2012-01-10T00Z/P1D") ), - Lists.newArrayList(spec.bucketIntervals().get()) + Lists.newArrayList(spec.sortedBucketIntervals()) + ); + + + Assert.assertEquals( + Optional.absent(), + spec.bucketInterval(DateTimes.of("2011-01-12T00Z")) + ); + + Assert.assertEquals( + Optional.of(Intervals.of("2012-01-01T00Z/2012-01-02T00Z")), + spec.bucketInterval(DateTimes.of("2012-01-01T00Z")) + ); + + Assert.assertEquals( + Optional.of(Intervals.of("2012-01-10T00Z/2012-01-11T00Z")), + spec.bucketInterval(DateTimes.of("2012-01-10T00Z")) + ); + + Assert.assertEquals( + Optional.absent(), + spec.bucketInterval(DateTimes.of("2012-01-12T00Z")) ); Assert.assertEquals( @@ -99,6 +129,7 @@ public class UniformGranularityTest Optional.of(Intervals.of("2012-01-08T00Z/2012-01-09T00Z")), spec.bucketInterval(DateTimes.of("2012-01-08T01Z")) ); + } @Test @@ -132,9 +163,9 @@ public class UniformGranularityTest try { final GranularitySpec rtSpec = JOSN_MAPPER.readValue(JOSN_MAPPER.writeValueAsString(spec), GranularitySpec.class); Assert.assertEquals( - "Round-trip bucketIntervals", - spec.bucketIntervals(), - rtSpec.bucketIntervals() + "Round-trip 
sortedBucketIntervals", + ImmutableList.copyOf(spec.sortedBucketIntervals()), + ImmutableList.copyOf(rtSpec.sortedBucketIntervals().iterator()) ); Assert.assertEquals( "Round-trip granularity", @@ -253,10 +284,9 @@ public class UniformGranularityTest ) ); - Assert.assertTrue(spec.bucketIntervals().isPresent()); + Assert.assertTrue(spec.sortedBucketIntervals().iterator().hasNext()); - final Optional> sortedSetOptional = spec.bucketIntervals(); - final SortedSet intervals = sortedSetOptional.get(); + final Iterable intervals = spec.sortedBucketIntervals(); ArrayList actualIntervals = new ArrayList<>(); for (Interval interval : intervals) { actualIntervals.add(interval.toDurationMillis()); @@ -279,6 +309,25 @@ public class UniformGranularityTest Assert.assertEquals(expectedIntervals, actualIntervals); } + @Test + public void testUniformGranularitySpecWithLargeNumberOfIntervalsDoesNotBlowUp() + { + // just make sure that intervals for uniform spec are not materialized (causing OOM) when created + final GranularitySpec spec = new UniformGranularitySpec( + Granularities.SECOND, + null, + Collections.singletonList( + Intervals.of("2012-01-01T00Z/P10Y") + ) + ); + + Assert.assertTrue(spec != null); + + int count = Iterators.size(spec.sortedBucketIntervals().iterator()); + // account for three leap years... + Assert.assertEquals(3600 * 24 * 365 * 10 + 3 * 24 * 3600, count); + } + private void notEqualsCheck(GranularitySpec spec1, GranularitySpec spec2) { Assert.assertNotEquals(spec1, spec2);