From 8510a52e02662757f14c12145c5fc28fcfb779f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Wo=C5=BAniak?= Date: Mon, 20 Mar 2017 22:19:35 +0100 Subject: [PATCH] scan-query: Use long as limit. (#4081) * scan-query: Use long instead of int as limit type * Use MAX_INSTANT queryTimeout, if timeout == 0 --- .../main/java/io/druid/query/scan/ScanQuery.java | 14 +++++++------- .../java/io/druid/query/scan/ScanQueryEngine.java | 12 ++++++------ .../query/scan/ScanQueryLimitRowIterator.java | 7 ++++--- .../druid/query/scan/ScanQueryQueryToolChest.java | 2 +- .../druid/query/scan/ScanQueryRunnerFactory.java | 5 +++-- 5 files changed, 21 insertions(+), 19 deletions(-) diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java index cdd2d1da4c1..df6a4079d5c 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java @@ -48,7 +48,7 @@ public class ScanQuery extends BaseQuery private final String resultFormat; private final int batchSize; - private final int limit; + private final long limit; private final DimFilter dimFilter; private final List columns; @@ -58,7 +58,7 @@ public class ScanQuery extends BaseQuery @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, @JsonProperty("resultFormat") String resultFormat, @JsonProperty("batchSize") int batchSize, - @JsonProperty("limit") int limit, + @JsonProperty("limit") long limit, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("columns") List columns, @JsonProperty("context") Map context @@ -67,7 +67,7 @@ public class ScanQuery extends BaseQuery super(dataSource, querySegmentSpec, false, context); this.resultFormat = resultFormat == null ? RESULT_FORMAT_LIST : resultFormat; this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize; - this.limit = (limit == 0) ? Integer.MAX_VALUE : limit; + this.limit = (limit == 0) ? Long.MAX_VALUE : limit; Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0"); Preconditions.checkArgument(this.limit > 0, "limit must be greater than 0"); this.dimFilter = dimFilter; @@ -87,7 +87,7 @@ public class ScanQuery extends BaseQuery } @JsonProperty - public int getLimit() + public long getLimit() { return limit; } @@ -217,7 +217,7 @@ public class ScanQuery extends BaseQuery int result = super.hashCode(); result = 31 * result + (resultFormat != null ? resultFormat.hashCode() : 0); result = 31 * result + batchSize; - result = 31 * result + limit; + result = 31 * result + (int) (limit ^ (limit >>> 32)); result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0); result = 31 * result + (columns != null ? columns.hashCode() : 0); return result; @@ -260,7 +260,7 @@ public class ScanQuery extends BaseQuery private Map context; private String resultFormat; private int batchSize; - private int limit; + private long limit; private DimFilter dimFilter; private List columns; @@ -346,7 +346,7 @@ public class ScanQuery extends BaseQuery return this; } - public ScanQueryBuilder limit(int l) + public ScanQueryBuilder limit(long l) { limit = l; return this; diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 6253992952c..139cc6d0ad0 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -60,7 +60,7 @@ public class ScanQueryEngine ) { if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) != null) { - int count = (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); + long count = (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); if (count >= query.getLimit()) { return Sequences.empty(); } @@ -104,9 +104,9 @@ public class ScanQueryEngine final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter())); if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) { - responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0); + responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0L); } - final int limit = query.getLimit() - (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); + final long limit = query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); return Sequences.concat( Sequences.map( adapter.makeCursors( @@ -145,7 +145,7 @@ public class ScanQueryEngine final int batchSize = query.getBatchSize(); return new Iterator() { - private int offset = 0; + private long offset = 0; @Override public boolean hasNext() @@ -159,7 +159,7 @@ public class ScanQueryEngine if (System.currentTimeMillis() >= timeoutAt) { throw new QueryInterruptedException(new TimeoutException()); } - int lastOffset = offset; + long lastOffset = offset; Object events = null; String resultFormat = query.getResultFormat(); if (ScanQuery.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) { @@ -171,7 +171,7 @@ public class ScanQueryEngine } responseContext.put( ScanQueryRunnerFactory.CTX_COUNT, - (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset) + (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset) ); responseContext.put( ScanQueryRunnerFactory.CTX_TIMEOUT_AT, diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java index 78a11073f55..f102d1c3ec6 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java @@ -32,8 +32,8 @@ public class ScanQueryLimitRowIterator implements CloseableIterator yielder; private String resultFormat; - private int limit = 0; - private int count = 0; + private long limit = 0; + private long count = 0; public ScanQueryLimitRowIterator( QueryRunner baseRunner, ScanQuery query, @@ -76,7 +76,8 @@ public class ScanQueryLimitRowIterator implements CloseableIterator( diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index 6b1244c5ff8..712249eac5b 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -72,7 +72,7 @@ public class ScanQueryRunnerFactory implements QueryRunnerFactory