Granularity interval materialization (#10742)

* Prevent interval materialization for UniformGranularitySpec inside the overlord

* Change API of bucketIntervals in GranularitySpec to return an Iterable<Interval>

* Javadoc update, respect inputIntervals contract

* Eliminate dependency on wrappedspec (i.e. ArbitraryGranularity) in UniformGranularitySpec

* Added one boundary condition test to UniformGranularityTest and fixed Travis forbidden method errors in IntervalsByGranularity

* Fix Travis style & other checks

* Refactor TreeSet to facilitate re-use in UniformGranularitySpec

* Make sure intervals are unique when there is no segment granularity

* Style/bugspot fixes...

* More travis checks

* Add condensedIntervals method to GranularitySpec and pass it as needed to the lock method

* Style & PR feedback

* Fixed failing test

* Fixed bug in IntervalsByGranularity iterator that it would return repeated elements (see added unit tests that were broken before this change)

* Refactor so that we can get the condensed buckets without materializing the intervals

* Get rid of GranularitySpec::condensedInputIntervals ... not needed

* Travis failures fixes

* Travis checkstyle fix

* Edited/added javadoc comments and a method name (code review feedback)

* Fixed jacoco coverage by moving class and adding more coverage

* Avoid materializing the condensed intervals when locking

* Deal with overlapping intervals

* Remove code and use library code instead

* Refactor intervals by granularity using the FluentIterable, add sanity checks

* Change !hasNext() to inputIntervals().isEmpty()

* Remove redundant lambda

* Use materialized intervals here since this is outside the overlord (for performance)

* Name refactor to reflect the fact that bucket intervals are sorted.

* Style fixes

* Removed redundant method and have condensedIntervalIterator throw IAE when element is null for consistency with other methods in this class (as well that null interval when condensing does not make sense)

* Remove forbidden api

* Move helper class inside common base class to reduce public space pollution
This commit is contained in:
Agustin Gonzalez 2021-01-29 07:02:10 -07:00 committed by GitHub
parent f773497e2c
commit 0e4750bac2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 1106 additions and 302 deletions

View File

@ -19,16 +19,23 @@
package org.apache.druid.java.util.common; package org.apache.druid.java.util.common;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
/** /**
*
*/ */
public class JodaUtils public class JodaUtils
{ {
@ -36,10 +43,17 @@ public class JodaUtils
public static final long MAX_INSTANT = Long.MAX_VALUE / 2; public static final long MAX_INSTANT = Long.MAX_VALUE / 2;
public static final long MIN_INSTANT = Long.MIN_VALUE / 2; public static final long MIN_INSTANT = Long.MIN_VALUE / 2;
public static ArrayList<Interval> condenseIntervals(Iterable<Interval> intervals) /**
* This method will not materialize the input intervals if they represent
* a SortedSet (i.e. implement that interface). If not, the method internally
* creates a sorted set and populates it with them thus materializing the
* intervals in the input.
*
* @param intervals The Iterable object containing the intervals to condense
* @return The condensed intervals
*/
public static List<Interval> condenseIntervals(Iterable<Interval> intervals)
{ {
ArrayList<Interval> retVal = new ArrayList<>();
final SortedSet<Interval> sortedIntervals; final SortedSet<Interval> sortedIntervals;
if (intervals instanceof SortedSet) { if (intervals instanceof SortedSet) {
@ -50,18 +64,70 @@ public class JodaUtils
sortedIntervals.add(interval); sortedIntervals.add(interval);
} }
} }
return ImmutableList.copyOf(condensedIntervalsIterator(sortedIntervals.iterator()));
if (sortedIntervals.isEmpty()) {
return new ArrayList<>();
} }
Iterator<Interval> intervalsIter = sortedIntervals.iterator(); /**
Interval currInterval = intervalsIter.next(); * This method does not materialize the intervals represented by the
while (intervalsIter.hasNext()) { * sortedIntervals iterator. However, caller needs to insure that sortedIntervals
Interval next = intervalsIter.next(); * is already sorted in ascending order (use the Comparators.intervalsByStartThenEnd()).
* It avoids materialization by incrementally condensing the intervals by
* starting from the first and looking for "adjacent" intervals. This is
* possible since intervals in the Iterator are in ascending order (as
* guaranteed by the caller).
* <p>
* *
*
* @param sortedIntervals The iterator object containing the intervals to condense
* @return An iterator for the condensed intervals. By construction the condensed intervals are sorted
* in ascending order and contain no repeated elements. The iterator can contain nulls,
* they will be skipped if it does.
* @throws IAE if an element is null or if sortedIntervals is not sorted in ascending order
*/
public static Iterator<Interval> condensedIntervalsIterator(Iterator<Interval> sortedIntervals)
{
if (sortedIntervals == null || !sortedIntervals.hasNext()) {
return Collections.emptyIterator();
}
final PeekingIterator<Interval> peekingIterator = Iterators.peekingIterator(sortedIntervals);
return new Iterator<Interval>()
{
private Interval previous;
@Override
public boolean hasNext()
{
return peekingIterator.hasNext();
}
@Override
public Interval next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
Interval currInterval = peekingIterator.next();
if (currInterval == null) {
throw new IAE("Element of intervals is null");
}
// check sorted ascending:
verifyAscendingSortOrder(previous, currInterval);
previous = currInterval;
while (hasNext()) {
Interval next = peekingIterator.peek();
if (next == null) {
throw new IAE("Element of intervals is null");
}
if (currInterval.abuts(next)) { if (currInterval.abuts(next)) {
currInterval = new Interval(currInterval.getStart(), next.getEnd()); currInterval = new Interval(currInterval.getStart(), next.getEnd());
peekingIterator.next();
} else if (currInterval.overlaps(next)) { } else if (currInterval.overlaps(next)) {
DateTime nextEnd = next.getEnd(); DateTime nextEnd = next.getEnd();
DateTime currEnd = currInterval.getEnd(); DateTime currEnd = currInterval.getEnd();
@ -69,16 +135,51 @@ public class JodaUtils
currInterval.getStart(), currInterval.getStart(),
nextEnd.isAfter(currEnd) ? nextEnd : currEnd nextEnd.isAfter(currEnd) ? nextEnd : currEnd
); );
peekingIterator.next();
} else { } else {
retVal.add(currInterval); break;
currInterval = next;
} }
} }
retVal.add(currInterval); return currInterval;
}
};
}
/**
* Verify whether an iterable of intervals contains overlapping intervals
*
* @param intervals An interval iterable sorted using Comparators.intervalsByStartThenEnd()
* @return true if the iterable contains at least two overlapping intervals, false otherwise.
* @throws IAE when at least an element is null or when the iterable is not sorted
*/
public static boolean containOverlappingIntervals(Iterable<Interval> intervals)
{
if (intervals == null) {
return false;
}
boolean retVal = false;
Interval previous = null;
for (Interval current : intervals) {
if (current == null) {
throw new IAE("Intervals should not contain nulls");
}
verifyAscendingSortOrder(previous, current);
if (previous != null && previous.overlaps(current)) {
retVal = true;
break;
}
previous = current;
}
return retVal; return retVal;
} }
private static void verifyAscendingSortOrder(Interval previous, Interval current)
{
if (previous != null && previous.isAfter(current)) {
throw new IAE("Adjacent intervals are not sorted [%s,%s]", previous, current);
}
}
public static Interval umbrellaInterval(Iterable<Interval> intervals) public static Interval umbrellaInterval(Iterable<Interval> intervals)
{ {
ArrayList<DateTime> startDates = new ArrayList<>(); ArrayList<DateTime> startDates = new ArrayList<>();
@ -137,4 +238,6 @@ public class JodaUtils
return max; return max;
} }
} }
} }

View File

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.granularity;
import com.google.common.collect.FluentIterable;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* Produce a stream of intervals generated by a given set of intervals as input and a given
* granularity. This class avoids materializing the granularity intervals whenever possible.
*/
public class IntervalsByGranularity
{
private final List<Interval> sortedNonOverlappingIntervals;
private final Granularity granularity;
/**
* @param intervals Intervals for which to apply the given granularity. They should
* not contain overlapped intervals.
* @param granularity The granularity to apply
* @throws IAE if intervals contains at least an overlapping pair
*/
public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
{
// eliminate dups, sort intervals:
Set<Interval> intervalSet = new HashSet<>(intervals);
List<Interval> inputIntervals = new ArrayList<>(intervals.size());
inputIntervals.addAll(intervalSet);
inputIntervals.sort(Comparators.intervalsByStartThenEnd());
// sanity check
if (JodaUtils.containOverlappingIntervals(inputIntervals)) {
throw new IAE("Intervals contain overlapping intervals [%s]", intervals);
}
// all good:
sortedNonOverlappingIntervals = inputIntervals;
this.granularity = granularity;
}
/**
* @return The intervals according the granularity. The intervals are provided in
* order according to Comparators.intervalsByStartThenEnd()
*/
public Iterator<Interval> granularityIntervalsIterator()
{
Iterator<Interval> ite;
if (sortedNonOverlappingIntervals.isEmpty()) {
ite = Collections.emptyIterator();
} else {
// The filter after transform & concat is to remove duplicats.
// This can happen when condense left intervals that did not overlap but
// when a larger granularity is applied then they become equal
// imagine input are 2013-01-01T00Z/2013-01-10T00Z, 2013-01-15T00Z/2013-01-20T00Z.
// the iterator for the two intervals is called, say with MONTH granularity, two
// intervals will be returned, both with the same value 2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z.
// Thus dups can be created given the right conditions....
final SettableSupplier<Interval> previous = new SettableSupplier<>();
ite = FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
.filter(interval -> {
if (previous.get() != null && previous.get().equals(interval)) {
return false;
}
previous.set(interval);
return true;
}).iterator();
}
return ite;
}
}

View File

@ -19,8 +19,11 @@
package org.apache.druid.common.utils; package org.apache.druid.common.utils;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Duration; import org.joda.time.Duration;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.joda.time.Period; import org.joda.time.Period;
@ -32,6 +35,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
/** /**
*
*/ */
public class JodaUtilsTest public class JodaUtilsTest
{ {
@ -75,6 +79,40 @@ public class JodaUtilsTest
Intervals.of("2011-03-05/2011-03-06") Intervals.of("2011-03-05/2011-03-06")
); );
List<Interval> expected = Arrays.asList(
Intervals.of("2011-01-01/2011-01-03"),
Intervals.of("2011-02-01/2011-02-08"),
Intervals.of("2011-03-01/2011-03-02"),
Intervals.of("2011-03-03/2011-03-04"),
Intervals.of("2011-03-05/2011-03-06")
);
List<Interval> actual = JodaUtils.condenseIntervals(intervals);
Assert.assertEquals(
expected,
actual
);
}
@Test
public void testCondenseIntervalsSimpleSortedIterator()
{
List<Interval> intervals = Arrays.asList(
Intervals.of("2011-01-01/2011-01-02"),
Intervals.of("2011-01-02/2011-01-03"),
Intervals.of("2011-02-03/2011-02-08"),
Intervals.of("2011-02-01/2011-02-02"),
Intervals.of("2011-02-01/2011-02-05"),
Intervals.of("2011-03-01/2011-03-02"),
Intervals.of("2011-03-03/2011-03-04"),
Intervals.of("2011-03-05/2011-03-06")
);
intervals.sort(Comparators.intervalsByStartThenEnd());
List<Interval> actual = ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
Assert.assertEquals( Assert.assertEquals(
Arrays.asList( Arrays.asList(
Intervals.of("2011-01-01/2011-01-03"), Intervals.of("2011-01-01/2011-01-03"),
@ -83,7 +121,116 @@ public class JodaUtilsTest
Intervals.of("2011-03-03/2011-03-04"), Intervals.of("2011-03-03/2011-03-04"),
Intervals.of("2011-03-05/2011-03-06") Intervals.of("2011-03-05/2011-03-06")
), ),
JodaUtils.condenseIntervals(intervals) actual
);
}
@Test
public void testCondenseIntervalsSimpleSortedIteratorOverlapping()
{
List<Interval> intervals = Arrays.asList(
Intervals.of("2011-02-01/2011-03-10"),
Intervals.of("2011-01-02/2011-02-03"),
Intervals.of("2011-01-07/2015-01-19"),
Intervals.of("2011-01-15/2011-01-19"),
Intervals.of("2011-01-01/2011-01-02"),
Intervals.of("2011-02-01/2011-03-10")
);
intervals.sort(Comparators.intervalsByStartThenEnd());
Assert.assertEquals(
Collections.singletonList(
Intervals.of("2011-01-01/2015-01-19")
),
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()))
);
}
@Test(expected = IAE.class)
public void testCondenseIntervalsSimplSortedIteratorOverlappingWithNullsShouldThrow()
{
List<Interval> intervals = Arrays.asList(
Intervals.of("2011-01-02/2011-02-03"),
Intervals.of("2011-02-01/2011-03-10"),
null,
Intervals.of("2011-03-07/2011-04-19"),
Intervals.of("2011-04-01/2015-01-19"),
null
);
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
}
@Test(expected = IAE.class)
public void testCondenseIntervalsSimplSortedIteratorOverlappingWithNullFirstAndLastshouldThrow()
{
List<Interval> intervals = Arrays.asList(
null,
Intervals.of("2011-01-02/2011-02-03"),
Intervals.of("2011-02-01/2011-03-10"),
Intervals.of("2011-03-07/2011-04-19"),
Intervals.of("2011-04-01/2015-01-19"),
null
);
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
}
@Test(expected = IllegalArgumentException.class)
public void testCondenseIntervalsSimpleUnsortedIterator()
{
List<Interval> intervals = Arrays.asList(
Intervals.of("2011-01-01/2011-01-02"),
Intervals.of("2011-01-02/2011-01-03"),
Intervals.of("2011-02-03/2011-02-08"),
Intervals.of("2011-02-01/2011-02-02"),
Intervals.of("2011-02-01/2011-02-05"),
Intervals.of("2011-03-01/2011-03-02"),
Intervals.of("2011-03-03/2011-03-04"),
Intervals.of("2011-03-05/2011-03-06")
);
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
}
@Test(expected = IllegalArgumentException.class)
public void testCondenseIntervalsSimpleUnsortedIteratorSmallestAtEnd()
{
List<Interval> intervals = Arrays.asList(
Intervals.of("2011-01-01/2011-01-02"),
Intervals.of("2011-02-01/2011-02-04"),
Intervals.of("2011-03-01/2011-03-04"),
Intervals.of("2010-01-01/2010-03-04")
);
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()));
}
@Test
public void testCondenseIntervalsIteratorWithDups()
{
List<Interval> intervals = Arrays.asList(
Intervals.of("2011-01-01/2011-01-02"),
Intervals.of("2011-02-04/2011-02-05"),
Intervals.of("2011-01-01/2011-01-02"),
Intervals.of("2011-01-02/2011-01-03"),
Intervals.of("2011-02-03/2011-02-08"),
Intervals.of("2011-02-03/2011-02-08"),
Intervals.of("2011-02-01/2011-02-02"),
Intervals.of("2011-02-01/2011-02-05"),
Intervals.of("2011-03-01/2011-03-02"),
Intervals.of("2011-03-03/2011-03-04"),
Intervals.of("2011-03-05/2011-03-06")
);
intervals.sort(Comparators.intervalsByStartThenEnd());
Assert.assertEquals(
Arrays.asList(
Intervals.of("2011-01-01/2011-01-03"),
Intervals.of("2011-02-01/2011-02-08"),
Intervals.of("2011-03-01/2011-03-02"),
Intervals.of("2011-03-03/2011-03-04"),
Intervals.of("2011-03-05/2011-03-06")
),
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.iterator()))
); );
} }
@ -144,4 +291,54 @@ public class JodaUtilsTest
Assert.assertEquals(Long.MAX_VALUE, period.getMinutes()); Assert.assertEquals(Long.MAX_VALUE, period.getMinutes());
} }
@Test
public void testShouldContainOverlappingIntervals()
{
List<Interval> intervals = Arrays.asList(
Intervals.of("2011-02-01/2011-03-10"),
Intervals.of("2011-03-25/2011-04-03"),
Intervals.of("2011-04-01/2015-01-19"),
Intervals.of("2016-01-15/2016-01-19")
);
Assert.assertTrue(JodaUtils.containOverlappingIntervals(intervals));
}
@Test
public void testShouldNotContainOverlappingIntervals()
{
List<Interval> intervals = Arrays.asList(
Intervals.of("2011-02-01/2011-03-10"),
Intervals.of("2011-03-10/2011-04-03"),
Intervals.of("2011-04-04/2015-01-14"),
Intervals.of("2016-01-15/2016-01-19")
);
Assert.assertFalse(JodaUtils.containOverlappingIntervals(intervals));
}
@Test(expected = IAE.class)
public void testOverlappingIntervalsContainsNull()
{
List<Interval> intervals = Arrays.asList(
Intervals.of("2011-02-01/2011-03-10"),
null,
Intervals.of("2011-04-04/2015-01-14"),
Intervals.of("2016-01-15/2016-01-19")
);
JodaUtils.containOverlappingIntervals(intervals);
}
@Test(expected = IAE.class)
public void testOverlappingIntervalsContainsUnsorted()
{
List<Interval> intervals = Arrays.asList(
Intervals.of("2011-02-01/2011-03-10"),
Intervals.of("2011-03-10/2011-04-03"),
Intervals.of("2016-01-15/2016-01-19"),
Intervals.of("2011-04-04/2015-01-14")
);
JodaUtils.containOverlappingIntervals(intervals);
}
} }

View File

@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
public class IntervalsByGranularityTest
{
private static final long SECONDS_IN_YEAR = 31536000;
@Test
public void testTrivialIntervalExplosion()
{
Interval first = Intervals.of("2013-01-01T00Z/2013-02-01T00Z");
Interval second = Intervals.of("2012-01-01T00Z/2012-02-01T00Z");
Interval third = Intervals.of("2002-01-01T00Z/2003-01-01T00Z");
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first, second, third),
Granularity.fromString("DAY")
);
// get count:
Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
long count = getCount(granularityIntervals);
Assert.assertTrue(count == 62 + 365);
granularityIntervals = intervals.granularityIntervalsIterator();
count = getCountWithNoHasNext(granularityIntervals);
Assert.assertTrue(count == 62 + 365);
}
@Test
public void testDups()
{
Interval first = Intervals.of("2013-01-01T00Z/2013-02-01T00Z");
Interval second = Intervals.of("2012-04-01T00Z/2012-05-01T00Z");
Interval third = Intervals.of("2013-01-01T00Z/2013-02-01T00Z"); // dup
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first, second, third),
Granularity.fromString("DAY")
);
// get count:
Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
long count = getCount(granularityIntervals);
Assert.assertTrue(count == 61);
}
@Test
public void testCondenseForManyIntervals()
{
// This method attempts to test that there are no issues when condensed is called
// with an iterator pointing to millions of intervals (since the version of condensed
// used here takes an interval iterator and does not materialize intervals)
Interval first = Intervals.of("2012-01-01T00Z/P1Y");
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first),
Granularity.fromString("SECOND")
);
Assert.assertEquals(
ImmutableList.of(Intervals.of("2012-01-01T00Z/2013-01-01T00Z")),
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.granularityIntervalsIterator()))
);
}
@Test
public void testIntervalExplosion()
{
Interval first = Intervals.of("2012-01-01T00Z/2012-12-31T00Z");
Interval second = Intervals.of("2002-01-01T00Z/2002-12-31T00Z");
Interval third = Intervals.of("2021-01-01T00Z/2021-06-30T00Z");
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first, second, third),
Granularity.fromString("SECOND")
);
// get count:
Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
long count = getCount(granularityIntervals);
Assert.assertTrue(count == 78537600);
}
@Test
public void testSimpleEliminateRepeated()
{
final List<Interval> inputIntervals = ImmutableList.of(
Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
);
IntervalsByGranularity intervals = new IntervalsByGranularity(
inputIntervals,
Granularities.MONTH
);
Assert.assertEquals(
ImmutableList.of(Intervals.of("2012-01-01T00Z/2012-02-01T00Z")),
ImmutableList.copyOf(intervals.granularityIntervalsIterator())
);
}
@Test
public void testALittleMoreComplexEliminateRepeated()
{
final List<Interval> inputIntervals = ImmutableList.of(
Intervals.of("2015-01-08T00Z/2015-01-11T00Z"),
Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
Intervals.of("2012-01-01T00Z/2012-01-03T00Z"),
Intervals.of("2007-03-08T00Z/2007-04-11T00Z")
);
IntervalsByGranularity intervals = new IntervalsByGranularity(
inputIntervals,
Granularities.MONTH
);
Assert.assertEquals(
ImmutableList.of(
Intervals.of("2007-03-01T00Z/2007-04-01T00Z"),
Intervals.of("2007-04-01T00Z/2007-05-01T00Z"),
Intervals.of("2012-01-01T00Z/2012-02-01T00Z"),
Intervals.of("2015-01-01T00Z/2015-02-01T00Z")
),
ImmutableList.copyOf(intervals.granularityIntervalsIterator())
);
}
@Test(expected = IAE.class)
public void testOverlappingShouldThrow()
{
List<Interval> inputIntervals = ImmutableList.of(
Intervals.of("2013-01-01T00Z/2013-01-11T00Z"),
Intervals.of("2013-01-05T00Z/2013-01-08T00Z"),
Intervals.of("2013-01-07T00Z/2013-01-15T00Z")
);
IntervalsByGranularity intervals = new IntervalsByGranularity(
inputIntervals,
Granularity.fromString("DAY")
);
}
@Test
public void testWithGranularity()
{
List<Interval> inputIntervals = ImmutableList.of(
Intervals.of("2013-01-01T00Z/2013-01-10T00Z"),
Intervals.of("2013-01-15T00Z/2013-01-20T00Z"),
Intervals.of("2013-02-07T00Z/2013-02-15T00Z")
);
IntervalsByGranularity intervals = new IntervalsByGranularity(
inputIntervals,
Granularity.fromString("MONTH")
);
// get count:
Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
long count = getCount(granularityIntervals);
Assert.assertTrue(count == 2);
}
@Test(expected = UnsupportedOperationException.class)
public void testRemoveThrowsException()
{
final List<Interval> inputIntervals = ImmutableList.of(
Intervals.of("2015-01-08T00Z/2015-01-11T00Z")
);
IntervalsByGranularity intervals = new IntervalsByGranularity(
inputIntervals,
Granularities.MONTH
);
intervals.granularityIntervalsIterator().remove();
}
@Test
public void testEmptyInput()
{
final List<Interval> inputIntervals = Collections.emptyList();
IntervalsByGranularity intervals = new IntervalsByGranularity(
inputIntervals,
Granularities.MONTH
);
Assert.assertFalse(intervals.granularityIntervalsIterator().hasNext());
}
private long getCount(Iterator<Interval> granularityIntervalIterator)
{
long count = 0;
Interval previous = null;
Interval current;
while (granularityIntervalIterator.hasNext()) {
current = granularityIntervalIterator.next();
if (previous != null) {
Assert.assertTrue(previous + "," + current, previous.getEndMillis() <= current.getStartMillis());
}
previous = current;
count++;
}
return count;
}
private long getCountWithNoHasNext(Iterator<Interval> granularityIntervalIterator)
{
long count = 0;
Interval previous = null;
Interval current;
while (true) {
try {
current = granularityIntervalIterator.next();
}
catch (NoSuchElementException e) {
// done
break;
}
if (previous != null) {
Assert.assertTrue(previous.getEndMillis() <= current.getStartMillis());
}
previous = current;
count++;
}
return count;
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.indexer;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction; import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing; import com.google.common.hash.Hashing;
@ -64,7 +65,6 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
/** /**
@ -109,10 +109,10 @@ public class DetermineHashedPartitionsJob implements Jobby
groupByJob.setOutputValueClass(NullWritable.class); groupByJob.setOutputValueClass(NullWritable.class);
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class); groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class); groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
if (!config.getSegmentGranularIntervals().isPresent()) { if (config.getInputIntervals().isEmpty()) {
groupByJob.setNumReduceTasks(1); groupByJob.setNumReduceTasks(1);
} else { } else {
groupByJob.setNumReduceTasks(config.getSegmentGranularIntervals().get().size()); groupByJob.setNumReduceTasks(Iterators.size(config.getSegmentGranularIntervals().iterator()));
} }
JobHelper.setupClasspath( JobHelper.setupClasspath(
JobHelper.distributedClassPath(config.getWorkingPath()), JobHelper.distributedClassPath(config.getWorkingPath()),
@ -151,7 +151,7 @@ public class DetermineHashedPartitionsJob implements Jobby
log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals()); log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals());
FileSystem fileSystem = null; FileSystem fileSystem = null;
if (!config.getSegmentGranularIntervals().isPresent()) { if (config.getInputIntervals().isEmpty()) {
final Path intervalInfoPath = config.makeIntervalInfoPath(); final Path intervalInfoPath = config.makeIntervalInfoPath();
fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration()); fileSystem = intervalInfoPath.getFileSystem(groupByJob.getConfiguration());
if (!Utils.exists(groupByJob, fileSystem, intervalInfoPath)) { if (!Utils.exists(groupByJob, fileSystem, intervalInfoPath)) {
@ -159,7 +159,9 @@ public class DetermineHashedPartitionsJob implements Jobby
} }
List<Interval> intervals = HadoopDruidIndexerConfig.JSON_MAPPER.readValue( List<Interval> intervals = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(
Utils.openInputStream(groupByJob, intervalInfoPath), Utils.openInputStream(groupByJob, intervalInfoPath),
new TypeReference<List<Interval>>() {} new TypeReference<List<Interval>>()
{
}
); );
config.setGranularitySpec( config.setGranularitySpec(
new UniformGranularitySpec( new UniformGranularitySpec(
@ -182,7 +184,7 @@ public class DetermineHashedPartitionsJob implements Jobby
} }
HashPartitionFunction partitionFunction = ((HashedPartitionsSpec) partitionsSpec).getPartitionFunction(); HashPartitionFunction partitionFunction = ((HashedPartitionsSpec) partitionsSpec).getPartitionFunction();
int shardCount = 0; int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
DateTime bucket = segmentGranularity.getStart(); DateTime bucket = segmentGranularity.getStart();
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity); final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity);
@ -295,11 +297,11 @@ public class DetermineHashedPartitionsJob implements Jobby
super.setup(context); super.setup(context);
rollupGranularity = getConfig().getGranularitySpec().getQueryGranularity(); rollupGranularity = getConfig().getGranularitySpec().getQueryGranularity();
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
Optional<Set<Interval>> intervals = config.getSegmentGranularIntervals(); Iterable<Interval> intervals = config.getSegmentGranularIntervals();
if (intervals.isPresent()) { if (intervals.iterator().hasNext()) {
determineIntervals = false; determineIntervals = false;
final ImmutableMap.Builder<Interval, HyperLogLogCollector> builder = ImmutableMap.builder(); final ImmutableMap.Builder<Interval, HyperLogLogCollector> builder = ImmutableMap.builder();
for (final Interval bucketInterval : intervals.get()) { for (final Interval bucketInterval : intervals) {
builder.put(bucketInterval, HyperLogLogCollector.makeLatestCollector()); builder.put(bucketInterval, HyperLogLogCollector.makeLatestCollector());
} }
hyperLogLogs = builder.build(); hyperLogLogs = builder.build();
@ -376,7 +378,7 @@ public class DetermineHashedPartitionsJob implements Jobby
protected void setup(Context context) protected void setup(Context context)
{ {
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
determineIntervals = !config.getSegmentGranularIntervals().isPresent(); determineIntervals = config.getInputIntervals().isEmpty();
} }
@Override @Override
@ -477,11 +479,11 @@ public class DetermineHashedPartitionsJob implements Jobby
{ {
this.config = config; this.config = config;
HadoopDruidIndexerConfig hadoopConfig = HadoopDruidIndexerConfig.fromConfiguration(config); HadoopDruidIndexerConfig hadoopConfig = HadoopDruidIndexerConfig.fromConfiguration(config);
if (hadoopConfig.getSegmentGranularIntervals().isPresent()) { if (!hadoopConfig.getInputIntervals().isEmpty()) {
determineIntervals = false; determineIntervals = false;
int reducerNumber = 0; int reducerNumber = 0;
ImmutableMap.Builder<LongWritable, Integer> builder = ImmutableMap.builder(); ImmutableMap.Builder<LongWritable, Integer> builder = ImmutableMap.builder();
for (Interval interval : hadoopConfig.getSegmentGranularIntervals().get()) { for (Interval interval : hadoopConfig.getSegmentGranularIntervals()) {
builder.put(new LongWritable(interval.getStartMillis()), reducerNumber++); builder.put(new LongWritable(interval.getStartMillis()), reducerNumber++);
} }
reducerLookup = builder.build(); reducerLookup = builder.build();

View File

@ -212,7 +212,7 @@ public class DeterminePartitionsJob implements Jobby
dimSelectionJob.setOutputKeyClass(BytesWritable.class); dimSelectionJob.setOutputKeyClass(BytesWritable.class);
dimSelectionJob.setOutputValueClass(Text.class); dimSelectionJob.setOutputValueClass(Text.class);
dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class); dimSelectionJob.setOutputFormatClass(DeterminePartitionsDimSelectionOutputFormat.class);
dimSelectionJob.setNumReduceTasks(config.getGranularitySpec().bucketIntervals().get().size()); dimSelectionJob.setNumReduceTasks(Iterators.size(config.getGranularitySpec().sortedBucketIntervals().iterator()));
JobHelper.setupClasspath( JobHelper.setupClasspath(
JobHelper.distributedClassPath(config.getWorkingPath()), JobHelper.distributedClassPath(config.getWorkingPath()),
JobHelper.distributedClassPath(config.makeIntermediatePath()), JobHelper.distributedClassPath(config.makeIntermediatePath()),
@ -256,7 +256,7 @@ public class DeterminePartitionsJob implements Jobby
FileSystem fileSystem = null; FileSystem fileSystem = null;
Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>(); Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>();
int shardCount = 0; int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity); final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity);
if (fileSystem == null) { if (fileSystem == null) {
fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration()); fileSystem = partitionInfoPath.getFileSystem(dimSelectionJob.getConfiguration());
@ -447,7 +447,7 @@ public class DeterminePartitionsJob implements Jobby
final ImmutableMap.Builder<Long, Integer> timeIndexBuilder = ImmutableMap.builder(); final ImmutableMap.Builder<Long, Integer> timeIndexBuilder = ImmutableMap.builder();
int idx = 0; int idx = 0;
for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals().get()) { for (final Interval bucketInterval : config.getGranularitySpec().sortedBucketIntervals()) {
timeIndexBuilder.put(bucketInterval.getStartMillis(), idx); timeIndexBuilder.put(bucketInterval.getStartMillis(), idx);
idx++; idx++;
} }

View File

@ -36,6 +36,7 @@ import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
/** /**
*
*/ */
public class HadoopDruidDetermineConfigurationJob implements Jobby public class HadoopDruidDetermineConfigurationJob implements Jobby
{ {
@ -75,7 +76,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
} }
Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>(); Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>();
int shardCount = 0; int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
DateTime bucket = segmentGranularity.getStart(); DateTime bucket = segmentGranularity.getStart();
// negative shardsPerInterval means a single shard // negative shardsPerInterval means a single shard
List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(shardsPerInterval); List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(shardsPerInterval);

View File

@ -76,10 +76,9 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
/** /**
*
*/ */
public class HadoopDruidIndexerConfig public class HadoopDruidIndexerConfig
{ {
@ -99,7 +98,7 @@ public class HadoopDruidIndexerConfig
/** /**
* Hadoop tasks running in an Indexer process need a reference to the Properties instance created * Hadoop tasks running in an Indexer process need a reference to the Properties instance created
* in PropertiesModule so that the task sees properties that were specified in Druid's config files. * in PropertiesModule so that the task sees properties that were specified in Druid's config files.
* * <p>
* This is not strictly necessary for Peon-based tasks which have all properties, including config file properties, * This is not strictly necessary for Peon-based tasks which have all properties, including config file properties,
* specified on their command line by ForkingTaskRunner (so they could use System.getProperties() only), * specified on their command line by ForkingTaskRunner (so they could use System.getProperties() only),
* but we always use the injected Properties for consistency. * but we always use the injected Properties for consistency.
@ -314,9 +313,9 @@ public class HadoopDruidIndexerConfig
public Optional<List<Interval>> getIntervals() public Optional<List<Interval>> getIntervals()
{ {
Optional<SortedSet<Interval>> setOptional = schema.getDataSchema().getGranularitySpec().bucketIntervals(); Iterable<Interval> bucketIntervals = schema.getDataSchema().getGranularitySpec().sortedBucketIntervals();
if (setOptional.isPresent()) { if (bucketIntervals.iterator().hasNext()) {
return Optional.of(JodaUtils.condenseIntervals(setOptional.get())); return Optional.of(JodaUtils.condenseIntervals(bucketIntervals));
} else { } else {
return Optional.absent(); return Optional.absent();
} }
@ -426,7 +425,6 @@ public class HadoopDruidIndexerConfig
* Get the proper bucket for some input row. * Get the proper bucket for some input row.
* *
* @param inputRow an InputRow * @param inputRow an InputRow
*
* @return the Bucket that this row belongs to * @return the Bucket that this row belongs to
*/ */
Optional<Bucket> getBucket(InputRow inputRow) Optional<Bucket> getBucket(InputRow inputRow)
@ -455,14 +453,12 @@ public class HadoopDruidIndexerConfig
} }
Optional<Set<Interval>> getSegmentGranularIntervals() Iterable<Interval> getSegmentGranularIntervals()
{ {
return Optional.fromNullable( return
schema.getDataSchema() schema.getDataSchema()
.getGranularitySpec() .getGranularitySpec()
.bucketIntervals() .sortedBucketIntervals();
.orNull()
);
} }
public List<Interval> getInputIntervals() public List<Interval> getInputIntervals()
@ -474,15 +470,17 @@ public class HadoopDruidIndexerConfig
Optional<Iterable<Bucket>> getAllBuckets() Optional<Iterable<Bucket>> getAllBuckets()
{ {
Optional<Set<Interval>> intervals = getSegmentGranularIntervals(); Iterable<Interval> intervals = getSegmentGranularIntervals();
if (intervals.isPresent()) { if (intervals.iterator().hasNext()) {
return Optional.of( return Optional.of(
FunctionalIterable FunctionalIterable
.create(intervals.get()) .create(intervals)
.transformCat( .transformCat(
input -> { input -> {
final DateTime bucketTime = input.getStart(); final DateTime bucketTime = input.getStart();
final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis()); final List<HadoopyShardSpec> specs = schema.getTuningConfig()
.getShardSpecs()
.get(bucketTime.getMillis());
if (specs == null) { if (specs == null) {
return ImmutableList.of(); return ImmutableList.of();
} }

View File

@ -83,7 +83,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
throw new ParseException(errorMsg); throw new ParseException(errorMsg);
} }
if (!granularitySpec.bucketIntervals().isPresent() if (granularitySpec.inputIntervals().isEmpty()
|| granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch())) || granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()))
.isPresent()) { .isPresent()) {
innerMap(inputRow, context); innerMap(inputRow, context);

View File

@ -20,7 +20,6 @@
package org.apache.druid.indexer; package org.apache.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@ -59,7 +58,7 @@ public class HadoopDruidDetermineConfigurationJobTest
final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class); final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class);
Mockito.when(config.isDeterminingPartitions()).thenReturn(false); Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec); Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals)); Mockito.when(config.getSegmentGranularIntervals()).thenReturn(intervals);
final ArgumentCaptor<Map<Long, List<HadoopyShardSpec>>> resultCaptor = ArgumentCaptor.forClass(Map.class); final ArgumentCaptor<Map<Long, List<HadoopyShardSpec>>> resultCaptor = ArgumentCaptor.forClass(Map.class);
Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture()); Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture());
@ -99,7 +98,7 @@ public class HadoopDruidDetermineConfigurationJobTest
final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class); final HadoopDruidIndexerConfig config = Mockito.mock(HadoopDruidIndexerConfig.class);
Mockito.when(config.isDeterminingPartitions()).thenReturn(false); Mockito.when(config.isDeterminingPartitions()).thenReturn(false);
Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec); Mockito.when(config.getPartitionsSpec()).thenReturn(partitionsSpec);
Mockito.when(config.getSegmentGranularIntervals()).thenReturn(Optional.of(intervals)); Mockito.when(config.getSegmentGranularIntervals()).thenReturn(intervals);
final ArgumentCaptor<Map<Long, List<HadoopyShardSpec>>> resultCaptor = ArgumentCaptor.forClass(Map.class); final ArgumentCaptor<Map<Long, List<HadoopyShardSpec>>> resultCaptor = ArgumentCaptor.forClass(Map.class);
Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture()); Mockito.doNothing().when(config).setShardSpecs(resultCaptor.capture());

View File

@ -78,7 +78,7 @@ public class HadoopIngestionSpecTest
Assert.assertEquals( Assert.assertEquals(
"getIntervals", "getIntervals",
Collections.singletonList(Intervals.of("2012-01-01/P1D")), Collections.singletonList(Intervals.of("2012-01-01/P1D")),
granularitySpec.getIntervals().get() granularitySpec.inputIntervals()
); );
Assert.assertEquals( Assert.assertEquals(

View File

@ -599,7 +599,7 @@ public class IndexGeneratorJobTest
Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>(DateTimeComparator.getInstance()); Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>(DateTimeComparator.getInstance());
int shardCount = 0; int shardCount = 0;
int segmentNum = 0; int segmentNum = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) { for (Interval segmentGranularity : config.getSegmentGranularIntervals()) {
List<ShardSpec> specs = constructShardSpecFromShardInfo(partitionType, shardInfoForEachShard[segmentNum++]); List<ShardSpec> specs = constructShardSpecFromShardInfo(partitionType, shardInfoForEachShard[segmentNum++]);
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size()); List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
for (ShardSpec spec : specs) { for (ShardSpec spec : specs) {

View File

@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.task;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputFormat;
@ -48,6 +47,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.ParseExceptionHandler;
@ -69,7 +69,6 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -262,41 +261,18 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
return Preconditions.checkNotNull(taskLockHelper, "taskLockHelper is not initialized yet"); return Preconditions.checkNotNull(taskLockHelper, "taskLockHelper is not initialized yet");
} }
/**
* Attempts to acquire a lock that covers the intervals specified in a certain granularitySpec.
*
* This method uses {@link GranularitySpec#bucketIntervals()} to get the list of intervals to lock, and passes them
* to {@link #determineLockGranularityAndTryLock(TaskActionClient, List)}.
*
* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
*
* If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method
* will initialize {@link #taskLockHelper} as a side effect.
*
* @return whether the lock was acquired
*/
protected boolean determineLockGranularityAndTryLock(
TaskActionClient client,
GranularitySpec granularitySpec
) throws IOException
{
final List<Interval> intervals = granularitySpec.bucketIntervals().isPresent()
? new ArrayList<>(granularitySpec.bucketIntervals().get())
: Collections.emptyList();
return determineLockGranularityAndTryLock(client, intervals);
}
/** /**
* Attempts to acquire a lock that covers certain intervals. * Attempts to acquire a lock that covers certain intervals.
* * <p>
* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
* * <p>
* If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method * If {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} is set, or if {@param intervals} is nonempty, then this method
* will initialize {@link #taskLockHelper} as a side effect. * will initialize {@link #taskLockHelper} as a side effect.
* *
* @return whether the lock was acquired * @return whether the lock was acquired
*/ */
boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals) throws IOException public boolean determineLockGranularityAndTryLock(TaskActionClient client, List<Interval> intervals)
throws IOException
{ {
final boolean forceTimeChunkLock = getContextValue( final boolean forceTimeChunkLock = getContextValue(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY, Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
@ -325,9 +301,9 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
/** /**
* Attempts to acquire a lock that covers certain segments. * Attempts to acquire a lock that covers certain segments.
* * <p>
* Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock.
* * <p>
* This method will initialize {@link #taskLockHelper} as a side effect. * This method will initialize {@link #taskLockHelper} as a side effect.
* *
* @return whether the lock was acquired * @return whether the lock was acquired
@ -396,25 +372,33 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
} }
} }
protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
{ {
// The given intervals are first converted to align with segment granularity. This is because, // The given intervals are first converted to align with segment granularity. This is because,
// when an overwriting task finds a version for a given input row, it expects the interval // when an overwriting task finds a version for a given input row, it expects the interval
// associated to each version to be equal or larger than the time bucket where the input row falls in. // associated to each version to be equal or larger than the time bucket where the input row falls in.
// See ParallelIndexSupervisorTask.findVersion(). // See ParallelIndexSupervisorTask.findVersion().
final Set<Interval> uniqueIntervals = new HashSet<>(); final Iterator<Interval> intervalIterator;
final Granularity segmentGranularity = getSegmentGranularity(); final Granularity segmentGranularity = getSegmentGranularity();
for (Interval interval : intervals) {
if (segmentGranularity == null) { if (segmentGranularity == null) {
uniqueIntervals.add(interval); intervalIterator = JodaUtils.condenseIntervals(intervals).iterator();
} else { } else {
Iterables.addAll(uniqueIntervals, segmentGranularity.getIterable(interval)); IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
} // the following is calling a condense that does not materialize the intervals:
intervalIterator = JodaUtils.condensedIntervalsIterator(intervalsByGranularity.granularityIntervalsIterator());
} }
// Condense intervals to avoid creating too many locks. // Intervals are already condensed to avoid creating too many locks.
for (Interval interval : JodaUtils.condenseIntervals(uniqueIntervals)) { // Intervals are also sorted and thus it's safe to compare only the previous interval and current one for dedup.
final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)); Interval prev = null;
while (intervalIterator.hasNext()) {
final Interval cur = intervalIterator.next();
if (prev != null && cur.equals(prev)) {
continue;
}
prev = cur;
final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur));
if (lock == null) { if (lock == null) {
return false; return false;
} }
@ -525,11 +509,11 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
* If the given firehoseFactory is {@link IngestSegmentFirehoseFactory}, then it finds the segments to lock * If the given firehoseFactory is {@link IngestSegmentFirehoseFactory}, then it finds the segments to lock
* from the firehoseFactory. This is because those segments will be read by this task no matter what segments would be * from the firehoseFactory. This is because those segments will be read by this task no matter what segments would be
* filtered by intervalsToRead, so they need to be locked. * filtered by intervalsToRead, so they need to be locked.
* * <p>
* However, firehoseFactory is not IngestSegmentFirehoseFactory, it means this task will overwrite some segments * However, firehoseFactory is not IngestSegmentFirehoseFactory, it means this task will overwrite some segments
* with data read from some input source outside of Druid. As a result, only the segments falling in intervalsToRead * with data read from some input source outside of Druid. As a result, only the segments falling in intervalsToRead
* should be locked. * should be locked.
* * <p>
* The order of segments within the returned list is unspecified, but each segment is guaranteed to appear in the list * The order of segments within the returned list is unspecified, but each segment is guaranteed to appear in the list
* only once. * only once.
*/ */

View File

@ -82,7 +82,6 @@ import java.lang.reflect.Method;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.SortedSet;
public class HadoopIndexTask extends HadoopTask implements ChatHandler public class HadoopIndexTask extends HadoopTask implements ChatHandler
{ {
@ -189,12 +188,10 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
@Override @Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception public boolean isReady(TaskActionClient taskActionClient) throws Exception
{ {
Optional<SortedSet<Interval>> intervals = spec.getDataSchema().getGranularitySpec().bucketIntervals(); Iterable<Interval> intervals = spec.getDataSchema().getGranularitySpec().sortedBucketIntervals();
if (intervals.isPresent()) { if (intervals.iterator().hasNext()) {
Interval interval = JodaUtils.umbrellaInterval( Interval interval = JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals( JodaUtils.condenseIntervals(intervals)
intervals.get()
)
); );
return taskActionClient.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)) != null; return taskActionClient.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)) != null;
} else { } else {
@ -312,7 +309,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
registerResourceCloserOnAbnormalExit(config -> killHadoopJob()); registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
String hadoopJobIdFile = getHadoopJobIdFileName(); String hadoopJobIdFile = getHadoopJobIdFileName();
final ClassLoader loader = buildClassLoader(toolbox); final ClassLoader loader = buildClassLoader(toolbox);
boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent(); boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty();
HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed( HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
spec, spec,
@ -377,7 +374,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
if (determineIntervals) { if (determineIntervals) {
Interval interval = JodaUtils.umbrellaInterval( Interval interval = JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals( JodaUtils.condenseIntervals(
indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get() indexerSchema.getDataSchema().getGranularitySpec().sortedBucketIntervals()
) )
); );
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS); final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT_MILLIS);
@ -621,7 +618,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
} }
/** Called indirectly in {@link HadoopIndexTask#run(TaskToolbox)}. */ /**
* Called indirectly in {@link HadoopIndexTask#run(TaskToolbox)}.
*/
@SuppressWarnings("unused") @SuppressWarnings("unused")
public static class HadoopDetermineConfigInnerProcessingRunner public static class HadoopDetermineConfigInnerProcessingRunner
{ {
@ -770,9 +769,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
jobId jobId
}); });
return new String[] {jobId, (res == 0 ? "Success" : "Fail")}; return new String[]{jobId, (res == 0 ? "Success" : "Fail")};
} }
return new String[] {jobId, "Fail"}; return new String[]{jobId, "Fail"};
} }
} }

View File

@ -123,7 +123,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -231,7 +230,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
throw new UOE("partitionsSpec[%s] is not supported", tuningConfig.getPartitionsSpec().getClass().getName()); throw new UOE("partitionsSpec[%s] is not supported", tuningConfig.getPartitionsSpec().getClass().getName());
} }
} }
return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.dataSchema.getGranularitySpec()); return determineLockGranularityAndTryLock(
taskActionClient,
ingestionSchema.dataSchema.getGranularitySpec().inputIntervals()
);
} }
@Override @Override
@ -452,10 +454,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
ingestionSchema.getTuningConfig().getMaxSavedParseExceptions() ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
); );
final boolean determineIntervals = !ingestionSchema.getDataSchema() final boolean determineIntervals = ingestionSchema.getDataSchema()
.getGranularitySpec() .getGranularitySpec()
.bucketIntervals() .inputIntervals()
.isPresent(); .isEmpty();
final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource( final InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
ingestionSchema.getDataSchema().getParser() ingestionSchema.getDataSchema().getParser()
@ -591,7 +593,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec(); final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
// Must determine intervals if unknown, since we acquire all locks before processing any data. // Must determine intervals if unknown, since we acquire all locks before processing any data.
final boolean determineIntervals = !granularitySpec.bucketIntervals().isPresent(); final boolean determineIntervals = granularitySpec.inputIntervals().isEmpty();
// Must determine partitions if rollup is guaranteed and the user didn't provide a specific value. // Must determine partitions if rollup is guaranteed and the user didn't provide a specific value.
final boolean determineNumPartitions = partitionsSpec.needsDeterminePartitions(false); final boolean determineNumPartitions = partitionsSpec.needsDeterminePartitions(false);
@ -630,7 +632,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@Nonnull DynamicPartitionsSpec partitionsSpec @Nonnull DynamicPartitionsSpec partitionsSpec
) )
{ {
final SortedSet<Interval> intervals = granularitySpec.bucketIntervals().get(); final Iterable<Interval> intervals = granularitySpec.sortedBucketIntervals();
final int numBucketsPerInterval = 1; final int numBucketsPerInterval = 1;
final LinearPartitionAnalysis partitionAnalysis = new LinearPartitionAnalysis(partitionsSpec); final LinearPartitionAnalysis partitionAnalysis = new LinearPartitionAnalysis(partitionsSpec);
intervals.forEach(interval -> partitionAnalysis.updateBucket(interval, numBucketsPerInterval)); intervals.forEach(interval -> partitionAnalysis.updateBucket(interval, numBucketsPerInterval));

View File

@ -109,7 +109,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -141,7 +140,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
* the segment granularity of existing segments until the task reads all data because we don't know what segments * the segment granularity of existing segments until the task reads all data because we don't know what segments
* are going to be overwritten. As a result, we assume that segment granularity is going to be changed if intervals * are going to be overwritten. As a result, we assume that segment granularity is going to be changed if intervals
* are missing and force to use timeChunk lock. * are missing and force to use timeChunk lock.
* * <p>
* This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced * This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced
* in the task logs. * in the task logs.
*/ */
@ -194,10 +193,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
ingestionSchema.getDataSchema().getParser() ingestionSchema.getDataSchema().getParser()
); );
this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting() this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting()
&& !ingestionSchema.getDataSchema() && ingestionSchema.getDataSchema()
.getGranularitySpec() .getGranularitySpec()
.bucketIntervals() .inputIntervals()
.isPresent(); .isEmpty();
if (missingIntervalsInOverwriteMode) { if (missingIntervalsInOverwriteMode) {
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true); addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
} }
@ -350,7 +349,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
@Override @Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception public boolean isReady(TaskActionClient taskActionClient) throws Exception
{ {
return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.getDataSchema().getGranularitySpec()); return determineLockGranularityAndTryLock(
taskActionClient,
ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
);
} }
@Override @Override
@ -513,7 +515,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
/** /**
* Run the multi phase parallel indexing for perfect rollup. In this mode, the parallel indexing is currently * Run the multi phase parallel indexing for perfect rollup. In this mode, the parallel indexing is currently
* executed in two phases. * executed in two phases.
* * <p>
* - In the first phase, each task partitions input data and stores those partitions in local storage. * - In the first phase, each task partitions input data and stores those partitions in local storage.
* - The partition is created based on the segment granularity (primary partition key) and the partition dimension * - The partition is created based on the segment granularity (primary partition key) and the partition dimension
* values in {@link PartitionsSpec} (secondary partition key). * values in {@link PartitionsSpec} (secondary partition key).
@ -891,7 +893,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
* @param index index of partition * @param index index of partition
* @param total number of items to partition * @param total number of items to partition
* @param splits number of desired partitions * @param splits number of desired partitions
*
* @return partition range: [lhs, rhs) * @return partition range: [lhs, rhs)
*/ */
private static Pair<Integer, Integer> getPartitionBoundaries(int index, int total, int splits) private static Pair<Integer, Integer> getPartitionBoundaries(int index, int total, int splits)
@ -1038,7 +1039,10 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
{ {
final String dataSource = getDataSource(); final String dataSource = getDataSource();
final GranularitySpec granularitySpec = getIngestionSchema().getDataSchema().getGranularitySpec(); final GranularitySpec granularitySpec = getIngestionSchema().getDataSchema().getGranularitySpec();
final Optional<SortedSet<Interval>> bucketIntervals = granularitySpec.bucketIntervals(); // This method is called whenever subtasks need to allocate a new segment via the supervisor task.
// As a result, this code is never called in the Overlord. For now using the materialized intervals
// here is ok for performance reasons
final Set<Interval> materializedBucketIntervals = granularitySpec.materializedBucketIntervals();
// List locks whenever allocating a new segment because locks might be revoked and no longer valid. // List locks whenever allocating a new segment because locks might be revoked and no longer valid.
final List<TaskLock> locks = toolbox final List<TaskLock> locks = toolbox
@ -1054,7 +1058,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
Interval interval; Interval interval;
String version; String version;
if (bucketIntervals.isPresent()) { if (!materializedBucketIntervals.isEmpty()) {
// If granularity spec has explicit intervals, we just need to find the version associated to the interval. // If granularity spec has explicit intervals, we just need to find the version associated to the interval.
// This is because we should have gotten all required locks up front when the task starts up. // This is because we should have gotten all required locks up front when the task starts up.
final Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp); final Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp);
@ -1063,7 +1067,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
} }
interval = maybeInterval.get(); interval = maybeInterval.get();
if (!bucketIntervals.get().contains(interval)) { if (!materializedBucketIntervals.contains(interval)) {
throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec); throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec);
} }

View File

@ -106,7 +106,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
); );
} }
@VisibleForTesting // Only for testing @VisibleForTesting
PartialDimensionDistributionTask( PartialDimensionDistributionTask(
@Nullable String id, @Nullable String id,
final String groupId, final String groupId,
@ -334,7 +334,8 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
this(queryGranularity, BLOOM_FILTER_EXPECTED_INSERTIONS, BLOOM_FILTER_EXPECTED_FALSE_POSITIVE_PROBABILTY); this(queryGranularity, BLOOM_FILTER_EXPECTED_INSERTIONS, BLOOM_FILTER_EXPECTED_FALSE_POSITIVE_PROBABILTY);
} }
@VisibleForTesting // to allow controlling false positive rate of bloom filter @VisibleForTesting
// to allow controlling false positive rate of bloom filter
DedupInputRowFilter( DedupInputRowFilter(
Granularity queryGranularity, Granularity queryGranularity,
int bloomFilterExpectedInsertions, int bloomFilterExpectedInsertions,

View File

@ -42,7 +42,6 @@ import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.SortedSet;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -194,7 +193,7 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
// We only care about the intervals in intervalToNumShardsOverride here. // We only care about the intervals in intervalToNumShardsOverride here.
intervalToNumShardsOverride.forEach(partitionAnalysis::updateBucket); intervalToNumShardsOverride.forEach(partitionAnalysis::updateBucket);
} else { } else {
final SortedSet<Interval> intervals = granularitySpec.bucketIntervals().get(); final Iterable<Interval> intervals = granularitySpec.sortedBucketIntervals();
final int numBucketsPerInterval = partitionsSpec.getNumShards() == null final int numBucketsPerInterval = partitionsSpec.getNumShards() == null
? 1 ? 1
: partitionsSpec.getNumShards(); : partitionsSpec.getNumShards();

View File

@ -98,7 +98,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
* the segment granularity of existing segments until the task reads all data because we don't know what segments * the segment granularity of existing segments until the task reads all data because we don't know what segments
* are going to be overwritten. As a result, we assume that segment granularity is going to be changed if intervals * are going to be overwritten. As a result, we assume that segment granularity is going to be changed if intervals
* are missing and force to use timeChunk lock. * are missing and force to use timeChunk lock.
* * <p>
* This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced * This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced
* in the task logs. * in the task logs.
*/ */
@ -132,10 +132,10 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
this.ingestionSchema = ingestionSchema; this.ingestionSchema = ingestionSchema;
this.supervisorTaskId = supervisorTaskId; this.supervisorTaskId = supervisorTaskId;
this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting() this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting()
&& !ingestionSchema.getDataSchema() && ingestionSchema.getDataSchema()
.getGranularitySpec() .getGranularitySpec()
.bucketIntervals() .inputIntervals()
.isPresent(); .isEmpty();
if (missingIntervalsInOverwriteMode) { if (missingIntervalsInOverwriteMode) {
addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true); addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
} }
@ -152,7 +152,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
{ {
return determineLockGranularityAndTryLock( return determineLockGranularityAndTryLock(
new SurrogateTaskActionClient(supervisorTaskId, taskActionClient), new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
ingestionSchema.getDataSchema().getGranularitySpec() ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
); );
} }
@ -263,7 +263,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
* If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link DynamicPartitionsSpec#maxTotalRows} * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link DynamicPartitionsSpec#maxTotalRows}
* </li> * </li>
* </ul> * </ul>
* * <p>
* At the end of this method, all the remaining segments are published. * At the end of this method, all the remaining segments are published.
* *
* @return true if generated segments are successfully published, otherwise false * @return true if generated segments are successfully published, otherwise false
@ -291,7 +291,7 @@ public class SinglePhaseSubTask extends AbstractBatchIndexTask
final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig(); final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
final DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec) tuningConfig.getGivenOrDefaultPartitionsSpec(); final DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec) tuningConfig.getGivenOrDefaultPartitionsSpec();
final long pushTimeout = tuningConfig.getPushTimeout(); final long pushTimeout = tuningConfig.getPushTimeout();
final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent(); final boolean explicitIntervals = !granularitySpec.inputIntervals().isEmpty();
final SegmentAllocator segmentAllocator = SegmentAllocators.forLinearPartitioning( final SegmentAllocator segmentAllocator = SegmentAllocators.forLinearPartitioning(
toolbox, toolbox,
getId(), getId(),

View File

@ -21,29 +21,20 @@ package org.apache.druid.segment.indexing.granularity;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator; import com.google.common.collect.PeekingIterator;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
public class ArbitraryGranularitySpec implements GranularitySpec public class ArbitraryGranularitySpec extends BaseGranularitySpec
{ {
private final TreeSet<Interval> intervals;
private final Granularity queryGranularity; private final Granularity queryGranularity;
private final Boolean rollup; protected LookupIntervalBuckets lookupTableBucketByDateTime;
@JsonCreator @JsonCreator
public ArbitraryGranularitySpec( public ArbitraryGranularitySpec(
@ -52,22 +43,15 @@ public class ArbitraryGranularitySpec implements GranularitySpec
@JsonProperty("intervals") @Nullable List<Interval> inputIntervals @JsonProperty("intervals") @Nullable List<Interval> inputIntervals
) )
{ {
super(inputIntervals, rollup);
this.queryGranularity = queryGranularity == null ? Granularities.NONE : queryGranularity; this.queryGranularity = queryGranularity == null ? Granularities.NONE : queryGranularity;
this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
if (inputIntervals == null) { lookupTableBucketByDateTime = new LookupIntervalBuckets(inputIntervals);
inputIntervals = new ArrayList<>();
}
// Insert all intervals
intervals.addAll(inputIntervals);
// Ensure intervals are non-overlapping (but they may abut each other) // Ensure intervals are non-overlapping (but they may abut each other)
final PeekingIterator<Interval> intervalIterator = Iterators.peekingIterator(intervals.iterator()); final PeekingIterator<Interval> intervalIterator = Iterators.peekingIterator(sortedBucketIntervals().iterator());
while (intervalIterator.hasNext()) { while (intervalIterator.hasNext()) {
final Interval currentInterval = intervalIterator.next(); final Interval currentInterval = intervalIterator.next();
if (intervalIterator.hasNext()) { if (intervalIterator.hasNext()) {
final Interval nextInterval = intervalIterator.peek(); final Interval nextInterval = intervalIterator.peek();
if (currentInterval.overlaps(nextInterval)) { if (currentInterval.overlaps(nextInterval)) {
@ -86,29 +70,9 @@ public class ArbitraryGranularitySpec implements GranularitySpec
} }
@Override @Override
@JsonProperty("intervals") public Iterable<Interval> sortedBucketIntervals()
public Optional<SortedSet<Interval>> bucketIntervals()
{ {
return Optional.of(intervals); return () -> lookupTableBucketByDateTime.iterator();
}
@Override
public List<Interval> inputIntervals()
{
return ImmutableList.copyOf(intervals);
}
@Override
public Optional<Interval> bucketInterval(DateTime dt)
{
// First interval with start time dt
final Interval interval = intervals.floor(new Interval(dt, DateTimes.MAX));
if (interval != null && interval.contains(dt)) {
return Optional.of(interval);
} else {
return Optional.absent();
}
} }
@Override @Override
@ -117,13 +81,6 @@ public class ArbitraryGranularitySpec implements GranularitySpec
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
@JsonProperty("rollup")
public boolean isRollup()
{
return rollup;
}
@Override @Override
@JsonProperty("queryGranularity") @JsonProperty("queryGranularity")
public Granularity getQueryGranularity() public Granularity getQueryGranularity()
@ -143,7 +100,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec
ArbitraryGranularitySpec that = (ArbitraryGranularitySpec) o; ArbitraryGranularitySpec that = (ArbitraryGranularitySpec) o;
if (!intervals.equals(that.intervals)) { if (!inputIntervals().equals(that.inputIntervals())) {
return false; return false;
} }
if (!rollup.equals(that.rollup)) { if (!rollup.equals(that.rollup)) {
@ -159,7 +116,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec
@Override @Override
public int hashCode() public int hashCode()
{ {
int result = intervals.hashCode(); int result = inputIntervals().hashCode();
result = 31 * result + rollup.hashCode(); result = 31 * result + rollup.hashCode();
result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0); result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0);
return result; return result;
@ -169,7 +126,7 @@ public class ArbitraryGranularitySpec implements GranularitySpec
public String toString() public String toString()
{ {
return "ArbitraryGranularitySpec{" + return "ArbitraryGranularitySpec{" +
"intervals=" + intervals + "intervals=" + inputIntervals() +
", queryGranularity=" + queryGranularity + ", queryGranularity=" + queryGranularity +
", rollup=" + rollup + ", rollup=" + rollup +
'}'; '}';
@ -180,4 +137,11 @@ public class ArbitraryGranularitySpec implements GranularitySpec
{ {
return new ArbitraryGranularitySpec(queryGranularity, rollup, inputIntervals); return new ArbitraryGranularitySpec(queryGranularity, rollup, inputIntervals);
} }
@Override
protected LookupIntervalBuckets getLookupTableBuckets()
{
return lookupTableBucketByDateTime;
}
} }

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.indexing.granularity;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
abstract class BaseGranularitySpec implements GranularitySpec
{
protected List<Interval> inputIntervals;
protected final Boolean rollup;
public BaseGranularitySpec(List<Interval> inputIntervals, Boolean rollup)
{
if (inputIntervals != null) {
this.inputIntervals = ImmutableList.copyOf(inputIntervals);
} else {
this.inputIntervals = Collections.emptyList();
}
this.rollup = rollup == null ? Boolean.TRUE : rollup;
}
@Override
@JsonProperty("intervals")
public List<Interval> inputIntervals()
{
return inputIntervals;
}
@Override
@JsonProperty("rollup")
public boolean isRollup()
{
return rollup;
}
@Override
public Optional<Interval> bucketInterval(DateTime dt)
{
return getLookupTableBuckets().bucketInterval(dt);
}
@Override
public TreeSet<Interval> materializedBucketIntervals()
{
return getLookupTableBuckets().materializedIntervals();
}
protected abstract LookupIntervalBuckets getLookupTableBuckets();
/**
* This is a helper class to facilitate sharing the code for sortedBucketIntervals among
* the various GranularitySpec implementations. In particular, the UniformGranularitySpec
* needs to avoid materializing the intervals when the need to traverse them arises.
*/
protected static class LookupIntervalBuckets
{
private final Iterable<Interval> intervalIterable;
private final TreeSet<Interval> intervals;
/**
* @param intervalIterable The intervals to materialize
*/
public LookupIntervalBuckets(Iterable<Interval> intervalIterable)
{
this.intervalIterable = intervalIterable;
// The tree set will be materialized on demand (see below) to avoid client code
// blowing up when constructing this data structure and when the
// number of intervals is very large...
this.intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
}
/**
* Returns a bucket interval using a fast lookup into an efficient data structure
* where all the intervals have been materialized
*
* @param dt The date time to lookup
* @return An Optional containing the interval for the given DateTime if it exists
*/
public Optional<Interval> bucketInterval(DateTime dt)
{
final Interval interval = materializedIntervals().floor(new Interval(dt, DateTimes.MAX));
if (interval != null && interval.contains(dt)) {
return Optional.of(interval);
} else {
return Optional.absent();
}
}
/**
* @return An iterator to traverse the materialized intervals. The traversal will be done in
* order as dictated by Comparators.intervalsByStartThenEnd()
*/
public Iterator<Interval> iterator()
{
return materializedIntervals().iterator();
}
/**
* Helper method to avoid collecting the intervals from the iterator
*
* @return The TreeSet of materialized intervals
*/
public TreeSet<Interval> materializedIntervals()
{
if (intervalIterable != null && intervalIterable.iterator().hasNext() && intervals.isEmpty()) {
Iterators.addAll(intervals, intervalIterable.iterator());
}
return intervals;
}
}
}

View File

@ -27,7 +27,7 @@ import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.List; import java.util.List;
import java.util.SortedSet; import java.util.TreeSet;
/** /**
* Tells the indexer how to group events based on timestamp. The events may then be further partitioned based * Tells the indexer how to group events based on timestamp. The events may then be further partitioned based
@ -41,11 +41,13 @@ import java.util.SortedSet;
public interface GranularitySpec public interface GranularitySpec
{ {
/** /**
* Set of all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping. * Iterable all time groups, broken up on segment boundaries. Should be sorted by interval start and non-overlapping.
* *
* @return set of all time groups * @return Iterable of all time groups
*/ */
Optional<SortedSet<Interval>> bucketIntervals(); Iterable<Interval> sortedBucketIntervals();
/** /**
* Returns user provided intervals as-is state. used for configuring granular path spec * Returns user provided intervals as-is state. used for configuring granular path spec
@ -58,11 +60,18 @@ public interface GranularitySpec
* Time-grouping interval corresponding to some instant, if any. * Time-grouping interval corresponding to some instant, if any.
* *
* @param dt instant to return time interval for * @param dt instant to return time interval for
*
* @return optional time interval * @return optional time interval
*/ */
Optional<Interval> bucketInterval(DateTime dt); Optional<Interval> bucketInterval(DateTime dt);
/**
* This is a helper method for areas of the code, not in the overlord, were for performance
* reasons might need the materialized set of bucket intervals
* @return A fast lookup, ordered set, of the materialized bucket interval
*
*/
TreeSet<Interval> materializedBucketIntervals();
Granularity getSegmentGranularity(); Granularity getSegmentGranularity();
boolean isRollup(); boolean isRollup();

View File

@ -21,28 +21,22 @@ package org.apache.druid.segment.indexing.granularity;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.joda.time.DateTime; import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.SortedSet;
public class UniformGranularitySpec implements GranularitySpec public class UniformGranularitySpec extends BaseGranularitySpec
{ {
private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY; private static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY;
private static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE; private static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE;
private final Granularity segmentGranularity; private final Granularity segmentGranularity;
private final Granularity queryGranularity; private final Granularity queryGranularity;
private final Boolean rollup; private final IntervalsByGranularity intervalsByGranularity;
private final List<Interval> inputIntervals; protected LookupIntervalBuckets lookupTableBucketByDateTime;
private final ArbitraryGranularitySpec wrappedSpec;
@JsonCreator @JsonCreator
public UniformGranularitySpec( public UniformGranularitySpec(
@ -52,21 +46,11 @@ public class UniformGranularitySpec implements GranularitySpec
@JsonProperty("intervals") List<Interval> inputIntervals @JsonProperty("intervals") List<Interval> inputIntervals
) )
{ {
super(inputIntervals, rollup);
this.queryGranularity = queryGranularity == null ? DEFAULT_QUERY_GRANULARITY : queryGranularity; this.queryGranularity = queryGranularity == null ? DEFAULT_QUERY_GRANULARITY : queryGranularity;
this.rollup = rollup == null ? Boolean.TRUE : rollup;
this.segmentGranularity = segmentGranularity == null ? DEFAULT_SEGMENT_GRANULARITY : segmentGranularity; this.segmentGranularity = segmentGranularity == null ? DEFAULT_SEGMENT_GRANULARITY : segmentGranularity;
intervalsByGranularity = new IntervalsByGranularity(this.inputIntervals, segmentGranularity);
if (inputIntervals != null) { lookupTableBucketByDateTime = new LookupIntervalBuckets(sortedBucketIntervals());
List<Interval> granularIntervals = new ArrayList<>();
for (Interval inputInterval : inputIntervals) {
Iterables.addAll(granularIntervals, this.segmentGranularity.getIterable(inputInterval));
}
this.inputIntervals = ImmutableList.copyOf(inputIntervals);
this.wrappedSpec = new ArbitraryGranularitySpec(queryGranularity, rollup, granularIntervals);
} else {
this.inputIntervals = null;
this.wrappedSpec = null;
}
} }
public UniformGranularitySpec( public UniformGranularitySpec(
@ -79,29 +63,9 @@ public class UniformGranularitySpec implements GranularitySpec
} }
@Override @Override
public Optional<SortedSet<Interval>> bucketIntervals() public Iterable<Interval> sortedBucketIntervals()
{ {
if (wrappedSpec == null) { return () -> intervalsByGranularity.granularityIntervalsIterator();
return Optional.absent();
} else {
return wrappedSpec.bucketIntervals();
}
}
@Override
public List<Interval> inputIntervals()
{
return inputIntervals == null ? ImmutableList.of() : ImmutableList.copyOf(inputIntervals);
}
@Override
public Optional<Interval> bucketInterval(DateTime dt)
{
if (wrappedSpec == null) {
return Optional.absent();
} else {
return wrappedSpec.bucketInterval(dt);
}
} }
@Override @Override
@ -111,13 +75,6 @@ public class UniformGranularitySpec implements GranularitySpec
return segmentGranularity; return segmentGranularity;
} }
@Override
@JsonProperty("rollup")
public boolean isRollup()
{
return rollup;
}
@Override @Override
@JsonProperty("queryGranularity") @JsonProperty("queryGranularity")
public Granularity getQueryGranularity() public Granularity getQueryGranularity()
@ -125,12 +82,6 @@ public class UniformGranularitySpec implements GranularitySpec
return queryGranularity; return queryGranularity;
} }
@JsonProperty("intervals")
public Optional<List<Interval>> getIntervals()
{
return Optional.fromNullable(inputIntervals);
}
@Override @Override
public boolean equals(Object o) public boolean equals(Object o)
{ {
@ -149,14 +100,15 @@ public class UniformGranularitySpec implements GranularitySpec
if (!queryGranularity.equals(that.queryGranularity)) { if (!queryGranularity.equals(that.queryGranularity)) {
return false; return false;
} }
if (!rollup.equals(that.rollup)) { if (isRollup() != that.isRollup()) {
return false; return false;
} }
if (inputIntervals != null ? !inputIntervals.equals(that.inputIntervals) : that.inputIntervals != null) { if (inputIntervals != null ? !inputIntervals.equals(that.inputIntervals) : that.inputIntervals != null) {
return false; return false;
} }
return !(wrappedSpec != null ? !wrappedSpec.equals(that.wrappedSpec) : that.wrappedSpec != null);
return true;
} }
@ -167,7 +119,6 @@ public class UniformGranularitySpec implements GranularitySpec
result = 31 * result + queryGranularity.hashCode(); result = 31 * result + queryGranularity.hashCode();
result = 31 * result + rollup.hashCode(); result = 31 * result + rollup.hashCode();
result = 31 * result + (inputIntervals != null ? inputIntervals.hashCode() : 0); result = 31 * result + (inputIntervals != null ? inputIntervals.hashCode() : 0);
result = 31 * result + (wrappedSpec != null ? wrappedSpec.hashCode() : 0);
return result; return result;
} }
@ -179,7 +130,6 @@ public class UniformGranularitySpec implements GranularitySpec
", queryGranularity=" + queryGranularity + ", queryGranularity=" + queryGranularity +
", rollup=" + rollup + ", rollup=" + rollup +
", inputIntervals=" + inputIntervals + ", inputIntervals=" + inputIntervals +
", wrappedSpec=" + wrappedSpec +
'}'; '}';
} }
@ -188,4 +138,11 @@ public class UniformGranularitySpec implements GranularitySpec
{ {
return new UniformGranularitySpec(segmentGranularity, queryGranularity, rollup, inputIntervals); return new UniformGranularitySpec(segmentGranularity, queryGranularity, rollup, inputIntervals);
} }
@Override
protected LookupIntervalBuckets getLookupTableBuckets()
{
return lookupTableBucketByDateTime;
}
} }

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment.indexing.granularity;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
@ -47,7 +48,8 @@ public class ArbitraryGranularityTest
Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
Intervals.of("2012-01-01T00Z/2012-01-03T00Z") Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
)); )
);
Assert.assertNotNull(spec.getQueryGranularity()); Assert.assertNotNull(spec.getQueryGranularity());
} }
@ -62,7 +64,8 @@ public class ArbitraryGranularityTest
Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
Intervals.of("2012-01-01T00Z/2012-01-03T00Z") Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
)); )
);
Assert.assertTrue(spec.isRollup()); Assert.assertTrue(spec.isRollup());
@ -74,7 +77,17 @@ public class ArbitraryGranularityTest
Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
Intervals.of("2012-02-01T00Z/2012-03-01T00Z") Intervals.of("2012-02-01T00Z/2012-03-01T00Z")
), ),
Lists.newArrayList(spec.bucketIntervals().get()) Lists.newArrayList(spec.sortedBucketIntervals())
);
Assert.assertEquals(
Optional.of(Intervals.of("2012-01-01T00Z/2012-01-03T00Z")),
spec.bucketInterval(DateTimes.of("2012-01-01T00Z"))
);
Assert.assertEquals(
Optional.of(Intervals.of("2012-01-08T00Z/2012-01-11T00Z")),
spec.bucketInterval(DateTimes.of("2012-01-08T00Z"))
); );
Assert.assertEquals( Assert.assertEquals(
@ -118,6 +131,7 @@ public class ArbitraryGranularityTest
Optional.absent(), Optional.absent(),
spec.bucketInterval(DateTimes.of("2012-01-05T00Z")) spec.bucketInterval(DateTimes.of("2012-01-05T00Z"))
); );
} }
@Test @Test
@ -187,10 +201,26 @@ public class ArbitraryGranularityTest
try { try {
final GranularitySpec rtSpec = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(spec), GranularitySpec.class); final GranularitySpec rtSpec = JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsString(spec), GranularitySpec.class);
Assert.assertEquals("Round-trip", spec.bucketIntervals(), rtSpec.bucketIntervals()); Assert.assertEquals(
"Round-trip",
ImmutableList.copyOf(spec.sortedBucketIntervals()),
ImmutableList.copyOf(rtSpec.sortedBucketIntervals())
);
Assert.assertEquals(
"Round-trip",
ImmutableList.copyOf(spec.inputIntervals()),
ImmutableList.copyOf(rtSpec.inputIntervals())
);
} }
catch (Exception e) { catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
@Test
public void testNullInputIntervals()
{
final GranularitySpec spec = new ArbitraryGranularitySpec(Granularities.NONE, null);
Assert.assertFalse(spec.sortedBucketIntervals().iterator().hasNext());
}
} }

View File

@ -21,6 +21,8 @@ package org.apache.druid.segment.indexing.granularity;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
@ -34,8 +36,8 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.SortedSet;
public class UniformGranularityTest public class UniformGranularityTest
{ {
@ -44,19 +46,26 @@ public class UniformGranularityTest
@Test @Test
public void testSimple() public void testSimple()
{ {
final GranularitySpec spec = new UniformGranularitySpec(
Granularities.DAY, final List<Interval> inputIntervals = Lists.newArrayList(
null,
Lists.newArrayList(
Intervals.of("2012-01-08T00Z/2012-01-11T00Z"), Intervals.of("2012-01-08T00Z/2012-01-11T00Z"),
Intervals.of("2012-01-07T00Z/2012-01-08T00Z"), Intervals.of("2012-01-07T00Z/2012-01-08T00Z"),
Intervals.of("2012-01-03T00Z/2012-01-04T00Z"), Intervals.of("2012-01-03T00Z/2012-01-04T00Z"),
Intervals.of("2012-01-01T00Z/2012-01-03T00Z") Intervals.of("2012-01-01T00Z/2012-01-03T00Z")
) );
final GranularitySpec spec = new UniformGranularitySpec(
Granularities.DAY,
null,
inputIntervals
); );
Assert.assertTrue(spec.isRollup()); Assert.assertTrue(spec.isRollup());
Assert.assertEquals(
inputIntervals,
Lists.newArrayList(spec.inputIntervals())
);
Assert.assertEquals( Assert.assertEquals(
Lists.newArrayList( Lists.newArrayList(
Intervals.of("2012-01-01T00Z/P1D"), Intervals.of("2012-01-01T00Z/P1D"),
@ -67,7 +76,28 @@ public class UniformGranularityTest
Intervals.of("2012-01-09T00Z/P1D"), Intervals.of("2012-01-09T00Z/P1D"),
Intervals.of("2012-01-10T00Z/P1D") Intervals.of("2012-01-10T00Z/P1D")
), ),
Lists.newArrayList(spec.bucketIntervals().get()) Lists.newArrayList(spec.sortedBucketIntervals())
);
Assert.assertEquals(
Optional.<Interval>absent(),
spec.bucketInterval(DateTimes.of("2011-01-12T00Z"))
);
Assert.assertEquals(
Optional.of(Intervals.of("2012-01-01T00Z/2012-01-02T00Z")),
spec.bucketInterval(DateTimes.of("2012-01-01T00Z"))
);
Assert.assertEquals(
Optional.of(Intervals.of("2012-01-10T00Z/2012-01-11T00Z")),
spec.bucketInterval(DateTimes.of("2012-01-10T00Z"))
);
Assert.assertEquals(
Optional.<Interval>absent(),
spec.bucketInterval(DateTimes.of("2012-01-12T00Z"))
); );
Assert.assertEquals( Assert.assertEquals(
@ -99,6 +129,7 @@ public class UniformGranularityTest
Optional.of(Intervals.of("2012-01-08T00Z/2012-01-09T00Z")), Optional.of(Intervals.of("2012-01-08T00Z/2012-01-09T00Z")),
spec.bucketInterval(DateTimes.of("2012-01-08T01Z")) spec.bucketInterval(DateTimes.of("2012-01-08T01Z"))
); );
} }
@Test @Test
@ -132,9 +163,9 @@ public class UniformGranularityTest
try { try {
final GranularitySpec rtSpec = JOSN_MAPPER.readValue(JOSN_MAPPER.writeValueAsString(spec), GranularitySpec.class); final GranularitySpec rtSpec = JOSN_MAPPER.readValue(JOSN_MAPPER.writeValueAsString(spec), GranularitySpec.class);
Assert.assertEquals( Assert.assertEquals(
"Round-trip bucketIntervals", "Round-trip sortedBucketIntervals",
spec.bucketIntervals(), ImmutableList.copyOf(spec.sortedBucketIntervals()),
rtSpec.bucketIntervals() ImmutableList.copyOf(rtSpec.sortedBucketIntervals().iterator())
); );
Assert.assertEquals( Assert.assertEquals(
"Round-trip granularity", "Round-trip granularity",
@ -253,10 +284,9 @@ public class UniformGranularityTest
) )
); );
Assert.assertTrue(spec.bucketIntervals().isPresent()); Assert.assertTrue(spec.sortedBucketIntervals().iterator().hasNext());
final Optional<SortedSet<Interval>> sortedSetOptional = spec.bucketIntervals(); final Iterable<Interval> intervals = spec.sortedBucketIntervals();
final SortedSet<Interval> intervals = sortedSetOptional.get();
ArrayList<Long> actualIntervals = new ArrayList<>(); ArrayList<Long> actualIntervals = new ArrayList<>();
for (Interval interval : intervals) { for (Interval interval : intervals) {
actualIntervals.add(interval.toDurationMillis()); actualIntervals.add(interval.toDurationMillis());
@ -279,6 +309,25 @@ public class UniformGranularityTest
Assert.assertEquals(expectedIntervals, actualIntervals); Assert.assertEquals(expectedIntervals, actualIntervals);
} }
@Test
public void testUniformGranularitySpecWithLargeNumberOfIntervalsDoesNotBlowUp()
{
// just make sure that intervals for uniform spec are not materialized (causing OOM) when created
final GranularitySpec spec = new UniformGranularitySpec(
Granularities.SECOND,
null,
Collections.singletonList(
Intervals.of("2012-01-01T00Z/P10Y")
)
);
Assert.assertTrue(spec != null);
int count = Iterators.size(spec.sortedBucketIntervals().iterator());
// account for three leap years...
Assert.assertEquals(3600 * 24 * 365 * 10 + 3 * 24 * 3600, count);
}
private void notEqualsCheck(GranularitySpec spec1, GranularitySpec spec2) private void notEqualsCheck(GranularitySpec spec1, GranularitySpec spec2)
{ {
Assert.assertNotEquals(spec1, spec2); Assert.assertNotEquals(spec1, spec2);