From 5bdc4a761ae8fafa17262f1573001e5eee2d5c7c Mon Sep 17 00:00:00 2001 From: nishantmonu51 Date: Thu, 5 Jun 2014 19:41:43 +0530 Subject: [PATCH] close IncrementalIndex properly and free up buffer --- .../common/index/YeOldePlumberSchool.java | 5 +++-- .../query/GroupByParallelQueryRunner.java | 8 ++++++- .../groupby/GroupByQueryQueryToolChest.java | 21 ++++++++++--------- .../realtime/plumber/RealtimePlumber.java | 6 ++++-- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index ff9f1549607..de3aa720219 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -40,6 +40,7 @@ import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.QueryableIndex; import io.druid.segment.SegmentUtils; +import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.loading.DataSegmentPusher; @@ -219,14 +220,14 @@ public class YeOldePlumberSchool implements PlumberSchool log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist); try { - + final IncrementalIndex index = indexToPersist.getIndex(); IndexMerger.persist( indexToPersist.getIndex(), dirToPersist ); indexToPersist.swapSegment(null); - + index.close(); metrics.incrementRowOutputCount(rowsToPersist); spilled.add(dirToPersist); diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 43e65b9e49a..98b6ae7d9f5 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -28,6 +28,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.metamx.common.Pair; import com.metamx.common.guava.Accumulator; +import com.metamx.common.guava.ResourceClosingSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.common.logger.Logger; @@ -140,7 +141,12 @@ public class GroupByParallelQueryRunner implements QueryRunner } } - return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null)); + return new ResourceClosingSequence( + Sequences.simple( + indexAccumulatorPair.lhs + .iterableWithPostAggregations(null) + ), indexAccumulatorPair.lhs + ); } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java index 1cec9a993c7..6a5845b3ef1 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -29,6 +29,7 @@ import com.google.inject.Inject; import com.metamx.common.Pair; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.ConcatSequence; +import com.metamx.common.guava.ResourceClosingSequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; import com.metamx.emitter.service.ServiceMetricEvent; @@ -62,10 +63,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest NO_MERGE_CONTEXT = ImmutableMap.of(GROUP_BY_MERGE_KEY, "false"); + private static final Map NO_MERGE_CONTEXT = ImmutableMap.of( + GROUP_BY_MERGE_KEY, + "false" + ); private final Supplier configSupplier; - private GroupByQueryEngine engine; // For running the outer query around a subquery private final StupidPool bufferPool; + private GroupByQueryEngine engine; // For running the outer query around a subquery @Inject @@ -99,16 +103,15 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeGroupByResults(final GroupByQuery query, QueryRunner runner) { - Sequence result; - // If there's a subquery, merge subquery results and then apply the aggregator DataSource dataSource = query.getDataSource(); if (dataSource instanceof QueryDataSource) { GroupByQuery subquery; try { subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery(); - } catch (ClassCastException e) { + } + catch (ClassCastException e) { throw new UnsupportedOperationException("Subqueries must be of type 'group by'"); } Sequence subqueryResult = mergeGroupByResults(subquery, runner); @@ -118,11 +121,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest(postAggregate(query, index), index); } - private Sequence postAggregate(final GroupByQuery query, IncrementalIndex index) { Sequence sequence = Sequences.map( @@ -135,7 +137,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest mergeSequences(Sequence> seqOfSequences) { diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index cbe8b597b9f..26b4cc45509 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -39,6 +39,7 @@ import io.druid.segment.IndexMerger; import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexSegment; import io.druid.segment.Segment; +import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.loading.DataSegmentPusher; @@ -713,14 +714,15 @@ public class RealtimePlumber implements Plumber indexToPersist.getIndex(), new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())) ); - + IncrementalIndex index = indexToPersist.getIndex(); indexToPersist.swapSegment( new QueryableIndexSegment( indexToPersist.getSegment().getIdentifier(), IndexIO.loadIndex(persistedFile) ) ); - + //TODO: can there be some races here ? + index.close(); return numRows; } catch (IOException e) {