close IncrementalIndex properly and free up buffer

This commit is contained in:
nishantmonu51 2014-06-05 19:41:43 +05:30
parent 6265613bb9
commit 5bdc4a761a
4 changed files with 25 additions and 15 deletions

View File

@ -40,6 +40,7 @@ import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
@ -219,14 +220,14 @@ public class YeOldePlumberSchool implements PlumberSchool
log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist);
try {
final IncrementalIndex index = indexToPersist.getIndex();
IndexMerger.persist(
indexToPersist.getIndex(),
dirToPersist
);
indexToPersist.swapSegment(null);
index.close();
metrics.incrementRowOutputCount(rowsToPersist);
spilled.add(dirToPersist);

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
@ -140,7 +141,12 @@ public class GroupByParallelQueryRunner implements QueryRunner<Row>
}
}
return Sequences.simple(indexAccumulatorPair.lhs.iterableWithPostAggregations(null));
return new ResourceClosingSequence<Row>(
Sequences.simple(
indexAccumulatorPair.lhs
.iterableWithPostAggregations(null)
), indexAccumulatorPair.lhs
);
}
}

View File

@ -29,6 +29,7 @@ import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.ConcatSequence;
import com.metamx.common.guava.ResourceClosingSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.emitter.service.ServiceMetricEvent;
@ -62,10 +63,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{
};
private static final String GROUP_BY_MERGE_KEY = "groupByMerge";
private static final Map<String, Object> NO_MERGE_CONTEXT = ImmutableMap.<String, Object>of(GROUP_BY_MERGE_KEY, "false");
private static final Map<String, Object> NO_MERGE_CONTEXT = ImmutableMap.<String, Object>of(
GROUP_BY_MERGE_KEY,
"false"
);
private final Supplier<GroupByQueryConfig> configSupplier;
private GroupByQueryEngine engine; // For running the outer query around a subquery
private final StupidPool<ByteBuffer> bufferPool;
private GroupByQueryEngine engine; // For running the outer query around a subquery
@Inject
@ -99,16 +103,15 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
private Sequence<Row> mergeGroupByResults(final GroupByQuery query, QueryRunner<Row> runner)
{
Sequence<Row> result;
// If there's a subquery, merge subquery results and then apply the aggregator
DataSource dataSource = query.getDataSource();
if (dataSource instanceof QueryDataSource) {
GroupByQuery subquery;
try {
subquery = (GroupByQuery) ((QueryDataSource) dataSource).getQuery();
} catch (ClassCastException e) {
}
catch (ClassCastException e) {
throw new UnsupportedOperationException("Subqueries must be of type 'group by'");
}
Sequence<Row> subqueryResult = mergeGroupByResults(subquery, runner);
@ -118,11 +121,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
} else {
result = runner.run(query);
}
return postAggregate(query, makeIncrementalIndex(query, result));
final IncrementalIndex index = makeIncrementalIndex(query, result);
return new ResourceClosingSequence<Row>(postAggregate(query, index), index);
}
private Sequence<Row> postAggregate(final GroupByQuery query, IncrementalIndex index)
{
Sequence<Row> sequence = Sequences.map(
@ -135,7 +137,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
final MapBasedRow row = (MapBasedRow) input;
return new MapBasedRow(
query.getGranularity()
.toDateTime(row.getTimestampFromEpoch()),
.toDateTime(row.getTimestampFromEpoch()),
row.getEvent()
);
}
@ -156,7 +158,6 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
return rows.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}
@Override
public Sequence<Row> mergeSequences(Sequence<Sequence<Row>> seqOfSequences)
{

View File

@ -39,6 +39,7 @@ import io.druid.segment.IndexMerger;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
@ -713,14 +714,15 @@ public class RealtimePlumber implements Plumber
indexToPersist.getIndex(),
new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount()))
);
IncrementalIndex index = indexToPersist.getIndex();
indexToPersist.swapSegment(
new QueryableIndexSegment(
indexToPersist.getSegment().getIdentifier(),
IndexIO.loadIndex(persistedFile)
)
);
//TODO: can there be some races here ?
index.close();
return numRows;
}
catch (IOException e) {