make things actually work with roaring

This commit is contained in:
fjy 2014-11-10 13:42:06 -08:00
parent 3c21f62afd
commit 358b2add17
6 changed files with 82 additions and 119 deletions

View File

@ -20,7 +20,6 @@
package io.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
@ -37,17 +36,11 @@ import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.ConciseBitmapFactory;
import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.collections.spatial.ImmutablePoint;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.RTreeUtils;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.io.smoosh.FileSmoosher;
import com.metamx.common.io.smoosh.Smoosh;
import com.metamx.common.io.smoosh.SmooshedFileMapper;
@ -68,8 +61,6 @@ import io.druid.segment.data.ArrayIndexed;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ByteBufferSerializer;
import io.druid.segment.data.CompressedLongsIndexedSupplier;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.ConciseCompressedIndexedInts;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.IndexedRTree;
@ -86,7 +77,6 @@ import io.druid.segment.serde.LongGenericColumnPartSerde;
import io.druid.segment.serde.LongGenericColumnSupplier;
import io.druid.segment.serde.SpatialIndexColumnPartSupplier;
import org.joda.time.Interval;
import org.roaringbitmap.IntIterator;
import java.io.ByteArrayOutputStream;
import java.io.File;
@ -131,9 +121,10 @@ public class IndexIO
private static final SerializerUtils serializerUtils = new SerializerUtils();
private static final ObjectMapper mapper;
protected static final ColumnConfig columnConfig;
private static final BitmapSerdeFactory bitmapSerdeFactory;
protected static final ColumnConfig columnConfig;
static {
final Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.<Module>of(
@ -295,6 +286,12 @@ public class IndexIO
indexBuffer, GenericIndexed.stringStrategy
);
final Interval dataInterval = new Interval(serializerUtils.readString(indexBuffer));
final BitmapSerdeFactory segmentBitmapSerdeFactory;
if (indexBuffer.hasRemaining()) {
segmentBitmapSerdeFactory = mapper.readValue(serializerUtils.readString(indexBuffer), BitmapSerdeFactory.class);
} else {
segmentBitmapSerdeFactory = BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY;
}
CompressedLongsIndexedSupplier timestamps = CompressedLongsIndexedSupplier.fromByteBuffer(
smooshedFiles.mapFile(makeTimeFile(inDir, BYTE_ORDER).getName()), BYTE_ORDER
@ -333,7 +330,7 @@ public class IndexIO
for (int i = 0; i < availableDimensions.size(); ++i) {
bitmaps.put(
serializerUtils.readString(invertedBuffer),
GenericIndexed.read(invertedBuffer, ConciseCompressedIndexedInts.objectStrategy)
GenericIndexed.read(invertedBuffer, segmentBitmapSerdeFactory.getObjectStrategy())
);
}
@ -344,7 +341,7 @@ public class IndexIO
serializerUtils.readString(spatialBuffer),
ByteBufferSerializer.read(
spatialBuffer,
new IndexedRTree.ImmutableRTreeObjectStrategy(new ConciseBitmapFactory())
new IndexedRTree.ImmutableRTreeObjectStrategy(segmentBitmapSerdeFactory.getBitmapFactory())
)
);
}
@ -396,7 +393,7 @@ public class IndexIO
final String dimName = serializerUtils.readString(invertedBuffer);
bitmapIndexes.put(
dimName,
GenericIndexed.read(invertedBuffer, ConciseCompressedIndexedInts.objectStrategy)
GenericIndexed.read(invertedBuffer, bitmapSerdeFactory.getObjectStrategy())
);
}
@ -407,7 +404,7 @@ public class IndexIO
serializerUtils.readString(spatialBuffer),
ByteBufferSerializer.read(
spatialBuffer, new IndexedRTree.ImmutableRTreeObjectStrategy(
new ConciseBitmapFactory()
bitmapSerdeFactory.getBitmapFactory()
)
)
);
@ -448,52 +445,7 @@ public class IndexIO
GenericIndexed<ImmutableBitmap> bitmaps = bitmapIndexes.get(dimension);
ImmutableRTree spatialIndex = spatialIndexes.get(dimension);
// TODO: this is some UGLY shizzle
// All V8 segments use concise sets for bitmap indexes. Starting in V9, we can optionally choose other
// methods to store and compress these bitmap methods.
final BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
if (!(bitmapSerdeFactory instanceof ConciseBitmapSerdeFactory)) {
if (spatialIndex != null) {
RTree convertedTree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
for (ImmutablePoint point : RTreeUtils.getBitmaps(spatialIndex)) {
IntIterator iterator = point.getImmutableBitmap().iterator();
while (iterator.hasNext()) {
convertedTree.insert(point.getCoords(), iterator.next());
}
}
spatialIndex = ImmutableRTree.newImmutableFromMutable(convertedTree);
}
bitmaps = GenericIndexed.fromIterable(
FunctionalIterable.create(
bitmaps
).transform(
new Function<ImmutableBitmap, ImmutableBitmap>()
{
@Override
public ImmutableBitmap apply(
ImmutableBitmap bitmap
)
{
if (bitmap == null) {
return bitmapFactory.makeEmptyImmutableBitmap();
}
IntIterator intIter = bitmap.iterator();
MutableBitmap mutableBitmap = bitmapFactory.makeEmptyMutableBitmap();
// TODO: is there a faster way to do this? I don't think so
while (intIter.hasNext()) {
mutableBitmap.add(intIter.next());
}
return bitmapFactory.makeImmutableBitmap(mutableBitmap);
}
}
),
bitmapSerdeFactory.getObjectStrategy()
);
}
boolean onlyOneValue = true;
MutableBitmap nullsSet = null;
for (int i = 0; i < multiValCol.size(); ++i) {
@ -742,7 +694,7 @@ public class IndexIO
)
.setBitmapIndex(
new BitmapIndexColumnPartSupplier(
bitmapSerdeFactory.getBitmapFactory(),
BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY.getBitmapFactory(),
index.getBitmapIndexes().get(dimension),
index.getDimValueLookup(dimension)
)
@ -804,7 +756,7 @@ public class IndexIO
index.getDataInterval(),
new ArrayIndexed<>(cols, String.class),
index.getAvailableDimensions(),
bitmapSerdeFactory.getBitmapFactory(),
BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY.getBitmapFactory(),
columns,
index.getFileMapper()
);
@ -830,6 +782,12 @@ public class IndexIO
final GenericIndexed<String> cols = GenericIndexed.read(indexBuffer, GenericIndexed.stringStrategy);
final GenericIndexed<String> dims = GenericIndexed.read(indexBuffer, GenericIndexed.stringStrategy);
final Interval dataInterval = new Interval(indexBuffer.getLong(), indexBuffer.getLong());
final BitmapSerdeFactory segmentBitmapSerdeFactory;
if (indexBuffer.hasRemaining()) {
segmentBitmapSerdeFactory = mapper.readValue(serializerUtils.readString(indexBuffer), BitmapSerdeFactory.class);
} else {
segmentBitmapSerdeFactory = BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY;
}
Map<String, Column> columns = Maps.newHashMap();
@ -840,7 +798,7 @@ public class IndexIO
columns.put(Column.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time")));
final QueryableIndex index = new SimpleQueryableIndex(
dataInterval, cols, dims, bitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles
dataInterval, cols, dims, segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles
);
log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);

View File

@ -40,7 +40,6 @@ import com.google.inject.Module;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.collections.bitmap.WrappedImmutableConciseBitmap;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
@ -67,7 +66,6 @@ import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.CompressedFloatsIndexedSupplier;
import io.druid.segment.data.CompressedLongsIndexedSupplier;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseCompressedIndexedInts;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
@ -83,8 +81,6 @@ import io.druid.segment.serde.ComplexMetrics;
import io.druid.segment.serde.DictionaryEncodedColumnPartSerde;
import io.druid.segment.serde.FloatGenericColumnPartSerde;
import io.druid.segment.serde.LongGenericColumnPartSerde;
import it.uniroma3.mat.extendedset.intset.ConciseSet;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -1126,7 +1122,7 @@ public class IndexMaker
}
)
),
ConciseCompressedIndexedInts.objectStrategy
bitmapSerdeFactory.getObjectStrategy()
);
} else {
Iterable<ImmutableBitmap> immutableBitmaps = Iterables.transform(
@ -1164,7 +1160,7 @@ public class IndexMaker
}
}
),
ConciseCompressedIndexedInts.objectStrategy
bitmapSerdeFactory.getObjectStrategy()
);
}
@ -1376,7 +1372,13 @@ public class IndexMaker
GenericIndexed<String> cols = GenericIndexed.fromIterable(finalColumns, GenericIndexed.stringStrategy);
GenericIndexed<String> dims = GenericIndexed.fromIterable(finalDimensions, GenericIndexed.stringStrategy);
final long numBytes = cols.getSerializedSize() + dims.getSerializedSize() + 16;
final String bitmapSerdeFactoryType = mapper.writeValueAsString(bitmapSerdeFactory);
final long numBytes = cols.getSerializedSize()
+ dims.getSerializedSize()
+ 16
// Size of bitmap serde factory
+ 4
+ bitmapSerdeFactoryType.getBytes().length;
final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
cols.writeToChannel(writer);
@ -1393,6 +1395,9 @@ public class IndexMaker
serializerUtils.writeLong(writer, dataInterval.getStartMillis());
serializerUtils.writeLong(writer, dataInterval.getEndMillis());
serializerUtils.writeString(
writer, bitmapSerdeFactoryType
);
writer.close();
IndexIO.checkFileSize(new File(outDir, "index.drd"));

View File

@ -19,9 +19,11 @@
package io.druid.segment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@ -32,10 +34,12 @@ import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
import com.metamx.collections.bitmap.ConciseBitmapFactory;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.collections.bitmap.WrappedConciseBitmap;
import com.metamx.collections.bitmap.WrappedImmutableConciseBitmap;
import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
@ -52,15 +56,17 @@ import io.druid.common.guava.FileOutputSupplier;
import io.druid.common.guava.GuavaUtils;
import io.druid.common.utils.JodaUtils;
import io.druid.common.utils.SerializerUtils;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.ToLowerCaseAggregatorFactory;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ByteBufferWriter;
import io.druid.segment.data.CompressedLongsSupplierSerializer;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseCompressedIndexedInts;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.GenericIndexedWriter;
import io.druid.segment.data.IOPeon;
@ -75,8 +81,6 @@ import io.druid.segment.incremental.IncrementalIndexAdapter;
import io.druid.segment.serde.ComplexMetricColumnSerializer;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import it.uniroma3.mat.extendedset.intset.ConciseSet;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -107,6 +111,27 @@ public class IndexMerger
private static final int INVALID_ROW = -1;
private static final Splitter SPLITTER = Splitter.on(",");
private static final ObjectMapper mapper;
private static final BitmapSerdeFactory bitmapSerdeFactory;
static {
final Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
ImmutableList.<Module>of(
new Module()
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.processing.bitmap", BitmapSerdeFactory.class);
}
}
)
);
mapper = injector.getInstance(ObjectMapper.class);
bitmapSerdeFactory = injector.getInstance(BitmapSerdeFactory.class);
}
public static File persist(final IncrementalIndex index, File outDir) throws IOException
{
return persist(index, index.getInterval(), outDir);
@ -761,7 +786,7 @@ public class IndexMerger
log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
GenericIndexedWriter<ImmutableBitmap> writer = new GenericIndexedWriter<>(
ioPeon, dimension, ConciseCompressedIndexedInts.objectStrategy
ioPeon, dimension, bitmapSerdeFactory.getObjectStrategy()
);
writer.open();
@ -770,7 +795,7 @@ public class IndexMerger
RTree tree = null;
IOPeon spatialIoPeon = new TmpFileIOPeon();
if (isSpatialDim) {
ConciseBitmapFactory bitmapFactory = new ConciseBitmapFactory();
BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
spatialWriter = new ByteBufferWriter<ImmutableRTree>(
spatialIoPeon, dimension, new IndexedRTree.ImmutableRTreeObjectStrategy(bitmapFactory)
);
@ -789,7 +814,7 @@ public class IndexMerger
);
}
ConciseSet bitset = new ConciseSet();
MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap();
for (Integer row : CombiningIterable.createSplatted(
convertedInverteds,
Ordering.<Integer>natural().nullsFirst()
@ -799,7 +824,9 @@ public class IndexMerger
}
}
writer.write(new WrappedImmutableConciseBitmap(ImmutableConciseSet.newImmutableFromMutable(bitset)));
writer.write(
bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset)
);
if (isSpatialDim && dimVal != null) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
@ -807,7 +834,7 @@ public class IndexMerger
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
tree.insert(coords, new WrappedConciseBitmap(bitset));
tree.insert(coords, bitset);
}
}
writer.close();
@ -911,6 +938,9 @@ public class IndexMerger
serializerUtils.writeString(
channel, String.format("%s/%s", dataInterval.getStart(), dataInterval.getEnd())
);
serializerUtils.writeString(
channel, mapper.writeValueAsString(bitmapSerdeFactory)
);
}
finally {
CloseQuietly.close(channel);

View File

@ -34,6 +34,9 @@ import com.metamx.collections.bitmap.ImmutableBitmap;
public interface BitmapSerdeFactory
{
//public static BitmapSerdeFactory DEFAULT_BITMAP_SERDE_FACTORY = new ConciseBitmapSerdeFactory();
public static BitmapSerdeFactory DEFAULT_BITMAP_SERDE_FACTORY = new RoaringBitmapSerdeFactory();
public ObjectStrategy<ImmutableBitmap> getObjectStrategy();
public BitmapFactory getBitmapFactory();

View File

@ -1,30 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.data;
/**
*/
public class InvertedIndexHelpers
{
public static BitmapSerdeFactory makeDefaultHelper()
{
return new ConciseBitmapSerdeFactory();
}
}

View File

@ -22,7 +22,6 @@ package io.druid.segment.serde;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Ints;
import com.metamx.collections.bitmap.ConciseBitmapFactory;
import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.IAE;
@ -34,7 +33,6 @@ import io.druid.segment.data.ByteBufferSerializer;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.IndexedRTree;
import io.druid.segment.data.ObjectStrategy;
import io.druid.segment.data.VSizeIndexed;
import io.druid.segment.data.VSizeIndexedInts;
@ -67,7 +65,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
)
{
this.isSingleValued = multiValCol == null;
this.bitmapSerdeFactory = bitmapSerdeFactory == null ? new ConciseBitmapSerdeFactory() : bitmapSerdeFactory;
this.bitmapSerdeFactory = bitmapSerdeFactory;
this.dictionary = dictionary;
this.singleValuedColumn = singleValCol;
@ -98,7 +96,9 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
)
{
this.isSingleValued = isSingleValued;
this.bitmapSerdeFactory = bitmapSerdeFactory == null ? new ConciseBitmapSerdeFactory() : bitmapSerdeFactory;
this.bitmapSerdeFactory = bitmapSerdeFactory == null
? BitmapSerdeFactory.DEFAULT_BITMAP_SERDE_FACTORY
: bitmapSerdeFactory;
this.dictionary = null;
this.singleValuedColumn = null;
@ -126,8 +126,6 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
return 1 + size;
}
private static ObjectStrategy objectStrategy = new IndexedRTree.ImmutableRTreeObjectStrategy(new ConciseBitmapFactory());
@Override
public void write(WritableByteChannel channel) throws IOException
{
@ -154,8 +152,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
if (spatialIndex != null) {
ByteBufferSerializer.writeToChannel(
spatialIndex,
objectStrategy,
//new IndexedRTree.ImmutableRTreeObjectStrategy(bitmapSerdeFactory.getBitmapFactory()),
new IndexedRTree.ImmutableRTreeObjectStrategy(bitmapSerdeFactory.getBitmapFactory()),
channel
);
}