mirror of https://github.com/apache/druid.git
scan-query: Use long as limit. (#4081)
* scan-query: Use long instead of int as limit type * Use MAX_INSTANT queryTimeout, if timeout == 0
This commit is contained in:
parent
64248d31b6
commit
8510a52e02
|
@ -48,7 +48,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
|
|||
|
||||
private final String resultFormat;
|
||||
private final int batchSize;
|
||||
private final int limit;
|
||||
private final long limit;
|
||||
private final DimFilter dimFilter;
|
||||
private final List<String> columns;
|
||||
|
||||
|
@ -58,7 +58,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
|
|||
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
|
||||
@JsonProperty("resultFormat") String resultFormat,
|
||||
@JsonProperty("batchSize") int batchSize,
|
||||
@JsonProperty("limit") int limit,
|
||||
@JsonProperty("limit") long limit,
|
||||
@JsonProperty("filter") DimFilter dimFilter,
|
||||
@JsonProperty("columns") List<String> columns,
|
||||
@JsonProperty("context") Map<String, Object> context
|
||||
|
@ -67,7 +67,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
|
|||
super(dataSource, querySegmentSpec, false, context);
|
||||
this.resultFormat = resultFormat == null ? RESULT_FORMAT_LIST : resultFormat;
|
||||
this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize;
|
||||
this.limit = (limit == 0) ? Integer.MAX_VALUE : limit;
|
||||
this.limit = (limit == 0) ? Long.MAX_VALUE : limit;
|
||||
Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0");
|
||||
Preconditions.checkArgument(this.limit > 0, "limit must be greater than 0");
|
||||
this.dimFilter = dimFilter;
|
||||
|
@ -87,7 +87,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
|
|||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getLimit()
|
||||
public long getLimit()
|
||||
{
|
||||
return limit;
|
||||
}
|
||||
|
@ -217,7 +217,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
|
|||
int result = super.hashCode();
|
||||
result = 31 * result + (resultFormat != null ? resultFormat.hashCode() : 0);
|
||||
result = 31 * result + batchSize;
|
||||
result = 31 * result + limit;
|
||||
result = 31 * result + (int) (limit ^ (limit >>> 32));
|
||||
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
|
||||
result = 31 * result + (columns != null ? columns.hashCode() : 0);
|
||||
return result;
|
||||
|
@ -260,7 +260,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
|
|||
private Map<String, Object> context;
|
||||
private String resultFormat;
|
||||
private int batchSize;
|
||||
private int limit;
|
||||
private long limit;
|
||||
private DimFilter dimFilter;
|
||||
private List<String> columns;
|
||||
|
||||
|
@ -346,7 +346,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>
|
|||
return this;
|
||||
}
|
||||
|
||||
public ScanQueryBuilder limit(int l)
|
||||
public ScanQueryBuilder limit(long l)
|
||||
{
|
||||
limit = l;
|
||||
return this;
|
||||
|
|
|
@ -60,7 +60,7 @@ public class ScanQueryEngine
|
|||
)
|
||||
{
|
||||
if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) != null) {
|
||||
int count = (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
|
||||
long count = (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
|
||||
if (count >= query.getLimit()) {
|
||||
return Sequences.empty();
|
||||
}
|
||||
|
@ -104,9 +104,9 @@ public class ScanQueryEngine
|
|||
final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter()));
|
||||
|
||||
if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) {
|
||||
responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0);
|
||||
responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0L);
|
||||
}
|
||||
final int limit = query.getLimit() - (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
|
||||
final long limit = query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
|
||||
return Sequences.concat(
|
||||
Sequences.map(
|
||||
adapter.makeCursors(
|
||||
|
@ -145,7 +145,7 @@ public class ScanQueryEngine
|
|||
final int batchSize = query.getBatchSize();
|
||||
return new Iterator<ScanResultValue>()
|
||||
{
|
||||
private int offset = 0;
|
||||
private long offset = 0;
|
||||
|
||||
@Override
|
||||
public boolean hasNext()
|
||||
|
@ -159,7 +159,7 @@ public class ScanQueryEngine
|
|||
if (System.currentTimeMillis() >= timeoutAt) {
|
||||
throw new QueryInterruptedException(new TimeoutException());
|
||||
}
|
||||
int lastOffset = offset;
|
||||
long lastOffset = offset;
|
||||
Object events = null;
|
||||
String resultFormat = query.getResultFormat();
|
||||
if (ScanQuery.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
|
||||
|
@ -171,7 +171,7 @@ public class ScanQueryEngine
|
|||
}
|
||||
responseContext.put(
|
||||
ScanQueryRunnerFactory.CTX_COUNT,
|
||||
(int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset)
|
||||
(long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset)
|
||||
);
|
||||
responseContext.put(
|
||||
ScanQueryRunnerFactory.CTX_TIMEOUT_AT,
|
||||
|
|
|
@ -32,8 +32,8 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
|
|||
{
|
||||
private Yielder<ScanResultValue> yielder;
|
||||
private String resultFormat;
|
||||
private int limit = 0;
|
||||
private int count = 0;
|
||||
private long limit = 0;
|
||||
private long count = 0;
|
||||
|
||||
public ScanQueryLimitRowIterator(
|
||||
QueryRunner<ScanResultValue> baseRunner, ScanQuery query,
|
||||
|
@ -76,7 +76,8 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
|
|||
return batch;
|
||||
} else {
|
||||
// last batch
|
||||
int left = limit - count;
|
||||
// single batch length is <= Integer.MAX_VALUE, so this should not overflow
|
||||
int left = (int) (limit - count);
|
||||
count = limit;
|
||||
return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, left));
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
|
|||
)
|
||||
{
|
||||
ScanQuery scanQuery = (ScanQuery) query;
|
||||
if (scanQuery.getLimit() == Integer.MAX_VALUE) {
|
||||
if (scanQuery.getLimit() == Long.MAX_VALUE) {
|
||||
return runner.run(query, responseContext);
|
||||
}
|
||||
return new BaseSequence<>(
|
||||
|
|
|
@ -72,7 +72,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
|
|||
)
|
||||
{
|
||||
final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
|
||||
final long timeoutAt = queryTimeout == null
|
||||
final long timeoutAt = (queryTimeout == null || queryTimeout.longValue() == 0L)
|
||||
? JodaUtils.MAX_INSTANT : System.currentTimeMillis() + queryTimeout.longValue();
|
||||
responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
|
||||
return Sequences.concat(
|
||||
|
@ -119,7 +119,8 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory<ScanResultValu
|
|||
}
|
||||
|
||||
// it happens in unit tests
|
||||
if (responseContext.get(CTX_TIMEOUT_AT) == null) {
|
||||
final Number timeoutAt = (Number) responseContext.get(CTX_TIMEOUT_AT);
|
||||
if (timeoutAt == null || timeoutAt.longValue() == 0L) {
|
||||
responseContext.put(CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT);
|
||||
};
|
||||
return engine.process((ScanQuery) query, segment, responseContext);
|
||||
|
|
Loading…
Reference in New Issue