diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java
index d1e85df5750..bff448da55b 100644
--- a/processing/src/main/java/io/druid/segment/IndexMerger.java
+++ b/processing/src/main/java/io/druid/segment/IndexMerger.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Ordering;
 import com.google.common.collect.PeekingIterator;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
+import com.google.common.io.Closer;
 import com.google.common.io.Files;
 import com.google.common.io.OutputSupplier;
 import com.google.common.primitives.Ints;
@@ -85,6 +86,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -604,79 +606,95 @@ public class IndexMerger
       }
     }
 
-
+    Closer closer = Closer.create();
     final Interval dataInterval;
-    File v8OutDir = new File(outDir, "v8-tmp");
+    final File v8OutDir = new File(outDir, "v8-tmp");
     v8OutDir.mkdirs();
-
-    /************* Main index.drd file **************/
-    progress.progress();
-    long startTime = System.currentTimeMillis();
-    File indexFile = new File(v8OutDir, "index.drd");
-
-    try (FileOutputStream fileOutputStream = new FileOutputStream(indexFile);
-         FileChannel channel = fileOutputStream.getChannel()) {
-      channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION}));
-
-      GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
-      GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
-
-      DateTime minTime = new DateTime(JodaUtils.MAX_INSTANT);
-      DateTime maxTime = new DateTime(JodaUtils.MIN_INSTANT);
-
-      for (IndexableAdapter index : indexes) {
-        minTime = JodaUtils.minDateTime(minTime, index.getDataInterval().getStart());
-        maxTime = JodaUtils.maxDateTime(maxTime, index.getDataInterval().getEnd());
+    closer.register(new Closeable()
+    {
+      @Override
+      public void close() throws IOException
+      {
+        FileUtils.deleteDirectory(v8OutDir);
       }
+    });
+    final IOPeon ioPeon = new TmpFileIOPeon();
+    closer.register(new Closeable()
+    {
+      @Override
+      public void close() throws IOException
+      {
+        ioPeon.cleanup();
      }
+    });
+    try {
+      /************* Main index.drd file **************/
+      progress.progress();
+      long startTime = System.currentTimeMillis();
+      File indexFile = new File(v8OutDir, "index.drd");
-      dataInterval = new Interval(minTime, maxTime);
-      serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime));
-      serializerUtils.writeString(channel, mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory()));
-    }
-    IndexIO.checkFileSize(indexFile);
-    log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
+      try (FileOutputStream fileOutputStream = new FileOutputStream(indexFile);
+           FileChannel channel = fileOutputStream.getChannel()) {
+        channel.write(ByteBuffer.wrap(new byte[]{IndexIO.V8_VERSION}));
-    /************* Setup Dim Conversions **************/
-    progress.progress();
-    startTime = System.currentTimeMillis();
+        GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
+        GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.STRING_STRATEGY).writeToChannel(channel);
-    IOPeon ioPeon = new TmpFileIOPeon();
-    ArrayList<FileOutputSupplier> dimOuts = Lists.newArrayListWithCapacity(mergedDimensions.size());
-    Map<String, Integer> dimensionCardinalities = Maps.newHashMap();
-    ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size());
-    final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
+        DateTime minTime = new DateTime(JodaUtils.MAX_INSTANT);
+        DateTime maxTime = new DateTime(JodaUtils.MIN_INSTANT);
-    for (int i = 0; i < indexes.size(); ++i) {
-      dimConversions.add(Maps.<String, IntBuffer>newHashMap());
-    }
-
-    for (String dimension : mergedDimensions) {
-      final GenericIndexedWriter<String> writer = new GenericIndexedWriter<String>(
-          ioPeon, dimension, GenericIndexed.STRING_STRATEGY
-      );
-      writer.open();
-
-      boolean dimHasNull = false;
-      boolean dimHasValues = false;
-      boolean dimAbsentFromSomeIndex = false;
-
-      int numMergeIndex = 0;
-      Indexed<String> dimValueLookup = null;
-      Indexed<String>[] dimValueLookups = new Indexed[indexes.size() + 1];
-      for (int i = 0; i < indexes.size(); i++) {
-        Indexed<String> dimValues = indexes.get(i).getDimValueLookup(dimension);
-        if (!isNullColumn(dimValues)) {
-          dimHasValues = true;
-          dimHasNull |= dimValues.indexOf(null) >= 0;
-          dimValueLookups[i] = dimValueLookup = dimValues;
-          numMergeIndex++;
-        } else {
-          dimAbsentFromSomeIndex = true;
+        for (IndexableAdapter index : indexes) {
+          minTime = JodaUtils.minDateTime(minTime, index.getDataInterval().getStart());
+          maxTime = JodaUtils.maxDateTime(maxTime, index.getDataInterval().getEnd());
         }
+
+        dataInterval = new Interval(minTime, maxTime);
+        serializerUtils.writeString(channel, String.format("%s/%s", minTime, maxTime));
+        serializerUtils.writeString(channel, mapper.writeValueAsString(indexSpec.getBitmapSerdeFactory()));
+      }
+      IndexIO.checkFileSize(indexFile);
+      log.info("outDir[%s] completed index.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
+
+      /************* Setup Dim Conversions **************/
+      progress.progress();
+      startTime = System.currentTimeMillis();
+
+      ArrayList<FileOutputSupplier> dimOuts = Lists.newArrayListWithCapacity(mergedDimensions.size());
+      Map<String, Integer> dimensionCardinalities = Maps.newHashMap();
+      ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size());
+      final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
+
+      for (int i = 0; i < indexes.size(); ++i) {
+        dimConversions.add(Maps.<String, IntBuffer>newHashMap());
      }
 
-      boolean convertMissingDims = dimHasValues && dimAbsentFromSomeIndex;
-      convertMissingDimsFlags.add(convertMissingDims);
+      for (String dimension : mergedDimensions) {
+        final GenericIndexedWriter<String> writer = new GenericIndexedWriter<String>(
+            ioPeon, dimension, GenericIndexed.STRING_STRATEGY
+        );
+        writer.open();
+
+        boolean dimHasNull = false;
+        boolean dimHasValues = false;
+        boolean dimAbsentFromSomeIndex = false;
+
+        int numMergeIndex = 0;
+        Indexed<String> dimValueLookup = null;
+        Indexed<String>[] dimValueLookups = new Indexed[indexes.size() + 1];
+        for (int i = 0; i < indexes.size(); i++) {
+          Indexed<String> dimValues = indexes.get(i).getDimValueLookup(dimension);
+          if (!isNullColumn(dimValues)) {
+            dimHasValues = true;
+            dimHasNull |= dimValues.indexOf(null) >= 0;
+            dimValueLookups[i] = dimValueLookup = dimValues;
+            numMergeIndex++;
+          } else {
+            dimAbsentFromSomeIndex = true;
+          }
+        }
+
+        boolean convertMissingDims = dimHasValues && dimAbsentFromSomeIndex;
+        convertMissingDimsFlags.add(convertMissingDims);
 
       /*
        * Ensure the empty str is always in the dictionary if the dimension was missing from one index but
@@ -685,331 +703,333 @@ public class IndexMerger
       * later on, to allow rows from indexes without a particular dimension to merge correctly with
       * rows from indexes with null/empty str values for that dimension.
       */
-      if (convertMissingDims && !dimHasNull) {
-        dimValueLookups[indexes.size()] = dimValueLookup = EMPTY_STR_DIM_VAL;
-        numMergeIndex++;
-      }
-
-      int cardinality = 0;
-      if (numMergeIndex > 1) {
-        DictionaryMergeIterator iterator = new DictionaryMergeIterator(dimValueLookups, true);
-
-        while (iterator.hasNext()) {
-          writer.write(iterator.next());
+        if (convertMissingDims && !dimHasNull) {
+          dimValueLookups[indexes.size()] = dimValueLookup = EMPTY_STR_DIM_VAL;
+          numMergeIndex++;
         }
-        for (int i = 0; i < indexes.size(); i++) {
-          if (dimValueLookups[i] != null && iterator.needConversion(i)) {
-            dimConversions.get(i).put(dimension, iterator.conversions[i]);
+        int cardinality = 0;
+        if (numMergeIndex > 1) {
+          DictionaryMergeIterator iterator = new DictionaryMergeIterator(dimValueLookups, true);
+
+          while (iterator.hasNext()) {
+            writer.write(iterator.next());
+          }
+
+          for (int i = 0; i < indexes.size(); i++) {
+            if (dimValueLookups[i] != null && iterator.needConversion(i)) {
+              dimConversions.get(i).put(dimension, iterator.conversions[i]);
+            }
+          }
+          cardinality = iterator.counter;
+        } else if (numMergeIndex == 1) {
+          for (String value : dimValueLookup) {
+            writer.write(value);
+          }
+          cardinality = dimValueLookup.size();
+        }
+
+        dimensionCardinalities.put(dimension, cardinality);
+
+        FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true);
+        dimOuts.add(dimOut);
+
+        writer.close();
+        serializerUtils.writeString(dimOut, dimension);
+        ByteStreams.copy(writer.combineStreams(), dimOut);
+
+        ioPeon.cleanup();
+      }
+      log.info("outDir[%s] completed dim conversions in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
+
+      /************* Walk through data sets and merge them *************/
+      progress.progress();
+      startTime = System.currentTimeMillis();
+
+      Iterable<Rowboat> theRows = makeRowIterable(
+          indexes,
+          mergedDimensions,
+          mergedMetrics,
+          dimConversions,
+          convertMissingDimsFlags,
+          rowMergerFn
+      );
+
+      CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create(
+          ioPeon, "little_end_time", IndexIO.BYTE_ORDER, CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
+      );
+
+      timeWriter.open();
+
+      ArrayList<VSizeIndexedWriter> forwardDimWriters = Lists.newArrayListWithCapacity(mergedDimensions.size());
+      for (String dimension : mergedDimensions) {
+        VSizeIndexedWriter writer = new VSizeIndexedWriter(ioPeon, dimension, dimensionCardinalities.get(dimension));
+        writer.open();
+        forwardDimWriters.add(writer);
+      }
+
+      ArrayList<MetricColumnSerializer> metWriters = Lists.newArrayListWithCapacity(mergedMetrics.size());
+      for (String metric : mergedMetrics) {
+        ValueType type = valueTypes.get(metric);
+        switch (type) {
+          case LONG:
+            metWriters.add(new LongMetricColumnSerializer(metric, v8OutDir, ioPeon));
+            break;
+          case FLOAT:
+            metWriters.add(new FloatMetricColumnSerializer(metric, v8OutDir, ioPeon));
+            break;
+          case COMPLEX:
+            final String typeName = metricTypeNames.get(metric);
+            ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
+
+            if (serde == null) {
+              throw new ISE("Unknown type[%s]", typeName);
+            }
+
+            metWriters.add(new ComplexMetricColumnSerializer(metric, v8OutDir, ioPeon, serde));
+            break;
+          default:
+            throw new ISE("Unknown type[%s]", type);
+        }
+      }
+
+      for (MetricColumnSerializer metWriter : metWriters) {
+        metWriter.open();
+      }
+
+      int rowCount = 0;
+      long time = System.currentTimeMillis();
+      List<IntBuffer> rowNumConversions = Lists.newArrayListWithCapacity(indexes.size());
+      for (IndexableAdapter index : indexes) {
+        int[] arr = new int[index.getNumRows()];
+        Arrays.fill(arr, INVALID_ROW);
+        rowNumConversions.add(IntBuffer.wrap(arr));
+      }
+
+      for (Rowboat theRow : theRows) {
+        progress.progress();
+        timeWriter.add(theRow.getTimestamp());
+
+        final Object[] metrics = theRow.getMetrics();
+        for (int i = 0; i < metrics.length; ++i) {
+          metWriters.get(i).serialize(metrics[i]);
+        }
+
+        int[][] dims = theRow.getDims();
+        for (int i = 0; i < dims.length; ++i) {
+          List<Integer> listToWrite = (i >= dims.length || dims[i] == null)
+                                      ? null
+                                      : Ints.asList(dims[i]);
+          forwardDimWriters.get(i).write(listToWrite);
+        }
+
+        for (Map.Entry<Integer, TreeSet<Integer>> comprisedRow : theRow.getComprisedRows().entrySet()) {
+          final IntBuffer conversionBuffer = rowNumConversions.get(comprisedRow.getKey());
+
+          for (Integer rowNum : comprisedRow.getValue()) {
+            while (conversionBuffer.position() < rowNum) {
+              conversionBuffer.put(INVALID_ROW);
+            }
+            conversionBuffer.put(rowCount);
          }
        }
-        cardinality = iterator.counter;
-      } else if (numMergeIndex == 1) {
-        for (String value : dimValueLookup) {
-          writer.write(value);
+
+        if ((++rowCount % 500000) == 0) {
+          log.info(
+              "outDir[%s] walked 500,000/%,d rows in %,d millis.", v8OutDir, rowCount, System.currentTimeMillis() - time
+          );
+          time = System.currentTimeMillis();
         }
-        cardinality = dimValueLookup.size();
       }
-      dimensionCardinalities.put(dimension, cardinality);
+      for (IntBuffer rowNumConversion : rowNumConversions) {
+        rowNumConversion.rewind();
+      }
-      FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, dimension), true);
-      dimOuts.add(dimOut);
+      final File timeFile = IndexIO.makeTimeFile(v8OutDir, IndexIO.BYTE_ORDER);
+      timeFile.delete();
+      OutputSupplier<FileOutputStream> out = Files.newOutputStreamSupplier(timeFile, true);
+      timeWriter.closeAndConsolidate(out);
+      IndexIO.checkFileSize(timeFile);
-      writer.close();
-      serializerUtils.writeString(dimOut, dimension);
-      ByteStreams.copy(writer.combineStreams(), dimOut);
+      for (int i = 0; i < mergedDimensions.size(); ++i) {
+        forwardDimWriters.get(i).close();
+        ByteStreams.copy(forwardDimWriters.get(i).combineStreams(), dimOuts.get(i));
+      }
+
+      for (MetricColumnSerializer metWriter : metWriters) {
+        metWriter.close();
+      }
       ioPeon.cleanup();
-    }
-    log.info("outDir[%s] completed dim conversions in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
-
-    /************* Walk through data sets and merge them *************/
-    progress.progress();
-    startTime = System.currentTimeMillis();
-
-    Iterable<Rowboat> theRows = makeRowIterable(
-        indexes,
-        mergedDimensions,
-        mergedMetrics,
-        dimConversions,
-        convertMissingDimsFlags,
-        rowMergerFn
-    );
-
-    CompressedLongsSupplierSerializer timeWriter = CompressedLongsSupplierSerializer.create(
-        ioPeon, "little_end_time", IndexIO.BYTE_ORDER, CompressedObjectStrategy.DEFAULT_COMPRESSION_STRATEGY
-    );
-
-    timeWriter.open();
-
-    ArrayList<VSizeIndexedWriter> forwardDimWriters = Lists.newArrayListWithCapacity(mergedDimensions.size());
-    for (String dimension : mergedDimensions) {
-      VSizeIndexedWriter writer = new VSizeIndexedWriter(ioPeon, dimension, dimensionCardinalities.get(dimension));
-      writer.open();
-      forwardDimWriters.add(writer);
-    }
-
-    ArrayList<MetricColumnSerializer> metWriters = Lists.newArrayListWithCapacity(mergedMetrics.size());
-    for (String metric : mergedMetrics) {
-      ValueType type = valueTypes.get(metric);
-      switch (type) {
-        case LONG:
-          metWriters.add(new LongMetricColumnSerializer(metric, v8OutDir, ioPeon));
-          break;
-        case FLOAT:
-          metWriters.add(new FloatMetricColumnSerializer(metric, v8OutDir, ioPeon));
-          break;
-        case COMPLEX:
-          final String typeName = metricTypeNames.get(metric);
-          ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
-
-          if (serde == null) {
-            throw new ISE("Unknown type[%s]", typeName);
-          }
-
-          metWriters.add(new ComplexMetricColumnSerializer(metric, v8OutDir, ioPeon, serde));
-          break;
-        default:
-          throw new ISE("Unknown type[%s]", type);
-      }
-    }
-
-    for (MetricColumnSerializer metWriter : metWriters) {
-      metWriter.open();
-    }
-
-    int rowCount = 0;
-    long time = System.currentTimeMillis();
-    List<IntBuffer> rowNumConversions = Lists.newArrayListWithCapacity(indexes.size());
-    for (IndexableAdapter index : indexes) {
-      int[] arr = new int[index.getNumRows()];
-      Arrays.fill(arr, INVALID_ROW);
-      rowNumConversions.add(IntBuffer.wrap(arr));
-    }
-
-    for (Rowboat theRow : theRows) {
-      progress.progress();
-      timeWriter.add(theRow.getTimestamp());
-
-      final Object[] metrics = theRow.getMetrics();
-      for (int i = 0; i < metrics.length; ++i) {
-        metWriters.get(i).serialize(metrics[i]);
-      }
-
-      int[][] dims = theRow.getDims();
-      for (int i = 0; i < dims.length; ++i) {
-        List<Integer> listToWrite = (i >= dims.length || dims[i] == null)
-                                    ? null
-                                    : Ints.asList(dims[i]);
-        forwardDimWriters.get(i).write(listToWrite);
-      }
-
-      for (Map.Entry<Integer, TreeSet<Integer>> comprisedRow : theRow.getComprisedRows().entrySet()) {
-        final IntBuffer conversionBuffer = rowNumConversions.get(comprisedRow.getKey());
-
-        for (Integer rowNum : comprisedRow.getValue()) {
-          while (conversionBuffer.position() < rowNum) {
-            conversionBuffer.put(INVALID_ROW);
-          }
-          conversionBuffer.put(rowCount);
-        }
-      }
-
-      if ((++rowCount % 500000) == 0) {
-        log.info(
-            "outDir[%s] walked 500,000/%,d rows in %,d millis.", v8OutDir, rowCount, System.currentTimeMillis() - time
-        );
-        time = System.currentTimeMillis();
-      }
-    }
-
-    for (IntBuffer rowNumConversion : rowNumConversions) {
-      rowNumConversion.rewind();
-    }
-
-    final File timeFile = IndexIO.makeTimeFile(v8OutDir, IndexIO.BYTE_ORDER);
-    timeFile.delete();
-    OutputSupplier<FileOutputStream> out = Files.newOutputStreamSupplier(timeFile, true);
-    timeWriter.closeAndConsolidate(out);
-    IndexIO.checkFileSize(timeFile);
-
-    for (int i = 0; i < mergedDimensions.size(); ++i) {
-      forwardDimWriters.get(i).close();
-      ByteStreams.copy(forwardDimWriters.get(i).combineStreams(), dimOuts.get(i));
-    }
-
-    for (MetricColumnSerializer metWriter : metWriters) {
-      metWriter.close();
-    }
-
-    ioPeon.cleanup();
-    log.info(
-        "outDir[%s] completed walk through of %,d rows in %,d millis.",
-        v8OutDir,
-        rowCount,
-        System.currentTimeMillis() - startTime
-    );
-
-    /************ Create Inverted Indexes *************/
-    startTime = System.currentTimeMillis();
-
-    final File invertedFile = new File(v8OutDir, "inverted.drd");
-    Files.touch(invertedFile);
-    out = Files.newOutputStreamSupplier(invertedFile, true);
-
-    final File geoFile = new File(v8OutDir, "spatial.drd");
-    Files.touch(geoFile);
-    OutputSupplier<FileOutputStream> spatialOut = Files.newOutputStreamSupplier(geoFile, true);
-
-    for (int i = 0; i < mergedDimensions.size(); ++i) {
-      long dimStartTime = System.currentTimeMillis();
-      String dimension = mergedDimensions.get(i);
-
-      File dimOutFile = dimOuts.get(i).getFile();
-      final MappedByteBuffer dimValsMapped = Files.map(dimOutFile);
-
-      if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
-        throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
-      }
-      Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY);
-      log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
-
-      final BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
-      GenericIndexedWriter<ImmutableBitmap> writer = new GenericIndexedWriter<>(
-          ioPeon, dimension, bitmapSerdeFactory.getObjectStrategy()
+      log.info(
+          "outDir[%s] completed walk through of %,d rows in %,d millis.",
+          v8OutDir,
+          rowCount,
+          System.currentTimeMillis() - startTime
       );
-      writer.open();
+      /************ Create Inverted Indexes *************/
+      startTime = System.currentTimeMillis();
+
+      final File invertedFile = new File(v8OutDir, "inverted.drd");
+      Files.touch(invertedFile);
+      out = Files.newOutputStreamSupplier(invertedFile, true);
+
+      final File geoFile = new File(v8OutDir, "spatial.drd");
+      Files.touch(geoFile);
+      OutputSupplier<FileOutputStream> spatialOut = Files.newOutputStreamSupplier(geoFile, true);
-      boolean isSpatialDim = columnCapabilities.get(dimension).hasSpatialIndexes();
-      ByteBufferWriter<ImmutableRTree> spatialWriter = null;
-      RTree tree = null;
-      IOPeon spatialIoPeon = new TmpFileIOPeon();
-      if (isSpatialDim) {
-        BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
-        spatialWriter = new ByteBufferWriter<ImmutableRTree>(
-            spatialIoPeon, dimension, new IndexedRTree.ImmutableRTreeObjectStrategy(bitmapFactory)
+      for (int i = 0; i < mergedDimensions.size(); ++i) {
+        long dimStartTime = System.currentTimeMillis();
+        String dimension = mergedDimensions.get(i);
+
+        File dimOutFile = dimOuts.get(i).getFile();
+        final MappedByteBuffer dimValsMapped = Files.map(dimOutFile);
+
+        if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
+          throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
+        }
+        Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY);
+        log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
+
+        final BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
+        GenericIndexedWriter<ImmutableBitmap> writer = new GenericIndexedWriter<>(
+            ioPeon, dimension, bitmapSerdeFactory.getObjectStrategy()
         );
-        spatialWriter.open();
-        tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
-      }
+        writer.open();
-      IndexSeeker[] dictIdSeeker = toIndexSeekers(indexes, dimConversions, dimension);
-
-      //Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result.
-      for (int dictId = 0; dictId < dimVals.size(); dictId++) {
-        progress.progress();
-        List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size());
-        for (int j = 0; j < indexes.size(); ++j) {
-          int seekedDictId = dictIdSeeker[j].seek(dictId);
-          if (seekedDictId != IndexSeeker.NOT_EXIST) {
-            convertedInverteds.add(
-                new ConvertingIndexedInts(
-                    indexes.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j)
-                )
-            );
-          }
+        boolean isSpatialDim = columnCapabilities.get(dimension).hasSpatialIndexes();
+        ByteBufferWriter<ImmutableRTree> spatialWriter = null;
+        RTree tree = null;
+        IOPeon spatialIoPeon = new TmpFileIOPeon();
+        if (isSpatialDim) {
+          BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
+          spatialWriter = new ByteBufferWriter<ImmutableRTree>(
+              spatialIoPeon, dimension, new IndexedRTree.ImmutableRTreeObjectStrategy(bitmapFactory)
+          );
+          spatialWriter.open();
+          tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
         }
-        MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap();
-        for (Integer row : CombiningIterable.createSplatted(
-            convertedInverteds,
-            Ordering.<Integer>natural().nullsFirst()
-        )) {
-          if (row != INVALID_ROW) {
-            bitset.add(row);
+        IndexSeeker[] dictIdSeeker = toIndexSeekers(indexes, dimConversions, dimension);
+
+        //Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result.
+        for (int dictId = 0; dictId < dimVals.size(); dictId++) {
+          progress.progress();
+          List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size());
+          for (int j = 0; j < indexes.size(); ++j) {
+            int seekedDictId = dictIdSeeker[j].seek(dictId);
+            if (seekedDictId != IndexSeeker.NOT_EXIST) {
+              convertedInverteds.add(
+                  new ConvertingIndexedInts(
+                      indexes.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j)
+                  )
+              );
+            }
+          }
+
+          MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap();
+          for (Integer row : CombiningIterable.createSplatted(
+              convertedInverteds,
+              Ordering.<Integer>natural().nullsFirst()
+          )) {
+            if (row != INVALID_ROW) {
+              bitset.add(row);
+            }
+          }
+
+          writer.write(
+              bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset)
+          );
+
+          if (isSpatialDim) {
+            String dimVal = dimVals.get(dictId);
+            if (dimVal != null) {
+              List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
+              float[] coords = new float[stringCoords.size()];
+              for (int j = 0; j < coords.length; j++) {
+                coords[j] = Float.valueOf(stringCoords.get(j));
+              }
+              tree.insert(coords, bitset);
+            }
+          }
         }
+        writer.close();
-        writer.write(
-            bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset)
-        );
+        serializerUtils.writeString(out, dimension);
+        ByteStreams.copy(writer.combineStreams(), out);
+        ioPeon.cleanup();
+
+        log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime);
-        if (isSpatialDim) {
-          String dimVal = dimVals.get(dictId);
-          if (dimVal != null) {
-            List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
-            float[] coords = new float[stringCoords.size()];
-            for (int j = 0; j < coords.length; j++) {
-              coords[j] = Float.valueOf(stringCoords.get(j));
-            }
-            tree.insert(coords, bitset);
-          }
+        if (isSpatialDim) {
+          spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
+          spatialWriter.close();
+
+          serializerUtils.writeString(spatialOut, dimension);
+          ByteStreams.copy(spatialWriter.combineStreams(), spatialOut);
+          spatialIoPeon.cleanup();
        }
      }
-      writer.close();
+      log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
-      serializerUtils.writeString(out, dimension);
-      ByteStreams.copy(writer.combineStreams(), out);
-      ioPeon.cleanup();
+      final ArrayList<String> expectedFiles = Lists.newArrayList(
+          Iterables.concat(
+              Arrays.asList(
+                  "index.drd", "inverted.drd", "spatial.drd", String.format("time_%s.drd", IndexIO.BYTE_ORDER)
+              ),
+              Iterables.transform(mergedDimensions, GuavaUtils.formatFunction("dim_%s.drd")),
+              Iterables.transform(
+                  mergedMetrics, GuavaUtils.formatFunction(String.format("met_%%s_%s.drd", IndexIO.BYTE_ORDER))
+              )
+          )
+      );
-      log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime);
+      if (segmentMetadata != null) {
+        writeMetadataToFile(new File(v8OutDir, "metadata.drd"), segmentMetadata);
+        log.info("wrote metadata.drd in outDir[%s].", v8OutDir);
-      if (isSpatialDim) {
-        spatialWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
-        spatialWriter.close();
+        expectedFiles.add("metadata.drd");
      }
-        serializerUtils.writeString(spatialOut, dimension);
-        ByteStreams.copy(spatialWriter.combineStreams(), spatialOut);
-        spatialIoPeon.cleanup();
+      Map<String, File> files = Maps.newLinkedHashMap();
+      for (String fileName : expectedFiles) {
+        files.put(fileName, new File(v8OutDir, fileName));
      }
+
+      File smooshDir = new File(v8OutDir, "smoosher");
+      smooshDir.mkdir();
+
+      for (Map.Entry<String, File> entry : Smoosh.smoosh(v8OutDir, smooshDir, files).entrySet()) {
+        entry.getValue().delete();
+      }
+
+      for (File file : smooshDir.listFiles()) {
+        Files.move(file, new File(v8OutDir, file.getName()));
+      }
+
+      if (!smooshDir.delete()) {
+        log.info("Unable to delete temporary dir[%s], contains[%s]", smooshDir, Arrays.asList(smooshDir.listFiles()));
+        throw new IOException(String.format("Unable to delete temporary dir[%s]", smooshDir));
+      }
+
+      createIndexDrdFile(
+          IndexIO.V8_VERSION,
+          v8OutDir,
+          GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY),
+          GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.STRING_STRATEGY),
+          dataInterval,
+          indexSpec.getBitmapSerdeFactory()
+      );
+
+      indexIO.getDefaultIndexIOHandler().convertV8toV9(v8OutDir, outDir, indexSpec);
+      return outDir;
    }
-
-    log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
-
-    final ArrayList<String> expectedFiles = Lists.newArrayList(
-        Iterables.concat(
-            Arrays.asList(
-                "index.drd", "inverted.drd", "spatial.drd", String.format("time_%s.drd", IndexIO.BYTE_ORDER)
-            ),
-            Iterables.transform(mergedDimensions, GuavaUtils.formatFunction("dim_%s.drd")),
-            Iterables.transform(
-                mergedMetrics, GuavaUtils.formatFunction(String.format("met_%%s_%s.drd", IndexIO.BYTE_ORDER))
-            )
-        )
-    );
-
-    if (segmentMetadata != null) {
-      writeMetadataToFile(new File(v8OutDir, "metadata.drd"), segmentMetadata);
-      log.info("wrote metadata.drd in outDir[%s].", v8OutDir);
-
-      expectedFiles.add("metadata.drd");
-    }
-
-    Map<String, File> files = Maps.newLinkedHashMap();
-    for (String fileName : expectedFiles) {
-      files.put(fileName, new File(v8OutDir, fileName));
-    }
-
-    File smooshDir = new File(v8OutDir, "smoosher");
-    smooshDir.mkdir();
-
-    for (Map.Entry<String, File> entry : Smoosh.smoosh(v8OutDir, smooshDir, files).entrySet()) {
-      entry.getValue().delete();
-    }
-
-    for (File file : smooshDir.listFiles()) {
-      Files.move(file, new File(v8OutDir, file.getName()));
-    }
-
-    if (!smooshDir.delete()) {
-      log.info("Unable to delete temporary dir[%s], contains[%s]", smooshDir, Arrays.asList(smooshDir.listFiles()));
-      throw new IOException(String.format("Unable to delete temporary dir[%s]", smooshDir));
-    }
-
-    createIndexDrdFile(
-        IndexIO.V8_VERSION,
-        v8OutDir,
-        GenericIndexed.fromIterable(mergedDimensions, GenericIndexed.STRING_STRATEGY),
-        GenericIndexed.fromIterable(mergedMetrics, GenericIndexed.STRING_STRATEGY),
-        dataInterval,
-        indexSpec.getBitmapSerdeFactory()
-    );
-
-    indexIO.getDefaultIndexIOHandler().convertV8toV9(v8OutDir, outDir, indexSpec);
-    FileUtils.deleteDirectory(v8OutDir);
-
-    return outDir;
+    finally {
+      closer.close();
    }
  }
 
   protected Iterable<Rowboat> makeRowIterable(
diff --git a/processing/src/main/java/io/druid/segment/IndexMergerV9.java b/processing/src/main/java/io/druid/segment/IndexMergerV9.java
index 7368fbb62fb..ae555ad967f 100644
--- a/processing/src/main/java/io/druid/segment/IndexMergerV9.java
+++ b/processing/src/main/java/io/druid/segment/IndexMergerV9.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
+import com.google.common.io.Closer;
 import com.google.common.io.Files;
 import com.google.common.primitives.Ints;
 import com.google.inject.Inject;
@@ -74,6 +75,7 @@ import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -144,102 +146,121 @@ public class IndexMergerV9 extends IndexMerger
       );
     }
 
+    Closer closer = Closer.create();
     final IOPeon ioPeon = new TmpFileIOPeon(false);
+    closer.register(new Closeable()
+    {
+      @Override
+      public void close() throws IOException
+      {
+        ioPeon.cleanup();
+      }
+    });
     final FileSmoosher v9Smoosher = new FileSmoosher(outDir);
     final File v9TmpDir = new File(outDir, "v9-tmp");
     v9TmpDir.mkdirs();
+    closer.register(new Closeable()
+    {
+      @Override
+      public void close() throws IOException
+      {
+        FileUtils.deleteDirectory(v9TmpDir);
+      }
+    });
     log.info("Start making v9 index files, outDir:%s", outDir);
+    try {
+      long startTime = System.currentTimeMillis();
+      ByteStreams.write(
+          Ints.toByteArray(IndexIO.V9_VERSION),
+          Files.newOutputStreamSupplier(new File(outDir, "version.bin"))
+      );
+      log.info("Completed version.bin in %,d millis.", System.currentTimeMillis() - startTime);
 
-    long startTime = System.currentTimeMillis();
-    ByteStreams.write(
-        Ints.toByteArray(IndexIO.V9_VERSION),
-        Files.newOutputStreamSupplier(new File(outDir, "version.bin"))
-    );
-    log.info("Completed version.bin in %,d millis.", System.currentTimeMillis() - startTime);
+      progress.progress();
+      final Map<String, ValueType> metricsValueTypes = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
+      final Map<String, String> metricTypeNames = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
+      final List<ColumnCapabilitiesImpl> dimCapabilities = Lists.newArrayListWithCapacity(mergedDimensions.size());
+      mergeCapabilities(adapters, mergedDimensions, metricsValueTypes, metricTypeNames, dimCapabilities);
 
-    progress.progress();
-    final Map<String, ValueType> metricsValueTypes = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
-    final Map<String, String> metricTypeNames = Maps.newTreeMap(Ordering.<String>natural().nullsFirst());
-    final List<ColumnCapabilitiesImpl> dimCapabilities = Lists.newArrayListWithCapacity(mergedDimensions.size());
-    mergeCapabilities(adapters, mergedDimensions, metricsValueTypes, metricTypeNames, dimCapabilities);
+      /************* Setup Dim Conversions **************/
+      progress.progress();
+      startTime = System.currentTimeMillis();
+      final Map<String, Integer> dimCardinalities = Maps.newHashMap();
+      final ArrayList<GenericIndexedWriter<String>> dimValueWriters = setupDimValueWriters(ioPeon, mergedDimensions);
+      final ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(adapters.size());
+      final ArrayList<Boolean> dimensionSkipFlag = Lists.newArrayListWithCapacity(mergedDimensions.size());
+      final ArrayList<Boolean> dimHasNullFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
+      final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
+      writeDimValueAndSetupDimConversion(
+          adapters, progress, mergedDimensions, dimCardinalities, dimValueWriters, dimensionSkipFlag, dimConversions,
+          convertMissingDimsFlags, dimHasNullFlags
+      );
+      log.info("Completed dim conversions in %,d millis.", System.currentTimeMillis() - startTime);
 
-    /************* Setup Dim Conversions **************/
-    progress.progress();
-    startTime = System.currentTimeMillis();
-    final Map<String, Integer> dimCardinalities = Maps.newHashMap();
-    final ArrayList<GenericIndexedWriter<String>> dimValueWriters = setupDimValueWriters(ioPeon, mergedDimensions);
-    final ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(adapters.size());
-    final ArrayList<Boolean> dimensionSkipFlag = Lists.newArrayListWithCapacity(mergedDimensions.size());
-    final ArrayList<Boolean> dimHasNullFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
-    final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
-    writeDimValueAndSetupDimConversion(
-        adapters, progress, mergedDimensions, dimCardinalities, dimValueWriters, dimensionSkipFlag, dimConversions,
-        convertMissingDimsFlags, dimHasNullFlags
-    );
-    log.info("Completed dim conversions in %,d millis.", System.currentTimeMillis() - startTime);
+      /************* Walk through data sets, merge them, and write merged columns *************/
+      progress.progress();
+      final Iterable<Rowboat> theRows = makeRowIterable(
+          adapters,
+          mergedDimensions,
+          mergedMetrics,
+          dimConversions,
+          convertMissingDimsFlags,
+          rowMergerFn
+      );
+      final LongColumnSerializer timeWriter = setupTimeWriter(ioPeon);
+      final ArrayList<IndexedIntsWriter> dimWriters = setupDimensionWriters(
+          ioPeon, mergedDimensions, dimCapabilities, dimCardinalities, indexSpec
+      );
+      final ArrayList<GenericColumnSerializer> metWriters = setupMetricsWriters(
+          ioPeon, mergedMetrics, metricsValueTypes, metricTypeNames, indexSpec
+      );
+      final List<IntBuffer> rowNumConversions = Lists.newArrayListWithCapacity(adapters.size());
+      final ArrayList<MutableBitmap> nullRowsList = Lists.newArrayListWithCapacity(mergedDimensions.size());
+      for (int i = 0; i < mergedDimensions.size(); ++i) {
+        nullRowsList.add(indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap());
+      }
+      mergeIndexesAndWriteColumns(
+          adapters, progress, theRows, timeWriter, dimWriters, metWriters,
+          dimensionSkipFlag, rowNumConversions, nullRowsList, dimHasNullFlags
+      );
 
-    /************* Walk through data sets, merge them, and write merged columns *************/
-    progress.progress();
-    final Iterable<Rowboat> theRows = makeRowIterable(
-        adapters,
-        mergedDimensions,
-        mergedMetrics,
-        dimConversions,
-        convertMissingDimsFlags,
-        rowMergerFn
-    );
-    final LongColumnSerializer timeWriter = setupTimeWriter(ioPeon);
-    final ArrayList<IndexedIntsWriter> dimWriters = setupDimensionWriters(
-        ioPeon, mergedDimensions, dimCapabilities, dimCardinalities, indexSpec
-    );
-    final ArrayList<GenericColumnSerializer> metWriters = setupMetricsWriters(
-        ioPeon, mergedMetrics, metricsValueTypes, metricTypeNames, indexSpec
-    );
-    final List<IntBuffer> rowNumConversions = Lists.newArrayListWithCapacity(adapters.size());
-    final ArrayList<MutableBitmap> nullRowsList = Lists.newArrayListWithCapacity(mergedDimensions.size());
-    for (int i = 0; i < mergedDimensions.size(); ++i) {
-      nullRowsList.add(indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap());
+      /************ Create Inverted Indexes *************/
+      progress.progress();
+      final ArrayList<GenericIndexedWriter<ImmutableBitmap>> bitmapIndexWriters = setupBitmapIndexWriters(
+          ioPeon, mergedDimensions, indexSpec
+      );
+      final ArrayList<ByteBufferWriter<ImmutableRTree>> spatialIndexWriters = setupSpatialIndexWriters(
+          ioPeon, mergedDimensions, indexSpec, dimCapabilities
+      );
+      makeInvertedIndexes(
+          adapters, progress, mergedDimensions, indexSpec, v9TmpDir, rowNumConversions,
+          nullRowsList, dimValueWriters, bitmapIndexWriters, spatialIndexWriters, dimConversions
+      );
+
+      /************ Finalize Build Columns *************/
+      progress.progress();
+      makeTimeColumn(v9Smoosher, progress, timeWriter);
+      makeMetricsColumns(v9Smoosher, progress, mergedMetrics, metricsValueTypes, metricTypeNames, metWriters);
+      makeDimensionColumns(
+          v9Smoosher, progress, indexSpec, mergedDimensions, dimensionSkipFlag, dimCapabilities,
+          dimValueWriters, dimWriters, bitmapIndexWriters, spatialIndexWriters
+      );
+
+      /************* Make index.drd & metadata.drd files **************/
+      progress.progress();
+      makeIndexBinary(
+          v9Smoosher, adapters, outDir, mergedDimensions, dimensionSkipFlag, mergedMetrics, progress, indexSpec
+      );
+      makeMetadataBinary(v9Smoosher, progress, segmentMetadata);
+
+      v9Smoosher.close();
+      progress.stop();
+
+      return outDir;
+    }
+    finally {
+      closer.close();
    }
-    mergeIndexesAndWriteColumns(
-        adapters, progress, theRows, timeWriter, dimWriters, metWriters,
-        dimensionSkipFlag, rowNumConversions, nullRowsList, dimHasNullFlags
-    );
-
-    /************ Create Inverted Indexes *************/
-    progress.progress();
-    final ArrayList<GenericIndexedWriter<ImmutableBitmap>> bitmapIndexWriters = setupBitmapIndexWriters(
-        ioPeon, mergedDimensions, indexSpec
-    );
-    final ArrayList<ByteBufferWriter<ImmutableRTree>> spatialIndexWriters = setupSpatialIndexWriters(
-        ioPeon, mergedDimensions, indexSpec, dimCapabilities
-    );
-    makeInvertedIndexes(
-        adapters, progress, mergedDimensions, indexSpec, v9TmpDir, rowNumConversions,
-        nullRowsList, dimValueWriters, bitmapIndexWriters, spatialIndexWriters, dimConversions
-    );
-
-    /************ Finalize Build Columns *************/
-    progress.progress();
-    makeTimeColumn(v9Smoosher, progress, timeWriter);
-    makeMetricsColumns(v9Smoosher, progress, mergedMetrics, metricsValueTypes, metricTypeNames, metWriters);
-    makeDimensionColumns(
-        v9Smoosher, progress, indexSpec, mergedDimensions, dimensionSkipFlag, dimCapabilities,
-        dimValueWriters, dimWriters, bitmapIndexWriters, spatialIndexWriters
-    );
-
-    /************* Make index.drd & metadata.drd files **************/
-    progress.progress();
-    makeIndexBinary(
-        v9Smoosher, adapters, outDir, mergedDimensions, dimensionSkipFlag, mergedMetrics, progress, indexSpec
-    );
-    makeMetadataBinary(v9Smoosher, progress, segmentMetadata);
-
-    v9Smoosher.close();
-    ioPeon.cleanup();
-    FileUtils.deleteDirectory(v9TmpDir);
-    progress.stop();
-
-    return outDir;
  }
 
   private void makeMetadataBinary(
diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java
index 39ae4de7bdf..d5f46b605de 100644
--- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java
+++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java
@@ -37,6 +37,7 @@ import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.segment.column.Column;
+import io.druid.segment.column.ColumnCapabilitiesImpl;
 import io.druid.segment.column.SimpleDictionaryEncodedColumn;
 import io.druid.segment.data.BitmapSerdeFactory;
 import io.druid.segment.data.CompressedObjectStrategy;
@@ -67,7 +68,6 @@ import java.nio.IntBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -1662,4 +1662,34 @@ public class IndexMergerTest
     Assert.assertEquals(2, dictIdSeeker.seek(4));
     Assert.assertEquals(-1, dictIdSeeker.seek(5));
   }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCloser() throws Exception
+  {
+    final long timestamp = System.currentTimeMillis();
+    IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null);
+    IncrementalIndexTest.populateIndex(timestamp, toPersist);
+    ColumnCapabilitiesImpl capabilities = (ColumnCapabilitiesImpl) toPersist.getCapabilities("dim1");
+    capabilities.setHasSpatialIndexes(true);
+
+    final File tempDir = temporaryFolder.newFolder();
+    final File v8TmpDir = new File(tempDir, "v8-tmp");
+    final File v9TmpDir = new File(tempDir, "v9-tmp");
+
+    try {
+      INDEX_MERGER.persist(
+          toPersist,
+          tempDir,
+          indexSpec
+      );
+    }
+    finally {
+      if (v8TmpDir.exists()) {
+        Assert.fail("v8-tmp dir not clean.");
+      }
+      if (v9TmpDir.exists()) {
+        Assert.fail("v9-tmp dir not clean.");
+      }
+    }
+  }
 }
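
Context on the idiom all three files above now share: previously, `FileUtils.deleteDirectory(...)` on the temp directory and `ioPeon.cleanup()` ran only on the success path, so any exception thrown mid-merge leaked temp files; the patch registers each cleanup action with a Guava `Closer` as soon as the resource exists and closes that `Closer` once in a `finally` block. The standalone sketch below illustrates the same idiom outside Druid. It is a minimal sketch: the class name, the directory name, and the `doWork` placeholder are illustrative only, and the `closer.rethrow(t)` catch clause is part of Guava's documented usage pattern for `Closer` rather than something this patch uses.

import com.google.common.io.Closer;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;

public class CloserCleanupSketch
{
  public static void main(String[] args) throws IOException
  {
    Closer closer = Closer.create();
    final File tmpDir = new File(System.getProperty("java.io.tmpdir"), "closer-sketch-tmp");
    tmpDir.mkdirs();
    // Register cleanup immediately after the resource is created, exactly as the
    // patch does for v8OutDir/v9TmpDir and the IOPeons; close() below runs it
    // whether doWork() succeeds or throws.
    closer.register(new Closeable()
    {
      @Override
      public void close() throws IOException
      {
        if (tmpDir.exists() && !tmpDir.delete()) {
          throw new IOException("Unable to delete " + tmpDir);
        }
      }
    });
    try {
      doWork(tmpDir); // stand-in for the merge steps guarded by the closer in the real patch
    }
    catch (Throwable t) {
      // rethrow() records t as the primary failure so that an exception thrown
      // during close() cannot mask it.
      throw closer.rethrow(t);
    }
    finally {
      closer.close();
    }
  }

  private static void doWork(File dir) throws IOException
  {
    // placeholder: any step here may throw; cleanup still happens via the closer
  }
}

A design note that follows from this: `Closer` closes registered `Closeable`s in reverse registration order, so resources created later are torn down first. That is why each `closer.register(...)` call in the patch sits directly after the `mkdirs()` or `new TmpFileIOPeon()` it guards, and why the new `testCloser` test can assert that neither `v8-tmp` nor `v9-tmp` survives even though `persist` is expected to fail (the spatial-index path feeds non-numeric dimension values to `Float.valueOf`, which throws `NumberFormatException`, a subclass of the `IllegalArgumentException` the test expects).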