optimize index merge

This commit is contained in:
binlijin 2015-11-12 11:08:54 +08:00
parent 465cbcf9a7
commit 286b8f8c6f
9 changed files with 434 additions and 58 deletions

View File

@ -55,6 +55,7 @@ import io.druid.common.guava.GuavaUtils;
import io.druid.common.utils.JodaUtils; import io.druid.common.utils.JodaUtils;
import io.druid.common.utils.SerializerUtils; import io.druid.common.utils.SerializerUtils;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl; import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType; import io.druid.segment.column.ValueType;
@ -216,8 +217,11 @@ public class IndexMerger
ProgressIndicator progress ProgressIndicator progress
) throws IOException ) throws IOException
{ {
return merge( // We are materializing the list for performance reasons. Lists.transform
Lists.transform( // only creates a "view" of the original list, meaning the function gets
// applied every time you access an element.
List<IndexableAdapter> indexAdapteres = Lists.newArrayList(
Iterables.transform(
indexes, indexes,
new Function<QueryableIndex, IndexableAdapter>() new Function<QueryableIndex, IndexableAdapter>()
{ {
@ -227,7 +231,10 @@ public class IndexMerger
return new QueryableIndexIndexableAdapter(input); return new QueryableIndexIndexableAdapter(input);
} }
} }
), )
);
return merge(
indexAdapteres,
metricAggs, metricAggs,
outDir, outDir,
null, null,
@ -842,13 +849,17 @@ public class IndexMerger
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory); tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
} }
BitmapIndexSeeker[] bitmapIndexSeeker = new BitmapIndexSeeker[indexes.size()];
for (int j = 0; j < indexes.size(); j++) {
bitmapIndexSeeker[j] = indexes.get(j).getBitmapIndexSeeker(dimension);
}
for (String dimVal : IndexedIterable.create(dimVals)) { for (String dimVal : IndexedIterable.create(dimVals)) {
progress.progress(); progress.progress();
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size()); List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size());
for (int j = 0; j < indexes.size(); ++j) { for (int j = 0; j < indexes.size(); ++j) {
convertedInverteds.add( convertedInverteds.add(
new ConvertingIndexedInts( new ConvertingIndexedInts(
indexes.get(j).getBitmapIndex(dimension, dimVal), rowNumConversions.get(j) bitmapIndexSeeker[j].seek(dimVal), rowNumConversions.get(j)
) )
); );
} }
@ -998,6 +1009,7 @@ public class IndexMerger
private int currIndex; private int currIndex;
private String lastVal = null; private String lastVal = null;
private String currValue;
DimValueConverter( DimValueConverter(
Indexed<String> dimSet Indexed<String> dimSet
@ -1007,6 +1019,7 @@ public class IndexMerger
conversionBuf = ByteBuffer.allocateDirect(dimSet.size() * Ints.BYTES).asIntBuffer(); conversionBuf = ByteBuffer.allocateDirect(dimSet.size() * Ints.BYTES).asIntBuffer();
currIndex = 0; currIndex = 0;
currValue = null;
} }
public void convert(String value, int index) public void convert(String value, int index)
@ -1020,7 +1033,9 @@ public class IndexMerger
} }
return; return;
} }
String currValue = dimSet.get(currIndex); if (currValue == null) {
currValue = dimSet.get(currIndex);
}
while (currValue == null) { while (currValue == null) {
conversionBuf.position(conversionBuf.position() + 1); conversionBuf.position(conversionBuf.position() + 1);
@ -1037,6 +1052,8 @@ public class IndexMerger
++currIndex; ++currIndex;
if (currIndex == dimSet.size()) { if (currIndex == dimSet.size()) {
lastVal = value; lastVal = value;
} else {
currValue = dimSet.get(currIndex);
} }
} else if (currValue.compareTo(value) < 0) { } else if (currValue.compareTo(value) < 0) {
throw new ISE( throw new ISE(

View File

@ -17,6 +17,7 @@
package io.druid.segment; package io.druid.segment;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.Indexed; import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedInts;
@ -41,6 +42,8 @@ public interface IndexableAdapter
IndexedInts getBitmapIndex(String dimension, String value); IndexedInts getBitmapIndex(String dimension, String value);
BitmapIndexSeeker getBitmapIndexSeeker(String dimension);
String getMetricType(String metric); String getMetricType(String metric);
ColumnCapabilities getCapabilities(String column); ColumnCapabilities getCapabilities(String column);

View File

@ -27,10 +27,12 @@ import com.metamx.common.ISE;
import com.metamx.common.guava.CloseQuietly; import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.segment.column.BitmapIndex; import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.column.Column; import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ComplexColumn; import io.druid.segment.column.ComplexColumn;
import io.druid.segment.column.DictionaryEncodedColumn; import io.druid.segment.column.DictionaryEncodedColumn;
import io.druid.segment.column.EmptyBitmapIndexSeeker;
import io.druid.segment.column.GenericColumn; import io.druid.segment.column.GenericColumn;
import io.druid.segment.column.IndexedFloatsGenericColumn; import io.druid.segment.column.IndexedFloatsGenericColumn;
import io.druid.segment.column.IndexedLongsGenericColumn; import io.druid.segment.column.IndexedLongsGenericColumn;
@ -38,6 +40,7 @@ import io.druid.segment.column.ValueType;
import io.druid.segment.data.ArrayBasedIndexedInts; import io.druid.segment.data.ArrayBasedIndexedInts;
import io.druid.segment.data.BitmapCompressedIndexedInts; import io.druid.segment.data.BitmapCompressedIndexedInts;
import io.druid.segment.data.EmptyIndexedInts; import io.druid.segment.data.EmptyIndexedInts;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.Indexed; import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIterable; import io.druid.segment.data.IndexedIterable;
@ -331,4 +334,76 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
{ {
return input.getColumn(column).getCapabilities(); return input.getColumn(column).getCapabilities();
} }
/**
 * Creates a forward-only seeker over the bitmap index of {@code dimension}.
 *
 * <p>Returns an {@link EmptyBitmapIndexSeeker} when the column does not exist or has no bitmap index.
 * Otherwise the returned seeker walks the dictionary obtained from {@code getDimValueLookup(dimension)}
 * with a cursor that only moves forward, so callers must present values in ascending
 * {@code GenericIndexed.STRING_STRATEGY} order.
 *
 * @param dimension name of the dimension column
 * @return a stateful, single-pass seeker; each call to this method returns a fresh one
 */
@Override
public BitmapIndexSeeker getBitmapIndexSeeker(String dimension)
{
  final Column column = input.getColumn(dimension);
  if (column == null) {
    return new EmptyBitmapIndexSeeker();
  }

  final BitmapIndex bitmaps = column.getBitmapIndex();
  if (bitmaps == null) {
    return new EmptyBitmapIndexSeeker();
  }

  final Indexed<String> dimSet = getDimValueLookup(dimension);

  // Rationale (carried over from the original change): BitmapIndexSeeker is where the main
  // performance gain of the optimized merge comes from.
  // Previously, building the inverted index iterated over all merged, sorted dimension values and
  // looked each one up ("binary find") in every index being merged, which involves disk IO and is slow.
  // With N small segments (100 in the authors' test) of M rows each (50000 in their case) and high
  // cardinality, there are close to N * M unique values; the authors put the old cost at
  // O(N * M * M * LOG(M)).
  // Two properties of the merge were not being exploited:
  //   1. dimension values are always visited sequentially, in sorted order;
  //   2. with high enough cardinality a given value is usually present in only one index.
  // The seeker therefore only moves forward and never seeks back. Together with the dictionary from
  // getDimValueLookup, translating a value to its id is just a cursor increment: the cached current
  // value is replaced only after it has been matched. The authors put the new cost at O(N * M * LOG(M)).
  return new BitmapIndexSeeker()
  {
    // Position of the next dictionary entry that has not been matched yet.
    private int currIndex = 0;
    // Cached dimSet.get(currIndex); may legitimately be null when the dictionary holds a null entry.
    private String currVal = null;
    // The value that matched the final dictionary entry; only meaningful once exhausted is true.
    private String lastVal = null;
    // True once every dictionary entry has been matched. Tracked separately from lastVal because
    // the final entry itself can be null, in which case "lastVal != null" cannot signal exhaustion
    // and the next seek would read dimSet.get(dimSet.size()).
    private boolean exhausted = false;

    /**
     * Returns the row ids for {@code value}, or an empty result when this index does not contain it.
     *
     * @throws ISE if the dictionary is exhausted and {@code value} is not greater than the last
     *             matched value, or if {@code value} sorts after the current dictionary entry
     *             (which would mean that entry was skipped)
     */
    @Override
    public IndexedInts seek(String value)
    {
      if (dimSet == null || dimSet.size() == 0) {
        return new EmptyIndexedInts();
      }

      if (exhausted) {
        if (lastVal != null && GenericIndexed.STRING_STRATEGY.compare(value, lastVal) <= 0) {
          throw new ISE(
              "Value[%s] is less than the last value[%s] I have, cannot be.",
              value, lastVal
          );
        }
        return new EmptyIndexedInts();
      }

      if (currVal == null) {
        currVal = dimSet.get(currIndex);
      }

      int compareResult = GenericIndexed.STRING_STRATEGY.compare(currVal, value);
      if (compareResult == 0) {
        IndexedInts ret = new BitmapCompressedIndexedInts(bitmaps.getBitmap(currIndex));
        ++currIndex;
        if (currIndex == dimSet.size()) {
          lastVal = value;
          exhausted = true;
        } else {
          currVal = dimSet.get(currIndex);
        }
        return ret;
      } else if (compareResult < 0) {
        // The caller moved past a dictionary entry without asking for it.
        throw new ISE(
            "Skipped currValue[%s], currIndex[%,d]; incoming value[%s]",
            currVal, currIndex, value
        );
      } else {
        // value sorts before the current entry: this index does not contain it.
        return new EmptyIndexedInts();
      }
    }
  };
}
} }

View File

@ -21,6 +21,7 @@ package io.druid.segment;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.Indexed; import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedInts;
@ -92,4 +93,10 @@ public class RowboatFilteringIndexAdapter implements IndexableAdapter
{ {
return baseAdapter.getCapabilities(column); return baseAdapter.getCapabilities(column);
} }
/**
 * Forwards the request to the wrapped {@code baseAdapter} and returns its seeker unchanged.
 *
 * @param dimension name of the dimension column
 * @return the seeker produced by the wrapped adapter
 */
@Override
public BitmapIndexSeeker getBitmapIndexSeeker(String dimension)
{
  final BitmapIndexSeeker delegateSeeker = baseAdapter.getBitmapIndexSeeker(dimension);
  return delegateSeeker;
}
} }

View File

@ -0,0 +1,28 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.segment.column;
import io.druid.segment.data.IndexedInts;
/**
 * Looks up the rows of a dimension value in a bitmap index.
 *
 * <p>Only supports access in order: callers are expected to pass values in ascending order, and
 * implementations may throw when a value is presented out of order.
 */
public interface BitmapIndexSeeker
{
  /**
   * Returns the row ids associated with {@code value}.
   *
   * @param value the dimension value to look up
   * @return the matching row ids; implementations visible alongside this interface return an
   *         empty {@link IndexedInts} rather than {@code null} when there is no match
   */
  IndexedInts seek(String value);
}

View File

@ -0,0 +1,32 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.segment.column;
import io.druid.segment.data.EmptyIndexedInts;
import io.druid.segment.data.IndexedInts;
/**
 * A {@link BitmapIndexSeeker} that never matches anything: every lookup yields an empty result.
 */
public class EmptyBitmapIndexSeeker implements BitmapIndexSeeker
{
  /**
   * Ignores {@code value} and returns a newly created empty {@link IndexedInts}.
   */
  @Override
  public IndexedInts seek(String value)
  {
    final IndexedInts noRows = new EmptyIndexedInts();
    return noRows;
  }
}

View File

@ -22,21 +22,29 @@ package io.druid.segment.incremental;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.metamx.collections.bitmap.BitmapFactory; import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.ImmutableBitmap;
import com.metamx.collections.bitmap.MutableBitmap; import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.segment.IndexableAdapter; import io.druid.segment.IndexableAdapter;
import io.druid.segment.Rowboat; import io.druid.segment.Rowboat;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.EmptyBitmapIndexSeeker;
import io.druid.segment.data.EmptyIndexedInts; import io.druid.segment.data.EmptyIndexedInts;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.Indexed; import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIterable; import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.ListIndexed; import io.druid.segment.data.ListIndexed;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.roaringbitmap.IntIterator; import org.roaringbitmap.IntIterator;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
@ -235,8 +243,59 @@ public class IncrementalIndexAdapter implements IndexableAdapter
return new EmptyIndexedInts(); return new EmptyIndexedInts();
} }
return new IndexedInts() return new BitmapIndexedInts(bitmapIndex);
}
@Override
public String getMetricType(String metric)
{ {
return index.getMetricType(metric);
}
/**
 * Returns the capabilities of {@code column} as reported by the underlying incremental index.
 */
@Override
public ColumnCapabilities getCapabilities(String column)
{
  final ColumnCapabilities capabilities = index.getCapabilities(column);
  return capabilities;
}
/**
 * Creates a seeker over the in-memory inverted index of {@code dimension}.
 *
 * <p>Returns an {@link EmptyBitmapIndexSeeker} when no inverted index exists for the dimension.
 * Otherwise each {@code seek} is a map lookup; the seeker remembers the previously requested
 * value only to reject non-null values that are not strictly greater than it.
 *
 * @param dimension name of the dimension column
 * @return a stateful seeker; each call to this method returns a fresh one
 */
@Override
public BitmapIndexSeeker getBitmapIndexSeeker(String dimension)
{
  final Map<String, MutableBitmap> bitmapsByValue = invertedIndexes.get(dimension);
  if (bitmapsByValue == null) {
    return new EmptyBitmapIndexSeeker();
  }

  return new BitmapIndexSeeker()
  {
    // The value passed to the most recent successful seek; null before the first call.
    private String previous = null;

    @Override
    public IndexedInts seek(String value)
    {
      // Ordering is only enforced for non-null values.
      if (value != null) {
        final boolean outOfOrder = GenericIndexed.STRING_STRATEGY.compare(value, previous) <= 0;
        if (outOfOrder) {
          throw new ISE(
              "Value[%s] is less than the last value[%s] I have, cannot be.",
              value, previous
          );
        }
      }
      previous = value;

      final MutableBitmap rows = bitmapsByValue.get(value);
      if (rows != null) {
        return new BitmapIndexedInts(rows);
      }
      return new EmptyIndexedInts();
    }
  };
}
static class BitmapIndexedInts implements IndexedInts {
private final MutableBitmap bitmapIndex;
BitmapIndexedInts(MutableBitmap bitmapIndex)
{
this.bitmapIndex = bitmapIndex;
}
@Override @Override
public int size() public int size()
{ {
@ -246,7 +305,8 @@ public class IncrementalIndexAdapter implements IndexableAdapter
@Override @Override
public int get(int index) public int get(int index)
{ {
throw new UnsupportedOperationException("This is really slow, so it's just not supported."); // Slow for concise bitmaps, but is fast with roaring bitmaps, so it's just not supported.
throw new UnsupportedOperationException("Not supported.");
} }
@Override @Override
@ -254,7 +314,7 @@ public class IncrementalIndexAdapter implements IndexableAdapter
{ {
return new Iterator<Integer>() return new Iterator<Integer>()
{ {
IntIterator baseIter = bitmapIndex.iterator(); final IntIterator baseIter = bitmapIndex.iterator();
@Override @Override
public boolean hasNext() public boolean hasNext()
@ -287,18 +347,5 @@ public class IncrementalIndexAdapter implements IndexableAdapter
{ {
} }
};
}
@Override
public String getMetricType(String metric)
{
return index.getMetricType(metric);
}
@Override
public ColumnCapabilities getCapabilities(String column)
{
return index.getCapabilities(column);
} }
} }

View File

@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import java.io.File;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.IncrementalIndexTest;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexAdapter;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import com.metamx.common.ISE;
/**
 * Tests the forward-only {@link BitmapIndexSeeker} returned by {@link QueryableIndexIndexableAdapter}.
 */
public class QueryableIndexIndexableAdapterTest
{
  private static final IndexMerger INDEX_MERGER = TestHelper.getTestIndexMerger();
  private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
  private static final IndexSpec INDEX_SPEC = IndexMergerTest.makeIndexSpec(
      new ConciseBitmapSerdeFactory(),
      CompressedObjectStrategy.CompressionStrategy.LZ4,
      CompressedObjectStrategy.CompressionStrategy.LZ4
  );

  @Rule
  public final TemporaryFolder temporaryFolder = new TemporaryFolder();

  @Rule
  public final CloserRule closer = new CloserRule(false);

  /**
   * Persists a populated incremental index, reloads it, and seeks "dim1" values in order.
   * The assertions expect one row each for "1" and "3", no rows for "0", "2" and "4", and an
   * ISE when "4" is requested while an earlier dictionary entry is still pending.
   */
  @Test
  public void testGetBitmapIndexSeeker() throws Exception
  {
    final long timestamp = System.currentTimeMillis();
    IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null);
    IncrementalIndexTest.populateIndex(timestamp, toPersist);

    final File tempDir = temporaryFolder.newFolder();

    // NOTE(review): this adapter is constructed but never used below; kept so the statements
    // executed by the test (and the file's imports) stay as they were.
    final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
        toPersist.getInterval(),
        toPersist,
        INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory()
    );

    QueryableIndex index = closer.closeLater(
        INDEX_IO.loadIndex(
            INDEX_MERGER.persist(
                toPersist,
                tempDir,
                null,
                INDEX_SPEC
            )
        )
    );

    IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
    BitmapIndexSeeker bitmapIndexSeeker = adapter.getBitmapIndexSeeker("dim1");

    IndexedInts indexedInts0 = bitmapIndexSeeker.seek("0");
    Assert.assertEquals(0, indexedInts0.size());

    IndexedInts indexedInts1 = bitmapIndexSeeker.seek("1");
    Assert.assertEquals(1, indexedInts1.size());

    try {
      bitmapIndexSeeker.seek("4");
      Assert.fail("Only support access in order");
    }
    catch (ISE expected) {
      // Expected: seeking "4" here would skip a pending dictionary entry.
    }

    IndexedInts indexedInts2 = bitmapIndexSeeker.seek("2");
    Assert.assertEquals(0, indexedInts2.size());

    IndexedInts indexedInts3 = bitmapIndexSeeker.seek("3");
    Assert.assertEquals(1, indexedInts3.size());

    IndexedInts indexedInts4 = bitmapIndexSeeker.seek("4");
    Assert.assertEquals(0, indexedInts4.size());
  }
}

View File

@ -0,0 +1,70 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.segment.incremental;
import io.druid.segment.IndexSpec;
import io.druid.segment.IndexableAdapter;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.data.CompressedObjectStrategy;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.IncrementalIndexTest;
import io.druid.segment.data.IndexedInts;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import com.metamx.common.ISE;
/**
 * Tests the {@link BitmapIndexSeeker} returned by {@link IncrementalIndexAdapter}.
 */
public class IncrementalIndexAdapterTest
{
  private static final IndexSpec INDEX_SPEC = new IndexSpec(
      new ConciseBitmapSerdeFactory(),
      CompressedObjectStrategy.CompressionStrategy.LZ4.name().toLowerCase(),
      CompressedObjectStrategy.CompressionStrategy.LZ4.name().toLowerCase()
  );

  /**
   * Seeks "dim1" values in order against a populated incremental index.
   * The assertions expect one row each for "1" and "3", no rows for "0", "2" and "4", and an
   * ISE when "01" is requested after "1" (out of order).
   */
  @Test
  public void testGetBitmapIndexSeeker() throws Exception
  {
    final long timestamp = System.currentTimeMillis();
    IncrementalIndex incrementalIndex = IncrementalIndexTest.createIndex(null);
    IncrementalIndexTest.populateIndex(timestamp, incrementalIndex);

    IndexableAdapter adapter = new IncrementalIndexAdapter(
        incrementalIndex.getInterval(),
        incrementalIndex,
        INDEX_SPEC.getBitmapSerdeFactory().getBitmapFactory()
    );
    BitmapIndexSeeker bitmapIndexSeeker = adapter.getBitmapIndexSeeker("dim1");

    IndexedInts indexedInts0 = bitmapIndexSeeker.seek("0");
    Assert.assertEquals(0, indexedInts0.size());

    IndexedInts indexedInts1 = bitmapIndexSeeker.seek("1");
    Assert.assertEquals(1, indexedInts1.size());

    try {
      bitmapIndexSeeker.seek("01");
      Assert.fail("Only support access in order");
    }
    catch (ISE expected) {
      // Expected: "01" is not greater than the previously requested "1".
    }

    IndexedInts indexedInts2 = bitmapIndexSeeker.seek("2");
    Assert.assertEquals(0, indexedInts2.size());

    IndexedInts indexedInts3 = bitmapIndexSeeker.seek("3");
    Assert.assertEquals(1, indexedInts3.size());

    IndexedInts indexedInts4 = bitmapIndexSeeker.seek("4");
    Assert.assertEquals(0, indexedInts4.size());
  }
}