address cr about progress ind

This commit is contained in:
fjy 2014-08-19 12:59:01 -07:00
parent 4fd5479559
commit 88a904e0b3
3 changed files with 16 additions and 36 deletions

View File

@ -37,9 +37,10 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.AbstractProgressIndicator;
import io.druid.segment.BaseProgressIndicator;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMaker;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IncrementalIndex;
@ -295,6 +296,14 @@ public class IndexGeneratorJob implements Jobby
long startTime = System.currentTimeMillis();
Set<String> allDimensionNames = Sets.newHashSet();
final ProgressIndicator progressIndicator = new BaseProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
};
for (final Text value : values) {
context.progress();
@ -318,14 +327,7 @@ public class IndexGeneratorJob implements Jobby
context.progress();
IndexMaker.persist(
index, interval, file, new AbstractProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
index, interval, file, progressIndicator
);
// close this index and make a new one
index.close();
@ -348,27 +350,13 @@ public class IndexGeneratorJob implements Jobby
mergedBase = new File(baseFlushFile, "merged");
IndexMaker.persist(
index, interval, mergedBase, new AbstractProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
index, interval, mergedBase, progressIndicator
);
} else {
if (!index.isEmpty()) {
final File finalFile = new File(baseFlushFile, "final");
IndexMaker.persist(
index, interval, finalFile, new AbstractProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
index, interval, finalFile, progressIndicator
);
toMerge.add(finalFile);
}
@ -377,14 +365,7 @@ public class IndexGeneratorJob implements Jobby
indexes.add(IndexIO.loadIndex(file));
}
mergedBase = IndexMaker.mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), new AbstractProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
indexes, aggs, new File(baseFlushFile, "merged"), progressIndicator
);
}
serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));
@ -395,7 +376,6 @@ public class IndexGeneratorJob implements Jobby
finally {
index.close();
}
}
private void serializeOutIndex(Context context, Bucket bucket, File mergedBase, List<String> dimensionNames)

View File

@ -21,7 +21,7 @@ package io.druid.segment;
/**
*/
public abstract class AbstractProgressIndicator implements ProgressIndicator
public class BaseProgressIndicator implements ProgressIndicator
{
@Override
public void progress()

View File

@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
/**
*/
public class LoggingProgressIndicator extends AbstractProgressIndicator
public class LoggingProgressIndicator extends BaseProgressIndicator
{
private static Logger log = new Logger(LoggingProgressIndicator.class);