Small changes: replace String-based scan query time ordering with a ScanQuery.TimeOrder enum and simplify the time-ordering plumbing

Author: Justin Borromeo, 2019-02-15 15:57:53 -08:00
Parent: 4e69276d57
Commit: 35150fe1a6
10 changed files with 59 additions and 76 deletions

ScanBenchmark.java

@@ -114,7 +114,7 @@ public class ScanBenchmark
   private int limit;

   @Param({"none", "descending", "ascending"})
-  private static String timeOrdering;
+  private static ScanQuery.TimeOrder timeOrdering;

   private static final Logger log = new Logger(ScanBenchmark.class);
   private static final ObjectMapper JSON_MAPPER;

Druids.java

@@ -924,7 +924,7 @@ public class Druids
     private DimFilter dimFilter;
     private List<String> columns;
     private Boolean legacy;
-    private String timeOrder;
+    private ScanQuery.TimeOrder timeOrder;

     public ScanQueryBuilder()
     {
@@ -1051,7 +1051,7 @@ public class Druids
       return this;
     }

-    public ScanQueryBuilder timeOrder(String timeOrder)
+    public ScanQueryBuilder timeOrder(ScanQuery.TimeOrder timeOrder)
     {
       this.timeOrder = timeOrder;
       return this;
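
Note: with this change, callers pass the enum constant instead of a raw string. A minimal usage sketch of the updated builder; the dataSource, interval spec, and limit values are illustrative, and intervalSpec and build() are assumed from the builder's usual shape rather than shown in this diff:

    // Illustrative only: constructs a scan query with explicit ascending time order.
    ScanQuery query = Druids.newScanQueryBuilder()
        .dataSource("wikipedia")                               // hypothetical datasource
        .intervals(intervalSpec)                               // a QuerySegmentSpec assumed in scope
        .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
        .timeOrder(ScanQuery.TimeOrder.ASCENDING)              // previously .timeOrder("ascending")
        .limit(1000)
        .build();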

ScanQuery.java

@@ -40,9 +40,11 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
   public static final String RESULT_FORMAT_COMPACTED_LIST = "compactedList";
   public static final String RESULT_FORMAT_VALUE_VECTOR = "valueVector";

-  public static final String TIME_ORDER_ASCENDING = "ascending";
-  public static final String TIME_ORDER_DESCENDING = "descending";
-  public static final String TIME_ORDER_NONE = "none";
+  public enum TimeOrder {
+    @JsonProperty("ascending") ASCENDING,
+    @JsonProperty("descending") DESCENDING,
+    @JsonProperty("none") NONE
+  }

   private final VirtualColumns virtualColumns;
   private final String resultFormat;
@@ -51,7 +53,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
   private final DimFilter dimFilter;
   private final List<String> columns;
   private final Boolean legacy;
-  private final String timeOrder;
+  private final TimeOrder timeOrder;

   @JsonCreator
   public ScanQuery(
@@ -61,7 +63,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
       @JsonProperty("resultFormat") String resultFormat,
       @JsonProperty("batchSize") int batchSize,
       @JsonProperty("limit") long limit,
-      @JsonProperty("timeOrder") String timeOrder,
+      @JsonProperty("timeOrder") TimeOrder timeOrder,
       @JsonProperty("filter") DimFilter dimFilter,
       @JsonProperty("columns") List<String> columns,
       @JsonProperty("legacy") Boolean legacy,
@@ -78,7 +80,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
     this.dimFilter = dimFilter;
     this.columns = columns;
     this.legacy = legacy;
-    this.timeOrder = timeOrder == null ? TIME_ORDER_NONE : timeOrder;
+    this.timeOrder = timeOrder == null ? TimeOrder.NONE : timeOrder;
   }

   @JsonProperty
@@ -106,7 +108,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
   }

   @JsonProperty
-  public String getTimeOrder()
+  public TimeOrder getTimeOrder()
   {
     return timeOrder;
   }
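
Note: the @JsonProperty annotations on the enum constants are what keep the wire format backward compatible. Jackson (2.6+) uses the annotation value rather than the constant name in both directions, so queries that send "timeOrder": "ascending" keep working. A self-contained round-trip sketch (standalone demo, not Druid code):

    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class TimeOrderJsonDemo
    {
      public enum TimeOrder
      {
        @JsonProperty("ascending") ASCENDING,
        @JsonProperty("descending") DESCENDING,
        @JsonProperty("none") NONE
      }

      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        // Serializes using the @JsonProperty name, not the constant name.
        System.out.println(mapper.writeValueAsString(TimeOrder.DESCENDING)); // prints "descending" (with JSON quotes)
        // Deserialization maps the lowercase JSON string back to the constant.
        TimeOrder order = mapper.readValue("\"none\"", TimeOrder.class);
        System.out.println(order); // prints NONE
      }
    }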

ScanQueryConfig.java

@@ -40,7 +40,6 @@ public class ScanQueryConfig
     return this;
   }

-  // int should suffice here because no one should be sorting greater than 2B rows in memory
   @JsonProperty
   private int maxRowsTimeOrderedInMemory = 100000;
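
Note: because the field keeps its @JsonProperty annotation, the 100000 default can still be overridden wherever Druid binds this config from JSON properties (the runtime.properties prefix, likely druid.query.scan, is an assumption here, not shown in this diff). A standalone sketch of the binding behavior using a stand-in class:

    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ConfigBindingDemo
    {
      // Stand-in mirroring ScanQueryConfig's field; not the real Druid class.
      static class DemoConfig
      {
        @JsonProperty
        private int maxRowsTimeOrderedInMemory = 100000;
      }

      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        // An empty document keeps the field's default value...
        DemoConfig defaults = mapper.readValue("{}", DemoConfig.class);
        System.out.println(defaults.maxRowsTimeOrderedInMemory); // 100000
        // ...while an explicit property overrides it.
        DemoConfig overridden =
            mapper.readValue("{\"maxRowsTimeOrderedInMemory\": 200000}", DemoConfig.class);
        System.out.println(overridden.maxRowsTimeOrderedInMemory); // 200000
      }
    }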

ScanQueryEngine.java

@@ -131,8 +131,8 @@ public class ScanQueryEngine
             intervals.get(0),
             query.getVirtualColumns(),
             Granularities.ALL,
-            query.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING) ||
-            (query.getTimeOrder().equals(ScanQuery.TIME_ORDER_NONE) && query.isDescending()),
+            query.getTimeOrder().equals(ScanQuery.TimeOrder.DESCENDING) ||
+            (query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE) && query.isDescending()),
             null
         )
         .map(cursor -> new BaseSequence<>(
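
Note: the boolean passed to the cursor factory above reads as "iterate descending when the query explicitly asks for DESCENDING time order, or when it specifies no time order but carries the legacy descending flag." A hypothetical extraction of that predicate, only to make the precedence explicit (since TimeOrder is an enum, == would also be safe in place of equals()):

    // Hypothetical helper, not part of this patch; mirrors the expression above.
    private static boolean cursorIteratesDescending(ScanQuery query)
    {
      return query.getTimeOrder().equals(ScanQuery.TimeOrder.DESCENDING)
             || (query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE) && query.isDescending());
    }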

ScanQueryQueryToolChest.java

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
+import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.guava.BaseSequence;
@@ -89,43 +90,39 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
         }
       };

-      if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_NONE)) {
+      if (scanQuery.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
         if (scanQuery.getLimit() == Long.MAX_VALUE) {
           return runner.run(queryPlusWithNonNullLegacy, responseContext);
         }
         return new BaseSequence<ScanResultValue, ScanQueryLimitRowIterator>(scanQueryLimitRowIteratorMaker);
-      } else if ((scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_ASCENDING) ||
-                  scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING))
-                 && scanQuery.getLimit() <= scanQueryConfig.getMaxRowsTimeOrderedInMemory()) {
+      } else if (scanQuery.getLimit() <= scanQueryConfig.getMaxRowsTimeOrderedInMemory()) {
         Iterator<ScanResultValue> scanResultIterator = scanQueryLimitRowIteratorMaker.make();
         return new BaseSequence(
-            new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedTimeOrderedIterator>()
+            new BaseSequence.IteratorMaker<ScanResultValue, ScanBatchedIterator>()
             {
               @Override
-              public ScanBatchedTimeOrderedIterator make()
+              public ScanBatchedIterator make()
               {
-                return new ScanBatchedTimeOrderedIterator(
+                return new ScanBatchedIterator(
                     sortScanResultValues(scanResultIterator, scanQuery),
                     scanQuery.getBatchSize()
                 );
               }

               @Override
-              public void cleanup(ScanBatchedTimeOrderedIterator iterFromMake)
+              public void cleanup(ScanBatchedIterator iterFromMake)
               {
                 CloseQuietly.close(iterFromMake);
               }
             });
-      } else if (scanQuery.getLimit() > scanQueryConfig.getMaxRowsTimeOrderedInMemory()) {
+      } else {
         throw new UOE(
-            "Time ordering for result set limit of %s is not supported. Try lowering the "
-            + "result set size to less than or equal to the time ordering limit of %s.",
+            "Time ordering for result set limit of %,d is not supported. Try lowering the "
+            + "result set size to less than or equal to the time ordering limit of %,d.",
             scanQuery.getLimit(),
             scanQueryConfig.getMaxRowsTimeOrderedInMemory()
         );
-      } else {
-        throw new UOE("Time ordering [%s] is not supported", scanQuery.getTimeOrder());
       }
     };
   }
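
Note: the message change from %s to %,d is more than cosmetic. %,d applies locale-aware digit grouping in java.util.Formatter, so the limits print as 250,000 rather than 250000; this assumes UOE formats its message with String.format-style semantics, which Druid's common exceptions do. For example:

    // %,d groups digits (java.util.Formatter); values are illustrative.
    String msg = String.format(
        "Time ordering for result set limit of %,d is not supported.",
        250000L
    );
    // msg => "Time ordering for result set limit of 250,000 is not supported." (en-US locale)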
@@ -198,16 +195,16 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
   }

   /**
-   * This iterator supports iteration through any Iterable of unbatched ScanResultValues (1 event/SRV) and aggregates
-   * events into ScanResultValues with {int batchSize} events. The columns from the first event per ScanResultValue
-   * will be used to populate the column section.
+   * This iterator supports iteration through any Iterable of unbatched ScanResultValues (1 event/ScanResultValue) and
+   * aggregates events into ScanResultValues with {@code batchSize} events. The columns from the first event per
+   * ScanResultValue will be used to populate the column section.
    */
-  private static class ScanBatchedTimeOrderedIterator implements CloseableIterator<ScanResultValue>
+  private static class ScanBatchedIterator implements CloseableIterator<ScanResultValue>
   {
     private final Iterator<ScanResultValue> itr;
     private final int batchSize;

-    public ScanBatchedTimeOrderedIterator(Iterator<ScanResultValue> iterator, int batchSize)
+    public ScanBatchedIterator(Iterator<ScanResultValue> iterator, int batchSize)
     {
       this.itr = iterator;
       this.batchSize = batchSize;
@@ -234,7 +231,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, ScanQuery>
         ScanResultValue srv = itr.next();
         // Only replace once using the columns from the first event
         columns = columns.isEmpty() ? srv.getColumns() : columns;
-        eventsToAdd.add(((List) srv.getEvents()).get(0));
+        eventsToAdd.add(Iterables.getOnlyElement((List) srv.getEvents()));
       }
       return new ScanResultValue(null, columns, eventsToAdd);
     }
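
Note: the rename to ScanBatchedIterator fits because the class is ordering-agnostic: it only re-batches a stream of single-event ScanResultValues. The switch to Iterables.getOnlyElement is also a small hardening; unlike get(0), it throws if an upstream value unexpectedly carries more than one event. A standalone sketch of the same re-batching pattern over plain lists (simplified: it drops the column bookkeeping and CloseableIterator handling of the real class):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Pulls up to batchSize elements from an upstream iterator per call to next().
    class BatchingIterator<T> implements Iterator<List<T>>
    {
      private final Iterator<T> upstream;
      private final int batchSize;

      BatchingIterator(Iterator<T> upstream, int batchSize)
      {
        this.upstream = upstream;
        this.batchSize = batchSize;
      }

      @Override
      public boolean hasNext()
      {
        return upstream.hasNext();
      }

      @Override
      public List<T> next()
      {
        List<T> batch = new ArrayList<>(batchSize);
        while (upstream.hasNext() && batch.size() < batchSize) {
          batch.add(upstream.next());
        }
        return batch;
      }
      // Example: over seven elements with batchSize 3, yields batches of 3, 3, 1:
      // new BatchingIterator<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7).iterator(), 3)
    }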

ScanResultValue.java

@@ -21,8 +21,12 @@ package org.apache.druid.query.scan;

 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.segment.column.ColumnHolder;

+import javax.annotation.Nullable;
 import java.util.List;
+import java.util.Map;

 public class ScanResultValue implements Comparable<ScanResultValue>
 {
@@ -40,7 +44,7 @@ public class ScanResultValue implements Comparable<ScanResultValue>

   @JsonCreator
   public ScanResultValue(
-      @JsonProperty("segmentId") String segmentId,
+      @Nullable @JsonProperty("segmentId") String segmentId,
       @JsonProperty("columns") List<String> columns,
       @JsonProperty("events") Object events
   )
@@ -50,6 +54,7 @@ public class ScanResultValue implements Comparable<ScanResultValue>
     this.events = events;
   }

+  @Nullable
   @JsonProperty
   public String getSegmentId()
   {
@@ -68,6 +73,17 @@ public class ScanResultValue implements Comparable<ScanResultValue>
     return events;
   }

+  public long getFirstEventTimestamp(ScanQuery query) {
+    if (query.getResultFormat().equals(ScanQuery.RESULT_FORMAT_LIST)) {
+      return (Long) ((Map<String, Object>) ((List<Object>) this.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME);
+    } else if (query.getResultFormat().equals(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)) {
+      int timeColumnIndex = this.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
+      List<Object> firstEvent = (List<Object>) ((List<Object>) this.getEvents()).get(0);
+      return (Long) firstEvent.get(timeColumnIndex);
+    }
+    throw new UOE("Unable to get first event timestamp using result format of [%s]", query.getResultFormat());
+  }
+
   @Override
   public boolean equals(Object o)
   {
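
Note: the two branches of getFirstEventTimestamp unpack the two event layouts: "list" events are maps keyed by column name, while "compactedList" events are positional lists aligned with getColumns(). An illustrative sketch of both shapes (values are made up; "__time" is ColumnHolder.TIME_COLUMN_NAME, and listQuery/compactedQuery are assumed to be queries built with the matching resultFormat):

    // "list" format: each event is a Map<String, Object> keyed by column name.
    ScanResultValue listValue = new ScanResultValue(
        null,
        ImmutableList.of("__time", "page"),
        ImmutableList.of(ImmutableMap.of("__time", 1546300800000L, "page", "Main"))
    );

    // "compactedList" format: each event is a List aligned with the columns list.
    ScanResultValue compactedValue = new ScanResultValue(
        null,
        ImmutableList.of("__time", "page"),
        ImmutableList.of(ImmutableList.of(1546300800000L, "Main"))
    );

    // Both calls return 1546300800000L for a query whose resultFormat matches.
    long t1 = listValue.getFirstEventTimestamp(listQuery);
    long t2 = compactedValue.getFirstEventTimestamp(compactedQuery);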

ScanResultValueTimestampComparator.java

@@ -20,12 +20,8 @@
 package org.apache.druid.query.scan;

 import com.google.common.primitives.Longs;
-import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.segment.column.ColumnHolder;

 import java.util.Comparator;
-import java.util.List;
-import java.util.Map;

 /**
  * This comparator class supports comparisons of ScanResultValues based on the timestamp of their first event. Since
@@ -47,39 +43,12 @@ public class ScanResultValueTimestampComparator implements Comparator<ScanResultValue>
   public int compare(ScanResultValue o1, ScanResultValue o2)
   {
     int comparison;
-    if (scanQuery.getResultFormat().equals(ScanQuery.RESULT_FORMAT_LIST)) {
-      comparison = Longs.compare(
-          (Long) ((Map<String, Object>) ((List<Object>) o1.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME),
-          (Long) ((Map<String, Object>) ((List<Object>) o2.getEvents()).get(0)).get(ColumnHolder.TIME_COLUMN_NAME)
-      );
-    } else if (scanQuery.getResultFormat().equals(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)) {
-      int val1TimeColumnIndex = o1.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
-      int val2TimeColumnIndex = o2.getColumns().indexOf(ColumnHolder.TIME_COLUMN_NAME);
-      List<Object> event1 = (List<Object>) ((List<Object>) o1.getEvents()).get(0);
-      List<Object> event2 = (List<Object>) ((List<Object>) o2.getEvents()).get(0);
-      comparison = Longs.compare(
-          (Long) event1.get(val1TimeColumnIndex),
-          (Long) event2.get(val2TimeColumnIndex)
-      );
-    } else {
-      throw new UOE("Result format [%s] is not supported", scanQuery.getResultFormat());
-    }
-    if (scanQuery.getTimeOrder().equals(ScanQuery.TIME_ORDER_DESCENDING)) {
+    comparison = Longs.compare(
+        o1.getFirstEventTimestamp(scanQuery),
+        o2.getFirstEventTimestamp(scanQuery));
+    if (scanQuery.getTimeOrder().equals(ScanQuery.TimeOrder.DESCENDING)) {
       return comparison * -1;
     }
     return comparison;
   }
-
-  @Override
-  public boolean equals(Object obj)
-  {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    ScanResultValueTimestampComparator comp = (ScanResultValueTimestampComparator) obj;
-    return this.scanQuery.equals(comp.scanQuery);
-  }
 }
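
Note: after this simplification the comparator is a thin wrapper: compare first-event timestamps, negate for DESCENDING (the removed equals override was unneeded boilerplate, since nothing in this path compares comparators). A usage sketch; the one-argument constructor is inferred from the scanQuery field and should be treated as an assumption, and unbatchedResults/query are assumed in scope:

    // Sorting buffered per-event ScanResultValues by first-event timestamp.
    List<ScanResultValue> buffered = new ArrayList<>(unbatchedResults);
    buffered.sort(new ScanResultValueTimestampComparator(query));
    // With query.getTimeOrder() == ScanQuery.TimeOrder.DESCENDING, the negation
    // inside compare() makes the same sort call produce newest-first output.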

ScanQueryQueryToolChestTest.java

@@ -92,7 +92,7 @@ public class ScanQueryQueryToolChestTest
     }
     ScanQuery scanQuery = new Druids.ScanQueryBuilder()
         .resultFormat("list")
-        .timeOrder(ScanQuery.TIME_ORDER_DESCENDING)
+        .timeOrder(ScanQuery.TimeOrder.DESCENDING)
         .dataSource("some data source")
         .intervals(emptySegmentSpec)
         .limit(99999)
@@ -129,7 +129,7 @@ public class ScanQueryQueryToolChestTest
     }
     ScanQuery scanQuery = new Druids.ScanQueryBuilder()
         .resultFormat("list")
-        .timeOrder(ScanQuery.TIME_ORDER_ASCENDING)
+        .timeOrder(ScanQuery.TimeOrder.ASCENDING)
         .dataSource("some data source")
         .intervals(emptySegmentSpec)
         .limit(99999)
@@ -164,7 +164,7 @@ public class ScanQueryQueryToolChestTest
     }
     ScanQuery scanQuery = new Druids.ScanQueryBuilder()
         .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
-        .timeOrder(ScanQuery.TIME_ORDER_DESCENDING)
+        .timeOrder(ScanQuery.TimeOrder.DESCENDING)
         .dataSource("some data source")
         .intervals(emptySegmentSpec)
         .limit(99999)
@@ -199,7 +199,7 @@ public class ScanQueryQueryToolChestTest
     }
     ScanQuery scanQuery = new Druids.ScanQueryBuilder()
         .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
-        .timeOrder(ScanQuery.TIME_ORDER_ASCENDING)
+        .timeOrder(ScanQuery.TimeOrder.ASCENDING)
         .dataSource("some data source")
         .intervals(emptySegmentSpec)
         .limit(99999)

ScanResultValueTimestampComparatorTest.java

@@ -52,7 +52,7 @@ public class ScanResultValueTimestampComparatorTest
   public void comparisonDescendingListTest()
   {
     ScanQuery query = Druids.newScanQueryBuilder()
-        .timeOrder(ScanQuery.TIME_ORDER_DESCENDING)
+        .timeOrder(ScanQuery.TimeOrder.DESCENDING)
         .resultFormat(ScanQuery.RESULT_FORMAT_LIST)
         .dataSource("some src")
         .intervals(intervalSpec)
@@ -89,7 +89,7 @@ public class ScanResultValueTimestampComparatorTest
   public void comparisonAscendingListTest()
   {
     ScanQuery query = Druids.newScanQueryBuilder()
-        .timeOrder(ScanQuery.TIME_ORDER_ASCENDING)
+        .timeOrder(ScanQuery.TimeOrder.ASCENDING)
         .resultFormat(ScanQuery.RESULT_FORMAT_LIST)
         .dataSource("some src")
         .intervals(intervalSpec)
@@ -126,7 +126,7 @@ public class ScanResultValueTimestampComparatorTest
   public void comparisonDescendingCompactedListTest()
   {
     ScanQuery query = Druids.newScanQueryBuilder()
-        .timeOrder(ScanQuery.TIME_ORDER_DESCENDING)
+        .timeOrder(ScanQuery.TimeOrder.DESCENDING)
        .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
         .dataSource("some src")
         .intervals(intervalSpec)
@@ -161,7 +161,7 @@ public class ScanResultValueTimestampComparatorTest
   public void comparisonAscendingCompactedListTest()
   {
     ScanQuery query = Druids.newScanQueryBuilder()
-        .timeOrder(ScanQuery.TIME_ORDER_ASCENDING)
+        .timeOrder(ScanQuery.TimeOrder.ASCENDING)
         .resultFormat(ScanQuery.RESULT_FORMAT_COMPACTED_LIST)
         .dataSource("some src")
         .intervals(intervalSpec)