Improve performance of IndexMergerV9 (#3440)

* Improve performance of StringDimensionMergerV9 and StringDimensionMergerLegacy by avoiding primitive int boxing by using IntIterator in IndexedInts instead of Iterator<Integer>; Extract some common logic for V9 and Legacy mergers; Minor improvements to resource handling in StringDimensionMergerV9

* Don't mask index in MergeIntIterator.makeQueueElement()

* DRY conversion RoaringBitmap's IntIterator to fastutil's IntIterator

* Do implement skip(n) in IntIterators extending AbstractIntIterator because original implementation is not reliable

* Use Test(expected=Exception.class) instead of try { } catch (Exception e) { /* ignore */ }
This commit is contained in:
Roman Leventov 2016-10-13 18:28:46 +03:00 committed by Slim
parent ddc856214d
commit 85ac8eff90
28 changed files with 617 additions and 245 deletions

View File

@ -574,6 +574,11 @@
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
<version>7.0.13</version>
</dependency>
<!-- Test Scope -->
<dependency>

View File

@ -39,6 +39,10 @@
<groupId>com.metamx</groupId>
<artifactId>bytebuffer-collections</artifactId>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>

View File

@ -50,14 +50,14 @@ import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.IndexedInts;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
import org.joda.time.DateTime;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -530,31 +530,30 @@ public class RowBasedGrouperHelper
public IndexedInts getRow()
{
final List<String> dimensionValues = row.get().getDimension(dimension);
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimensionValues != null) {
for (int i = 0; i < dimensionValues.size(); ++i) {
vals.add(i);
}
}
final int dimensionValuesSize = dimensionValues != null ? dimensionValues.size() : 0;
return new IndexedInts()
{
@Override
public int size()
{
return vals.size();
return dimensionValuesSize;
}
@Override
public int get(int index)
{
return vals.get(index);
if (index < 0 || index >= dimensionValuesSize) {
throw new IndexOutOfBoundsException("index: " + index);
}
return index;
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return vals.iterator();
return IntIterators.fromTo(0, dimensionValuesSize);
}
@Override

View File

@ -27,6 +27,7 @@ import io.druid.segment.data.IndexedIntsIterator;
import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.IndexedMultivalue;
import io.druid.segment.data.WritableSupplier;
import it.unimi.dsi.fastutil.ints.IntIterator;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -206,7 +207,7 @@ public class CompressedVSizeIndexedSupplier implements WritableSupplier<IndexedM
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return new IndexedIntsIterator(this);
}

View File

@ -60,13 +60,12 @@ import io.druid.segment.column.ValueType;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.CompressionFactory;
import io.druid.segment.data.LongSupplierSerializer;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.ListIndexed;
import io.druid.segment.data.LongSupplierSerializer;
import io.druid.segment.data.TmpFileIOPeon;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
@ -1189,7 +1188,10 @@ public class IndexMerger
{
private final AggregatorFactory[] metricAggs;
public AggFactoryStringIndexed(AggregatorFactory[] metricAggs) {this.metricAggs = metricAggs;}
public AggFactoryStringIndexed(AggregatorFactory[] metricAggs)
{
this.metricAggs = metricAggs;
}
@Override
public Class<? extends String> getClazz()

View File

@ -30,13 +30,6 @@ import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.ByteBufferUtils;
import com.metamx.common.ISE;
import com.metamx.common.io.smoosh.FileSmoosher;
import com.metamx.common.io.smoosh.SmooshedWriter;

View File

@ -0,0 +1,197 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.metamx.common.IAE;
import com.metamx.common.guava.MergeIterator;
import it.unimi.dsi.fastutil.ints.AbstractIntIterator;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
import it.unimi.dsi.fastutil.longs.LongHeaps;
import java.util.List;
import java.util.NoSuchElementException;
public final class IntIteratorUtils
{
/**
* Implements {@link IntIterator#skip(int)}.
*/
public static int skip(IntIterator it, int n)
{
if (n < 0) {
throw new IAE("n: " + n);
}
int skipped = 0;
while (skipped < n && it.hasNext()) {
it.next();
skipped++;
}
return skipped;
}
/**
* Merges several iterators of ascending {@code int} values into a single iterator of ascending {@code int} values.
* It isn't checked if the given source iterators are actually ascending, if they are not, the order of values in the
* returned iterator is undefined.
* <p>
* This is similar to what {@link MergeIterator} does with simple {@link java.util.Iterator}s.
*
* @param iterators iterators to merge, must return ascending values
*/
public static IntIterator mergeAscending(List<IntIterator> iterators)
{
if (iterators.isEmpty()) {
return IntIterators.EMPTY_ITERATOR;
}
if (iterators.size() == 1) {
return iterators.get(0);
}
return new MergeIntIterator(iterators);
}
/**
* This class is designed mostly after {@link MergeIterator}. {@code MergeIterator} uses a priority queue of wrapper
* "peeking" iterators. Peeking wrappers are not available in fastutil for specialized iterators like IntIterator, so
* they should be implemented manually in the druid codebase. Instead, another approach is taken: a priority queue
* of primitive long values is used, in long values the high 32-bits is the last value from some iterator, and the low
* 32 bits is the index of this iterator in the given list (copied to array, to avoid indirection during the
* iteration) of source iterators. Since values are in the high bits, the composed longs are still compared using the
* natural order. So this approach avoids indirections and implementing PeekingIntIterator.
* <p>
* Instead of {@link it.unimi.dsi.fastutil.longs.LongHeapPriorityQueue}, a priority queue is implemented on lower
* level, to avoid heap array shuffling on each iteration with dequeue and than enqueue, when merged iterators tend
* to stay in the head of the heap for at least several iterations.
*/
static final class MergeIntIterator extends AbstractIntIterator
{
private final IntIterator[] iterators;
private final long[] pQueue;
private int pQueueSize;
private static long makeQueueElement(int value, int index)
{
// Don't have to mask index because this is a Java array index => positive => no sign bit extension
return index | (((long) value) << 32);
}
private static int value(long queueElement)
{
return (int) (queueElement >>> 32);
}
private static int iterIndex(long queueElement)
{
return (int) queueElement;
}
MergeIntIterator(List<IntIterator> iterators)
{
this.iterators = iterators.toArray(new IntIterator[0]);
pQueue = new long[iterators.size()];
pQueueSize = 0;
for (int iterIndex = 0; iterIndex < this.iterators.length; iterIndex++) {
IntIterator iter = this.iterators[iterIndex];
if (iter != null && iter.hasNext()) {
pQueue[pQueueSize] = makeQueueElement(iter.nextInt(), iterIndex);
pQueueSize++;
LongHeaps.upHeap(pQueue, pQueueSize, pQueueSize - 1, null);
}
}
}
@Override
public boolean hasNext()
{
return pQueueSize != 0;
}
@Override
public int nextInt()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
long queueHead = pQueue[0];
int retVal = value(queueHead);
int iterIndex = iterIndex(queueHead);
IntIterator retIt = iterators[iterIndex];
if (retIt.hasNext()) {
// stay in the head, likely no elements will be moved in the heap
pQueue[0] = makeQueueElement(retIt.nextInt(), iterIndex);
LongHeaps.downHeap(pQueue, pQueueSize, 0, null);
} else {
pQueueSize--;
if (pQueueSize != 0) {
pQueue[0] = pQueue[pQueueSize];
LongHeaps.downHeap(pQueue, pQueueSize, 0, null);
}
}
return retVal;
}
@Override
public int skip(int n)
{
return IntIteratorUtils.skip(this, n);
}
}
public static IntIterator fromRoaringBitmapIntIterator(org.roaringbitmap.IntIterator iterator)
{
return new RoaringBitmapDelegatingIntIterator(iterator);
}
private static class RoaringBitmapDelegatingIntIterator extends AbstractIntIterator
{
private final org.roaringbitmap.IntIterator delegate;
private RoaringBitmapDelegatingIntIterator(org.roaringbitmap.IntIterator delegate)
{
this.delegate = delegate;
}
@Override
public boolean hasNext()
{
return delegate.hasNext();
}
@Override
public int nextInt()
{
return delegate.next();
}
@Override
public int skip(int n)
{
return IntIteratorUtils.skip(this, n);
}
}
private IntIteratorUtils() {}
}

View File

@ -22,9 +22,10 @@ package io.druid.segment;
import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
import io.druid.segment.data.IndexedInts;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
import java.io.IOException;
import java.util.Iterator;
public class NullDimensionSelector implements DimensionSelector
{
@ -41,8 +42,8 @@ public class NullDimensionSelector implements DimensionSelector
}
@Override
public Iterator<Integer> iterator() {
return Iterators.singletonIterator(0);
public IntIterator iterator() {
return IntIterators.singleton(0);
}
@Override

View File

@ -24,13 +24,10 @@ import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.common.IAE;
import com.metamx.common.UOE;
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
@ -59,6 +56,7 @@ import io.druid.segment.data.Offset;
import io.druid.segment.filter.AndFilter;
import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.filter.Filters;
import it.unimi.dsi.fastutil.ints.IntIterators;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.roaringbitmap.IntIterator;
@ -66,7 +64,6 @@ import org.roaringbitmap.IntIterator;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -535,9 +532,9 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
}
@Override
public Iterator<Integer> iterator()
public it.unimi.dsi.fastutil.ints.IntIterator iterator()
{
return Iterators.singletonIterator(column.getSingleValueRow(cursorOffset.getOffset()));
return IntIterators.singleton(column.getSingleValueRow(cursorOffset.getOffset()));
}
@Override

View File

@ -23,9 +23,10 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import io.druid.query.extraction.ExtractionFn;
import io.druid.segment.data.IndexedInts;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
public class SingleScanTimeDimSelector implements DimensionSelector
@ -108,9 +109,9 @@ public class SingleScanTimeDimSelector implements DimensionSelector
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return Iterators.singletonIterator(dimensionValueIndex);
return IntIterators.singleton(dimensionValueIndex);
}
@Override

View File

@ -37,11 +37,14 @@ import io.druid.segment.data.IndexedIterable;
import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntLists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -352,14 +355,13 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
int nullId = getEncodedValue(null, false);
List<Integer> valsTmp = null;
IntList valsTmp = null;
if ((indices == null || indices.length == 0) && nullId > -1) {
if (nullId < maxId) {
valsTmp = new ArrayList<>(1);
valsTmp.add(nullId);
valsTmp = IntLists.singleton(nullId);
}
} else if (indices != null && indices.length > 0) {
valsTmp = new ArrayList<>(indices.length);
valsTmp = new IntArrayList(indices.length);
for (int i = 0; i < indices.length; i++) {
int id = indices[i];
if (id < maxId) {
@ -368,7 +370,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
}
final List<Integer> vals = valsTmp == null ? Collections.EMPTY_LIST : valsTmp;
final IntList vals = valsTmp == null ? IntLists.EMPTY_LIST : valsTmp;
return new IndexedInts()
{
@Override
@ -384,7 +386,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return vals.iterator();
}

View File

@ -19,9 +19,6 @@
package io.druid.segment;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.io.ByteSink;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
@ -29,14 +26,12 @@ import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.ByteBufferUtils;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.collections.CombiningIterable;
import io.druid.common.guava.FileOutputSupplier;
import io.druid.common.utils.SerializerUtils;
import io.druid.segment.column.ColumnCapabilities;
@ -100,12 +95,13 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme
{
final SerializerUtils serializerUtils = new SerializerUtils();
long dimStartTime = System.currentTimeMillis();
final BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
String bmpFilename = String.format("%s.inverted", dimensionName);
bitmapWriter = new GenericIndexedWriter<>(
ioPeon,
bmpFilename,
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
bitmapSerdeFactory.getObjectStrategy()
);
bitmapWriter.open();
@ -125,21 +121,20 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.STRING_STRATEGY);
log.info("Starting dimension[%s] with cardinality[%,d]", dimensionName, dimVals.size());
final BitmapSerdeFactory bitmapSerdeFactory = indexSpec.getBitmapSerdeFactory();
final BitmapFactory bitmapFactory = bitmapSerdeFactory.getBitmapFactory();
final BitmapFactory bmpFactory = bitmapSerdeFactory.getBitmapFactory();
RTree tree = null;
spatialWriter = null;
boolean hasSpatial = capabilities.hasSpatialIndexes();
spatialIoPeon = new TmpFileIOPeon();
if (hasSpatial) {
BitmapFactory bmpFactory = bitmapSerdeFactory.getBitmapFactory();
String spatialFilename = String.format("%s.spatial", dimensionName);
spatialWriter = new ByteBufferWriter<ImmutableRTree>(
spatialWriter = new ByteBufferWriter<>(
spatialIoPeon, spatialFilename, new IndexedRTree.ImmutableRTreeObjectStrategy(bmpFactory)
);
spatialWriter.open();
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bmpFactory), bmpFactory);
}
IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, dimensionName);
@ -147,46 +142,19 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme
//Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result.
for (int dictId = 0; dictId < dimVals.size(); dictId++) {
progress.progress();
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size());
for (int j = 0; j < adapters.size(); ++j) {
int seekedDictId = dictIdSeeker[j].seek(dictId);
if (seekedDictId != IndexSeeker.NOT_EXIST) {
convertedInverteds.add(
new ConvertingIndexedInts(
adapters.get(j).getBitmapIndex(dimensionName, seekedDictId), segmentRowNumConversions.get(j)
)
);
}
}
MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap();
for (Integer row : CombiningIterable.createSplatted(
convertedInverteds,
Ordering.<Integer>natural().nullsFirst()
)) {
if (row != IndexMerger.INVALID_ROW) {
bitset.add(row);
}
}
if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) {
bitset.or(nullRowsBitmap);
}
bitmapWriter.write(
bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset)
mergeBitmaps(
segmentRowNumConversions,
dimVals,
bmpFactory,
tree,
hasSpatial,
dictIdSeeker,
dictId,
adapters,
dimensionName,
nullRowsBitmap,
bitmapWriter
);
if (hasSpatial) {
String dimVal = dimVals.get(dictId);
if (dimVal != null) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
tree.insert(coords, bitset);
}
}
}
log.info("Completed dimension[%s] in %,d millis.", dimensionName, System.currentTimeMillis() - dimStartTime);

View File

@ -19,12 +19,9 @@
package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closer;
import com.google.common.io.Files;
@ -37,7 +34,6 @@ import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.ByteBufferUtils;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.collections.CombiningIterable;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnDescriptor;
import io.druid.segment.column.ValueType;
@ -57,8 +53,10 @@ import io.druid.segment.data.ListIndexed;
import io.druid.segment.data.VSizeIndexedIntsWriter;
import io.druid.segment.data.VSizeIndexedWriter;
import io.druid.segment.serde.DictionaryEncodedColumnPartSerde;
import it.unimi.dsi.fastutil.ints.AbstractIntIterator;
import it.unimi.dsi.fastutil.ints.IntIterable;
import it.unimi.dsi.fastutil.ints.IntIterator;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
@ -67,7 +65,6 @@ import java.nio.IntBuffer;
import java.nio.MappedByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
@ -231,7 +228,7 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
@Override
public int[] convertSegmentRowValuesToMergedRowValues(int[] segmentRow, int segmentIndexNumber)
{
int[] dimVals = (int[]) segmentRow;
int[] dimVals = segmentRow;
// For strings, convert missing values to null/empty if conversion flag is set
// But if bitmap/dictionary is not used, always convert missing to 0
if (dimVals == null) {
@ -255,7 +252,7 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
@Override
public void processMergedRow(int[] rowValues) throws IOException
{
int[] vals = (int[]) rowValues;
int[] vals = rowValues;
if (vals == null || vals.length == 0) {
nullRowsBitmap.add(rowCount);
} else if (hasNull && vals.length == 1 && (vals[0]) == 0) {
@ -283,18 +280,21 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
bitmapWriter = new GenericIndexedWriter<>(
ioPeon,
bmpFilename,
indexSpec.getBitmapSerdeFactory().getObjectStrategy()
bitmapSerdeFactory.getObjectStrategy()
);
bitmapWriter.open();
// write dim values to one single file because we need to read it
File dimValueFile = IndexIO.makeDimFile(outDir, dimensionName);
try(FileOutputStream fos = new FileOutputStream(dimValueFile)) {
try (FileOutputStream fos = new FileOutputStream(dimValueFile)) {
ByteStreams.copy(dictionaryWriter.combineStreams(), fos);
}
final MappedByteBuffer dimValsMapped = Files.map(dimValueFile);
try (Closeable dimValsMappedUnmapper = new Closeable()
try (
Closeable toCloseEncodedValueWriter = encodedValueWriter;
Closeable toCloseBitmapWriter = bitmapWriter;
Closeable dimValsMappedUnmapper = new Closeable()
{
@Override
public void close()
@ -308,11 +308,10 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
RTree tree = null;
boolean hasSpatial = capabilities.hasSpatialIndexes();
if (hasSpatial) {
BitmapFactory bitmapFactory = indexSpec.getBitmapSerdeFactory().getBitmapFactory();
spatialWriter = new ByteBufferWriter<>(
ioPeon,
String.format("%s.spatial", dimensionName),
new IndexedRTree.ImmutableRTreeObjectStrategy(bitmapFactory)
new IndexedRTree.ImmutableRTreeObjectStrategy(bmpFactory)
);
spatialWriter.open();
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bmpFactory), bmpFactory);
@ -323,45 +322,19 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
//Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result.
for (int dictId = 0; dictId < dimVals.size(); dictId++) {
progress.progress();
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size());
for (int j = 0; j < adapters.size(); ++j) {
int seekedDictId = dictIdSeeker[j].seek(dictId);
if (seekedDictId != IndexSeeker.NOT_EXIST) {
convertedInverteds.add(
new ConvertingIndexedInts(
adapters.get(j).getBitmapIndex(dimensionName, seekedDictId), segmentRowNumConversions.get(j)
)
);
}
}
MutableBitmap bitset = bmpFactory.makeEmptyMutableBitmap();
for (Integer row : CombiningIterable.createSplatted(
convertedInverteds,
Ordering.<Integer>natural().nullsFirst()
)) {
if (row != IndexMerger.INVALID_ROW) {
bitset.add(row);
}
}
ImmutableBitmap bitmapToWrite = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset);
if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) {
bitmapToWrite = bmpFactory.makeImmutableBitmap(nullRowsBitmap).union(bitmapToWrite);
}
bitmapWriter.write(bitmapToWrite);
if (hasSpatial) {
String dimVal = dimVals.get(dictId);
if (dimVal != null) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
tree.insert(coords, bitset);
}
}
mergeBitmaps(
segmentRowNumConversions,
dimVals,
bmpFactory,
tree,
hasSpatial,
dictIdSeeker,
dictId,
adapters,
dimensionName,
nullRowsBitmap,
bitmapWriter
);
}
if (hasSpatial) {
@ -377,10 +350,70 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
System.currentTimeMillis() - dimStartTime
);
}
}
bitmapWriter.close();
encodedValueWriter.close();
static void mergeBitmaps(
List<IntBuffer> segmentRowNumConversions,
Indexed<String> dimVals,
BitmapFactory bmpFactory,
RTree tree,
boolean hasSpatial,
IndexSeeker[] dictIdSeeker,
int dictId,
List<IndexableAdapter> adapters,
String dimensionName,
MutableBitmap nullRowsBitmap,
GenericIndexedWriter<ImmutableBitmap> bitmapWriter
) throws IOException
{
List<ConvertingIndexedInts> convertedInvertedIndexesToMerge = Lists.newArrayListWithCapacity(adapters.size());
for (int j = 0; j < adapters.size(); ++j) {
int seekedDictId = dictIdSeeker[j].seek(dictId);
if (seekedDictId != IndexSeeker.NOT_EXIST) {
convertedInvertedIndexesToMerge.add(
new ConvertingIndexedInts(
adapters.get(j).getBitmapIndex(dimensionName, seekedDictId), segmentRowNumConversions.get(j)
)
);
}
}
MutableBitmap mergedIndexes = bmpFactory.makeEmptyMutableBitmap();
List<IntIterator> convertedInvertedIndexesIterators = new ArrayList<>(convertedInvertedIndexesToMerge.size());
for (ConvertingIndexedInts convertedInvertedIndexes : convertedInvertedIndexesToMerge) {
convertedInvertedIndexesIterators.add(convertedInvertedIndexes.iterator());
}
// Merge ascending index iterators into a single one, remove duplicates, and add to the mergedIndexes bitmap.
// Merge is needed, because some compacting MutableBitmap implementations are very inefficient when bits are
// added not in the ascending order.
int prevRow = IndexMerger.INVALID_ROW;
for (IntIterator mergeIt = IntIteratorUtils.mergeAscending(convertedInvertedIndexesIterators);
mergeIt.hasNext(); ) {
int row = mergeIt.nextInt();
if (row != prevRow && row != IndexMerger.INVALID_ROW) {
mergedIndexes.add(row);
}
prevRow = row;
}
if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) {
mergedIndexes.or(nullRowsBitmap);
}
bitmapWriter.write(bmpFactory.makeImmutableBitmap(mergedIndexes));
if (hasSpatial) {
String dimVal = dimVals.get(dictId);
if (dimVal != null) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
tree.insert(coords, mergedIndexes);
}
}
}
@Override
@ -500,7 +533,7 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
}
}
public static class ConvertingIndexedInts implements Iterable<Integer>
public static class ConvertingIndexedInts implements IntIterable
{
private final IndexedInts baseIndex;
private final IntBuffer conversionBuffer;
@ -525,19 +558,29 @@ public class StringDimensionMergerV9 implements DimensionMergerV9<int[]>
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return Iterators.transform(
baseIndex.iterator(),
new Function<Integer, Integer>()
{
@Override
public Integer apply(@Nullable Integer input)
{
return conversionBuffer.get(input);
}
}
);
final IntIterator baseIterator = baseIndex.iterator();
return new AbstractIntIterator()
{
@Override
public boolean hasNext()
{
return baseIterator.hasNext();
}
@Override
public int nextInt()
{
return conversionBuffer.get(baseIterator.nextInt());
}
@Override
public int skip(int n)
{
return IntIteratorUtils.skip(this, n);
}
};
}
}

View File

@ -19,11 +19,12 @@
package io.druid.segment.data;
import it.unimi.dsi.fastutil.ints.IntIterator;
import java.io.IOException;
import java.util.Iterator;
/**
*/
*/
public class ArrayBasedIndexedInts implements IndexedInts
{
private final int[] expansion;
@ -43,7 +44,7 @@ public class ArrayBasedIndexedInts implements IndexedInts
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return new IndexedIntsIterator(this);
}

View File

@ -21,11 +21,11 @@ package io.druid.segment.data;
import com.google.common.collect.Ordering;
import com.metamx.collections.bitmap.ImmutableBitmap;
import org.roaringbitmap.IntIterator;
import io.druid.segment.IntIteratorUtils;
import it.unimi.dsi.fastutil.ints.IntIterator;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
/**
*/
@ -82,30 +82,9 @@ public class BitmapCompressedIndexedInts implements IndexedInts, Comparable<Immu
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return new Iterator<Integer>()
{
IntIterator baseIterator = immutableBitmap.iterator();
@Override
public boolean hasNext()
{
return baseIterator.hasNext();
}
@Override
public Integer next()
{
return baseIterator.next();
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
return IntIteratorUtils.fromRoaringBitmapIntIterator(immutableBitmap.iterator());
}
@Override

View File

@ -27,6 +27,7 @@ import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
import io.druid.segment.CompressedPools;
import it.unimi.dsi.fastutil.ints.IntIterator;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -293,7 +294,7 @@ public class CompressedIntsIndexedSupplier implements WritableSupplier<IndexedIn
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return new IndexedIntsIterator(this);
}

View File

@ -28,6 +28,7 @@ import com.metamx.common.guava.CloseQuietly;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
import io.druid.segment.CompressedPools;
import it.unimi.dsi.fastutil.ints.IntIterator;
import java.io.IOException;
import java.nio.ByteBuffer;
@ -366,7 +367,7 @@ public class CompressedVSizeIntsIndexedSupplier implements WritableSupplier<Inde
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return new IndexedIntsIterator(this);
}

View File

@ -19,10 +19,10 @@
package io.druid.segment.data;
import com.google.common.collect.ImmutableList;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
import java.io.IOException;
import java.util.Iterator;
/**
*/
@ -47,9 +47,9 @@ public class EmptyIndexedInts implements IndexedInts
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return ImmutableList.<Integer>of().iterator();
return IntIterators.EMPTY_ITERATOR;
}
@Override

View File

@ -19,12 +19,14 @@
package io.druid.segment.data;
import it.unimi.dsi.fastutil.ints.IntIterable;
import java.io.Closeable;
/**
* Get a int an index (array or list lookup abstraction without boxing).
*/
public interface IndexedInts extends Iterable<Integer>, Closeable
public interface IndexedInts extends IntIterable, Closeable
{
int size();
int get(int index);

View File

@ -20,11 +20,12 @@
package io.druid.segment.data;
import java.util.Iterator;
import io.druid.segment.IntIteratorUtils;
import it.unimi.dsi.fastutil.ints.AbstractIntIterator;
/**
*/
public class IndexedIntsIterator implements Iterator<Integer>
public class IndexedIntsIterator extends AbstractIntIterator
{
private final IndexedInts baseInts;
private final int size;
@ -47,14 +48,13 @@ public class IndexedIntsIterator implements Iterator<Integer>
}
@Override
public Integer next()
{
public int nextInt() {
return baseInts.get(currIndex++);
}
@Override
public void remove()
public int skip(int n)
{
throw new UnsupportedOperationException();
return IntIteratorUtils.skip(this, n);
}
}

View File

@ -22,11 +22,11 @@ package io.druid.segment.data;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import io.druid.collections.IntList;
import it.unimi.dsi.fastutil.ints.IntIterator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.util.Iterator;
/**
*/
@ -86,7 +86,7 @@ public class IntBufferIndexedInts implements IndexedInts, Comparable<IntBufferIn
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return new IndexedIntsIterator(this);
}

View File

@ -19,8 +19,9 @@
package io.druid.segment.data;
import it.unimi.dsi.fastutil.ints.IntIterator;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
@ -44,7 +45,7 @@ public class ListBasedIndexedInts implements IndexedInts
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return new IndexedIntsIterator(this);
}

View File

@ -22,11 +22,11 @@ package io.druid.segment.data;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import com.metamx.common.IAE;
import it.unimi.dsi.fastutil.ints.IntIterator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Iterator;
import java.util.List;
/**
@ -178,7 +178,7 @@ public class VSizeIndexedInts implements IndexedInts, Comparable<VSizeIndexedInt
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return new IndexedIntsIterator(this);
}

View File

@ -60,6 +60,8 @@ import io.druid.segment.data.IndexedInts;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -227,31 +229,29 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
public IndexedInts getRow()
{
final List<String> dimensionValues = in.get().getDimension(dimension);
final ArrayList<Integer> vals = Lists.newArrayList();
if (dimensionValues != null) {
for (int i = 0; i < dimensionValues.size(); ++i) {
vals.add(i);
}
}
final int dimensionValuesSize = dimensionValues != null ? dimensionValues.size() : 0;
return new IndexedInts()
{
@Override
public int size()
{
return vals.size();
return dimensionValuesSize;
}
@Override
public int get(int index)
{
return vals.get(index);
if (index < 0 || index >= dimensionValuesSize) {
throw new IndexOutOfBoundsException("index: " + index);
}
return index;
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return vals.iterator();
return IntIterators.fromTo(0, dimensionValuesSize);
}
@Override

View File

@ -28,6 +28,7 @@ import com.metamx.common.logger.Logger;
import io.druid.segment.DimensionHandler;
import io.druid.segment.DimensionIndexer;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.IntIteratorUtils;
import io.druid.segment.Metadata;
import io.druid.segment.Rowboat;
import io.druid.segment.column.ColumnCapabilities;
@ -35,8 +36,8 @@ import io.druid.segment.data.EmptyIndexedInts;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ListIndexed;
import it.unimi.dsi.fastutil.ints.IntIterator;
import org.joda.time.Interval;
import org.roaringbitmap.IntIterator;
import java.io.IOException;
import java.util.Iterator;
@ -285,30 +286,9 @@ public class IncrementalIndexAdapter implements IndexableAdapter
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return new Iterator<Integer>()
{
final IntIterator baseIter = bitmapIndex.iterator();
@Override
public boolean hasNext()
{
return baseIter.hasNext();
}
@Override
public Integer next()
{
return baseIter.next();
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
return IntIteratorUtils.fromRoaringBitmapIntIterator(bitmapIndex.iterator());
}
@Override

View File

@ -34,21 +34,20 @@ import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.ExtractionDimensionSpec;
import io.druid.query.dimension.LookupDimensionSpec;
import io.druid.query.dimension.RegexFilteredDimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.extraction.JavaScriptExtractionFn;
import io.druid.query.extraction.RegexDimExtractionFn;
import io.druid.query.extraction.UpperExtractionFn;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -136,9 +135,9 @@ public class CardinalityAggregatorTest
}
@Override
public Iterator<Integer> iterator()
public IntIterator iterator()
{
return Iterators.forArray(column.get(p));
return IntIterators.asIntIterator(Iterators.forArray(column.get(p)));
}
@Override

View File

@ -0,0 +1,53 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import it.unimi.dsi.fastutil.ints.IntIterators;
import it.unimi.dsi.fastutil.ints.IntListIterator;
import org.junit.Assert;
import org.junit.Test;
import static io.druid.segment.IntIteratorUtils.skip;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class IntIteratorUtilsTest
{
@Test
public void testSkip()
{
assertEquals(0, skip(IntIterators.EMPTY_ITERATOR, 5));
assertEquals(0, skip(IntIterators.EMPTY_ITERATOR, 0));
IntListIterator it = IntIterators.fromTo(0, 10);
assertEquals(3, skip(it, 3));
assertEquals(3, it.nextInt());
assertEquals(6, skip(it, 100));
assertEquals(0, skip(it, 100));
assertFalse(it.hasNext());
}
@Test(expected = IllegalArgumentException.class)
public void testNegativeSkipArgument()
{
skip(IntIterators.fromTo(0, 10), -1);
}
}

View File

@ -0,0 +1,142 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import com.google.common.collect.Lists;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntList;
import it.unimi.dsi.fastutil.ints.IntLists;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
import static io.druid.segment.IntIteratorUtils.mergeAscending;
import static it.unimi.dsi.fastutil.ints.IntIterators.EMPTY_ITERATOR;
import static java.lang.Integer.MAX_VALUE;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
public class MergeIntIteratorTest
{
@Test(expected = NoSuchElementException.class)
public void testNoIterators()
{
IntIterator it = mergeAscending(Collections.<IntIterator>emptyList());
assertEmpty(it);
}
@Test(expected = NoSuchElementException.class)
public void testMergeEmptyIterators()
{
IntIterator it = mergeAscending(Arrays.<IntIterator>asList(EMPTY_ITERATOR, EMPTY_ITERATOR));
assertEmpty(it);
}
private static void assertEmpty(IntIterator it)
{
assertFalse(it.hasNext());
try {
it.next();
fail("expected NoSuchElementException on it.next() after it.hasNext() = false");
}
catch (NoSuchElementException ignore) {
// expected
}
// expected to fail with NoSuchElementException
it.nextInt();
}
/**
* Check for some possible corner cases, because {@link io.druid.segment.IntIteratorUtils.MergeIntIterator} is
* implemented using packing ints within longs, that is prone to some overflow or sign bit extension bugs
*/
@Test
public void testOverflow()
{
List<IntList> lists = Lists.newArrayList(
IntLists.singleton(Integer.MIN_VALUE),
IntLists.singleton(Integer.MIN_VALUE),
IntLists.singleton(-1),
IntLists.singleton(0),
IntLists.singleton(MAX_VALUE)
);
for (int i = 0; i < lists.size() + 1; i++) {
assertAscending(mergeAscending(iteratorsFromLists(lists)));
Collections.rotate(lists, 1);
}
Collections.shuffle(lists);
assertAscending(mergeAscending(iteratorsFromLists(lists)));
}
private static List<IntIterator> iteratorsFromLists(List<IntList> lists)
{
ArrayList<IntIterator> r = new ArrayList<>();
for (IntList list : lists) {
r.add(list.iterator());
}
return r;
}
@Test
public void smokeTest()
{
ThreadLocalRandom r = ThreadLocalRandom.current();
for (int i = 0; i < 1000; i++) {
int numIterators = r.nextInt(1, 11);
List<IntList> lists = new ArrayList<>(numIterators);
for (int j = 0; j < numIterators; j++) {
lists.add(new IntArrayList());
}
for (int j = 0; j < 50; j++) {
lists.get(r.nextInt(numIterators)).add(j);
}
for (int j = 0; j < lists.size() + 1; j++) {
assertAscending(mergeAscending(iteratorsFromLists(lists)));
Collections.rotate(lists, 1);
}
for (int j = 0; j < 10; j++) {
Collections.shuffle(lists);
assertAscending(mergeAscending(iteratorsFromLists(lists)));
}
}
}
private static void assertAscending(IntIterator it)
{
if (!it.hasNext()) {
return;
}
int prev = it.nextInt();
for (; it.hasNext(); ) {
int current = it.nextInt();
if (prev > current) {
Assert.fail("not ascending: " + prev + ", then " + current);
}
prev = current;
}
}
}