Eager file unmapping in IndexIO, IndexMerger and IndexMergerV9 (#3422)

* Eager file unmapping in IndexIO, IndexMerger and IndexMergerV9. The exact purpose for this change is to allow running IndexMergeBenchmark in Windows, however should also be universally 'better' than non-deterministic unmapping, done when MappedByteBuffers are garbage-collected (BACKEND-312)

* Use Closer with a proper pattern in IndexIO, IndexMerger and IndexMergerV9

* Unmap file in IndexMergerV9.makeInvertedIndexes() using try-with-resources

* Reformat IndexIO
This commit is contained in:
Roman Leventov 2016-09-07 20:43:47 +03:00 committed by Charles Allen
parent c0e62b536a
commit 4f0bcdce36
3 changed files with 419 additions and 380 deletions

View File

@ -32,6 +32,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.google.common.io.Closer;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -120,17 +121,17 @@ public class IndexIO
this.columnConfig = Preconditions.checkNotNull(columnConfig, "null ColumnConfig"); this.columnConfig = Preconditions.checkNotNull(columnConfig, "null ColumnConfig");
defaultIndexIOHandler = new DefaultIndexIOHandler(mapper); defaultIndexIOHandler = new DefaultIndexIOHandler(mapper);
indexLoaders = ImmutableMap.<Integer, IndexLoader>builder() indexLoaders = ImmutableMap.<Integer, IndexLoader>builder()
.put(0, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) .put(0, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(1, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) .put(1, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(2, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) .put(2, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(3, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) .put(3, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(4, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) .put(4, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(5, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) .put(5, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(6, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) .put(6, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(7, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) .put(7, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(8, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig)) .put(8, new LegacyIndexLoader(defaultIndexIOHandler, columnConfig))
.put(9, new V9IndexLoader(columnConfig)) .put(9, new V9IndexLoader(columnConfig))
.build(); .build();
} }
@ -269,13 +270,14 @@ public class IndexIO
case 6: case 6:
case 7: case 7:
log.info("Old version, re-persisting."); log.info("Old version, re-persisting.");
QueryableIndex segmentToConvert = loadIndex(toConvert); try (QueryableIndex segmentToConvert = loadIndex(toConvert)) {
new IndexMerger(mapper, this).append( new IndexMerger(mapper, this).append(
Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(segmentToConvert)), Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(segmentToConvert)),
null, null,
converted, converted,
indexSpec indexSpec
); );
}
return true; return true;
case 8: case 8:
defaultIndexIOHandler.convertV8toV9(toConvert, converted, indexSpec); defaultIndexIOHandler.convertV8toV9(toConvert, converted, indexSpec);
@ -545,347 +547,355 @@ public class IndexIO
Closeables.close(indexIn, false); Closeables.close(indexIn, false);
} }
SmooshedFileMapper v8SmooshedFiles = Smoosh.map(v8Dir); Closer closer = Closer.create();
try {
SmooshedFileMapper v8SmooshedFiles = closer.register(Smoosh.map(v8Dir));
v9Dir.mkdirs(); v9Dir.mkdirs();
final FileSmoosher v9Smoosher = new FileSmoosher(v9Dir); final FileSmoosher v9Smoosher = closer.register(new FileSmoosher(v9Dir));
ByteStreams.write(Ints.toByteArray(9), Files.newOutputStreamSupplier(new File(v9Dir, "version.bin"))); ByteStreams.write(Ints.toByteArray(9), Files.newOutputStreamSupplier(new File(v9Dir, "version.bin")));
Map<String, GenericIndexed<ImmutableBitmap>> bitmapIndexes = Maps.newHashMap(); Map<String, GenericIndexed<ImmutableBitmap>> bitmapIndexes = Maps.newHashMap();
final ByteBuffer invertedBuffer = v8SmooshedFiles.mapFile("inverted.drd"); final ByteBuffer invertedBuffer = v8SmooshedFiles.mapFile("inverted.drd");
BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory(); BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
while (invertedBuffer.hasRemaining()) { while (invertedBuffer.hasRemaining()) {
final String dimName = serializerUtils.readString(invertedBuffer); final String dimName = serializerUtils.readString(invertedBuffer);
bitmapIndexes.put( bitmapIndexes.put(
dimName, dimName,
GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy()) GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy())
);
}
Map<String, ImmutableRTree> spatialIndexes = Maps.newHashMap();
final ByteBuffer spatialBuffer = v8SmooshedFiles.mapFile("spatial.drd");
while (spatialBuffer != null && spatialBuffer.hasRemaining()) {
spatialIndexes.put(
serializerUtils.readString(spatialBuffer),
ByteBufferSerializer.read(
spatialBuffer, new IndexedRTree.ImmutableRTreeObjectStrategy(
bitmapSerdeFactory.getBitmapFactory()
)
)
);
}
final LinkedHashSet<String> skippedFiles = Sets.newLinkedHashSet();
final Set<String> skippedDimensions = Sets.newLinkedHashSet();
for (String filename : v8SmooshedFiles.getInternalFilenames()) {
log.info("Processing file[%s]", filename);
if (filename.startsWith("dim_")) {
final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
builder.setValueType(ValueType.STRING);
final List<ByteBuffer> outParts = Lists.newArrayList();
ByteBuffer dimBuffer = v8SmooshedFiles.mapFile(filename);
String dimension = serializerUtils.readString(dimBuffer);
if (!filename.equals(String.format("dim_%s.drd", dimension))) {
throw new ISE("loaded dimension[%s] from file[%s]", dimension, filename);
}
ByteArrayOutputStream nameBAOS = new ByteArrayOutputStream();
serializerUtils.writeString(nameBAOS, dimension);
outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray()));
GenericIndexed<String> dictionary = GenericIndexed.read(
dimBuffer, GenericIndexed.STRING_STRATEGY
); );
}
if (dictionary.size() == 0) { Map<String, ImmutableRTree> spatialIndexes = Maps.newHashMap();
log.info("Dimension[%s] had cardinality 0, equivalent to no column, so skipping.", dimension); final ByteBuffer spatialBuffer = v8SmooshedFiles.mapFile("spatial.drd");
skippedDimensions.add(dimension); while (spatialBuffer != null && spatialBuffer.hasRemaining()) {
continue; spatialIndexes.put(
} serializerUtils.readString(spatialBuffer),
ByteBufferSerializer.read(
spatialBuffer, new IndexedRTree.ImmutableRTreeObjectStrategy(
bitmapSerdeFactory.getBitmapFactory()
)
)
);
}
int emptyStrIdx = dictionary.indexOf(""); final LinkedHashSet<String> skippedFiles = Sets.newLinkedHashSet();
List<Integer> singleValCol = null; final Set<String> skippedDimensions = Sets.newLinkedHashSet();
VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer()); for (String filename : v8SmooshedFiles.getInternalFilenames()) {
GenericIndexed<ImmutableBitmap> bitmaps = bitmapIndexes.get(dimension); log.info("Processing file[%s]", filename);
ImmutableRTree spatialIndex = spatialIndexes.get(dimension); if (filename.startsWith("dim_")) {
final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
builder.setValueType(ValueType.STRING);
final BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory(); final List<ByteBuffer> outParts = Lists.newArrayList();
boolean onlyOneValue = true;
MutableBitmap nullsSet = null; ByteBuffer dimBuffer = v8SmooshedFiles.mapFile(filename);
for (int i = 0; i < multiValCol.size(); ++i) { String dimension = serializerUtils.readString(dimBuffer);
VSizeIndexedInts rowValue = multiValCol.get(i); if (!filename.equals(String.format("dim_%s.drd", dimension))) {
if (!onlyOneValue) { throw new ISE("loaded dimension[%s] from file[%s]", dimension, filename);
break;
} }
if (rowValue.size() > 1) {
onlyOneValue = false; ByteArrayOutputStream nameBAOS = new ByteArrayOutputStream();
serializerUtils.writeString(nameBAOS, dimension);
outParts.add(ByteBuffer.wrap(nameBAOS.toByteArray()));
GenericIndexed<String> dictionary = GenericIndexed.read(
dimBuffer, GenericIndexed.STRING_STRATEGY
);
if (dictionary.size() == 0) {
log.info("Dimension[%s] had cardinality 0, equivalent to no column, so skipping.", dimension);
skippedDimensions.add(dimension);
continue;
} }
if (rowValue.size() == 0 || rowValue.get(0) == emptyStrIdx) {
if (nullsSet == null) { int emptyStrIdx = dictionary.indexOf("");
nullsSet = bitmapFactory.makeEmptyMutableBitmap(); List<Integer> singleValCol = null;
VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer());
GenericIndexed<ImmutableBitmap> bitmaps = bitmapIndexes.get(dimension);
ImmutableRTree spatialIndex = spatialIndexes.get(dimension);
final BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
boolean onlyOneValue = true;
MutableBitmap nullsSet = null;
for (int i = 0; i < multiValCol.size(); ++i) {
VSizeIndexedInts rowValue = multiValCol.get(i);
if (!onlyOneValue) {
break;
}
if (rowValue.size() > 1) {
onlyOneValue = false;
}
if (rowValue.size() == 0 || rowValue.get(0) == emptyStrIdx) {
if (nullsSet == null) {
nullsSet = bitmapFactory.makeEmptyMutableBitmap();
}
nullsSet.add(i);
} }
nullsSet.add(i);
} }
}
if (onlyOneValue) { if (onlyOneValue) {
log.info("Dimension[%s] is single value, converting...", dimension); log.info("Dimension[%s] is single value, converting...", dimension);
final boolean bumpedDictionary; final boolean bumpedDictionary;
if (nullsSet != null) { if (nullsSet != null) {
log.info("Dimension[%s] has null rows.", dimension); log.info("Dimension[%s] has null rows.", dimension);
final ImmutableBitmap theNullSet = bitmapFactory.makeImmutableBitmap(nullsSet); final ImmutableBitmap theNullSet = bitmapFactory.makeImmutableBitmap(nullsSet);
if (dictionary.get(0) != null) { if (dictionary.get(0) != null) {
log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension); log.info("Dimension[%s] has no null value in the dictionary, expanding...", dimension);
bumpedDictionary = true; bumpedDictionary = true;
final List<String> nullList = Lists.newArrayList(); final List<String> nullList = Lists.newArrayList();
nullList.add(null); nullList.add(null);
dictionary = GenericIndexed.fromIterable( dictionary = GenericIndexed.fromIterable(
Iterables.concat(nullList, dictionary), Iterables.concat(nullList, dictionary),
GenericIndexed.STRING_STRATEGY GenericIndexed.STRING_STRATEGY
); );
bitmaps = GenericIndexed.fromIterable( bitmaps = GenericIndexed.fromIterable(
Iterables.concat(Arrays.asList(theNullSet), bitmaps), Iterables.concat(Arrays.asList(theNullSet), bitmaps),
bitmapSerdeFactory.getObjectStrategy() bitmapSerdeFactory.getObjectStrategy()
); );
} else {
bumpedDictionary = false;
bitmaps = GenericIndexed.fromIterable(
Iterables.concat(
Arrays.asList(
bitmapFactory
.union(Arrays.asList(theNullSet, bitmaps.get(0)))
),
Iterables.skip(bitmaps, 1)
),
bitmapSerdeFactory.getObjectStrategy()
);
}
} else { } else {
bumpedDictionary = false; bumpedDictionary = false;
bitmaps = GenericIndexed.fromIterable(
Iterables.concat(
Arrays.asList(
bitmapFactory
.union(Arrays.asList(theNullSet, bitmaps.get(0)))
),
Iterables.skip(bitmaps, 1)
),
bitmapSerdeFactory.getObjectStrategy()
);
} }
final VSizeIndexed finalMultiValCol = multiValCol;
singleValCol = new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
final VSizeIndexedInts ints = finalMultiValCol.get(index);
return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0);
}
@Override
public int size()
{
return finalMultiValCol.size();
}
};
multiValCol = null;
} else { } else {
bumpedDictionary = false; builder.setHasMultipleValues(true);
} }
final VSizeIndexed finalMultiValCol = multiValCol; final CompressedObjectStrategy.CompressionStrategy compressionStrategy = indexSpec.getDimensionCompression();
singleValCol = new AbstractList<Integer>()
{ final DictionaryEncodedColumnPartSerde.LegacySerializerBuilder columnPartBuilder = DictionaryEncodedColumnPartSerde
@Override .legacySerializerBuilder()
public Integer get(int index) .withDictionary(dictionary)
{ .withBitmapSerdeFactory(bitmapSerdeFactory)
final VSizeIndexedInts ints = finalMultiValCol.get(index); .withBitmaps(bitmaps)
return ints.size() == 0 ? 0 : ints.get(0) + (bumpedDictionary ? 1 : 0); .withSpatialIndex(spatialIndex)
.withByteOrder(BYTE_ORDER);
if (singleValCol != null) {
if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) {
columnPartBuilder.withSingleValuedColumn(
CompressedVSizeIntsIndexedSupplier.fromList(
singleValCol,
dictionary.size(),
CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(dictionary.size()),
BYTE_ORDER,
compressionStrategy
)
);
} else {
columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size()));
} }
} else if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) {
@Override columnPartBuilder.withMultiValuedColumn(
public int size() CompressedVSizeIndexedSupplier.fromIterable(
{ multiValCol,
return finalMultiValCol.size();
}
};
multiValCol = null;
} else {
builder.setHasMultipleValues(true);
}
final CompressedObjectStrategy.CompressionStrategy compressionStrategy = indexSpec.getDimensionCompression();
final DictionaryEncodedColumnPartSerde.LegacySerializerBuilder columnPartBuilder = DictionaryEncodedColumnPartSerde
.legacySerializerBuilder()
.withDictionary(dictionary)
.withBitmapSerdeFactory(bitmapSerdeFactory)
.withBitmaps(bitmaps)
.withSpatialIndex(spatialIndex)
.withByteOrder(BYTE_ORDER);
if (singleValCol != null) {
if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) {
columnPartBuilder.withSingleValuedColumn(
CompressedVSizeIntsIndexedSupplier.fromList(
singleValCol,
dictionary.size(), dictionary.size(),
CompressedVSizeIntsIndexedSupplier.maxIntsInBufferForValue(dictionary.size()),
BYTE_ORDER, BYTE_ORDER,
compressionStrategy compressionStrategy
) )
); );
} else { } else {
columnPartBuilder.withSingleValuedColumn(VSizeIndexedInts.fromList(singleValCol, dictionary.size())); columnPartBuilder.withMultiValuedColumn(multiValCol);
} }
} else if (compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) {
columnPartBuilder.withMultiValuedColumn( final ColumnDescriptor serdeficator = builder
CompressedVSizeIndexedSupplier.fromIterable( .addSerde(columnPartBuilder.build())
multiValCol, .build();
dictionary.size(),
BYTE_ORDER, ByteArrayOutputStream baos = new ByteArrayOutputStream();
compressionStrategy serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator));
) byte[] specBytes = baos.toByteArray();
final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter(
dimension, serdeficator.numBytes() + specBytes.length
); );
} else { channel.write(ByteBuffer.wrap(specBytes));
columnPartBuilder.withMultiValuedColumn(multiValCol); serdeficator.write(channel);
} channel.close();
} else if (filename.startsWith("met_")) {
if (!filename.endsWith(String.format("%s.drd", BYTE_ORDER))) {
skippedFiles.add(filename);
continue;
}
final ColumnDescriptor serdeficator = builder MetricHolder holder = MetricHolder.fromByteBuffer(v8SmooshedFiles.mapFile(filename));
.addSerde(columnPartBuilder.build()) final String metric = holder.getName();
.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream(); final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator));
byte[] specBytes = baos.toByteArray();
final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter( switch (holder.getType()) {
dimension, serdeficator.numBytes() + specBytes.length case LONG:
); builder.setValueType(ValueType.LONG);
channel.write(ByteBuffer.wrap(specBytes)); builder.addSerde(
serdeficator.write(channel); LongGenericColumnPartSerde.legacySerializerBuilder()
channel.close(); .withByteOrder(BYTE_ORDER)
} else if (filename.startsWith("met_")) { .withDelegate(holder.longType)
if (!filename.endsWith(String.format("%s.drd", BYTE_ORDER))) { .build()
skippedFiles.add(filename); );
continue; break;
} case FLOAT:
builder.setValueType(ValueType.FLOAT);
MetricHolder holder = MetricHolder.fromByteBuffer(v8SmooshedFiles.mapFile(filename)); builder.addSerde(
final String metric = holder.getName(); FloatGenericColumnPartSerde.legacySerializerBuilder()
.withByteOrder(BYTE_ORDER)
final ColumnDescriptor.Builder builder = ColumnDescriptor.builder(); .withDelegate(holder.floatType)
.build()
switch (holder.getType()) { );
case LONG: break;
builder.setValueType(ValueType.LONG); case COMPLEX:
builder.addSerde( if (!(holder.complexType instanceof GenericIndexed)) {
LongGenericColumnPartSerde.legacySerializerBuilder() throw new ISE("Serialized complex types must be GenericIndexed objects.");
.withByteOrder(BYTE_ORDER)
.withDelegate(holder.longType)
.build()
);
break;
case FLOAT:
builder.setValueType(ValueType.FLOAT);
builder.addSerde(
FloatGenericColumnPartSerde.legacySerializerBuilder()
.withByteOrder(BYTE_ORDER)
.withDelegate(holder.floatType)
.build()
);
break;
case COMPLEX:
if (!(holder.complexType instanceof GenericIndexed)) {
throw new ISE("Serialized complex types must be GenericIndexed objects.");
}
final GenericIndexed column = (GenericIndexed) holder.complexType;
final String complexType = holder.getTypeName();
builder.setValueType(ValueType.COMPLEX);
builder.addSerde(
ComplexColumnPartSerde.legacySerializerBuilder()
.withTypeName(complexType)
.withDelegate(column).build()
);
break;
default:
throw new ISE("Unknown type[%s]", holder.getType());
}
final ColumnDescriptor serdeficator = builder.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator));
byte[] specBytes = baos.toByteArray();
final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter(
metric, serdeficator.numBytes() + specBytes.length
);
channel.write(ByteBuffer.wrap(specBytes));
serdeficator.write(channel);
channel.close();
} else if (String.format("time_%s.drd", BYTE_ORDER).equals(filename)) {
CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer(
v8SmooshedFiles.mapFile(filename), BYTE_ORDER
);
final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
builder.setValueType(ValueType.LONG);
builder.addSerde(
LongGenericColumnPartSerde.legacySerializerBuilder()
.withByteOrder(BYTE_ORDER)
.withDelegate(timestamps)
.build()
);
final ColumnDescriptor serdeficator = builder.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator));
byte[] specBytes = baos.toByteArray();
final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter(
"__time", serdeficator.numBytes() + specBytes.length
);
channel.write(ByteBuffer.wrap(specBytes));
serdeficator.write(channel);
channel.close();
} else {
skippedFiles.add(filename);
}
}
final ByteBuffer indexBuffer = v8SmooshedFiles.mapFile("index.drd");
indexBuffer.get(); // Skip the version byte
final GenericIndexed<String> dims8 = GenericIndexed.read(
indexBuffer, GenericIndexed.STRING_STRATEGY
);
final GenericIndexed<String> dims9 = GenericIndexed.fromIterable(
Iterables.filter(
dims8, new Predicate<String>()
{
@Override
public boolean apply(String s)
{
return !skippedDimensions.contains(s);
} }
} final GenericIndexed column = (GenericIndexed) holder.complexType;
), final String complexType = holder.getTypeName();
GenericIndexed.STRING_STRATEGY builder.setValueType(ValueType.COMPLEX);
); builder.addSerde(
final GenericIndexed<String> availableMetrics = GenericIndexed.read( ComplexColumnPartSerde.legacySerializerBuilder()
indexBuffer, GenericIndexed.STRING_STRATEGY .withTypeName(complexType)
); .withDelegate(column).build()
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer)); );
final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue( break;
serializerUtils.readString(indexBuffer), default:
BitmapSerdeFactory.class throw new ISE("Unknown type[%s]", holder.getType());
); }
Set<String> columns = Sets.newTreeSet(); final ColumnDescriptor serdeficator = builder.build();
columns.addAll(Lists.newArrayList(dims9));
columns.addAll(Lists.newArrayList(availableMetrics));
GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.STRING_STRATEGY);
final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory); ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator));
byte[] specBytes = baos.toByteArray();
final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16 final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter(
+ serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString); metric, serdeficator.numBytes() + specBytes.length
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes); );
cols.writeToChannel(writer); channel.write(ByteBuffer.wrap(specBytes));
dims9.writeToChannel(writer); serdeficator.write(channel);
serializerUtils.writeLong(writer, dataInterval.getStartMillis()); channel.close();
serializerUtils.writeLong(writer, dataInterval.getEndMillis()); } else if (String.format("time_%s.drd", BYTE_ORDER).equals(filename)) {
serializerUtils.writeString(writer, segmentBitmapSerdeFactoryString); CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer(
writer.close(); v8SmooshedFiles.mapFile(filename), BYTE_ORDER
);
final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
builder.setValueType(ValueType.LONG);
builder.addSerde(
LongGenericColumnPartSerde.legacySerializerBuilder()
.withByteOrder(BYTE_ORDER)
.withDelegate(timestamps)
.build()
);
final ColumnDescriptor serdeficator = builder.build();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializerUtils.writeString(baos, mapper.writeValueAsString(serdeficator));
byte[] specBytes = baos.toByteArray();
final SmooshedWriter channel = v9Smoosher.addWithSmooshedWriter(
"__time", serdeficator.numBytes() + specBytes.length
);
channel.write(ByteBuffer.wrap(specBytes));
serdeficator.write(channel);
channel.close();
} else {
skippedFiles.add(filename);
}
}
final ByteBuffer indexBuffer = v8SmooshedFiles.mapFile("index.drd");
indexBuffer.get(); // Skip the version byte
final GenericIndexed<String> dims8 = GenericIndexed.read(
indexBuffer, GenericIndexed.STRING_STRATEGY
);
final GenericIndexed<String> dims9 = GenericIndexed.fromIterable(
Iterables.filter(
dims8, new Predicate<String>()
{
@Override
public boolean apply(String s)
{
return !skippedDimensions.contains(s);
}
}
),
GenericIndexed.STRING_STRATEGY
);
final GenericIndexed<String> availableMetrics = GenericIndexed.read(
indexBuffer, GenericIndexed.STRING_STRATEGY
);
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
final BitmapSerdeFactory segmentBitmapSerdeFactory = mapper.readValue(
serializerUtils.readString(indexBuffer),
BitmapSerdeFactory.class
);
Set<String> columns = Sets.newTreeSet();
columns.addAll(Lists.newArrayList(dims9));
columns.addAll(Lists.newArrayList(availableMetrics));
GenericIndexed<String> cols = GenericIndexed.fromIterable(columns, GenericIndexed.STRING_STRATEGY);
final String segmentBitmapSerdeFactoryString = mapper.writeValueAsString(segmentBitmapSerdeFactory);
final long numBytes = cols.getSerializedSize() + dims9.getSerializedSize() + 16
+ serializerUtils.getSerializedStringByteSize(segmentBitmapSerdeFactoryString);
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer);
dims9.writeToChannel(writer);
serializerUtils.writeLong(writer, dataInterval.getStartMillis());
serializerUtils.writeLong(writer, dataInterval.getEndMillis());
serializerUtils.writeString(writer, segmentBitmapSerdeFactoryString);
writer.close();
final ByteBuffer metadataBuffer = v8SmooshedFiles.mapFile("metadata.drd");
if (metadataBuffer != null) {
v9Smoosher.add("metadata.drd", metadataBuffer);
}
log.info("Skipped files[%s]", skippedFiles);
final ByteBuffer metadataBuffer = v8SmooshedFiles.mapFile("metadata.drd");
if (metadataBuffer != null) {
v9Smoosher.add("metadata.drd", metadataBuffer);
} }
catch (Throwable t) {
log.info("Skipped files[%s]", skippedFiles); throw closer.rethrow(t);
}
v9Smoosher.close(); finally {
closer.close();
}
} }
} }

View File

@ -48,6 +48,7 @@ import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.collections.spatial.RTree; import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.ByteBufferUtils;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
@ -933,6 +934,14 @@ public class IndexMerger
File dimOutFile = dimOuts.get(i).getFile(); File dimOutFile = dimOuts.get(i).getFile();
final MappedByteBuffer dimValsMapped = Files.map(dimOutFile); final MappedByteBuffer dimValsMapped = Files.map(dimOutFile);
closer.register(new Closeable()
{
@Override
public void close() throws IOException
{
ByteBufferUtils.unmap(dimValsMapped);
}
});
if (!dimension.equals(serializerUtils.readString(dimValsMapped))) { if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension); throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
@ -1078,6 +1087,9 @@ public class IndexMerger
indexIO.getDefaultIndexIOHandler().convertV8toV9(v8OutDir, outDir, indexSpec); indexIO.getDefaultIndexIOHandler().convertV8toV9(v8OutDir, outDir, indexSpec);
return outDir; return outDir;
} }
catch (Throwable t) {
throw closer.rethrow(t);
}
finally { finally {
closer.close(); closer.close();
} }

View File

@ -37,6 +37,7 @@ import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.collections.spatial.RTree; import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.ByteBufferUtils;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.io.smoosh.FileSmoosher; import com.metamx.common.io.smoosh.FileSmoosher;
import com.metamx.common.io.smoosh.SmooshedWriter; import com.metamx.common.io.smoosh.SmooshedWriter;
@ -259,6 +260,9 @@ public class IndexMergerV9 extends IndexMerger
return outDir; return outDir;
} }
catch (Throwable t) {
throw closer.rethrow(t);
}
finally { finally {
closer.close(); closer.close();
} }
@ -382,7 +386,11 @@ public class IndexMergerV9 extends IndexMerger
final DictionaryEncodedColumnPartSerde.SerializerBuilder partBuilder = DictionaryEncodedColumnPartSerde final DictionaryEncodedColumnPartSerde.SerializerBuilder partBuilder = DictionaryEncodedColumnPartSerde
.serializerBuilder() .serializerBuilder()
.withDictionary(dimValueWriters.get(i)) .withDictionary(dimValueWriters.get(i))
.withValue(dimWriters.get(i), hasMultiValue, compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED) .withValue(
dimWriters.get(i),
hasMultiValue,
compressionStrategy != CompressedObjectStrategy.CompressionStrategy.UNCOMPRESSED
)
.withBitmapSerdeFactory(bitmapSerdeFactory) .withBitmapSerdeFactory(bitmapSerdeFactory)
.withBitmapIndex(bitmapIndexWriters.get(i)) .withBitmapIndex(bitmapIndexWriters.get(i))
.withSpatialIndex(spatialIndexWriters.get(i)) .withSpatialIndex(spatialIndexWriters.get(i))
@ -536,73 +544,82 @@ public class IndexMergerV9 extends IndexMerger
fos.close(); fos.close();
final MappedByteBuffer dimValsMapped = Files.map(dimValueFile); final MappedByteBuffer dimValsMapped = Files.map(dimValueFile);
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY); try (Closeable dimValsMappedUnmapper = new Closeable()
{
ByteBufferWriter<ImmutableRTree> spatialIndexWriter = spatialIndexWriters.get(dimIndex); @Override
RTree tree = null; public void close()
if (spatialIndexWriter != null) { {
BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory(); ByteBufferUtils.unmap(dimValsMapped);
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
}
IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, dimension);
ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(
nullRowsList.get(dimIndex)
);
//Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result.
for (int dictId = 0; dictId < dimVals.size(); dictId++) {
progress.progress();
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size());
for (int j = 0; j < adapters.size(); ++j) {
int seekedDictId = dictIdSeeker[j].seek(dictId);
if (seekedDictId != IndexSeeker.NOT_EXIST) {
convertedInverteds.add(
new ConvertingIndexedInts(
adapters.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j)
)
);
}
} }
}) {
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY);
MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap(); ByteBufferWriter<ImmutableRTree> spatialIndexWriter = spatialIndexWriters.get(dimIndex);
for (Integer row : CombiningIterable.createSplatted( RTree tree = null;
convertedInverteds,
Ordering.<Integer>natural().nullsFirst()
)) {
if (row != INVALID_ROW) {
bitset.add(row);
}
}
ImmutableBitmap bitmapToWrite = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset);
if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) {
bitmapToWrite = nullRowBitmap.union(bitmapToWrite);
}
bitmapIndexWriters.get(dimIndex).write(bitmapToWrite);
if (spatialIndexWriter != null) { if (spatialIndexWriter != null) {
String dimVal = dimVals.get(dictId); BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
if (dimVal != null) { tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); }
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) { IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, dimension);
coords[j] = Float.valueOf(stringCoords.get(j));
ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(
nullRowsList.get(dimIndex)
);
//Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result.
for (int dictId = 0; dictId < dimVals.size(); dictId++) {
progress.progress();
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size());
for (int j = 0; j < adapters.size(); ++j) {
int seekedDictId = dictIdSeeker[j].seek(dictId);
if (seekedDictId != IndexSeeker.NOT_EXIST) {
convertedInverteds.add(
new ConvertingIndexedInts(
adapters.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j)
)
);
}
}
MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap();
for (Integer row : CombiningIterable.createSplatted(
convertedInverteds,
Ordering.<Integer>natural().nullsFirst()
)) {
if (row != INVALID_ROW) {
bitset.add(row);
}
}
ImmutableBitmap bitmapToWrite = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset);
if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) {
bitmapToWrite = nullRowBitmap.union(bitmapToWrite);
}
bitmapIndexWriters.get(dimIndex).write(bitmapToWrite);
if (spatialIndexWriter != null) {
String dimVal = dimVals.get(dictId);
if (dimVal != null) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
tree.insert(coords, bitset);
} }
tree.insert(coords, bitset);
} }
} }
if (spatialIndexWriter != null) {
spatialIndexWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
}
log.info(
"Completed dim[%s] inverted with cardinality[%,d] in %,d millis.",
dimension,
dimVals.size(),
System.currentTimeMillis() - dimStartTime
);
} }
if (spatialIndexWriter != null) {
spatialIndexWriter.write(ImmutableRTree.newImmutableFromMutable(tree));
}
log.info(
"Completed dim[%s] inverted with cardinality[%,d] in %,d millis.",
dimension,
dimVals.size(),
System.currentTimeMillis() - dimStartTime
);
} }
log.info("Completed inverted index in %,d millis.", System.currentTimeMillis() - startTime); log.info("Completed inverted index in %,d millis.", System.currentTimeMillis() - startTime);
progress.stopSection(section); progress.stopSection(section);