diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index 73f1ac8aa99..0c6ad9f1058 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -32,6 +32,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; +import com.google.common.io.Closer; import com.google.common.io.Files; import com.google.common.primitives.Ints; import com.google.inject.Inject; @@ -120,17 +121,17 @@ public class IndexIO this.columnConfig = Preconditions.checkNotNull(columnConfig, "null ColumnConfig"); defaultIndexIOHandler = new DefaultIndexIOHandler(mapper); indexLoaders = ImmutableMap.builder() - .put(0, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(1, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(2, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(3, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(4, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(5, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(6, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(7, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(8, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) - .put(9, new V9IndexLoader(columnConfig)) - .build(); + .put(0, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(1, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(2, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(3, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(4, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(5, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(6, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(7, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(8, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) + .put(9, new V9IndexLoader(columnConfig)) + .build(); } @@ -269,13 +270,14 @@ public class IndexIO case 6: case 7: log.info("Old version, re-persisting."); - QueryableIndex segmentToConvert = loadIndex(toConvert); - new IndexMerger(mapper, this).append( - Arrays.asList(new QueryableIndexIndexableAdapter(segmentToConvert)), - null, - converted, - indexSpec - ); + try (QueryableIndex segmentToConvert = loadIndex(toConvert)) { + new IndexMerger(mapper, this).append( + Arrays.asList(new QueryableIndexIndexableAdapter(segmentToConvert)), + null, + converted, + indexSpec + ); + } return true; case 8: defaultIndexIOHandler.convertV8toV9(toConvert, converted, indexSpec); @@ -545,347 +547,355 @@ public class IndexIO Closeables.close(indexIn, false); } - SmooshedFileMapper v8SmooshedFiles = Smoosh.map(v8Dir); + Closer closer = Closer.create(); + try { + SmooshedFileMapper v8SmooshedFiles = closer.register(Smoosh.map(v8Dir)); - v9Dir.mkdirs(); - final FileSmoosher v9Smoosher = new FileSmoosher(v9Dir); + v9Dir.mkdirs(); + final FileSmoosher v9Smoosher = closer.register(new FileSmoosher(v9Dir)); - ByteStreams.write(Ints.toByteArray(9), Files.newOutputStreamSupplier(new File(v9Dir, "version.bin"))); + ByteStreams.write(Ints.toByteArray(9), Files.newOutputStreamSupplier(new File(v9Dir, "version.bin"))); - Map> bitmapIndexes = Maps.newHashMap(); - final ByteBuffer invertedBuffer = 
v8SmooshedFiles.mapFile("inverted.drd"); - BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory(); + Map> bitmapIndexes = Maps.newHashMap(); + final ByteBuffer invertedBuffer = v8SmooshedFiles.mapFile("inverted.drd"); + BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory(); - while (invertedBuffer.hasRemaining()) { - final String dimName = serializerUtils.readString(invertedBuffer); - bitmapIndexes.put( - dimName, - GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy()) - ); - } - - Map spatialIndexes = Maps.newHashMap(); - final ByteBuffer spatialBuffer = v8SmooshedFiles.mapFile("spatial.drd"); - while (spatialBuffer != null && spatialBuffer.hasRemaining()) { - spatialIndexes.put( - serializerUtils.readString(spatialBuffer), - ByteBufferSerializer.read( - spatialBuffer, new IndexedRTree.ImmutableRTreeObjectStrategy( - bitmapSerdeFactory.getBitmapFactory() - ) - ) - ); - } - - final LinkedHashSet skippedFiles = Sets.newLinkedHashSet(); - final Set skippedDimensions = Sets.newLinkedHashSet(); - for (String filename : v8SmooshedFiles.getInternalFilenames()) { - log.info("Processing file[%s]", filename); - if (filename.startsWith("dim_")) { - final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); - builder.setValueType(ValueType.STRING); - - final List outParts = Lists.newArrayList(); - - ByteBuffer dimBuffer = v8SmooshedFiles.mapFile(filename); - String dimension = serializerUtils.readString(dimBuffer); - if (!filename.equals(String.format("dim_%s.drd", dimension))) { - throw new ISE("loaded dimension[%s] from file[%s]", dimension, filename); - } - - ByteArrayOutputStream nameBAOS = new ByteArrayOutputStream(); - serializerUtils.writeString(nameBAOS, dimension); - outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray())); - - GenericIndexed dictionary = GenericIndexed.read( - dimBuffer, GenericIndexed.STRING_STRATEGY + while (invertedBuffer.hasRemaining()) { + final String dimName = serializerUtils.readString(invertedBuffer); + bitmapIndexes.put( + dimName, + GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy()) ); + } - if (dictionary.size() == 0) { - log.info("Dimension[%s] had cardinality 0, equivalent to no column, so skipping.", dimension); - skippedDimensions.add(dimension); - continue; - } + Map spatialIndexes = Maps.newHashMap(); + final ByteBuffer spatialBuffer = v8SmooshedFiles.mapFile("spatial.drd"); + while (spatialBuffer != null && spatialBuffer.hasRemaining()) { + spatialIndexes.put( + serializerUtils.readString(spatialBuffer), + ByteBufferSerializer.read( + spatialBuffer, new IndexedRTree.ImmutableRTreeObjectStrategy( + bitmapSerdeFactory.getBitmapFactory() + ) + ) + ); + } - int emptyStrIdx = dictionary.indexOf(""); - List singleValCol = null; - VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer()); - GenericIndexed bitmaps = bitmapIndexes.get(dimension); - ImmutableRTree spatialIndex = spatialIndexes.get(dimension); + final LinkedHashSet skippedFiles = Sets.newLinkedHashSet(); + final Set skippedDimensions = Sets.newLinkedHashSet(); + for (String filename : v8SmooshedFiles.getInternalFilenames()) { + log.info("Processing file[%s]", filename); + if (filename.startsWith("dim_")) { + final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); + builder.setValueType(ValueType.STRING); - final BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory(); - boolean onlyOneValue = true; - MutableBitmap nullsSet = null; - for (int i = 0; i < 
multiValCol.size(); ++i) { - VSizeIndexedInts rowValue = multiValCol.get(i); - if (!onlyOneValue) { - break; + final List outParts = Lists.newArrayList(); + + ByteBuffer dimBuffer = v8SmooshedFiles.mapFile(filename); + String dimension = serializerUtils.readString(dimBuffer); + if (!filename.equals(String.format("dim_%s.drd", dimension))) { + throw new ISE("loaded dimension[%s] from file[%s]", dimension, filename); } - if (rowValue.size() > 1) { - onlyOneValue = false; + + ByteArrayOutputStream nameBAOS = new ByteArrayOutputStream(); + serializerUtils.writeString(nameBAOS, dimension); + outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray())); + + GenericIndexed dictionary = GenericIndexed.read( + dimBuffer, GenericIndexed.STRING_STRATEGY + ); + + if (dictionary.size() == 0) { + log.info("Dimension[%s] had cardinality 0, equivalent to no column, so skipping.", dimension); + skippedDimensions.add(dimension); + continue; } - if (rowValue.size() == 0 || rowValue.get(0) == emptyStrIdx) { - if (nullsSet == null) { - nullsSet = bitmapFactory.makeEmptyMutableBitmap(); + + int emptyStrIdx = dictionary.indexOf(""); + List singleValCol = null; + VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer()); + GenericIndexed bitmaps = bitmapIndexes.get(dimension); + ImmutableRTree spatialIndex = spatialIndexes.get(dimension); + + final BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory(); + boolean onlyOneValue = true; + MutableBitmap nullsSet = null; + for (int i = 0; i < multiValCol.size(); ++i) { + VSizeIndexedInts rowValue = multiValCol.get(i); + if (!onlyOneValue) { + break; + } + if (rowValue.size() > 1) { + onlyOneValue = false; + } + if (rowValue.size() == 0 || rowValue.get(0) == emptyStrIdx) { + if (nullsSet == null) { + nullsSet = bitmapFactory.makeEmptyMutableBitmap(); + } + nullsSet.add(i); } - nullsSet.add(i); } - } - if (onlyOneValue) { - log.info("Dimension[%s] is single value, converting...", dimension); - final boolean bumpedDictionary; - if (nullsSet != null) { - log.info("Dimension[%s] has null rows.", dimension); - final ImmutableBitmap theNullSet = bitmapFactory.makeImmutableBitmap(nullsSet); + if (onlyOneValue) { + log.info("Dimension[%s] is single value, converting...", dimension); + final boolean bumpedDictionary; + if (nullsSet != null) { + log.info("Dimension[%s] has null rows.", dimension); + final ImmutableBitmap theNullSet = bitmapFactory.makeImmutableBitmap(nullsSet); - if (dictionary.get(0) != null) { - log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension); - bumpedDictionary = true; - final List nullList = Lists.newArrayList(); - nullList.add(null); + if (dictionary.get(0) != null) { + log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension); + bumpedDictionary = true; + final List nullList = Lists.newArrayList(); + nullList.add(null); - dictionary = GenericIndexed.fromIterable( - Iterables.concat(nullList, dictionary), - GenericIndexed.STRING_STRATEGY - ); + dictionary = GenericIndexed.fromIterable( + Iterables.concat(nullList, dictionary), + GenericIndexed.STRING_STRATEGY + ); - bitmaps = GenericIndexed.fromIterable( - Iterables.concat(Arrays.asList(theNullSet), bitmaps), - bitmapSerdeFactory.getObjectStrategy() - ); + bitmaps = GenericIndexed.fromIterable( + Iterables.concat(Arrays.asList(theNullSet), bitmaps), + bitmapSerdeFactory.getObjectStrategy() + ); + } else { + bumpedDictionary = false; + bitmaps = GenericIndexed.fromIterable( + Iterables.concat( + 
Arrays.asList( + bitmapFactory + .union(Arrays.asList(theNullSet, bitmaps.get(0))) + ), + Iterables.skip(bitmaps, 1) + ), + bitmapSerdeFactory.getObjectStrategy() + ); + } } else { bumpedDictionary = false; - bitmaps = GenericIndexed.fromIterable( - Iterables.concat( - Arrays.asList( - bitmapFactory - .union(Arrays.asList(theNullSet, bitmaps.get(0))) - ), - Iterables.skip(bitmaps, 1) - ), - bitmapSerdeFactory.getObjectStrategy() - ); } + + final VSizeIndexed finalMultiValCol = multiValCol; + singleValCol = new AbstractList() + { + @Override + public Integer get(int index) + { + final VSizeIndexedInts ints = finalMultiValCol.get(index); + return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0); + } + + @Override + public int size() + { + return finalMultiValCol.size(); + } + }; + + multiValCol = null; } else { - bumpedDictionary = false; + builder.setHasMultipleValues(true); } - final VSizeIndexed finalMultiValCol = multiValCol; - singleValCol = new AbstractList() - { - @Override - public Integer get(int index) - { - final VSizeIndexedInts ints = finalMultiValCol.get(index); - return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0); + final CompressedObjectStrategy.CompressionStrategy compressionStrategy = indexSpec.getDimensionCompression(); + + final DictionaryEncodedColumnPartSerde.LegacySerializerBuilder columnPartBuilder = DictionaryEncodedColumnPartSerde + .legacySerializerBuilder() + .withDictionary(dictionary) + .withBitmapSerdeFactory(bitmapSerdeFactory) + .withBitmaps(bitmaps) + .withSpatialIndex(spatialIndex) + .withByteOrder(BYTE_ORDER); + + if (singleValCol != null) { + if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) { + columnPartBuilder.withSingleValuedColumn( + CompressedVSizeIntsIndexedSupplier.fromList( + singleValCol, + dictionary.size(), + CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(dictionary.size()), + BYTE_ORDER, + compressionStrategy + ) + ); + } else { + columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size())); } - - @Override - public int size() - { - return finalMultiValCol.size(); - } - }; - - multiValCol = null; - } else { - builder.setHasMultipleValues(true); - } - - final CompressedObjectStrategy.CompressionStrategy compressionStrategy = indexSpec.getDimensionCompression(); - - final DictionaryEncodedColumnPartSerde.LegacySerializerBuilder columnPartBuilder = DictionaryEncodedColumnPartSerde - .legacySerializerBuilder() - .withDictionary(dictionary) - .withBitmapSerdeFactory(bitmapSerdeFactory) - .withBitmaps(bitmaps) - .withSpatialIndex(spatialIndex) - .withByteOrder(BYTE_ORDER); - - if (singleValCol != null) { - if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) { - columnPartBuilder.withSingleValuedColumn( - CompressedVSizeIntsIndexedSupplier.fromList( - singleValCol, + } else if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) { + columnPartBuilder.withMultiValuedColumn( + CompressedVSizeIndexedSupplier.fromIterable( + multiValCol, dictionary.size(), - CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(dictionary.size()), BYTE_ORDER, compressionStrategy ) ); } else { - columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size())); + columnPartBuilder.withMultiValuedColumn(multiValCol); } - } else if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) { - 
columnPartBuilder.withMultiValuedColumn( - CompressedVSizeIndexedSupplier.fromIterable( - multiValCol, - dictionary.size(), - BYTE_ORDER, - compressionStrategy - ) + + final ColumnDescriptor serdeficator = builder + .addSerde(columnPartBuilder.build()) + .build(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); + byte[] specBytes = baos.toByteArray(); + + final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( + dimension, serdeficator.numBytes() + specBytes.length ); - } else { - columnPartBuilder.withMultiValuedColumn(multiValCol); - } + channel.write(ByteBuffer.wrap(specBytes)); + serdeficator.write(channel); + channel.close(); + } else if (filename.startsWith("met_")) { + if (!filename.endsWith(String.format("%s.drd", BYTE_ORDER))) { + skippedFiles.add(filename); + continue; + } - final ColumnDescriptor serdeficator = builder - .addSerde(columnPartBuilder.build()) - .build(); + MetricHolder holder = MetricHolder.fromByteBuffer(v8SmooshedFiles.mapFile(filename)); + final String metric = holder.getName(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); - byte[] specBytes = baos.toByteArray(); + final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); - final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( - dimension, serdeficator.numBytes() + specBytes.length - ); - channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel); - channel.close(); - } else if (filename.startsWith("met_")) { - if (!filename.endsWith(String.format("%s.drd", BYTE_ORDER))) { - skippedFiles.add(filename); - continue; - } - - MetricHolder holder = MetricHolder.fromByteBuffer(v8SmooshedFiles.mapFile(filename)); - final String metric = holder.getName(); - - final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); - - switch (holder.getType()) { - case LONG: - builder.setValueType(ValueType.LONG); - builder.addSerde( - LongGenericColumnPartSerde.legacySerializerBuilder() - .withByteOrder(BYTE_ORDER) - .withDelegate(holder.longType) - .build() - ); - break; - case FLOAT: - builder.setValueType(ValueType.FLOAT); - builder.addSerde( - FloatGenericColumnPartSerde.legacySerializerBuilder() - .withByteOrder(BYTE_ORDER) - .withDelegate(holder.floatType) - .build() - ); - break; - case COMPLEX: - if (!(holder.complexType instanceof GenericIndexed)) { - throw new ISE("Serialized complex types must be GenericIndexed objects."); - } - final GenericIndexed column = (GenericIndexed) holder.complexType; - final String complexType = holder.getTypeName(); - builder.setValueType(ValueType.COMPLEX); - builder.addSerde( - ComplexColumnPartSerde.legacySerializerBuilder() - .withTypeName(complexType) - .withDelegate(column).build() - ); - break; - default: - throw new ISE("Unknown type[%s]", holder.getType()); - } - - final ColumnDescriptor serdeficator = builder.build(); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); - byte[] specBytes = baos.toByteArray(); - - final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( - metric, serdeficator.numBytes() + specBytes.length - ); - channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel); - channel.close(); - } else if (String.format("time_%s.drd", BYTE_ORDER).equals(filename)) { - CompressedLongsIndexedSupplier timestamps = 
CompressedLongsIndexedSupplier.fromByteBuffer( - v8SmooshedFiles.mapFile(filename), BYTE_ORDER - ); - - final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); - builder.setValueType(ValueType.LONG); - builder.addSerde( - LongGenericColumnPartSerde.legacySerializerBuilder() - .withByteOrder(BYTE_ORDER) - .withDelegate(timestamps) - .build() - ); - final ColumnDescriptor serdeficator = builder.build(); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); - byte[] specBytes = baos.toByteArray(); - - final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( - "__time", serdeficator.numBytes() + specBytes.length - ); - channel.write(ByteBuffer.wrap(specBytes)); - serdeficator.write(channel); - channel.close(); - } else { - skippedFiles.add(filename); - } - } - - final ByteBuffer indexBuffer = v8SmooshedFiles.mapFile("index.drd"); - - indexBuffer.get(); // Skip the version byte - final GenericIndexed dims8 = GenericIndexed.read( - indexBuffer, GenericIndexed.STRING_STRATEGY - ); - final GenericIndexed dims9 = GenericIndexed.fromIterable( - Iterables.filter( - dims8, new Predicate() - { - @Override - public boolean apply(String s) - { - return !skippedDimensions.contains(s); + switch (holder.getType()) { + case LONG: + builder.setValueType(ValueType.LONG); + builder.addSerde( + LongGenericColumnPartSerde.legacySerializerBuilder() + .withByteOrder(BYTE_ORDER) + .withDelegate(holder.longType) + .build() + ); + break; + case FLOAT: + builder.setValueType(ValueType.FLOAT); + builder.addSerde( + FloatGenericColumnPartSerde.legacySerializerBuilder() + .withByteOrder(BYTE_ORDER) + .withDelegate(holder.floatType) + .build() + ); + break; + case COMPLEX: + if (!(holder.complexType instanceof GenericIndexed)) { + throw new ISE("Serialized complex types must be GenericIndexed objects."); } - } - ), - GenericIndexed.STRING_STRATEGY - ); - final GenericIndexed availableMetrics = GenericIndexed.read( - indexBuffer, GenericIndexed.STRING_STRATEGY - ); - final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer)); - final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue( - serializerUtils.readString(indexBuffer), - BitmapSerdeFactory.class - ); + final GenericIndexed column = (GenericIndexed) holder.complexType; + final String complexType = holder.getTypeName(); + builder.setValueType(ValueType.COMPLEX); + builder.addSerde( + ComplexColumnPartSerde.legacySerializerBuilder() + .withTypeName(complexType) + .withDelegate(column).build() + ); + break; + default: + throw new ISE("Unknown type[%s]", holder.getType()); + } - Set columns = Sets.newTreeSet(); - columns.addAll(Lists.newArrayList(dims9)); - columns.addAll(Lists.newArrayList(availableMetrics)); - GenericIndexed cols = GenericIndexed.fromIterable(columns, GenericIndexed.STRING_STRATEGY); + final ColumnDescriptor serdeficator = builder.build(); - final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); + byte[] specBytes = baos.toByteArray(); - final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16 - + serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString); - final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); - cols.writeToChannel(writer); - 
dims9.writeToChannel(writer); - serializerUtils.writeLong(writer, dataInterval.getStartMillis()); - serializerUtils.writeLong(writer, dataInterval.getEndMillis()); - serializerUtils.writeString(writer, segmentBitmapSerdeFactoryString); - writer.close(); + final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( + metric, serdeficator.numBytes() + specBytes.length + ); + channel.write(ByteBuffer.wrap(specBytes)); + serdeficator.write(channel); + channel.close(); + } else if (String.format("time_%s.drd", BYTE_ORDER).equals(filename)) { + CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer( + v8SmooshedFiles.mapFile(filename), BYTE_ORDER + ); + + final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); + builder.setValueType(ValueType.LONG); + builder.addSerde( + LongGenericColumnPartSerde.legacySerializerBuilder() + .withByteOrder(BYTE_ORDER) + .withDelegate(timestamps) + .build() + ); + final ColumnDescriptor serdeficator = builder.build(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator)); + byte[] specBytes = baos.toByteArray(); + + final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( + "__time", serdeficator.numBytes() + specBytes.length + ); + channel.write(ByteBuffer.wrap(specBytes)); + serdeficator.write(channel); + channel.close(); + } else { + skippedFiles.add(filename); + } + } + + final ByteBuffer indexBuffer = v8SmooshedFiles.mapFile("index.drd"); + + indexBuffer.get(); // Skip the version byte + final GenericIndexed dims8 = GenericIndexed.read( + indexBuffer, GenericIndexed.STRING_STRATEGY + ); + final GenericIndexed dims9 = GenericIndexed.fromIterable( + Iterables.filter( + dims8, new Predicate() + { + @Override + public boolean apply(String s) + { + return !skippedDimensions.contains(s); + } + } + ), + GenericIndexed.STRING_STRATEGY + ); + final GenericIndexed availableMetrics = GenericIndexed.read( + indexBuffer, GenericIndexed.STRING_STRATEGY + ); + final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer)); + final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue( + serializerUtils.readString(indexBuffer), + BitmapSerdeFactory.class + ); + + Set columns = Sets.newTreeSet(); + columns.addAll(Lists.newArrayList(dims9)); + columns.addAll(Lists.newArrayList(availableMetrics)); + GenericIndexed cols = GenericIndexed.fromIterable(columns, GenericIndexed.STRING_STRATEGY); + + final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory); + + final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16 + + serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString); + final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); + cols.writeToChannel(writer); + dims9.writeToChannel(writer); + serializerUtils.writeLong(writer, dataInterval.getStartMillis()); + serializerUtils.writeLong(writer, dataInterval.getEndMillis()); + serializerUtils.writeString(writer, segmentBitmapSerdeFactoryString); + writer.close(); + + final ByteBuffer metadataBuffer = v8SmooshedFiles.mapFile("metadata.drd"); + if (metadataBuffer != null) { + v9Smoosher.add("metadata.drd", metadataBuffer); + } + + log.info("Skipped files[%s]", skippedFiles); - final ByteBuffer metadataBuffer = v8SmooshedFiles.mapFile("metadata.drd"); - if (metadataBuffer != null) { - v9Smoosher.add("metadata.drd", metadataBuffer); } - - 
log.info("Skipped files[%s]", skippedFiles); - - v9Smoosher.close(); + catch (Throwable t) { + throw closer.rethrow(t); + } + finally { + closer.close(); + } } } diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index 10622015749..cc73cd6078c 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -48,6 +48,7 @@ import com.metamx.collections.bitmap.MutableBitmap; import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.collections.spatial.RTree; import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; +import com.metamx.common.ByteBufferUtils; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.Pair; @@ -933,6 +934,14 @@ public class IndexMerger File dimOutFile = dimOuts.get(i).getFile(); final MappedByteBuffer dimValsMapped = Files.map(dimOutFile); + closer.register(new Closeable() + { + @Override + public void close() throws IOException + { + ByteBufferUtils.unmap(dimValsMapped); + } + }); if (!dimension.equals(serializerUtils.readString(dimValsMapped))) { throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension); @@ -1078,6 +1087,9 @@ public class IndexMerger indexIO.getDefaultIndexIOHandler().convertV8toV9(v8OutDir, outDir, indexSpec); return outDir; } + catch (Throwable t) { + throw closer.rethrow(t); + } finally { closer.close(); } diff --git a/processing/src/main/java/io/druid/segment/IndexMergerV9.java b/processing/src/main/java/io/druid/segment/IndexMergerV9.java index 6936f486406..a255bb559bf 100644 --- a/processing/src/main/java/io/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/io/druid/segment/IndexMergerV9.java @@ -37,6 +37,7 @@ import com.metamx.collections.bitmap.MutableBitmap; import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.collections.spatial.RTree; import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; +import com.metamx.common.ByteBufferUtils; import com.metamx.common.ISE; import com.metamx.common.io.smoosh.FileSmoosher; import com.metamx.common.io.smoosh.SmooshedWriter; @@ -259,6 +260,9 @@ public class IndexMergerV9 extends IndexMerger return outDir; } + catch (Throwable t) { + throw closer.rethrow(t); + } finally { closer.close(); } @@ -382,7 +386,11 @@ public class IndexMergerV9 extends IndexMerger final DictionaryEncodedColumnPartSerde.SerializerBuilder partBuilder = DictionaryEncodedColumnPartSerde .serializerBuilder() .withDictionary(dimValueWriters.get(i)) - .withValue(dimWriters.get(i), hasMultiValue, compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) + .withValue( + dimWriters.get(i), + hasMultiValue, + compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED + ) .withBitmapSerdeFactory(bitmapSerdeFactory) .withBitmapIndex(bitmapIndexWriters.get(i)) .withSpatialIndex(spatialIndexWriters.get(i)) @@ -536,73 +544,82 @@ public class IndexMergerV9 extends IndexMerger fos.close(); final MappedByteBuffer dimValsMapped = Files.map(dimValueFile); - Indexed dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY); - - ByteBufferWriter spatialIndexWriter = spatialIndexWriters.get(dimIndex); - RTree tree = null; - if (spatialIndexWriter != null) { - BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory(); - tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), 
bitmapFactory); - } - - IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, dimension); - - ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap( - nullRowsList.get(dimIndex) - ); - - //Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result. - for (int dictId = 0; dictId < dimVals.size(); dictId++) { - progress.progress(); - List> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size()); - for (int j = 0; j < adapters.size(); ++j) { - int seekedDictId = dictIdSeeker[j].seek(dictId); - if (seekedDictId != IndexSeeker.NOT_EXIST) { - convertedInverteds.add( - new ConvertingIndexedInts( - adapters.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j) - ) - ); - } + try (Closeable dimValsMappedUnmapper = new Closeable() + { + @Override + public void close() + { + ByteBufferUtils.unmap(dimValsMapped); } + }) { + Indexed dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY); - MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap(); - for (Integer row : CombiningIterable.createSplatted( - convertedInverteds, - Ordering.natural().nullsFirst() - )) { - if (row != INVALID_ROW) { - bitset.add(row); - } - } - - ImmutableBitmap bitmapToWrite = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset); - if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) { - bitmapToWrite = nullRowBitmap.union(bitmapToWrite); - } - bitmapIndexWriters.get(dimIndex).write(bitmapToWrite); - + ByteBufferWriter spatialIndexWriter = spatialIndexWriters.get(dimIndex); + RTree tree = null; if (spatialIndexWriter != null) { - String dimVal = dimVals.get(dictId); - if (dimVal != null) { - List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); - float[] coords = new float[stringCoords.size()]; - for (int j = 0; j < coords.length; j++) { - coords[j] = Float.valueOf(stringCoords.get(j)); + BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory(); + tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory); + } + + IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, dimension); + + ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap( + nullRowsList.get(dimIndex) + ); + + //Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result. 
+ for (int dictId = 0; dictId < dimVals.size(); dictId++) { + progress.progress(); + List> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size()); + for (int j = 0; j < adapters.size(); ++j) { + int seekedDictId = dictIdSeeker[j].seek(dictId); + if (seekedDictId != IndexSeeker.NOT_EXIST) { + convertedInverteds.add( + new ConvertingIndexedInts( + adapters.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j) + ) + ); + } + } + + MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap(); + for (Integer row : CombiningIterable.createSplatted( + convertedInverteds, + Ordering.natural().nullsFirst() + )) { + if (row != INVALID_ROW) { + bitset.add(row); + } + } + + ImmutableBitmap bitmapToWrite = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset); + if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) { + bitmapToWrite = nullRowBitmap.union(bitmapToWrite); + } + bitmapIndexWriters.get(dimIndex).write(bitmapToWrite); + + if (spatialIndexWriter != null) { + String dimVal = dimVals.get(dictId); + if (dimVal != null) { + List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); + float[] coords = new float[stringCoords.size()]; + for (int j = 0; j < coords.length; j++) { + coords[j] = Float.valueOf(stringCoords.get(j)); + } + tree.insert(coords, bitset); } - tree.insert(coords, bitset); } } + if (spatialIndexWriter != null) { + spatialIndexWriter.write(ImmutableRTree.newImmutableFromMutable(tree)); + } + log.info( + "Completed dim[%s] inverted with cardinality[%,d] in %,d millis.", + dimension, + dimVals.size(), + System.currentTimeMillis() - dimStartTime + ); } - if (spatialIndexWriter != null) { - spatialIndexWriter.write(ImmutableRTree.newImmutableFromMutable(tree)); - } - log.info( - "Completed dim[%s] inverted with cardinality[%,d] in %,d millis.", - dimension, - dimVals.size(), - System.currentTimeMillis() - dimStartTime - ); } log.info("Completed inverted index in %,d millis.", System.currentTimeMillis() - startTime); progress.stopSection(section);
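
The pattern this patch applies throughout is Guava's Closer: each resource is registered the moment it is opened, any failure is funneled through rethrow() so that exceptions raised while closing registered resources do not mask the original error, and close() always runs in the finally block, mirroring the new structure of convertV8toV9 and the merge paths. A minimal, self-contained sketch of that idiom (the CloserExample class and the byte-counting logic are illustrative only, not part of the patch):

import com.google.common.io.Closer;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class CloserExample
{
  // Count the bytes in a file, closing the stream on every exit path.
  public static long countBytes(File file) throws IOException
  {
    final Closer closer = Closer.create();
    try {
      // register() returns its argument, so a resource can be opened and
      // registered in one expression, as with closer.register(Smoosh.map(v8Dir)) above.
      final InputStream in = closer.register(new FileInputStream(file));
      long count = 0;
      while (in.read() != -1) {
        count++;
      }
      return count;
    }
    catch (Throwable t) {
      // rethrow() records the primary failure so that exceptions thrown while
      // closing registered resources are suppressed rather than replacing it.
      throw closer.rethrow(t);
    }
    finally {
      closer.close();
    }
  }
}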
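
IndexMergerV9 takes a slightly different route for the dictionary mapping: MappedByteBuffer has no close() method, so the ByteBufferUtils.unmap(...) call is wrapped in a Closeable and driven by try-with-resources, releasing the mapping even when the loop body throws. A sketch of that technique under the same assumptions (the UnmapExample class and the byte-summing logic are illustrative only):

import com.google.common.io.Files;
import com.metamx.common.ByteBufferUtils;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.MappedByteBuffer;

public class UnmapExample
{
  // Sum the bytes of a memory-mapped file, unmapping the buffer on exit.
  public static long sumBytes(File file) throws IOException
  {
    final MappedByteBuffer mapped = Files.map(file);
    // Wrapping the unmap call in a Closeable lets try-with-resources release
    // the mapping deterministically instead of waiting for GC finalization.
    try (Closeable unmapper = new Closeable()
    {
      @Override
      public void close()
      {
        ByteBufferUtils.unmap(mapped);
      }
    }) {
      long sum = 0;
      while (mapped.hasRemaining()) {
        sum += mapped.get();
      }
      return sum;
    }
  }
}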