Review comment: move index closure into a finally block

This commit is contained in:
nishantmonu51 2014-08-12 14:58:55 +05:30
parent 32b9290723
commit 9598a524a8
1 changed file with 88 additions and 84 deletions

View File

@ -283,84 +283,72 @@ public class IndexGeneratorJob implements Jobby
final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
IncrementalIndex index = makeIncrementalIndex(bucket, aggs);
try {
File baseFlushFile = File.createTempFile("base", "flush");
baseFlushFile.delete();
baseFlushFile.mkdirs();
File baseFlushFile = File.createTempFile("base", "flush");
baseFlushFile.delete();
baseFlushFile.mkdirs();
Set<File> toMerge = Sets.newTreeSet();
int indexCount = 0;
int lineCount = 0;
int runningTotalLineCount = 0;
long startTime = System.currentTimeMillis();
Set<File> toMerge = Sets.newTreeSet();
int indexCount = 0;
int lineCount = 0;
int runningTotalLineCount = 0;
long startTime = System.currentTimeMillis();
Set<String> allDimensionNames = Sets.newHashSet();
for (final Text value : values) {
context.progress();
final InputRow inputRow = index.formatRow(parser.parse(value.toString()));
allDimensionNames.addAll(inputRow.getDimensions());
int numRows = index.add(inputRow);
++lineCount;
if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
log.info(
"%,d lines to %,d rows in %,d millis",
lineCount - runningTotalLineCount,
numRows,
System.currentTimeMillis() - startTime
);
runningTotalLineCount = lineCount;
final File file = new File(baseFlushFile, String.format("index%,05d", indexCount));
toMerge.add(file);
Set<String> allDimensionNames = Sets.newHashSet();
for (final Text value : values) {
context.progress();
IndexMaker.persist(
index, interval, file, new AbstractProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
);
index.close();
index = makeIncrementalIndex(bucket, aggs);
final InputRow inputRow = index.formatRow(parser.parse(value.toString()));
allDimensionNames.addAll(inputRow.getDimensions());
startTime = System.currentTimeMillis();
++indexCount;
}
}
int numRows = index.add(inputRow);
++lineCount;
log.info("%,d lines completed.", lineCount);
if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
log.info(
"%,d lines to %,d rows in %,d millis",
lineCount - runningTotalLineCount,
numRows,
System.currentTimeMillis() - startTime
);
runningTotalLineCount = lineCount;
List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
final File mergedBase;
final File file = new File(baseFlushFile, String.format("index%,05d", indexCount));
toMerge.add(file);
if (toMerge.size() == 0) {
if (index.isEmpty()) {
throw new IAE("If you try to persist empty indexes you are going to have a bad time");
}
mergedBase = new File(baseFlushFile, "merged");
IndexMaker.persist(
index, interval, mergedBase, new AbstractProgressIndicator()
{
@Override
public void progress()
{
context.progress();
IndexMaker.persist(
index, interval, file, new AbstractProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
);
// close this index and make a new one
index.close();
index = makeIncrementalIndex(bucket, aggs);
startTime = System.currentTimeMillis();
++indexCount;
}
}
);
} else {
if (!index.isEmpty()) {
final File finalFile = new File(baseFlushFile, "final");
log.info("%,d lines completed.", lineCount);
List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
final File mergedBase;
if (toMerge.size() == 0) {
if (index.isEmpty()) {
throw new IAE("If you try to persist empty indexes you are going to have a bad time");
}
mergedBase = new File(baseFlushFile, "merged");
IndexMaker.persist(
index, interval, finalFile, new AbstractProgressIndicator()
index, interval, mergedBase, new AbstractProgressIndicator()
{
@Override
public void progress()
@ -369,29 +357,45 @@ public class IndexGeneratorJob implements Jobby
}
}
);
toMerge.add(finalFile);
}
} else {
if (!index.isEmpty()) {
final File finalFile = new File(baseFlushFile, "final");
IndexMaker.persist(
index, interval, finalFile, new AbstractProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
);
toMerge.add(finalFile);
}
for (File file : toMerge) {
indexes.add(IndexIO.loadIndex(file));
}
mergedBase = IndexMaker.mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), new AbstractProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
);
}
serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));
for (File file : toMerge) {
indexes.add(IndexIO.loadIndex(file));
FileUtils.deleteDirectory(file);
}
mergedBase = IndexMaker.mergeQueryableIndex(
indexes, aggs, new File(baseFlushFile, "merged"), new AbstractProgressIndicator()
{
@Override
public void progress()
{
context.progress();
}
}
);
}
index.close();
serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));
finally {
index.close();
}
for (File file : toMerge) {
FileUtils.deleteDirectory(file);
}
}
private void serializeOutIndex(Context context, Bucket bucket, File mergedBase, List<String> dimensionNames)