From 44911039c5aa4583c2c7b86a7f2fae63be9623ec Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 2 Sep 2015 22:57:42 -0500 Subject: [PATCH 1/2] update indexing in the helper to use multiple persists and final merge to catch further issues in aggregator implementations --- .../aggregation/AggregationTestHelper.java | 61 +++++++++++++++---- 1 file changed, 50 insertions(+), 11 deletions(-) diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index 7f5649c070a..a26a7519305 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -28,7 +28,6 @@ import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.io.CharSource; import com.google.common.io.Closeables; import com.google.common.io.Files; import com.google.common.util.concurrent.ListenableFuture; @@ -58,9 +57,11 @@ import io.druid.query.groupby.GroupByQueryRunnerFactory; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; +import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexSegment; import io.druid.segment.Segment; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -71,7 +72,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -237,19 +238,57 @@ public class AggregationTestHelper int maxRowCount ) throws Exception { - try(IncrementalIndex index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount)) { - while (rows.hasNext()) { + IncrementalIndex index = null; + List toMerge = new ArrayList<>(); + try { + index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount); + while (rows.hasNext()) { Object row = rows.next(); - if (row instanceof String && parser instanceof StringInputRowParser) { - //Note: this is required because StringInputRowParser is InputRowParser as opposed to - //InputRowsParser - index.add(((StringInputRowParser) parser).parse((String) row)); - } else { - index.add(parser.parse(row)); + try { + if (row instanceof String && parser instanceof StringInputRowParser) { + //Note: this is required because StringInputRowParser is InputRowParser as opposed to + //InputRowsParser + index.add(((StringInputRowParser) parser).parse((String) row)); + } else { + index.add(parser.parse(row)); + } + } + catch (IndexSizeExceededException ex) { + File tmp = Files.createTempDir(); + toMerge.add(tmp); + IndexMerger.persist(index, tmp, null, new IndexSpec()); + index.close(); + index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount); } } - IndexMerger.persist(index, outDir, null, new IndexSpec()); + + if (toMerge.size() > 0) { + File tmp = Files.createTempDir(); + toMerge.add(tmp); + IndexMerger.persist(index, tmp, null, new IndexSpec()); + + List indexes = new ArrayList<>(toMerge.size()); + for (File file : toMerge) { + indexes.add(IndexIO.loadIndex(file)); + } + IndexMerger.mergeQueryableIndex(indexes, metrics, outDir, new IndexSpec()); + + for (QueryableIndex qi : indexes) { + qi.close(); + } + } else { + IndexMerger.persist(index, outDir, null, new IndexSpec()); + } + } + finally { + if (index != null) { + index.close(); + } + + for (File file : toMerge) { + FileUtils.deleteDirectory(file); + } } } From 5da58e48e0a65679b08b4db8e1521bf8ea2ad2da Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 3 Sep 2015 14:11:39 -0500 Subject: [PATCH 2/2] use Rule based TemporaryFolder for cleanup of temp directory/files --- .../ApproximateHistogramAggregationTest.java | 7 +++- .../aggregation/AggregationTestHelper.java | 36 +++++++------------ .../HyperUniquesAggregationTest.java | 7 +++- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregationTest.java b/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregationTest.java index 49ba5c2fdde..e0ade6f6971 100644 --- a/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregationTest.java +++ b/extensions/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramAggregationTest.java @@ -26,7 +26,9 @@ import io.druid.data.input.MapBasedRow; import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregationTestHelper; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; @@ -36,11 +38,14 @@ public class ApproximateHistogramAggregationTest { private AggregationTestHelper helper; + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + public ApproximateHistogramAggregationTest() { ApproximateHistogramDruidModule module = new ApproximateHistogramDruidModule(); module.configure(null); - helper = new AggregationTestHelper(Lists.newArrayList(module.getJacksonModules())); + helper = new AggregationTestHelper(Lists.newArrayList(module.getJacksonModules()), tempFolder); } @Test diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index a26a7519305..f286a3bc792 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -29,7 +29,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Closeables; -import com.google.common.io.Files; import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.Sequence; @@ -63,9 +62,9 @@ import io.druid.segment.Segment; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.incremental.OnheapIncrementalIndex; -import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; +import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.FileInputStream; @@ -89,8 +88,11 @@ public class AggregationTestHelper private final GroupByQueryQueryToolChest toolChest; private final GroupByQueryRunnerFactory factory; - public AggregationTestHelper(List jsonModulesToRegister) + private final TemporaryFolder tempFolder; + + public AggregationTestHelper(List jsonModulesToRegister, TemporaryFolder tempFoler) { + this.tempFolder = tempFoler; mapper = new DefaultObjectMapper(); for(Module mod : jsonModulesToRegister) { @@ -141,13 +143,9 @@ public class AggregationTestHelper String groupByQueryJson ) throws Exception { - File segmentDir = Files.createTempDir(); - try { - createIndex(inputDataFile, parserJson, aggregators, segmentDir, minTimestamp, gran, maxRowCount); - return runQueryOnSegments(Lists.newArrayList(segmentDir), groupByQueryJson); - } finally { - FileUtils.deleteDirectory(segmentDir); - } + File segmentDir = tempFolder.newFolder(); + createIndex(inputDataFile, parserJson, aggregators, segmentDir, minTimestamp, gran, maxRowCount); + return runQueryOnSegments(Lists.newArrayList(segmentDir), groupByQueryJson); } public Sequence createIndexAndRunQueryOnSegment( @@ -160,13 +158,9 @@ public class AggregationTestHelper String groupByQueryJson ) throws Exception { - File segmentDir = Files.createTempDir(); - try { - createIndex(inputDataStream, parserJson, aggregators, segmentDir, minTimestamp, gran, maxRowCount); - return runQueryOnSegments(Lists.newArrayList(segmentDir), groupByQueryJson); - } finally { - FileUtils.deleteDirectory(segmentDir); - } + File segmentDir = tempFolder.newFolder(); + createIndex(inputDataStream, parserJson, aggregators, segmentDir, minTimestamp, gran, maxRowCount); + return runQueryOnSegments(Lists.newArrayList(segmentDir), groupByQueryJson); } public void createIndex( @@ -255,7 +249,7 @@ public class AggregationTestHelper } } catch (IndexSizeExceededException ex) { - File tmp = Files.createTempDir(); + File tmp = tempFolder.newFolder(); toMerge.add(tmp); IndexMerger.persist(index, tmp, null, new IndexSpec()); index.close(); @@ -264,7 +258,7 @@ public class AggregationTestHelper } if (toMerge.size() > 0) { - File tmp = Files.createTempDir(); + File tmp = tempFolder.newFolder(); toMerge.add(tmp); IndexMerger.persist(index, tmp, null, new IndexSpec()); @@ -285,10 +279,6 @@ public class AggregationTestHelper if (index != null) { index.close(); } - - for (File file : toMerge) { - FileUtils.deleteDirectory(file); - } } } diff --git a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java index e3de00c44e6..dcc613125e4 100644 --- a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java @@ -27,16 +27,21 @@ import io.druid.granularity.QueryGranularity; import io.druid.jackson.AggregatorsModule; import io.druid.query.aggregation.AggregationTestHelper; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.File; public class HyperUniquesAggregationTest { + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + @Test public void testIngestAndQuery() throws Exception { - AggregationTestHelper helper = new AggregationTestHelper(Lists.newArrayList(new AggregatorsModule())); + AggregationTestHelper helper = new AggregationTestHelper(Lists.newArrayList(new AggregatorsModule()), tempFolder); String metricSpec = "[{" + "\"type\": \"hyperUnique\","