From 44911039c5aa4583c2c7b86a7f2fae63be9623ec Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 2 Sep 2015 22:57:42 -0500 Subject: [PATCH] update indexing in the helper to use multiple persists and final merge to catch further issues in aggregator implementations --- .../aggregation/AggregationTestHelper.java | 61 +++++++++++++++---- 1 file changed, 50 insertions(+), 11 deletions(-) diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index 7f5649c070a..a26a7519305 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -28,7 +28,6 @@ import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.io.CharSource; import com.google.common.io.Closeables; import com.google.common.io.Files; import com.google.common.util.concurrent.ListenableFuture; @@ -58,9 +57,11 @@ import io.druid.query.groupby.GroupByQueryRunnerFactory; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; +import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexSegment; import io.druid.segment.Segment; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -71,7 +72,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -237,19 +238,57 @@ public class AggregationTestHelper int maxRowCount ) throws Exception { - try(IncrementalIndex index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount)) { - while (rows.hasNext()) { + IncrementalIndex index = null; + List toMerge = new ArrayList<>(); + try { + index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount); + while (rows.hasNext()) { Object row = rows.next(); - if (row instanceof String && parser instanceof StringInputRowParser) { - //Note: this is required because StringInputRowParser is InputRowParser as opposed to - //InputRowsParser - index.add(((StringInputRowParser) parser).parse((String) row)); - } else { - index.add(parser.parse(row)); + try { + if (row instanceof String && parser instanceof StringInputRowParser) { + //Note: this is required because StringInputRowParser is InputRowParser as opposed to + //InputRowsParser + index.add(((StringInputRowParser) parser).parse((String) row)); + } else { + index.add(parser.parse(row)); + } + } + catch (IndexSizeExceededException ex) { + File tmp = Files.createTempDir(); + toMerge.add(tmp); + IndexMerger.persist(index, tmp, null, new IndexSpec()); + index.close(); + index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount); } } - IndexMerger.persist(index, outDir, null, new IndexSpec()); + + if (toMerge.size() > 0) { + File tmp = Files.createTempDir(); + toMerge.add(tmp); + IndexMerger.persist(index, tmp, null, new IndexSpec()); + + List indexes = new ArrayList<>(toMerge.size()); + for (File file : toMerge) { + indexes.add(IndexIO.loadIndex(file)); + } + IndexMerger.mergeQueryableIndex(indexes, metrics, outDir, new IndexSpec()); + + for (QueryableIndex qi : indexes) { + qi.close(); + } + } else { + IndexMerger.persist(index, outDir, null, new IndexSpec()); + } + } + finally { + if (index != null) { + index.close(); + } + + for (File file : toMerge) { + FileUtils.deleteDirectory(file); + } } }