Fix filterSegments for TimeBoundary and DataSourceMetadata queries (#7023)

* Fix filterSegments for TimeBoundary and DataSourceMetadata queries

* add javadoc

* fix build
This commit is contained in:
Jihoon Son 2019-02-08 10:03:02 -08:00 committed by Fangjin Yang
parent b3dcbe70ad
commit c9f21bc782
9 changed files with 433 additions and 37 deletions

View File

@ -22,8 +22,26 @@ package org.apache.druid.timeline;
import org.apache.druid.guice.annotations.PublicApi;
import org.joda.time.Interval;
/**
* A logical segment can represent an entire segment or a part of a segment. As a result, it can have a different
* interval from its actual base segment. {@link #getInterval()} and {@link #getTrueInterval()} return the interval of
* this logical segment and the interval of the base segment, respectively.
*
* For example, suppose we have 2 segments as below:
*
* - Segment A has an interval of 2017/2018.
* - Segment B has an interval of 2017-08-01/2017-08-02.
*
* For these segments, {@link VersionedIntervalTimeline#lookup} returns 3 segments as below:
*
* - interval of 2017/2017-08-01 (trueInterval: 2017/2018)
* - interval of 2017-08-01/2017-08-02 (trueInterval: 2017-08-01/2017-08-02)
* - interval of 2017-08-02/2018 (trueInterval: 2017/2018)
*/
@PublicApi
public interface LogicalSegment
{
Interval getInterval();
Interval getTrueInterval();
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.timeline;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
@ -27,16 +28,25 @@ import org.joda.time.Interval;
public class TimelineObjectHolder<VersionType, ObjectType> implements LogicalSegment
{
private final Interval interval;
private final Interval trueInterval;
private final VersionType version;
private final PartitionHolder<ObjectType> object;
@VisibleForTesting
public TimelineObjectHolder(Interval interval, VersionType version, PartitionHolder<ObjectType> object)
{
this(interval, interval, version, object);
}
public TimelineObjectHolder(
Interval interval,
Interval trueInterval,
VersionType version,
PartitionHolder<ObjectType> object
)
{
this.interval = interval;
this.trueInterval = trueInterval;
this.version = version;
this.object = object;
}
@ -47,6 +57,12 @@ public class TimelineObjectHolder<VersionType, ObjectType> implements LogicalSeg
return interval;
}
@Override
public Interval getTrueInterval()
{
return trueInterval;
}
public VersionType getVersion()
{
return version;
@ -62,6 +78,7 @@ public class TimelineObjectHolder<VersionType, ObjectType> implements LogicalSeg
{
return "TimelineObjectHolder{" +
"interval=" + interval +
", trueInterval=" + trueInterval +
", version=" + version +
", object=" + object +
'}';

View File

@ -300,6 +300,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
private TimelineObjectHolder<VersionType, ObjectType> timelineEntryToObjectHolder(TimelineEntry entry)
{
return new TimelineObjectHolder<>(
entry.getTrueInterval(),
entry.getTrueInterval(),
entry.getVersion(),
new PartitionHolder<>(entry.getPartitionHolder())
@ -586,10 +587,11 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
if (timelineInterval.overlaps(interval)) {
retVal.add(
new TimelineObjectHolder<VersionType, ObjectType>(
new TimelineObjectHolder<>(
timelineInterval,
val.getTrueInterval(),
val.getVersion(),
new PartitionHolder<ObjectType>(val.getPartitionHolder())
new PartitionHolder<>(val.getPartitionHolder())
)
);
}
@ -604,8 +606,9 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
.isAfter(firstEntry.getInterval().getStart())) {
retVal.set(
0,
new TimelineObjectHolder<VersionType, ObjectType>(
new TimelineObjectHolder<>(
new Interval(interval.getStart(), firstEntry.getInterval().getEnd()),
firstEntry.getTrueInterval(),
firstEntry.getVersion(),
firstEntry.getObject()
)
@ -616,8 +619,9 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
if (interval.overlaps(lastEntry.getInterval()) && interval.getEnd().isBefore(lastEntry.getInterval().getEnd())) {
retVal.set(
retVal.size() - 1,
new TimelineObjectHolder<VersionType, ObjectType>(
new TimelineObjectHolder<>(
new Interval(lastEntry.getInterval().getStart(), interval.getEnd()),
lastEntry.getTrueInterval(),
lastEntry.getVersion(),
lastEntry.getObject()
)

View File

@ -22,9 +22,6 @@ package org.apache.druid.query.datasourcemetadata;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@ -42,6 +39,7 @@ import org.apache.druid.timeline.LogicalSegment;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
*/
@ -68,19 +66,9 @@ public class DataSourceQueryQueryToolChest
final T max = segments.get(segments.size() - 1);
return Lists.newArrayList(
Iterables.filter(
segments,
new Predicate<T>()
{
@Override
public boolean apply(T input)
{
return max != null && input.getInterval().overlaps(max.getInterval());
}
}
)
);
return segments.stream()
.filter(input -> max != null && input.getInterval().overlaps(max.getTrueInterval()))
.collect(Collectors.toList());
}
@Override

View File

@ -23,8 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.DateTimes;
@ -46,6 +44,7 @@ import org.apache.druid.timeline.LogicalSegment;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
*/
@ -85,20 +84,10 @@ public class TimeBoundaryQueryQueryToolChest
final T min = query.isMaxTime() ? null : segments.get(0);
final T max = query.isMinTime() ? null : segments.get(segments.size() - 1);
return Lists.newArrayList(
Iterables.filter(
segments,
new Predicate<T>()
{
@Override
public boolean apply(T input)
{
return (min != null && input.getInterval().overlaps(min.getInterval())) ||
(max != null && input.getInterval().overlaps(max.getInterval()));
}
}
)
);
return segments.stream()
.filter(input -> (min != null && input.getInterval().overlaps(min.getTrueInterval())) ||
(max != null && input.getInterval().overlaps(max.getTrueInterval())))
.collect(Collectors.toList());
}
@Override

View File

@ -164,6 +164,12 @@ public class DataSourceMetadataQueryTest
{
return Intervals.of("2012-01-01/P1D");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -172,6 +178,12 @@ public class DataSourceMetadataQueryTest
{
return Intervals.of("2012-01-01T01/PT1H");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -180,6 +192,12 @@ public class DataSourceMetadataQueryTest
{
return Intervals.of("2013-01-01/P1D");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -188,6 +206,12 @@ public class DataSourceMetadataQueryTest
{
return Intervals.of("2013-01-01T01/PT1H");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -196,6 +220,12 @@ public class DataSourceMetadataQueryTest
{
return Intervals.of("2013-01-01T02/PT1H");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
}
)
);
@ -210,6 +240,12 @@ public class DataSourceMetadataQueryTest
{
return Intervals.of("2013-01-01/P1D");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -218,6 +254,12 @@ public class DataSourceMetadataQueryTest
{
return Intervals.of("2013-01-01T02/PT1H");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
}
);
@ -226,6 +268,143 @@ public class DataSourceMetadataQueryTest
}
}
@Test
public void testFilterOverlappingSegments()
{
final GenericQueryMetricsFactory queryMetricsFactory = DefaultGenericQueryMetricsFactory.instance();
final DataSourceQueryQueryToolChest toolChest = new DataSourceQueryQueryToolChest(queryMetricsFactory);
final List<LogicalSegment> segments = toolChest
.filterSegments(
null,
ImmutableList.of(
new LogicalSegment()
{
@Override
public Interval getInterval()
{
return Intervals.of("2015/2016-08-01");
}
@Override
public Interval getTrueInterval()
{
return Intervals.of("2015/2016-08-01");
}
},
new LogicalSegment()
{
@Override
public Interval getInterval()
{
return Intervals.of("2016-08-01/2017");
}
@Override
public Interval getTrueInterval()
{
return Intervals.of("2016-08-01/2017");
}
},
new LogicalSegment()
{
@Override
public Interval getInterval()
{
return Intervals.of("2017/2017-08-01");
}
@Override
public Interval getTrueInterval()
{
return Intervals.of("2017/2018");
}
},
new LogicalSegment()
{
@Override
public Interval getInterval()
{
return Intervals.of("2017-08-01/2017-08-02");
}
@Override
public Interval getTrueInterval()
{
return Intervals.of("2017-08-01/2017-08-02");
}
},
new LogicalSegment()
{
@Override
public Interval getInterval()
{
return Intervals.of("2017-08-02/2018");
}
@Override
public Interval getTrueInterval()
{
return Intervals.of("2017/2018");
}
}
)
);
final List<LogicalSegment> expected = ImmutableList.of(
new LogicalSegment()
{
@Override
public Interval getInterval()
{
return Intervals.of("2017/2017-08-01");
}
@Override
public Interval getTrueInterval()
{
return Intervals.of("2017/2018");
}
},
new LogicalSegment()
{
@Override
public Interval getInterval()
{
return Intervals.of("2017-08-01/2017-08-02");
}
@Override
public Interval getTrueInterval()
{
return Intervals.of("2017-08-01/2017-08-02");
}
},
new LogicalSegment()
{
@Override
public Interval getInterval()
{
return Intervals.of("2017-08-02/2018");
}
@Override
public Interval getTrueInterval()
{
return Intervals.of("2017/2018");
}
}
);
Assert.assertEquals(expected.size(), segments.size());
for (int i = 0; i < expected.size(); i++) {
Assert.assertEquals(expected.get(i).getInterval(), segments.get(i).getInterval());
Assert.assertEquals(expected.get(i).getTrueInterval(), segments.get(i).getTrueInterval());
}
}
@Test
public void testResultSerialization()
{

View File

@ -39,6 +39,7 @@ import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.spec.LegacySegmentSpec;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.timeline.LogicalSegment;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
@ -292,7 +293,20 @@ public class SegmentMetadataQueryQueryToolChestTest
"2000-01-09/P1D"
)
.stream()
.map(interval -> (LogicalSegment) () -> Intervals.of(interval))
.map(interval -> new LogicalSegment()
{
@Override
public Interval getInterval()
{
return Intervals.of(interval);
}
@Override
public Interval getTrueInterval()
{
return Intervals.of(interval);
}
})
.collect(Collectors.toList())
);

View File

@ -923,6 +923,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2012-01-01/P1D");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -931,6 +937,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2012-01-01T01/PT1H");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -939,6 +951,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2013-01-05/P1D");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -947,6 +965,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2013-05-20/P1D");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -955,6 +979,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2014-01-05/P1D");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -963,6 +993,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2014-02-05/P1D");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -971,6 +1007,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2015-01-19T01/PT1H");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -979,6 +1021,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2015-01-20T02/PT1H");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
}
);
@ -998,6 +1046,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2015-01-19T01/PT1H");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -1006,6 +1060,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2015-01-20T02/PT1H");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
}
);
@ -1031,6 +1091,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2013-05-20/P1D");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -1039,6 +1105,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2014-01-05/P1D");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -1047,6 +1119,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2014-02-05/P1D");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -1055,6 +1133,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2015-01-19T01/PT1H");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
},
new LogicalSegment()
{
@ -1063,6 +1147,12 @@ public class SegmentMetadataQueryTest
{
return Intervals.of("2015-01-20T02/PT1H");
}
@Override
public Interval getTrueInterval()
{
return getInterval();
}
}
);

View File

@ -73,6 +73,11 @@ public class TimeBoundaryQueryQueryToolChestTest
.build();
private static LogicalSegment createLogicalSegment(final Interval interval)
{
return createLogicalSegment(interval, interval);
}
private static LogicalSegment createLogicalSegment(final Interval interval, final Interval trueInterval)
{
return new LogicalSegment()
{
@ -81,6 +86,12 @@ public class TimeBoundaryQueryQueryToolChestTest
{
return interval;
}
@Override
public Interval getTrueInterval()
{
return trueInterval;
}
};
}
@ -116,6 +127,35 @@ public class TimeBoundaryQueryQueryToolChestTest
}
}
@Test
public void testFilterOverlapingSegments()
{
final List<LogicalSegment> actual = new TimeBoundaryQueryQueryToolChest().filterSegments(
TIME_BOUNDARY_QUERY,
Arrays.asList(
createLogicalSegment(Intervals.of("2015/2016-08-01")),
createLogicalSegment(Intervals.of("2016-08-01/2017")),
createLogicalSegment(Intervals.of("2017/2017-08-01"), Intervals.of("2017/2018")),
createLogicalSegment(Intervals.of("2017-08-01/2017-08-02")),
createLogicalSegment(Intervals.of("2017-08-02/2018"), Intervals.of("2017/2018"))
)
);
final List<LogicalSegment> expected = Arrays.asList(
createLogicalSegment(Intervals.of("2015/2016-08-01")),
createLogicalSegment(Intervals.of("2017/2017-08-01"), Intervals.of("2017/2018")),
createLogicalSegment(Intervals.of("2017-08-01/2017-08-02")),
createLogicalSegment(Intervals.of("2017-08-02/2018"), Intervals.of("2017/2018"))
);
Assert.assertEquals(expected.size(), actual.size());
for (int i = 0; i < actual.size(); i++) {
Assert.assertEquals(expected.get(i).getInterval(), actual.get(i).getInterval());
Assert.assertEquals(expected.get(i).getTrueInterval(), actual.get(i).getTrueInterval());
}
}
@Test
public void testMaxTimeFilterSegments()
{
@ -145,6 +185,62 @@ public class TimeBoundaryQueryQueryToolChestTest
}
}
@Test
public void testMaxTimeFilterOverlapingSegments()
{
final List<LogicalSegment> actual = new TimeBoundaryQueryQueryToolChest().filterSegments(
MAXTIME_BOUNDARY_QUERY,
Arrays.asList(
createLogicalSegment(Intervals.of("2015/2016-08-01")),
createLogicalSegment(Intervals.of("2016-08-01/2017")),
createLogicalSegment(Intervals.of("2017/2017-08-01"), Intervals.of("2017/2018")),
createLogicalSegment(Intervals.of("2017-08-01/2017-08-02")),
createLogicalSegment(Intervals.of("2017-08-02/2018"), Intervals.of("2017/2018"))
)
);
final List<LogicalSegment> expected = Arrays.asList(
createLogicalSegment(Intervals.of("2017/2017-08-01"), Intervals.of("2017/2018")),
createLogicalSegment(Intervals.of("2017-08-01/2017-08-02")),
createLogicalSegment(Intervals.of("2017-08-02/2018"), Intervals.of("2017/2018"))
);
Assert.assertEquals(expected.size(), actual.size());
for (int i = 0; i < actual.size(); i++) {
Assert.assertEquals(expected.get(i).getInterval(), actual.get(i).getInterval());
Assert.assertEquals(expected.get(i).getTrueInterval(), actual.get(i).getTrueInterval());
}
}
@Test
public void testMinTimeFilterOverlapingSegments()
{
final List<LogicalSegment> actual = new TimeBoundaryQueryQueryToolChest().filterSegments(
MINTIME_BOUNDARY_QUERY,
Arrays.asList(
createLogicalSegment(Intervals.of("2017/2017-08-01"), Intervals.of("2017/2018")),
createLogicalSegment(Intervals.of("2017-08-01/2017-08-02")),
createLogicalSegment(Intervals.of("2017-08-02/2018"), Intervals.of("2017/2018")),
createLogicalSegment(Intervals.of("2018/2018-08-01")),
createLogicalSegment(Intervals.of("2018-08-01/2019"))
)
);
final List<LogicalSegment> expected = Arrays.asList(
createLogicalSegment(Intervals.of("2017/2017-08-01"), Intervals.of("2017/2018")),
createLogicalSegment(Intervals.of("2017-08-01/2017-08-02")),
createLogicalSegment(Intervals.of("2017-08-02/2018"), Intervals.of("2017/2018"))
);
Assert.assertEquals(expected.size(), actual.size());
for (int i = 0; i < actual.size(); i++) {
Assert.assertEquals(expected.get(i).getInterval(), actual.get(i).getInterval());
Assert.assertEquals(expected.get(i).getTrueInterval(), actual.get(i).getTrueInterval());
}
}
@Test
public void testMinTimeFilterSegments()
{
@ -192,6 +288,7 @@ public class TimeBoundaryQueryQueryToolChestTest
Assert.assertEquals(7, segments.size());
}
@Test
public void testCacheStrategy() throws Exception
{