diff --git a/processing/src/main/java/org/apache/druid/query/SinkQueryRunners.java b/processing/src/main/java/org/apache/druid/query/SinkQueryRunners.java
new file mode 100644
index 00000000000..9cec39a61b1
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/SinkQueryRunners.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import org.apache.druid.java.util.common.Pair;
+import org.joda.time.Interval;
+
+import java.util.Iterator;
+
+public class SinkQueryRunners<T> implements Iterable<QueryRunner<T>>
+{
+  Iterable<Pair<Interval, QueryRunner<T>>> runners;
+
+  public SinkQueryRunners(Iterable<Pair<Interval, QueryRunner<T>>> runners)
+  {
+    this.runners = runners;
+  }
+
+  public Iterator<Pair<Interval, QueryRunner<T>>> runnerIntervalMappingIterator()
+  {
+    return runners.iterator();
+  }
+
+  @Override
+  public Iterator<QueryRunner<T>> iterator()
+  {
+    Iterator<Pair<Interval, QueryRunner<T>>> runnerIntervalIterator = runners.iterator();
+    return new Iterator<QueryRunner<T>>()
+    {
+      @Override
+      public boolean hasNext()
+      {
+        return runnerIntervalIterator.hasNext();
+      }
+
+      @Override
+      public QueryRunner<T> next()
+      {
+        return runnerIntervalIterator.next().rhs;
+      }
+    };
+  }
+}
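Editorial aside (not part of the patch): the new class exposes two views of the same runners — the plain Iterable<QueryRunner<T>> that factory.mergeRunners expects, and an interval-aware iterator that preserves which interval each runner covers. A hedged usage sketch; runnerA/runnerB and the intervals are made-up placeholders:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.scan.ScanResultValue;
import org.joda.time.Interval;

class SinkQueryRunnersUsageSketch
{
  static void sketch(QueryRunner<ScanResultValue> runnerA, QueryRunner<ScanResultValue> runnerB)
  {
    // Pair each runner with the interval of the data it serves (placeholder runners/intervals).
    List<Pair<Interval, QueryRunner<ScanResultValue>>> pairs = Arrays.asList(
        new Pair<>(Intervals.of("2001/PT1H"), runnerA),
        new Pair<>(Intervals.of("2001T01/PT1H"), runnerB)
    );
    SinkQueryRunners<ScanResultValue> runners = new SinkQueryRunners<>(pairs);

    // Plain runner view, e.g. what factory.mergeRunners(...) iterates over.
    for (QueryRunner<ScanResultValue> runner : runners) {
      // e.g. runner.run(queryPlus, responseContext)
    }

    // Interval-aware view used when a 1:1 interval-to-runner mapping is needed for time ordering.
    runners.runnerIntervalMappingIterator()
           .forEachRemaining(pair -> System.out.println(pair.lhs + " -> " + pair.rhs));
  }
}
```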
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
index 0a9f3b9b52d..c34d58ce60d 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryRunnerFactory.java
@@ -20,7 +20,6 @@
 package org.apache.druid.query.scan;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
@@ -39,6 +38,7 @@ import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.SinkQueryRunners;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
@@ -114,11 +114,11 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
-      List<SegmentDescriptor> descriptorsOrdered = getSegmentDescriptorsFromSpecificQuerySpec(query.getQuerySegmentSpec());
+      List<Interval> intervalsOrdered = getIntervalsFromSpecificQuerySpec(query.getQuerySegmentSpec());
       List<QueryRunner<ScanResultValue>> queryRunnersOrdered = Lists.newArrayList(queryRunners);
 
       if (query.getOrder().equals(ScanQuery.Order.DESCENDING)) {
-        descriptorsOrdered = Lists.reverse(descriptorsOrdered);
+        intervalsOrdered = Lists.reverse(intervalsOrdered);
         queryRunnersOrdered = Lists.reverse(queryRunnersOrdered);
       }
       int maxRowsQueuedForOrdering = (query.getMaxRowsQueuedForOrdering() == null
@@ -132,28 +132,29 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
       if (query.getLimit() <= maxRowsQueuedForOrdering) {
         // Use priority queue strategy
         return priorityQueueSortAndLimit(
             Sequences.concat(Sequences.map(
                 Sequences.simple(queryRunnersOrdered),
                 input -> input.run(queryPlus, responseContext)
             )),
             query,
-            descriptorsOrdered
+            intervalsOrdered
         );
       } else {
-        Preconditions.checkState(
-            descriptorsOrdered.size() == queryRunnersOrdered.size(),
-            "Number of segment descriptors does not equal number of "
-            + "query runners...something went wrong!"
-        );
-
-        // Combine the two lists of segment descriptors and query runners into a single list of
-        // segment descriptors - query runner pairs. This makes it easier to use stream operators.
-        List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>> descriptorsAndRunnersOrdered = new ArrayList<>();
-        for (int i = 0; i < queryRunnersOrdered.size(); i++) {
-          descriptorsAndRunnersOrdered.add(new Pair<>(descriptorsOrdered.get(i), queryRunnersOrdered.get(i)));
+        // Use n-way merge strategy
+        List<Pair<Interval, QueryRunner<ScanResultValue>>> intervalsAndRunnersOrdered = new ArrayList<>();
+        if (intervalsOrdered.size() == queryRunnersOrdered.size()) {
+          for (int i = 0; i < queryRunnersOrdered.size(); i++) {
+            intervalsAndRunnersOrdered.add(new Pair<>(intervalsOrdered.get(i), queryRunnersOrdered.get(i)));
+          }
+        } else if (queryRunners instanceof SinkQueryRunners) {
+          ((SinkQueryRunners<ScanResultValue>) queryRunners).runnerIntervalMappingIterator()
+                                                            .forEachRemaining(intervalsAndRunnersOrdered::add);
+        } else {
+          throw new ISE("Number of segment descriptors does not equal number of "
+                        + "query runners...something went wrong!");
         }
 
         // Group the list of pairs by interval.  The LinkedHashMap will have an interval paired with a list of all the
         // query runners for that segment
-        LinkedHashMap<Interval, List<Pair<SegmentDescriptor, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
-            descriptorsAndRunnersOrdered.stream()
+        LinkedHashMap<Interval, List<Pair<Interval, QueryRunner<ScanResultValue>>>> partitionsGroupedByInterval =
+            intervalsAndRunnersOrdered.stream()
                 .collect(Collectors.groupingBy(
-                    x -> x.lhs.getInterval(),
+                    x -> x.lhs,
                     LinkedHashMap::new,
                     Collectors.toList()
                 ));
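Editorial aside on the mergeRunners hunk above (not part of the patch): the grouping step relies on Collectors.groupingBy with a LinkedHashMap factory so that per-interval groups keep the encounter (i.e. time) order of the incoming (interval, runner) pairs, which is what the subsequent n-way merge depends on. A minimal standalone sketch of that idea, with plain strings standing in for intervals and runners:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupRunnersByIntervalSketch
{
  public static void main(String[] args)
  {
    // Stand-ins for (interval, runner) pairs: key = interval label, value = runner name.
    List<Map.Entry<String, String>> pairs = Arrays.asList(
        new SimpleImmutableEntry<>("2019-01-01T00/PT1H", "runnerA"),
        new SimpleImmutableEntry<>("2019-01-01T00/PT1H", "runnerB"), // same interval, different partition
        new SimpleImmutableEntry<>("2019-01-01T01/PT1H", "runnerC")
    );

    // LinkedHashMap::new preserves the time order of the intervals as they are encountered,
    // so the later merge can proceed interval group by interval group without re-sorting.
    LinkedHashMap<String, List<String>> grouped = pairs.stream().collect(
        Collectors.groupingBy(
            Map.Entry::getKey,
            LinkedHashMap::new,
            Collectors.mapping(Map.Entry::getValue, Collectors.toList())
        )
    );

    grouped.forEach((interval, runners) -> System.out.println(interval + " -> " + runners));
    // 2019-01-01T00/PT1H -> [runnerA, runnerB]
    // 2019-01-01T01/PT1H -> [runnerC]
  }
}
```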
@@ -167,9 +168,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
   Sequence<ScanResultValue> priorityQueueSortAndLimit(
       Sequence<ScanResultValue> inputSequence,
       ScanQuery scanQuery,
-      List<SegmentDescriptor> descriptorsOrdered
+      List<Interval> intervalsOrdered
   )
   {
     Comparator<ScanResultValue> priorityQComparator = new ScanResultValueTimestampComparator(scanQuery);
@@ -254,9 +255,9 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
         if (numRowsScanned > limit && finalInterval == null) {
           long timestampOfLimitRow = srv.getFirstEventTimestamp(scanQuery.getResultFormat());
-          for (SegmentDescriptor descriptor : descriptorsOrdered) {
-            if (descriptor.getInterval().contains(timestampOfLimitRow)) {
-              finalInterval = descriptor.getInterval();
+          for (Interval interval : intervalsOrdered) {
+            if (interval.contains(timestampOfLimitRow)) {
+              finalInterval = interval;
             }
           }
           if (finalInterval == null) {
@@ -280,23 +281,28 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValue, ScanQuery>
-  List<SegmentDescriptor> getSegmentDescriptorsFromSpecificQuerySpec(QuerySegmentSpec spec)
+  List<Interval> getIntervalsFromSpecificQuerySpec(QuerySegmentSpec spec)
   {
     // Query segment spec must be an instance of MultipleSpecificSegmentSpec or SpecificSegmentSpec because
     // segment descriptors need to be present for a 1:1 matching of intervals with query runners.
     // The other types of segment spec condense the intervals (i.e. merge neighbouring intervals), eliminating
     // the 1:1 relationship between intervals and query runners.
-    List<SegmentDescriptor> descriptorsOrdered;
+    List<Interval> descriptorsOrdered;
     if (spec instanceof MultipleSpecificSegmentSpec) {
       // Ascending time order for both descriptors and query runners by default
-      descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors();
+      descriptorsOrdered = ((MultipleSpecificSegmentSpec) spec).getDescriptors()
+                                                               .stream()
+                                                               .map(SegmentDescriptor::getInterval)
+                                                               .collect(Collectors.toList());
     } else if (spec instanceof SpecificSegmentSpec) {
-      descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor());
+      descriptorsOrdered = Collections.singletonList(((SpecificSegmentSpec) spec).getDescriptor().getInterval());
     } else {
-      throw new UOE("Time-ordering on scan queries is only supported for queries with segment specs"
-                    + "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.",
-                    spec.getClass().getSimpleName());
+      throw new UOE(
+          "Time-ordering on scan queries is only supported for queries with segment specs"
+          + "of type MultipleSpecificSegmentSpec or SpecificSegmentSpec...a [%s] was received instead.",
+          spec.getClass().getSimpleName()
+      );
     }
     return descriptorsOrdered;
   }
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
index a05e7150766..8673b3479b0 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValue.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.segment.DimensionHandlerUtils;
 import org.apache.druid.segment.column.ColumnHolder;
 
 import javax.annotation.Nullable;
@@ -79,18 +80,18 @@ public class ScanResultValue implements Comparable<ScanResultValue>
   public long getFirstEventTimestamp(ScanQuery.ResultFormat resultFormat)
   {
     if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_LIST)) {
-      Long timestamp = (Long) ((Map) ((List) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
-      if (timestamp == null) {
+      Object timestampObj = ((Map) ((List) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
+      if (timestampObj == null) {
         throw new ISE("Unable to compare timestamp for rows without a time column");
       }
-      return timestamp;
+      return DimensionHandlerUtils.convertObjectToLong(timestampObj);
     } else if (resultFormat.equals(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)) {
       int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
       if (timeColumnIndex == -1) {
         throw new ISE("Unable to compare timestamp for rows without a time column");
       }
       List firstEvent = (List) ((List) this.getEvents()).get(0);
-      return (Long) firstEvent.get(timeColumnIndex);
+      return DimensionHandlerUtils.convertObjectToLong(firstEvent.get(timeColumnIndex));
     }
     throw new UOE("Unable to get first event timestamp using result format of [%s]", resultFormat.toString());
   }
@@ -105,7 +106,6 @@ public class ScanResultValue implements Comparable<ScanResultValue>
     return singleEventScanResultValues;
   }
 
-
   @Override
   public boolean equals(Object o)
   {
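Editorial aside on the ScanResultValue change above (not part of the patch): a ScanResultValue that has been round-tripped through JSON can come back with its __time value as an Integer, because Jackson picks the narrowest integral type that fits, so a raw (Long) cast fails. A hedged, self-contained illustration of the failure mode the switch to DimensionHandlerUtils.convertObjectToLong guards against:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class TimestampTypeSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // A timestamp that fits in an int is deserialized as Integer, not Long.
    Map<?, ?> event = mapper.readValue("{\"__time\": 2147483647}", Map.class);
    Object timestamp = event.get("__time");
    System.out.println(timestamp.getClass()); // class java.lang.Integer

    // A direct (Long) cast would throw ClassCastException here; converting through
    // Number (roughly what DimensionHandlerUtils.convertObjectToLong does for numeric
    // inputs) is safe regardless of the boxed type.
    long safe = ((Number) timestamp).longValue();
    System.out.println(safe);
  }
}
```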
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
index 6559c65b70f..4b65f53fcf3 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerFactoryTest.java
@@ -140,10 +140,10 @@ public class ScanQueryRunnerFactoryTest
       List<ScanResultValue> output = factory.priorityQueueSortAndLimit(
           inputSequence,
           query,
-          ImmutableList.of(new SegmentDescriptor(new Interval(
-              DateTimes.of("2010-01-01"),
-              DateTimes.of("2019-01-01").plusHours(1)
-          ), "1", 0))
+          ImmutableList.of(new Interval(
+              DateTimes.of("2010-01-01"),
+              DateTimes.of("2019-01-01").plusHours(1)
+          ))
       ).toList();
       if (query.getLimit() > Integer.MAX_VALUE) {
         Assert.fail("Unsupported exception should have been thrown due to high limit");
@@ -275,7 +275,7 @@ public class ScanQueryRunnerFactoryTest
     ), "1", 0);
 
     @Test
-    public void testGetValidSegmentDescriptorsFromSpec()
+    public void testGetValidIntervalsFromSpec()
     {
       QuerySegmentSpec multiSpecificSpec = new MultipleSpecificSegmentSpec(
           Collections.singletonList(
@@ -284,13 +284,13 @@ public class ScanQueryRunnerFactoryTest
       );
       QuerySegmentSpec singleSpecificSpec = new SpecificSegmentSpec(descriptor);
 
-      List<SegmentDescriptor> descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(multiSpecificSpec);
-      Assert.assertEquals(1, descriptors.size());
-      Assert.assertEquals(descriptor, descriptors.get(0));
+      List<Interval> intervals = factory.getIntervalsFromSpecificQuerySpec(multiSpecificSpec);
+      Assert.assertEquals(1, intervals.size());
+      Assert.assertEquals(descriptor.getInterval(), intervals.get(0));
 
-      descriptors = factory.getSegmentDescriptorsFromSpecificQuerySpec(singleSpecificSpec);
-      Assert.assertEquals(1, descriptors.size());
-      Assert.assertEquals(descriptor, descriptors.get(0));
+      intervals = factory.getIntervalsFromSpecificQuerySpec(singleSpecificSpec);
+      Assert.assertEquals(1, intervals.size());
+      Assert.assertEquals(descriptor.getInterval(), intervals.get(0));
     }
 
     @Test(expected = UOE.class)
@@ -304,7 +304,7 @@ public class ScanQueryRunnerFactoryTest
             )
         )
       );
-      factory.getSegmentDescriptorsFromSpecificQuerySpec(multiIntervalSpec);
+      factory.getIntervalsFromSpecificQuerySpec(multiIntervalSpec);
     }
 
     @Test(expected = UOE.class)
@@ -316,7 +316,7 @@ public class ScanQueryRunnerFactoryTest
             DateTimes.of("2019-01-01").plusHours(1)
         )
       );
-      factory.getSegmentDescriptorsFromSpecificQuerySpec(legacySpec);
+      factory.getIntervalsFromSpecificQuerySpec(legacySpec);
     }
   }
 }
segmentId = "some_segment_id"; List columns = new ArrayList<>(Arrays.asList(ColumnHolder.TIME_COLUMN_NAME, "name", "count")); List event = new ArrayList<>(Arrays.asList( - TIME_1, + TIME_1_LONG, "Feridun", 4 )); List event2 = new ArrayList<>(Arrays.asList( - TIME_2, + TIME_2_LONG, "Justin", 6 )); List> events = Arrays.asList(event, event2); - compactedListSRV = new ScanResultValue(segmentId, columns, events); + compactedListSRVLongTimestamp = new ScanResultValue(segmentId, columns, events); + + List eventInt = new ArrayList<>(Arrays.asList( + TIME_3_INT, + "Feridun", + 4 + )); + + List> eventsInt = Arrays.asList(eventInt, event2); + compactedListSRVIntegerTimestamp = new ScanResultValue(segmentId, columns, eventsInt); Map eventMap1 = new HashMap<>(); - eventMap1.put(ColumnHolder.TIME_COLUMN_NAME, TIME_1); + eventMap1.put(ColumnHolder.TIME_COLUMN_NAME, TIME_1_LONG); eventMap1.put("name", "Feridun"); eventMap1.put("count", 4); Map eventMap2 = new HashMap<>(); - eventMap2.put(ColumnHolder.TIME_COLUMN_NAME, TIME_2); + eventMap2.put(ColumnHolder.TIME_COLUMN_NAME, TIME_2_LONG); eventMap2.put("name", "Justin"); eventMap2.put("count", 6); List> eventMaps = Arrays.asList(eventMap1, eventMap2); - listSRV = new ScanResultValue(segmentId, columns, eventMaps); + listSRVLongTimestamp = new ScanResultValue(segmentId, columns, eventMaps); + + Map eventMap3 = new HashMap<>(); + eventMap3.put(ColumnHolder.TIME_COLUMN_NAME, TIME_3_INT); + eventMap3.put("name", "Justin"); + eventMap3.put("count", 6); + List> eventMapsInt = Arrays.asList(eventMap3, eventMap2); + listSRVIntegerTimestamp = new ScanResultValue(segmentId, columns, eventMapsInt); } @Test public void testSerdeScanResultValueCompactedList() throws IOException { - String serialized = JSON_MAPPER.writeValueAsString(compactedListSRV); + String serialized = JSON_MAPPER.writeValueAsString(compactedListSRVLongTimestamp); ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class); - Assert.assertEquals(compactedListSRV, deserialized); + Assert.assertEquals(compactedListSRVLongTimestamp, deserialized); } @Test public void testSerdeScanResultValueNonCompactedList() throws IOException { - String serialized = JSON_MAPPER.writeValueAsString(listSRV); + String serialized = JSON_MAPPER.writeValueAsString(listSRVLongTimestamp); ScanResultValue deserialized = JSON_MAPPER.readValue(serialized, ScanResultValue.class); - Assert.assertEquals(listSRV, deserialized); + Assert.assertEquals(listSRVLongTimestamp, deserialized); } @Test public void testGetFirstEventTimestampCompactedList() { - long timestamp = compactedListSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST); - Assert.assertEquals(TIME_1, timestamp); + long timestamp = compactedListSRVLongTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST); + Assert.assertEquals(TIME_1_LONG, timestamp); + long timestampInt = compactedListSRVIntegerTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST); + Assert.assertEquals(TIME_3_INT, timestampInt); } @Test public void testGetFirstEventTimestampNonCompactedList() { - long timestamp = listSRV.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST); - Assert.assertEquals(TIME_1, timestamp); + long timestamp = listSRVLongTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST); + Assert.assertEquals(TIME_1_LONG, timestamp); + long timestampInt = 
+    long timestampInt = listSRVIntegerTimestamp.getFirstEventTimestamp(ScanQuery.ResultFormat.RESULT_FORMAT_LIST);
+    Assert.assertEquals(TIME_3_INT, timestampInt);
   }
 
   @Test
   public void testToSingleEventScanResultValues()
   {
-    List<ScanResultValue> compactedListScanResultValues = compactedListSRV.toSingleEventScanResultValues();
+    List<ScanResultValue> compactedListScanResultValues = compactedListSRVLongTimestamp.toSingleEventScanResultValues();
     for (ScanResultValue srv : compactedListScanResultValues) {
       List events = (List) srv.getEvents();
       Assert.assertEquals(1, events.size());
     }
 
-    List<ScanResultValue> listScanResultValues = listSRV.toSingleEventScanResultValues();
-    for (ScanResultValue srv : compactedListScanResultValues) {
+    List<ScanResultValue> listScanResultValues = listSRVLongTimestamp.toSingleEventScanResultValues();
+    for (ScanResultValue srv : listScanResultValues) {
       List events = (List) srv.getEvents();
       Assert.assertEquals(1, events.size());
     }
   }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 127c56301f4..7d08c0f7849 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -29,6 +29,7 @@ import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.client.cache.ForegroundCachePopulator;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.CloseQuietly;
@@ -49,6 +50,7 @@ import org.apache.druid.query.QuerySegmentWalker;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.SinkQueryRunners;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
 import org.apache.druid.query.spec.SpecificSegmentSpec;
@@ -180,47 +182,40 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
         FunctionalIterable
             .create(specs)
             .transform(
-                new Function<SegmentDescriptor, QueryRunner<T>>()
-                {
-                  @Override
-                  public QueryRunner<T> apply(final SegmentDescriptor descriptor)
-                  {
-                    final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
-                        descriptor.getInterval(),
-                        descriptor.getVersion()
-                    );
-                    if (holder == null) {
-                      return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
-                    }
+                descriptor -> {
+                  final PartitionHolder<Sink> holder = sinkTimeline.findEntry(
+                      descriptor.getInterval(),
+                      descriptor.getVersion()
+                  );
+                  if (holder == null) {
+                    return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+                  }
 
-                    final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
-                    if (chunk == null) {
-                      return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
-                    }
+                  final PartitionChunk<Sink> chunk = holder.getChunk(descriptor.getPartitionNumber());
+                  if (chunk == null) {
+                    return new ReportTimelineMissingSegmentQueryRunner<>(descriptor);
+                  }
 
-                    final Sink theSink = chunk.getObject();
-                    final SegmentId sinkSegmentId = theSink.getSegment().getId();
+                  final Sink theSink = chunk.getObject();
+                  final SegmentId sinkSegmentId = theSink.getSegment().getId();
 
-                    return new SpecificSegmentQueryRunner<>(
-                        withPerSinkMetrics(
-                            new BySegmentQueryRunner<>(
-                                sinkSegmentId,
-                                descriptor.getInterval().getStart(),
-                                factory.mergeRunners(
-                                    Execs.directExecutor(),
-                                    Iterables.transform(
-                                        theSink,
-                                        new Function<FireHydrant, QueryRunner<T>>()
-                                        {
-                                          @Override
-                                          public QueryRunner<T> apply(final FireHydrant hydrant)
-                                          {
+                  return new SpecificSegmentQueryRunner<>(
+                      withPerSinkMetrics(
+                          new BySegmentQueryRunner<>(
+                              sinkSegmentId,
+                              descriptor.getInterval().getStart(),
+                              factory.mergeRunners(
+                                  Execs.directExecutor(),
+                                  new SinkQueryRunners<>(
+                                      Iterables.transform(
+                                          theSink,
+                                          hydrant -> {
                                             // Hydrant might swap at any point, but if it's swapped at the start
                                             // then we know it's *definitely* swapped.
                                             final boolean hydrantDefinitelySwapped = hydrant.hasSwapped();
                                             if (skipIncrementalSegment && !hydrantDefinitelySwapped) {
-                                              return new NoopQueryRunner<>();
+                                              return new Pair<>(Intervals.ETERNITY, new NoopQueryRunner<>());
                                             }
 
                                             // Prevent the underlying segment from swapping when its being iterated
@@ -234,7 +229,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
                                             // 1) Only use caching if data is immutable
                                             // 2) Hydrants are not the same between replicas, make sure cache is local
                                             if (hydrantDefinitelySwapped && cache.isLocal()) {
-                                              return new CachingQueryRunner<>(
+                                              QueryRunner<T> cachingRunner = new CachingQueryRunner<>(
                                                   makeHydrantCacheIdentifier(hydrant),
                                                   descriptor,
                                                   objectMapper,
@@ -249,8 +244,9 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
                                                   ),
                                                   cacheConfig
                                               );
+                                              return new Pair<>(segment.lhs.getDataInterval(), cachingRunner);
                                             } else {
-                                              return baseRunner;
+                                              return new Pair<>(segment.lhs.getDataInterval(), baseRunner);
                                             }
                                           }
                                           catch (RuntimeException e) {
@@ -258,17 +254,16 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
                                             throw e;
                                           }
                                         }
-                                      }
-                                    )
-                                )
-                            ),
-                            toolChest,
-                            sinkSegmentId,
-                            cpuTimeAccumulator
-                        ),
-                        new SpecificSegmentSpec(descriptor)
-                    );
-                  }
+                                      )
+                                  )
+                              )
+                          ),
+                          toolChest,
+                          sinkSegmentId,
+                          cpuTimeAccumulator
+                      ),
+                      new SpecificSegmentSpec(descriptor)
+                  );
                 }
             )
         )
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
index d2d72bacfa6..4e0c596cbdc 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
@@ -54,7 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class Sink implements Iterable<FireHydrant>
 {
-  private static final IncrementalIndexAddResult ALREADY_SWAPPED = new IncrementalIndexAddResult(-1, -1, null, "write after index swapped");
+  private static final IncrementalIndexAddResult ALREADY_SWAPPED =
+      new IncrementalIndexAddResult(-1, -1, null, "write after index swapped");
 
   private final Object hydrantLock = new Object();
   private final Interval interval;
@@ -64,7 +65,7 @@ public class Sink implements Iterable<FireHydrant>
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
   private final boolean reportParseExceptions;
-  private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
+  private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<>();
   private final LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
   private final AtomicInteger numRowsExcludingCurrIndex = new AtomicInteger();
   private volatile FireHydrant currHydrant;
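Editorial aside on the SinkQuerySegmentWalker change above (not part of the patch): each hydrant's runner is now paired with the interval of the data it serves, so the scan factory downstream can time-order realtime results; a skipped in-memory hydrant still yields a pair, using ETERNITY as a harmless placeholder. A rough, self-contained sketch of that pairing with plain Java stand-ins (Hydrant, the interval labels, and the "runner" strings are all placeholders, not Druid types):

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class HydrantIntervalPairingSketch
{
  // Minimal stand-in for a FireHydrant: an interval label plus a "has it been persisted yet" flag.
  static class Hydrant
  {
    final String dataInterval;
    final boolean swapped;

    Hydrant(String dataInterval, boolean swapped)
    {
      this.dataInterval = dataInterval;
      this.swapped = swapped;
    }
  }

  public static void main(String[] args)
  {
    boolean skipIncrementalSegment = true;
    List<Hydrant> sink = Arrays.asList(
        new Hydrant("2001/PT1H", true),
        new Hydrant("2001T01/PT1H", false)
    );

    // Mirrors the walker's transform: every hydrant yields an (interval, runner) pair, and a
    // skipped in-memory hydrant contributes a no-op runner under a catch-all "eternity" interval,
    // so the SinkQueryRunners handed to mergeRunners always has a 1:1 interval-to-runner mapping.
    List<Map.Entry<String, String>> pairs = new ArrayList<>();
    for (Hydrant hydrant : sink) {
      if (skipIncrementalSegment && !hydrant.swapped) {
        pairs.add(new SimpleImmutableEntry<>("eternity", "no-op runner"));
      } else {
        pairs.add(new SimpleImmutableEntry<>(hydrant.dataInterval, "runner over " + hydrant.dataInterval));
      }
    }

    pairs.forEach(p -> System.out.println(p.getKey() + " -> " + p.getValue()));
  }
}
```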
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
index 334214d2174..e9a168dc093 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTest.java
@@ -37,6 +37,8 @@ import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.Result;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanResultValue;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
@@ -49,6 +51,7 @@ import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -730,7 +733,7 @@ public class AppenderatorTest
     final List<Result<TimeseriesResultValue>> results3 =
         QueryPlus.wrap(query3).run(appenderator, ImmutableMap.of()).toList();
     Assert.assertEquals(
-        "query2",
+        "query3",
         ImmutableList.of(
             new Result<>(
                 DateTimes.of("2001"),
@@ -739,6 +742,42 @@ public class AppenderatorTest
         ),
         results3
     );
+
+    final ScanQuery query4 = Druids.newScanQueryBuilder()
+                                   .dataSource(AppenderatorTester.DATASOURCE)
+                                   .intervals(
+                                       new MultipleSpecificSegmentSpec(
+                                           ImmutableList.of(
+                                               new SegmentDescriptor(
+                                                   Intervals.of("2001/PT1H"),
+                                                   IDENTIFIERS.get(2).getVersion(),
+                                                   IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
+                                               ),
+                                               new SegmentDescriptor(
+                                                   Intervals.of("2001T03/PT1H"),
+                                                   IDENTIFIERS.get(2).getVersion(),
+                                                   IDENTIFIERS.get(2).getShardSpec().getPartitionNum()
+                                               )
+                                           )
+                                       )
+                                   )
+                                   .order(ScanQuery.Order.ASCENDING)
+                                   .batchSize(10)
+                                   .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                   .build();
+    final List<ScanResultValue> results4 =
+        QueryPlus.wrap(query4).run(appenderator, new HashMap<>()).toList();
+    Assert.assertEquals(2, results4.size()); // 2 segments, 1 row per segment
+    Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(0).getColumns().toArray());
+    Assert.assertArrayEquals(
+        new Object[]{DateTimes.of("2001").getMillis(), "foo", 1L, 8L},
+        ((List) ((List) results4.get(0).getEvents()).get(0)).toArray()
+    );
+    Assert.assertArrayEquals(new String[]{"__time", "dim", "count", "met"}, results4.get(0).getColumns().toArray());
+    Assert.assertArrayEquals(
+        new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L},
+        ((List) ((List) results4.get(1).getEvents()).get(0)).toArray()
+    );
   }
 }
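Editorial aside on the new test above (not part of the patch): the assertions read compactedList results, where each ScanResultValue carries a columns list plus rows whose positions line up with those columns. A tiny standalone sketch of that shape, with the first expected row's values copied from the test:

```java
import java.util.Arrays;
import java.util.List;

public class CompactedListShapeSketch
{
  public static void main(String[] args)
  {
    // Column order matches the assertion in the test above.
    List<String> columns = Arrays.asList("__time", "dim", "count", "met");

    // One row per event; each position lines up with the columns list.
    // 978307200000L is the epoch millis for 2001-01-01T00:00:00Z, i.e. DateTimes.of("2001").getMillis().
    List<List<Object>> events = Arrays.asList(
        Arrays.<Object>asList(978307200000L, "foo", 1L, 8L)
    );

    System.out.println(columns);
    System.out.println(events);
  }
}
```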
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
index 3b6a9d92c47..af706dd5340 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -35,12 +35,18 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.core.NoopEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
 import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
 import org.apache.druid.query.QueryRunnerTestHelper;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanQueryConfig;
+import org.apache.druid.query.scan.ScanQueryEngine;
+import org.apache.druid.query.scan.ScanQueryQueryToolChest;
+import org.apache.druid.query.scan.ScanQueryRunnerFactory;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
 import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
@@ -48,6 +54,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMerger;
 import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
@@ -241,6 +248,14 @@ public class AppenderatorTester implements AutoCloseable
                 ),
                 new TimeseriesQueryEngine(),
                 QueryRunnerTestHelper.NOOP_QUERYWATCHER
+            ),
+            ScanQuery.class, new ScanQueryRunnerFactory(
+                new ScanQueryQueryToolChest(
+                    new ScanQueryConfig(),
+                    new DefaultGenericQueryMetricsFactory(TestHelper.makeJsonMapper())
+                ),
+                new ScanQueryEngine(),
+                new ScanQueryConfig()
             )
         )
     ),