Fix time-ordered scan queries on realtime segments (#7546)

* Initial commit

* Added test for int to long conversion

* Add appenderator test for realtime scan query

* get rid of todo

* Fix forbidden apis

* Jon's recommendations

* Formatting
Authored by Justin Borromeo on 2019-04-26 16:12:10 -07:00; committed by Jonathan Wei
parent ebdf07b69f
commit 07dd742e35
9 changed files with 257 additions and 118 deletions

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import org.apache.druid.java.util.common.Pair;
import org.joda.time.Interval;
import java.util.Iterator;
public class SinkQueryRunners<T> implements Iterable<QueryRunner<T>>
{
Iterable<Pair<Interval, QueryRunner<T>>> runners;
public SinkQueryRunners(Iterable<Pair<Interval, QueryRunner<T>>> runners)
{
this.runners = runners;
}
public Iterator<Pair<Interval, QueryRunner<T>>> runnerIntervalMappingIterator()
{
return runners.iterator();
}
@Override
public Iterator<QueryRunner<T>> iterator()
{
Iterator<Pair<Interval, QueryRunner<T>>> runnerIntervalIterator = runners.iterator();
return new Iterator<QueryRunner<T>>()
{
@Override
public boolean hasNext()
{
return runnerIntervalIterator.hasNext();
}
@Override
public QueryRunner<T> next()
{
return runnerIntervalIterator.next().rhs;
}
};
}
}
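
For context, a brief usage sketch of the new class (hypothetical intervals and runners; assumes Druid's query module on the classpath and this era's QueryRunner signature, which takes a response-context Map, so the lambdas below compile). Iterating the object itself yields only the runners, which keeps existing mergeRunners() call sites working, while runnerIntervalMappingIterator() also exposes each runner's interval for the time-ordered scan path.

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.SinkQueryRunners;

public class SinkQueryRunnersSketch
{
  public static void main(String[] args)
  {
    // Two hypothetical per-segment runners, each tagged with the interval of the data it covers.
    QueryRunner<String> runnerA = (queryPlus, responseContext) -> Sequences.empty();
    QueryRunner<String> runnerB = (queryPlus, responseContext) -> Sequences.empty();

    SinkQueryRunners<String> runners = new SinkQueryRunners<>(
        ImmutableList.of(
            new Pair<>(Intervals.of("2001-01-01/2001-01-02"), runnerA),
            new Pair<>(Intervals.of("2001-01-02/2001-01-03"), runnerB)
        )
    );

    // Plain iteration: runners only, in the same order as the pairs.
    for (QueryRunner<String> runner : runners) {
      System.out.println(runner);
    }

    // Interval-aware iteration: the mapping the scan factory needs to group partitions by interval.
    runners.runnerIntervalMappingIterator()
           .forEachRemaining(pair -> System.out.println(pair.lhs + " -> " + pair.rhs));
  }
}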

View File

@ -20,7 +20,6 @@
package org.apache.druid.query.scan;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
@ -39,6 +38,7 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
@ -114,11 +114,11 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
return returnedRows;
}
} else {
List<SegmentDescriptor> descriptorsOrdered = getSegmentDescriptorsFromSpecificQuerySpec(query.getQuerySegmentSpec());
List<Interval> intervalsOrdered = getIntervalsFromSpecificQuerySpec(query.getQuerySegmentSpec());
List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);
if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
descriptorsOrdered = Lists.reverse(descriptorsOrdered);
intervalsOrdered = Lists.reverse(intervalsOrdered);
queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
}
int maxRowsQueuedForOrdering = (query.getMaxRowsQueuedForOrdering() == null
@ -132,28 +132,29 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
input -> input.run(queryPlus, responseContext)
)),
query,
descriptorsOrdered
intervalsOrdered
);
} else {
Preconditions.checkState(
descriptorsOrdered.size() == queryRunnersOrdered.size(),
"Number of segment descriptors does not equal number of "
+ "query runners...something went wrong!"
);
// Combine the two lists of segment descriptors and query runners into a single list of
// segment descriptors - query runner pairs. This makes it easier to use stream operators.
List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
for (int i = 0; i < queryRunnersOrdered.size(); i++) {
descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
// Use n-way merge strategy
List<Pair<Interval, QueryRunner<ScanResultValue>>> intervalsAndRunnersOrdered = new ArrayList<>();
if (intervalsOrdered.size() == queryRunnersOrdered.size()) {
for (int i = 0; i < queryRunnersOrdered.size(); i++) {
intervalsAndRunnersOrdered.add(new Pair<>(intervalsOrdered.get(i), queryRunnersOrdered.get(i)));
}
} else if (queryRunners instanceof SinkQueryRunners) {
((SinkQueryRunners<ScanResultValue>) queryRunners).runnerIntervalMappingIterator()
.forEachRemaining(intervalsAndRunnersOrdered::add);
} else {
throw new ISE("Number of segment descriptors does not equal number of "
+ "query runners...something went wrong!");
}
// Group the list of pairs by interval. The LinkedHashMap will have an interval paired with a list of all the
// query runners for that segment
LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
descriptorsAndRunnersOrdered.stream()
LinkedHashMap<Interval, List<Pair<Interval, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
intervalsAndRunnersOrdered.stream()
.collect(Collectors.groupingBy(
x -> x.lhs.getInterval(),
x -> x.lhs,
LinkedHashMap::new,
Collectors.toList()
));
@ -167,9 +168,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
.max(Comparator.comparing(Integer::valueOf))
.get();
int segmentPartitionLimit = (query.getMaxSegmentPartitionsOrderedInMemory() == null
? scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
: query.getMaxSegmentPartitionsOrderedInMemory());
int segmentPartitionLimit = query.getMaxSegmentPartitionsOrderedInMemory() == null
? scanQueryConfig.getMaxSegmentPartitionsOrderedInMemory()
: query.getMaxSegmentPartitionsOrderedInMemory();
if (maxNumPartitionsInSegment <= segmentPartitionLimit) {
// Use n-way merge strategy
@ -205,7 +206,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
Sequence<ScanResultValue> priorityQueueSortAndLimit(
Sequence<ScanResultValue> inputSequence,
ScanQuery scanQuery,
List<SegmentDescriptor> descriptorsOrdered
List<Interval> intervalsOrdered
)
{
Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
@ -254,9 +255,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
// Finish scanning the interval containing the limit row
if (numRowsScanned > limit && finalInterval == null) {
long timestampOfLimitRow = srv.getFirstEventTimestamp(scanQuery.getResultFormat());
for (SegmentDescriptor descriptor : descriptorsOrdered) {
if (descriptor.getInterval().contains(timestampOfLimitRow)) {
finalInterval = descriptor.getInterval();
for (Interval interval : intervalsOrdered) {
if (interval.contains(timestampOfLimitRow)) {
finalInterval = interval;
}
}
if (finalInterval == null) {
@ -280,23 +281,28 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
}
@VisibleForTesting
List<SegmentDescriptor> getSegmentDescriptorsFromSpecificQuerySpec(QuerySegmentSpec spec)
List<Interval> getIntervalsFromSpecificQuerySpec(QuerySegmentSpec spec)
{
// Query segment spec must be an instance of MultipleSpecificSegmentSpec or SpecificSegmentSpec because
// segment descriptors need to be present for a 1:1 matching of intervals with query runners.
// The other types of segment spec condense the intervals (i.e. merge neighbouring intervals), eliminating
// the 1:1 relationship between intervals and query runners.
List<SegmentDescriptor> descriptorsOrdered;
List<Interval> descriptorsOrdered;
if (spec instanceof MultipleSpecificSegmentSpec) {
// Ascending time order for both descriptors and query runners by default
descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors();
descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors()
.stream()
.map(SegmentDescriptor::getInterval)
.collect(Collectors.toList());
} else if (spec instanceof SpecificSegmentSpec) {
descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor());
descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor().getInterval());
} else {
throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
+ "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.",
spec.getClass().getSimpleName());
throw new UOE(
"Time-ordering on scan queries is only supported for queries with segment specs"
+ "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.",
spec.getClass().getSimpleName()
);
}
return descriptorsOrdered;
}
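
The key detail in the grouping above is Collectors.groupingBy with a LinkedHashMap factory, so the per-interval groups preserve the time order of the incoming (interval, runner) pairs. Below is a small self-contained analogue using plain JDK types (strings stand in for intervals and runners; this illustrates the collector behaviour, not Druid's code, which groups the Pair objects themselves).

import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByIntervalSketch
{
  public static void main(String[] args)
  {
    // Ordered (interval, runner) pairs; the first two partitions share an interval.
    List<Map.Entry<String, String>> ordered = Arrays.asList(
        new SimpleEntry<>("2001-01-01/2001-01-02", "runner-0"),
        new SimpleEntry<>("2001-01-01/2001-01-02", "runner-1"),
        new SimpleEntry<>("2001-01-02/2001-01-03", "runner-2")
    );

    // The LinkedHashMap supplier keeps intervals in encounter (i.e. time) order, which is what
    // lets the n-way merge emit one fully sorted interval after another.
    LinkedHashMap<String, List<String>> grouped = ordered.stream().collect(
        Collectors.groupingBy(
            Map.Entry::getKey,
            LinkedHashMap::new,
            Collectors.mapping(Map.Entry::getValue, Collectors.toList())
        )
    );
    grouped.forEach((interval, runners) -> System.out.println(interval + " -> " + runners));
    // 2001-01-01/2001-01-02 -> [runner-0, runner-1]
    // 2001-01-02/2001-01-03 -> [runner-2]
  }
}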

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
@ -79,18 +80,18 @@ public class ScanResultValue implements Comparable<ScanResultValue>
public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
{
if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
Long timestamp = (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
if (timestamp == null) {
Object timestampObj = ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
if (timestampObj == null) {
throw new ISE("Unable to compare timestamp for rows without a time column");
}
return timestamp;
return DimensionHandlerUtils.convertObjectToLong(timestampObj);
} else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
if (timeColumnIndex == -1) {
throw new ISE("Unable to compare timestamp for rows without a time column");
}
List<Object> firstEvent = (List<Object>) ((List<Object>) this.getEvents()).get(0);
return (Long) firstEvent.get(timeColumnIndex);
return DimensionHandlerUtils.convertObjectToLong(firstEvent.get(timeColumnIndex));
}
throw new UOE("Unable to get first event timestamp using result format of [%s]", resultFormat.toString());
}
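
The cast change matters because a __time value small enough to fit in 32 bits can arrive as a boxed Integer (for example after JSON deserialization of a ScanResultValue), so an unconditional (Long) cast can fail with a ClassCastException. A minimal standalone illustration of the failure mode and the Number-based fix follows (plain Java; Druid's DimensionHandlerUtils.convertObjectToLong additionally handles other input types such as strings).

public class TimestampConversionSketch
{
  // Simplified stand-in for DimensionHandlerUtils.convertObjectToLong: accept any Number.
  static long toLong(Object timestampObj)
  {
    if (timestampObj instanceof Number) {
      return ((Number) timestampObj).longValue();
    }
    throw new IllegalArgumentException("Cannot convert [" + timestampObj + "] to a long timestamp");
  }

  public static void main(String[] args)
  {
    Object smallTimestamp = Integer.MAX_VALUE;  // boxes to Integer when the value fits in an int
    Object largeTimestamp = 1234567890000L;     // boxes to Long

    // long broken = (Long) smallTimestamp;     // ClassCastException: Integer cannot be cast to Long
    System.out.println(toLong(smallTimestamp)); // 2147483647
    System.out.println(toLong(largeTimestamp)); // 1234567890000
  }
}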
@ -105,7 +106,6 @@ public class ScanResultValue implements Comparable<ScanResultValue>
return singleEventScanResultValues;
}
@Override
public boolean equals(Object o)
{

View File

@ -140,10 +140,10 @@ public class ScanQueryRunnerFactoryTest
List<ScanResultValue> output = factory.priorityQueueSortAndLimit(
inputSequence,
query,
ImmutableList.of(new SegmentDescriptor(new Interval(
ImmutableList.of(new Interval(
DateTimes.of("2010-01-01"),
DateTimes.of("2019-01-01").plusHours(1)
), "1", 0))
))
).toList();
if (query.getLimit() > Integer.MAX_VALUE) {
Assert.fail("Unsupported exception should have been thrown due to high limit");
@ -275,7 +275,7 @@ public class ScanQueryRunnerFactoryTest
), "1", 0);
@Test
public void testGetValidSegmentDescriptorsFromSpec()
public void testGetValidIntervalsFromSpec()
{
QuerySegmentSpec multiSpecificSpec = new MultipleSpecificSegmentSpec(
Collections.singletonList(
@ -284,13 +284,13 @@ public class ScanQueryRunnerFactoryTest
);
QuerySegmentSpec singleSpecificSpec = new SpecificSegmentSpec(descriptor);
List<SegmentDescriptor> descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(multiSpecificSpec);
Assert.assertEquals(1, descriptors.size());
Assert.assertEquals(descriptor, descriptors.get(0));
List<Interval> intervals = factory.getIntervalsFromSpecificQuerySpec(multiSpecificSpec);
Assert.assertEquals(1, intervals.size());
Assert.assertEquals(descriptor.getInterval(), intervals.get(0));
descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(singleSpecificSpec);
Assert.assertEquals(1, descriptors.size());
Assert.assertEquals(descriptor, descriptors.get(0));
intervals = factory.getIntervalsFromSpecificQuerySpec(singleSpecificSpec);
Assert.assertEquals(1, intervals.size());
Assert.assertEquals(descriptor.getInterval(), intervals.get(0));
}
@Test(expected = UOE.class)
@ -304,7 +304,7 @@ public class ScanQueryRunnerFactoryTest
)
)
);
factory.getSegmentDescriptorsFromSpecificQuerySpec(multiIntervalSpec);
factory.getIntervalsFromSpecificQuerySpec(multiIntervalSpec);
}
@Test(expected = UOE.class)
@ -316,7 +316,7 @@ public class ScanQueryRunnerFactoryTest
DateTimes.of("2019-01-01").plusHours(1)
)
);
factory.getSegmentDescriptorsFromSpecificQuerySpec(legacySpec);
factory.getIntervalsFromSpecificQuerySpec(legacySpec);
}
}
}

View File

@ -36,11 +36,14 @@ import java.util.Map;
public class ScanResultValueTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final long TIME_1 = 1234567890000L;
private static final long TIME_2 = 9876543210000L;
private static final long TIME_1_LONG = 1234567890000L;
private static final long TIME_2_LONG = 9876543210000L;
private static final int TIME_3_INT = Integer.MAX_VALUE;
private static ScanResultValue compactedListSRV;
private static ScanResultValue listSRV;
private static ScanResultValue compactedListSRVLongTimestamp;
private static ScanResultValue listSRVLongTimestamp;
private static ScanResultValue compactedListSRVIntegerTimestamp;
private static ScanResultValue listSRVIntegerTimestamp;
@BeforeClass
public static void setup()
@ -48,73 +51,93 @@ public class ScanResultValueTest
String segmentId = "some_segment_id";
List<String> columns = new ArrayList<>(Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, "name", "count"));
List<Object> event = new ArrayList<>(Arrays.asList(
TIME_1,
TIME_1_LONG,
"Feridun",
4
));
List<Object> event2 = new ArrayList<>(Arrays.asList(
TIME_2,
TIME_2_LONG,
"Justin",
6
));
List<List<Object>> events = Arrays.asList(event, event2);
compactedListSRV = new ScanResultValue(segmentId, columns, events);
compactedListSRVLongTimestamp = new ScanResultValue(segmentId, columns, events);
List<Object> eventInt = new ArrayList<>(Arrays.asList(
TIME_3_INT,
"Feridun",
4
));
List<List<Object>> eventsInt = Arrays.asList(eventInt, event2);
compactedListSRVIntegerTimestamp = new ScanResultValue(segmentId, columns, eventsInt);
Map<String, Object> eventMap1 = new HashMap<>();
eventMap1.put(ColumnHolder.TIME_COLUMN_NAME, TIME_1);
eventMap1.put(ColumnHolder.TIME_COLUMN_NAME, TIME_1_LONG);
eventMap1.put("name", "Feridun");
eventMap1.put("count", 4);
Map<String, Object> eventMap2 = new HashMap<>();
eventMap2.put(ColumnHolder.TIME_COLUMN_NAME, TIME_2);
eventMap2.put(ColumnHolder.TIME_COLUMN_NAME, TIME_2_LONG);
eventMap2.put("name", "Justin");
eventMap2.put("count", 6);
List<Map<String, Object>> eventMaps = Arrays.asList(eventMap1, eventMap2);
listSRV = new ScanResultValue(segmentId, columns, eventMaps);
listSRVLongTimestamp = new ScanResultValue(segmentId, columns, eventMaps);
Map<String, Object> eventMap3 = new HashMap<>();
eventMap3.put(ColumnHolder.TIME_COLUMN_NAME, TIME_3_INT);
eventMap3.put("name", "Justin");
eventMap3.put("count", 6);
List<Map<String, Object>> eventMapsInt = Arrays.asList(eventMap3, eventMap2);
listSRVIntegerTimestamp = new ScanResultValue(segmentId, columns, eventMapsInt);
}
@Test
public void testSerdeScanResultValueCompactedList() throws IOException
{
String serialized = JSON_MAPPER.writeValueAsString(compactedListSRV);
String serialized = JSON_MAPPER.writeValueAsString(compactedListSRVLongTimestamp);
ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class);
Assert.assertEquals(compactedListSRV, deserialized);
Assert.assertEquals(compactedListSRVLongTimestamp, deserialized);
}
@Test
public void testSerdeScanResultValueNonCompactedList() throws IOException
{
String serialized = JSON_MAPPER.writeValueAsString(listSRV);
String serialized = JSON_MAPPER.writeValueAsString(listSRVLongTimestamp);
ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class);
Assert.assertEquals(listSRV, deserialized);
Assert.assertEquals(listSRVLongTimestamp, deserialized);
}
@Test
public void testGetFirstEventTimestampCompactedList()
{
long timestamp = compactedListSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);
Assert.assertEquals(TIME_1, timestamp);
long timestamp = compactedListSRVLongTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);
Assert.assertEquals(TIME_1_LONG, timestamp);
long timestampInt = compactedListSRVIntegerTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST);
Assert.assertEquals(TIME_3_INT, timestampInt);
}
@Test
public void testGetFirstEventTimestampNonCompactedList()
{
long timestamp = listSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
Assert.assertEquals(TIME_1, timestamp);
long timestamp = listSRVLongTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
Assert.assertEquals(TIME_1_LONG, timestamp);
long timestampInt = listSRVIntegerTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
Assert.assertEquals(TIME_3_INT, timestampInt);
}
@Test
public void testToSingleEventScanResultValues()
{
List<ScanResultValue> compactedListScanResultValues = compactedListSRV.toSingleEventScanResultValues();
List<ScanResultValue> compactedListScanResultValues = compactedListSRVLongTimestamp.toSingleEventScanResultValues();
for (ScanResultValue srv : compactedListScanResultValues) {
List<Object> events = (List<Object>) srv.getEvents();
Assert.assertEquals(1, events.size());
}
List<ScanResultValue> listScanResultValues = listSRV.toSingleEventScanResultValues();
for (ScanResultValue srv : compactedListScanResultValues) {
List<ScanResultValue> listScanResultValues = listSRVLongTimestamp.toSingleEventScanResultValues();
for (ScanResultValue srv : listScanResultValues) {
List<Object> events = (List<Object>) srv.getEvents();
Assert.assertEquals(1, events.size());
}

View File

@ -29,6 +29,7 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.CloseQuietly;
@ -49,6 +50,7 @@ import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
@ -180,47 +182,40 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
FunctionalIterable
.create(specs)
.transform(
new Function<SegmentDescriptor, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(final SegmentDescriptor descriptor)
{
final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
descriptor.getInterval(),
descriptor.getVersion()
);
if (holder == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
descriptor -> {
final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
descriptor.getInterval(),
descriptor.getVersion()
);
if (holder == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
if (chunk == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
if (chunk == null) {
return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
}
final Sink theSink = chunk.getObject();
final SegmentId sinkSegmentId = theSink.getSegment().getId();
final Sink theSink = chunk.getObject();
final SegmentId sinkSegmentId = theSink.getSegment().getId();
return new SpecificSegmentQueryRunner<>(
withPerSinkMetrics(
new BySegmentQueryRunner<>(
sinkSegmentId,
descriptor.getInterval().getStart(),
factory.mergeRunners(
Execs.directExecutor(),
Iterables.transform(
theSink,
new Function<FireHydrant, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(final FireHydrant hydrant)
{
return new SpecificSegmentQueryRunner<>(
withPerSinkMetrics(
new BySegmentQueryRunner<>(
sinkSegmentId,
descriptor.getInterval().getStart(),
factory.mergeRunners(
Execs.directExecutor(),
new SinkQueryRunners<>(
Iterables.transform(
theSink,
hydrant -> {
// Hydrant might swap at any point, but if it's swapped at the start
// then we know it's *definitely* swapped.
final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
return new NoopQueryRunner<>();
return new Pair<>(Intervals.ETERNITY, new NoopQueryRunner<>());
}
// Prevent the underlying segment from swapping when it's being iterated
@ -234,7 +229,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
// 1) Only use caching if data is immutable
// 2) Hydrants are not the same between replicas, make sure cache is local
if (hydrantDefinitelySwapped && cache.isLocal()) {
return new CachingQueryRunner<>(
QueryRunner<T> cachingRunner = new CachingQueryRunner<>(
makeHydrantCacheIdentifier(hydrant),
descriptor,
objectMapper,
@ -249,8 +244,9 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
),
cacheConfig
);
return new Pair<>(segment.lhs.getDataInterval(), cachingRunner);
} else {
return baseRunner;
return new Pair<>(segment.lhs.getDataInterval(), baseRunner);
}
}
catch (RuntimeException e) {
@ -258,17 +254,16 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
throw e;
}
}
}
)
)
),
toolChest,
sinkSegmentId,
cpuTimeAccumulator
),
new SpecificSegmentSpec(descriptor)
);
}
)
)
)
),
toolChest,
sinkSegmentId,
cpuTimeAccumulator
),
new SpecificSegmentSpec(descriptor)
);
}
)
)

View File

@ -54,7 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger;
public class Sink implements Iterable<FireHydrant>
{
private static final IncrementalIndexAddResult ALREADY_SWAPPED = new IncrementalIndexAddResult(-1, -1, null, "write after index swapped");
private static final IncrementalIndexAddResult ALREADY_SWAPPED =
new IncrementalIndexAddResult(-1, -1, null, "write after index swapped");
private final Object hydrantLock = new Object();
private final Interval interval;
@ -64,7 +65,7 @@ public class Sink implements Iterable<FireHydrant>
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final boolean reportParseExceptions;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<>();
private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
private final AtomicInteger numRowsExcludingCurrIndex = new AtomicInteger();
private volatile FireHydrant currHydrant;

View File

@ -37,6 +37,8 @@ import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
@ -49,6 +51,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -730,7 +733,7 @@ public class AppenderatorTest
final List<Result<TimeseriesResultValue>> results3 =
QueryPlus.wrap(query3).run(appenderator, ImmutableMap.of()).toList();
Assert.assertEquals(
"query2",
"query3",
ImmutableList.of(
new Result<>(
DateTimes.of("2001"),
@ -739,6 +742,42 @@ public class AppenderatorTest
),
results3
);
final ScanQuery query4 = Druids.newScanQueryBuilder()
.dataSource(AppenderatorTester.DATASOURCE)
.intervals(
new MultipleSpecificSegmentSpec(
ImmutableList.of(
new SegmentDescriptor(
Intervals.of("2001/PT1H"),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
),
new SegmentDescriptor(
Intervals.of("2001T03/PT1H"),
IDENTIFIERS.get(2).getVersion(),
IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
)
)
)
)
.order(ScanQuery.Order.ASCENDING)
.batchSize(10)
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.build();
final List<ScanResultValue> results4 =
QueryPlus.wrap(query4).run(appenderator, new HashMap<>()).toList();
Assert.assertEquals(2, results4.size()); // 2 segments, 1 row per segment
Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(0).getColumns().toArray());
Assert.assertArrayEquals(
new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L},
((List<Object>) ((List<Object>) results4.get(0).getEvents()).get(0)).toArray()
);
Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(1).getColumns().toArray());
Assert.assertArrayEquals(
new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L},
((List<Object>) ((List<Object>) results4.get(1).getEvents()).get(0)).toArray()
);
}
}

View File

@ -35,12 +35,18 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryConfig;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
import org.apache.druid.query.scan.ScanQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@ -48,6 +54,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
@ -241,6 +248,14 @@ public class AppenderatorTester implements AutoCloseable
),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
),
ScanQuery.class, new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
new ScanQueryConfig(),
new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
),
new ScanQueryEngine(),
new ScanQueryConfig()
)
)
),