diff --git a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java index b8d13be6674..88cb5840950 100644 --- a/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java +++ b/server/src/main/java/com/metamx/druid/query/group/GroupByQueryEngine.java @@ -236,6 +236,11 @@ public class GroupByQueryEngine } } + public int getIncrement() + { + return increment; + } + public int[] getIncrements() { return increments; @@ -301,11 +306,11 @@ public class GroupByQueryEngine return delegate.next(); } - if (cursor.isDone()) { + if (unprocessedKeys == null && cursor.isDone()) { throw new NoSuchElementException(); } - final PositionMaintainer positionMaintainer = new PositionMaintainer(0, sizesRequired, metricsBuffer.limit()); + final PositionMaintainer positionMaintainer = new PositionMaintainer(0, sizesRequired, metricsBuffer.remaining()); final RowUpdater rowUpdater = new RowUpdater(metricsBuffer, aggregators, positionMaintainer); if (unprocessedKeys != null) { for (ByteBuffer key : unprocessedKeys) { @@ -327,6 +332,13 @@ public class GroupByQueryEngine cursor.advance(); } + if (rowUpdater.getPositions().isEmpty() && unprocessedKeys != null) { + throw new ISE( + "Not enough memory to process even a single item. Required [%,d] memory, but only have[%,d]", + positionMaintainer.getIncrement(), metricsBuffer.remaining() + ); + } + delegate = FunctionalIterator .create(rowUpdater.getPositions().entrySet().iterator()) .transform(