Applied Jon's recommended changes

Justin Borromeo 2019-03-07 18:40:00 -08:00
parent fb966def83
commit 73f4038068
5 changed files with 30 additions and 25 deletions

View File

@@ -156,12 +156,12 @@ The format of the result when resultFormat equals `compactedList`:
## Time Ordering
The Scan query currently supports ordering based on timestamp for non-legacy queries. Note that using time ordering
will yield results that do not indicate which segment rows are from. Furthermore, time ordering is only supported
where the result set limit is less than `druid.query.scan.maxRowsQueuedForTimeOrdering` rows and less than
`druid.query.scan.maxSegmentsTimeOrderedInMemory` segments are scanned per Historical. The reasoning behind these
limitations is that the implementation of time ordering uses two strategies that can consume too much heap memory
if left unbounded. These strategies (listed below) are chosen on a per-Historical basis depending on query
result set limit and the number of segments being scanned.
will yield results that do not indicate which segment the rows came from (`segmentId` will show up as `null`). Furthermore,
time ordering is only supported when the result set limit is less than `druid.query.scan.maxRowsQueuedForTimeOrdering`
rows **or** when fewer than `druid.query.scan.maxSegmentsTimeOrderedInMemory` segments are scanned per Historical. The
reasoning behind these limitations is that the implementation of time ordering uses two strategies that can consume too
much heap memory if left unbounded. These strategies (listed below) are chosen on a per-Historical basis depending on
the query result set limit and the number of segments being scanned.
1. Priority Queue: Each segment on a Historical is opened sequentially. Every row is added to a bounded priority
queue which is ordered by timestamp. For every row above the result set limit, the row with the earliest (if descending)
@@ -170,8 +170,8 @@ priority queue are streamed back to the Broker(s) in batches. Attempting to load
risk of Historical nodes running out of memory. The `druid.query.scan.maxRowsQueuedForTimeOrdering` property protects
from this by limiting the number of rows in the query result set when time ordering is used.
2. K-Way/N-Way Merge: Each segment on a Historical is opened in parallel. Since each segment's rows are already
time-ordered, a k-way merge can be performed on the results from each segment. This approach doesn't persist the entire
2. N-Way Merge: Each segment on a Historical is opened in parallel. Since each segment's rows are already
time-ordered, an n-way merge can be performed on the results from each segment. This approach doesn't persist the entire
result set in memory (as the Priority Queue strategy does), since it streams batches back as they are returned from the merge function.
However, attempting to query too many segments could also result in high memory usage due to the need to open
decompression and decoding buffers for each. The `druid.query.scan.maxSegmentsTimeOrderedInMemory` limit protects

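The two strategies above boil down to a simple per-Historical choice driven by the row limit and the segment count. The sketch below illustrates that choice; all names in it (`TimeOrderStrategy`, `chooseStrategy`, the parameters) are made up for this example and are not the actual Druid implementation.

```java
// Illustrative sketch of the per-Historical strategy choice described above.
public class TimeOrderStrategyExample
{
  enum TimeOrderStrategy { PRIORITY_QUEUE, N_WAY_MERGE }

  static TimeOrderStrategy chooseStrategy(
      long resultSetLimit,          // the scan query's row limit
      int numSegmentsPerHistorical, // segments this Historical must scan
      long maxRowsQueuedForTimeOrdering,
      int maxSegmentsTimeOrderedInMemory
  )
  {
    if (resultSetLimit < maxRowsQueuedForTimeOrdering) {
      // Small result set: keep a bounded priority queue of rows on the heap.
      return TimeOrderStrategy.PRIORITY_QUEUE;
    }
    if (numSegmentsPerHistorical < maxSegmentsTimeOrderedInMemory) {
      // Few segments: open them in parallel and n-way merge their time-ordered rows.
      return TimeOrderStrategy.N_WAY_MERGE;
    }
    // Neither bound is satisfied, so time ordering is refused to protect heap memory.
    throw new UnsupportedOperationException("Time ordering is not supported for this query");
  }
}
```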
View File

@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
@@ -72,7 +73,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
case "list":
return RESULT_FORMAT_LIST;
default:
return RESULT_FORMAT_LIST;
throw new UOE("Scan query result format [%s] is not supported.", name);
}
}
}

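The new default branch means an unrecognized `resultFormat` now fails fast instead of silently falling back to the list format. The standalone helper below re-implements the switch purely for illustration; the real parsing method lives inside `ScanQuery` and its name is not shown in this hunk, so nothing here should be read as the actual API.

```java
// Sketch of the stricter parsing behavior introduced above (illustrative names only).
public class ResultFormatParsingExample
{
  static String parseResultFormat(String name)
  {
    switch (name) {
      case "list":
        return "list";
      case "compactedList":
        return "compactedList";
      default:
        // Previously an unrecognized name silently fell back to the list format;
        // now a typo in resultFormat is surfaced to the user as an error.
        throw new UnsupportedOperationException(
            String.format("Scan query result format [%s] is not supported.", name)
        );
    }
  }
}
```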
View File

@@ -123,9 +123,7 @@ public class ScanQueryEngine
if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) {
responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0L);
}
final long limit = query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE) ?
query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) :
Long.MAX_VALUE;
final long limit = calculateLimit(query, responseContext);
return Sequences.concat(
adapter
.makeCursors(
@@ -259,4 +257,15 @@ public class ScanQueryEngine
))
);
}
/**
* If we're performing time-ordering, we want to scan through every row in the segment (hence the maximum limit).
* Otherwise, the remaining limit is the query's limit minus the number of rows already counted in the response context.
*/
private long calculateLimit(ScanQuery query, Map<String, Object> responseContext)
{
if (query.getTimeOrder().equals(ScanQuery.TimeOrder.NONE)) {
return query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
}
return Long.MAX_VALUE;
}
}

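To make the limit bookkeeping in `calculateLimit` concrete, here is a small worked illustration of its two branches; the numbers are made up for the example, and `CTX_COUNT` stands for the running row count kept in the response context.

```java
// Worked illustration of the two calculateLimit branches above (illustrative values).
public class CalculateLimitExample
{
  public static void main(String[] args)
  {
    long queryLimit = 1_000L;
    long rowsAlreadyReturned = 400L; // value accumulated under CTX_COUNT so far

    // Not time-ordered: only the remaining rows need to be scanned by this cursor.
    long remainingLimit = queryLimit - rowsAlreadyReturned; // 600

    // Time-ordered: every row must reach the ordering strategy, so the per-cursor
    // limit is effectively unbounded and the real limit is applied after ordering.
    long timeOrderedLimit = Long.MAX_VALUE;

    System.out.println(remainingLimit + ", " + timeOrderedLimit);
  }
}
```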
View File

@@ -52,7 +52,6 @@ import java.util.Map;
*/
public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
{
private static final String TIME_ORDERING_SEGMENT_ID = "No segment ID available when using time ordering";
private Yielder<ScanResultValue> yielder;
private ScanQuery.ResultFormat resultFormat;
private long limit;
@@ -98,7 +97,7 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
throw new UOE(ScanQuery.ResultFormat.RESULT_FORMAT_VALUE_VECTOR + " is not supported yet");
}
// We don't want to perform batching at the historical-level if we're time ordering
// We want to perform batching if we are not time-ordering, or if we are at the outer level when time-ordering
if (query.getTimeOrder() == ScanQuery.TimeOrder.NONE ||
!query.getContextBoolean(ScanQuery.CTX_KEY_OUTERMOST, true)) {
ScanResultValue batch = yielder.get();
@@ -128,7 +127,7 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultValue>
yielder = yielder.next(null);
count++;
}
return new ScanResultValue(TIME_ORDERING_SEGMENT_ID, columns, eventsToAdd);
return new ScanResultValue(null, columns, eventsToAdd);
}
}

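As the documentation change at the top of this commit notes, a time-ordered batch merges rows from many segments, so the `ScanResultValue` built above reports `segmentId` as `null`. The consumer-side sketch below shows what that means for callers; all names in it are made up for this illustration.

```java
// Consumer-side sketch: time-ordered batches carry no segment id, so results can no
// longer be attributed to a single segment (illustrative names only).
import java.util.List;
import java.util.Map;

public class ScanBatchDescriptionExample
{
  static String describeBatch(String segmentId, List<Map<String, Object>> events)
  {
    if (segmentId == null) {
      // Time-ordered result: rows from several segments were merged into one stream.
      return events.size() + " rows (time-ordered, no segment attribution)";
    }
    return events.size() + " rows from segment " + segmentId;
  }
}
```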
View File

@@ -130,19 +130,15 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
);
return unbatched;
} else if (query.getLimit() > scanQueryConfig.getMaxRowsQueuedForTimeOrdering()) {
throw new UOE(
"Time ordering for query result set limit of %,d is not supported. Try lowering the result "
+ "set size to less than or equal to the configurable time ordering limit of %,d rows.",
query.getLimit(),
scanQueryConfig.getMaxRowsQueuedForTimeOrdering()
);
}
throw new UOE(
"Time ordering for queries of %,d segments per historical is not supported. Try reducing the scope "
+ "of the query to scan fewer segments than the configurable time ordering limit of %,d segments",
"Time ordering for queries of %,d segments per historical and a row limit of %,d is not supported."
+ " Try reducing the scope of the query to scan fewer segments than the configurable time ordering limit of"
+ " %,d segments or lower the row limit below %,d.",
numSegments,
scanQueryConfig.getMaxSegmentsTimeOrderedInMemory()
query.getLimit(),
scanQueryConfig.getMaxSegmentsTimeOrderedInMemory(),
scanQueryConfig.getMaxRowsQueuedForTimeOrdering()
);
};
}
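For readers unfamiliar with the `%,d` specifier used in the message above: it inserts grouping separators, which keeps large limits readable. The standalone snippet below shows the effect with made-up numbers; the real code passes the query's limit and the config values, and formats through UOE rather than `String.format`.

```java
// Standalone illustration of the %,d formatting used in the error message above
// (numbers are made up; in the real code they come from the query and the config).
public class MessageFormatExample
{
  public static void main(String[] args)
  {
    String message = String.format(
        "Time ordering for queries of %,d segments per historical with a row limit of %,d is not supported."
        + " Try reducing the scope of the query to scan fewer segments than the configurable time ordering limit of"
        + " %,d segments, or lowering the row limit below %,d.",
        300, 250_000, 100, 100_000
    );
    // Prints e.g. "... a row limit of 250,000 ... or lowering the row limit below 100,000."
    System.out.println(message);
  }
}
```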