From 0e4750bac208d99f4a07545cc2f401f9bcdc1381 Mon Sep 17 00:00:00 2001 From: Agustin Gonzalez Date: Fri, 29 Jan 2021 07:02:10 -0700 Subject: [PATCH] Granularity interval materialization (#10742) * Prevent interval materialization for UniformGranularitySpec inside the overlord * Change API of bucketIntervals in GranularitySpec to return an Iterable * Javadoc update, respect inputIntervals contract * Eliminate dependency on wrappedspec (i.e. ArbitraryGranularity) in UniformGranularitySpec * Added one boundary condition test to UniformGranularityTest and fixed Travis forbidden method errors in IntervalsByGranularity * Fix Travis style & other checks * Refactor TreeSet to facilitate re-use in UniformGranularitySpec * Make sure intervals are unique when there is no segment granularity * Style/bugspot fixes... * More travis checks * Add condensedIntervals method to GranularitySpec and pass it as needed to the lock method * Style & PR feedback * Fixed failing test * Fixed bug in IntervalsByGranularity iterator that it would return repeated elements (see added unit tests that were broken before this change) * Refactor so that we can get the condensed buckets without materializing the intervals * Get rid of GranularitySpec::condensedInputIntervals ... not needed * Travis failures fixes * Travis checkstyle fix * Edited/added javadoc comments and a method name (code review feedback) * Fixed jacoco coverage by moving class and adding more coverage * Avoid materializing the condensed intervals when locking * Deal with overlapping intervals * Remove code and use library code instead * Refactor intervals by granularity using the FluentIterable, add sanity checks * Change !hasNext() to inputIntervals().isEmpty() * Remove redundant lambda * Use materialized intervals here since this is outside the overlord (for performance) * Name refactor to reflect the fact that bucket intervals are sorted. 
* Style fixes * Removed redundant method and have condensedIntervalIterator throw IAE when element is null for consistency with other methods in this class (as well that null interval when condensing does not make sense) * Remove forbidden api * Move helper class inside common base class to reduce public space pollution --- .../druid/java/util/common/JodaUtils.java | 149 ++++++++-- .../granularity/IntervalsByGranularity.java | 100 +++++++ .../druid/common/utils/JodaUtilsTest.java | 199 ++++++++++++- .../common/IntervalsByGranularityTest.java | 266 ++++++++++++++++++ .../indexer/DetermineHashedPartitionsJob.java | 26 +- .../druid/indexer/DeterminePartitionsJob.java | 6 +- .../HadoopDruidDetermineConfigurationJob.java | 3 +- .../indexer/HadoopDruidIndexerConfig.java | 30 +- .../indexer/HadoopDruidIndexerMapper.java | 2 +- ...oopDruidDetermineConfigurationJobTest.java | 5 +- .../indexer/HadoopIngestionSpecTest.java | 2 +- .../druid/indexer/IndexGeneratorJobTest.java | 2 +- .../common/task/AbstractBatchIndexTask.java | 70 ++--- .../indexing/common/task/HadoopIndexTask.java | 21 +- .../druid/indexing/common/task/IndexTask.java | 18 +- .../parallel/ParallelIndexSupervisorTask.java | 50 ++-- .../PartialDimensionDistributionTask.java | 5 +- .../PartialHashSegmentGenerateTask.java | 3 +- .../batch/parallel/SinglePhaseSubTask.java | 16 +- .../granularity/ArbitraryGranularitySpec.java | 70 ++--- .../granularity/BaseGranularitySpec.java | 140 +++++++++ .../indexing/granularity/GranularitySpec.java | 19 +- .../granularity/UniformGranularitySpec.java | 81 ++---- .../granularity/ArbitraryGranularityTest.java | 48 +++- .../granularity/UniformGranularityTest.java | 77 ++++- 25 files changed, 1106 insertions(+), 302 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java create mode 100644 core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java create mode 100644 
server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java diff --git a/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java b/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java index a2256936b83..96c49d9ba0d 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/JodaUtils.java @@ -19,16 +19,23 @@ package org.apache.druid.java.util.common; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; import org.apache.druid.java.util.common.guava.Comparators; import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; import java.util.SortedSet; import java.util.TreeSet; /** + * */ public class JodaUtils { @@ -36,10 +43,17 @@ public class JodaUtils public static final long MAX_INSTANT = Long.MAX_VALUE / 2; public static final long MIN_INSTANT = Long.MIN_VALUE / 2; - public static ArrayList condenseIntervals(Iterable intervals) + /** + * This method will not materialize the input intervals if they represent + * a SortedSet (i.e. implement that interface). If not, the method internally + * creates a sorted set and populates it with them thus materializing the + * intervals in the input. 
+ * + * @param intervals The Iterable object containing the intervals to condense + * @return The condensed intervals + */ + public static List condenseIntervals(Iterable intervals) { - ArrayList retVal = new ArrayList<>(); - final SortedSet sortedIntervals; if (intervals instanceof SortedSet) { @@ -50,35 +64,122 @@ public class JodaUtils sortedIntervals.add(interval); } } + return ImmutableList.copyOf(condensedIntervalsIterator(sortedIntervals.iterator())); + } - if (sortedIntervals.isEmpty()) { - return new ArrayList<>(); + /** + * This method does not materialize the intervals represented by the + * sortedIntervals iterator. However, the caller needs to ensure that sortedIntervals + * is already sorted in ascending order (use the Comparators.intervalsByStartThenEnd()). + * It avoids materialization by incrementally condensing the intervals by + * starting from the first and looking for "adjacent" intervals. This is + * possible since intervals in the Iterator are in ascending order (as + * guaranteed by the caller). + *

+ * * + * + * @param sortedIntervals The iterator object containing the intervals to condense + * @return An iterator for the condensed intervals. By construction the condensed intervals are sorted + * in ascending order and contain no repeated elements. The input iterator must not contain nulls; + * an IAE is thrown from next() when a null element is encountered. + * @throws IAE if an element is null or if sortedIntervals is not sorted in ascending order + */ + public static Iterator condensedIntervalsIterator(Iterator sortedIntervals) + { + + if (sortedIntervals == null || !sortedIntervals.hasNext()) { + return Collections.emptyIterator(); + } - Iterator intervalsIter = sortedIntervals.iterator(); - Interval currInterval = intervalsIter.next(); - while (intervalsIter.hasNext()) { - Interval next = intervalsIter.next(); + final PeekingIterator peekingIterator = Iterators.peekingIterator(sortedIntervals); + return new Iterator() + { + private Interval previous; - if (currInterval.abuts(next)) { - currInterval = new Interval(currInterval.getStart(), next.getEnd()); - } else if (currInterval.overlaps(next)) { - DateTime nextEnd = next.getEnd(); - DateTime currEnd = currInterval.getEnd(); - currInterval = new Interval( - currInterval.getStart(), - nextEnd.isAfter(currEnd) ? 
nextEnd : currEnd - ); - } else { - retVal.add(currInterval); - currInterval = next; + @Override + public boolean hasNext() + { + return peekingIterator.hasNext(); } - } - retVal.add(currInterval); + @Override + public Interval next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + Interval currInterval = peekingIterator.next(); + if (currInterval == null) { + throw new IAE("Element of intervals is null"); + } + + // check sorted ascending: + verifyAscendingSortOrder(previous, currInterval); + + previous = currInterval; + + while (hasNext()) { + Interval next = peekingIterator.peek(); + if (next == null) { + throw new IAE("Element of intervals is null"); + } + + if (currInterval.abuts(next)) { + currInterval = new Interval(currInterval.getStart(), next.getEnd()); + peekingIterator.next(); + } else if (currInterval.overlaps(next)) { + DateTime nextEnd = next.getEnd(); + DateTime currEnd = currInterval.getEnd(); + currInterval = new Interval( + currInterval.getStart(), + nextEnd.isAfter(currEnd) ? nextEnd : currEnd + ); + peekingIterator.next(); + } else { + break; + } + } + return currInterval; + } + }; + } + + /** + * Verify whether an iterable of intervals contains overlapping intervals + * + * @param intervals An interval iterable sorted using Comparators.intervalsByStartThenEnd() + * @return true if the iterable contains at least two overlapping intervals, false otherwise. 
+ * @throws IAE when at least an element is null or when the iterable is not sorted + */ + public static boolean containOverlappingIntervals(Iterable intervals) + { + if (intervals == null) { + return false; + } + boolean retVal = false; + Interval previous = null; + for (Interval current : intervals) { + if (current == null) { + throw new IAE("Intervals should not contain nulls"); + } + verifyAscendingSortOrder(previous, current); + if (previous != null && previous.overlaps(current)) { + retVal = true; + break; + } + previous = current; + } return retVal; } + private static void verifyAscendingSortOrder(Interval previous, Interval current) + { + if (previous != null && previous.isAfter(current)) { + throw new IAE("Adjacent intervals are not sorted [%s,%s]", previous, current); + } + } + public static Interval umbrellaInterval(Iterable intervals) { ArrayList startDates = new ArrayList<>(); @@ -137,4 +238,6 @@ public class JodaUtils return max; } } + + } diff --git a/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java new file mode 100644 index 00000000000..7065535eeb4 --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/common/granularity/IntervalsByGranularity.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common.granularity; + +import com.google.common.collect.FluentIterable; +import org.apache.druid.common.guava.SettableSupplier; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.guava.Comparators; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + * Produce a stream of intervals generated by a given set of intervals as input and a given + * granularity. This class avoids materializing the granularity intervals whenever possible. + */ +public class IntervalsByGranularity +{ + private final List sortedNonOverlappingIntervals; + private final Granularity granularity; + + /** + * @param intervals Intervals for which to apply the given granularity. They should + * not contain overlapped intervals. 
+ * @param granularity The granularity to apply + * @throws IAE if intervals contains at least one overlapping pair + */ + public IntervalsByGranularity(Collection intervals, Granularity granularity) + { + // eliminate dups, sort intervals: + Set intervalSet = new HashSet<>(intervals); + List inputIntervals = new ArrayList<>(intervals.size()); + inputIntervals.addAll(intervalSet); + inputIntervals.sort(Comparators.intervalsByStartThenEnd()); + + // sanity check + if (JodaUtils.containOverlappingIntervals(inputIntervals)) { + throw new IAE("Intervals contain overlapping intervals [%s]", intervals); + } + + // all good: + sortedNonOverlappingIntervals = inputIntervals; + this.granularity = granularity; + } + + /** + * @return The intervals according to the granularity. The intervals are provided in + * order according to Comparators.intervalsByStartThenEnd() + */ + public Iterator granularityIntervalsIterator() + { + Iterator ite; + if (sortedNonOverlappingIntervals.isEmpty()) { + ite = Collections.emptyIterator(); + } else { + // The filter after transform & concat is to remove duplicates. + // Duplicates can appear when input intervals that do not overlap + // map to the same bucket once a larger granularity is applied. + // Imagine the inputs are 2013-01-01T00Z/2013-01-10T00Z, 2013-01-15T00Z/2013-01-20T00Z. + // When the iterator for the two intervals is called, say with MONTH granularity, two + // intervals will be returned, both with the same value 2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z. + // Thus dups can be created given the right conditions.... 
+ final SettableSupplier previous = new SettableSupplier<>(); + ite = FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable) + .filter(interval -> { + if (previous.get() != null && previous.get().equals(interval)) { + return false; + } + previous.set(interval); + return true; + }).iterator(); + } + return ite; + } + +} diff --git a/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java b/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java index c5610f969c8..d7bfb4adffd 100644 --- a/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java +++ b/core/src/test/java/org/apache/druid/common/utils/JodaUtilsTest.java @@ -19,8 +19,11 @@ package org.apache.druid.common.utils; +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.JodaUtils; +import org.apache.druid.java.util.common.guava.Comparators; import org.joda.time.Duration; import org.joda.time.Interval; import org.joda.time.Period; @@ -32,6 +35,7 @@ import java.util.Collections; import java.util.List; /** + * */ public class JodaUtilsTest { @@ -75,6 +79,40 @@ public class JodaUtilsTest Intervals.of("2011-03-05/2011-03-06") ); + List expected = Arrays.asList( + Intervals.of("2011-01-01/2011-01-03"), + Intervals.of("2011-02-01/2011-02-08"), + Intervals.of("2011-03-01/2011-03-02"), + Intervals.of("2011-03-03/2011-03-04"), + Intervals.of("2011-03-05/2011-03-06") + ); + + List actual = JodaUtils.condenseIntervals(intervals); + + Assert.assertEquals( + expected, + actual + ); + + } + + + @Test + public void testCondenseIntervalsSimpleSortedIterator() + { + List intervals = Arrays.asList( + Intervals.of("2011-01-01/2011-01-02"), + Intervals.of("2011-01-02/2011-01-03"), + Intervals.of("2011-02-03/2011-02-08"), + Intervals.of("2011-02-01/2011-02-02"), + Intervals.of("2011-02-01/2011-02-05"), + 
Intervals.of("2011-03-01/2011-03-02"), + Intervals.of("2011-03-03/2011-03-04"), + Intervals.of("2011-03-05/2011-03-06") + ); + intervals.sort(Comparators.intervalsByStartThenEnd()); + + List actual = ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())); Assert.assertEquals( Arrays.asList( Intervals.of("2011-01-01/2011-01-03"), @@ -83,7 +121,116 @@ public class JodaUtilsTest Intervals.of("2011-03-03/2011-03-04"), Intervals.of("2011-03-05/2011-03-06") ), - JodaUtils.condenseIntervals(intervals) + actual + ); + + } + + @Test + public void testCondenseIntervalsSimpleSortedIteratorOverlapping() + { + List intervals = Arrays.asList( + Intervals.of("2011-02-01/2011-03-10"), + Intervals.of("2011-01-02/2011-02-03"), + Intervals.of("2011-01-07/2015-01-19"), + Intervals.of("2011-01-15/2011-01-19"), + Intervals.of("2011-01-01/2011-01-02"), + Intervals.of("2011-02-01/2011-03-10") + ); + + intervals.sort(Comparators.intervalsByStartThenEnd()); + + Assert.assertEquals( + Collections.singletonList( + Intervals.of("2011-01-01/2015-01-19") + ), + ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())) + ); + } + + @Test(expected = IAE.class) + public void testCondenseIntervalsSimplSortedIteratorOverlappingWithNullsShouldThrow() + { + List intervals = Arrays.asList( + Intervals.of("2011-01-02/2011-02-03"), + Intervals.of("2011-02-01/2011-03-10"), + null, + Intervals.of("2011-03-07/2011-04-19"), + Intervals.of("2011-04-01/2015-01-19"), + null + ); + ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())); + } + + @Test(expected = IAE.class) + public void testCondenseIntervalsSimplSortedIteratorOverlappingWithNullFirstAndLastshouldThrow() + { + List intervals = Arrays.asList( + null, + Intervals.of("2011-01-02/2011-02-03"), + Intervals.of("2011-02-01/2011-03-10"), + Intervals.of("2011-03-07/2011-04-19"), + Intervals.of("2011-04-01/2015-01-19"), + null + ); + 
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())); + } + + @Test(expected = IllegalArgumentException.class) + public void testCondenseIntervalsSimpleUnsortedIterator() + { + List intervals = Arrays.asList( + Intervals.of("2011-01-01/2011-01-02"), + Intervals.of("2011-01-02/2011-01-03"), + Intervals.of("2011-02-03/2011-02-08"), + Intervals.of("2011-02-01/2011-02-02"), + Intervals.of("2011-02-01/2011-02-05"), + Intervals.of("2011-03-01/2011-03-02"), + Intervals.of("2011-03-03/2011-03-04"), + Intervals.of("2011-03-05/2011-03-06") + ); + ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())); + } + + @Test(expected = IllegalArgumentException.class) + public void testCondenseIntervalsSimpleUnsortedIteratorSmallestAtEnd() + { + List intervals = Arrays.asList( + Intervals.of("2011-01-01/2011-01-02"), + Intervals.of("2011-02-01/2011-02-04"), + Intervals.of("2011-03-01/2011-03-04"), + Intervals.of("2010-01-01/2010-03-04") + ); + ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())); + } + + @Test + public void testCondenseIntervalsIteratorWithDups() + { + List intervals = Arrays.asList( + Intervals.of("2011-01-01/2011-01-02"), + Intervals.of("2011-02-04/2011-02-05"), + Intervals.of("2011-01-01/2011-01-02"), + Intervals.of("2011-01-02/2011-01-03"), + Intervals.of("2011-02-03/2011-02-08"), + Intervals.of("2011-02-03/2011-02-08"), + Intervals.of("2011-02-01/2011-02-02"), + Intervals.of("2011-02-01/2011-02-05"), + Intervals.of("2011-03-01/2011-03-02"), + Intervals.of("2011-03-03/2011-03-04"), + Intervals.of("2011-03-05/2011-03-06") + ); + intervals.sort(Comparators.intervalsByStartThenEnd()); + + Assert.assertEquals( + Arrays.asList( + Intervals.of("2011-01-01/2011-01-03"), + Intervals.of("2011-02-01/2011-02-08"), + Intervals.of("2011-03-01/2011-03-02"), + Intervals.of("2011-03-03/2011-03-04"), + Intervals.of("2011-03-05/2011-03-06") + ), + 
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator())) ); } @@ -144,4 +291,54 @@ public class JodaUtilsTest Assert.assertEquals(Long.MAX_VALUE, period.getMinutes()); } + @Test + public void testShouldContainOverlappingIntervals() + { + List intervals = Arrays.asList( + Intervals.of("2011-02-01/2011-03-10"), + Intervals.of("2011-03-25/2011-04-03"), + Intervals.of("2011-04-01/2015-01-19"), + Intervals.of("2016-01-15/2016-01-19") + ); + Assert.assertTrue(JodaUtils.containOverlappingIntervals(intervals)); + } + + + @Test + public void testShouldNotContainOverlappingIntervals() + { + List intervals = Arrays.asList( + Intervals.of("2011-02-01/2011-03-10"), + Intervals.of("2011-03-10/2011-04-03"), + Intervals.of("2011-04-04/2015-01-14"), + Intervals.of("2016-01-15/2016-01-19") + ); + Assert.assertFalse(JodaUtils.containOverlappingIntervals(intervals)); + } + + @Test(expected = IAE.class) + public void testOverlappingIntervalsContainsNull() + { + List intervals = Arrays.asList( + Intervals.of("2011-02-01/2011-03-10"), + null, + Intervals.of("2011-04-04/2015-01-14"), + Intervals.of("2016-01-15/2016-01-19") + ); + JodaUtils.containOverlappingIntervals(intervals); + } + + @Test(expected = IAE.class) + public void testOverlappingIntervalsContainsUnsorted() + { + List intervals = Arrays.asList( + Intervals.of("2011-02-01/2011-03-10"), + Intervals.of("2011-03-10/2011-04-03"), + Intervals.of("2016-01-15/2016-01-19"), + Intervals.of("2011-04-04/2015-01-14") + ); + JodaUtils.containOverlappingIntervals(intervals); + } + + } diff --git a/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java new file mode 100644 index 00000000000..a38e6d5dbe8 --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/common/IntervalsByGranularityTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common; + +import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.granularity.IntervalsByGranularity; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public class IntervalsByGranularityTest +{ + private static final long SECONDS_IN_YEAR = 31536000; + + + @Test + public void testTrivialIntervalExplosion() + { + Interval first = Intervals.of("2013-01-01T00Z/2013-02-01T00Z"); + Interval second = Intervals.of("2012-01-01T00Z/2012-02-01T00Z"); + Interval third = Intervals.of("2002-01-01T00Z/2003-01-01T00Z"); + + IntervalsByGranularity intervals = new IntervalsByGranularity( + ImmutableList.of(first, second, third), + Granularity.fromString("DAY") + ); + + // get count: + Iterator granularityIntervals = intervals.granularityIntervalsIterator(); + long count = getCount(granularityIntervals); + Assert.assertTrue(count == 62 + 365); + + granularityIntervals = 
intervals.granularityIntervalsIterator(); + count = getCountWithNoHasNext(granularityIntervals); + Assert.assertTrue(count == 62 + 365); + } + + + @Test + public void testDups() + { + Interval first = Intervals.of("2013-01-01T00Z/2013-02-01T00Z"); + Interval second = Intervals.of("2012-04-01T00Z/2012-05-01T00Z"); + Interval third = Intervals.of("2013-01-01T00Z/2013-02-01T00Z"); // dup + + IntervalsByGranularity intervals = new IntervalsByGranularity( + ImmutableList.of(first, second, third), + Granularity.fromString("DAY") + ); + + // get count: + Iterator granularityIntervals = intervals.granularityIntervalsIterator(); + long count = getCount(granularityIntervals); + Assert.assertTrue(count == 61); + } + + + @Test + public void testCondenseForManyIntervals() + { + // This method attempts to test that there are no issues when condensed is called + // with an iterator pointing to millions of intervals (since the version of condensed + // used here takes an interval iterator and does not materialize intervals) + Interval first = Intervals.of("2012-01-01T00Z/P1Y"); + IntervalsByGranularity intervals = new IntervalsByGranularity( + ImmutableList.of(first), + Granularity.fromString("SECOND") + ); + Assert.assertEquals( + ImmutableList.of(Intervals.of("2012-01-01T00Z/2013-01-01T00Z")), + ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.granularityIntervalsIterator())) + ); + } + + @Test + public void testIntervalExplosion() + { + Interval first = Intervals.of("2012-01-01T00Z/2012-12-31T00Z"); + Interval second = Intervals.of("2002-01-01T00Z/2002-12-31T00Z"); + Interval third = Intervals.of("2021-01-01T00Z/2021-06-30T00Z"); + IntervalsByGranularity intervals = new IntervalsByGranularity( + ImmutableList.of(first, second, third), + Granularity.fromString("SECOND") + ); + + // get count: + Iterator granularityIntervals = intervals.granularityIntervalsIterator(); + long count = getCount(granularityIntervals); + Assert.assertTrue(count == 78537600); + + } + 
+ @Test + public void testSimpleEliminateRepeated() + { + final List inputIntervals = ImmutableList.of( + Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), + Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), + Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), + Intervals.of("2012-01-01T00Z/2012-01-03T00Z") + ); + IntervalsByGranularity intervals = new IntervalsByGranularity( + inputIntervals, + Granularities.MONTH + ); + + Assert.assertEquals( + ImmutableList.of(Intervals.of("2012-01-01T00Z/2012-02-01T00Z")), + ImmutableList.copyOf(intervals.granularityIntervalsIterator()) + ); + + } + + @Test + public void testALittleMoreComplexEliminateRepeated() + { + final List inputIntervals = ImmutableList.of( + Intervals.of("2015-01-08T00Z/2015-01-11T00Z"), + Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), + Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), + Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), + Intervals.of("2012-01-01T00Z/2012-01-03T00Z"), + Intervals.of("2007-03-08T00Z/2007-04-11T00Z") + ); + IntervalsByGranularity intervals = new IntervalsByGranularity( + inputIntervals, + Granularities.MONTH + ); + + Assert.assertEquals( + ImmutableList.of( + Intervals.of("2007-03-01T00Z/2007-04-01T00Z"), + Intervals.of("2007-04-01T00Z/2007-05-01T00Z"), + Intervals.of("2012-01-01T00Z/2012-02-01T00Z"), + Intervals.of("2015-01-01T00Z/2015-02-01T00Z") + ), + ImmutableList.copyOf(intervals.granularityIntervalsIterator()) + ); + + } + + @Test(expected = IAE.class) + public void testOverlappingShouldThrow() + { + List inputIntervals = ImmutableList.of( + Intervals.of("2013-01-01T00Z/2013-01-11T00Z"), + Intervals.of("2013-01-05T00Z/2013-01-08T00Z"), + Intervals.of("2013-01-07T00Z/2013-01-15T00Z") + ); + + IntervalsByGranularity intervals = new IntervalsByGranularity( + inputIntervals, + Granularity.fromString("DAY") + ); + } + + + @Test + public void testWithGranularity() + { + List inputIntervals = ImmutableList.of( + Intervals.of("2013-01-01T00Z/2013-01-10T00Z"), + 
Intervals.of("2013-01-15T00Z/2013-01-20T00Z"), + Intervals.of("2013-02-07T00Z/2013-02-15T00Z") + ); + + IntervalsByGranularity intervals = new IntervalsByGranularity( + inputIntervals, + Granularity.fromString("MONTH") + ); + + // get count: + Iterator granularityIntervals = intervals.granularityIntervalsIterator(); + long count = getCount(granularityIntervals); + Assert.assertTrue(count == 2); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRemoveThrowsException() + { + final List inputIntervals = ImmutableList.of( + Intervals.of("2015-01-08T00Z/2015-01-11T00Z") + ); + IntervalsByGranularity intervals = new IntervalsByGranularity( + inputIntervals, + Granularities.MONTH + ); + intervals.granularityIntervalsIterator().remove(); + } + + @Test + public void testEmptyInput() + { + final List inputIntervals = Collections.emptyList(); + IntervalsByGranularity intervals = new IntervalsByGranularity( + inputIntervals, + Granularities.MONTH + ); + Assert.assertFalse(intervals.granularityIntervalsIterator().hasNext()); + } + + private long getCount(Iterator granularityIntervalIterator) + { + long count = 0; + Interval previous = null; + Interval current; + while (granularityIntervalIterator.hasNext()) { + current = granularityIntervalIterator.next(); + if (previous != null) { + Assert.assertTrue(previous + "," + current, previous.getEndMillis() <= current.getStartMillis()); + } + previous = current; + count++; + } + return count; + } + + private long getCountWithNoHasNext(Iterator granularityIntervalIterator) + { + long count = 0; + Interval previous = null; + Interval current; + + while (true) { + try { + current = granularityIntervalIterator.next(); + } + catch (NoSuchElementException e) { + // done + break; + } + if (previous != null) { + Assert.assertTrue(previous.getEndMillis() <= current.getStartMillis()); + } + previous = current; + count++; + } + + return count; + } + +} diff --git 
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java index b1ec5460993..1d0c3c4b14f 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java @@ -22,6 +22,7 @@ package org.apache.druid.indexer; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; @@ -64,7 +65,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.TreeMap; /** @@ -109,10 +109,10 @@ public class DetermineHashedPartitionsJob implements Jobby groupByJob.setOutputValueClass(NullWritable.class); groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class); groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class); - if (!config.getSegmentGranularIntervals().isPresent()) { + if (config.getInputIntervals().isEmpty()) { groupByJob.setNumReduceTasks(1); } else { - groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size()); + groupByJob.setNumReduceTasks(Iterators.size(config.getSegmentGranularIntervals().iterator())); } JobHelper.setupClasspath( JobHelper.distributedClassPath(config.getWorkingPath()), @@ -151,7 +151,7 @@ public class DetermineHashedPartitionsJob implements Jobby log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals()); FileSystem fileSystem = null; - if (!config.getSegmentGranularIntervals().isPresent()) { + if (config.getInputIntervals().isEmpty()) { final Path intervalInfoPath = 
config.makeIntervalInfoPath(); fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration()); if (!Utils.exists(groupByJob, fileSystem, intervalInfoPath)) { @@ -159,7 +159,9 @@ public class DetermineHashedPartitionsJob implements Jobby } List intervals = HadoopDruidIndexerConfig.JSON_MAPPER.readValue( Utils.openInputStream(groupByJob, intervalInfoPath), - new TypeReference>() {} + new TypeReference>() + { + } ); config.setGranularitySpec( new UniformGranularitySpec( @@ -182,7 +184,7 @@ public class DetermineHashedPartitionsJob implements Jobby } HashPartitionFunction partitionFunction = ((HashedPartitionsSpec) partitionsSpec).getPartitionFunction(); int shardCount = 0; - for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { + for (Interval segmentGranularity : config.getSegmentGranularIntervals()) { DateTime bucket = segmentGranularity.getStart(); final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity); @@ -295,11 +297,11 @@ public class DetermineHashedPartitionsJob implements Jobby super.setup(context); rollupGranularity = getConfig().getGranularitySpec().getQueryGranularity(); config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); - Optional> intervals = config.getSegmentGranularIntervals(); - if (intervals.isPresent()) { + Iterable intervals = config.getSegmentGranularIntervals(); + if (intervals.iterator().hasNext()) { determineIntervals = false; final ImmutableMap.Builder builder = ImmutableMap.builder(); - for (final Interval bucketInterval : intervals.get()) { + for (final Interval bucketInterval : intervals) { builder.put(bucketInterval, HyperLogLogCollector.makeLatestCollector()); } hyperLogLogs = builder.build(); @@ -376,7 +378,7 @@ public class DetermineHashedPartitionsJob implements Jobby protected void setup(Context context) { config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); - determineIntervals = 
!config.getSegmentGranularIntervals().isPresent(); + determineIntervals = config.getInputIntervals().isEmpty(); } @Override @@ -477,11 +479,11 @@ public class DetermineHashedPartitionsJob implements Jobby { this.config = config; HadoopDruidIndexerConfig hadoopConfig = HadoopDruidIndexerConfig.fromConfiguration(config); - if (hadoopConfig.getSegmentGranularIntervals().isPresent()) { + if (!hadoopConfig.getInputIntervals().isEmpty()) { determineIntervals = false; int reducerNumber = 0; ImmutableMap.Builder builder = ImmutableMap.builder(); - for (Interval interval : hadoopConfig.getSegmentGranularIntervals().get()) { + for (Interval interval : hadoopConfig.getSegmentGranularIntervals()) { builder.put(new LongWritable(interval.getStartMillis()), reducerNumber++); } reducerLookup = builder.build(); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java index 1e810c69aef..ebe69cd15a7 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java @@ -212,7 +212,7 @@ public class DeterminePartitionsJob implements Jobby dimSelectionJob.setOutputKeyClass(BytesWritable.class); dimSelectionJob.setOutputValueClass(Text.class); dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class); - dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size()); + dimSelectionJob.setNumReduceTasks(Iterators.size(config.getGranularitySpec().sortedBucketIntervals().iterator())); JobHelper.setupClasspath( JobHelper.distributedClassPath(config.getWorkingPath()), JobHelper.distributedClassPath(config.makeIntermediatePath()), @@ -256,7 +256,7 @@ public class DeterminePartitionsJob implements Jobby FileSystem fileSystem = null; Map> shardSpecs = new TreeMap<>(); int shardCount = 0; - for 
(Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { + for (Interval segmentGranularity : config.getSegmentGranularIntervals()) { final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity); if (fileSystem == null) { fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration()); @@ -447,7 +447,7 @@ public class DeterminePartitionsJob implements Jobby final ImmutableMap.Builder timeIndexBuilder = ImmutableMap.builder(); int idx = 0; - for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals().get()) { + for (final Interval bucketInterval : config.getGranularitySpec().sortedBucketIntervals()) { timeIndexBuilder.put(bucketInterval.getStartMillis(), idx); idx++; } diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java index a4f719a4c21..8b5b4b6b0bb 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.TreeMap; /** + * */ public class HadoopDruidDetermineConfigurationJob implements Jobby { @@ -75,7 +76,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby } Map> shardSpecs = new TreeMap<>(); int shardCount = 0; - for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { + for (Interval segmentGranularity : config.getSegmentGranularIntervals()) { DateTime bucket = segmentGranularity.getStart(); // negative shardsPerInterval means a single shard List specs = Lists.newArrayListWithCapacity(shardsPerInterval); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java 
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java index 0b69b33098d..54c8f073a9b 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java @@ -76,10 +76,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; -import java.util.SortedSet; /** + * */ public class HadoopDruidIndexerConfig { @@ -99,7 +98,7 @@ public class HadoopDruidIndexerConfig /** * Hadoop tasks running in an Indexer process need a reference to the Properties instance created * in PropertiesModule so that the task sees properties that were specified in Druid's config files. - * + *

* This is not strictly necessary for Peon-based tasks which have all properties, including config file properties, * specified on their command line by ForkingTaskRunner (so they could use System.getProperties() only), * but we always use the injected Properties for consistency. @@ -314,9 +313,9 @@ public class HadoopDruidIndexerConfig public Optional> getIntervals() { - Optional> setOptional = schema.getDataSchema().getGranularitySpec().bucketIntervals(); - if (setOptional.isPresent()) { - return Optional.of(JodaUtils.condenseIntervals(setOptional.get())); + Iterable bucketIntervals = schema.getDataSchema().getGranularitySpec().sortedBucketIntervals(); + if (bucketIntervals.iterator().hasNext()) { + return Optional.of(JodaUtils.condenseIntervals(bucketIntervals)); } else { return Optional.absent(); } @@ -426,7 +425,6 @@ public class HadoopDruidIndexerConfig * Get the proper bucket for some input row. * * @param inputRow an InputRow - * * @return the Bucket that this row belongs to */ Optional getBucket(InputRow inputRow) @@ -455,14 +453,12 @@ public class HadoopDruidIndexerConfig } - Optional> getSegmentGranularIntervals() + Iterable getSegmentGranularIntervals() { - return Optional.fromNullable( + return schema.getDataSchema() .getGranularitySpec() - .bucketIntervals() - .orNull() - ); + .sortedBucketIntervals(); } public List getInputIntervals() @@ -474,15 +470,17 @@ public class HadoopDruidIndexerConfig Optional> getAllBuckets() { - Optional> intervals = getSegmentGranularIntervals(); - if (intervals.isPresent()) { + Iterable intervals = getSegmentGranularIntervals(); + if (intervals.iterator().hasNext()) { return Optional.of( FunctionalIterable - .create(intervals.get()) + .create(intervals) .transformCat( input -> { final DateTime bucketTime = input.getStart(); - final List specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis()); + final List specs = schema.getTuningConfig() + .getShardSpecs() + .get(bucketTime.getMillis()); if (specs == 
null) { return ImmutableList.of(); } diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java index 02ced6cf10c..5190280d000 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java @@ -83,7 +83,7 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< throw new ParseException(errorMsg); } - if (!granularitySpec.bucketIntervals().isPresent() + if (granularitySpec.inputIntervals().isEmpty() || granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch())) .isPresent()) { innerMap(inputRow, context); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java index 3c809d26018..f16ac133933 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJobTest.java @@ -20,7 +20,6 @@ package org.apache.druid.indexer; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; @@ -59,7 +58,7 @@ public class HadoopDruidDetermineConfigurationJobTest final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class); Mockito.when(config.isDeterminingPartitions()).thenReturn(false); Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec); - Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals)); + 
Mockito.when(config.getSegmentGranularIntervals()).thenReturn(intervals); final ArgumentCaptor>> resultCaptor = ArgumentCaptor.forClass(Map.class); Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture()); @@ -99,7 +98,7 @@ public class HadoopDruidDetermineConfigurationJobTest final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class); Mockito.when(config.isDeterminingPartitions()).thenReturn(false); Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec); - Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals)); + Mockito.when(config.getSegmentGranularIntervals()).thenReturn(intervals); final ArgumentCaptor>> resultCaptor = ArgumentCaptor.forClass(Map.class); Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture()); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java index aed33811156..823cdd29261 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java @@ -78,7 +78,7 @@ public class HadoopIngestionSpecTest Assert.assertEquals( "getIntervals", Collections.singletonList(Intervals.of("2012-01-01/P1D")), - granularitySpec.getIntervals().get() + granularitySpec.inputIntervals() ); Assert.assertEquals( diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java index 8d59554909b..c0254c4fbe4 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java @@ -599,7 +599,7 @@ public class IndexGeneratorJobTest Map> shardSpecs = new TreeMap<>(DateTimeComparator.getInstance()); int 
shardCount = 0; int segmentNum = 0; - for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { + for (Interval segmentGranularity : config.getSegmentGranularIntervals()) { List specs = constructShardSpecFromShardInfo(partitionType, shardInfoForEachShard[segmentNum++]); List actualSpecs = Lists.newArrayListWithExpectedSize(specs.size()); for (ShardSpec spec : specs) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index f9c725a294b..4a7fa0d768a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputFormat; @@ -48,6 +47,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; +import org.apache.druid.java.util.common.granularity.IntervalsByGranularity; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -69,7 +69,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ 
-262,41 +261,18 @@ public abstract class AbstractBatchIndexTask extends AbstractTask return Preconditions.checkNotNull(taskLockHelper, "taskLockHelper is not initialized yet"); } - /** - * Attempts to acquire a lock that covers the intervals specified in a certain granularitySpec. - * - * This method uses {@link GranularitySpec#bucketIntervals()} to get the list of intervals to lock, and passes them - * to {@link #determineLockGranularityAndTryLock(TaskActionClient, List)}. - * - * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. - * - * If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method - * will initialize {@link #taskLockHelper} as a side effect. - * - * @return whether the lock was acquired - */ - protected boolean determineLockGranularityAndTryLock( - TaskActionClient client, - GranularitySpec granularitySpec - ) throws IOException - { - final List intervals = granularitySpec.bucketIntervals().isPresent() - ? new ArrayList<>(granularitySpec.bucketIntervals().get()) - : Collections.emptyList(); - return determineLockGranularityAndTryLock(client, intervals); - } - /** * Attempts to acquire a lock that covers certain intervals. - * + *

* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. - * + *

* If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method * will initialize {@link #taskLockHelper} as a side effect. * * @return whether the lock was acquired */ - boolean determineLockGranularityAndTryLock(TaskActionClient client, List intervals) throws IOException + public boolean determineLockGranularityAndTryLock(TaskActionClient client, List intervals) + throws IOException { final boolean forceTimeChunkLock = getContextValue( Tasks.FORCE_TIME_CHUNK_LOCK_KEY, @@ -325,9 +301,9 @@ public abstract class AbstractBatchIndexTask extends AbstractTask /** * Attempts to acquire a lock that covers certain segments. - * + *

* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. - * + *

* This method will initialize {@link #taskLockHelper} as a side effect. * * @return whether the lock was acquired @@ -396,25 +372,33 @@ public abstract class AbstractBatchIndexTask extends AbstractTask } } + protected boolean tryTimeChunkLock(TaskActionClient client, List intervals) throws IOException { // The given intervals are first converted to align with segment granularity. This is because, // when an overwriting task finds a version for a given input row, it expects the interval // associated to each version to be equal or larger than the time bucket where the input row falls in. // See ParallelIndexSupervisorTask.findVersion(). - final Set uniqueIntervals = new HashSet<>(); + final Iterator intervalIterator; final Granularity segmentGranularity = getSegmentGranularity(); - for (Interval interval : intervals) { - if (segmentGranularity == null) { - uniqueIntervals.add(interval); - } else { - Iterables.addAll(uniqueIntervals, segmentGranularity.getIterable(interval)); - } + if (segmentGranularity == null) { + intervalIterator = JodaUtils.condenseIntervals(intervals).iterator(); + } else { + IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity); + // the following is calling a condense that does not materialize the intervals: + intervalIterator = JodaUtils.condensedIntervalsIterator(intervalsByGranularity.granularityIntervalsIterator()); } - // Condense intervals to avoid creating too many locks. - for (Interval interval : JodaUtils.condenseIntervals(uniqueIntervals)) { - final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)); + // Intervals are already condensed to avoid creating too many locks. + // Intervals are also sorted and thus it's safe to compare only the previous interval and current one for dedup. 
+ Interval prev = null; + while (intervalIterator.hasNext()) { + final Interval cur = intervalIterator.next(); + if (prev != null && cur.equals(prev)) { + continue; + } + prev = cur; + final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur)); if (lock == null) { return false; } @@ -525,11 +509,11 @@ public abstract class AbstractBatchIndexTask extends AbstractTask * If the given firehoseFactory is {@link IngestSegmentFirehoseFactory}, then it finds the segments to lock * from the firehoseFactory. This is because those segments will be read by this task no matter what segments would be * filtered by intervalsToRead, so they need to be locked. - * + *

* However, firehoseFactory is not IngestSegmentFirehoseFactory, it means this task will overwrite some segments * with data read from some input source outside of Druid. As a result, only the segments falling in intervalsToRead * should be locked. - * + *

* The order of segments within the returned list is unspecified, but each segment is guaranteed to appear in the list * only once. */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java index 1236e359bd3..c11ce31c59a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java @@ -82,7 +82,6 @@ import java.lang.reflect.Method; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.SortedSet; public class HadoopIndexTask extends HadoopTask implements ChatHandler { @@ -189,12 +188,10 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { - Optional> intervals = spec.getDataSchema().getGranularitySpec().bucketIntervals(); - if (intervals.isPresent()) { + Iterable intervals = spec.getDataSchema().getGranularitySpec().sortedBucketIntervals(); + if (intervals.iterator().hasNext()) { Interval interval = JodaUtils.umbrellaInterval( - JodaUtils.condenseIntervals( - intervals.get() - ) + JodaUtils.condenseIntervals(intervals) ); return taskActionClient.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)) != null; } else { @@ -312,7 +309,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler registerResourceCloserOnAbnormalExit(config -> killHadoopJob()); String hadoopJobIdFile = getHadoopJobIdFileName(); final ClassLoader loader = buildClassLoader(toolbox); - boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent(); + boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty(); 
HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed( spec, @@ -377,7 +374,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler if (determineIntervals) { Interval interval = JodaUtils.umbrellaInterval( JodaUtils.condenseIntervals( - indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get() + indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals() ) ); final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS); @@ -621,7 +618,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler } - /** Called indirectly in {@link HadoopIndexTask#run(TaskToolbox)}. */ + /** + * Called indirectly in {@link HadoopIndexTask#run(TaskToolbox)}. + */ @SuppressWarnings("unused") public static class HadoopDetermineConfigInnerProcessingRunner { @@ -770,9 +769,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler jobId }); - return new String[] {jobId, (res == 0 ? "Success" : "Fail")}; + return new String[]{jobId, (res == 0 ? 
"Success" : "Fail")}; } - return new String[] {jobId, "Fail"}; + return new String[]{jobId, "Fail"}; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index ce9da23b76f..88779ab717d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -123,7 +123,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.SortedSet; import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -231,7 +230,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler throw new UOE("partitionsSpec[%s] is not supported", tuningConfig.getPartitionsSpec().getClass().getName()); } } - return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.dataSchema.getGranularitySpec()); + return determineLockGranularityAndTryLock( + taskActionClient, + ingestionSchema.dataSchema.getGranularitySpec().inputIntervals() + ); } @Override @@ -452,10 +454,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() ); - final boolean determineIntervals = !ingestionSchema.getDataSchema() - .getGranularitySpec() - .bucketIntervals() - .isPresent(); + final boolean determineIntervals = ingestionSchema.getDataSchema() + .getGranularitySpec() + .inputIntervals() + .isEmpty(); final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource( ingestionSchema.getDataSchema().getParser() @@ -591,7 +593,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec(); // Must determine 
intervals if unknown, since we acquire all locks before processing any data. - final boolean determineIntervals = !granularitySpec.bucketIntervals().isPresent(); + final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty(); // Must determine partitions if rollup is guaranteed and the user didn't provide a specific value. final boolean determineNumPartitions = partitionsSpec.needsDeterminePartitions(false); @@ -630,7 +632,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler @Nonnull DynamicPartitionsSpec partitionsSpec ) { - final SortedSet intervals = granularitySpec.bucketIntervals().get(); + final Iterable intervals = granularitySpec.sortedBucketIntervals(); final int numBucketsPerInterval = 1; final LinearPartitionAnalysis partitionAnalysis = new LinearPartitionAnalysis(partitionsSpec); intervals.forEach(interval -> partitionAnalysis.updateBucket(interval, numBucketsPerInterval)); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 038e7884377..bd64fbd9567 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -109,7 +109,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.SortedSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; @@ -141,7 +140,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen * the segment granularity of existing segments until the task reads all data because we don't know what segments * are going to be 
overwritten. As a result, we assume that segment granularity is going to be changed if intervals * are missing and force to use timeChunk lock. - * + *

* This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced * in the task logs. */ @@ -194,10 +193,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen ingestionSchema.getDataSchema().getParser() ); this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting() - && !ingestionSchema.getDataSchema() - .getGranularitySpec() - .bucketIntervals() - .isPresent(); + && ingestionSchema.getDataSchema() + .getGranularitySpec() + .inputIntervals() + .isEmpty(); if (missingIntervalsInOverwriteMode) { addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true); } @@ -320,12 +319,12 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen ) { return new PartialRangeSegmentGenerateParallelIndexTaskRunner( - toolbox, - getId(), - getGroupId(), - ingestionSchema, - getContext(), - intervalToPartitions + toolbox, + getId(), + getGroupId(), + ingestionSchema, + getContext(), + intervalToPartitions ); } @@ -350,7 +349,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { - return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.getDataSchema().getGranularitySpec()); + return determineLockGranularityAndTryLock( + taskActionClient, + ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals() + ); } @Override @@ -513,13 +515,13 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen /** * Run the multi phase parallel indexing for perfect rollup. In this mode, the parallel indexing is currently * executed in two phases. - * + *

* - In the first phase, each task partitions input data and stores those partitions in local storage. - * - The partition is created based on the segment granularity (primary partition key) and the partition dimension - * values in {@link PartitionsSpec} (secondary partition key). - * - Partitioned data is maintained by {@link IntermediaryDataManager}. + * - The partition is created based on the segment granularity (primary partition key) and the partition dimension + * values in {@link PartitionsSpec} (secondary partition key). + * - Partitioned data is maintained by {@link IntermediaryDataManager}. * - In the second phase, each task reads partitioned data from the intermediary data server (middleManager - * or indexer) and merges them to create the final segments. + * or indexer) and merges them to create the final segments. */ private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception { @@ -817,7 +819,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen } private static - Map, List> groupPartitionLocationsPerPartition( + Map, List> groupPartitionLocationsPerPartition( Map> subTaskIdToReport, BiFunction createPartitionLocationFunction ) @@ -891,7 +893,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen * @param index index of partition * @param total number of items to partition * @param splits number of desired partitions - * * @return partition range: [lhs, rhs) */ private static Pair getPartitionBoundaries(int index, int total, int splits) @@ -1038,7 +1039,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen { final String dataSource = getDataSource(); final GranularitySpec granularitySpec = getIngestionSchema().getDataSchema().getGranularitySpec(); - final Optional> bucketIntervals = granularitySpec.bucketIntervals(); + // This method is called whenever subtasks need to allocate a new segment via the supervisor task. 
+ // As a result, this code is never called in the Overlord. For now using the materialized intervals + // here is ok for performance reasons + final Set materializedBucketIntervals = granularitySpec.materializedBucketIntervals(); // List locks whenever allocating a new segment because locks might be revoked and no longer valid. final List locks = toolbox @@ -1054,7 +1058,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen Interval interval; String version; - if (bucketIntervals.isPresent()) { + if (!materializedBucketIntervals.isEmpty()) { // If granularity spec has explicit intervals, we just need to find the version associated to the interval. // This is because we should have gotten all required locks up front when the task starts up. final Optional maybeInterval = granularitySpec.bucketInterval(timestamp); @@ -1063,7 +1067,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen } interval = maybeInterval.get(); - if (!bucketIntervals.get().contains(interval)) { + if (!materializedBucketIntervals.contains(interval)) { throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java index 6bec35d303f..a0a6179f8b3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java @@ -106,7 +106,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask ); } - @VisibleForTesting // Only for testing + @VisibleForTesting PartialDimensionDistributionTask( @Nullable String id, final String groupId, @@ 
-334,7 +334,8 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask this(queryGranularity, BLOOM_FILTER_EXPECTED_INSERTIONS, BLOOM_FILTER_EXPECTED_FALSE_POSITIVE_PROBABILTY); } - @VisibleForTesting // to allow controlling false positive rate of bloom filter + @VisibleForTesting + // to allow controlling false positive rate of bloom filter DedupInputRowFilter( Granularity queryGranularity, int bloomFilterExpectedInsertions, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index b252e5dfb5e..da228765ba2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -42,7 +42,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.SortedSet; import java.util.stream.Collectors; /** @@ -194,7 +193,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask intervals = granularitySpec.bucketIntervals().get(); + final Iterable intervals = granularitySpec.sortedBucketIntervals(); final int numBucketsPerInterval = partitionsSpec.getNumShards() == null ? 
1 : partitionsSpec.getNumShards(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 2cc9f723e17..ec019b10135 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -98,7 +98,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask * the segment granularity of existing segments until the task reads all data because we don't know what segments * are going to be overwritten. As a result, we assume that segment granularity is going to be changed if intervals * are missing and force to use timeChunk lock. - * + *

* This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced * in the task logs. */ @@ -132,10 +132,10 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask this.ingestionSchema = ingestionSchema; this.supervisorTaskId = supervisorTaskId; this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting() - && !ingestionSchema.getDataSchema() - .getGranularitySpec() - .bucketIntervals() - .isPresent(); + && ingestionSchema.getDataSchema() + .getGranularitySpec() + .inputIntervals() + .isEmpty(); if (missingIntervalsInOverwriteMode) { addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true); } @@ -152,7 +152,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask { return determineLockGranularityAndTryLock( new SurrogateTaskActionClient(supervisorTaskId, taskActionClient), - ingestionSchema.getDataSchema().getGranularitySpec() + ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals() ); } @@ -263,7 +263,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link DynamicPartitionsSpec#maxTotalRows} * * - * + *

* At the end of this method, all the remaining segments are published. * * @return true if generated segments are successfully published, otherwise false @@ -291,7 +291,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig(); final DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec) tuningConfig.getGivenOrDefaultPartitionsSpec(); final long pushTimeout = tuningConfig.getPushTimeout(); - final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent(); + final boolean explicitIntervals = !granularitySpec.inputIntervals().isEmpty(); final SegmentAllocator segmentAllocator = SegmentAllocators.forLinearPartitioning( toolbox, getId(), diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java index b2e18318b62..0f9774994b1 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java @@ -21,29 +21,20 @@ package org.apache.druid.segment.indexing.granularity; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.java.util.common.guava.Comparators; -import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; -import java.util.ArrayList; 
import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; -public class ArbitraryGranularitySpec implements GranularitySpec +public class ArbitraryGranularitySpec extends BaseGranularitySpec { - private final TreeSet intervals; private final Granularity queryGranularity; - private final Boolean rollup; + protected LookupIntervalBuckets lookupTableBucketByDateTime; @JsonCreator public ArbitraryGranularitySpec( @@ -52,22 +43,15 @@ public class ArbitraryGranularitySpec implements GranularitySpec @JsonProperty("intervals") @Nullable List inputIntervals ) { + super(inputIntervals, rollup); this.queryGranularity = queryGranularity == null ? Granularities.NONE : queryGranularity; - this.rollup = rollup == null ? Boolean.TRUE : rollup; - this.intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd()); - if (inputIntervals == null) { - inputIntervals = new ArrayList<>(); - } - - // Insert all intervals - intervals.addAll(inputIntervals); + lookupTableBucketByDateTime = new LookupIntervalBuckets(inputIntervals); // Ensure intervals are non-overlapping (but they may abut each other) - final PeekingIterator intervalIterator = Iterators.peekingIterator(intervals.iterator()); + final PeekingIterator intervalIterator = Iterators.peekingIterator(sortedBucketIntervals().iterator()); while (intervalIterator.hasNext()) { final Interval currentInterval = intervalIterator.next(); - if (intervalIterator.hasNext()) { final Interval nextInterval = intervalIterator.peek(); if (currentInterval.overlaps(nextInterval)) { @@ -86,29 +70,9 @@ public class ArbitraryGranularitySpec implements GranularitySpec } @Override - @JsonProperty("intervals") - public Optional> bucketIntervals() + public Iterable sortedBucketIntervals() { - return Optional.of(intervals); - } - - @Override - public List inputIntervals() - { - return ImmutableList.copyOf(intervals); - } - - @Override - public Optional bucketInterval(DateTime dt) - { - // First interval with start time ≤ dt - final 
Interval interval = intervals.floor(new Interval(dt, DateTimes.MAX)); - - if (interval != null && interval.contains(dt)) { - return Optional.of(interval); - } else { - return Optional.absent(); - } + return () -> lookupTableBucketByDateTime.iterator(); } @Override @@ -117,13 +81,6 @@ public class ArbitraryGranularitySpec implements GranularitySpec throw new UnsupportedOperationException(); } - @Override - @JsonProperty("rollup") - public boolean isRollup() - { - return rollup; - } - @Override @JsonProperty("queryGranularity") public Granularity getQueryGranularity() @@ -143,7 +100,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec ArbitraryGranularitySpec that = (ArbitraryGranularitySpec) o; - if (!intervals.equals(that.intervals)) { + if (!inputIntervals().equals(that.inputIntervals())) { return false; } if (!rollup.equals(that.rollup)) { @@ -159,7 +116,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec @Override public int hashCode() { - int result = intervals.hashCode(); + int result = inputIntervals().hashCode(); result = 31 * result + rollup.hashCode(); result = 31 * result + (queryGranularity != null ? 
queryGranularity.hashCode() : 0); return result; @@ -169,7 +126,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec public String toString() { return "ArbitraryGranularitySpec{" + - "intervals=" + intervals + + "intervals=" + inputIntervals() + ", queryGranularity=" + queryGranularity + ", rollup=" + rollup + '}'; @@ -180,4 +137,11 @@ public class ArbitraryGranularitySpec implements GranularitySpec { return new ArbitraryGranularitySpec(queryGranularity, rollup, inputIntervals); } + + @Override + protected LookupIntervalBuckets getLookupTableBuckets() + { + return lookupTableBucketByDateTime; + } + } diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java new file mode 100644 index 00000000000..9ff114b5d68 --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/BaseGranularitySpec.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.indexing.granularity; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.guava.Comparators; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.TreeSet; + +abstract class BaseGranularitySpec implements GranularitySpec +{ + protected List inputIntervals; + protected final Boolean rollup; + + public BaseGranularitySpec(List inputIntervals, Boolean rollup) + { + if (inputIntervals != null) { + this.inputIntervals = ImmutableList.copyOf(inputIntervals); + } else { + this.inputIntervals = Collections.emptyList(); + } + this.rollup = rollup == null ? Boolean.TRUE : rollup; + } + + @Override + @JsonProperty("intervals") + public List inputIntervals() + { + return inputIntervals; + } + + @Override + @JsonProperty("rollup") + public boolean isRollup() + { + return rollup; + } + + @Override + public Optional bucketInterval(DateTime dt) + { + return getLookupTableBuckets().bucketInterval(dt); + } + + @Override + public TreeSet materializedBucketIntervals() + { + return getLookupTableBuckets().materializedIntervals(); + } + + protected abstract LookupIntervalBuckets getLookupTableBuckets(); + + /** + * This is a helper class to facilitate sharing the code for sortedBucketIntervals among + * the various GranularitySpec implementations. In particular, the UniformGranularitySpec + * needs to avoid materializing the intervals when the need to traverse them arises. 
+ */ + protected static class LookupIntervalBuckets + { + private final Iterable intervalIterable; + private final TreeSet intervals; + + /** + * @param intervalIterable The intervals to materialize + */ + public LookupIntervalBuckets(Iterable intervalIterable) + { + this.intervalIterable = intervalIterable; + // The tree set will be materialized on demand (see below) to avoid client code + // blowing up when constructing this data structure and when the + // number of intervals is very large... + this.intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd()); + } + + /** + * Returns a bucket interval using a fast lookup into an efficient data structure + * where all the intervals have been materialized + * + * @param dt The date time to lookup + * @return An Optional containing the interval for the given DateTime if it exists + */ + public Optional bucketInterval(DateTime dt) + { + final Interval interval = materializedIntervals().floor(new Interval(dt, DateTimes.MAX)); + if (interval != null && interval.contains(dt)) { + return Optional.of(interval); + } else { + return Optional.absent(); + } + } + + /** + * @return An iterator to traverse the materialized intervals. 
The traversal will be done in + * order as dictated by Comparators.intervalsByStartThenEnd() + */ + public Iterator iterator() + { + return materializedIntervals().iterator(); + } + + /** + * Helper method to avoid collecting the intervals from the iterator + * + * @return The TreeSet of materialized intervals + */ + public TreeSet materializedIntervals() + { + if (intervalIterable != null && intervalIterable.iterator().hasNext() && intervals.isEmpty()) { + Iterators.addAll(intervals, intervalIterable.iterator()); + } + return intervals; + } + } +} diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java index 9272f6961e4..c3bb5793c97 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/GranularitySpec.java @@ -27,7 +27,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; import java.util.List; -import java.util.SortedSet; +import java.util.TreeSet; /** * Tells the indexer how to group events based on timestamp. The events may then be further partitioned based @@ -41,11 +41,13 @@ import java.util.SortedSet; public interface GranularitySpec { /** - * Set of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping. + * Iterable of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping. * - * @return set of all time groups + * @return Iterable of all time groups */ - Optional> bucketIntervals(); + Iterable sortedBucketIntervals(); + + /** * Returns user provided intervals as-is state. used for configuring granular path spec @@ -58,11 +60,18 @@ public interface GranularitySpec * Time-grouping interval corresponding to some instant, if any.
* * @param dt instant to return time interval for - * * @return optional time interval */ Optional bucketInterval(DateTime dt); + /** + * This is a helper method for areas of the code, not in the overlord, which for performance + * reasons might need the materialized set of bucket intervals + * @return A fast lookup, ordered set, of the materialized bucket intervals + * + */ + TreeSet materializedBucketIntervals(); + Granularity getSegmentGranularity(); boolean isRollup(); diff --git a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java index fae680aba9b..5fe6a9ed065 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/granularity/UniformGranularitySpec.java @@ -21,28 +21,22 @@ package org.apache.druid.segment.indexing.granularity; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; -import org.joda.time.DateTime; +import org.apache.druid.java.util.common.granularity.IntervalsByGranularity; import org.joda.time.Interval; -import java.util.ArrayList; import java.util.List; -import java.util.SortedSet; -public class UniformGranularitySpec implements GranularitySpec +public class UniformGranularitySpec extends BaseGranularitySpec { private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY; private static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE; private final Granularity segmentGranularity; private final Granularity queryGranularity; - private final
Boolean rollup; - private final List inputIntervals; - private final ArbitraryGranularitySpec wrappedSpec; + private final IntervalsByGranularity intervalsByGranularity; + protected LookupIntervalBuckets lookupTableBucketByDateTime; @JsonCreator public UniformGranularitySpec( @@ -52,21 +46,11 @@ public class UniformGranularitySpec implements GranularitySpec @JsonProperty("intervals") List inputIntervals ) { + super(inputIntervals, rollup); this.queryGranularity = queryGranularity == null ? DEFAULT_QUERY_GRANULARITY : queryGranularity; - this.rollup = rollup == null ? Boolean.TRUE : rollup; this.segmentGranularity = segmentGranularity == null ? DEFAULT_SEGMENT_GRANULARITY : segmentGranularity; - - if (inputIntervals != null) { - List granularIntervals = new ArrayList<>(); - for (Interval inputInterval : inputIntervals) { - Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval)); - } - this.inputIntervals = ImmutableList.copyOf(inputIntervals); - this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, rollup, granularIntervals); - } else { - this.inputIntervals = null; - this.wrappedSpec = null; - } + intervalsByGranularity = new IntervalsByGranularity(this.inputIntervals, segmentGranularity); + lookupTableBucketByDateTime = new LookupIntervalBuckets(sortedBucketIntervals()); } public UniformGranularitySpec( @@ -79,29 +63,9 @@ public class UniformGranularitySpec implements GranularitySpec } @Override - public Optional> bucketIntervals() + public Iterable sortedBucketIntervals() { - if (wrappedSpec == null) { - return Optional.absent(); - } else { - return wrappedSpec.bucketIntervals(); - } - } - - @Override - public List inputIntervals() - { - return inputIntervals == null ? 
ImmutableList.of() : ImmutableList.copyOf(inputIntervals); - } - - @Override - public Optional bucketInterval(DateTime dt) - { - if (wrappedSpec == null) { - return Optional.absent(); - } else { - return wrappedSpec.bucketInterval(dt); - } + return () -> intervalsByGranularity.granularityIntervalsIterator(); } @Override @@ -111,13 +75,6 @@ public class UniformGranularitySpec implements GranularitySpec return segmentGranularity; } - @Override - @JsonProperty("rollup") - public boolean isRollup() - { - return rollup; - } - @Override @JsonProperty("queryGranularity") public Granularity getQueryGranularity() @@ -125,12 +82,6 @@ public class UniformGranularitySpec implements GranularitySpec return queryGranularity; } - @JsonProperty("intervals") - public Optional> getIntervals() - { - return Optional.fromNullable(inputIntervals); - } - @Override public boolean equals(Object o) { @@ -149,14 +100,15 @@ public class UniformGranularitySpec implements GranularitySpec if (!queryGranularity.equals(that.queryGranularity)) { return false; } - if (!rollup.equals(that.rollup)) { + if (isRollup() != that.isRollup()) { return false; } if (inputIntervals != null ? !inputIntervals.equals(that.inputIntervals) : that.inputIntervals != null) { return false; } - return !(wrappedSpec != null ? !wrappedSpec.equals(that.wrappedSpec) : that.wrappedSpec != null); + + return true; } @@ -167,7 +119,6 @@ public class UniformGranularitySpec implements GranularitySpec result = 31 * result + queryGranularity.hashCode(); result = 31 * result + rollup.hashCode(); result = 31 * result + (inputIntervals != null ? inputIntervals.hashCode() : 0); - result = 31 * result + (wrappedSpec != null ? 
wrappedSpec.hashCode() : 0); return result; } @@ -179,7 +130,6 @@ public class UniformGranularitySpec implements GranularitySpec ", queryGranularity=" + queryGranularity + ", rollup=" + rollup + ", inputIntervals=" + inputIntervals + - ", wrappedSpec=" + wrappedSpec + '}'; } @@ -188,4 +138,11 @@ public class UniformGranularitySpec implements GranularitySpec { return new UniformGranularitySpec(segmentGranularity, queryGranularity, rollup, inputIntervals); } + + @Override + protected LookupIntervalBuckets getLookupTableBuckets() + { + return lookupTableBucketByDateTime; + } + } diff --git a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java index d7dff7a31cb..a0154228a61 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/granularity/ArbitraryGranularityTest.java @@ -21,6 +21,7 @@ package org.apache.druid.segment.indexing.granularity; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -47,7 +48,8 @@ public class ArbitraryGranularityTest Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), Intervals.of("2012-01-01T00Z/2012-01-03T00Z") - )); + ) + ); Assert.assertNotNull(spec.getQueryGranularity()); } @@ -57,12 +59,13 @@ public class ArbitraryGranularityTest final GranularitySpec spec = new ArbitraryGranularitySpec( Granularities.NONE, Lists.newArrayList( - Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), - Intervals.of("2012-02-01T00Z/2012-03-01T00Z"), - Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), - 
Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), - Intervals.of("2012-01-01T00Z/2012-01-03T00Z") - )); + Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), + Intervals.of("2012-02-01T00Z/2012-03-01T00Z"), + Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), + Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), + Intervals.of("2012-01-01T00Z/2012-01-03T00Z") + ) + ); Assert.assertTrue(spec.isRollup()); @@ -74,7 +77,17 @@ public class ArbitraryGranularityTest Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), Intervals.of("2012-02-01T00Z/2012-03-01T00Z") ), - Lists.newArrayList(spec.bucketIntervals().get()) + Lists.newArrayList(spec.sortedBucketIntervals()) + ); + + Assert.assertEquals( + Optional.of(Intervals.of("2012-01-01T00Z/2012-01-03T00Z")), + spec.bucketInterval(DateTimes.of("2012-01-01T00Z")) + ); + + Assert.assertEquals( + Optional.of(Intervals.of("2012-01-08T00Z/2012-01-11T00Z")), + spec.bucketInterval(DateTimes.of("2012-01-08T00Z")) ); Assert.assertEquals( @@ -118,6 +131,7 @@ public class ArbitraryGranularityTest Optional.absent(), spec.bucketInterval(DateTimes.of("2012-01-05T00Z")) ); + } @Test @@ -187,10 +201,26 @@ public class ArbitraryGranularityTest try { final GranularitySpec rtSpec = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(spec), GranularitySpec.class); - Assert.assertEquals("Round-trip", spec.bucketIntervals(), rtSpec.bucketIntervals()); + Assert.assertEquals( + "Round-trip", + ImmutableList.copyOf(spec.sortedBucketIntervals()), + ImmutableList.copyOf(rtSpec.sortedBucketIntervals()) + ); + Assert.assertEquals( + "Round-trip", + ImmutableList.copyOf(spec.inputIntervals()), + ImmutableList.copyOf(rtSpec.inputIntervals()) + ); } catch (Exception e) { throw new RuntimeException(e); } } + + @Test + public void testNullInputIntervals() + { + final GranularitySpec spec = new ArbitraryGranularitySpec(Granularities.NONE, null); + Assert.assertFalse(spec.sortedBucketIntervals().iterator().hasNext()); + } } diff --git 
a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java index 402f7fd6d3e..27760f63c18 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/granularity/UniformGranularityTest.java @@ -21,6 +21,8 @@ package org.apache.druid.segment.indexing.granularity; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; @@ -34,8 +36,8 @@ import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; -import java.util.SortedSet; public class UniformGranularityTest { @@ -44,19 +46,26 @@ public class UniformGranularityTest @Test public void testSimple() { + + final List inputIntervals = Lists.newArrayList( + Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), + Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), + Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), + Intervals.of("2012-01-01T00Z/2012-01-03T00Z") + ); final GranularitySpec spec = new UniformGranularitySpec( Granularities.DAY, null, - Lists.newArrayList( - Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), - Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), - Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), - Intervals.of("2012-01-01T00Z/2012-01-03T00Z") - ) + inputIntervals ); Assert.assertTrue(spec.isRollup()); + Assert.assertEquals( + inputIntervals, + Lists.newArrayList(spec.inputIntervals()) + ); + Assert.assertEquals( Lists.newArrayList( Intervals.of("2012-01-01T00Z/P1D"), @@ -67,7 +76,28 @@ public class UniformGranularityTest 
Intervals.of("2012-01-09T00Z/P1D"), Intervals.of("2012-01-10T00Z/P1D") ), - Lists.newArrayList(spec.bucketIntervals().get()) + Lists.newArrayList(spec.sortedBucketIntervals()) + ); + + + Assert.assertEquals( + Optional.absent(), + spec.bucketInterval(DateTimes.of("2011-01-12T00Z")) + ); + + Assert.assertEquals( + Optional.of(Intervals.of("2012-01-01T00Z/2012-01-02T00Z")), + spec.bucketInterval(DateTimes.of("2012-01-01T00Z")) + ); + + Assert.assertEquals( + Optional.of(Intervals.of("2012-01-10T00Z/2012-01-11T00Z")), + spec.bucketInterval(DateTimes.of("2012-01-10T00Z")) + ); + + Assert.assertEquals( + Optional.absent(), + spec.bucketInterval(DateTimes.of("2012-01-12T00Z")) ); Assert.assertEquals( @@ -99,6 +129,7 @@ public class UniformGranularityTest Optional.of(Intervals.of("2012-01-08T00Z/2012-01-09T00Z")), spec.bucketInterval(DateTimes.of("2012-01-08T01Z")) ); + } @Test @@ -132,9 +163,9 @@ public class UniformGranularityTest try { final GranularitySpec rtSpec = JOSN_MAPPER.readValue(JOSN_MAPPER.writeValueAsString(spec), GranularitySpec.class); Assert.assertEquals( - "Round-trip bucketIntervals", - spec.bucketIntervals(), - rtSpec.bucketIntervals() + "Round-trip sortedBucketIntervals", + ImmutableList.copyOf(spec.sortedBucketIntervals()), + ImmutableList.copyOf(rtSpec.sortedBucketIntervals().iterator()) ); Assert.assertEquals( "Round-trip granularity", @@ -253,10 +284,9 @@ public class UniformGranularityTest ) ); - Assert.assertTrue(spec.bucketIntervals().isPresent()); + Assert.assertTrue(spec.sortedBucketIntervals().iterator().hasNext()); - final Optional> sortedSetOptional = spec.bucketIntervals(); - final SortedSet intervals = sortedSetOptional.get(); + final Iterable intervals = spec.sortedBucketIntervals(); ArrayList actualIntervals = new ArrayList<>(); for (Interval interval : intervals) { actualIntervals.add(interval.toDurationMillis()); @@ -279,6 +309,25 @@ public class UniformGranularityTest Assert.assertEquals(expectedIntervals, actualIntervals); } + 
@Test + public void testUniformGranularitySpecWithLargeNumberOfIntervalsDoesNotBlowUp() + { + // just make sure that intervals for uniform spec are not materialized (causing OOM) when created + final GranularitySpec spec = new UniformGranularitySpec( + Granularities.SECOND, + null, + Collections.singletonList( + Intervals.of("2012-01-01T00Z/P10Y") + ) + ); + + Assert.assertTrue(spec != null); + + int count = Iterators.size(spec.sortedBucketIntervals().iterator()); + // account for three leap years... + Assert.assertEquals(3600 * 24 * 365 * 10 + 3 * 24 * 3600, count); + } + private void notEqualsCheck(GranularitySpec spec1, GranularitySpec spec2) { Assert.assertNotEquals(spec1, spec2);