diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 29fe8e05728..6e0b4d7496b 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -283,84 +283,72 @@ public class IndexGeneratorJob implements Jobby
       final AggregatorFactory[] aggs = config.getSchema().getDataSchema().getAggregators();
       IncrementalIndex index = makeIncrementalIndex(bucket, aggs);
+      try {
+        File baseFlushFile = File.createTempFile("base", "flush");
+        baseFlushFile.delete();
+        baseFlushFile.mkdirs();
-      File baseFlushFile = File.createTempFile("base", "flush");
-      baseFlushFile.delete();
-      baseFlushFile.mkdirs();
+        Set<File> toMerge = Sets.newTreeSet();
+        int indexCount = 0;
+        int lineCount = 0;
+        int runningTotalLineCount = 0;
+        long startTime = System.currentTimeMillis();
-      Set<File> toMerge = Sets.newTreeSet();
-      int indexCount = 0;
-      int lineCount = 0;
-      int runningTotalLineCount = 0;
-      long startTime = System.currentTimeMillis();
-
-      Set<String> allDimensionNames = Sets.newHashSet();
-
-      for (final Text value : values) {
-        context.progress();
-        final InputRow inputRow = index.formatRow(parser.parse(value.toString()));
-        allDimensionNames.addAll(inputRow.getDimensions());
-
-        int numRows = index.add(inputRow);
-        ++lineCount;
-
-        if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
-          log.info(
-              "%,d lines to %,d rows in %,d millis",
-              lineCount - runningTotalLineCount,
-              numRows,
-              System.currentTimeMillis() - startTime
-          );
-          runningTotalLineCount = lineCount;
-
-          final File file = new File(baseFlushFile, String.format("index%,05d", indexCount));
-          toMerge.add(file);
+        Set<String> allDimensionNames = Sets.newHashSet();
+        for (final Text value : values) {
           context.progress();
-          IndexMaker.persist(
-              index, interval, file, new AbstractProgressIndicator()
-              {
-                @Override
-                public void progress()
-                {
-                  context.progress();
-                }
-              }
-          );
-          index.close();
-          index = makeIncrementalIndex(bucket, aggs);
+          final InputRow inputRow = index.formatRow(parser.parse(value.toString()));
+          allDimensionNames.addAll(inputRow.getDimensions());
-          startTime = System.currentTimeMillis();
-          ++indexCount;
-        }
-      }
+          int numRows = index.add(inputRow);
+          ++lineCount;
-      log.info("%,d lines completed.", lineCount);
+          if (numRows >= config.getSchema().getTuningConfig().getRowFlushBoundary()) {
+            log.info(
+                "%,d lines to %,d rows in %,d millis",
+                lineCount - runningTotalLineCount,
+                numRows,
+                System.currentTimeMillis() - startTime
+            );
+            runningTotalLineCount = lineCount;
-      List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
-      final File mergedBase;
+            final File file = new File(baseFlushFile, String.format("index%,05d", indexCount));
+            toMerge.add(file);
-      if (toMerge.size() == 0) {
-        if (index.isEmpty()) {
-          throw new IAE("If you try to persist empty indexes you are going to have a bad time");
-        }
-
-        mergedBase = new File(baseFlushFile, "merged");
-        IndexMaker.persist(
-            index, interval, mergedBase, new AbstractProgressIndicator()
-            {
-              @Override
-              public void progress()
-              {
                 context.progress();
+            IndexMaker.persist(
+                index, interval, file, new AbstractProgressIndicator()
+                {
+                  @Override
+                  public void progress()
+                  {
+                    context.progress();
+                  }
+                }
+            );
+            // close this index and make a new one
+            index.close();
+            index = makeIncrementalIndex(bucket, aggs);
+
+            startTime = System.currentTimeMillis();
+            ++indexCount;
           }
         }
-        );
-      } else {
-        if (!index.isEmpty()) {
-          final File finalFile = new File(baseFlushFile, "final");
+
+        log.info("%,d lines completed.", lineCount);
+
+        List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
+        final File mergedBase;
+
+        if (toMerge.size() == 0) {
+          if (index.isEmpty()) {
+            throw new IAE("If you try to persist empty indexes you are going to have a bad time");
+          }
+
+          mergedBase = new File(baseFlushFile, "merged");
           IndexMaker.persist(
-              index, interval, finalFile, new AbstractProgressIndicator()
+              index, interval, mergedBase, new AbstractProgressIndicator()
               {
                 @Override
                 public void progress()
@@ -369,29 +357,45 @@ public class IndexGeneratorJob implements Jobby
                 }
               }
           );
-          toMerge.add(finalFile);
-        }
+        } else {
+          if (!index.isEmpty()) {
+            final File finalFile = new File(baseFlushFile, "final");
+            IndexMaker.persist(
+                index, interval, finalFile, new AbstractProgressIndicator()
+                {
+                  @Override
+                  public void progress()
+                  {
+                    context.progress();
+                  }
+                }
+            );
+            toMerge.add(finalFile);
+          }
+          for (File file : toMerge) {
+            indexes.add(IndexIO.loadIndex(file));
+          }
+          mergedBase = IndexMaker.mergeQueryableIndex(
+              indexes, aggs, new File(baseFlushFile, "merged"), new AbstractProgressIndicator()
+              {
+                @Override
+                public void progress()
+                {
+                  context.progress();
+                }
+              }
+          );
+        }
+        serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));
         for (File file : toMerge) {
-          indexes.add(IndexIO.loadIndex(file));
+          FileUtils.deleteDirectory(file);
         }
-        mergedBase = IndexMaker.mergeQueryableIndex(
-            indexes, aggs, new File(baseFlushFile, "merged"), new AbstractProgressIndicator()
-            {
-              @Override
-              public void progress()
-              {
-                context.progress();
-              }
-            }
-        );
       }
-      index.close();
-      serializeOutIndex(context, bucket, mergedBase, Lists.newArrayList(allDimensionNames));
+      finally {
+        index.close();
+      }
-      for (File file : toMerge) {
-        FileUtils.deleteDirectory(file);
       }
     }
 
     private void serializeOutIndex(Context context, Bucket bucket, File mergedBase, List<String> dimensionNames)
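
What the patch does, stripped of the Hadoop and Druid machinery: the reducer builds an in-memory index, periodically persists it and replaces it with a fresh instance, and previously closed the live instance only on the success path, so any exception thrown by persist or merge leaked it. Wrapping the whole body in try/finally closes whichever instance is current on every exit path. Below is a minimal, self-contained sketch of that pattern, including the reassign-inside-the-loop subtlety; every name in it (FakeIndex, flushBoundary, reduce) is an illustrative stand-in, not Druid's API.

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;

public class TryFinallyReassignExample
{
  // Stand-in for IncrementalIndex: a Closeable that accumulates rows.
  static class FakeIndex implements Closeable
  {
    private boolean closed = false;
    private int rows = 0;

    int add(String row)
    {
      rows++;
      return rows;
    }

    @Override
    public void close()
    {
      if (closed) {
        return; // tolerate a rare flush-then-finally double close
      }
      closed = true;
      System.out.println("closed index with " + rows + " rows");
    }
  }

  // Mirrors the patched reduce(): create the resource before the try,
  // possibly reassign it inside, and let finally close whichever
  // instance is live when the method exits, normally or exceptionally.
  static void reduce(Iterable<String> values, int flushBoundary) throws IOException
  {
    FakeIndex index = new FakeIndex();
    try {
      for (String value : values) {
        int numRows = index.add(value);
        if (numRows >= flushBoundary) {
          // a persist(index) step would run here and may throw
          index.close();            // close the flushed index...
          index = new FakeIndex();  // ...and start a new one, as the patch does
        }
      }
      // merge + serialize steps would run here and may also throw
    }
    finally {
      // Runs on success and on exception; before the patch this close
      // was skipped whenever persist/merge threw, leaking the index.
      index.close();
    }
  }

  public static void main(String[] args) throws IOException
  {
    reduce(Arrays.asList("a", "b", "c", "d", "e"), 2);
  }
}

Note that the finally block sees the post-reassignment reference: indexes flushed mid-loop are closed at their flush site, so on the happy path each instance is closed exactly once, and on failure the current one is still closed.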