explicitly unmap hydrant files when abandonSegment to recycle mmap memory (#4341)

* fix TestKafkaExtractionCluster failure due to port already in use

* explicitly unmap hydrant files when abandonSegment to recycle mmap memory

* address the comments

* apply to AppenderatorImpl
kaijianding 2017-06-02 07:15:30 +08:00 committed by Roman Leventov
parent 1150bf7a2c
commit 0efd18247b
4 changed files with 40 additions and 23 deletions
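A note on the motivation: a memory-mapped segment's pages stay pinned until the JVM garbage-collects the MappedByteBuffer backing them, so waiting for GC can leave large mappings alive long after a segment is abandoned. A minimal JDK-only sketch of that lifetime behavior (an illustration, not code from this commit; the file path and class name are arbitrary):

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Illustration only: a mapping created by FileChannel.map() outlives the
// channel that created it. The pages are released when the MappedByteBuffer
// is garbage-collected (or unmapped through an internal cleaner, which is
// what closing a segment triggers), so holding references to abandoned
// segments keeps mmap memory pinned.
public class MmapLifetime
{
  public static void main(String[] args) throws Exception
  {
    MappedByteBuffer buffer;
    try (RandomAccessFile file = new RandomAccessFile("/tmp/mmap-demo.bin", "rw");
         FileChannel channel = file.getChannel()) {
      file.setLength(1 << 20); // 1 MiB backing file
      buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 1 << 20);
      buffer.put(0, (byte) 1); // fault a page in
    }
    // The channel is closed here, yet the 1 MiB mapping remains valid and
    // counted against the process until `buffer` becomes unreachable and
    // is collected.
    buffer.put(1, (byte) 2); // still works after the channel is gone
  }
}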

TestKafkaExtractionCluster.java

@@ -59,6 +59,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;

 /**
  *
@@ -128,6 +129,7 @@ public class TestKafkaExtractionCluster
     serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
     serverProperties.put("zookeeper.session.timeout.ms", "10000");
     serverProperties.put("zookeeper.sync.time.ms", "200");
+    serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));

     kafkaConfig = new KafkaConfig(serverProperties);
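The fix above trades the fixed broker port for a random one. For reference, a standalone sketch of the same expression and its resulting range:

import java.util.concurrent.ThreadLocalRandom;

public class RandomBrokerPort
{
  public static void main(String[] args)
  {
    // nextInt(9999) returns a value in [0, 9998], so the broker port lands
    // in [10000, 19998], clear of the fixed default that made concurrent
    // test runs fail with "port already used".
    int port = ThreadLocalRandom.current().nextInt(9999) + 10000;
    System.out.println("broker port = " + port);
  }
}

This shrinks, but does not eliminate, the chance of a clash; binding a ServerSocket to port 0 and reading back the OS-assigned port would remove it entirely.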

IndexMerger.java

@@ -214,24 +214,8 @@ public class IndexMerger
       ProgressIndicator progress
   ) throws IOException
   {
-    // We are materializing the list for performance reasons. Lists.transform
-    // only creates a "view" of the original list, meaning the function gets
-    // applied every time you access an element.
-    List<IndexableAdapter> indexAdapteres = Lists.newArrayList(
-        Iterables.transform(
-            indexes,
-            new Function<QueryableIndex, IndexableAdapter>()
-            {
-              @Override
-              public IndexableAdapter apply(final QueryableIndex input)
-              {
-                return new QueryableIndexIndexableAdapter(input);
-              }
-            }
-        )
-    );
     return merge(
-        indexAdapteres,
+        toIndexableAdapters(indexes),
         rollup,
         metricAggs,
         outDir,
@@ -268,6 +252,26 @@ public class IndexMerger
     );
   }

+  private static List<IndexableAdapter> toIndexableAdapters(List<QueryableIndex> indexes)
+  {
+    // We are materializing the list for performance reasons. Lists.transform
+    // only creates a "view" of the original list, meaning the function gets
+    // applied every time you access an element.
+    return Lists.newArrayList(
+        Iterables.transform(
+            indexes,
+            new Function<QueryableIndex, IndexableAdapter>()
+            {
+              @Override
+              public IndexableAdapter apply(final QueryableIndex input)
+              {
+                return new QueryableIndexIndexableAdapter(input);
+              }
+            }
+        )
+    );
+  }
+
   private static List<String> getLongestSharedDimOrder(List<IndexableAdapter> indexes)
   {
     int maxSize = 0;
@@ -303,6 +307,11 @@ public class IndexMerger
     return ImmutableList.copyOf(orderingCandidate);
   }

+  public static List<String> getMergedDimensionsFromQueryableIndexes(List<QueryableIndex> indexes)
+  {
+    return getMergedDimensions(toIndexableAdapters(indexes));
+  }
+
   public static List<String> getMergedDimensions(List<IndexableAdapter> indexes)
   {
     if (indexes.size() == 0) {
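The helper extracted above keeps the Guava Function idiom the file already uses. On Java 8 the same eager materialization could be written with streams; a hypothetical drop-in sketch, assuming the same Druid types:

import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stream-based equivalent of toIndexableAdapters(): collect()
// builds a real list rather than a lazy view, preserving the performance
// rationale noted in the comment above.
private static List<IndexableAdapter> toIndexableAdapters(List<QueryableIndex> indexes)
{
  return indexes.stream()
                .map(QueryableIndexIndexableAdapter::new)
                .collect(Collectors.toList());
}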

AppenderatorImpl.java

@@ -575,11 +575,9 @@ public class AppenderatorImpl implements Appenderator
           tuningConfig.getIndexSpec()
       );

-      QueryableIndex index = indexIO.loadIndex(mergedFile);
-
       DataSegment segment = dataSegmentPusher.push(
           mergedFile,
-          sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
+          sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
       );

       objectMapper.writeValue(descriptorFile, segment);
@@ -925,6 +923,14 @@ public class AppenderatorImpl implements Appenderator
           if (cache != null) {
             cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
           }
+          try {
+            hydrant.getSegment().close();
+          }
+          catch (IOException e) {
+            log.makeAlert(e, "Failed to explicitly close segment[%s]", schema.getDataSource())
+               .addData("identifier", hydrant.getSegment().getIdentifier())
+               .emit();
+          }
         }

         if (removeOnDiskData) {
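The cleanup added above follows a close-and-report pattern: each hydrant's segment is closed eagerly, and a failed close is emitted as an alert instead of aborting the rest of the abandon path. A generic, self-contained sketch of the same pattern (System.err stands in for Druid's log.makeAlert(...).addData(...).emit() chain):

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

public class EagerCloser
{
  // Close every resource, reporting failures rather than propagating them,
  // so one bad close() cannot leave the remaining resources (and their
  // mmap'd memory) dangling.
  public static void closeAll(List<? extends Closeable> resources)
  {
    for (Closeable resource : resources) {
      try {
        resource.close();
      }
      catch (IOException e) {
        System.err.println("Failed to explicitly close " + resource + ": " + e);
      }
    }
  }
}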

RealtimePlumber.java

@@ -42,10 +42,10 @@ import io.druid.concurrent.Execs;
 import io.druid.concurrent.TaskThreadPriority;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
-import io.druid.java.util.common.granularity.Granularity;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.concurrent.ScheduledExecutors;
+import io.druid.java.util.common.granularity.Granularity;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -424,12 +424,11 @@ public class RealtimePlumber implements Plumber
             metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime);
             metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS));

-            QueryableIndex index = indexIO.loadIndex(mergedFile);
             log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());
             DataSegment segment = dataSegmentPusher.push(
                 mergedFile,
-                sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
+                sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
             );

             log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier());
             segmentPublisher.publishSegment(segment);
@@ -861,6 +860,7 @@ public class RealtimePlumber implements Plumber
             );
             for (FireHydrant hydrant : sink) {
               cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
+              hydrant.getSegment().close();
             }
             synchronized (handoffCondition) {
               handoffCondition.notifyAll();