mirror of https://github.com/apache/druid.git
Remove select execution code from SQL planner (#7416)
* Removed select execution code from SQL planner * Update doc
This commit is contained in:
parent
78e6f6fb38
commit
408e3e1b2a
|
@ -719,7 +719,6 @@ The Druid SQL server is configured through the following properties on the Broke
|
|||
|`druid.sql.planner.maxSemiJoinRowsInMemory`|Maximum number of rows to keep in memory for executing two-stage semi-join queries like `SELECT * FROM Employee WHERE DeptName IN (SELECT DeptName FROM Dept)`.|100000|
|
||||
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.html). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.html) instead.|100000|
|
||||
|`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|
|
||||
|`druid.sql.planner.selectThreshold`|Page size threshold for [Select queries](../querying/select-query.html). Select queries for larger resultsets will be issued back-to-back using pagination.|1000|
|
||||
|`druid.sql.planner.useApproximateCountDistinct`|Whether to use an approximate cardinalty algorithm for `COUNT(DISTINCT foo)`.|true|
|
||||
|`druid.sql.planner.useApproximateTopN`|Whether to use approximate [TopN queries](../querying/topnquery.html) when a SQL query could be expressed as such. If false, exact [GroupBy queries](../querying/groupbyquery.html) will be used instead.|true|
|
||||
|`druid.sql.planner.useFallback`|Whether to evaluate operations on the Broker when they cannot be expressed as Druid queries. This option is not recommended for production since it can generate unscalable query plans. If false, SQL queries that cannot be translated to Druid queries will fail.|false|
|
||||
|
|
|
@ -45,9 +45,6 @@ public class PlannerConfig
|
|||
@JsonProperty
|
||||
private int maxQueryCount = 8;
|
||||
|
||||
@JsonProperty
|
||||
private int selectThreshold = 1000;
|
||||
|
||||
@JsonProperty
|
||||
private boolean useApproximateCountDistinct = true;
|
||||
|
||||
|
@ -104,11 +101,6 @@ public class PlannerConfig
|
|||
return maxQueryCount;
|
||||
}
|
||||
|
||||
public int getSelectThreshold()
|
||||
{
|
||||
return selectThreshold;
|
||||
}
|
||||
|
||||
public boolean isUseApproximateCountDistinct()
|
||||
{
|
||||
return useApproximateCountDistinct;
|
||||
|
@ -155,7 +147,6 @@ public class PlannerConfig
|
|||
newConfig.maxSemiJoinRowsInMemory = getMaxSemiJoinRowsInMemory();
|
||||
newConfig.maxTopNLimit = getMaxTopNLimit();
|
||||
newConfig.maxQueryCount = getMaxQueryCount();
|
||||
newConfig.selectThreshold = getSelectThreshold();
|
||||
newConfig.useApproximateCountDistinct = getContextBoolean(
|
||||
context,
|
||||
CTX_KEY_USE_APPROXIMATE_COUNT_DISTINCT,
|
||||
|
@ -211,7 +202,6 @@ public class PlannerConfig
|
|||
return maxSemiJoinRowsInMemory == that.maxSemiJoinRowsInMemory &&
|
||||
maxTopNLimit == that.maxTopNLimit &&
|
||||
maxQueryCount == that.maxQueryCount &&
|
||||
selectThreshold == that.selectThreshold &&
|
||||
useApproximateCountDistinct == that.useApproximateCountDistinct &&
|
||||
useApproximateTopN == that.useApproximateTopN &&
|
||||
useFallback == that.useFallback &&
|
||||
|
@ -233,7 +223,6 @@ public class PlannerConfig
|
|||
maxSemiJoinRowsInMemory,
|
||||
maxTopNLimit,
|
||||
maxQueryCount,
|
||||
selectThreshold,
|
||||
useApproximateCountDistinct,
|
||||
useApproximateTopN,
|
||||
useFallback,
|
||||
|
@ -254,7 +243,6 @@ public class PlannerConfig
|
|||
", maxSemiJoinRowsInMemory=" + maxSemiJoinRowsInMemory +
|
||||
", maxTopNLimit=" + maxTopNLimit +
|
||||
", maxQueryCount=" + maxQueryCount +
|
||||
", selectThreshold=" + selectThreshold +
|
||||
", useApproximateCountDistinct=" + useApproximateCountDistinct +
|
||||
", useApproximateTopN=" + useApproximateTopN +
|
||||
", useFallback=" + useFallback +
|
||||
|
|
|
@ -22,7 +22,6 @@ package org.apache.druid.sql.calcite.rel;
|
|||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.apache.calcite.avatica.ColumnMetaData;
|
||||
|
@ -43,17 +42,12 @@ import org.apache.druid.query.QueryDataSource;
|
|||
import org.apache.druid.query.Result;
|
||||
import org.apache.druid.query.groupby.GroupByQuery;
|
||||
import org.apache.druid.query.scan.ScanQuery;
|
||||
import org.apache.druid.query.select.EventHolder;
|
||||
import org.apache.druid.query.select.PagingSpec;
|
||||
import org.apache.druid.query.select.SelectQuery;
|
||||
import org.apache.druid.query.select.SelectResultValue;
|
||||
import org.apache.druid.query.timeseries.TimeseriesQuery;
|
||||
import org.apache.druid.query.timeseries.TimeseriesResultValue;
|
||||
import org.apache.druid.query.topn.DimensionAndMetricValueExtractor;
|
||||
import org.apache.druid.query.topn.TopNQuery;
|
||||
import org.apache.druid.query.topn.TopNResultValue;
|
||||
import org.apache.druid.segment.DimensionHandlerUtils;
|
||||
import org.apache.druid.segment.column.ColumnHolder;
|
||||
import org.apache.druid.server.QueryLifecycleFactory;
|
||||
import org.apache.druid.server.security.AuthenticationResult;
|
||||
import org.apache.druid.sql.calcite.planner.Calcites;
|
||||
|
@ -65,13 +59,9 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class QueryMaker
|
||||
|
@ -121,8 +111,6 @@ public class QueryMaker
|
|||
return executeGroupBy(druidQuery, (GroupByQuery) query);
|
||||
} else if (query instanceof ScanQuery) {
|
||||
return executeScan(druidQuery, (ScanQuery) query);
|
||||
} else if (query instanceof SelectQuery) {
|
||||
return executeSelect(druidQuery, (SelectQuery) query);
|
||||
} else {
|
||||
throw new ISE("Cannot run query of class[%s]", query.getClass().getName());
|
||||
}
|
||||
|
@ -182,111 +170,6 @@ public class QueryMaker
|
|||
);
|
||||
}
|
||||
|
||||
private Sequence<Object[]> executeSelect(
|
||||
final DruidQuery druidQuery,
|
||||
final SelectQuery baseQuery
|
||||
)
|
||||
{
|
||||
Preconditions.checkState(druidQuery.getGrouping() == null, "grouping must be null");
|
||||
|
||||
final List<RelDataTypeField> fieldList = druidQuery.getOutputRowType().getFieldList();
|
||||
final Integer limit = druidQuery.getLimitSpec() != null ? druidQuery.getLimitSpec().getLimit() : null;
|
||||
final RowSignature outputRowSignature = druidQuery.getOutputRowSignature();
|
||||
|
||||
// Select is paginated, we need to make multiple queries.
|
||||
final Sequence<Sequence<Object[]>> sequenceOfSequences = Sequences.simple(
|
||||
new Iterable<Sequence<Object[]>>()
|
||||
{
|
||||
@Override
|
||||
public Iterator<Sequence<Object[]>> iterator()
|
||||
{
|
||||
final AtomicBoolean morePages = new AtomicBoolean(true);
|
||||
final AtomicReference<Map<String, Integer>> pagingIdentifiers = new AtomicReference<>();
|
||||
final AtomicLong rowsRead = new AtomicLong();
|
||||
|
||||
// Each Sequence<Object[]> is one page.
|
||||
return new Iterator<Sequence<Object[]>>()
|
||||
{
|
||||
@Override
|
||||
public boolean hasNext()
|
||||
{
|
||||
return morePages.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sequence<Object[]> next()
|
||||
{
|
||||
final SelectQuery queryWithPagination = baseQuery.withPagingSpec(
|
||||
new PagingSpec(
|
||||
pagingIdentifiers.get(),
|
||||
plannerContext.getPlannerConfig().getSelectThreshold(),
|
||||
true
|
||||
)
|
||||
);
|
||||
|
||||
morePages.set(false);
|
||||
final AtomicBoolean gotResult = new AtomicBoolean();
|
||||
|
||||
return Sequences.concat(
|
||||
Sequences.map(
|
||||
runQuery(queryWithPagination),
|
||||
new Function<Result<SelectResultValue>, Sequence<Object[]>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<Object[]> apply(final Result<SelectResultValue> result)
|
||||
{
|
||||
if (!gotResult.compareAndSet(false, true)) {
|
||||
throw new ISE("WTF?! Expected single result from Select query but got multiple!");
|
||||
}
|
||||
|
||||
pagingIdentifiers.set(result.getValue().getPagingIdentifiers());
|
||||
final List<Object[]> retVals = new ArrayList<>();
|
||||
for (EventHolder holder : result.getValue().getEvents()) {
|
||||
morePages.set(true);
|
||||
final Map<String, Object> map = holder.getEvent();
|
||||
final Object[] retVal = new Object[fieldList.size()];
|
||||
for (RelDataTypeField field : fieldList) {
|
||||
final String outputName = outputRowSignature.getRowOrder().get(field.getIndex());
|
||||
if (outputName.equals(ColumnHolder.TIME_COLUMN_NAME)) {
|
||||
retVal[field.getIndex()] = coerce(
|
||||
holder.getTimestamp().getMillis(),
|
||||
field.getType().getSqlTypeName()
|
||||
);
|
||||
} else {
|
||||
retVal[field.getIndex()] = coerce(
|
||||
map.get(outputName),
|
||||
field.getType().getSqlTypeName()
|
||||
);
|
||||
}
|
||||
}
|
||||
if (limit == null || rowsRead.incrementAndGet() <= limit) {
|
||||
retVals.add(retVal);
|
||||
} else {
|
||||
morePages.set(false);
|
||||
return Sequences.simple(retVals);
|
||||
}
|
||||
}
|
||||
|
||||
return Sequences.simple(retVals);
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return Sequences.concat(sequenceOfSequences);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> Sequence<T> runQuery(Query<T> query)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue