Fix checkstyle and test

This commit is contained in:
Justin Borromeo 2019-03-22 15:54:42 -07:00
parent 62dcedacde
commit a87d02127c
2 changed files with 12 additions and 9 deletions

View File

@ -177,20 +177,21 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
.collect(Collectors.toList()))
.collect(Collectors.toList());
// Starting from the innermost Sequences.map:
// (1) Deaggregate each ScanResultValue returned by the query runners
// (2) Combine the deaggregated ScanResultValues into a single sequence
// (3) Create a sequence of results from each runner in the group and flatmerge based on timestamp
// (4) Create a sequence of results from each runner group
// (5) Join all the results into a single sequence
return Sequences.concat( // (5)
Sequences.map( // (4)
return Sequences.concat(
Sequences.map(
Sequences.simple(groupedRunners),
runnerGroup ->
Sequences.map( // (3)
Sequences.map(
Sequences.simple(runnerGroup),
(input) -> Sequences.concat( // (2)
Sequences.map( // (1)
(input) -> Sequences.concat(
Sequences.map(
input.run(queryPlus, responseContext),
srv -> Sequences.simple(srv.toSingleEventScanResultValues())
)
@ -246,7 +247,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
}
}
);
boolean doneScanning = false;
boolean doneScanning = yielder.isDone();
// We need to scan limit elements and anything else in the last segment
int numRowsScanned = 0;
Interval finalInterval = null;

View File

@ -20,11 +20,14 @@
package org.apache.druid.query.scan;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -32,7 +35,6 @@ import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
@RunWith(Parameterized.class)
@ -101,7 +103,7 @@ public class ScanQueryRunnerFactoryTest
List<ScanResultValue> srvs = new ArrayList<>(numElements);
List<Long> expectedEventTimestamps = new ArrayList<>();
for (int i = 0; i < numElements; i++) {
long timestamp = (ThreadLocalRandom.current().nextLong());
long timestamp = DateTimes.of("2015-01-01").plusHours(i).getMillis();
expectedEventTimestamps.add(timestamp);
srvs.add(ScanQueryTestHelper.generateScanResultValue(timestamp, resultFormat, 1));
}
@ -122,7 +124,7 @@ public class ScanQueryRunnerFactoryTest
factory.sortAndLimitScanResultValuesPriorityQueue(
inputSequence,
query,
null
ImmutableList.of(new SegmentDescriptor(new Interval(DateTimes.of("2010-01-01"), DateTimes.of("2019-01-01").plusHours(1)), "1", 0))
).toList();
// check each scan result value has one event